aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2025-02-13 18:12:09 -0300
committerEuAndreh <eu@euandre.org>2025-02-13 18:12:09 -0300
commita7d29c7137bb9b41265725670b547bec29583e0e (patch)
tree869f4c831706829d228b54476a6548c3db8338c9
parentsrc/dedo.go: Include CreateBucketIfNotExists() in BucketI (diff)
downloaddedo-a7d29c7137bb9b41265725670b547bec29583e0e.tar.gz
dedo-a7d29c7137bb9b41265725670b547bec29583e0e.tar.xz
src/dedo.go: Evolve implementation of inMemoryDatabaseI
-rw-r--r--src/dedo.go303
1 files changed, 246 insertions, 57 deletions
diff --git a/src/dedo.go b/src/dedo.go
index 0541bab..0125690 100644
--- a/src/dedo.go
+++ b/src/dedo.go
@@ -132,6 +132,29 @@ type transactionT struct{
commitHandlers []func()
}
+type inMemoryCursorT struct{
+ bucket *inMemoryValueT
+ iterator *pds.MapIterator[[]byte, *inMemoryValueT]
+}
+
+type inMemoryValueT struct{
+ isBucket bool
+ tx *inMemoryTxT
+ value []byte
+ data *pds.Map[[]byte, *inMemoryValueT]
+}
+
+type inMemoryTxT struct{
+ commitHandlers []func()
+ root *inMemoryValueT
+ writable bool
+}
+
+type inMemoryDatabaseT struct{
+ root *pds.Map[[]byte, *inMemoryValueT]
+ rwmutex sync.RWMutex
+}
+
type pgid uint64
/// bucket represents the on-file representation of a bucket. This is stored as
@@ -177,16 +200,6 @@ type elemRef struct {
index int
}
-type inMemoryTx struct{
- commitHandlers []func()
- db *InMemory
- writable bool
-}
-
-type InMemory struct{
- *pds.Map[[]byte, *bucketT]
-}
-
/// databaseT represents a collection of buckets persisted to a file on disk. All data
/// access is performed through transactions which can be obtained through the
/// databaseT. All the functions on databaseT will return a ErrDatabaseNotOpen if accessed
@@ -901,10 +914,7 @@ func bytesLE(num uint64) []byte {
}
func bucketNextID(b *bucketT) []byte {
- id, err := b.NextSequence()
- if err != nil {
- panic(err)
- }
+ id := g.Must(b.NextSequence())
return bytesLE(id)
}
@@ -1761,107 +1771,286 @@ func initDB(db *databaseT, size int64) error {
return nil
}
-/*
-func (tx *inMemoryTx) Bucket(name []byte) *bucketT {
- bucket, _ := tx.db.Get(name)
- return bucket
+func (cursor *inMemoryCursorT) First() ([]byte, []byte) {
+ return nil, nil // FIXME
}
-func (bucket *bucketT) value() []byte {
- return nil
+func (cursor *inMemoryCursorT) Last() ([]byte, []byte) {
+ return nil, nil // FIXME
}
-// FIXME: split Tx/Snapshot
-func (tx *inMemoryTx) CreateBucket(name []byte) (*bucketT, error) {
- if tx.db == nil {
+func (cursor *inMemoryCursorT) Next() ([]byte, []byte) {
+ return nil, nil // FIXME
+}
+
+func (cursor *inMemoryCursorT) Prev() ([]byte, []byte) {
+ return nil, nil // FIXME
+}
+
+func (cursor *inMemoryCursorT) Seek([]byte) ([]byte, []byte) {
+ return nil, nil // FIXME
+}
+
+func (cursor *inMemoryCursorT) Delete() error {
+ return nil // FIXME
+}
+
+func (bucket *inMemoryValueT) Bucket(name []byte) (BucketI, error) {
+ if bucket.tx.root == nil {
return nil, ErrTxClosed
}
- if len(name) == 0 {
+ value, ok := bucket.data.Get(name)
+ if !ok {
+ return nil, ErrBucketNotFound
+ }
+
+ if !value.isBucket {
+ return nil, ErrBucketBadFlag
+ }
+
+ return value, nil
+}
+
+func (bucket *inMemoryValueT) CreateBucket(name []byte) (BucketI, error) {
+ if bucket.tx.root == nil {
+ return nil, ErrTxClosed
+ } else if !bucket.tx.writable {
+ return nil, ErrTxNotWritable
+ } else if len(name) == 0 {
return nil, ErrBucketNameRequired
}
- _, ok := tx.db.Get(name)
- if ok {
+ _, err := bucket.Bucket(name)
+ if err == nil {
return nil, ErrBucketExists
}
- bucket := &bucketT{}
- tx.db.set(name, bucket)
- return bucket, nil
+ newBucket := &inMemoryValueT{
+ isBucket: true,
+ tx: bucket.tx,
+ value: nil,
+ data: pds.NewMap[[]byte, *inMemoryValueT](nil),
+ }
+ bucket.data = bucket.data.Set(name, newBucket)
+
+ return newBucket, nil
}
-func (tx *inMemoryTx) CreateBucketIfNotExists(name []byte) (*bucketT, error) {
- bucket, err := tx.CreateBucket(name)
+func (bucket *inMemoryValueT) CreateBucketIfNotExists(name []byte) (BucketI, error) {
+ if bucket.tx.root == nil {
+ return nil, ErrTxClosed
+ } else if !bucket.tx.writable {
+ return nil, ErrTxNotWritable
+ }
+
+ newBucket, err := bucket.CreateBucket(name)
if err == ErrBucketExists {
- return bucket.Bucket(name), nil
+ return bucket.Bucket(name)
} else if err != nil {
return nil, err
}
- return bucket, nil
+ return newBucket, nil
}
-func (tx *inMemoryTx) Cursor() *cursorT {
+func (bucket *inMemoryValueT) Delete(name []byte) error {
+ if bucket.tx.root == nil {
+ return ErrTxClosed
+ } else if !bucket.tx.writable {
+ return ErrTxNotWritable
+ }
+
+ bucket.data = bucket.data.Delete(name)
return nil
}
-func (tx *inMemoryTx) DeleteBucket(name []byte) error {
+func (bucket *inMemoryValueT) DeleteBucket(name []byte) error {
+ if bucket.tx.root == nil {
+ return ErrTxClosed
+ } else if !bucket.tx.writable {
+ return ErrTxNotWritable
+ }
+
+ bucket.data = bucket.data.Delete(name)
return nil
}
-func (tx *inMemoryTx) ForEach(fn func([]byte, *bucketT) error) error {
+func (bucket *inMemoryValueT) Cursor() CursorI {
+ return &inMemoryCursorT{
+ bucket: bucket,
+ iterator: bucket.data.Iterator(),
+ }
+}
+
+func (bucket *inMemoryValueT) ForEach(fn func([]byte, []byte) error) error {
+ if bucket.tx.root == nil {
+ return ErrTxClosed
+ }
+
+ cursor := bucket.Cursor()
+ for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+ err := fn(key, value)
+ if err != nil {
+ return err
+ }
+ }
return nil
}
-func (tx *inMemoryTx) OnCommit(fn func()) {
- tx.commitHandlers = append(tx.commitHandlers, fn)
+func (bucket *inMemoryValueT) Get(key []byte) []byte {
+ value, ok := bucket.data.Get(key)
+ if !ok {
+ return nil
+ }
+
+ return value.value
}
-func (tx *inMemoryTx) Check() <-chan error {
+func (bucket *inMemoryValueT) Put(key []byte, value []byte) error {
+ if bucket.tx.root == nil {
+ return ErrTxClosed
+ } else if !bucket.tx.writable {
+ return ErrTxNotWritable
+ }
+
+ newValue := &inMemoryValueT{
+ isBucket: false,
+ tx: nil,
+ value: value,
+ data: nil,
+ }
+ bucket.data = bucket.data.Set(key, newValue)
+
return nil
}
-func (tx *inMemoryTx) WriteTo(io.Writer) (int64, error) {
- return 0, nil
+func (bucket *inMemoryValueT) NextID() []byte {
+ id := g.Must(bucket.NextSequence())
+ return bytesLE(id)
+}
+
+func (bucket *inMemoryValueT) NextSequence() (uint64, error) {
+ if bucket.tx.root == nil {
+ return 0, ErrTxClosed
+ } else if !bucket.tx.writable {
+ return 0, ErrTxNotWritable
+ }
+
+ return 0, nil // FIXME
+}
+
+func (tx *inMemoryTxT) Bucket(name []byte) (BucketI, error) {
+ if tx.root == nil {
+ return nil, ErrTxClosed
+ }
+
+ return tx.root.Bucket(name)
+}
+
+func (tx *inMemoryTxT) CreateBucket(name []byte) (BucketI, error) {
+ // return tx.root.CreateBucket(name)
+ return nil, nil // FIXME
+}
+
+func (tx *inMemoryTxT) CreateBucketIfNotExists(name []byte) (BucketI, error) {
+ return nil, nil // FIXME
}
-func (m *InMemory) set(key []byte, value *bucketT) {
- m = &InMemory{m.Set(key, value)}
+func (tx *inMemoryTxT) Cursor() CursorI {
+ return nil // FIXME
}
-func (m *InMemory) Close() error {
+func (tx *inMemoryTxT) DeleteBucket(name []byte) error {
+ if tx.root == nil {
+ return ErrTxClosed
+ }
+
+ return tx.root.DeleteBucket(name)
+}
+
+func (tx *inMemoryTxT) ForEach(fn func([]byte, BucketI) error) error {
+ return nil // FIXME
+}
+
+func (tx *inMemoryTxT) OnCommit(fn func()) {
+ tx.commitHandlers = append(tx.commitHandlers, fn)
+}
+
+func (tx *inMemoryTxT) Check() <-chan error {
+ ch := make(chan error)
+ close(ch)
+ return ch
+}
+
+func (tx *inMemoryTxT) WriteTo(io.Writer) (int64, error) {
+ return 0, nil // FIXME
+}
+
+func (db *inMemoryDatabaseT) Close() error {
+ db.root = nil
return nil
}
-func (m *InMemory) Path() string {
- return ""
+func (db *inMemoryDatabaseT) Path() string {
+ return "" // FIXME
}
-func (m *InMemory) Update(fn func(TransactionI) error) error {
- tx := &inMemoryTx{
- db: m,
+func (db *inMemoryDatabaseT) Update(fn func(TransactionI) error) error {
+ db.rwmutex.Lock()
+ defer db.rwmutex.Unlock()
+
+ tx := &inMemoryTxT{
+ root: &inMemoryValueT{
+ isBucket: true,
+ value: nil,
+ data: db.root,
+ },
commitHandlers: []func(){},
- writable: true,
+ writable: true,
}
+ tx.root.tx = tx
+
err := fn(tx)
+ if err != nil {
+ tx.root = nil
+ return err
+ }
+
+ // commit
+ db.root = tx.root.data
- // Execute commit handlers now that the locks have been removed.
for _, fn := range tx.commitHandlers {
- fn()
+ go fn()
}
- return err
+ return nil
}
-func (m *InMemory) View(func(SnapshotI) error) error {
- return nil
+func (db *inMemoryDatabaseT) View(fn func(SnapshotI) error) error {
+ db.rwmutex.RLock()
+ defer db.rwmutex.RUnlock()
+
+ tx := &inMemoryTxT{
+ root: &inMemoryValueT{
+ isBucket: true,
+ value: nil,
+ data: db.root,
+ },
+ commitHandlers: nil,
+ writable: false,
+ }
+ tx.root.tx = tx
+ return fn(tx)
}
func OpenMemory() DatabaseI {
- return &InMemory{pds.NewMap[[]byte, *bucketT](nil)}
+ var rwmutex sync.RWMutex
+ return &inMemoryDatabaseT{
+ root: pds.NewMap[[]byte, *inMemoryValueT](nil),
+ rwmutex: rwmutex,
+ }
}
-*/
/// Open creates and opens a database at the given path. If the file does not
/// exist then it will be created automatically.