diff options
author | EuAndreh <eu@euandre.org> | 2025-02-13 18:12:09 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2025-02-13 18:12:09 -0300 |
commit | a7d29c7137bb9b41265725670b547bec29583e0e (patch) | |
tree | 869f4c831706829d228b54476a6548c3db8338c9 | |
parent | src/dedo.go: Include CreateBucketIfNotExists() in BucketI (diff) | |
download | dedo-a7d29c7137bb9b41265725670b547bec29583e0e.tar.gz dedo-a7d29c7137bb9b41265725670b547bec29583e0e.tar.xz |
src/dedo.go: Evolve implementation of inMemoryDatabaseI
-rw-r--r-- | src/dedo.go | 303 |
1 files changed, 246 insertions, 57 deletions
diff --git a/src/dedo.go b/src/dedo.go index 0541bab..0125690 100644 --- a/src/dedo.go +++ b/src/dedo.go @@ -132,6 +132,29 @@ type transactionT struct{ commitHandlers []func() } +type inMemoryCursorT struct{ + bucket *inMemoryValueT + iterator *pds.MapIterator[[]byte, *inMemoryValueT] +} + +type inMemoryValueT struct{ + isBucket bool + tx *inMemoryTxT + value []byte + data *pds.Map[[]byte, *inMemoryValueT] +} + +type inMemoryTxT struct{ + commitHandlers []func() + root *inMemoryValueT + writable bool +} + +type inMemoryDatabaseT struct{ + root *pds.Map[[]byte, *inMemoryValueT] + rwmutex sync.RWMutex +} + type pgid uint64 /// bucket represents the on-file representation of a bucket. This is stored as @@ -177,16 +200,6 @@ type elemRef struct { index int } -type inMemoryTx struct{ - commitHandlers []func() - db *InMemory - writable bool -} - -type InMemory struct{ - *pds.Map[[]byte, *bucketT] -} - /// databaseT represents a collection of buckets persisted to a file on disk. All data /// access is performed through transactions which can be obtained through the /// databaseT. All the functions on databaseT will return a ErrDatabaseNotOpen if accessed @@ -901,10 +914,7 @@ func bytesLE(num uint64) []byte { } func bucketNextID(b *bucketT) []byte { - id, err := b.NextSequence() - if err != nil { - panic(err) - } + id := g.Must(b.NextSequence()) return bytesLE(id) } @@ -1761,107 +1771,286 @@ func initDB(db *databaseT, size int64) error { return nil } -/* -func (tx *inMemoryTx) Bucket(name []byte) *bucketT { - bucket, _ := tx.db.Get(name) - return bucket +func (cursor *inMemoryCursorT) First() ([]byte, []byte) { + return nil, nil // FIXME } -func (bucket *bucketT) value() []byte { - return nil +func (cursor *inMemoryCursorT) Last() ([]byte, []byte) { + return nil, nil // FIXME } -// FIXME: split Tx/Snapshot -func (tx *inMemoryTx) CreateBucket(name []byte) (*bucketT, error) { - if tx.db == nil { +func (cursor *inMemoryCursorT) Next() ([]byte, []byte) { + return nil, nil // FIXME +} + +func (cursor *inMemoryCursorT) Prev() ([]byte, []byte) { + return nil, nil // FIXME +} + +func (cursor *inMemoryCursorT) Seek([]byte) ([]byte, []byte) { + return nil, nil // FIXME +} + +func (cursor *inMemoryCursorT) Delete() error { + return nil // FIXME +} + +func (bucket *inMemoryValueT) Bucket(name []byte) (BucketI, error) { + if bucket.tx.root == nil { return nil, ErrTxClosed } - if len(name) == 0 { + value, ok := bucket.data.Get(name) + if !ok { + return nil, ErrBucketNotFound + } + + if !value.isBucket { + return nil, ErrBucketBadFlag + } + + return value, nil +} + +func (bucket *inMemoryValueT) CreateBucket(name []byte) (BucketI, error) { + if bucket.tx.root == nil { + return nil, ErrTxClosed + } else if !bucket.tx.writable { + return nil, ErrTxNotWritable + } else if len(name) == 0 { return nil, ErrBucketNameRequired } - _, ok := tx.db.Get(name) - if ok { + _, err := bucket.Bucket(name) + if err == nil { return nil, ErrBucketExists } - bucket := &bucketT{} - tx.db.set(name, bucket) - return bucket, nil + newBucket := &inMemoryValueT{ + isBucket: true, + tx: bucket.tx, + value: nil, + data: pds.NewMap[[]byte, *inMemoryValueT](nil), + } + bucket.data = bucket.data.Set(name, newBucket) + + return newBucket, nil } -func (tx *inMemoryTx) CreateBucketIfNotExists(name []byte) (*bucketT, error) { - bucket, err := tx.CreateBucket(name) +func (bucket *inMemoryValueT) CreateBucketIfNotExists(name []byte) (BucketI, error) { + if bucket.tx.root == nil { + return nil, ErrTxClosed + } else if !bucket.tx.writable { + return nil, ErrTxNotWritable + } + + newBucket, err := bucket.CreateBucket(name) if err == ErrBucketExists { - return bucket.Bucket(name), nil + return bucket.Bucket(name) } else if err != nil { return nil, err } - return bucket, nil + return newBucket, nil } -func (tx *inMemoryTx) Cursor() *cursorT { +func (bucket *inMemoryValueT) Delete(name []byte) error { + if bucket.tx.root == nil { + return ErrTxClosed + } else if !bucket.tx.writable { + return ErrTxNotWritable + } + + bucket.data = bucket.data.Delete(name) return nil } -func (tx *inMemoryTx) DeleteBucket(name []byte) error { +func (bucket *inMemoryValueT) DeleteBucket(name []byte) error { + if bucket.tx.root == nil { + return ErrTxClosed + } else if !bucket.tx.writable { + return ErrTxNotWritable + } + + bucket.data = bucket.data.Delete(name) return nil } -func (tx *inMemoryTx) ForEach(fn func([]byte, *bucketT) error) error { +func (bucket *inMemoryValueT) Cursor() CursorI { + return &inMemoryCursorT{ + bucket: bucket, + iterator: bucket.data.Iterator(), + } +} + +func (bucket *inMemoryValueT) ForEach(fn func([]byte, []byte) error) error { + if bucket.tx.root == nil { + return ErrTxClosed + } + + cursor := bucket.Cursor() + for key, value := cursor.First(); key != nil; key, value = cursor.Next() { + err := fn(key, value) + if err != nil { + return err + } + } return nil } -func (tx *inMemoryTx) OnCommit(fn func()) { - tx.commitHandlers = append(tx.commitHandlers, fn) +func (bucket *inMemoryValueT) Get(key []byte) []byte { + value, ok := bucket.data.Get(key) + if !ok { + return nil + } + + return value.value } -func (tx *inMemoryTx) Check() <-chan error { +func (bucket *inMemoryValueT) Put(key []byte, value []byte) error { + if bucket.tx.root == nil { + return ErrTxClosed + } else if !bucket.tx.writable { + return ErrTxNotWritable + } + + newValue := &inMemoryValueT{ + isBucket: false, + tx: nil, + value: value, + data: nil, + } + bucket.data = bucket.data.Set(key, newValue) + return nil } -func (tx *inMemoryTx) WriteTo(io.Writer) (int64, error) { - return 0, nil +func (bucket *inMemoryValueT) NextID() []byte { + id := g.Must(bucket.NextSequence()) + return bytesLE(id) +} + +func (bucket *inMemoryValueT) NextSequence() (uint64, error) { + if bucket.tx.root == nil { + return 0, ErrTxClosed + } else if !bucket.tx.writable { + return 0, ErrTxNotWritable + } + + return 0, nil // FIXME +} + +func (tx *inMemoryTxT) Bucket(name []byte) (BucketI, error) { + if tx.root == nil { + return nil, ErrTxClosed + } + + return tx.root.Bucket(name) +} + +func (tx *inMemoryTxT) CreateBucket(name []byte) (BucketI, error) { + // return tx.root.CreateBucket(name) + return nil, nil // FIXME +} + +func (tx *inMemoryTxT) CreateBucketIfNotExists(name []byte) (BucketI, error) { + return nil, nil // FIXME } -func (m *InMemory) set(key []byte, value *bucketT) { - m = &InMemory{m.Set(key, value)} +func (tx *inMemoryTxT) Cursor() CursorI { + return nil // FIXME } -func (m *InMemory) Close() error { +func (tx *inMemoryTxT) DeleteBucket(name []byte) error { + if tx.root == nil { + return ErrTxClosed + } + + return tx.root.DeleteBucket(name) +} + +func (tx *inMemoryTxT) ForEach(fn func([]byte, BucketI) error) error { + return nil // FIXME +} + +func (tx *inMemoryTxT) OnCommit(fn func()) { + tx.commitHandlers = append(tx.commitHandlers, fn) +} + +func (tx *inMemoryTxT) Check() <-chan error { + ch := make(chan error) + close(ch) + return ch +} + +func (tx *inMemoryTxT) WriteTo(io.Writer) (int64, error) { + return 0, nil // FIXME +} + +func (db *inMemoryDatabaseT) Close() error { + db.root = nil return nil } -func (m *InMemory) Path() string { - return "" +func (db *inMemoryDatabaseT) Path() string { + return "" // FIXME } -func (m *InMemory) Update(fn func(TransactionI) error) error { - tx := &inMemoryTx{ - db: m, +func (db *inMemoryDatabaseT) Update(fn func(TransactionI) error) error { + db.rwmutex.Lock() + defer db.rwmutex.Unlock() + + tx := &inMemoryTxT{ + root: &inMemoryValueT{ + isBucket: true, + value: nil, + data: db.root, + }, commitHandlers: []func(){}, - writable: true, + writable: true, } + tx.root.tx = tx + err := fn(tx) + if err != nil { + tx.root = nil + return err + } + + // commit + db.root = tx.root.data - // Execute commit handlers now that the locks have been removed. for _, fn := range tx.commitHandlers { - fn() + go fn() } - return err + return nil } -func (m *InMemory) View(func(SnapshotI) error) error { - return nil +func (db *inMemoryDatabaseT) View(fn func(SnapshotI) error) error { + db.rwmutex.RLock() + defer db.rwmutex.RUnlock() + + tx := &inMemoryTxT{ + root: &inMemoryValueT{ + isBucket: true, + value: nil, + data: db.root, + }, + commitHandlers: nil, + writable: false, + } + tx.root.tx = tx + return fn(tx) } func OpenMemory() DatabaseI { - return &InMemory{pds.NewMap[[]byte, *bucketT](nil)} + var rwmutex sync.RWMutex + return &inMemoryDatabaseT{ + root: pds.NewMap[[]byte, *inMemoryValueT](nil), + rwmutex: rwmutex, + } } -*/ /// Open creates and opens a database at the given path. If the file does not /// exist then it will be created automatically. |