aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Johnson <benbjohnson@yahoo.com>2014-01-30 19:26:10 -0800
committerBen Johnson <benbjohnson@yahoo.com>2014-01-30 19:26:10 -0800
commitd05191d164dbab56adbb3a17a62f66a62695c6d3 (patch)
tree3f270655af94ff2dbbce62f6065d4f1c24030c71
parentMerge pull request #2 from benbjohnson/master (diff)
parentAdd RWTransaction.write(). (diff)
downloaddedo-d05191d164dbab56adbb3a17a62f66a62695c6d3.tar.gz
dedo-d05191d164dbab56adbb3a17a62f66a62695c6d3.tar.xz
Merge pull request #3 from benbjohnson/spill
Spill to dirty pages, write to disk
-rw-r--r--branch.go18
-rw-r--r--bucket.go2
-rw-r--r--db.go14
-rw-r--r--leaf.go7
-rw-r--r--meta.go14
-rw-r--r--page.go9
-rw-r--r--rwtransaction.go167
-rw-r--r--rwtransaction_test.go22
-rw-r--r--sys.go95
-rw-r--r--sys_test.go70
-rw-r--r--transaction.go44
11 files changed, 387 insertions, 75 deletions
diff --git a/branch.go b/branch.go
index d752ac6..c4f0b1f 100644
--- a/branch.go
+++ b/branch.go
@@ -7,6 +7,8 @@ import (
// branch represents a temporary in-memory branch page.
type branch struct {
+ pgid pgid
+ depth int
parent *branch
items branchItems
}
@@ -42,11 +44,11 @@ func (b *branch) put(id pgid, newid pgid, key []byte, replace bool) {
}
// read initializes the item data from an on-disk page.
-func (b *branch) read(page *page) {
- ncount := int(page.count)
- b.items = make(branchItems, ncount)
- bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&page.ptr))
- for i := 0; i < ncount; i++ {
+func (b *branch) read(p *page) {
+ b.pgid = p.id
+ b.items = make(branchItems, int(p.count))
+ bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&p.ptr))
+ for i := 0; i < int(p.count); i++ {
bnode := &bnodes[i]
item := &b.items[i]
item.pgid = bnode.pgid
@@ -109,6 +111,12 @@ func (b *branch) split(pageSize int) []*branch {
return branches
}
+type branches []*branch
+
+func (s branches) Len() int { return len(s) }
+func (s branches) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s branches) Less(i, j int) bool { return s[i].depth < s[j].depth }
+
type branchItems []branchItem
type branchItem struct {
diff --git a/bucket.go b/bucket.go
index 6cfe1ca..15dc6ac 100644
--- a/bucket.go
+++ b/bucket.go
@@ -1,5 +1,7 @@
package bolt
+const MaxBucketNameSize = 255
+
type Bucket struct {
*bucket
name string
diff --git a/db.go b/db.go
index 3bee89a..7ffbe09 100644
--- a/db.go
+++ b/db.go
@@ -118,7 +118,9 @@ func (db *DB) mmap() error {
} else if int(info.Size()) < db.pageSize*2 {
return &Error{"file size too small", err}
}
- size := int(info.Size())
+
+ // TEMP(benbjohnson): Set max size to 1MB.
+ size := 2 << 20
// Memory-map the data file as a byte slice.
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
@@ -159,7 +161,9 @@ func (db *DB) init() error {
m.pageSize = uint32(db.pageSize)
m.version = Version
m.free = 2
- m.sys.root = 3
+ m.sys = 3
+ m.pgid = 4
+ m.txnid = txnid(i)
}
// Write an empty freelist at page 3.
@@ -171,7 +175,7 @@ func (db *DB) init() error {
// Write an empty leaf page at page 4.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
- p.flags = p_leaf
+ p.flags = p_sys
p.count = 0
// Write the buffer to our data file.
@@ -206,7 +210,7 @@ func (db *DB) Transaction() (*Transaction, error) {
// Create a transaction associated with the database.
t := &Transaction{}
- t.init(db, db.meta())
+ t.init(db)
return t, nil
}
@@ -230,7 +234,7 @@ func (db *DB) RWTransaction() (*RWTransaction, error) {
branches: make(map[pgid]*branch),
leafs: make(map[pgid]*leaf),
}
- t.init(db, db.meta())
+ t.init(db)
return t, nil
}
diff --git a/leaf.go b/leaf.go
index 84cb08d..c86e041 100644
--- a/leaf.go
+++ b/leaf.go
@@ -8,6 +8,7 @@ import (
// leaf represents an in-memory, deserialized leaf page.
type leaf struct {
+ pgid pgid
parent *branch
items leafItems
}
@@ -39,10 +40,10 @@ func (l *leaf) put(key []byte, value []byte) {
// read initializes the item data from an on-disk page.
func (l *leaf) read(p *page) {
- ncount := int(p.count)
- l.items = make(leafItems, ncount)
+ l.pgid = p.id
+ l.items = make(leafItems, int(p.count))
lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&p.ptr))
- for i := 0; i < ncount; i++ {
+ for i := 0; i < int(p.count); i++ {
lnode := &lnodes[i]
item := &l.items[i]
item.key = lnode.key()
diff --git a/meta.go b/meta.go
index f3bc4b6..659bfa7 100644
--- a/meta.go
+++ b/meta.go
@@ -14,8 +14,8 @@ type meta struct {
pageSize uint32
pgid pgid
free pgid
+ sys pgid
txnid txnid
- sys bucket
}
// validate checks the marker bytes and version of the meta page to ensure it matches this binary.
@@ -30,8 +30,20 @@ func (m *meta) validate() error {
// copy copies one meta object to another.
func (m *meta) copy(dest *meta) {
+ dest.magic = m.magic
+ dest.version = m.version
dest.pageSize = m.pageSize
dest.pgid = m.pgid
+ dest.free = m.free
dest.txnid = m.txnid
dest.sys = m.sys
}
+
+// write writes the meta onto a page.
+func (m *meta) write(p *page) {
+ // Page id is either going to be 0 or 1 which we can determine by the Txn ID.
+ p.id = pgid(m.txnid % 2)
+ p.flags |= p_meta
+
+ m.copy(p.meta())
+}
diff --git a/page.go b/page.go
index 7129208..c18c6b4 100644
--- a/page.go
+++ b/page.go
@@ -14,7 +14,8 @@ const (
p_branch = 0x01
p_leaf = 0x02
p_meta = 0x04
- p_freelist = 0x08
+ p_sys = 0x08
+ p_freelist = 0x10
)
type pgid uint64
@@ -56,3 +57,9 @@ func (p *page) bnodes() []bnode {
func (p *page) freelist() []pgid {
return ((*[maxNodesPerPage]pgid)(unsafe.Pointer(&p.ptr)))[0:p.count]
}
+
+type pages []*page
+
+func (s pages) Len() int { return len(s) }
+func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s pages) Less(i, j int) bool { return s[i].id < s[j].id }
diff --git a/rwtransaction.go b/rwtransaction.go
index 2c76dbd..1667609 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -1,6 +1,7 @@
package bolt
import (
+ "sort"
"unsafe"
)
@@ -12,31 +13,43 @@ type RWTransaction struct {
leafs map[pgid]*leaf
}
+// init initializes the transaction.
+func (t *RWTransaction) init(db *DB) {
+ t.Transaction.init(db)
+ t.pages = make(map[pgid]*page)
+
+ // Copy the meta and increase the transaction id.
+ t.meta = &meta{}
+ db.meta().copy(t.meta)
+ t.meta.txnid += txnid(2)
+}
+
// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string) error {
// Check if bucket already exists.
if b := t.Bucket(name); b != nil {
return &Error{"bucket already exists", nil}
+ } else if len(name) == 0 {
+ return &Error{"bucket name cannot be blank", nil}
+ } else if len(name) > MaxBucketNameSize {
+ return &Error{"bucket name too large", nil}
}
- // Create a new bucket entry.
- var buf [unsafe.Sizeof(bucket{})]byte
- var raw = (*bucket)(unsafe.Pointer(&buf[0]))
- raw.root = 0
+ // Create a blank root leaf page.
+ p := t.allocate(1)
+ p.flags = p_leaf
- // Move cursor to insertion location.
- c := t.sys.Cursor()
- c.Goto([]byte(name))
-
- // Load the leaf node from the cursor and insert the key/value.
- t.leaf(c).put([]byte(name), buf[:])
+ // Add bucket to system page.
+ t.sys.put(name, &bucket{root: p.id})
return nil
}
// DropBucket deletes a bucket.
-func (t *RWTransaction) DeleteBucket(b *Bucket) error {
- // TODO: Remove from main DB.
+func (t *RWTransaction) DeleteBucket(name string) error {
+ // Remove from system page.
+ t.sys.del(name)
+
// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
@@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error {
return nil
}
+// Commit writes all changes to disk.
func (t *RWTransaction) Commit() error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
- // TODO: Flush data.
+ // TODO: Rebalance.
+
+ // Spill data onto dirty pages.
+ t.spill()
- // TODO: Update meta.
- // TODO: Write meta.
+ // Spill system page.
+ p := t.allocate((t.sys.size() / t.db.pageSize) + 1)
+ t.sys.write(p)
+
+ // Write dirty pages to disk.
+ if err := t.write(); err != nil {
+ return err
+ }
+
+ // Update the meta.
+ t.meta.sys = p.id
+
+ // Write meta to disk.
+ if err := t.writeMeta(); err != nil {
+ return err
+ }
return nil
}
@@ -99,10 +130,107 @@ func (t *RWTransaction) close() error {
}
// allocate returns a contiguous block of memory starting at a given page.
-func (t *RWTransaction) allocate(size int) (*page, error) {
- // TODO: Find a continuous block of free pages.
- // TODO: If no free pages are available, resize the mmap to allocate more.
- return nil, nil
+func (t *RWTransaction) allocate(count int) *page {
+ // TODO(benbjohnson): Use pages from the freelist.
+
+ // Allocate a set of contiguous pages from the end of the file.
+ buf := make([]byte, count*t.db.pageSize)
+ p := (*page)(unsafe.Pointer(&buf[0]))
+ p.id = t.meta.pgid
+ p.overflow = uint32(count - 1)
+
+ // Increment the last page id.
+ t.meta.pgid += pgid(count)
+
+ // Save it in our page cache.
+ t.pages[p.id] = p
+
+ return p
+}
+
+// spill writes all the leafs and branches to dirty pages.
+func (t *RWTransaction) spill() {
+ // Spill leafs first.
+ for _, l := range t.leafs {
+ t.spillLeaf(l)
+ }
+
+ // Sort branches by highest depth first.
+ branches := make(branches, 0, len(t.branches))
+ for _, b := range t.branches {
+ branches = append(branches, b)
+ }
+ sort.Sort(branches)
+
+ // Spill branches by deepest first.
+ for _, b := range branches {
+ t.spillBranch(b)
+ }
+}
+
+// spillLeaf writes a leaf to one or more dirty pages.
+func (t *RWTransaction) spillLeaf(l *leaf) {
+ parent := l.parent
+
+ // Split leaf, if necessary.
+ leafs := l.split(t.db.pageSize)
+
+ // TODO: If this is a root leaf and we split then add a parent branch.
+
+ // Process each resulting leaf.
+ previd := leafs[0].pgid
+ for index, l := range leafs {
+ // Allocate contiguous space for the leaf.
+ p := t.allocate((l.size() / t.db.pageSize) + 1)
+
+ // Write the leaf to the page.
+ l.write(p)
+
+ // Insert or replace the node in the parent branch with the new pgid.
+ if parent != nil {
+ parent.put(previd, p.id, l.items[0].key, (index == 0))
+ previd = l.pgid
+ }
+ }
+}
+
+// spillBranch writes a branch to one or more dirty pages.
+func (t *RWTransaction) spillBranch(l *branch) {
+ warn("[pending] RWTransaction.spillBranch()") // TODO
+}
+
+// write writes any dirty pages to disk.
+func (t *RWTransaction) write() error {
+ // TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.
+
+ // Sort pages by id.
+ pages := make(pages, 0, len(t.pages))
+ for _, p := range t.pages {
+ pages = append(pages, p)
+ }
+ sort.Sort(pages)
+
+ // Write pages to disk in order.
+ for _, p := range pages {
+ size := (int(p.overflow) + 1) * t.db.pageSize
+ buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
+ t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+ }
+
+ return nil
+}
+
+// writeMeta writes the meta to the disk.
+func (t *RWTransaction) writeMeta() error {
+ // Create a temporary buffer for the meta page.
+ buf := make([]byte, t.db.pageSize)
+ p := t.db.pageInBuffer(buf, 0)
+ t.meta.write(p)
+
+ // Write the meta page to file.
+ t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+
+ return nil
}
// leaf retrieves a leaf object based on the current position of a cursor.
@@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch {
// Otherwise create a branch and cache it.
b := &branch{}
b.read(t.page(id))
+ b.depth = len(stack) - 1
b.parent = t.branch(stack[:len(stack)-1])
t.branches[id] = b
diff --git a/rwtransaction_test.go b/rwtransaction_test.go
index 71edc64..2bcf252 100644
--- a/rwtransaction_test.go
+++ b/rwtransaction_test.go
@@ -27,18 +27,16 @@ func TestTransactionCreateBucket(t *testing.T) {
err = txn.Commit()
assert.NoError(t, err)
- /*
- // Open a separate read-only transaction.
- rtxn, err := db.Transaction()
- assert.NotNil(t, txn)
- assert.NoError(t, err)
-
- b, err := rtxn.Bucket("widgets")
- assert.NoError(t, err)
- if assert.NotNil(t, b) {
- assert.Equal(t, b.Name(), "widgets")
- }
- */
+ // Open a separate read-only transaction.
+ rtxn, err := db.Transaction()
+ assert.NotNil(t, txn)
+ assert.NoError(t, err)
+
+ b := rtxn.Bucket("widgets")
+ assert.NoError(t, err)
+ if assert.NotNil(t, b) {
+ assert.Equal(t, b.Name(), "widgets")
+ }
})
}
diff --git a/sys.go b/sys.go
new file mode 100644
index 0000000..657c973
--- /dev/null
+++ b/sys.go
@@ -0,0 +1,95 @@
+package bolt
+
+import (
+ "sort"
+ "unsafe"
+)
+
+// sys represents a in-memory system page.
+type sys struct {
+ pgid pgid
+ buckets map[string]*bucket
+}
+
+// size returns the size of the page after serialization.
+func (s *sys) size() int {
+ var size int = pageHeaderSize
+ for key, _ := range s.buckets {
+ size += int(unsafe.Sizeof(bucket{})) + len(key)
+ }
+ return size
+}
+
+// get retrieves a bucket by name.
+func (s *sys) get(key string) *bucket {
+ return s.buckets[key]
+}
+
+// put sets a new value for a bucket.
+func (s *sys) put(key string, b *bucket) {
+ s.buckets[key] = b
+}
+
+// del deletes a bucket by name.
+func (s *sys) del(key string) {
+ delete(s.buckets, key)
+}
+
+// read initializes the data from an on-disk page.
+func (s *sys) read(p *page) {
+ s.pgid = p.id
+ s.buckets = make(map[string]*bucket)
+
+ var buckets []*bucket
+ var keys []string
+
+ // Read buckets.
+ nodes := (*[maxNodesPerPage]bucket)(unsafe.Pointer(&p.ptr))
+ for i := 0; i < int(p.count); i++ {
+ node := &nodes[i]
+ buckets = append(buckets, node)
+ }
+
+ // Read keys.
+ buf := (*[maxAllocSize]byte)(unsafe.Pointer(&nodes[p.count]))[:]
+ for i := 0; i < int(p.count); i++ {
+ size := int(buf[0])
+ buf = buf[1:]
+ keys = append(keys, string(buf[:size]))
+ buf = buf[size:]
+ }
+
+ // Associate keys and buckets.
+ for index, key := range keys {
+ s.buckets[key] = buckets[index]
+ }
+}
+
+// write writes the items onto a page.
+func (s *sys) write(p *page) {
+ // Initialize page.
+ p.flags |= p_sys
+ p.count = uint16(len(s.buckets))
+
+ // Sort keys.
+ var keys []string
+ for key, _ := range s.buckets {
+ keys = append(keys, key)
+ }
+ sort.StringSlice(keys).Sort()
+
+ // Write each bucket to the page.
+ buckets := (*[maxNodesPerPage]bucket)(unsafe.Pointer(&p.ptr))
+ for index, key := range keys {
+ buckets[index] = *s.buckets[key]
+ }
+
+ // Write each key to the page.
+ buf := (*[maxAllocSize]byte)(unsafe.Pointer(&buckets[p.count]))[:]
+ for _, key := range keys {
+ buf[0] = byte(len(key))
+ buf = buf[1:]
+ copy(buf, []byte(key))
+ buf = buf[len(key):]
+ }
+}
diff --git a/sys_test.go b/sys_test.go
new file mode 100644
index 0000000..0dcae66
--- /dev/null
+++ b/sys_test.go
@@ -0,0 +1,70 @@
+package bolt
+
+import (
+ "testing"
+ "unsafe"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// Ensure that a system page can set a bucket.
+func TestSysPut(t *testing.T) {
+ s := &sys{buckets: make(map[string]*bucket)}
+ s.put("foo", &bucket{root: 2})
+ s.put("bar", &bucket{root: 3})
+ s.put("foo", &bucket{root: 4})
+ assert.Equal(t, len(s.buckets), 2)
+ assert.Equal(t, s.get("foo").root, pgid(4))
+ assert.Equal(t, s.get("bar").root, pgid(3))
+ assert.Nil(t, s.get("no_such_bucket"))
+}
+
+// Ensure that a system page can deserialize from a page.
+func TestSysRead(t *testing.T) {
+ // Create a page.
+ var buf [4096]byte
+ page := (*page)(unsafe.Pointer(&buf[0]))
+ page.count = 2
+
+ // Insert 2 buckets at the beginning.
+ buckets := (*[3]bucket)(unsafe.Pointer(&page.ptr))
+ buckets[0] = bucket{root: 3}
+ buckets[1] = bucket{root: 4}
+
+ // Write data for the nodes at the end.
+ data := (*[4096]byte)(unsafe.Pointer(&buckets[2]))
+ data[0] = 3
+ copy(data[1:], []byte("bar"))
+ data[4] = 10
+ copy(data[5:], []byte("helloworld"))
+
+ // Deserialize page into a system page.
+ s := &sys{buckets: make(map[string]*bucket)}
+ s.read(page)
+
+ // Check that there are two items with correct data.
+ assert.Equal(t, len(s.buckets), 2)
+ assert.Equal(t, s.get("bar").root, pgid(3))
+ assert.Equal(t, s.get("helloworld").root, pgid(4))
+}
+
+// Ensure that a system page can serialize itself.
+func TestSysWrite(t *testing.T) {
+ s := &sys{buckets: make(map[string]*bucket)}
+ s.put("foo", &bucket{root: 2})
+ s.put("bar", &bucket{root: 3})
+
+ // Write it to a page.
+ var buf [4096]byte
+ p := (*page)(unsafe.Pointer(&buf[0]))
+ s.write(p)
+
+ // Read the page back in.
+ s2 := &sys{buckets: make(map[string]*bucket)}
+ s2.read(p)
+
+ // Check that the two pages are the same.
+ assert.Equal(t, len(s.buckets), 2)
+ assert.Equal(t, s.get("foo").root, pgid(2))
+ assert.Equal(t, s.get("bar").root, pgid(3))
+}
diff --git a/transaction.go b/transaction.go
index 3b03328..0dfb240 100644
--- a/transaction.go
+++ b/transaction.go
@@ -1,9 +1,5 @@
package bolt
-import (
- "unsafe"
-)
-
var (
InvalidTransactionError = &Error{"txn is invalid", nil}
BucketAlreadyExistsError = &Error{"bucket already exists", nil}
@@ -19,22 +15,21 @@ const (
type txnid uint64
type Transaction struct {
- id int
- db *DB
- meta *meta
- sys Bucket
- buckets map[string]*Bucket
- pages map[pgid]*page
+ id int
+ db *DB
+ meta *meta
+ sys *sys
+ pages map[pgid]*page
}
// init initializes the transaction and associates it with a database.
-func (t *Transaction) init(db *DB, meta *meta) {
+func (t *Transaction) init(db *DB) {
t.db = db
- t.meta = meta
- t.buckets = make(map[string]*Bucket)
+ t.meta = db.meta()
t.pages = nil
- t.sys.transaction = t
- t.sys.bucket = &t.meta.sys
+
+ t.sys = &sys{}
+ t.sys.read(t.page(t.meta.sys))
}
func (t *Transaction) Close() error {
@@ -48,26 +43,17 @@ func (t *Transaction) DB() *DB {
// Bucket retrieves a bucket by name.
func (t *Transaction) Bucket(name string) *Bucket {
- // Return cached reference if it's already been looked up.
- if b := t.buckets[name]; b != nil {
- return b
- }
-
- // Retrieve bucket data from the system bucket.
- value := t.sys.Cursor().Get([]byte(name))
- if value == nil {
+ // Lookup bucket from the system page.
+ b := t.sys.get(name)
+ if b == nil {
return nil
}
- // Create a bucket that overlays the data.
- b := &Bucket{
- bucket: (*bucket)(unsafe.Pointer(&value[0])),
+ return &Bucket{
+ bucket: b,
name: name,
transaction: t,
}
- t.buckets[name] = b
-
- return b
}
// Cursor creates a cursor associated with a given bucket.