/// Package dedo implements a low-level key/value store in pure Go. It supports
/// fully serializable transactions, ACID semantics, and lock-free MVCC with
/// multiple readers and a single writer. Dedo can be used for projects that
/// want a simple data store without the need to add large dependencies such as
/// Postgres or MySQL.
///
/// Dedo is a single-level, zero-copy, B+tree data store. This means that Dedo
/// is optimized for fast read access and does not require recovery in the event
/// of a system crash. Transactions which have not finished committing will
/// simply be rolled back in the event of a crash.
///
/// The design of Dedo is based on Howard Chu's LMDB database project.
///
///
/// == Basics
///
/// There are only a few types in Dedo: databaseT, bucketT, transactionT, and
/// cursorT. The databaseT is a collection of buckets and is represented by a
/// single file on disk. A bucket is a collection of unique keys that are
/// associated with values.
///
/// Transactions provide either read-only or read-write access to the database.
/// Read-only transactions can retrieve key/value pairs and can use cursorTs to
/// iterate over the dataset sequentially. Read-write transactions can create
/// and delete buckets and can insert and remove keys. Only one read-write
/// transaction is allowed at a time.
///
///
/// == Caveats
///
/// The database uses a read-only, memory-mapped data file to ensure that
/// applications cannot corrupt the database; however, this means that keys and
/// values returned from Dedo cannot be changed. Writing to a read-only byte
/// slice will cause Go to panic.
///
/// Keys and values retrieved from the database are only valid for the life of
/// the transaction. When used outside the transaction, these byte slices can
/// point to different data or can point to invalid memory which will cause a
/// panic.
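///
///
/// == Example
///
/// A minimal usage sketch (the file path and the bucket/key names below are
/// hypothetical, and error handling is mostly elided):
///
///	db, err := dedo.Open("/tmp/app.db")
///	if err != nil {
///		// handle error
///	}
///	defer db.Close()
///
///	err = db.Update(func(tx dedo.TransactionI) error {
///		b, err := tx.CreateBucketIfNotExists([]byte("people"))
///		if err != nil {
///			return err
///		}
///		return b.Put([]byte("alice"), []byte("42"))
///	})
///
///	err = db.View(func(tx dedo.SnapshotI) error {
///		b, err := tx.ROBucket([]byte("people"))
///		if err != nil {
///			return err
///		}
///		value := b.Get([]byte("alice"))
///		_ = value // only valid until the transaction ends
///		return nil
///	})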
package dedo import ( "bytes" "encoding/binary" "errors" "flag" "fmt" "hash/fnv" "io" "io/fs" "log" "os" "slices" "sort" "sync" "syscall" "time" "unsafe" "pds" g "gobang" ) type baseCursorI interface{ First() ([]byte, []byte) Last() ([]byte, []byte) Next() ([]byte, []byte) Prev() ([]byte, []byte) Seek([]byte) ([]byte, []byte) } type ROCursorI interface{ baseCursorI } type RWCursorI interface{ baseCursorI Delete() error } type baseBucketI interface{ Get([]byte) []byte ForEach(func([]byte, []byte) error) error } type ROBucketI interface{ baseBucketI ROBucket([]byte) (ROBucketI, error) ROCursor() ROCursorI } type RWBucketI interface{ baseBucketI RWBucket([]byte) (RWBucketI, error) RWCursor() RWCursorI Put([]byte, []byte) error Delete([]byte) error CreateBucket([]byte) (RWBucketI, error) CreateBucketIfNotExists([]byte) (RWBucketI, error) DeleteBucket([]byte) error NextID() []byte NextSequence() (uint64, error) } type baseOperationI interface{ WriteTo(io.Writer) (int64, error) Check() <-chan error } type SnapshotI interface{ baseOperationI ROBucket([]byte) (ROBucketI, error) ROCursor() ROCursorI ROForEach(func([]byte, ROBucketI) error) error } type TransactionI interface{ baseOperationI RWBucket([]byte) (RWBucketI, error) RWCursor() RWCursorI RWForEach(func([]byte, RWBucketI) error) error CreateBucket ([]byte) (RWBucketI, error) CreateBucketIfNotExists([]byte) (RWBucketI, error) DeleteBucket([]byte) error OnCommit(func()) } type DatabaseI interface{ Close() error View (func(tx SnapshotI) error) error Update(func(tx TransactionI) error) error OnCommit(func()) Path() string } /// transactionT represents a read-only or read/write transaction on the /// database. Read-only transactions can be used for retrieving values for keys /// and creating cursors. Read/write transactions can create and remove buckets /// and create and remove keys. /// /// IMPORTANT: You must commit or rollback transactions when you are done with /// them. Pages can not be reclaimed by the writer until no more transactions /// are using them. A long running read transaction can cause the database to /// quickly grow. type transactionT struct{ writable bool db *databaseT meta *meta root bucketT pages map[pgid]*page commitHandlers []func() } type inMemoryCursorT struct{ bucket *inMemoryValueT iterator *pds.MapIterator[[]byte, *inMemoryValueT] } type inMemoryValueT struct{ isBucket bool tx *inMemoryTxT value []byte data *pds.Map[[]byte, *inMemoryValueT] } type inMemoryTxT struct{ root *inMemoryValueT writable bool commitHandlers []func() } type inMemoryDatabaseT struct{ root *pds.Map[[]byte, *inMemoryValueT] rwmutex sync.RWMutex commitHandlers []func() } type pgid uint64 /// bucket represents the on-file representation of a bucket. This is stored as /// the "value" of a bucket key. If the bucket is small enough, then its root /// page can be stored inline in the "value", after the bucket header. In the /// case of inline buckets, the "root" will be 0. type bucket struct { root pgid /// page id of the bucket's root-level page sequence uint64 /// monotonically incrementing, used by NextSequence() } /// bucketT represents a distinct collection of key/value pairs inside the /// database. Keys aren't unique across different buckets. 
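///
/// An illustrative sketch of nested-bucket usage (the bucket names here are
/// hypothetical); sub-buckets show up in ForEach and in cursors as keys whose
/// value is nil:
///
///	err := db.Update(func(tx dedo.TransactionI) error {
///		users, err := tx.CreateBucketIfNotExists([]byte("users"))
///		if err != nil {
///			return err
///		}
///
///		_, err = users.CreateBucket([]byte("admins"))
///		if err != nil {
///			return err
///		}
///
///		return users.ForEach(func(k, v []byte) error {
///			// v == nil marks the "admins" sub-bucket key.
///			return nil
///		})
///	})
///	if err != nil {
///		// handle error
///	}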
type bucketT struct { ref *bucket tx *transactionT /// the associated transaction buckets map[string]*bucketT /// subbucket cache page *page /// inline page reference rootNode *node /// materialized node for the root page nodes map[pgid]*node /// node cache } /// cursorT represents an iterator that can traverse over all key/value pairs in /// a bucket in sorted order. cursorTs see nested buckets with value == nil. /// cursorTs can be obtained from a transaction and are valid as long as the /// transaction is open. /// /// Keys and values returned from the cursor are only valid for the life of the /// transaction. /// /// Changing data while traversing with a cursor may cause it to be invalidated /// and return unexpected keys and/or values. You must reposition your cursor /// after mutating data. type cursorT struct { bucket *bucketT stack []elemRef } /// elemRef represents a reference to an element on a given page/node. type elemRef struct { page *page node *node index int } /// databaseT represents a collection of buckets persisted to a file on disk. All data /// access is performed through transactions which can be obtained through the /// databaseT. All the functions on databaseT will return a ErrDatabaseNotOpen if accessed /// before Open() is called. type databaseT struct { /// MaxBatchSize is the maximum size of a batch. Default value is /// copied from DefaultMaxBatchSize in Open. /// /// If <=0, disables batching. /// /// Do not change concurrently with calls to Batch. MaxBatchSize int /// MaxBatchDelay is the maximum delay before a batch starts. Default /// value is copied from DefaultMaxBatchDelay in Open. /// /// If <=0, effectively disables batching. /// /// Do not change concurrently with calls to Batch. MaxBatchDelay time.Duration /// allocSize is the amount of space allocated when the database needs /// to create new pages. This is done to amortize the cost of /// truncate() and fsync() when growing the data file. allocSize int path string file *os.File dataref []byte /// mmap'ed read-only via PROT_READ, write throws SEGV data *[maxMapSize]byte datasz int filesz int /// current on disk file size meta0 *meta meta1 *meta pageSize int opened bool rwtx *transactionT txs []*transactionT freelist *freelist magic uint32 pagePool sync.Pool batchMu sync.Mutex batch *batch rwlock sync.Mutex /// Allows only one writer at a time. metalock sync.Mutex /// Protects meta page access. mmaplock sync.RWMutex /// Protects mmap access during remapping. commitHandlers []func() } type OpenOptionsT struct{ Magic uint32 } type call struct { fn func(TransactionI) error err chan<- error } type batch struct { db *databaseT timer *time.Timer start sync.Once calls []call } type panicked struct { reason any } type meta struct { magic uint32 version uint32 pageSize uint32 flags uint32 root bucket freelist pgid pgid pgid txid txid checksum uint64 } /// freelist represents a list of all pages that are available for allocation. /// It also tracks pages that have been freed but are still in use by open /// transactions. type freelist struct { ids []pgid /// all free and available free page ids pending map[txid][]pgid /// mapping of soon-to-be free page ids by tx cache map[pgid]bool /// fast lookup of all free and pending page ids } /// node represents an in-memory, deserialized page. 
type node struct { bucket *bucketT isLeaf bool unbalanced bool spilled bool key []byte pgid pgid parent *node children nodes inodes inodes } type nodes []*node type pages []*page /// inode represents an internal node inside of a node. /// It can be used to point to elements in a page or point /// to an element which hasn't been added to a page yet. type inode struct { flags uint32 pgid pgid key []byte value []byte } type inodes []inode type page struct { id pgid flags uint16 count uint16 overflow uint32 ptr uintptr } /// branchPageElement represents a node on a branch page. type branchPageElement struct { pos uint32 ksize uint32 pgid pgid } /// leafPageElement represents a node on a leaf page. type leafPageElement struct { flags uint32 pos uint32 ksize uint32 vsize uint32 } type pgids []pgid /// txid represents the internal transaction identifier. type txid uint64 /// walkFunc is the type of the function called for keys (buckets and "normal" /// values) discovered by Walk. Keys is the list of keys to descend to the /// bucket owning the discovered key/value pair k/v. type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error type argsT struct{ databasePath string command string allArgs []string args []string bucket []byte key []byte value []byte } type commandT struct{ name string getopt func(argsT, io.Writer) (argsT, bool) exec func(argsT, DatabaseI, io.Reader, io.Writer) error } const ( /// maxMapSize represents the largest mmap size supported by Dedo. maxMapSize = 0xFFFFFFFFFFFF /// 256TB /// maxAllocSize is the size used when creating array pointers. maxAllocSize = 0x7FFFFFFF // FIXME: why? /// MaxKeySize is the maximum length of a key, in bytes. MaxKeySize = 32768 // FIXME: why? /// MaxValueSize is the maximum length of a value, in bytes. MaxValueSize = (1 << 31) - 2 bucketHeaderSize = int(unsafe.Sizeof(bucket{})) /// The largest step that can be taken when remapping the mmap. maxMmapStep = 1 << 30 /// 1GB /// The data file format version. version = 2 /// Represents a marker value to indicate that a file is a Dedo databaseT. defaultMagic uint32 = 0xFACADAAB /// Default values if not set in a databaseT instance. DefaultMaxBatchSize int = 1000 DefaultMaxBatchDelay = 10 * time.Millisecond allocGrowthSize = 16 * 1024 * 1024 // 16MiB pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr)) minKeysPerPage = 2 branchPageElementSize = int(unsafe.Sizeof(branchPageElement{})) leafPageElementSize = int(unsafe.Sizeof(leafPageElement{})) branchPageFlag = 0x01 leafPageFlag = 0x02 metaPageFlag = 0x04 freelistPageFlag = 0x10 bucketLeafFlag = 0x01 /// PageHeaderSize represents the size of the dedo.page header. PageHeaderSize = 16 ) var ( defaultOptions = OpenOptionsT{ Magic: defaultMagic, } /// trySolo is a special sentinel error value used for signaling that a /// transaction function should be re-run. It should never be seen by /// callers. trySolo = errors.New( "batch function returned an error and should be re-run solo", ) /// /// These errors can be returned when opening or calling methods on a /// databaseT. /// /// ErrDatabaseNotOpen is returned when a databaseT instance is accessed before /// it is opened or after it is closed. ErrDatabaseNotOpen = errors.New("database not open") /// ErrInvalid is returned when both meta pages on a database are /// invalid. This typically occurs when a file is not a dedo database. ErrInvalid = errors.New("invalid database") /// ErrVersionMismatch is returned when the data file was created with a /// different version of Dedo. 
ErrVersionMismatch = errors.New("version mismatch") /// ErrChecksum is returned when either meta page checksum does not /// match. ErrChecksum = errors.New("checksum error") /// /// These errors can occur when beginning or committing a transactionT. /// /// ErrTxNotWritable is returned when performing a write operation on a /// read-only transaction. ErrTxNotWritable = errors.New("tx not writable") /// ErrTxClosed is returned when committing or rolling back a /// transaction that has already been committed or rolled back. ErrTxClosed = errors.New("tx closed") /// /// These errors can occur when putting or deleting a value or a bucket. /// ErrBucketBadFlag = errors.New("bucket does not have bucketLeafFlag") /// ErrBucketNotFound is returned when trying to access a bucket that /// has not been created yet. ErrBucketNotFound = errors.New("bucket not found") /// ErrBucketExists is returned when creating a bucket that already /// exists. ErrBucketExists = errors.New("bucket already exists") /// ErrBucketNameRequired is returned when creating a bucket with a /// blank name. ErrBucketNameRequired = errors.New("bucket name required") /// ErrKeyRequired is returned when inserting a zero-length key. ErrKeyRequired = errors.New("key required") /// ErrKeyTooLarge is returned when inserting a key that is larger than /// MaxKeySize. ErrKeyTooLarge = errors.New("key too large") /// ErrValueTooLarge is returned when inserting a value that is larger /// than MaxValueSize. ErrValueTooLarge = errors.New("value too large") /// ErrIncompatibleValue is returned when trying create or delete a /// bucket on an existing non-bucket key or when trying to create or /// delete a non-bucket key on an existing bucket key. ErrIncompatibleValue = errors.New("incompatible value") /// ErrMissingKey is returned when the CLI tries to get a key that it /// can't find in the specified bucket. ErrMissingKey = errors.New("missing key") /// ErrUsage is returned when a usage message was printed and the /// process should simply exit with an error. ErrUsage = errors.New("usage") /// ErrUnknownCommand is returned when a CLI command is not specified. ErrUnknownCommand = errors.New("unknown command") /// ErrPathRequired is returned when the path to a Dedo database is not /// specified. ErrPathRequired = errors.New("path required") /// ErrFileNotFound is returned when a Dedo database does not exist. ErrFileNotFound = errors.New("file not found") /// ErrCorrupt is returned when a checking a data file finds errors. ErrCorrupt = errors.New("invalid value") /// ErrNonDivisibleBatchSize is returned when the batch size can't be /// evenly divided by the iteration count. ErrNonDivisibleBatchSize = errors.New( "number of iterations must be divisible by the batch size", ) /// ErrPageIDRequired is returned when a required page id is not /// specified. ErrPageIDRequired = errors.New("page id required") ) func runHandlers(handlers []func()) { for _, fn := range handlers { fn() } } /// fdatasync() flushes written data to a file descriptor. func fdatasync(db *databaseT) error { return db.file.Sync() } /// flock() acquires an advisory lock on a file descriptor. func flock(db *databaseT) error { const lockFlags = syscall.LOCK_EX | syscall.LOCK_NB for { err := syscall.Flock(int(db.file.Fd()), lockFlags) if err == nil { return nil } else if err != syscall.EWOULDBLOCK { return err } // Wait for a bit and try again. time.Sleep(50 * time.Millisecond) } } /// funlock() releases an advisory lock on a file descriptor. 
func funlock(db *databaseT) error { return syscall.Flock(int(db.file.Fd()), syscall.LOCK_UN) } /// mmap memory() maps a databaseT's data file. func mmap(db *databaseT, sz int) error { fd := int(db.file.Fd()) b, err := syscall.Mmap(fd, 0, sz, syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { return err } err = madvise(b, syscall.MADV_RANDOM) if err != nil { return fmt.Errorf("madvise: %s", err) } // Save the original byte slice and convert to a byte array pointer. db.dataref = b db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0])) db.datasz = sz return nil } /// munmap() unmaps a databaseT's data file from memory. func munmap(db *databaseT) error { if db.dataref == nil { return nil } err := syscall.Munmap(db.dataref) db.dataref = nil db.data = nil db.datasz = 0 return err } func madvise(b []byte, advice int) (err error) { _, _, e1 := syscall.Syscall( syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice), ) if e1 != 0 { err = e1 } return } /// newBucket() returns a new bucket associated with a transaction. func newBucket(tx *transactionT) bucketT { b := bucketT{tx: tx} if tx.writable { b.buckets = make(map[string]*bucketT) b.nodes = make(map[pgid]*node) } return b } /// bucketT.Cursor() creates a cursor associated with the bucket. The cursor is /// only valid as long as the transaction is open. Do not use a cursor after /// the transaction is closed. func bucketCursor(b *bucketT) *cursorT { return &cursorT{ bucket: b, stack: make([]elemRef, 0), } } func (b *bucketT) ROCursor() ROCursorI { return bucketCursor(b) } func (b *bucketT) RWCursor() RWCursorI { return bucketCursor(b) } /// bucketT.Bucket() retrieves a nested bucket by name. Returns nil if the /// bucket does not exist. The bucket instance is only valid for the lifetime /// of the transaction. func bucketGetNested(b *bucketT, name []byte) (*bucketT, error) { if b.buckets != nil { child := b.buckets[string(name)] if child != nil { return child, nil } } // Move cursor to key. c := bucketCursor(b) k, v, flags := c.seek(name) // Return nil if the key doesn't exist or it is not a bucket. if !bytes.Equal(name, k) { return nil, ErrBucketNotFound } if (flags & bucketLeafFlag) == 0 { return nil, ErrBucketBadFlag } // Otherwise create a bucket and cache it. child := b.openBucket(v) if b.buckets != nil { b.buckets[string(name)] = child } return child, nil } func (b *bucketT) ROBucket(name []byte) (ROBucketI, error) { return bucketGetNested(b, name) } func (b *bucketT) RWBucket(name []byte) (RWBucketI, error) { return bucketGetNested(b, name) } /// bucketT.Helper() method that re-interprets a sub-bucket value from a parent /// into a bucketT func (b *bucketT) openBucket(value []byte) *bucketT { child := newBucket(b.tx) // If this is a writable transaction then we need to copy the bucket // entry. Read-only transactions can point directly at the mmap entry. if b.tx.writable { child.ref = &bucket{} *child.ref = *(*bucket)(unsafe.Pointer(&value[0])) } else { child.ref = (*bucket)(unsafe.Pointer(&value[0])) } // Save a reference to the inline page if the bucket is inline. if child.ref.root == 0 { child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize])) } return &child } /// bucketT.CreateBucket() creates a new bucket at the given key and returns the /// new bucket. Returns an error if the key already exists, if the bucket name /// is blank, or if the bucket name is too long. The bucket instance is only /// valid for the lifetime of the transaction. 
func bucketCreateBucket(b *bucketT, key []byte) (*bucketT, error) { if b.tx.db == nil { return nil, ErrTxClosed } else if !b.tx.writable { return nil, ErrTxNotWritable } else if len(key) == 0 { return nil, ErrBucketNameRequired } // Move cursor to correct position. c := bucketCursor(b) k, _, flags := c.seek(key) // Return an error if there is an existing key. if bytes.Equal(key, k) { if (flags & bucketLeafFlag) != 0 { return nil, ErrBucketExists } return nil, ErrIncompatibleValue } // Create empty, inline bucket. bucket := bucketT{ ref: &bucket{}, rootNode: &node{isLeaf: true}, } value := bucket.write() // Insert into node. key = cloneBytes(key) c.node().put(key, key, value, 0, bucketLeafFlag) // Since subbuckets are not allowed on inline buckets, we need to // dereference the inline page, if it exists. This will cause the // bucket to be treated as a regular, non-inline bucket for the rest of // the tx. b.page = nil return bucketGetNested(b, key) } func (b *bucketT) CreateBucket(key []byte) (RWBucketI, error) { return bucketCreateBucket(b, key) } /// bucketT.CreateBucketIfNotExists() creates a new bucket if it doesn't already /// exist and returns a reference to it. Returns an error if the bucket name is /// blank, or if the bucket name is too long. The bucket instance is only valid /// for the lifetime of the transaction. func bucketCreateBucketIfNotExists(b *bucketT, key []byte) (*bucketT, error) { child, err := bucketCreateBucket(b, key) if err == ErrBucketExists { return bucketGetNested(b, key) } else if err != nil { return nil, err } return child, nil } func (b *bucketT) CreateBucketIfNotExists(key []byte) (RWBucketI, error) { return bucketCreateBucketIfNotExists(b, key) } /// bucketT.DeleteBucket() deletes a bucket at the given key. Returns an error /// if the bucket does not exists, or if the key represents a non-bucket value. func bucketDeleteBucket(b *bucketT, key []byte) error { if b.tx.db == nil { return ErrTxClosed } else if !b.tx.writable { return ErrTxNotWritable } // Move cursor to correct position. c := bucketCursor(b) k, _, flags := c.seek(key) // Return an error if bucket doesn't exist or is not a bucket. if !bytes.Equal(key, k) { return ErrBucketNotFound } else if (flags & bucketLeafFlag) == 0 { return ErrIncompatibleValue } // Recursively delete all child buckets. child, err := bucketGetNested(b, key) if err != nil { return err } err = child.ForEach(func(k, v []byte) error { if v == nil { err := child.DeleteBucket(k) if err != nil { return fmt.Errorf("delete bucket: %s", err) } } return nil }) if err != nil { return err } // Remove cached copy. delete(b.buckets, string(key)) // Release all bucket pages to freelist. child.nodes = nil child.rootNode = nil child.free() // Delete the node if we have a matching key. c.node().del(key) return nil } func (b *bucketT) DeleteBucket(key []byte) error { return bucketDeleteBucket(b, key) } /// bucketT.Get() retrieves the value for a key in the bucket. Returns a nil /// value if the key does not exist or if the key is a nested bucket. The /// returned value is only valid for the life of the transaction. func bucketGet(b *bucketT, key []byte) []byte { k, v, flags := bucketCursor(b).seek(key) // Return nil if this is a bucket. if (flags & bucketLeafFlag) != 0 { return nil } // If our target node isn't the same key as what's passed in then return // nil. if !bytes.Equal(key, k) { return nil } return v } func (b *bucketT) Get(key []byte) []byte { return bucketGet(b, key) } /// bucketT.Put() sets the value for a key in the bucket. 
If the key exist then /// its previous value will be overwritten. Supplied value must remain valid /// for the life of the transaction. Returns an error if the bucket was created /// from a read-only transaction, if the key is blank, if the key is too large, /// or if the value is too large. func bucketPut(b *bucketT, key []byte, value []byte) error { if b.tx.db == nil { return ErrTxClosed } else if !b.tx.writable { return ErrTxNotWritable } else if len(key) == 0 { return ErrKeyRequired } else if len(key) > MaxKeySize { return ErrKeyTooLarge } else if int64(len(value)) > MaxValueSize { return ErrValueTooLarge } // Move cursor to correct position. c := bucketCursor(b) k, _, flags := c.seek(key) // Return an error if there is an existing key with a bucket value. if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 { return ErrIncompatibleValue } // Insert into node. key = cloneBytes(key) c.node().put(key, key, value, 0, 0) return nil } func (b *bucketT) Put(key []byte, value []byte) error { return bucketPut(b, key, value) } /// bucketT.Delete() removes a key from the bucket. If the key does not exist /// then nothing is done and a nil error is returned. Returns an error if the /// bucket was created from a read-only transaction. func bucketDelete(b *bucketT, key []byte) error { if b.tx.db == nil { return ErrTxClosed } else if !b.tx.writable { return ErrTxNotWritable } // Move cursor to correct position. c := bucketCursor(b) _, _, flags := c.seek(key) // Return an error if there is already existing bucket value. if (flags & bucketLeafFlag) != 0 { return ErrIncompatibleValue } // Delete the node if we have a matching key. c.node().del(key) return nil } func (b *bucketT) Delete(key []byte) error { return bucketDelete(b, key) } func ID2Bytes(num uint64) []byte { arr := make([]byte, 8) binary.LittleEndian.PutUint64(arr, num) return arr } func Bytes2ID(bytes []byte) uint64 { return binary.LittleEndian.Uint64(bytes) } func bucketNextID(b *bucketT) []byte { id := g.Must(b.NextSequence()) return ID2Bytes(id) } func (b *bucketT) NextID() []byte { return bucketNextID(b) } /// bucketT.NextSequence() returns an autoincrementing integer for the bucket. func bucketNextSequence(b *bucketT) (uint64, error) { if b.tx.db == nil { return 0, ErrTxClosed } else if !b.tx.writable { return 0, ErrTxNotWritable } // Materialize the root node if it hasn't been already so that the // bucket will be saved during commit. if b.rootNode == nil { _ = b.node(b.ref.root, nil) } // Increment and return the sequence. b.ref.sequence++ return b.ref.sequence, nil } func (b *bucketT) NextSequence() (uint64, error) { return bucketNextSequence(b) } /// bucketT.ForEach() executes a function for each key/value pair in a bucket. /// If the provided function returns an error then the iteration is stopped and /// the error is returned to the caller. The provided function must not modify /// the bucket; this will result in undefined behavior. func bucketForEach(b *bucketT, fn func(k, v []byte) error) error { if b.tx.db == nil { return ErrTxClosed } c := bucketCursor(b) for k, v := c.First(); k != nil; k, v = c.Next() { err := fn(k, v) if err != nil { return err } } return nil } func (b *bucketT) ForEach(fn func(k, v []byte) error) error { return bucketForEach(b, fn) } /// bucketT.forEachPageNode() iterates over every page (or node) in a bucket. /// This also includes inline pages. func bucketForEachPageNode(b *bucketT, fn func(*page, *node, int)) { // If we have an inline page or root node then just use that. 
if b.page != nil { fn(b.page, nil, 0) return } bucketForEachPageNodeRec(b, b.ref.root, 0, fn) } func bucketForEachPageNodeRec( b *bucketT, pgid pgid, depth int, fn func(*page, *node, int), ) { p, n := b.pageNode(pgid) // Execute function. fn(p, n, depth) // Recursively loop over children. if p != nil { if (p.flags & branchPageFlag) != 0 { for i := 0; i < int(p.count); i++ { elem := p.branchPageElement(uint16(i)) bucketForEachPageNodeRec(b, elem.pgid, depth+1, fn) } } } else { if !n.isLeaf { for _, inode := range n.inodes { bucketForEachPageNodeRec(b, inode.pgid, depth+1, fn) } } } } /// bucketT.spill() writes all the nodes for this bucket to dirty pages. func (b *bucketT) spill() error { // Spill all child buckets first. for name, child := range b.buckets { // If the child bucket is small enough and it has no child // buckets then write it inline into the parent bucket's page. // Otherwise spill it like a normal bucket and make the parent // value a pointer to the page. var value []byte if child.inlineable() { child.free() value = child.write() } else { err := child.spill() if err != nil { return err } // Update the child bucket header in this bucket. value = make([]byte, unsafe.Sizeof(bucket{})) bucket := (*bucket)(unsafe.Pointer(&value[0])) *bucket = *child.ref } // Skip writing the bucket if there are no materialized nodes. if child.rootNode == nil { continue } // Update parent node. c := bucketCursor(b) k, _, flags := c.seek([]byte(name)) if !bytes.Equal([]byte(name), k) { panic(fmt.Sprintf( "misplaced bucket header: %x -> %x", []byte(name), k, )) } if flags&bucketLeafFlag == 0 { panic(fmt.Sprintf( "unexpected bucket header flag: %x", flags, )) } c.node().put( []byte(name), []byte(name), value, 0, bucketLeafFlag, ) } // Ignore if there's not a materialized root node. if b.rootNode == nil { return nil } // Spill nodes. err := b.rootNode.spill() if err != nil { return err } b.rootNode = b.rootNode.root() // Update the root node for this bucket. if b.rootNode.pgid >= b.tx.meta.pgid { panic(fmt.Sprintf( "pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid, )) } b.ref.root = b.rootNode.pgid return nil } /// bucketT.inlineable() returns true if a bucket is small enough to be written /// inline and if it contains no subbuckets. Otherwise returns false. func (b *bucketT) inlineable() bool { n := b.rootNode // bucketT must only contain a single leaf node. if n == nil || !n.isLeaf { return false } // bucketT is not inlineable if it contains subbuckets or if it goes // beyond our threshold for inline bucket size. size := pageHeaderSize for _, inode := range n.inodes { size += leafPageElementSize + len(inode.key) + len(inode.value) if inode.flags&bucketLeafFlag != 0 { return false } else if size > b.maxInlineBucketSize() { return false } } return true } /// bucketT.maInlineBucketSize() returns the maximum total size of a bucket to /// make it a candidate for inlining. func (b *bucketT) maxInlineBucketSize() int { return b.tx.db.pageSize / 4 } /// bucketT.write() allocates and writes a bucket to a byte slice. func (b *bucketT) write() []byte { // Allocate the appropriate size. n := b.rootNode value := make([]byte, bucketHeaderSize+n.size()) // Write a bucket header. bucket := (*bucket)(unsafe.Pointer(&value[0])) *bucket = *b.ref // Convert byte slice to a fake page and write the root node. p := (*page)(unsafe.Pointer(&value[bucketHeaderSize])) n.write(p) return value } /// bucketT.rebalance() attempts to balance all nodes. 
func (b *bucketT) rebalance() { for _, n := range b.nodes { n.rebalance() } for _, child := range b.buckets { child.rebalance() } } /// bucketT.node() creates a node from a page and associates it with a given /// parent. func (b *bucketT) node(pgid pgid, parent *node) *node { g.Assert(b.nodes != nil, "nodes map expected") // Retrieve node if it's already been created. n := b.nodes[pgid] if n != nil { return n } // Otherwise create a node and cache it. n = &node{bucket: b, parent: parent} if parent == nil { b.rootNode = n } else { parent.children = append(parent.children, n) } // Use the inline page if this is an inline bucket. p := b.page if p == nil { p = b.tx.page(pgid) } // Read the page into the node and cache it. n.read(p) b.nodes[pgid] = n return n } /// bucketT.free() recursively frees all pages in the bucket. func (b *bucketT) free() { if b.ref.root == 0 { return } tx := b.tx bucketForEachPageNode(b, func(p *page, n *node, _ int) { if p != nil { tx.db.freelist.free(tx.meta.txid, p) } else { n.free() } }) b.ref.root = 0 } /// bucketT.dereference() removes all references to the old mmap. func (b *bucketT) dereference() { if b.rootNode != nil { b.rootNode.root().dereference() } for _, child := range b.buckets { child.dereference() } } /// bucketT.pageNode() returns the in-memory node, if it exists. Otherwise /// returns the underlying page. func (b *bucketT) pageNode(id pgid) (*page, *node) { // Inline buckets have a fake page embedded in their value so treat them // differently. We'll return the rootNode (if available) or the fake // page. if b.ref.root == 0 { if id != 0 { panic(fmt.Sprintf( "inline bucket non-zero page access(2): " + "%d != 0", id, )) } if b.rootNode != nil { return nil, b.rootNode } return b.page, nil } // Check the node cache for non-inline buckets. if b.nodes != nil { n := b.nodes[id] if n != nil { return nil, n } } // Finally lookup the page from the transaction if no node is // materialized. return b.tx.page(id), nil } /// cloneBytes() returns a copy of a given slice. func cloneBytes(v []byte) []byte { clone := make([]byte, len(v)) copy(clone, v) return clone } /// cursorT.First() moves the cursor to the first item in the bucket and returns /// its key and value. If the bucket is empty then a nil key and value are /// returned. The returned key and value are only valid for the life of the /// transaction. func cursorFirst(c *cursorT) ([]byte, []byte) { g.Assert(c.bucket.tx.db != nil, "tx closed") c.stack = c.stack[:0] p, n := c.bucket.pageNode(c.bucket.ref.root) c.stack = append(c.stack, elemRef{page: p, node: n, index: 0}) c.first() // If we land on an empty page then move to the next value. // https://github.com/boltdb/bolt/issues/450 if c.stack[len(c.stack)-1].count() == 0 { c.next() } k, v, flags := c.keyValue() if (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } return k, v } func (c *cursorT) First() ([]byte, []byte) { return cursorFirst(c) } /// cursorT.Last() moves the cursor to the last item in the bucket and returns /// its key and value. If the bucket is empty then a nil key and value are /// returned. The returned key and value are only valid for the life of the /// transaction. 
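///
/// A reverse-iteration sketch (the bucket handle b is assumed to come from an
/// open transaction; v is nil for sub-bucket keys):
///
///	c := b.ROCursor()
///	for k, v := c.Last(); k != nil; k, v = c.Prev() {
///		// keys arrive in descending order
///		_ = v
///	}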
func cursorLast(c *cursorT) ([]byte, []byte) { g.Assert(c.bucket.tx.db != nil, "tx closed") c.stack = c.stack[:0] p, n := c.bucket.pageNode(c.bucket.ref.root) ref := elemRef{page: p, node: n} ref.index = ref.count() - 1 c.stack = append(c.stack, ref) c.last() k, v, flags := c.keyValue() if (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } return k, v } func (c *cursorT) Last() ([]byte, []byte) { return cursorLast(c) } /// cursorT.Next() moves the cursor to the next item in the bucket and returns /// its key and value. If the cursor is at the end of the bucket then a nil key /// and value are returned. The returned key and value are only valid for the /// life of the transaction. func cursorNext(c *cursorT) ([]byte, []byte) { g.Assert(c.bucket.tx.db != nil, "tx closed") k, v, flags := c.next() if (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } return k, v } func (c *cursorT) Next() ([]byte, []byte) { return cursorNext(c) } /// cursorT.Prev() moves the cursor to the previous item in the bucket and /// returns its key and value. If the cursor is at the beginning of the bucket /// then a nil key and value are returned. The returned key and value are only /// valid for the life of the transaction. func cursorPrev(c *cursorT) ([]byte, []byte) { g.Assert(c.bucket.tx.db != nil, "tx closed") // Attempt to move back one element until we're successful. // Move up the stack as we hit the beginning of each page in our stack. for i := len(c.stack) - 1; i >= 0; i-- { elem := &c.stack[i] if elem.index > 0 { elem.index-- break } c.stack = c.stack[:i] } // If we've hit the end then return nil. if len(c.stack) == 0 { return nil, nil } // Move down the stack to find the last element of the last leaf under // this branch. c.last() k, v, flags := c.keyValue() if (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } return k, v } func (c *cursorT) Prev() ([]byte, []byte) { return cursorPrev(c) } /// cursorT.Seek() moves the cursor to a given key and returns it. If the key /// does not exist then the next key is used. If no keys follow, a nil key is /// returned. The returned key and value are only valid for the life of the /// transaction. func cursorSeek(c *cursorT, seek []byte) ([]byte, []byte) { k, v, flags := c.seek(seek) // If we ended up after the last element of a page then move to the next // one. ref := &c.stack[len(c.stack)-1] if ref.index >= ref.count() { k, v, flags = c.next() } if k == nil { return nil, nil } else if (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } else { return k, v } } func (c *cursorT) Seek(seek []byte) ([]byte, []byte) { return cursorSeek(c, seek) } /// cursorT.Delete() removes the current key/value under the cursor from the /// bucket. cursorT.Delete() fails if current key/value is a bucket or if the /// transaction is not writable. func cursorDelete(c *cursorT) error { if c.bucket.tx.db == nil { return ErrTxClosed } else if !c.bucket.tx.writable { return ErrTxNotWritable } key, _, flags := c.keyValue() // Return an error if current value is a bucket. if (flags & bucketLeafFlag) != 0 { return ErrIncompatibleValue } c.node().del(key) return nil } func (c *cursorT) Delete() error { return cursorDelete(c) } /// cursorT.seek() moves the cursor to a given key and returns it. If the key /// does not exist then the next key is used. func (c *cursorT) seek(seek []byte) (key []byte, value []byte, flags uint32) { g.Assert(c.bucket.tx.db != nil, "tx closed") // Start from root page/node and traverse to correct page. 
c.stack = c.stack[:0] c.search(seek, c.bucket.ref.root) ref := &c.stack[len(c.stack)-1] // If the cursor is pointing to the end of page/node then return nil. if ref.index >= ref.count() { return nil, nil, 0 } // If this is a bucket then return a nil value. return c.keyValue() } /// cursorT.first() moves the cursor to the first leaf element under the last /// page in the stack. func (c *cursorT) first() { for { // Exit when we hit a leaf page. ref := &c.stack[len(c.stack)-1] if ref.isLeaf() { break } // Keep adding pages pointing to the first element to the stack. var pgid pgid if ref.node != nil { pgid = ref.node.inodes[ref.index].pgid } else { pgid = ref.page.branchPageElement( uint16(ref.index), ).pgid } p, n := c.bucket.pageNode(pgid) c.stack = append(c.stack, elemRef{page: p, node: n, index: 0}) } } /// cursorT.last() moves the cursor to the last leaf element under the last page /// in the stack. func (c *cursorT) last() { for { // Exit when we hit a leaf page. ref := &c.stack[len(c.stack)-1] if ref.isLeaf() { break } // Keep adding pages pointing to the last element in the stack. var pgid pgid if ref.node != nil { pgid = ref.node.inodes[ref.index].pgid } else { pgid = ref.page.branchPageElement( uint16(ref.index), ).pgid } p, n := c.bucket.pageNode(pgid) nextRef := elemRef{page: p, node: n} nextRef.index = nextRef.count() - 1 c.stack = append(c.stack, nextRef) } } /// cursorT.next() moves to the next leaf element and returns the key and value. /// If the cursor is at the last leaf element then it stays there and returns /// nil. func (c *cursorT) next() (key []byte, value []byte, flags uint32) { for { // Attempt to move over one element until we're successful. // Move up the stack as we hit the end of each page in our // stack. var i int for i = len(c.stack) - 1; i >= 0; i-- { elem := &c.stack[i] if elem.index < elem.count()-1 { elem.index++ break } } // If we've hit the root page then stop and return. This will // leave the cursor on the last element of the last page. if i == -1 { return nil, nil, 0 } // Otherwise start from where we left off in the stack and find // the first element of the first leaf page. c.stack = c.stack[:i+1] c.first() // If this is an empty page then restart and move back up the // stack. // https://github.com/boltdb/bolt/issues/450 if c.stack[len(c.stack)-1].count() == 0 { continue } return c.keyValue() } } /// cursorT.search() recursively performs a binary search against a given /// page/node until it finds a given key. func (c *cursorT) search(key []byte, pgid pgid) { p, n := c.bucket.pageNode(pgid) if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 { panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags)) } e := elemRef{page: p, node: n} c.stack = append(c.stack, e) // If we're on a leaf page/node then find the specific node. if e.isLeaf() { c.nsearch(key) return } if n != nil { c.searchNode(key, n) return } c.searchPage(key, p) } func (c *cursorT) searchNode(key []byte, n *node) { var exact bool index := sort.Search(len(n.inodes), func(i int) bool { // TODO(benbjohnson): Optimize this range search. It's a bit // hacky right now. sort.Search() finds the lowest index where // f() != -1 but we need the highest index. ret := bytes.Compare(n.inodes[i].key, key) if ret == 0 { exact = true } return ret != -1 }) if !exact && index > 0 { index-- } c.stack[len(c.stack)-1].index = index // Recursively search to the next page. 
c.search(key, n.inodes[index].pgid) } func (c *cursorT) searchPage(key []byte, p *page) { // Binary search for the correct range. inodes := p.branchPageElements() var exact bool index := sort.Search(int(p.count), func(i int) bool { // TODO(benbjohnson): Optimize this range search. It's a bit // hacky right now. sort.Search() finds the lowest index where // f() != -1 but we need the highest index. ret := bytes.Compare(inodes[i].key(), key) if ret == 0 { exact = true } return ret != -1 }) if !exact && index > 0 { index-- } c.stack[len(c.stack)-1].index = index // Recursively search to the next page. c.search(key, inodes[index].pgid) } /// cursorT.nsearch() searches the leaf node on the top of the stack for a key. func (c *cursorT) nsearch(key []byte) { e := &c.stack[len(c.stack)-1] p, n := e.page, e.node // If we have a node then search its inodes. if n != nil { index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 }) e.index = index return } // If we have a page then search its leaf elements. inodes := p.leafPageElements() index := sort.Search(int(p.count), func(i int) bool { return bytes.Compare(inodes[i].key(), key) != -1 }) e.index = index } /// cursorT.keyValue() returns the key and value of the current leaf element. func (c *cursorT) keyValue() ([]byte, []byte, uint32) { ref := &c.stack[len(c.stack)-1] if ref.count() == 0 || ref.index >= ref.count() { return nil, nil, 0 } // Retrieve value from node. if ref.node != nil { inode := &ref.node.inodes[ref.index] return inode.key, inode.value, inode.flags } // Or retrieve value from page. elem := ref.page.leafPageElement(uint16(ref.index)) return elem.key(), elem.value(), elem.flags } /// cursorT.node() returns the node that the cursor is currently positioned on. func (c *cursorT) node() *node { g.Assert( len(c.stack) > 0, "accessing a node with a zero-length cursor stack", ) // If the top of the stack is a leaf node then just return it. ref := &c.stack[len(c.stack)-1] if ref.node != nil && ref.isLeaf() { return ref.node } // Start from root and traverse down the hierarchy. n := c.stack[0].node if n == nil { n = c.bucket.node(c.stack[0].page.id, nil) } for _, ref := range c.stack[:len(c.stack)-1] { g.Assert(!n.isLeaf, "expected branch node") n = n.childAt(int(ref.index)) } g.Assert(n.isLeaf, "expected leaf node") return n } /// elemRef.isLeaf() returns whether the ref is pointing at a leaf page/node. func (r *elemRef) isLeaf() bool { if r.node != nil { return r.node.isLeaf } return (r.page.flags & leafPageFlag) != 0 } /// elemRef.count() returns the number of inodes or page elements. func (r *elemRef) count() int { if r.node != nil { return len(r.node.inodes) } return int(r.page.count) } /// databaseT.Path() returns the path to currently open database file. func (db *databaseT) Path() string { return db.path } func newDB(path string, file *os.File, options OpenOptionsT) *databaseT { return &databaseT{ MaxBatchSize: DefaultMaxBatchSize, MaxBatchDelay: DefaultMaxBatchDelay, opened: true, path: path, file: file, magic: options.Magic, commitHandlers: []func(){}, } } func openFile(path string) (*os.File, error) { const ( fileMode = 0o666 flagsRW = os.O_RDWR | os.O_CREATE flagsRO = os.O_RDONLY ) file, err := os.OpenFile(path, flagsRW, fileMode) if err == nil { return file, nil } if !errors.Is(err, fs.ErrPermission) { return nil, err } return os.OpenFile(path, flagsRO, fileMode) } func readPageSize(db *databaseT) (int, error) { // Read the first meta page to determine the page size. 
const size4KiB = 0x1000 buf := [0x1000]byte{} _, err := db.file.ReadAt(buf[:], 0) if err != nil { return 0, err } m := pageInBuffer(db, buf[:], 0).meta() err = m.validate(db) if err != nil { return 0, err } return int(m.pageSize), nil } /// initDB() creates a new database file and initializes its meta pages. func initDB(db *databaseT, size int64) error { if size != 0 { pageSize, err := readPageSize(db) if err != nil { return err } db.pageSize = pageSize return nil } // Set the page size to the OS page size. db.pageSize = os.Getpagesize() // Create two meta pages on a buffer. buf := make([]byte, db.pageSize*4) for i := 0; i < 2; i++ { p := pageInBuffer(db, buf[:], pgid(i)) p.id = pgid(i) p.flags = metaPageFlag // Initialize the meta page. m := p.meta() m.magic = db.magic m.version = version m.pageSize = uint32(db.pageSize) m.freelist = 2 m.root = bucket{root: 3} m.pgid = 4 m.txid = txid(i) m.checksum = m.sum64() } // Write an empty freelist at page 3. p := pageInBuffer(db, buf[:], pgid(2)) p.id = pgid(2) p.flags = freelistPageFlag p.count = 0 // Write an empty leaf page at page 4. p = pageInBuffer(db, buf[:], pgid(3)) p.id = pgid(3) p.flags = leafPageFlag p.count = 0 // Write the buffer to our data file. _, err := db.file.WriteAt(buf, 0) if err != nil { return err } err = fdatasync(db) if err != nil { return err } return nil } func (cursor *inMemoryCursorT) First() ([]byte, []byte) { return nil, nil // FIXME } func (cursor *inMemoryCursorT) Last() ([]byte, []byte) { return nil, nil // FIXME } func (cursor *inMemoryCursorT) Next() ([]byte, []byte) { return nil, nil // FIXME } func (cursor *inMemoryCursorT) Prev() ([]byte, []byte) { return nil, nil // FIXME } func (cursor *inMemoryCursorT) Seek([]byte) ([]byte, []byte) { return nil, nil // FIXME } func (cursor *inMemoryCursorT) Delete() error { return nil // FIXME } func memBucketGetNested( bucket *inMemoryValueT, name []byte, ) (*inMemoryValueT, error) { if bucket.tx.root == nil { return nil, ErrTxClosed } value, ok := bucket.data.Get(name) if !ok { return nil, ErrBucketNotFound } if !value.isBucket { return nil, ErrBucketBadFlag } return value, nil } func (bucket *inMemoryValueT) ROBucket(name []byte) (ROBucketI, error) { return memBucketGetNested(bucket, name) } func (bucket *inMemoryValueT) RWBucket(name []byte) (RWBucketI, error) { return memBucketGetNested(bucket, name) } func (bucket *inMemoryValueT) CreateBucket(name []byte) (RWBucketI, error) { if bucket.tx.root == nil { return nil, ErrTxClosed } else if !bucket.tx.writable { return nil, ErrTxNotWritable } else if len(name) == 0 { return nil, ErrBucketNameRequired } _, err := bucket.RWBucket(name) if err == nil { return nil, ErrBucketExists } newBucket := &inMemoryValueT{ isBucket: true, tx: bucket.tx, value: nil, data: pds.NewMap[[]byte, *inMemoryValueT](nil), } bucket.data = bucket.data.Set(name, newBucket) return newBucket, nil } func (bucket *inMemoryValueT) CreateBucketIfNotExists(name []byte) (RWBucketI, error) { if bucket.tx.root == nil { return nil, ErrTxClosed } else if !bucket.tx.writable { return nil, ErrTxNotWritable } newBucket, err := bucket.CreateBucket(name) if err == ErrBucketExists { return bucket.RWBucket(name) } else if err != nil { return nil, err } return newBucket, nil } func (bucket *inMemoryValueT) Delete(name []byte) error { if bucket.tx.root == nil { return ErrTxClosed } else if !bucket.tx.writable { return ErrTxNotWritable } bucket.data = bucket.data.Delete(name) return nil } func (bucket *inMemoryValueT) DeleteBucket(name []byte) error { if bucket.tx.root 
== nil { return ErrTxClosed } else if !bucket.tx.writable { return ErrTxNotWritable } bucket.data = bucket.data.Delete(name) return nil } func (bucket *inMemoryValueT) ROCursor() ROCursorI { return &inMemoryCursorT{ bucket: bucket, iterator: bucket.data.Iterator(), } } func (bucket *inMemoryValueT) RWCursor() RWCursorI { return &inMemoryCursorT{ bucket: bucket, iterator: bucket.data.Iterator(), } } func (bucket *inMemoryValueT) ForEach(fn func([]byte, []byte) error) error { if bucket.tx.root == nil { return ErrTxClosed } cursor := bucket.ROCursor() for key, value := cursor.First(); key != nil; key, value = cursor.Next() { err := fn(key, value) if err != nil { return err } } return nil } func (bucket *inMemoryValueT) Get(key []byte) []byte { value, ok := bucket.data.Get(key) if !ok { return nil } return value.value } func (bucket *inMemoryValueT) Put(key []byte, value []byte) error { if bucket.tx.root == nil { return ErrTxClosed } else if !bucket.tx.writable { return ErrTxNotWritable } newValue := &inMemoryValueT{ isBucket: false, tx: nil, value: value, data: nil, } bucket.data = bucket.data.Set(key, newValue) return nil } func (bucket *inMemoryValueT) NextID() []byte { id := g.Must(bucket.NextSequence()) return ID2Bytes(id) } func (bucket *inMemoryValueT) NextSequence() (uint64, error) { if bucket.tx.root == nil { return 0, ErrTxClosed } else if !bucket.tx.writable { return 0, ErrTxNotWritable } return 0, nil // FIXME } func memTxBucket(tx *inMemoryTxT, name []byte) (*inMemoryValueT, error) { if tx.root == nil { return nil, ErrTxClosed } return memBucketGetNested(tx.root, name) } func (tx *inMemoryTxT) ROBucket(name []byte) (ROBucketI, error) { return memTxBucket(tx, name) } func (tx *inMemoryTxT) RWBucket(name []byte) (RWBucketI, error) { return memTxBucket(tx, name) } func (tx *inMemoryTxT) CreateBucket(name []byte) (RWBucketI, error) { // return tx.root.CreateBucket(name) return nil, nil // FIXME } func (tx *inMemoryTxT) CreateBucketIfNotExists(name []byte) (RWBucketI, error) { return nil, nil // FIXME } func (tx *inMemoryTxT) ROCursor() ROCursorI { return nil // FIXME } func (tx *inMemoryTxT) RWCursor() RWCursorI { return nil // FIXME } func (tx *inMemoryTxT) DeleteBucket(name []byte) error { if tx.root == nil { return ErrTxClosed } return tx.root.DeleteBucket(name) } func (tx *inMemoryTxT) ROForEach(fn func([]byte, ROBucketI) error) error { return nil // FIXME } func (tx *inMemoryTxT) RWForEach(fn func([]byte, RWBucketI) error) error { return nil // FIXME } func (tx *inMemoryTxT) OnCommit(fn func()) { tx.commitHandlers = append(tx.commitHandlers, fn) } func (tx *inMemoryTxT) Check() <-chan error { ch := make(chan error) close(ch) return ch } func (tx *inMemoryTxT) WriteTo(io.Writer) (int64, error) { return 0, nil // FIXME } func (db *inMemoryDatabaseT) Close() error { db.root = nil return nil } func (db *inMemoryDatabaseT) Path() string { return "" // FIXME } func (db *inMemoryDatabaseT) Update(fn func(TransactionI) error) error { db.rwmutex.Lock() defer db.rwmutex.Unlock() tx := &inMemoryTxT{ root: &inMemoryValueT{ isBucket: true, value: nil, data: db.root, }, commitHandlers: []func(){}, writable: true, } tx.root.tx = tx err := fn(tx) if err != nil { tx.root = nil return err } // commit db.root = tx.root.data runHandlers(slices.Concat(tx.commitHandlers, db.commitHandlers)) return nil } func (db *inMemoryDatabaseT) View(fn func(SnapshotI) error) error { db.rwmutex.RLock() defer db.rwmutex.RUnlock() tx := &inMemoryTxT{ root: &inMemoryValueT{ isBucket: true, value: nil, data: db.root, 
		},
		commitHandlers: nil,
		writable:       false,
	}
	tx.root.tx = tx

	return fn(tx)
}

func OpenMemory() DatabaseI {
	var rwmutex sync.RWMutex

	return &inMemoryDatabaseT{
		root:           pds.NewMap[[]byte, *inMemoryValueT](nil),
		rwmutex:        rwmutex,
		commitHandlers: []func(){},
	}
}

/// Open creates and opens a database at the given path. If the file does not
/// exist then it will be created automatically.
func OpenWith(path string, options OpenOptionsT) (DatabaseI, error) {
	file, err := openFile(path)
	if err != nil {
		return nil, err
	}

	info, err := file.Stat()
	if err != nil {
		file.Close()
		return nil, err
	}

	db := newDB(path, file, options)

	err = flock(db)
	if err != nil {
		_ = db.close()
		return nil, err
	}

	err = initDB(db, info.Size())
	if err != nil {
		return nil, err
	}

	// Initialize page pool.
	db.pagePool = sync.Pool{
		New: func() any {
			return make([]byte, db.pageSize)
		},
	}

	// Memory map the data file.
	err = db.mmap(0)
	if err != nil {
		_ = db.close()
		return nil, err
	}

	// Read in the freelist.
	db.freelist = newFreelist()
	db.freelist.read(db.page(db.meta().freelist))

	// Mark the database as opened and return.
	return db, nil
}

func Open(path string) (DatabaseI, error) {
	return OpenWith(path, defaultOptions)
}

/// databaseT.mmap() opens the underlying memory-mapped file and initializes the meta
/// references. minsz is the minimum size that the new mmap can be.
func (db *databaseT) mmap(minsz int) error {
	db.mmaplock.Lock()
	defer db.mmaplock.Unlock()

	info, err := db.file.Stat()
	if err != nil {
		return fmt.Errorf("mmap stat error: %s", err)
	} else if int(info.Size()) < db.pageSize*2 {
		return fmt.Errorf("file size too small")
	}

	// Ensure the size is at least the minimum size.
	size := int(info.Size())
	if size < minsz {
		size = minsz
	}

	size, err = db.mmapSize(size)
	if err != nil {
		return err
	}

	// Dereference all mmap references before unmapping.
	if db.rwtx != nil {
		db.rwtx.root.dereference()
	}

	// Unmap existing data before continuing.
	err = db.munmap()
	if err != nil {
		return err
	}

	// Memory-map the data file as a byte slice.
	err = mmap(db, size)
	if err != nil {
		return err
	}

	// Save references to the meta pages.
	db.meta0 = db.page(0).meta()
	db.meta1 = db.page(1).meta()

	// Validate the meta pages. We only return an error if both meta pages
	// fail validation, since meta0 failing validation means that it wasn't
	// saved properly -- but we can recover using meta1. And vice-versa.
	err0 := db.meta0.validate(db)
	err1 := db.meta1.validate(db)
	if err0 != nil && err1 != nil {
		return err0
	}

	return nil
}

/// databaseT.munmap() unmaps the data file from memory.
func (db *databaseT) munmap() error {
	err := munmap(db)
	if err != nil {
		return fmt.Errorf("unmap error: " + err.Error())
	}

	return nil
}

/// databaseT.mmapSize() determines the appropriate size for the mmap given the
/// current size of the database. The minimum size is 32KB and doubles until it
/// reaches 1GB. Returns an error if the new mmap size is greater than the max
/// allowed.
func (db *databaseT) mmapSize(size int) (int, error) {
	// Double the size from 32KB until 1GB.
	for i := uint(15); i <= 30; i++ {
		if size <= 1<<i {
			return 1 << i, nil
		}
	}

	// Verify the requested size is not above the maximum allowed.
	if size > maxMapSize {
		return 0, fmt.Errorf("mmap too large")
	}

	// If larger than 1GB then grow by 1GB at a time.
	sz := int64(size)
	remainder := sz % int64(maxMmapStep)
	if remainder > 0 {
		sz += int64(maxMmapStep) - remainder
	}

	// Ensure that the mmap size is a multiple of the page size.
	// This should always be true since we're incrementing in MBs.
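	// (Illustrative sketch: with a 4096-byte page size, a request for
	// 1.5GB falls past the doubling loop above and is rounded up to 2GB
	// at the maxMmapStep boundary; 2GB is already a multiple of the page
	// size, so the adjustment below is normally a no-op.)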
pageSize := int64(db.pageSize) if (sz % pageSize) != 0 { sz = ((sz / pageSize) + 1) * pageSize } // If we've exceeded the max size then only grow up to the max size. if sz > maxMapSize { sz = maxMapSize } return int(sz), nil } /// databaseT.Close() releases all database resources. All transactions must be closed /// before closing the database. func (db *databaseT) Close() error { db.rwlock.Lock() defer db.rwlock.Unlock() db.metalock.Lock() defer db.metalock.Unlock() db.mmaplock.RLock() defer db.mmaplock.RUnlock() return db.close() } func (db *databaseT) close() error { if !db.opened { return nil } db.opened = false db.freelist = nil // Close the mmap. err := db.munmap() if err != nil { return err } // Close file handles. if db.file != nil { // No need to unlock read-only file. // Unlock the file. err := funlock(db) if err != nil { log.Printf("dedo.Close(): funlock error: %s", err) } // Close the file descriptor. err = db.file.Close() if err != nil { return fmt.Errorf("db file close: %s", err) } db.file = nil } db.path = "" return nil } /// databaseT.begin() starts a new transaction. Multiple read-only transactions can be /// used concurrently but only one write transaction can be used at a time. /// Starting multiple write transactions will cause the calls to block and be /// serialized until the current write transaction finishes. /// /// Transactions should not be dependent on one another. Opening a read /// transaction and a write transaction in the same goroutine can cause the /// writer to deadlock because the database periodically needs to re-mmap itself /// as it grows and it cannot do that while a read transaction is open. /// /// If a long running read transaction (for example, a snapshot transaction) is /// needed, you might want to set databaseT.InitialMmapSize to a large enough value to /// avoid potential blocking of write transaction. /// /// IMPORTANT: You must close read-only transactions after you are finished or /// else the database will not reclaim old pages. func (db *databaseT) beginSnapshot() (*transactionT, error) { // Lock the meta pages while we initialize the transaction. We obtain // the meta lock before the mmap lock because that's the order that the // write transaction will obtain them. db.metalock.Lock() // Obtain a read-only lock on the mmap. When the mmap is remapped it // will obtain a write lock so all transactions must finish before it // can be remapped. db.mmaplock.RLock() // Exit if the database is not open yet. if !db.opened { db.mmaplock.RUnlock() db.metalock.Unlock() return nil, ErrDatabaseNotOpen } // Create a transaction associated with the database. t := &transactionT{} t.init(db) // Keep track of transaction until it closes. db.txs = append(db.txs, t) // Unlock the meta pages. db.metalock.Unlock() return t, nil } func (db *databaseT) beginTransaction() (*transactionT, error) { // Obtain writer lock. This is released by the transaction when it // closes. This enforces only one writer transaction at a time. db.rwlock.Lock() // Once we have the writer lock then we can lock the meta pages so that // we can set up the transaction. db.metalock.Lock() defer db.metalock.Unlock() // Exit if the database is not open yet. if !db.opened { db.rwlock.Unlock() return nil, ErrDatabaseNotOpen } // Create a transaction associated with the database. t := &transactionT{writable: true} t.init(db) db.rwtx = t // Free any pages associated with closed read-only transactions. 
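	// (Illustrative sketch, assuming freelist.release(id) returns to the
	// free list the pages pending for transactions with txid <= id: if
	// read-only transactions with txids 5 and 8 are still open, minid
	// below becomes 5 and release(4) reclaims only pages freed by
	// transactions 4 and older.)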
minid := txid(0xFFFFFFFFFFFFFFFF) for _, t := range db.txs { if t.meta.txid < minid { minid = t.meta.txid } } if minid > 0 { db.freelist.release(minid - 1) } return t, nil } /// databaseT.removeTx() removes a transaction from the database. func (db *databaseT) removeTx(tx *transactionT) { // Release the read lock on the mmap. db.mmaplock.RUnlock() // Use the meta lock to restrict access to the databaseT object. db.metalock.Lock() // Remove the transaction. for i, t := range db.txs { if t == tx { last := len(db.txs) - 1 db.txs[i] = db.txs[last] db.txs[last] = nil db.txs = db.txs[:last] break } } // Unlock the meta pages. db.metalock.Unlock() } /// databaseT.Update() executes a function within the context of a read-write managed /// transaction. If no error is returned from the function then the transaction /// is committed. If an error is returned then the entire transaction is rolled /// back. Any error that is returned from the function or returned from the /// commit is returned from the databaseT.Update() method. /// /// Attempting to manually commit or rollback within the function will cause a /// panic. func (db *databaseT) Update(fn func(TransactionI) error) error { t, err := db.beginTransaction() if err != nil { return err } // Make sure the transaction rolls back in the event of a panic. defer func() { if t.db != nil { t.doRollback() } }() // If an error is returned from the function then rollback and return // error. err = fn(t) if err != nil { _ = t.rollback() return err } return t.commit() } /// databaseT.View() executes a function within the context of a managed read-only /// transaction. Any error that is returned from the function is returned from /// the databaseT.View() method. /// /// Attempting to manually rollback within the function will cause a panic. func (db *databaseT) View(fn func(SnapshotI) error) error { t, err := db.beginSnapshot() if err != nil { return err } // Make sure the transaction rolls back in the event of a panic. defer func() { if t.db != nil { t.doRollback() } }() // If an error is returned from the function then pass it through. err = fn(t) if err != nil { _ = t.rollback() return err } err = t.rollback() if err != nil { return err } return nil } func needsNewBatch(batch *batch, max int) bool { return (batch == nil) || (batch != nil && len(batch.calls) >= max) } /// databaseT.Batch() calls fn as part of a batch. It behaves similar to Update, /// except: /// /// 1. concurrent databaseT.Batch() calls can be combined into a single Dedo /// transaction. /// /// 2. the function passed to databaseT.Batch() may be called multiple times, /// regardless of whether it returns error or not. /// /// This means that databaseT.Batch() function side effects must be idempotent and take /// permanent effect only after a successful return is seen in caller. /// /// The maximum batch size and delay can be adjusted with databaseT.MaxBatchSize and /// databaseT.MaxBatchDelay, respectively. /// /// databaseT.Batch() is only useful when there are multiple goroutines calling it. func (db *databaseT) Batch(fn func(TransactionI) error) error { errCh := make(chan error, 1) db.batchMu.Lock() if needsNewBatch(db.batch, db.MaxBatchSize) { // There is no existing batch, or the existing batch is full; // start a new one. 
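// Each batch owns a timer: the batch runs when MaxBatchDelay expires or
// when it accumulates MaxBatchSize calls, whichever happens first.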
db.batch = &batch{ db: db, } db.batch.timer = time.AfterFunc( db.MaxBatchDelay, db.batch.trigger, ) } db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) if len(db.batch.calls) >= db.MaxBatchSize { // wake up batch, it's ready to run go db.batch.trigger() } db.batchMu.Unlock() err := <-errCh if err == trySolo { err = db.Update(fn) } return err } /// batch.trigger() runs the batch if it hasn't already been run. func (b *batch) trigger() { b.start.Do(b.run) } /// batch.run() performs the transactions in the batch and communicates results /// back to databaseT.Batch. func (b *batch) run() { b.db.batchMu.Lock() b.timer.Stop() // Make sure no new work is added to this batch, but don't break // other batches. if b.db.batch == b { b.db.batch = nil } b.db.batchMu.Unlock() retry: for len(b.calls) > 0 { failIdx := -1 err := b.db.Update(func(tx TransactionI) error { for i, c := range b.calls { err := safelyCall(c.fn, tx) if err != nil { failIdx = i return err } } return nil }) if failIdx >= 0 { // take the failing transaction out of the batch. it's // safe to shorten b.calls here because db.batch no // longer points to us, and we hold the mutex anyway. c := b.calls[failIdx] b.calls[failIdx], b.calls = b.calls[ len(b.calls) - 1], b.calls[:len(b.calls) - 1] // tell the submitter re-run it solo, continue with the // rest of the batch c.err <- trySolo continue retry } // pass success, or dedo internal errors, to all callers for _, c := range b.calls { c.err <- err } break retry } } func (p panicked) Error() string { err, ok := p.reason.(error) if ok { return err.Error() } return fmt.Sprintf("panic: %v", p.reason) } func safelyCall(fn func(TransactionI) error, tx TransactionI) (err error) { defer func() { p := recover() if p != nil { err = panicked{p} } }() return fn(tx) } /// db.page() retrieves a page reference from the mmap based on the current page /// size. func (db *databaseT) page(id pgid) *page { pos := id * pgid(db.pageSize) return (*page)(unsafe.Pointer(&db.data[pos])) } /// pageInBuffer() retrieves a page reference from a given byte array based on /// the current page size. func pageInBuffer(db *databaseT, b []byte, id pgid) *page { return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)])) } /// db.meta() retrieves the current meta page reference. func (db *databaseT) meta() *meta { // We have to return the meta with the highest txid which doesn't fail // validation. Otherwise, we can cause errors when in fact the database // is in a consistent state. metaA is the one with the higher txid. metaA := db.meta0 metaB := db.meta1 if db.meta1.txid > db.meta0.txid { metaA = db.meta1 metaB = db.meta0 } // Use higher meta page if valid. Otherwise fallback to previous, if // valid. err := metaA.validate(db) if err == nil { return metaA } err = metaB.validate(db) if err == nil { return metaB } // This should never be reached, because both meta1 and meta0 were // validated on mmap() and we do fsync() on every write. panic("dedo.databaseT.meta(): invalid meta pages") } /// db.allocate() returns a contiguous block of memory starting at a given page. func (db *databaseT) allocate(count int) (*page, error) { // Allocate a temporary buffer for the page. var buf []byte if count == 1 { buf = db.pagePool.Get().([]byte) } else { buf = make([]byte, count*db.pageSize) } p := (*page)(unsafe.Pointer(&buf[0])) p.overflow = uint32(count - 1) // Use pages from the freelist if they are available. p.id = db.freelist.allocate(count) if p.id != 0 { return p, nil } // Resize mmap() if we're at the end. 
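// Nothing suitable was found on the freelist, so place the new pages at
// the current high water mark and grow the mmap if the requested end
// would fall past the mapped region.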
p.id = db.rwtx.meta.pgid minsz := int((p.id+pgid(count))+1) * db.pageSize if minsz >= db.datasz { err := db.mmap(minsz) if err != nil { return nil, fmt.Errorf("mmap allocate error: %s", err) } } // Move the page id high water mark. db.rwtx.meta.pgid += pgid(count) return p, nil } /// db.grow() grows the size of the database to the given sz. func (db *databaseT) grow(sz int) error { // Ignore if the new size is less than available file size. if sz <= db.filesz { return nil } // If the data is smaller than the alloc size then only allocate what's // needed. Once it goes over the allocation size then allocate in // chunks. if db.datasz < allocGrowthSize { sz = db.datasz } else { sz += allocGrowthSize } // Truncate and fsync to ensure file size metadata is flushed. // https://github.com/boltdb/bolt/issues/284 err := db.file.Truncate(int64(sz)) if err != nil { return fmt.Errorf("file resize error: %s", err) } err = db.file.Sync() if err != nil { return fmt.Errorf("file sync error: %s", err) } db.filesz = sz return nil } /// meta.validate() checks the marker bytes and version of the meta page to /// ensure it matches this binary. func (m *meta) validate(db *databaseT) error { if m.magic != db.magic { return ErrInvalid } else if m.version != version { return ErrVersionMismatch } else if m.checksum != 0 && m.checksum != m.sum64() { return ErrChecksum } return nil } /// meta.copy() copies one meta object to another. func (m *meta) copy(dest *meta) { *dest = *m } /// meta.write() writes the meta onto a page. func (m *meta) write(p *page) { if m.root.root >= m.pgid { panic(fmt.Sprintf( "root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid, )) } else if m.freelist >= m.pgid { panic(fmt.Sprintf( "freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid, )) } // Page id is either going to be 0 or 1 which we can determine by the // transaction ID. p.id = pgid(m.txid % 2) p.flags |= metaPageFlag // Calculate the checksum. m.checksum = m.sum64() m.copy(p.meta()) } /// meta.sum64() generates the checksum for the meta. func (m *meta) sum64() uint64 { h := fnv.New64a() _, _ = h.Write( (*[unsafe.Offsetof(meta{}.checksum)]byte)(unsafe.Pointer(m))[:], ) return h.Sum64() } /// newFreelist() returns an empty, initialized freelist. func newFreelist() *freelist { return &freelist{ pending: make(map[txid][]pgid), cache: make(map[pgid]bool), } } /// freelist.size() returns the size of the page after serialization. func (f *freelist) size() int { n := f.count() if n >= 0xFFFF { // The first element will be used to store the count. See // freelist.write. n++ } return pageHeaderSize + (int(unsafe.Sizeof(pgid(0))) * n) } /// freelist.count() returns count of pages on the freelist func (f *freelist) count() int { return f.free_count() + f.pending_count() } /// freelist.free_count() returns count of free pages func (f *freelist) free_count() int { return len(f.ids) } /// freelist.pending_count() returns count of pending pages func (f *freelist) pending_count() int { count := 0 for _, list := range f.pending { count += len(list) } return count } /// freelist.copyall() copies into dst a list of all free ids and all pending /// ids in one sorted list. /// f.count returns the minimum length required for dst. func (f *freelist) copyall(dst []pgid) { m := make(pgids, 0, f.pending_count()) for _, list := range f.pending { m = append(m, list...) } sort.Sort(m) mergepgids(dst, f.ids, m) } /// freelist.allocate() returns the starting page id of a contiguous list of /// pages of a given size. 
If a contiguous block cannot be found then 0 is /// returned. func (f *freelist) allocate(n int) pgid { if len(f.ids) == 0 { return 0 } initial := pgid(0) previd := pgid(0) for i, id := range f.ids { if id <= 1 { panic(fmt.Sprintf("invalid page allocation: %d", id)) } // Reset initial page if this is not contiguous. if previd == 0 || id-previd != 1 { initial = id } // If we found a contiguous block then remove it and return it. if (id-initial)+1 == pgid(n) { // If we're allocating off the beginning then take the // fast path and just adjust the existing slice. This // will use extra memory temporarily but the append() in // free() will realloc the slice as is necessary. if (i + 1) == n { f.ids = f.ids[i+1:] } else { copy(f.ids[i-n+1:], f.ids[i+1:]) f.ids = f.ids[:len(f.ids)-n] } // Remove from the free cache. for i := pgid(0); i < pgid(n); i++ { delete(f.cache, initial+i) } return initial } previd = id } return 0 } /// freelist.free() releases a page and its overflow for a given transaction id. /// If the page is already free then a panic will occur. func (f *freelist) free(txid txid, p *page) { if p.id <= 1 { panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id)) } // Free page and all its overflow pages. ids := f.pending[txid] for id := p.id; id <= p.id+pgid(p.overflow); id++ { // Verify that page is not already free. if f.cache[id] { panic(fmt.Sprintf("page %d already freed", id)) } // Add to the freelist and cache. ids = append(ids, id) f.cache[id] = true } f.pending[txid] = ids } /// release moves all page ids for a transaction id (or older) to the freelist. func (f *freelist) release(txid txid) { m := make(pgids, 0) for tid, ids := range f.pending { if tid <= txid { // Move transaction's pending pages to the available // freelist. Don't remove from the cache since the page // is still free. m = append(m, ids...) delete(f.pending, tid) } } sort.Sort(m) f.ids = pgids(f.ids).merge(m) } /// rollback removes the pages from a given pending tx. func (f *freelist) rollback(txid txid) { // Remove page ids from cache. for _, id := range f.pending[txid] { delete(f.cache, id) } // Remove pages from pending list. delete(f.pending, txid) } /// freed returns whether a given page is in the free list. func (f *freelist) freed(pgid pgid) bool { return f.cache[pgid] } /// freelist.read() initializes the freelist from a freelist page. func (f *freelist) read(p *page) { // If the page.count is at the max uint16 value (64k) then it's // considered an overflow and the size of the freelist is stored as the // first element. idx, count := 0, int(p.count) if count == 0xFFFF { idx = 1 count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0]) } // Copy the list of page ids from the freelist. if count == 0 { f.ids = nil } else { ids := (*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr))[idx:count] f.ids = make([]pgid, len(ids)) copy(f.ids, ids) // Make sure they're sorted. sort.Sort(pgids(f.ids)) } // Rebuild the page cache. f.reindex() } /// freelist.write() writes the page ids onto a freelist page. All free and /// pending ids are saved to disk since in the event of a program crash, all /// pending ids will become free. func (f *freelist) write(p *page) error { // Combine the old free pgids and pgids waiting on an open transaction. // Update the header flag. p.flags |= freelistPageFlag // The page.count can only hold up to 64k elements so if we overflow // that number then we handle it by putting the size in the first // element. 
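// On-page layout: when there are fewer than 0xFFFF ids, p.count holds
// the length and the pgids follow the page header; otherwise p.count is
// set to 0xFFFF, the first pgid slot holds the real length, and the ids
// follow it.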
lenids := f.count() if lenids == 0 { p.count = uint16(lenids) } else if lenids < 0xFFFF { p.count = uint16(lenids) f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:]) } else { p.count = 0xFFFF pageID := pgid(lenids) ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pageID f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:]) } return nil } /// reload reads the freelist from a page and filters out pending items. func (f *freelist) reload(p *page) { f.read(p) // Build a cache of only pending pages. pcache := make(map[pgid]bool) for _, pendingIDs := range f.pending { for _, pendingID := range pendingIDs { pcache[pendingID] = true } } // Check each page in the freelist and build a new available freelist // with any pages not in the pending lists. a := []pgid{} for _, id := range f.ids { if !pcache[id] { a = append(a, id) } } f.ids = a // Once the available list is rebuilt then rebuild the free cache so // that it includes the available and pending free pages. f.reindex() } /// freelist.reindex() rebuilds the free cache based on available and pending /// free lists. func (f *freelist) reindex() { f.cache = make(map[pgid]bool, len(f.ids)) for _, id := range f.ids { f.cache[id] = true } for _, pendingIDs := range f.pending { for _, pendingID := range pendingIDs { f.cache[pendingID] = true } } } /// node.root() returns the top-level node this node is attached to. func (n *node) root() *node { if n.parent == nil { return n } return n.parent.root() } /// node.minKeys() returns the minimum number of inodes this node should have. func (n *node) minKeys() int { if n.isLeaf { return 1 } return 2 } /// node.size() returns the size of the node after serialization. func (n *node) size() int { sz, elsz := pageHeaderSize, n.pageElementSize() for i := 0; i < len(n.inodes); i++ { item := &n.inodes[i] sz += elsz + len(item.key) + len(item.value) } return sz } /// node.sizeLessThan() returns true if the node is less than a given size. /// This is an optimization to avoid calculating a large node when we only need /// to know if it fits inside a certain page size. func (n *node) sizeLessThan(v int) bool { sz, elsz := pageHeaderSize, n.pageElementSize() for i := 0; i < len(n.inodes); i++ { item := &n.inodes[i] sz += elsz + len(item.key) + len(item.value) if sz >= v { return false } } return true } /// node.pageElementSize() returns the size of each page element based on the /// type of node. func (n *node) pageElementSize() int { if n.isLeaf { return leafPageElementSize } else { return branchPageElementSize } } /// node.childAt() returns the child node at a given index. func (n *node) childAt(index int) *node { if n.isLeaf { panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index)) } return n.bucket.node(n.inodes[index].pgid, n) } /// node.childIndex() returns the index of a given child node. func (n *node) childIndex(child *node) int { index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, child.key) != -1 }) return index } /// node.numChildren() returns the number of children. func (n *node) numChildren() int { return len(n.inodes) } /// node.nextSibling() returns the next node with the same parent. func (n *node) nextSibling() *node { if n.parent == nil { return nil } index := n.parent.childIndex(n) if index >= n.parent.numChildren()-1 { return nil } return n.parent.childAt(index + 1) } /// node.prevSibling() returns the previous node with the same parent. 
func (n *node) prevSibling() *node { if n.parent == nil { return nil } index := n.parent.childIndex(n) if index == 0 { return nil } return n.parent.childAt(index - 1) } /// node.put() inserts a key/value. func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) { if pgid >= n.bucket.tx.meta.pgid { panic(fmt.Sprintf( "pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid, )) } else if len(oldKey) <= 0 { panic("put: zero-length old key") } else if len(newKey) <= 0 { panic("put: zero-length new key") } // Find insertion index. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 }) // Add capacity and shift nodes if we don't have an exact match and need // to insert. exact := len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey) if !exact { n.inodes = append(n.inodes, inode{}) copy(n.inodes[index+1:], n.inodes[index:]) } inode := &n.inodes[index] inode.flags = flags inode.key = newKey inode.value = value inode.pgid = pgid g.Assert(len(inode.key) > 0, "put: zero-length inode key") } /// node.del() removes a key from the node. func (n *node) del(key []byte) { // Find index of key. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 }) // Exit if the key isn't found. if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) { return } // Delete inode from the node. n.inodes = append(n.inodes[:index], n.inodes[index+1:]...) // Mark the node as needing rebalancing. n.unbalanced = true } /// node.read() initializes the node from a page. func (n *node) read(p *page) { n.pgid = p.id n.isLeaf = ((p.flags & leafPageFlag) != 0) n.inodes = make(inodes, int(p.count)) for i := 0; i < int(p.count); i++ { inode := &n.inodes[i] if n.isLeaf { elem := p.leafPageElement(uint16(i)) inode.flags = elem.flags inode.key = elem.key() inode.value = elem.value() } else { elem := p.branchPageElement(uint16(i)) inode.pgid = elem.pgid inode.key = elem.key() } g.Assert(len(inode.key) > 0, "read: zero-length inode key") } // Save first key so we can find the node in the parent when we spill. if len(n.inodes) > 0 { n.key = n.inodes[0].key g.Assert(len(n.key) > 0, "read: zero-length node key") } else { n.key = nil } } func u32position(a unsafe.Pointer, b unsafe.Pointer) uint32 { return uint32(uintptr(a) - uintptr(b)) } /// node.write() writes the items onto one or more pages. func (n *node) write(p *page) { // Initialize page. if n.isLeaf { p.flags |= leafPageFlag } else { p.flags |= branchPageFlag } if len(n.inodes) >= 0xFFFF { panic(fmt.Sprintf( "inode overflow: %d (pgid=%d)", len(n.inodes), p.id, )) } p.count = uint16(len(n.inodes)) // Stop here if there are no items to write. if p.count == 0 { return } // Loop over each item and write it to the page. nth := n.pageElementSize() * len(n.inodes) b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[nth:] for i, item := range n.inodes { g.Assert(len(item.key) > 0, "write: zero-length inode key") // Write the page element. 
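// elem.pos records the offset from the element header to the start of
// its key/value bytes, which is how key() and value() later slice the
// data directly out of the mapped page.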
if n.isLeaf { elem := p.leafPageElement(uint16(i)) elem.pos = u32position( unsafe.Pointer(&b[0]), unsafe.Pointer(elem), ) elem.flags = item.flags elem.ksize = uint32(len(item.key)) elem.vsize = uint32(len(item.value)) } else { elem := p.branchPageElement(uint16(i)) elem.pos = u32position( unsafe.Pointer(&b[0]), unsafe.Pointer(elem), ) elem.ksize = uint32(len(item.key)) elem.pgid = item.pgid g.Assert( elem.pgid != p.id, "write: circular dependency occurred", ) } // If the length of key+value is larger than the max allocation // size then we need to reallocate the byte array pointer. // // See: https://github.com/boltdb/bolt/pull/335 klen, vlen := len(item.key), len(item.value) if len(b) < klen+vlen { b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:] } // Write data for the element to the end of the page. copy(b[0:], item.key) b = b[klen:] copy(b[0:], item.value) b = b[vlen:] } } /// node.split() breaks up a node into multiple smaller nodes, if appropriate. /// This should only be called from the spill() function. func (n *node) split(pageSize int) []*node { nodes := []*node{} node := n for { // Split node into two. a, b := node.splitTwo(pageSize) nodes = append(nodes, a) // If we can't split then exit the loop. if b == nil { break } // Set node to b so it gets split on the next iteration. node = b } return nodes } /// node.splitTwo() breaks up a node into two smaller nodes, if appropriate. /// This should only be called from the split() function. func (n *node) splitTwo(pageSize int) (*node, *node) { const fillPercent = 0.5 // Ignore the split if the page doesn't have at least enough nodes for // two pages or if the nodes can fit in a single page. if len(n.inodes) <= (minKeysPerPage*2) || n.sizeLessThan(pageSize) { return n, nil } threshold := int(float64(pageSize) * fillPercent) // Determine split position and sizes of the two pages. splitIndex, _ := n.splitIndex(threshold) // Split node into two separate nodes. // If there's no parent then we'll need to create one. if n.parent == nil { n.parent = &node{bucket: n.bucket, children: []*node{n}} } // Create a new node and add it to the parent. next := &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent} n.parent.children = append(n.parent.children, next) // Split inodes across two nodes. next.inodes = n.inodes[splitIndex:] n.inodes = n.inodes[:splitIndex] return n, next } /// node.splitIndex() finds the position where a page will fill a given /// threshold. It returns the index as well as the size of the first page. /// This is only be called from split(). func (n *node) splitIndex(threshold int) (index, sz int) { sz = pageHeaderSize // Loop until we only have the minimum number of keys required for the // second page. for i := 0; i < len(n.inodes)-minKeysPerPage; i++ { index = i inode := n.inodes[i] elsize := n.pageElementSize() + len(inode.key) + len(inode.value) // If we have at least the minimum number of keys and adding // another node would put us over the threshold then exit and // return. if i >= minKeysPerPage && sz+elsize > threshold { break } // Add the element size to the total size. sz += elsize } return } /// node.spill() writes the nodes to dirty pages and splits nodes as it goes. /// Returns an error if dirty pages cannot be allocated. func (n *node) spill() error { tx := n.bucket.tx if n.spilled { return nil } // Spill child nodes first. Child nodes can materialize sibling nodes // in the case of split-merge so we cannot use a range loop. We have to // check the children size on every loop iteration. 
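// Children are sorted by their first key (see nodes.Less) so sibling
// pages are spilled in key order.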
sort.Sort(n.children) for i := 0; i < len(n.children); i++ { err := n.children[i].spill() if err != nil { return err } } // We no longer need the child list because it's only used for spill // tracking. n.children = nil // Split nodes into appropriate sizes. The first node will always be n. nodes := n.split(tx.db.pageSize) for _, node := range nodes { // Add node's page to the freelist if it's not new. if node.pgid > 0 { tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid)) node.pgid = 0 } // Allocate contiguous space for the node. p, err := tx.allocate((node.size() / tx.db.pageSize) + 1) if err != nil { return err } // Write the node. if p.id >= tx.meta.pgid { panic(fmt.Sprintf( "pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid, )) } node.pgid = p.id node.write(p) node.spilled = true // Insert into parent inodes. if node.parent != nil { key := node.key if key == nil { key = node.inodes[0].key } node.parent.put( key, node.inodes[0].key, nil, node.pgid, 0, ) node.key = node.inodes[0].key g.Assert( len(node.key) > 0, "spill: zero-length node key", ) } } // If the root node split and created a new root then we need to spill // that as well. We'll clear out the children to make sure it doesn't // try to respill. if n.parent != nil && n.parent.pgid == 0 { n.children = nil return n.parent.spill() } return nil } /// node.rebalance() attempts to combine the node with sibling nodes if the node /// fill size is below a threshold or if there are not enough keys. func (n *node) rebalance() { if !n.unbalanced { return } n.unbalanced = false // Ignore if node is above threshold (25%) and has enough keys. threshold := n.bucket.tx.db.pageSize / 4 if n.size() > threshold && len(n.inodes) > n.minKeys() { return } // Root node has special handling. if n.parent == nil { // If root node is a branch and only has one node then collapse // it. if !n.isLeaf && len(n.inodes) == 1 { // Move root's child up. child := n.bucket.node(n.inodes[0].pgid, n) n.isLeaf = child.isLeaf n.inodes = child.inodes[:] n.children = child.children // Reparent all child nodes being moved. for _, inode := range n.inodes { child, ok := n.bucket.nodes[inode.pgid] if ok { child.parent = n } } // Remove old child. child.parent = nil delete(n.bucket.nodes, child.pgid) child.free() } return } // If node has no keys then just remove it. if n.numChildren() == 0 { n.parent.del(n.key) n.parent.removeChild(n) delete(n.bucket.nodes, n.pgid) n.free() n.parent.rebalance() return } g.Assert( n.parent.numChildren() > 1, "parent must have at least 2 children", ) // Destination node is right sibling if idx == 0, otherwise left // sibling. var target *node useNextSibling := n.parent.childIndex(n) == 0 if useNextSibling { target = n.nextSibling() } else { target = n.prevSibling() } // If both this node and the target node are too small then merge them. if useNextSibling { // Reparent all child nodes being moved. for _, inode := range target.inodes { child, ok := n.bucket.nodes[inode.pgid] if ok { child.parent.removeChild(child) child.parent = n child.parent.children = append( child.parent.children, child, ) } } // Copy over inodes from target and remove target. n.inodes = append(n.inodes, target.inodes...) n.parent.del(target.key) n.parent.removeChild(target) delete(n.bucket.nodes, target.pgid) target.free() } else { // Reparent all child nodes being moved. 
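// n is merged into its left sibling (target): any cached child nodes
// must be re-pointed at target before n's inodes are moved over and n is
// removed from the parent.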
for _, inode := range n.inodes { child, ok := n.bucket.nodes[inode.pgid] if ok { child.parent.removeChild(child) child.parent = target child.parent.children = append( child.parent.children, child, ) } } // Copy over inodes to target and remove node. target.inodes = append(target.inodes, n.inodes...) n.parent.del(n.key) n.parent.removeChild(n) delete(n.bucket.nodes, n.pgid) n.free() } // Either this node or the target node was deleted from the parent so // rebalance it. n.parent.rebalance() } /// node.removeChild() removes a node from the list of in-memory children. This /// does not affect the inodes. func (n *node) removeChild(target *node) { for i, child := range n.children { if child == target { n.children = append(n.children[:i], n.children[i+1:]...) return } } } /// node.dereference() causes the node to copy all its inode key/value /// references to heap memory. This is required when the mmap is reallocated so /// inodes are not pointing to stale data. func (n *node) dereference() { if n.key != nil { key := make([]byte, len(n.key)) copy(key, n.key) n.key = key g.Assert( n.pgid == 0 || len(n.key) > 0, "dereference: zero-length node key on existing node", ) } for i := range n.inodes { inode := &n.inodes[i] key := make([]byte, len(inode.key)) copy(key, inode.key) inode.key = key g.Assert( len(inode.key) > 0, "dereference: zero-length inode key", ) value := make([]byte, len(inode.value)) copy(value, inode.value) inode.value = value } // Recursively dereference children. for _, child := range n.children { child.dereference() } } /// node.free() adds the node's underlying page to the freelist. func (n *node) free() { if n.pgid != 0 { n.bucket.tx.db.freelist.free( n.bucket.tx.meta.txid, n.bucket.tx.page(n.pgid), ) n.pgid = 0 } } func (s nodes) Len() int { return len(s) } func (s nodes) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s nodes) Less(i, j int) bool { return bytes.Compare(s[i].inodes[0].key, s[j].inodes[0].key) == -1 } /// typ returns a human readable page type string used for debugging. func (p *page) typ() string { if (p.flags & branchPageFlag) != 0 { return "branch" } else if (p.flags & leafPageFlag) != 0 { return "leaf" } else if (p.flags & metaPageFlag) != 0 { return "meta" } else if (p.flags & freelistPageFlag) != 0 { return "freelist" } return fmt.Sprintf("unknown<%02x>", p.flags) } /// page.meta() returns a pointer to the metadata section of the page. func (p *page) meta() *meta { return (*meta)(unsafe.Pointer(&p.ptr)) } /// page.leafPageElement() retrieves the leaf node by index func (p *page) leafPageElement(index uint16) *leafPageElement { n := &((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[index] return n } /// page.leafPageElements() retrieves a list of leaf nodes. func (p *page) leafPageElements() []leafPageElement { if p.count == 0 { return nil } return ((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[:] } /// page.branchPageElement() retrieves the branch node by index func (p *page) branchPageElement(index uint16) *branchPageElement { return &((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[index] } /// page.branchPageElements() retrieves a list of branch nodes. 
func (p *page) branchPageElements() []branchPageElement { if p.count == 0 { return nil } return ((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[:] } func (s pages) Len() int { return len(s) } func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s pages) Less(i, j int) bool { return s[i].id < s[j].id } /// branchPageElement.key() returns a byte slice of the node key. func (n *branchPageElement) key() []byte { buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize] } /// leafPageElement.key() returns a byte slice of the node key. func (n *leafPageElement) key() []byte { buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) return (*[maxAllocSize]byte)( unsafe.Pointer(&buf[n.pos]), )[:n.ksize:n.ksize] } /// leafPageElement.value() returns a byte slice of the node value. func (n *leafPageElement) value() []byte { buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) return (*[maxAllocSize]byte)( unsafe.Pointer(&buf[n.pos+n.ksize]), )[:n.vsize:n.vsize] } func (s pgids) Len() int { return len(s) } func (s pgids) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s pgids) Less(i, j int) bool { return s[i] < s[j] } /// pgids.merge() returns the sorted union of a and b. func (a pgids) merge(b pgids) pgids { // Return the opposite slice if one is nil. if len(a) == 0 { return b } if len(b) == 0 { return a } merged := make(pgids, len(a)+len(b)) mergepgids(merged, a, b) return merged } /// mergepgids() copies the sorted union of a and b into dst. If dst is too /// small, it panics. func mergepgids(dst, a, b pgids) { if len(dst) < len(a)+len(b) { panic(fmt.Errorf( "mergepgids bad len %d < %d + %d", len(dst), len(a), len(b), )) } // Copy in the opposite slice if one is nil. if len(a) == 0 { copy(dst, b) return } if len(b) == 0 { copy(dst, a) return } // Merged will hold all elements from both lists. merged := dst[:0] // Assign lead to the slice with a lower starting value, follow to the // higher value. lead, follow := a, b if b[0] < a[0] { lead, follow = b, a } // Continue while there are elements in the lead. for len(lead) > 0 { // Merge largest prefix of lead that is ahead of follow[0]. n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] }) merged = append(merged, lead[:n]...) if n >= len(lead) { break } // Swap lead and follow. lead, follow = follow, lead[n:] } // Append what's left in follow. _ = append(merged, follow...) } /// transactionT.init() initializes the transaction. func (tx *transactionT) init(db *databaseT) { tx.db = db tx.pages = nil // Copy the meta page since it can be changed by the writer. tx.meta = &meta{} db.meta().copy(tx.meta) // Copy over the root bucket. tx.root = newBucket(tx) tx.root.ref = &bucket{} *tx.root.ref = tx.meta.root // Increment the transaction id and add a page cache for writable // transactions. if tx.writable { tx.pages = make(map[pgid]*page) tx.meta.txid += txid(1) } } /// transactionT.ID() returns the transaction id. func txID(tx *transactionT) int { return int(tx.meta.txid) } func (tx *transactionT) ID() int { return txID(tx) } /// transactionT.Size() returns current database size in bytes as seen by this /// transaction. func txSize(tx *transactionT) int64 { return int64(tx.meta.pgid) * int64(tx.db.pageSize) } func (tx *transactionT) Size() int64 { return txSize(tx) } /// transactionT.Cursor() creates a cursor associated with the root bucket. All items in /// the cursor will return a nil value because all root bucket keys point to /// buckets. 
/// The cursor is only valid as long as the transaction is open. Do /// not use a cursor after the transaction is closed. func txCursor(tx *transactionT) *cursorT { return bucketCursor(&tx.root) } func (tx *transactionT) ROCursor() ROCursorI { return txCursor(tx) } func (tx *transactionT) RWCursor() RWCursorI { return txCursor(tx) } /// transactionT.Bucket() retrieves a bucket by name. Returns nil if the bucket does not /// exist. The bucket instance is only valid for the lifetime of the /// transaction. func txBucket(tx *transactionT, name []byte) (*bucketT, error) { return bucketGetNested(&tx.root, name) } func (tx *transactionT) ROBucket(name []byte) (ROBucketI, error) { return txBucket(tx, name) } func (tx *transactionT) RWBucket(name []byte) (RWBucketI, error) { return txBucket(tx, name) } /// transactionT.CreateBucket() creates a new bucket. Returns an error if the bucket /// already exists, if the bucket name is blank, or if the bucket name is too /// long. The bucket instance is only valid for the lifetime of the /// transaction. func txCreateBucket(tx *transactionT, name []byte) (*bucketT, error) { return bucketCreateBucket(&tx.root, name) } func (tx *transactionT) CreateBucket(name []byte) (RWBucketI, error) { return txCreateBucket(tx, name) } /// transactionT.CreateBucketIfNotExists() creates a new bucket if it doesn't already /// exist. Returns an error if the bucket name is blank, or if the bucket name /// is too long. The bucket instance is only valid for the lifetime of the /// transaction. func txCreateBucketIfNotExists(tx *transactionT, name []byte) (*bucketT, error) { return bucketCreateBucketIfNotExists(&tx.root, name) } func (tx *transactionT) CreateBucketIfNotExists(name []byte) (RWBucketI, error) { return txCreateBucketIfNotExists(tx, name) } /// transactionT.DeleteBucket() deletes a bucket. Returns an error if the bucket cannot /// be found or if the key represents a non-bucket value. func txDeleteBucket(tx *transactionT, name []byte) error { return tx.root.DeleteBucket(name) } func (tx *transactionT) DeleteBucket(name []byte) error { return txDeleteBucket(tx, name) } /// transactionT.ForEach() executes a function for each bucket in the root. If the /// provided function returns an error then the iteration is stopped and the /// error is returned to the caller. func txForEach[T any]( tx *transactionT, fn func([]byte, T) error, bucketFn func(*bucketT, []byte) (T, error), ) error { return tx.root.ForEach(func(k []byte, v []byte) error { bucket, err := bucketFn(&tx.root, k) if err != nil { return err } return fn(k, bucket) }) } func (tx *transactionT) ROForEach(fn func([]byte, ROBucketI) error) error { return txForEach(tx, fn, func( bucket *bucketT, name []byte, ) (ROBucketI, error) { return bucket.ROBucket(name) }) } func (tx *transactionT) RWForEach(fn func([]byte, RWBucketI) error) error { return txForEach(tx, fn, func( bucket *bucketT, name []byte, ) (RWBucketI, error) { return bucket.RWBucket(name) }) } /// transactionT.OnCommit() adds a handler function to be executed after the transaction /// successfully commits. func (tx *transactionT) OnCommit(fn func()) { tx.commitHandlers = append(tx.commitHandlers, fn) } /// transactionT.commit() writes all changes to disk and updates the meta page. Returns /// an error if a disk write error occurs, or if transactionT.commit() is called on a /// read-only transaction.
func (tx *transactionT) commit() error { if tx.db == nil { return ErrTxClosed } else if !tx.writable { return ErrTxNotWritable } // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. // Rebalance nodes which have had deletions. tx.root.rebalance() // spill data onto dirty pages. err := tx.root.spill() if err != nil { tx.doRollback() return err } // Free the old root bucket. tx.meta.root.root = tx.root.ref.root opgid := tx.meta.pgid // Free the freelist and allocate new pages for it. This will // overestimate the size of the freelist but not underestimate the size // (which would be bad). tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) if err != nil { tx.doRollback() return err } err = tx.db.freelist.write(p) if err != nil { tx.doRollback() return err } tx.meta.freelist = p.id // If the high water mark has moved up then attempt to grow the // database. if tx.meta.pgid > opgid { err := tx.db.grow(int(tx.meta.pgid + 1) * tx.db.pageSize) if err != nil { tx.doRollback() return err } } // Write dirty pages to disk. err = tx.write() if err != nil { tx.doRollback() return err } // Write meta to disk. err = tx.writeMeta() if err != nil { tx.doRollback() return err } handlers := slices.Concat(tx.commitHandlers, tx.db.commitHandlers) tx.close() // Execute commit handlers now that the locks have been removed. runHandlers(handlers) return nil } /// transactionT.rollback() closes the transaction and ignores all previous updates. /// Read-only transactions must be rolled back and not committed. func (tx *transactionT) rollback() error { if tx.db == nil { return ErrTxClosed } tx.doRollback() return nil } func (tx *transactionT) doRollback() { if tx.db == nil { return } if tx.writable { tx.db.freelist.rollback(tx.meta.txid) tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist)) } tx.close() } func (tx *transactionT) close() { if tx.db == nil { return } if tx.writable { // Remove transaction ref & writer lock. tx.db.rwtx = nil tx.db.rwlock.Unlock() } else { tx.db.removeTx(tx) } // Clear all references. tx.db = nil tx.meta = nil tx.root = bucketT{tx: tx} tx.pages = nil } /// transactionT.WriteTo() writes the entire database to a writer. If err == nil then /// exactly tx.Size() bytes will be written into the writer. func (tx *transactionT) WriteTo(w io.Writer) (n int64, err error) { f, err := os.OpenFile(tx.db.path, os.O_RDONLY, 0) if err != nil { return 0, err } defer f.Close() // Generate a meta page. We use the same page data for both meta pages. buf := make([]byte, tx.db.pageSize) page := (*page)(unsafe.Pointer(&buf[0])) page.flags = metaPageFlag *page.meta() = *tx.meta // Write meta 0. page.id = 0 page.meta().checksum = page.meta().sum64() nn, err := w.Write(buf) n += int64(nn) if err != nil { return n, fmt.Errorf("meta 0 copy: %s", err) } // Write meta 1 with a lower transaction id. page.id = 1 page.meta().txid -= 1 page.meta().checksum = page.meta().sum64() nn, err = w.Write(buf) n += int64(nn) if err != nil { return n, fmt.Errorf("meta 1 copy: %s", err) } // Move past the meta pages in the file. _, err = f.Seek(int64(tx.db.pageSize*2), os.SEEK_SET) if err != nil { return n, fmt.Errorf("seek: %s", err) } // Copy data pages. wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)) n += wn if err != nil { return n, err } return n, f.Close() } /// transactionT.Check() performs several consistency checks on the database for this /// transaction. An error is returned if any inconsistency is found. 
/// /// It can be safely run concurrently on a writable transaction. However, this /// incurs a high cost for large databases and databases with a lot of /// subbuckets because of caching. This overhead can be removed if running on a /// read-only transaction, however, it is not safe to execute other writer /// transactions at the same time. func (tx *transactionT) Check() <-chan error { ch := make(chan error) go tx.check(ch) return ch } func (tx *transactionT) check(ch chan error) { // Check if any pages are double freed. freed := make(map[pgid]bool) all := make([]pgid, tx.db.freelist.count()) tx.db.freelist.copyall(all) for _, id := range all { if freed[id] { ch <- fmt.Errorf("page %d: already freed", id) } freed[id] = true } // Track every reachable page. reachable := make(map[pgid]*page) reachable[0] = tx.page(0) // meta0 reachable[1] = tx.page(1) // meta1 for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ { reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist) } // Recursively check buckets. tx.checkBucket(&tx.root, reachable, freed, ch) // Ensure all pages below high water mark are either reachable or freed. for i := pgid(0); i < tx.meta.pgid; i++ { _, isReachable := reachable[i] if !isReachable && !freed[i] { ch <- fmt.Errorf("page %d: unreachable unfreed", int(i)) } } // Close the channel to signal completion. close(ch) } func isBranchPage(p *page) bool { return (p.flags & branchPageFlag) != 0 } func isLeafPage(p *page) bool { return (p.flags & leafPageFlag) != 0 } func (tx *transactionT) checkBucket( b *bucketT, reachable map[pgid]*page, freed map[pgid]bool, ch chan error, ) { // Ignore inline buckets. if b.ref.root == 0 { return } // Check every page used by this bucket. b.tx.forEachPage(b.ref.root, 0, func(p *page, _ int) { if p.id > tx.meta.pgid { ch <- fmt.Errorf( "page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid), ) } // Ensure each page is only referenced once. for i := pgid(0); i <= pgid(p.overflow); i++ { id := p.id + i _, ok := reachable[id] if ok { ch <- fmt.Errorf( "page %d: multiple references", int(id), ) } reachable[id] = p } // We should only encounter un-freed leaf and branch pages. if freed[p.id] { ch <- fmt.Errorf("page %d: reachable freed", int(p.id)) } else if !isBranchPage(p) && !isLeafPage(p) { ch <- fmt.Errorf( "page %d: invalid type: %s", int(p.id), p.typ(), ) } }) // Check each bucket within this bucket. _ = b.ForEach(func(k, v []byte) error { child, err := bucketGetNested(b, k) if err != nil { return nil } tx.checkBucket(child, reachable, freed, ch) return nil }) } /// transactionT.allocate() returns a contiguous block of memory starting at a given page. func (tx *transactionT) allocate(count int) (*page, error) { p, err := tx.db.allocate(count) if err != nil { return nil, err } // Save to our page cache. tx.pages[p.id] = p return p, nil } /// transactionT.write() writes any dirty pages to disk. func (tx *transactionT) write() error { // Sort pages by id. pages := make(pages, 0, len(tx.pages)) for _, p := range tx.pages { pages = append(pages, p) } // Clear out page cache early. tx.pages = make(map[pgid]*page) sort.Sort(pages) // Write pages to disk in order. for _, p := range pages { size := (int(p.overflow) + 1) * tx.db.pageSize offset := int64(p.id) * int64(tx.db.pageSize) // Write out page in "max allocation" sized chunks. ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) for { // Limit our write to our max allocation size. sz := size if sz > maxAllocSize-1 { sz = maxAllocSize - 1 } // Write chunk to disk.
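// WriteAt writes at an explicit offset (pwrite-style), so successive
// chunks of an oversized page can be written without relying on a shared
// file cursor.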
buf := ptr[:sz] _, err := tx.db.file.WriteAt(buf, offset) if err != nil { return err } // Exit inner for loop if we've written all the chunks. size -= sz if size == 0 { break } // Otherwise move offset forward and move pointer to // next chunk. offset += int64(sz) ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) } } err := fdatasync(tx.db) if err != nil { return err } // Put small pages back to page pool. for _, p := range pages { // Ignore page sizes over 1 page. // These are allocated using make() instead of the page pool. if int(p.overflow) != 0 { continue } buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize] for i := range buf { buf[i] = 0 } tx.db.pagePool.Put(buf) } return nil } /// transactionT.writeMeta() writes the meta to the disk. func (tx *transactionT) writeMeta() error { // Create a temporary buffer for the meta page. buf := make([]byte, tx.db.pageSize) p := pageInBuffer(tx.db, buf, 0) tx.meta.write(p) // Write the meta page to file. _, err := tx.db.file.WriteAt(buf, int64(p.id)*int64(tx.db.pageSize)) if err != nil { return err } err = fdatasync(tx.db) if err != nil { return err } return nil } /// transactionT.page() returns a reference to the page with a given id. If page has been /// written to then a temporary buffered page is returned. func (tx *transactionT) page(id pgid) *page { // Check the dirty pages first. if tx.pages != nil { p, ok := tx.pages[id] if ok { return p } } // Otherwise return directly from the mmap. return tx.db.page(id) } /// transactionT.forEachPage() iterates over every page within a given page and executes a /// function. func (tx *transactionT) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := tx.page(pgid) // Execute function. fn(p, depth) // Recursively loop over children. if (p.flags & branchPageFlag) != 0 { for i := 0; i < int(p.count); i++ { elem := p.branchPageElement(uint16(i)) tx.forEachPage(elem.pgid, depth+1, fn) } } } func (database *inMemoryDatabaseT) OnCommit(fn func()) { database.commitHandlers = append(database.commitHandlers, fn) } func (database *databaseT) OnCommit(fn func()) { database.commitHandlers = append(database.commitHandlers, fn) } func noopGetopt(args argsT, _w io.Writer) (argsT, bool) { return args, true } func getGetopt(args argsT, w io.Writer) (argsT, bool) { if len(args.args) < 1 { fmt.Fprintf(w, "Missing KEY.\n") return args, false } args.key = []byte(args.args[0]) return args, true } func setGetopt(args argsT, w io.Writer) (argsT, bool) { if len(args.args) < 1 { fmt.Fprintf(w, "Missing KEY and VALUE.\n") return args, false } if len(args.args) < 2 { fmt.Fprintf(w, "Missing VALUE.\n") return args, false } args.key = []byte(args.args[0]) args.value = []byte(args.args[1]) return args, true } func checkExec(args argsT, db DatabaseI, _r io.Reader, _w io.Writer) error { return db.View(func(snapshot SnapshotI) error { var errs error for err := range snapshot.Check() { errs = g.WrapErrors(errs, err) } return errs }) } func getExec(args argsT, db DatabaseI, r io.Reader, w io.Writer) error { return db.View(func(snapshot SnapshotI) error { bucket, err := snapshot.ROBucket(args.bucket) if err != nil { return err } value := bucket.Get(args.key) if value == nil { return ErrMissingKey } fmt.Fprintf(w, "%s\n", string(value)) return nil }) } func setExec(args argsT, db DatabaseI, r io.Reader, w io.Writer) error { return db.Update(func(tx TransactionI) error { bucket, err := tx.CreateBucketIfNotExists(args.bucket) if err != nil { return err } return bucket.Put(args.key, args.value) }) } func rmExec(args 
argsT, db DatabaseI, r io.Reader, w io.Writer) error { return db.Update(func(tx TransactionI) error { bucket, err := tx.RWBucket(args.bucket) if err != nil { return err } return bucket.Delete(args.key) }) } func listExec(args argsT, db DatabaseI, r io.Reader, w io.Writer) error { return db.View(func(snapshot SnapshotI) error { if len(args.bucket) == 0 { return snapshot.ROForEach(func( name []byte, bucket ROBucketI, ) error { fmt.Fprintf(w, "%s\n", string(name)) return nil }) } bucket, err := snapshot.ROBucket(args.bucket) if err != nil { return err } return bucket.ForEach(func(key []byte, value []byte) error { fmt.Fprintf(w, "%s\t%s\n", string(key), string(value)) return nil }) }) } func usage(argv0 string, w io.Writer) { fmt.Fprintf( w, "Usage: %s [-f FILE] [-b BUCKET] COMMAND [ARGS...]\n", argv0, ) } func getopt( allArgs []string, commandsMap map[string]commandT, w io.Writer, ) (argsT, commandT, int) { argv0 := allArgs[0] argv := allArgs[1:] fs := flag.NewFlagSet("", flag.ContinueOnError) fs.Usage = func() {} fs.SetOutput(w) databasePath := fs.String( "f", "data.kv", "The path to the key-value database file", ) bucket := fs.String( "b", "default", "The name of the bucket that'll be operated on", ) if fs.Parse(argv) != nil { usage(argv0, w) return argsT{}, commandT{}, 2 } subArgs := fs.Args() if len(subArgs) == 0 { fmt.Fprintf(w, "Missing COMMAND.\n") usage(argv0, w) return argsT{}, commandT{}, 2 } args := argsT{ databasePath: *databasePath, command: subArgs[0], allArgs: allArgs, args: subArgs[1:], bucket: []byte(*bucket), } command := commandsMap[args.command] if command.name == "" { fmt.Fprintf(w, "Bad COMMAND: \"%s\".\n", args.command) usage(argv0, w) return argsT{}, commandT{}, 2 } args, ok := command.getopt(args, w) if !ok { usage(argv0, w) return argsT{}, commandT{}, 2 } return args, command, 0 } func runCommand( args argsT, command commandT, stdin io.Reader, stdout io.Writer, stderr io.Writer, ) int { db, err := Open(args.databasePath) if err != nil { fmt.Fprintln(stderr, err) return 1 } defer db.Close() err = command.exec(args, db, stdin, stdout) if err != nil { fmt.Fprintln(stderr, err) return 1 } return 0 } var commands = map[string]commandT{ "check": commandT{ name: "check", getopt: noopGetopt, exec: checkExec, }, "get": commandT{ name: "get", getopt: getGetopt, exec: getExec, }, "set": commandT{ name: "set", getopt: setGetopt, exec: setExec, }, "rm": commandT{ name: "rm", getopt: getGetopt, exec: rmExec, }, "list": commandT{ name: "list", getopt: noopGetopt, exec: listExec, }, } func Main() { g.Init() args, command, rc := getopt(os.Args, commands, os.Stderr) if rc != 0 { os.Exit(rc) } os.Exit(runCommand(args, command, os.Stdin, os.Stdout, os.Stderr)) }
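/// Example usage of the library API (a minimal sketch; the file name,
/// bucket name, and keys below are illustrative, and error handling is
/// abbreviated):
///
///	db, err := Open("example.kv")
///	if err != nil {
///		return err
///	}
///	defer db.Close()
///
///	err = db.Update(func(tx TransactionI) error {
///		bucket, err := tx.CreateBucketIfNotExists([]byte("people"))
///		if err != nil {
///			return err
///		}
///		return bucket.Put([]byte("alice"), []byte("42"))
///	})
///	if err != nil {
///		return err
///	}
///
///	return db.View(func(snapshot SnapshotI) error {
///		bucket, err := snapshot.ROBucket([]byte("people"))
///		if err != nil {
///			return err
///		}
///		value := bucket.Get([]byte("alice"))
///		fmt.Printf("%s\n", value)
///		return nil
///	})
///
/// The equivalent operations through the bundled CLI (assuming the binary
/// built around Main() is named "dedo"):
///
///	dedo -f example.kv -b people set alice 42
///	dedo -f example.kv -b people get alice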