diff options
author | EuAndreh <eu@euandre.org> | 2025-02-13 18:28:45 -0300 |
---|---|---|
committer | EuAndreh <eu@euandre.org> | 2025-02-13 18:28:45 -0300 |
commit | 04e89e71f715bad1fea8875364262dccd90a72c9 (patch) | |
tree | 33d883ac8112cf36192fdaa5b2c699affd7b41e4 | |
parent | src/dedo.go: Evolve implementation of inMemoryDatabaseI (diff) | |
download | dedo-04e89e71f715bad1fea8875364262dccd90a72c9.tar.gz dedo-04e89e71f715bad1fea8875364262dccd90a72c9.tar.xz |
src/dedo.go: Add DatabaseI.OnCommit()
-rw-r--r-- | src/dedo.go | 55 |
1 files changed, 34 insertions, 21 deletions
diff --git a/src/dedo.go b/src/dedo.go index 0125690..84c8857 100644 --- a/src/dedo.go +++ b/src/dedo.go @@ -49,6 +49,7 @@ import ( "io/fs" "log" "os" + "slices" "sort" "sync" "syscall" @@ -111,6 +112,7 @@ type DatabaseI interface{ Close() error View (func(tx SnapshotI) error) error Update(func(tx TransactionI) error) error + OnCommit(func()) Path() string } @@ -145,14 +147,15 @@ type inMemoryValueT struct{ } type inMemoryTxT struct{ - commitHandlers []func() root *inMemoryValueT writable bool + commitHandlers []func() } type inMemoryDatabaseT struct{ root *pds.Map[[]byte, *inMemoryValueT] rwmutex sync.RWMutex + commitHandlers []func() } type pgid uint64 @@ -249,6 +252,8 @@ type databaseT struct { rwlock sync.Mutex /// Allows only one writer at a time. metalock sync.Mutex /// Protects meta page access. mmaplock sync.RWMutex /// Protects mmap access during remapping. + + commitHandlers []func() } type OpenOptionsT struct{ @@ -537,6 +542,12 @@ var ( +func runHandlers(handlers []func()) { + for _, fn := range handlers { + fn() + } +} + /// fdatasync() flushes written data to a file descriptor. func fdatasync(db *databaseT) error { return db.file.Sync() @@ -1666,12 +1677,13 @@ func (db *databaseT) Path() string { func newDB(path string, file *os.File, options OpenOptionsT) *databaseT { return &databaseT{ - MaxBatchSize: DefaultMaxBatchSize, - MaxBatchDelay: DefaultMaxBatchDelay, - opened: true, - path: path, - file: file, - magic: options.Magic, + MaxBatchSize: DefaultMaxBatchSize, + MaxBatchDelay: DefaultMaxBatchDelay, + opened: true, + path: path, + file: file, + magic: options.Magic, + commitHandlers: []func(){}, } } @@ -2020,9 +2032,7 @@ func (db *inMemoryDatabaseT) Update(fn func(TransactionI) error) error { // commit db.root = tx.root.data - for _, fn := range tx.commitHandlers { - go fn() - } + runHandlers(slices.Concat(tx.commitHandlers, db.commitHandlers)) return nil } @@ -2047,8 +2057,9 @@ func (db *inMemoryDatabaseT) View(fn func(SnapshotI) error) error { func OpenMemory() DatabaseI { var rwmutex sync.RWMutex return &inMemoryDatabaseT{ - root: pds.NewMap[[]byte, *inMemoryValueT](nil), - rwmutex: rwmutex, + root: pds.NewMap[[]byte, *inMemoryValueT](nil), + rwmutex: rwmutex, + commitHandlers: []func(){}, } } @@ -3842,12 +3853,8 @@ func (tx *transactionT) ForEach(fn func([]byte, BucketI) error) error { /// transactionT.OnCommit() adds a handler function to be executed after the transaction /// successfully commits. -func txOnCommit(tx *transactionT, fn func()) { - tx.commitHandlers = append(tx.commitHandlers, fn) -} - func (tx *transactionT) OnCommit(fn func()) { - txOnCommit(tx, fn) + tx.commitHandlers = append(tx.commitHandlers, fn) } /// transactionT.commit() writes all changes to disk and updates the meta page. Returns @@ -3918,13 +3925,11 @@ func (tx *transactionT) commit() error { return err } - // Finalize the transaction. + handlers := slices.Concat(tx.commitHandlers, tx.db.commitHandlers) tx.close() // Execute commit handlers now that the locks have been removed. - for _, fn := range tx.commitHandlers { - fn() - } + runHandlers(handlers) return nil } @@ -4268,6 +4273,14 @@ func (tx *transactionT) forEachPage(pgid pgid, depth int, fn func(*page, int)) { } } +func (database *inMemoryDatabaseT) OnCommit(fn func()) { + database.commitHandlers = append(database.commitHandlers, fn) +} + +func (database *databaseT) OnCommit(fn func()) { + database.commitHandlers = append(database.commitHandlers, fn) +} + func noopGetopt(args argsT, _w io.Writer) (argsT, bool) { return args, true } |