aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEuAndreh <eu@euandre.org>2025-02-13 18:28:45 -0300
committerEuAndreh <eu@euandre.org>2025-02-13 18:28:45 -0300
commit04e89e71f715bad1fea8875364262dccd90a72c9 (patch)
tree33d883ac8112cf36192fdaa5b2c699affd7b41e4
parentsrc/dedo.go: Evolve implementation of inMemoryDatabaseI (diff)
downloaddedo-04e89e71f715bad1fea8875364262dccd90a72c9.tar.gz
dedo-04e89e71f715bad1fea8875364262dccd90a72c9.tar.xz
src/dedo.go: Add DatabaseI.OnCommit()
-rw-r--r--src/dedo.go55
1 files changed, 34 insertions, 21 deletions
diff --git a/src/dedo.go b/src/dedo.go
index 0125690..84c8857 100644
--- a/src/dedo.go
+++ b/src/dedo.go
@@ -49,6 +49,7 @@ import (
"io/fs"
"log"
"os"
+ "slices"
"sort"
"sync"
"syscall"
@@ -111,6 +112,7 @@ type DatabaseI interface{
Close() error
View (func(tx SnapshotI) error) error
Update(func(tx TransactionI) error) error
+ OnCommit(func())
Path() string
}
@@ -145,14 +147,15 @@ type inMemoryValueT struct{
}
type inMemoryTxT struct{
- commitHandlers []func()
root *inMemoryValueT
writable bool
+ commitHandlers []func()
}
type inMemoryDatabaseT struct{
root *pds.Map[[]byte, *inMemoryValueT]
rwmutex sync.RWMutex
+ commitHandlers []func()
}
type pgid uint64
@@ -249,6 +252,8 @@ type databaseT struct {
rwlock sync.Mutex /// Allows only one writer at a time.
metalock sync.Mutex /// Protects meta page access.
mmaplock sync.RWMutex /// Protects mmap access during remapping.
+
+ commitHandlers []func()
}
type OpenOptionsT struct{
@@ -537,6 +542,12 @@ var (
+func runHandlers(handlers []func()) {
+ for _, fn := range handlers {
+ fn()
+ }
+}
+
/// fdatasync() flushes written data to a file descriptor.
func fdatasync(db *databaseT) error {
return db.file.Sync()
@@ -1666,12 +1677,13 @@ func (db *databaseT) Path() string {
func newDB(path string, file *os.File, options OpenOptionsT) *databaseT {
return &databaseT{
- MaxBatchSize: DefaultMaxBatchSize,
- MaxBatchDelay: DefaultMaxBatchDelay,
- opened: true,
- path: path,
- file: file,
- magic: options.Magic,
+ MaxBatchSize: DefaultMaxBatchSize,
+ MaxBatchDelay: DefaultMaxBatchDelay,
+ opened: true,
+ path: path,
+ file: file,
+ magic: options.Magic,
+ commitHandlers: []func(){},
}
}
@@ -2020,9 +2032,7 @@ func (db *inMemoryDatabaseT) Update(fn func(TransactionI) error) error {
// commit
db.root = tx.root.data
- for _, fn := range tx.commitHandlers {
- go fn()
- }
+ runHandlers(slices.Concat(tx.commitHandlers, db.commitHandlers))
return nil
}
@@ -2047,8 +2057,9 @@ func (db *inMemoryDatabaseT) View(fn func(SnapshotI) error) error {
func OpenMemory() DatabaseI {
var rwmutex sync.RWMutex
return &inMemoryDatabaseT{
- root: pds.NewMap[[]byte, *inMemoryValueT](nil),
- rwmutex: rwmutex,
+ root: pds.NewMap[[]byte, *inMemoryValueT](nil),
+ rwmutex: rwmutex,
+ commitHandlers: []func(){},
}
}
@@ -3842,12 +3853,8 @@ func (tx *transactionT) ForEach(fn func([]byte, BucketI) error) error {
/// transactionT.OnCommit() adds a handler function to be executed after the transaction
/// successfully commits.
-func txOnCommit(tx *transactionT, fn func()) {
- tx.commitHandlers = append(tx.commitHandlers, fn)
-}
-
func (tx *transactionT) OnCommit(fn func()) {
- txOnCommit(tx, fn)
+ tx.commitHandlers = append(tx.commitHandlers, fn)
}
/// transactionT.commit() writes all changes to disk and updates the meta page. Returns
@@ -3918,13 +3925,11 @@ func (tx *transactionT) commit() error {
return err
}
- // Finalize the transaction.
+ handlers := slices.Concat(tx.commitHandlers, tx.db.commitHandlers)
tx.close()
// Execute commit handlers now that the locks have been removed.
- for _, fn := range tx.commitHandlers {
- fn()
- }
+ runHandlers(handlers)
return nil
}
@@ -4268,6 +4273,14 @@ func (tx *transactionT) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
}
}
+func (database *inMemoryDatabaseT) OnCommit(fn func()) {
+ database.commitHandlers = append(database.commitHandlers, fn)
+}
+
+func (database *databaseT) OnCommit(fn func()) {
+ database.commitHandlers = append(database.commitHandlers, fn)
+}
+
func noopGetopt(args argsT, _w io.Writer) (argsT, bool) {
return args, true
}