aboutsummaryrefslogtreecommitdiff
path: root/tx.go
diff options
context:
space:
mode:
Diffstat (limited to 'tx.go')
-rw-r--r--tx.go294
1 files changed, 41 insertions, 253 deletions
diff --git a/tx.go b/tx.go
index bfdcce1..9f86d5e 100644
--- a/tx.go
+++ b/tx.go
@@ -33,10 +33,8 @@ type Tx struct {
managed bool
db *DB
meta *meta
- buckets *buckets
- nodes map[pgid]*node
+ root Bucket
pages map[pgid]*page
- pending []*node
stats TxStats
commitHandlers []func()
}
@@ -50,15 +48,14 @@ func (tx *Tx) init(db *DB) {
tx.meta = &meta{}
db.meta().copy(tx.meta)
- // Read in the buckets page.
- tx.buckets = &buckets{}
- tx.buckets.read(tx.page(tx.meta.buckets))
+ // Copy over the root bucket.
+ tx.root = newBucket(tx)
+ tx.root.bucket = &bucket{}
+ *tx.root.bucket = tx.meta.root
+ // Increment the transaction id and add a page cache for writable transactions.
if tx.writable {
tx.pages = make(map[pgid]*page)
- tx.nodes = make(map[pgid]*node)
-
- // Increment the transaction id.
tx.meta.txid += txid(1)
}
}
@@ -85,95 +82,38 @@ func (tx *Tx) Stats() TxStats {
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
-func (tx *Tx) Bucket(name string) *Bucket {
- b := tx.buckets.get(name)
- if b == nil {
- return nil
- }
-
- return &Bucket{
- bucket: b,
- name: name,
- tx: tx,
- }
-}
-
-// Buckets retrieves a list of all buckets.
-func (tx *Tx) Buckets() []*Bucket {
- buckets := make([]*Bucket, 0, len(tx.buckets.items))
- for name, b := range tx.buckets.items {
- bucket := &Bucket{
- bucket: b,
- name: name,
- tx: tx,
- }
- buckets = append(buckets, bucket)
- }
- sort.Sort(bucketsByName(buckets))
- return buckets
+func (tx *Tx) Bucket(name []byte) *Bucket {
+ return tx.root.Bucket(name)
}
// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
-func (tx *Tx) CreateBucket(name string) error {
- if tx.db == nil {
- return ErrTxClosed
- } else if !tx.writable {
- return ErrTxNotWritable
- } else if b := tx.Bucket(name); b != nil {
- return ErrBucketExists
- } else if len(name) == 0 {
- return ErrBucketNameRequired
- } else if len(name) > MaxBucketNameSize {
- return ErrBucketNameTooLarge
- }
-
- // Create a blank root leaf page.
- p, err := tx.allocate(1)
- if err != nil {
- return err
- }
- p.flags = leafPageFlag
-
- // Add bucket to buckets page.
- tx.buckets.put(name, &bucket{root: p.id})
-
- return nil
+func (tx *Tx) CreateBucket(name []byte) error {
+ return tx.root.CreateBucket(name)
}
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
-func (tx *Tx) CreateBucketIfNotExists(name string) error {
- err := tx.CreateBucket(name)
- if err != nil && err != ErrBucketExists {
- return err
- }
- return nil
+func (tx *Tx) CreateBucketIfNotExists(name []byte) error {
+ return tx.root.CreateBucketIfNotExists(name)
}
// DeleteBucket deletes a bucket.
-// Returns an error if the bucket cannot be found.
-func (tx *Tx) DeleteBucket(name string) error {
- if tx.db == nil {
- return ErrTxClosed
- } else if !tx.writable {
- return ErrTxNotWritable
- }
-
- b := tx.Bucket(name)
- if b == nil {
- return ErrBucketNotFound
- }
-
- // Remove from buckets page.
- tx.buckets.del(name)
+// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
+func (tx *Tx) DeleteBucket(name []byte) error {
+ return tx.root.DeleteBucket(name)
+}
- // Free all pages.
- tx.forEachPage(b.root, 0, func(p *page, depth int) {
- tx.db.freelist.free(tx.id(), p)
+// ForEach executes a function for each bucket in the root.
+// If the provided function returns an error then the iteration is stopped and
+// the error is returned to the caller.
+func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
+ return tx.root.ForEach(func(k, v []byte) error {
+ if err := fn(k, tx.root.Bucket(k)); err != nil {
+ return err
+ }
+ return nil
})
-
- return nil
}
// OnCommit adds a handler function to be executed after the transaction successfully commits.
@@ -184,9 +124,8 @@ func (tx *Tx) OnCommit(fn func()) {
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs.
func (tx *Tx) Commit() error {
- if tx.managed {
- panic("managed tx commit not allowed")
- } else if tx.db == nil {
+ _assert(!tx.managed, "managed tx commit not allowed")
+ if tx.db == nil {
return ErrTxClosed
} else if !tx.writable {
return ErrTxNotWritable
@@ -196,33 +135,24 @@ func (tx *Tx) Commit() error {
// Rebalance nodes which have had deletions.
var startTime = time.Now()
- tx.rebalance()
+ tx.root.rebalance()
tx.stats.RebalanceTime += time.Since(startTime)
// spill data onto dirty pages.
startTime = time.Now()
- if err := tx.spill(); err != nil {
+ if err := tx.root.spill(); err != nil {
tx.close()
return err
}
tx.stats.SpillTime += time.Since(startTime)
- // Spill buckets page.
- p, err := tx.allocate((tx.buckets.size() / tx.db.pageSize) + 1)
- if err != nil {
- tx.close()
- return err
- }
- tx.buckets.write(p)
-
- // Free previous bucket page and update meta.
- tx.db.freelist.free(tx.id(), tx.page(tx.meta.buckets))
- tx.meta.buckets = p.id
+ // Free the old root bucket.
+ tx.meta.root.root = tx.root.root
// Free the freelist and allocate new pages for it. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
- tx.db.freelist.free(tx.id(), tx.page(tx.meta.freelist))
- p, err = tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
+ tx.db.freelist.free(tx.id(), tx.db.page(tx.meta.freelist))
+ p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.close()
return err
@@ -257,9 +187,8 @@ func (tx *Tx) Commit() error {
// Rollback closes the transaction and ignores all previous updates.
func (tx *Tx) Rollback() error {
- if tx.managed {
- panic("managed tx rollback not allowed")
- } else if tx.db == nil {
+ _assert(!tx.managed, "managed tx rollback not allowed")
+ if tx.db == nil {
return ErrTxClosed
}
tx.close()
@@ -268,13 +197,13 @@ func (tx *Tx) Rollback() error {
func (tx *Tx) close() {
if tx.writable {
- // Merge statistics.
- tx.db.metalock.Lock()
- tx.db.stats.TxStats.add(&tx.stats)
- tx.db.metalock.Unlock()
-
// Remove writer lock.
tx.db.rwlock.Unlock()
+
+ // Merge statistics.
+ tx.db.statlock.Lock()
+ tx.db.stats.TxStats.add(&tx.stats)
+ tx.db.statlock.Unlock()
} else {
tx.db.removeTx(tx)
}
@@ -298,99 +227,6 @@ func (tx *Tx) allocate(count int) (*page, error) {
return p, nil
}
-// rebalance attempts to balance all nodes.
-func (tx *Tx) rebalance() {
- for _, n := range tx.nodes {
- n.rebalance()
- }
-}
-
-// spill writes all the nodes to dirty pages.
-func (tx *Tx) spill() error {
- // Keep track of the current root nodes.
- // We will update this at the end once all nodes are created.
- type root struct {
- node *node
- pgid pgid
- }
- var roots []root
-
- // Sort nodes by highest depth first.
- nodes := make(nodesByDepth, 0, len(tx.nodes))
- for _, n := range tx.nodes {
- nodes = append(nodes, n)
- }
- sort.Sort(nodes)
-
- // Spill nodes by deepest first.
- for i := 0; i < len(nodes); i++ {
- n := nodes[i]
-
- // Save existing root buckets for later.
- if n.parent == nil && n.pgid != 0 {
- roots = append(roots, root{n, n.pgid})
- }
-
- // Split nodes into appropriate sized nodes.
- // The first node in this list will be a reference to n to preserve ancestry.
- newNodes := n.split(tx.db.pageSize)
- tx.pending = newNodes
-
- // If this is a root node that split then create a parent node.
- if n.parent == nil && len(newNodes) > 1 {
- n.parent = &node{tx: tx, isLeaf: false}
- nodes = append(nodes, n.parent)
- }
-
- // Add node's page to the freelist.
- if n.pgid > 0 {
- tx.db.freelist.free(tx.id(), tx.page(n.pgid))
- }
-
- // Write nodes to dirty pages.
- for i, newNode := range newNodes {
- // Allocate contiguous space for the node.
- p, err := tx.allocate((newNode.size() / tx.db.pageSize) + 1)
- if err != nil {
- return err
- }
-
- // Write the node to the page.
- newNode.write(p)
- newNode.pgid = p.id
- newNode.parent = n.parent
-
- // The first node should use the existing entry, other nodes are inserts.
- var oldKey []byte
- if i == 0 {
- oldKey = n.key
- } else {
- oldKey = newNode.inodes[0].key
- }
-
- // Update the parent entry.
- if newNode.parent != nil {
- newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
- }
-
- // Update the statistics.
- tx.stats.Spill++
- }
-
- tx.pending = nil
- }
-
- // Update roots with new roots.
- for _, root := range roots {
- tx.buckets.updateRoot(root.pgid, root.node.root().pgid)
- }
-
- // Clear out nodes now that they are all spilled.
- tx.nodes = make(map[pgid]*node)
-
- return nil
-}
-
// write writes any dirty pages to disk.
func (tx *Tx) write() error {
// Sort pages by id.
@@ -443,43 +279,6 @@ func (tx *Tx) writeMeta() error {
return nil
}
-// node creates a node from a page and associates it with a given parent.
-func (tx *Tx) node(pgid pgid, parent *node) *node {
- // Retrieve node if it's already been created.
- if tx.nodes == nil {
- return nil
- } else if n := tx.nodes[pgid]; n != nil {
- return n
- }
-
- // Otherwise create a branch and cache it.
- n := &node{tx: tx, parent: parent}
- if n.parent != nil {
- n.depth = n.parent.depth + 1
- }
- n.read(tx.page(pgid))
- tx.nodes[pgid] = n
-
- // Update statistics.
- tx.stats.NodeCount++
-
- return n
-}
-
-// dereference removes all references to the old mmap.
-func (tx *Tx) dereference() {
- for _, n := range tx.nodes {
- n.dereference()
- }
-
- for _, n := range tx.pending {
- n.dereference()
- }
-
- // Update statistics
- tx.stats.NodeDeref += len(tx.nodes) + len(tx.pending)
-}
-
// page returns a reference to the page with a given id.
// If page has been written to then a temporary bufferred page is returned.
func (tx *Tx) page(id pgid) *page {
@@ -494,17 +293,6 @@ func (tx *Tx) page(id pgid) *page {
return tx.db.page(id)
}
-// pageNode returns the in-memory node, if it exists.
-// Otherwise returns the underlying page.
-func (tx *Tx) pageNode(id pgid) (*page, *node) {
- if tx.nodes != nil {
- if n := tx.nodes[id]; n != nil {
- return nil, n
- }
- }
- return tx.page(id), nil
-}
-
// forEachPage iterates over every page within a given page and executes a function.
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
p := tx.page(pgid)
@@ -533,7 +321,7 @@ func (tx *Tx) Page(id int) (*PageInfo, error) {
}
// Build the page info.
- p := tx.page(pgid(id))
+ p := tx.db.page(pgid(id))
info := &PageInfo{
ID: id,
Count: int(p.count),