aboutsummaryrefslogtreecommitdiff
path: root/bucket.go
diff options
context:
space:
mode:
Diffstat (limited to 'bucket.go')
-rw-r--r--bucket.go366
1 files changed, 329 insertions, 37 deletions
diff --git a/bucket.go b/bucket.go
index 919d33b..c01d2c5 100644
--- a/bucket.go
+++ b/bucket.go
@@ -3,6 +3,9 @@ package bolt
import (
"bytes"
"errors"
+ "fmt"
+ "sort"
+ "unsafe"
)
var (
@@ -16,14 +19,6 @@ var (
// ErrBucketNameRequired is returned when creating a bucket with a blank name.
ErrBucketNameRequired = errors.New("bucket name required")
- // ErrBucketNameTooLarge is returned when creating a bucket with a name
- // that is longer than MaxBucketNameSize.
- ErrBucketNameTooLarge = errors.New("bucket name too large")
-
- // ErrBucketNotWritable is returned when changing data on a bucket
- // reference that was created from a read-only transaction.
- ErrBucketNotWritable = errors.New("bucket not writable")
-
// ErrKeyRequired is returned when inserting a zero-length key.
ErrKeyRequired = errors.New("key required")
@@ -33,6 +28,11 @@ var (
// ErrValueTooLarge is returned when inserting a value that is larger than MaxValueSize.
ErrValueTooLarge = errors.New("value too large")
+ // ErrIncompatibleValue is returned when trying to create or delete a bucket
+ // on an existing non-bucket key or when trying to create or delete a
+ // non-bucket key on an existing bucket key.
+ ErrIncompatibleValue = errors.New("incompatible value")
+
// ErrSequenceOverflow is returned when the next sequence number will be
// larger than the maximum integer size.
ErrSequenceOverflow = errors.New("sequence overflow")
@@ -41,8 +41,10 @@ var (
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
*bucket
- name string
- tx *Tx
+ tx *Tx
+ buckets map[string]*Bucket
+ nodes map[pgid]*node
+ pending []*node
}
// bucket represents the on-file representation of a bucket.
@@ -51,9 +53,14 @@ type bucket struct {
sequence uint64
}
-// Name returns the name of the bucket.
-func (b *Bucket) Name() string {
- return b.name
+// newBucket returns a new bucket associated with a transaction.
+func newBucket(tx *Tx) Bucket {
+ var b = Bucket{tx: tx}
+ b.buckets = make(map[string]*Bucket)
+ if tx.writable {
+ b.nodes = make(map[pgid]*node)
+ }
+ return b
}
// Writable returns whether the bucket is writable.
@@ -70,17 +77,145 @@ func (b *Bucket) Cursor() *Cursor {
// Allocate and return a cursor.
return &Cursor{
- tx: b.tx,
- root: b.root,
- stack: make([]elemRef, 0),
+ bucket: b,
+ stack: make([]elemRef, 0),
+ }
+}
+
+// Bucket retrieves a nested bucket by name.
+// Returns nil if the bucket does not exist.
+func (b *Bucket) Bucket(name []byte) *Bucket {
+ if child := b.buckets[string(name)]; child != nil {
+ return child
+ }
+
+ // Move cursor to key.
+ c := b.Cursor()
+ k, v, flags := c.seek(name)
+
+ // Return nil if the key doesn't exist or it is not a bucket.
+ if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
+ return nil
+ }
+
+ // Otherwise create a bucket and cache it.
+ var child = newBucket(b.tx)
+ child.bucket = &bucket{}
+ *child.bucket = *(*bucket)(unsafe.Pointer(&v[0]))
+ b.buckets[string(name)] = &child
+
+ return &child
+}
+
+// CreateBucket creates a new bucket at the given key.
+// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
+func (b *Bucket) CreateBucket(key []byte) error {
+ if b.tx.db == nil {
+ return ErrTxClosed
+ } else if !b.tx.writable {
+ return ErrTxNotWritable
+ } else if len(key) == 0 {
+ return ErrBucketNameRequired
}
+
+ // Move cursor to correct position.
+ c := b.Cursor()
+ k, _, flags := c.seek(key)
+
+ // Return an error if there is an existing key.
+ if bytes.Equal(key, k) {
+ if (flags & bucketLeafFlag) != 0 {
+ return ErrBucketExists
+ } else {
+ return ErrIncompatibleValue
+ }
+ }
+
+ // Create a blank root leaf page.
+ p, err := b.tx.allocate(1)
+ if err != nil {
+ return err
+ }
+ p.flags = leafPageFlag
+
+ // Insert key/value.
+ value := make([]byte, unsafe.Sizeof(bucket{}))
+ bucket := (*bucket)(unsafe.Pointer(&value[0]))
+ bucket.root = p.id
+
+ // Insert into node.
+ c.node().put(key, key, value, 0, bucketLeafFlag)
+
+ return nil
+}
+
+// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
+// Returns an error if the bucket name is blank, or if the bucket name is too long.
+func (b *Bucket) CreateBucketIfNotExists(key []byte) error {
+ err := b.CreateBucket(key)
+ if err != nil && err != ErrBucketExists {
+ return err
+ }
+ return nil
+}
+
+// DeleteBucket deletes a bucket at the given key.
+// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
+func (b *Bucket) DeleteBucket(key []byte) error {
+ if b.tx.db == nil {
+ return ErrTxClosed
+ } else if !b.Writable() {
+ return ErrTxNotWritable
+ }
+
+ // Move cursor to correct position.
+ c := b.Cursor()
+ k, _, flags := c.seek(key)
+
+ // Return an error if bucket doesn't exist or is not a bucket.
+ if !bytes.Equal(key, k) {
+ return ErrBucketNotFound
+ } else if (flags & bucketLeafFlag) == 0 {
+ return ErrIncompatibleValue
+ }
+
+ // Recursively delete all child buckets.
+ child := b.Bucket(key)
+ err := child.ForEach(func(k, v []byte) error {
+ if v == nil {
+ if err := child.DeleteBucket(k); err != nil {
+ return fmt.Errorf("delete bucket: %s", err)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ // Remove cached copy.
+ delete(b.buckets, string(key))
+
+ // Release all bucket pages to freelist.
+ b.tx.forEachPage(child.root, 0, func(p *page, _ int) {
+ b.tx.db.freelist.free(b.tx.id(), p)
+ })
+
+ // Delete the node if we have a matching key.
+ c.node().del(key)
+
+ return nil
}
// Get retrieves the value for a key in the bucket.
-// Returns a nil value if the key does not exist.
+// Returns a nil value if the key does not exist or if the key is a nested bucket.
func (b *Bucket) Get(key []byte) []byte {
- c := b.Cursor()
- k, v := c.Seek(key)
+ k, v, flags := b.Cursor().seek(key)
+
+ // Return nil if this is a bucket.
+ if (flags & bucketLeafFlag) != 0 {
+ return nil
+ }
// If our target node isn't the same key as what's passed in then return nil.
if !bytes.Equal(key, k) {
@@ -96,11 +231,8 @@ func (b *Bucket) Put(key []byte, value []byte) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
- return ErrBucketNotWritable
- }
-
- // Validate the key and data size.
- if len(key) == 0 {
+ return ErrTxNotWritable
+ } else if len(key) == 0 {
return ErrKeyRequired
} else if len(key) > MaxKeySize {
return ErrKeyTooLarge
@@ -110,10 +242,15 @@ func (b *Bucket) Put(key []byte, value []byte) error {
// Move cursor to correct position.
c := b.Cursor()
- c.Seek(key)
+ k, _, flags := c.seek(key)
+
+ // Return an error if there is an existing key with a bucket value.
+ if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
+ return ErrIncompatibleValue
+ }
- // Insert the key/value.
- c.node(b.tx).put(key, key, value, 0)
+ // Insert into node.
+ c.node().put(key, key, value, 0, 0)
return nil
}
@@ -125,15 +262,20 @@ func (b *Bucket) Delete(key []byte) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
- return ErrBucketNotWritable
+ return ErrTxNotWritable
}
// Move cursor to correct position.
c := b.Cursor()
- c.Seek(key)
+ _, _, flags := c.seek(key)
+
+ // Return an error if there is already an existing bucket value.
+ if (flags & bucketLeafFlag) != 0 {
+ return ErrIncompatibleValue
+ }
// Delete the node if we have a matching key.
- c.node(b.tx).del(key)
+ c.node().del(key)
return nil
}
@@ -143,7 +285,7 @@ func (b *Bucket) NextSequence() (int, error) {
if b.tx.db == nil {
return 0, ErrTxClosed
} else if !b.Writable() {
- return 0, ErrBucketNotWritable
+ return 0, ErrTxNotWritable
}
// Make sure next sequence number will not be larger than the maximum
@@ -194,6 +336,162 @@ func (b *Bucket) Stat() *BucketStat {
return s
}
+// spill writes all the nodes for this bucket to dirty pages.
+func (b *Bucket) spill() error {
+ // Spill all child buckets first.
+ for name, child := range b.buckets {
+ if err := child.spill(); err != nil {
+ return err
+ }
+
+ // Update the child bucket header in this bucket.
+ value := make([]byte, unsafe.Sizeof(bucket{}))
+ bucket := (*bucket)(unsafe.Pointer(&value[0]))
+ *bucket = *child.bucket
+
+ // Update parent node.
+ c := b.Cursor()
+ k, _, flags := c.seek([]byte(name))
+ _assert(bytes.Equal([]byte(name), k), "misplaced bucket header: %x -> %x", []byte(name), k)
+ _assert(flags&bucketLeafFlag != 0, "unexpected bucket header flag: %x", flags)
+ c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
+ }
+
+ // Ignore if there are no nodes to spill.
+ if len(b.nodes) == 0 {
+ return nil
+ }
+
+ // Sort nodes by highest depth first.
+ nodes := make(nodesByDepth, 0, len(b.nodes))
+ for _, n := range b.nodes {
+ nodes = append(nodes, n)
+ }
+ sort.Sort(nodes)
+
+ // Spill nodes by deepest first.
+ for i := 0; i < len(nodes); i++ {
+ n := nodes[i]
+
+ // Split nodes into appropriate sized nodes.
+ // The first node in this list will be a reference to n to preserve ancestry.
+ newNodes := n.split(b.tx.db.pageSize)
+ b.pending = newNodes
+
+ // If this is a root node that split then create a parent node.
+ if n.parent == nil && len(newNodes) > 1 {
+ n.parent = &node{bucket: b, isLeaf: false}
+ nodes = append(nodes, n.parent)
+ }
+
+ // Add node's page to the freelist.
+ if n.pgid > 0 {
+ b.tx.db.freelist.free(b.tx.id(), b.tx.page(n.pgid))
+ }
+
+ // Write nodes to dirty pages.
+ for i, newNode := range newNodes {
+ // Allocate contiguous space for the node.
+ p, err := b.tx.allocate((newNode.size() / b.tx.db.pageSize) + 1)
+ if err != nil {
+ return err
+ }
+
+ // Write the node to the page.
+ newNode.write(p)
+ newNode.pgid = p.id
+ newNode.parent = n.parent
+
+ // The first node should use the existing entry, other nodes are inserts.
+ var oldKey []byte
+ if i == 0 {
+ oldKey = n.key
+ } else {
+ oldKey = newNode.inodes[0].key
+ }
+
+ // Update the parent entry.
+ if newNode.parent != nil {
+ newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid, 0)
+ }
+
+ // Update the statistics.
+ b.tx.stats.Spill++
+ }
+
+ b.pending = nil
+ }
+
+ // Clear out nodes now that they are all spilled.
+ b.nodes = make(map[pgid]*node)
+
+ // Update the root node for this bucket.
+ b.root = nodes[len(nodes)-1].pgid
+
+ return nil
+}
+
+// rebalance attempts to balance all nodes.
+func (b *Bucket) rebalance() {
+ for _, n := range b.nodes {
+ n.rebalance()
+ }
+ for _, child := range b.buckets {
+ child.rebalance()
+ }
+}
+
+// node creates a node from a page and associates it with a given parent.
+func (b *Bucket) node(pgid pgid, parent *node) *node {
+ _assert(b.nodes != nil, "nodes map expected")
+ // Retrieve node if it's already been created.
+ if n := b.nodes[pgid]; n != nil {
+ return n
+ }
+
+ // Otherwise create a branch and cache it.
+ n := &node{bucket: b, parent: parent}
+ if n.parent != nil {
+ n.depth = n.parent.depth + 1
+ }
+ n.read(b.tx.page(pgid))
+ b.nodes[pgid] = n
+
+ // Update statistics.
+ b.tx.stats.NodeCount++
+
+ return n
+}
+
+// dereference removes all references to the old mmap.
+func (b *Bucket) dereference() {
+ for _, n := range b.nodes {
+ n.dereference()
+ }
+
+ for _, n := range b.pending {
+ n.dereference()
+ }
+
+ for _, child := range b.buckets {
+ child.dereference()
+ }
+
+ // Update statistics
+ b.tx.stats.NodeDeref += len(b.nodes) + len(b.pending)
+}
+
+// pageNode returns the in-memory node, if it exists.
+// Otherwise returns the underlying page.
+func (b *Bucket) pageNode(id pgid) (*page, *node) {
+ if b.nodes != nil {
+ if n := b.nodes[id]; n != nil {
+ return nil, n
+ }
+ }
+ return b.tx.page(id), nil
+}
+
// BucketStat represents stats on a bucket such as branch pages and leaf pages.
type BucketStat struct {
BranchPageCount int
@@ -202,9 +500,3 @@ type BucketStat struct {
KeyCount int
MaxDepth int
}
-
-type bucketsByName []*Bucket
-
-func (s bucketsByName) Len() int { return len(s) }
-func (s bucketsByName) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func (s bucketsByName) Less(i, j int) bool { return s[i].name < s[j].name }