diff options
author | Ben Johnson <benbjohnson@yahoo.com> | 2014-04-21 07:24:48 -0600 |
---|---|---|
committer | Ben Johnson <benbjohnson@yahoo.com> | 2014-04-21 07:24:48 -0600 |
commit | afe8123d91e9b7e492fec95fad537f37d911f5d0 (patch) | |
tree | 9c59a10b225461923f594e8e9c8b374f0df09136 | |
parent | Merge pull request #135 from benbjohnson/bench (diff) | |
parent | all tests pass (diff) | |
download | dedo-afe8123d91e9b7e492fec95fad537f37d911f5d0.tar.gz dedo-afe8123d91e9b7e492fec95fad537f37d911f5d0.tar.xz |
Merge pull request #134 from Shopify/c_cursor
C cursor
-rw-r--r-- | bucket.go | 10 | ||||
-rw-r--r-- | c/cursor.go | 381 | ||||
-rw-r--r-- | c/cursor_test.go | 220 | ||||
-rw-r--r-- | c/doc.go | 4 | ||||
-rw-r--r-- | cursor_test.go | 10 | ||||
-rw-r--r-- | db.go | 11 |
6 files changed, 631 insertions, 5 deletions
@@ -63,6 +63,16 @@ func newBucket(tx *Tx) Bucket { return b } +// Tx returns the tx of the bucket. +func (b *Bucket) Tx() *Tx { + return b.tx +} + +// Root returns the root of the bucket. +func (b *Bucket) Root() pgid { + return b.root +} + // Writable returns whether the bucket is writable. func (b *Bucket) Writable() bool { return b.tx.writable diff --git a/c/cursor.go b/c/cursor.go new file mode 100644 index 0000000..49fdcd8 --- /dev/null +++ b/c/cursor.go @@ -0,0 +1,381 @@ +package c + +/* +#include <stdint.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <inttypes.h> + +//------------------------------------------------------------------------------ +// Constants +//------------------------------------------------------------------------------ + +// This represents the maximum number of levels that a cursor can traverse. +#define MAX_DEPTH 100 + +// These flags mark the type of page and are set in the page.flags. +#define PAGE_BRANCH 0x01 +#define PAGE_LEAF 0x02 +#define PAGE_META 0x04 +#define PAGE_FREELIST 0x10 + + +//------------------------------------------------------------------------------ +// Typedefs +//------------------------------------------------------------------------------ + +// These types MUST have the same layout as their corresponding Go types + +typedef int64_t pgid; + +// Page represents a header struct of a block in the mmap. +typedef struct page { + pgid id; + uint16_t flags; + uint16_t count; + uint32_t overflow; +} page; + +// The branch element represents an a item in a branch page +// that points to a child page. +typedef struct branch_element { + uint32_t pos; + uint32_t ksize; + pgid pgid; +} branch_element; + +// The leaf element represents an a item in a leaf page +// that points to a key/value pair. +typedef struct leaf_element { + uint32_t flags; + uint32_t pos; + uint32_t ksize; + uint32_t vsize; +} leaf_element; + +// elem_ref represents a pointer to an element inside of a page. +// It is used by the cursor stack to track the position at each level. +typedef struct elem_ref { + page *page; + uint16_t index; +} elem_ref; + +// bolt_val represents a pointer to a fixed-length series of bytes. +// It is used to represent keys and values returned by the cursor. +typedef struct bolt_val { + uint32_t size; + void *data; +} bolt_val; + +// bolt_cursor represents a cursor attached to a bucket. +typedef struct bolt_cursor { + void *data; + pgid root; + size_t pgsz; + int top; + elem_ref stack[MAX_DEPTH]; +} bolt_cursor; + + +//------------------------------------------------------------------------------ +// Forward Declarations +//------------------------------------------------------------------------------ + +elem_ref *cursor_push(bolt_cursor *c, pgid id); + +elem_ref *cursor_current(bolt_cursor *c); + +elem_ref *cursor_pop(bolt_cursor *c); + +void cursor_first_leaf(bolt_cursor *c); + +void cursor_key_value(bolt_cursor *c, bolt_val *key, bolt_val *value, uint32_t *flags); + +void cursor_search(bolt_cursor *c, bolt_val key, pgid id); + +void cursor_search_branch(bolt_cursor *c, bolt_val key); + +void cursor_search_leaf(bolt_cursor *c, bolt_val key); + +//------------------------------------------------------------------------------ +// Public Functions +//------------------------------------------------------------------------------ + +// Initializes a cursor. +void bolt_cursor_init(bolt_cursor *c, void *data, size_t pgsz, pgid root) { + c->data = data; + c->root = root; + c->pgsz = pgsz; + c->top = -1; +} + +// Positions the cursor to the first leaf element and returns the key/value pair. +void bolt_cursor_first(bolt_cursor *c, bolt_val *key, bolt_val *value, uint32_t *flags) { + // reset stack to initial state + elem_ref *ref = cursor_push(c, c->root); + + // Find first leaf and return key/value. + cursor_first_leaf(c); + cursor_key_value(c, key, value, flags); +} + +// Positions the cursor to the next leaf element and returns the key/value pair. +void bolt_cursor_next(bolt_cursor *c, bolt_val *key, bolt_val *value, uint32_t *flags) { + int i; + elem_ref *ref; + + // Attempt to move over one element until we're successful. + // Move up the stack as we hit the end of each page in our stack. + for (ref = cursor_current(c); ref != NULL; ref = cursor_current(c)) { + ref->index++; + if (ref->index < ref->page->count) break; + cursor_pop(c); + }; + + // If we are at the top of the stack then return a blank key/value pair. + if (ref == NULL) { + key->size = value->size = 0; + key->data = value->data = NULL; + *flags = 0; + return; + }; + + // Find first leaf and return key/value. + cursor_first_leaf(c); + cursor_key_value(c, key, value, flags); +} + +// Positions the cursor first leaf element starting from a given key. +// If there is a matching key then the cursor will be place on that key. +// If there not a match then the cursor will be placed on the next key, if available. +void bolt_cursor_seek(bolt_cursor *c, bolt_val seek, bolt_val *key, bolt_val *value, uint32_t *flags) { + // Start from root page/node and traverse to correct page. + cursor_push(c, c->root); + if (seek.size > 0) cursor_search(c, seek, c->root); + elem_ref *ref = cursor_current(c); + + // If the cursor is pointing to the end of page then return nil. + if (ref == NULL) { + key->size = value->size = 0; + key->data = value->data = NULL; + *flags = 0; + return; + }; + + // Find first leaf and return key/value. + cursor_first_leaf(c); + cursor_key_value(c, key, value, flags); +} + + +//------------------------------------------------------------------------------ +// Private Functions +//------------------------------------------------------------------------------ + +// Push ref to the first element of the page onto the cursor stack +// If the page is the root page reset the stack to initial state +elem_ref *cursor_push(bolt_cursor *c, pgid id) { + elem_ref *ref; + if (id == c->root) + c->top = 0; + else + c->top++; + ref = &(c->stack[c->top]); + ref->page = (page *)(c->data + (c->pgsz * id)); + ref->index = 0; + return ref; +} + +// Return current element ref from the cursor stack +// If stack is empty return null +elem_ref *cursor_current(bolt_cursor *c) { + if (c->top < 0) return NULL; + return &c->stack[c->top]; +} + +// Pop current element ref off the cursor stack +elem_ref *cursor_pop(bolt_cursor *c) { + elem_ref *ref = cursor_current(c); + if (ref != NULL) c->top--; + return ref; +} + +// Returns the branch element at a given index on a given page. +branch_element *page_branch_element(page *p, uint16_t index) { + branch_element *elements = (branch_element*)((void*)(p) + sizeof(page)); + return &elements[index]; +} + +// Returns the leaf element at a given index on a given page. +leaf_element *page_leaf_element(page *p, uint16_t index) { + leaf_element *elements = (leaf_element*)((void*)(p) + sizeof(page)); + return &elements[index]; +} + +// Returns the key/value pair for the current position of the cursor. +void cursor_key_value(bolt_cursor *c, bolt_val *key, bolt_val *value, uint32_t *flags) { + elem_ref *ref = cursor_current(c); + leaf_element *elem = page_leaf_element(ref->page,ref->index); + + // Assign key pointer. + key->size = elem->ksize; + key->data = ((void*)elem) + elem->pos; + + // Assign value pointer. + value->size = elem->vsize; + value->data = key->data + key->size; + + // Return the element flags. + *flags = elem->flags; +} + +// Traverses from the current stack position down to the first leaf element. +void cursor_first_leaf(bolt_cursor *c) { + elem_ref *ref = cursor_current(c); + while (ref->page->flags & PAGE_BRANCH) { + branch_element *elem = page_branch_element(ref->page,ref->index); + ref = cursor_push(c, elem->pgid); + }; +} + +// Recursively performs a binary search against a given page/node until it finds a given key. +void cursor_search(bolt_cursor *c, bolt_val key, pgid id) { + // Push page onto the cursor stack. + elem_ref *ref = cursor_push(c, id); + + // If we're on a leaf page/node then find the specific node. + if (ref->page->flags & PAGE_LEAF) { + cursor_search_leaf(c, key); + return; + } + + // Otherwise search the branch page. + cursor_search_branch(c, key); +} + +// Recursively search over a leaf page for a key. +void cursor_search_leaf(bolt_cursor *c, bolt_val key) { + elem_ref *ref = cursor_current(c); + int i; + + // HACK: Simply loop over elements to find the right one. Replace with a binary search. + leaf_element *elems = (leaf_element*)((void*)(ref->page) + sizeof(page)); + for (i=0; i<ref->page->count; i++) { + leaf_element *elem = &elems[i]; + int rc = memcmp(key.data, ((void*)elem) + elem->pos, (elem->ksize < key.size ? elem->ksize : key.size)); + + // printf("? %.*s | %.*s\n", key.size, key.data, elem->ksize, ((void*)elem) + elem->pos); + // printf("rc=%d; key.size(%d) >= elem->ksize(%d)\n", rc, key.size, elem->ksize); + if ((rc == 0 && key.size >= elem->ksize) || rc < 0) { + ref->index = i; + return; + } + } + + // If nothing was greater than the key then pop the current page off the stack. + cursor_pop(c); +} + +// Recursively search over a branch page for a key. +void cursor_search_branch(bolt_cursor *c, bolt_val key) { + elem_ref *ref = cursor_current(c); + int i; + + // HACK: Simply loop over elements to find the right one. Replace with a binary search. + branch_element *elems = (branch_element*)((void*)(ref->page) + sizeof(page)); + for (i=0; i<ref->page->count; i++) { + branch_element *elem = &elems[i]; + int rc = memcmp(key.data, ((void*)elem) + elem->pos, (elem->ksize < key.size ? elem->ksize : key.size)); + + if (rc == 0 && key.size == elem->ksize) { + // exact match, done + ref->index = i; + return; + } else if ((rc == 0 && key.size < elem->ksize) || rc < 0) { + // if key is less than anything in this subtree we are done + if (i == 0) return; + // otherwise search the previous subtree + cursor_search(c, key, elems[i-1].pgid); + // didn't find anything greater than key? + if (cursor_current(c) == ref) + ref->index = i; + else + ref->index = i-1; + return; + } + } + + // If nothing was greater than the key then pop the current page off the stack. + cursor_pop(c); +} + +*/ +import "C" + +import ( + "fmt" + "os" + "unsafe" + + "github.com/boltdb/bolt" +) + +// Cursor represents a wrapper around a Bolt C cursor. +type Cursor struct { + C *C.bolt_cursor +} + +// NewCursor creates a C cursor from a Bucket. +func NewCursor(b *bolt.Bucket) *Cursor { + info := b.Tx().DB().Info() + root := b.Root() + c := &Cursor{C: new(C.bolt_cursor)} + C.bolt_cursor_init(c.C, unsafe.Pointer(&info.Data[0]), C.size_t(info.PageSize), C.pgid(root)) + return c +} + +// Next moves the cursor to the first element and returns the key and value. +// Returns a nil key if there are no elements. +func (c *Cursor) First() (key, value []byte) { + var k, v C.bolt_val + var flags C.uint32_t + C.bolt_cursor_first(c.C, &k, &v, &flags) + return C.GoBytes(k.data, C.int(k.size)), C.GoBytes(v.data, C.int(v.size)) +} + +// Next moves the cursor to the next element and returns the key and value. +// Returns a nil key if there are no more key/value pairs. +func (c *Cursor) Next() (key, value []byte) { + var k, v C.bolt_val + var flags C.uint32_t + C.bolt_cursor_next(c.C, &k, &v, &flags) + return C.GoBytes(k.data, C.int(k.size)), C.GoBytes(v.data, C.int(v.size)) +} + +// Seek moves the cursor to a given key and returns it. +// If the key does not exist then the next key is used. If no keys +// follow, an empty value is returned. +func (c *Cursor) Seek(seek []byte) (key, value []byte, flags int) { + var _flags C.uint32_t + var _seek, k, v C.bolt_val + if len(seek) > 0 { + _seek.size = C.uint32_t(len(seek)) + _seek.data = unsafe.Pointer(&seek[0]) + } + C.bolt_cursor_seek(c.C, _seek, &k, &v, &_flags) + //fmt.Printf("Key %v [%v]\n", k.data, k.size) + //fmt.Printf("Value %v [%v]\n", k.data, k.size) + if k.data == nil { + return nil, nil, 0 + } + return C.GoBytes(k.data, C.int(k.size)), C.GoBytes(v.data, C.int(v.size)), int(_flags) +} + +func warn(v ...interface{}) { + fmt.Fprintln(os.Stderr, v...) +} + +func warnf(msg string, v ...interface{}) { + fmt.Fprintf(os.Stderr, msg+"\n", v...) +} diff --git a/c/cursor_test.go b/c/cursor_test.go new file mode 100644 index 0000000..9e7cb1b --- /dev/null +++ b/c/cursor_test.go @@ -0,0 +1,220 @@ +package c_test + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/boltdb/bolt" + . "github.com/boltdb/bolt/c" + "github.com/stretchr/testify/assert" +) + +// Ensure that the C cursor can +func TestCursor_First(t *testing.T) { + withDB(func(db *bolt.DB) { + db.Update(func(tx *bolt.Tx) error { + b, _ := tx.CreateBucket([]byte("widgets")) + return b.Put([]byte("foo"), []byte("barz")) + }) + db.View(func(tx *bolt.Tx) error { + c := NewCursor(tx.Bucket([]byte("widgets"))) + key, value := c.First() + assert.Equal(t, []byte("foo"), key) + assert.Equal(t, []byte("barz"), value) + return nil + }) + }) +} + +// Ensure that a C cursor can seek to the appropriate keys. +func TestCursor_Seek(t *testing.T) { + withDB(func(db *bolt.DB) { + db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + assert.NoError(t, err) + assert.NoError(t, b.Put([]byte("foo"), []byte("0001"))) + assert.NoError(t, b.Put([]byte("bar"), []byte("0002"))) + assert.NoError(t, b.Put([]byte("baz"), []byte("0003"))) + _, err = b.CreateBucket([]byte("bkt")) + assert.NoError(t, err) + return nil + }) + db.View(func(tx *bolt.Tx) error { + c := NewCursor(tx.Bucket([]byte("widgets"))) + + // Exact match should go to the key. + k, v, flags := c.Seek([]byte("bar")) + assert.Equal(t, "bar", string(k)) + assert.Equal(t, "0002", string(v)) + assert.Equal(t, 0, flags) + + // Inexact match should go to the next key. + k, v, flags = c.Seek([]byte("bas")) + assert.Equal(t, "baz", string(k)) + assert.Equal(t, "0003", string(v)) + assert.Equal(t, 0, flags) + + // Low key should go to the first key. + k, v, flags = c.Seek([]byte("")) + assert.Equal(t, "bar", string(k)) + assert.Equal(t, "0002", string(v)) + assert.Equal(t, 0, flags) + + // High key should return no key. + k, v, flags = c.Seek([]byte("zzz")) + assert.Equal(t, "", string(k)) + assert.Equal(t, "", string(v)) + assert.Equal(t, 0, flags) + + // Buckets should return their key but no value. + k, v, flags = c.Seek([]byte("bkt")) + assert.Equal(t, []byte("bkt"), k) + assert.True(t, len(v) > 0) + assert.Equal(t, 1, flags) // bucketLeafFlag + + return nil + }) + }) +} + +// Ensure that a C cursor can iterate over a single root with a couple elements. +func TestCursor_Iterate_Leaf(t *testing.T) { + withDB(func(db *bolt.DB) { + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucket([]byte("widgets")) + tx.Bucket([]byte("widgets")).Put([]byte("baz"), []byte{}) + tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte{0}) + tx.Bucket([]byte("widgets")).Put([]byte("bar"), []byte{1}) + return nil + }) + db.View(func(tx *bolt.Tx) error { + c := NewCursor(tx.Bucket([]byte("widgets"))) + + k, v := c.First() + assert.Equal(t, string(k), "bar") + assert.Equal(t, []byte{1}, v) + + k, v = c.Next() + assert.Equal(t, string(k), "baz") + assert.Equal(t, []byte{}, v) + + k, v = c.Next() + assert.Equal(t, string(k), "foo") + assert.Equal(t, []byte{0}, v) + + k, v = c.Next() + assert.Equal(t, []byte{}, k) + assert.Equal(t, []byte{}, v) + + k, v = c.Next() + assert.Equal(t, []byte{}, k) + assert.Equal(t, []byte{}, v) + return nil + }) + }) +} + +// Ensure that a C cursor can iterate over branches and leafs. +func TestCursor_Iterate_Large(t *testing.T) { + withDB(func(db *bolt.DB) { + db.Update(func(tx *bolt.Tx) error { + b, _ := tx.CreateBucket([]byte("widgets")) + for i := 0; i < 1000; i++ { + b.Put([]byte(fmt.Sprintf("%05d", i)), []byte(fmt.Sprintf("%020d", i))) + } + return nil + }) + db.View(func(tx *bolt.Tx) error { + var index int + c := NewCursor(tx.Bucket([]byte("widgets"))) + for k, v := c.First(); len(k) > 0; k, v = c.Next() { + assert.Equal(t, fmt.Sprintf("%05d", index), string(k)) + assert.Equal(t, fmt.Sprintf("%020d", index), string(v)) + index++ + } + assert.Equal(t, 1000, index) + return nil + }) + }) +} + +// Ensure that a C cursor can seek over branches and leafs. +func TestCursor_Seek_Large(t *testing.T) { + withDB(func(db *bolt.DB) { + db.Update(func(tx *bolt.Tx) error { + b, _ := tx.CreateBucket([]byte("widgets")) + for i := 1; i < 1000; i++ { + b.Put([]byte(fmt.Sprintf("%05d\000", i*10)), []byte(fmt.Sprintf("%020d", i*10))) + } + return nil + }) + db.View(func(tx *bolt.Tx) error { + c := NewCursor(tx.Bucket([]byte("widgets"))) + + // Exact match should go to the key. + k, v, _ := c.Seek([]byte("05000\000")) + assert.Equal(t, "05000\000", string(k)) + assert.Equal(t, fmt.Sprintf("%020d", 5000), string(v)) + + // Inexact match should go to the next key. + k, v, _ = c.Seek([]byte("07495\000")) + assert.Equal(t, "07500\000", string(k)) + assert.Equal(t, fmt.Sprintf("%020d", 7500), string(v)) + + // Low key should go to the first key. + k, v, _ = c.Seek([]byte("00000\000")) + assert.Equal(t, "00010\000", string(k)) + assert.Equal(t, fmt.Sprintf("%020d", 10), string(v)) + + // High key should return no key. + k, v, _ = c.Seek([]byte("40000\000")) + assert.Equal(t, "", string(k)) + assert.Equal(t, "", string(v)) + + return nil + }) + }) +} + +// tempfile returns a temporary path. +func tempfile() string { + f, _ := ioutil.TempFile("", "bolt-c-") + f.Close() + os.Remove(f.Name()) + return f.Name() +} + +// withDB executes a function with an already opened database. +func withDB(fn func(*bolt.DB)) { + path := tempfile() + db, err := bolt.Open(path, 0666) + if err != nil { + panic("cannot open db: " + err.Error()) + } + defer os.Remove(path) + defer db.Close() + + fn(db) + + // Check database consistency after every test. + mustCheck(db) +} + +// mustCheck runs a consistency check on the database and panics if any errors are found. +func mustCheck(db *bolt.DB) { + if err := db.Check(); err != nil { + // Copy db off first. + var path = tempfile() + db.CopyFile(path, 0600) + + if errors, ok := err.(bolt.ErrorList); ok { + for _, err := range errors { + fmt.Println(err) + } + } + fmt.Println(err) + panic("check failure: " + path) + } +} diff --git a/c/doc.go b/c/doc.go new file mode 100644 index 0000000..cbb98fe --- /dev/null +++ b/c/doc.go @@ -0,0 +1,4 @@ +/* +Package c provides a C interface to Bolt. +*/ +package c diff --git a/cursor_test.go b/cursor_test.go index 5dd178e..23b3a8e 100644 --- a/cursor_test.go +++ b/cursor_test.go @@ -101,7 +101,7 @@ func TestCursor_EmptyBucketReverse(t *testing.T) { } // Ensure that a Tx cursor can iterate over a single root with a couple elements. -func TestCursor_LeafRoot(t *testing.T) { +func TestCursor_Iterate_Leaf(t *testing.T) { withOpenDB(func(db *DB, path string) { db.Update(func(tx *Tx) error { tx.CreateBucket([]byte("widgets")) @@ -204,7 +204,7 @@ func TestCursor_Restart(t *testing.T) { } // Ensure that a Tx can iterate over all elements in a bucket. -func TestCursor_Iterate(t *testing.T) { +func TestCursor_QuickCheck(t *testing.T) { f := func(items testdata) bool { withOpenDB(func(db *DB, path string) { // Bulk insert all values. @@ -239,7 +239,7 @@ func TestCursor_Iterate(t *testing.T) { } // Ensure that a transaction can iterate over all elements in a bucket in reverse. -func TestCursor_Iterate_Reverse(t *testing.T) { +func TestCursor_QuickCheck_Reverse(t *testing.T) { f := func(items testdata) bool { withOpenDB(func(db *DB, path string) { // Bulk insert all values. @@ -274,7 +274,7 @@ func TestCursor_Iterate_Reverse(t *testing.T) { } // Ensure that a Tx cursor can iterate over subbuckets. -func TestCursor_Iterate_BucketsOnly(t *testing.T) { +func TestCursor_QuickCheck_BucketsOnly(t *testing.T) { withOpenDB(func(db *DB, path string) { db.Update(func(tx *Tx) error { b, err := tx.CreateBucket([]byte("widgets")) @@ -301,7 +301,7 @@ func TestCursor_Iterate_BucketsOnly(t *testing.T) { } // Ensure that a Tx cursor can reverse iterate over subbuckets. -func TestCursor_Iterate_BucketsOnly_Reverse(t *testing.T) { +func TestCursor_QuickCheck_BucketsOnly_Reverse(t *testing.T) { withOpenDB(func(db *DB, path string) { db.Update(func(tx *Tx) error { b, err := tx.CreateBucket([]byte("widgets")) @@ -578,6 +578,12 @@ func (db *DB) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList }) } +// This is for internal access to the raw data bytes from the C cursor, use +// carefully, or not at all. +func (db *DB) Info() *Info { + return &Info{db.data, db.pageSize} +} + // page retrieves a page reference from the mmap based on the current page size. func (db *DB) page(id pgid) *page { pos := id * pgid(db.pageSize) @@ -641,3 +647,8 @@ func (s *Stats) Sub(other *Stats) Stats { func (s *Stats) add(other *Stats) { s.TxStats.add(&other.TxStats) } + +type Info struct { + Data []byte + PageSize int +} |