aboutsummaryrefslogtreecommitdiff
path: root/rwtransaction.go
diff options
context:
space:
mode:
authorBen Johnson <benbjohnson@yahoo.com>2014-01-30 18:22:02 -0500
committerBen Johnson <benbjohnson@yahoo.com>2014-01-30 22:20:50 -0500
commit26f6fefeadf1b3e38b86a0a12ba8d1cbb7f347d3 (patch)
tree3f270655af94ff2dbbce62f6065d4f1c24030c71 /rwtransaction.go
parentMerge pull request #2 from benbjohnson/master (diff)
downloaddedo-26f6fefeadf1b3e38b86a0a12ba8d1cbb7f347d3.tar.gz
dedo-26f6fefeadf1b3e38b86a0a12ba8d1cbb7f347d3.tar.xz
Add RWTransaction.write().
Diffstat (limited to 'rwtransaction.go')
-rw-r--r--rwtransaction.go167
1 files changed, 148 insertions, 19 deletions
diff --git a/rwtransaction.go b/rwtransaction.go
index 2c76dbd..1667609 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -1,6 +1,7 @@
package bolt
import (
+ "sort"
"unsafe"
)
@@ -12,31 +13,43 @@ type RWTransaction struct {
leafs map[pgid]*leaf
}
+// init initializes the transaction.
+func (t *RWTransaction) init(db *DB) {
+ t.Transaction.init(db)
+ t.pages = make(map[pgid]*page)
+
+ // Copy the meta and increase the transaction id.
+ t.meta = &meta{}
+ db.meta().copy(t.meta)
+ t.meta.txnid += txnid(2)
+}
+
// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string) error {
// Check if bucket already exists.
if b := t.Bucket(name); b != nil {
return &Error{"bucket already exists", nil}
+ } else if len(name) == 0 {
+ return &Error{"bucket name cannot be blank", nil}
+ } else if len(name) > MaxBucketNameSize {
+ return &Error{"bucket name too large", nil}
}
- // Create a new bucket entry.
- var buf [unsafe.Sizeof(bucket{})]byte
- var raw = (*bucket)(unsafe.Pointer(&buf[0]))
- raw.root = 0
+ // Create a blank root leaf page.
+ p := t.allocate(1)
+ p.flags = p_leaf
- // Move cursor to insertion location.
- c := t.sys.Cursor()
- c.Goto([]byte(name))
-
- // Load the leaf node from the cursor and insert the key/value.
- t.leaf(c).put([]byte(name), buf[:])
+ // Add bucket to system page.
+ t.sys.put(name, &bucket{root: p.id})
return nil
}
// DropBucket deletes a bucket.
-func (t *RWTransaction) DeleteBucket(b *Bucket) error {
- // TODO: Remove from main DB.
+func (t *RWTransaction) DeleteBucket(name string) error {
+ // Remove from system page.
+ t.sys.del(name)
+
// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
@@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error {
return nil
}
+// Commit writes all changes to disk.
func (t *RWTransaction) Commit() error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
- // TODO: Flush data.
+ // TODO: Rebalance.
+
+ // Spill data onto dirty pages.
+ t.spill()
- // TODO: Update meta.
- // TODO: Write meta.
+ // Spill system page.
+ p := t.allocate((t.sys.size() / t.db.pageSize) + 1)
+ t.sys.write(p)
+
+ // Write dirty pages to disk.
+ if err := t.write(); err != nil {
+ return err
+ }
+
+ // Update the meta.
+ t.meta.sys = p.id
+
+ // Write meta to disk.
+ if err := t.writeMeta(); err != nil {
+ return err
+ }
return nil
}
@@ -99,10 +130,107 @@ func (t *RWTransaction) close() error {
}
// allocate returns a contiguous block of memory starting at a given page.
-func (t *RWTransaction) allocate(size int) (*page, error) {
- // TODO: Find a continuous block of free pages.
- // TODO: If no free pages are available, resize the mmap to allocate more.
- return nil, nil
+func (t *RWTransaction) allocate(count int) *page {
+ // TODO(benbjohnson): Use pages from the freelist.
+
+ // Allocate a set of contiguous pages from the end of the file.
+ buf := make([]byte, count*t.db.pageSize)
+ p := (*page)(unsafe.Pointer(&buf[0]))
+ p.id = t.meta.pgid
+ p.overflow = uint32(count - 1)
+
+ // Increment the last page id.
+ t.meta.pgid += pgid(count)
+
+ // Save it in our page cache.
+ t.pages[p.id] = p
+
+ return p
+}
+
+// spill writes all the leafs and branches to dirty pages.
+func (t *RWTransaction) spill() {
+ // Spill leafs first.
+ for _, l := range t.leafs {
+ t.spillLeaf(l)
+ }
+
+ // Sort branches by highest depth first.
+ branches := make(branches, 0, len(t.branches))
+ for _, b := range t.branches {
+ branches = append(branches, b)
+ }
+ sort.Sort(branches)
+
+ // Spill branches by deepest first.
+ for _, b := range branches {
+ t.spillBranch(b)
+ }
+}
+
+// spillLeaf writes a leaf to one or more dirty pages.
+func (t *RWTransaction) spillLeaf(l *leaf) {
+ parent := l.parent
+
+ // Split leaf, if necessary.
+ leafs := l.split(t.db.pageSize)
+
+ // TODO: If this is a root leaf and we split then add a parent branch.
+
+ // Process each resulting leaf.
+ previd := leafs[0].pgid
+ for index, l := range leafs {
+ // Allocate contiguous space for the leaf.
+ p := t.allocate((l.size() / t.db.pageSize) + 1)
+
+ // Write the leaf to the page.
+ l.write(p)
+
+ // Insert or replace the node in the parent branch with the new pgid.
+ if parent != nil {
+ parent.put(previd, p.id, l.items[0].key, (index == 0))
+ previd = l.pgid
+ }
+ }
+}
+
+// spillBranch writes a branch to one or more dirty pages.
+func (t *RWTransaction) spillBranch(l *branch) {
+ warn("[pending] RWTransaction.spillBranch()") // TODO
+}
+
+// write writes any dirty pages to disk.
+func (t *RWTransaction) write() error {
+ // TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.
+
+ // Sort pages by id.
+ pages := make(pages, 0, len(t.pages))
+ for _, p := range t.pages {
+ pages = append(pages, p)
+ }
+ sort.Sort(pages)
+
+ // Write pages to disk in order.
+ for _, p := range pages {
+ size := (int(p.overflow) + 1) * t.db.pageSize
+ buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
+ t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+ }
+
+ return nil
+}
+
+// writeMeta writes the meta to the disk.
+func (t *RWTransaction) writeMeta() error {
+ // Create a temporary buffer for the meta page.
+ buf := make([]byte, t.db.pageSize)
+ p := t.db.pageInBuffer(buf, 0)
+ t.meta.write(p)
+
+ // Write the meta page to file.
+ t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+
+ return nil
}
// leaf retrieves a leaf object based on the current position of a cursor.
@@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch {
// Otherwise create a branch and cache it.
b := &branch{}
b.read(t.page(id))
+ b.depth = len(stack) - 1
b.parent = t.branch(stack[:len(stack)-1])
t.branches[id] = b