diff options
author | Ben Johnson <benbjohnson@yahoo.com> | 2014-01-30 18:22:02 -0500 |
---|---|---|
committer | Ben Johnson <benbjohnson@yahoo.com> | 2014-01-30 22:20:50 -0500 |
commit | 26f6fefeadf1b3e38b86a0a12ba8d1cbb7f347d3 (patch) | |
tree | 3f270655af94ff2dbbce62f6065d4f1c24030c71 /rwtransaction.go | |
parent | Merge pull request #2 from benbjohnson/master (diff) | |
download | dedo-26f6fefeadf1b3e38b86a0a12ba8d1cbb7f347d3.tar.gz dedo-26f6fefeadf1b3e38b86a0a12ba8d1cbb7f347d3.tar.xz |
Add RWTransaction.write().
Diffstat (limited to 'rwtransaction.go')
-rw-r--r-- | rwtransaction.go | 167 |
1 files changed, 148 insertions, 19 deletions
diff --git a/rwtransaction.go b/rwtransaction.go index 2c76dbd..1667609 100644 --- a/rwtransaction.go +++ b/rwtransaction.go @@ -1,6 +1,7 @@ package bolt import ( + "sort" "unsafe" ) @@ -12,31 +13,43 @@ type RWTransaction struct { leafs map[pgid]*leaf } +// init initializes the transaction. +func (t *RWTransaction) init(db *DB) { + t.Transaction.init(db) + t.pages = make(map[pgid]*page) + + // Copy the meta and increase the transaction id. + t.meta = &meta{} + db.meta().copy(t.meta) + t.meta.txnid += txnid(2) +} + // CreateBucket creates a new bucket. func (t *RWTransaction) CreateBucket(name string) error { // Check if bucket already exists. if b := t.Bucket(name); b != nil { return &Error{"bucket already exists", nil} + } else if len(name) == 0 { + return &Error{"bucket name cannot be blank", nil} + } else if len(name) > MaxBucketNameSize { + return &Error{"bucket name too large", nil} } - // Create a new bucket entry. - var buf [unsafe.Sizeof(bucket{})]byte - var raw = (*bucket)(unsafe.Pointer(&buf[0])) - raw.root = 0 + // Create a blank root leaf page. + p := t.allocate(1) + p.flags = p_leaf - // Move cursor to insertion location. - c := t.sys.Cursor() - c.Goto([]byte(name)) - - // Load the leaf node from the cursor and insert the key/value. - t.leaf(c).put([]byte(name), buf[:]) + // Add bucket to system page. + t.sys.put(name, &bucket{root: p.id}) return nil } // DropBucket deletes a bucket. -func (t *RWTransaction) DeleteBucket(b *Bucket) error { - // TODO: Remove from main DB. +func (t *RWTransaction) DeleteBucket(name string) error { + // Remove from system page. + t.sys.del(name) + // TODO: Delete entry from system bucket. // TODO: Free all pages. // TODO: Remove cursor. @@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error { return nil } +// Commit writes all changes to disk. func (t *RWTransaction) Commit() error { // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. - // TODO: Flush data. + // TODO: Rebalance. + + // Spill data onto dirty pages. + t.spill() - // TODO: Update meta. - // TODO: Write meta. + // Spill system page. + p := t.allocate((t.sys.size() / t.db.pageSize) + 1) + t.sys.write(p) + + // Write dirty pages to disk. + if err := t.write(); err != nil { + return err + } + + // Update the meta. + t.meta.sys = p.id + + // Write meta to disk. + if err := t.writeMeta(); err != nil { + return err + } return nil } @@ -99,10 +130,107 @@ func (t *RWTransaction) close() error { } // allocate returns a contiguous block of memory starting at a given page. -func (t *RWTransaction) allocate(size int) (*page, error) { - // TODO: Find a continuous block of free pages. - // TODO: If no free pages are available, resize the mmap to allocate more. - return nil, nil +func (t *RWTransaction) allocate(count int) *page { + // TODO(benbjohnson): Use pages from the freelist. + + // Allocate a set of contiguous pages from the end of the file. + buf := make([]byte, count*t.db.pageSize) + p := (*page)(unsafe.Pointer(&buf[0])) + p.id = t.meta.pgid + p.overflow = uint32(count - 1) + + // Increment the last page id. + t.meta.pgid += pgid(count) + + // Save it in our page cache. + t.pages[p.id] = p + + return p +} + +// spill writes all the leafs and branches to dirty pages. +func (t *RWTransaction) spill() { + // Spill leafs first. + for _, l := range t.leafs { + t.spillLeaf(l) + } + + // Sort branches by highest depth first. + branches := make(branches, 0, len(t.branches)) + for _, b := range t.branches { + branches = append(branches, b) + } + sort.Sort(branches) + + // Spill branches by deepest first. + for _, b := range branches { + t.spillBranch(b) + } +} + +// spillLeaf writes a leaf to one or more dirty pages. +func (t *RWTransaction) spillLeaf(l *leaf) { + parent := l.parent + + // Split leaf, if necessary. + leafs := l.split(t.db.pageSize) + + // TODO: If this is a root leaf and we split then add a parent branch. + + // Process each resulting leaf. + previd := leafs[0].pgid + for index, l := range leafs { + // Allocate contiguous space for the leaf. + p := t.allocate((l.size() / t.db.pageSize) + 1) + + // Write the leaf to the page. + l.write(p) + + // Insert or replace the node in the parent branch with the new pgid. + if parent != nil { + parent.put(previd, p.id, l.items[0].key, (index == 0)) + previd = l.pgid + } + } +} + +// spillBranch writes a branch to one or more dirty pages. +func (t *RWTransaction) spillBranch(l *branch) { + warn("[pending] RWTransaction.spillBranch()") // TODO +} + +// write writes any dirty pages to disk. +func (t *RWTransaction) write() error { + // TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize. + + // Sort pages by id. + pages := make(pages, 0, len(t.pages)) + for _, p := range t.pages { + pages = append(pages, p) + } + sort.Sort(pages) + + // Write pages to disk in order. + for _, p := range pages { + size := (int(p.overflow) + 1) * t.db.pageSize + buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] + t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) + } + + return nil +} + +// writeMeta writes the meta to the disk. +func (t *RWTransaction) writeMeta() error { + // Create a temporary buffer for the meta page. + buf := make([]byte, t.db.pageSize) + p := t.db.pageInBuffer(buf, 0) + t.meta.write(p) + + // Write the meta page to file. + t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) + + return nil } // leaf retrieves a leaf object based on the current position of a cursor. @@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch { // Otherwise create a branch and cache it. b := &branch{} b.read(t.page(id)) + b.depth = len(stack) - 1 b.parent = t.branch(stack[:len(stack)-1]) t.branches[id] = b |