diff options
Diffstat (limited to 'tx.go')
-rw-r--r-- | tx.go | 294 |
1 files changed, 41 insertions, 253 deletions
@@ -33,10 +33,8 @@ type Tx struct { managed bool db *DB meta *meta - buckets *buckets - nodes map[pgid]*node + root Bucket pages map[pgid]*page - pending []*node stats TxStats commitHandlers []func() } @@ -50,15 +48,14 @@ func (tx *Tx) init(db *DB) { tx.meta = &meta{} db.meta().copy(tx.meta) - // Read in the buckets page. - tx.buckets = &buckets{} - tx.buckets.read(tx.page(tx.meta.buckets)) + // Copy over the root bucket. + tx.root = newBucket(tx) + tx.root.bucket = &bucket{} + *tx.root.bucket = tx.meta.root + // Increment the transaction id and add a page cache for writable transactions. if tx.writable { tx.pages = make(map[pgid]*page) - tx.nodes = make(map[pgid]*node) - - // Increment the transaction id. tx.meta.txid += txid(1) } } @@ -85,95 +82,38 @@ func (tx *Tx) Stats() TxStats { // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. -func (tx *Tx) Bucket(name string) *Bucket { - b := tx.buckets.get(name) - if b == nil { - return nil - } - - return &Bucket{ - bucket: b, - name: name, - tx: tx, - } -} - -// Buckets retrieves a list of all buckets. -func (tx *Tx) Buckets() []*Bucket { - buckets := make([]*Bucket, 0, len(tx.buckets.items)) - for name, b := range tx.buckets.items { - bucket := &Bucket{ - bucket: b, - name: name, - tx: tx, - } - buckets = append(buckets, bucket) - } - sort.Sort(bucketsByName(buckets)) - return buckets +func (tx *Tx) Bucket(name []byte) *Bucket { + return tx.root.Bucket(name) } // CreateBucket creates a new bucket. // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. -func (tx *Tx) CreateBucket(name string) error { - if tx.db == nil { - return ErrTxClosed - } else if !tx.writable { - return ErrTxNotWritable - } else if b := tx.Bucket(name); b != nil { - return ErrBucketExists - } else if len(name) == 0 { - return ErrBucketNameRequired - } else if len(name) > MaxBucketNameSize { - return ErrBucketNameTooLarge - } - - // Create a blank root leaf page. - p, err := tx.allocate(1) - if err != nil { - return err - } - p.flags = leafPageFlag - - // Add bucket to buckets page. - tx.buckets.put(name, &bucket{root: p.id}) - - return nil +func (tx *Tx) CreateBucket(name []byte) error { + return tx.root.CreateBucket(name) } // CreateBucketIfNotExists creates a new bucket if it doesn't already exist. // Returns an error if the bucket name is blank, or if the bucket name is too long. -func (tx *Tx) CreateBucketIfNotExists(name string) error { - err := tx.CreateBucket(name) - if err != nil && err != ErrBucketExists { - return err - } - return nil +func (tx *Tx) CreateBucketIfNotExists(name []byte) error { + return tx.root.CreateBucketIfNotExists(name) } // DeleteBucket deletes a bucket. -// Returns an error if the bucket cannot be found. -func (tx *Tx) DeleteBucket(name string) error { - if tx.db == nil { - return ErrTxClosed - } else if !tx.writable { - return ErrTxNotWritable - } - - b := tx.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - - // Remove from buckets page. - tx.buckets.del(name) +// Returns an error if the bucket cannot be found or if the key represents a non-bucket value. +func (tx *Tx) DeleteBucket(name []byte) error { + return tx.root.DeleteBucket(name) +} - // Free all pages. - tx.forEachPage(b.root, 0, func(p *page, depth int) { - tx.db.freelist.free(tx.id(), p) +// ForEach executes a function for each bucket in the root. +// If the provided function returns an error then the iteration is stopped and +// the error is returned to the caller. +func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error { + return tx.root.ForEach(func(k, v []byte) error { + if err := fn(k, tx.root.Bucket(k)); err != nil { + return err + } + return nil }) - - return nil } // OnCommit adds a handler function to be executed after the transaction successfully commits. @@ -184,9 +124,8 @@ func (tx *Tx) OnCommit(fn func()) { // Commit writes all changes to disk and updates the meta page. // Returns an error if a disk write error occurs. func (tx *Tx) Commit() error { - if tx.managed { - panic("managed tx commit not allowed") - } else if tx.db == nil { + _assert(!tx.managed, "managed tx commit not allowed") + if tx.db == nil { return ErrTxClosed } else if !tx.writable { return ErrTxNotWritable @@ -196,33 +135,24 @@ func (tx *Tx) Commit() error { // Rebalance nodes which have had deletions. var startTime = time.Now() - tx.rebalance() + tx.root.rebalance() tx.stats.RebalanceTime += time.Since(startTime) // spill data onto dirty pages. startTime = time.Now() - if err := tx.spill(); err != nil { + if err := tx.root.spill(); err != nil { tx.close() return err } tx.stats.SpillTime += time.Since(startTime) - // Spill buckets page. - p, err := tx.allocate((tx.buckets.size() / tx.db.pageSize) + 1) - if err != nil { - tx.close() - return err - } - tx.buckets.write(p) - - // Free previous bucket page and update meta. - tx.db.freelist.free(tx.id(), tx.page(tx.meta.buckets)) - tx.meta.buckets = p.id + // Free the old root bucket. + tx.meta.root.root = tx.root.root // Free the freelist and allocate new pages for it. This will overestimate // the size of the freelist but not underestimate the size (which would be bad). - tx.db.freelist.free(tx.id(), tx.page(tx.meta.freelist)) - p, err = tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) + tx.db.freelist.free(tx.id(), tx.db.page(tx.meta.freelist)) + p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) if err != nil { tx.close() return err @@ -257,9 +187,8 @@ func (tx *Tx) Commit() error { // Rollback closes the transaction and ignores all previous updates. func (tx *Tx) Rollback() error { - if tx.managed { - panic("managed tx rollback not allowed") - } else if tx.db == nil { + _assert(!tx.managed, "managed tx rollback not allowed") + if tx.db == nil { return ErrTxClosed } tx.close() @@ -268,13 +197,13 @@ func (tx *Tx) Rollback() error { func (tx *Tx) close() { if tx.writable { - // Merge statistics. - tx.db.metalock.Lock() - tx.db.stats.TxStats.add(&tx.stats) - tx.db.metalock.Unlock() - // Remove writer lock. tx.db.rwlock.Unlock() + + // Merge statistics. + tx.db.statlock.Lock() + tx.db.stats.TxStats.add(&tx.stats) + tx.db.statlock.Unlock() } else { tx.db.removeTx(tx) } @@ -298,99 +227,6 @@ func (tx *Tx) allocate(count int) (*page, error) { return p, nil } -// rebalance attempts to balance all nodes. -func (tx *Tx) rebalance() { - for _, n := range tx.nodes { - n.rebalance() - } -} - -// spill writes all the nodes to dirty pages. -func (tx *Tx) spill() error { - // Keep track of the current root nodes. - // We will update this at the end once all nodes are created. - type root struct { - node *node - pgid pgid - } - var roots []root - - // Sort nodes by highest depth first. - nodes := make(nodesByDepth, 0, len(tx.nodes)) - for _, n := range tx.nodes { - nodes = append(nodes, n) - } - sort.Sort(nodes) - - // Spill nodes by deepest first. - for i := 0; i < len(nodes); i++ { - n := nodes[i] - - // Save existing root buckets for later. - if n.parent == nil && n.pgid != 0 { - roots = append(roots, root{n, n.pgid}) - } - - // Split nodes into appropriate sized nodes. - // The first node in this list will be a reference to n to preserve ancestry. - newNodes := n.split(tx.db.pageSize) - tx.pending = newNodes - - // If this is a root node that split then create a parent node. - if n.parent == nil && len(newNodes) > 1 { - n.parent = &node{tx: tx, isLeaf: false} - nodes = append(nodes, n.parent) - } - - // Add node's page to the freelist. - if n.pgid > 0 { - tx.db.freelist.free(tx.id(), tx.page(n.pgid)) - } - - // Write nodes to dirty pages. - for i, newNode := range newNodes { - // Allocate contiguous space for the node. - p, err := tx.allocate((newNode.size() / tx.db.pageSize) + 1) - if err != nil { - return err - } - - // Write the node to the page. - newNode.write(p) - newNode.pgid = p.id - newNode.parent = n.parent - - // The first node should use the existing entry, other nodes are inserts. - var oldKey []byte - if i == 0 { - oldKey = n.key - } else { - oldKey = newNode.inodes[0].key - } - - // Update the parent entry. - if newNode.parent != nil { - newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) - } - - // Update the statistics. - tx.stats.Spill++ - } - - tx.pending = nil - } - - // Update roots with new roots. - for _, root := range roots { - tx.buckets.updateRoot(root.pgid, root.node.root().pgid) - } - - // Clear out nodes now that they are all spilled. - tx.nodes = make(map[pgid]*node) - - return nil -} - // write writes any dirty pages to disk. func (tx *Tx) write() error { // Sort pages by id. @@ -443,43 +279,6 @@ func (tx *Tx) writeMeta() error { return nil } -// node creates a node from a page and associates it with a given parent. -func (tx *Tx) node(pgid pgid, parent *node) *node { - // Retrieve node if it's already been created. - if tx.nodes == nil { - return nil - } else if n := tx.nodes[pgid]; n != nil { - return n - } - - // Otherwise create a branch and cache it. - n := &node{tx: tx, parent: parent} - if n.parent != nil { - n.depth = n.parent.depth + 1 - } - n.read(tx.page(pgid)) - tx.nodes[pgid] = n - - // Update statistics. - tx.stats.NodeCount++ - - return n -} - -// dereference removes all references to the old mmap. -func (tx *Tx) dereference() { - for _, n := range tx.nodes { - n.dereference() - } - - for _, n := range tx.pending { - n.dereference() - } - - // Update statistics - tx.stats.NodeDeref += len(tx.nodes) + len(tx.pending) -} - // page returns a reference to the page with a given id. // If page has been written to then a temporary bufferred page is returned. func (tx *Tx) page(id pgid) *page { @@ -494,17 +293,6 @@ func (tx *Tx) page(id pgid) *page { return tx.db.page(id) } -// pageNode returns the in-memory node, if it exists. -// Otherwise returns the underlying page. -func (tx *Tx) pageNode(id pgid) (*page, *node) { - if tx.nodes != nil { - if n := tx.nodes[id]; n != nil { - return nil, n - } - } - return tx.page(id), nil -} - // forEachPage iterates over every page within a given page and executes a function. func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := tx.page(pgid) @@ -533,7 +321,7 @@ func (tx *Tx) Page(id int) (*PageInfo, error) { } // Build the page info. - p := tx.page(pgid(id)) + p := tx.db.page(pgid(id)) info := &PageInfo{ ID: id, Count: int(p.count), |