author    | Ben Johnson <benbjohnson@yahoo.com> | 2014-01-30 19:26:10 -0800
committer | Ben Johnson <benbjohnson@yahoo.com> | 2014-01-30 19:26:10 -0800
commit    | d05191d164dbab56adbb3a17a62f66a62695c6d3 (patch)
tree      | 3f270655af94ff2dbbce62f6065d4f1c24030c71
parent    | Merge pull request #2 from benbjohnson/master (diff)
parent    | Add RWTransaction.write(). (diff)
Merge pull request #3 from benbjohnson/spill
Spill to dirty pages, write to disk
-rw-r--r-- | branch.go             |  18
-rw-r--r-- | bucket.go             |   2
-rw-r--r-- | db.go                 |  14
-rw-r--r-- | leaf.go               |   7
-rw-r--r-- | meta.go               |  14
-rw-r--r-- | page.go               |   9
-rw-r--r-- | rwtransaction.go      | 167
-rw-r--r-- | rwtransaction_test.go |  22
-rw-r--r-- | sys.go                |  95
-rw-r--r-- | sys_test.go           |  70
-rw-r--r-- | transaction.go        |  44
11 files changed, 387 insertions, 75 deletions
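This merge changes the whole commit path: `RWTransaction.Commit()` now spills in-memory leafs and branches onto freshly allocated dirty pages, writes those pages to the data file in page-id order, and then writes the meta page. The sketch below is a minimal, self-contained illustration of the write step only; the `dirtyPage` type and `writeDirty` helper are simplified stand-ins for the real `page`/`RWTransaction` types in the diff, not the library's own API.

```go
// Sketch of the "write dirty pages in id order" idea from this commit.
// Types and names here are assumptions for illustration, not bolt's API.
package main

import (
	"fmt"
	"os"
	"sort"
)

const pageSize = 4096

// dirtyPage stands in for an in-memory page that has been spilled and
// is waiting to be flushed; only the fields the write path needs.
type dirtyPage struct {
	id  uint64
	buf []byte // serialized page contents, a multiple of pageSize
}

// writeDirty mirrors the shape of RWTransaction.write(): collect the
// dirty pages, sort them by page id, and write each one at the file
// offset id*pageSize so the data file stays position-addressed.
func writeDirty(f *os.File, pages map[uint64]*dirtyPage) error {
	sorted := make([]*dirtyPage, 0, len(pages))
	for _, p := range pages {
		sorted = append(sorted, p)
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].id < sorted[j].id })

	for _, p := range sorted {
		if _, err := f.WriteAt(p.buf, int64(p.id)*pageSize); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	f, err := os.CreateTemp("", "spill-sketch-*.db")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	pages := map[uint64]*dirtyPage{
		3: {id: 3, buf: make([]byte, pageSize)},
		2: {id: 2, buf: make([]byte, pageSize)},
	}
	if err := writeDirty(f, pages); err != nil {
		panic(err)
	}
	fmt.Println("wrote", len(pages), "pages")
}
```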
```diff
diff --git a/branch.go b/branch.go
--- a/branch.go
+++ b/branch.go
@@ -7,6 +7,8 @@ import (
 
 // branch represents a temporary in-memory branch page.
 type branch struct {
+	pgid   pgid
+	depth  int
 	parent *branch
 	items  branchItems
 }
@@ -42,11 +44,11 @@ func (b *branch) put(id pgid, newid pgid, key []byte, replace bool) {
 }
 
 // read initializes the item data from an on-disk page.
-func (b *branch) read(page *page) {
-	ncount := int(page.count)
-	b.items = make(branchItems, ncount)
-	bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&page.ptr))
-	for i := 0; i < ncount; i++ {
+func (b *branch) read(p *page) {
+	b.pgid = p.id
+	b.items = make(branchItems, int(p.count))
+	bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&p.ptr))
+	for i := 0; i < int(p.count); i++ {
 		bnode := &bnodes[i]
 		item := &b.items[i]
 		item.pgid = bnode.pgid
@@ -109,6 +111,12 @@ func (b *branch) split(pageSize int) []*branch {
 	return branches
 }
 
+type branches []*branch
+
+func (s branches) Len() int           { return len(s) }
+func (s branches) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s branches) Less(i, j int) bool { return s[i].depth < s[j].depth }
+
 type branchItems []branchItem
 
 type branchItem struct {
diff --git a/bucket.go b/bucket.go
--- a/bucket.go
+++ b/bucket.go
@@ -1,5 +1,7 @@
 package bolt
 
+const MaxBucketNameSize = 255
+
 type Bucket struct {
 	*bucket
 	name string
diff --git a/db.go b/db.go
--- a/db.go
+++ b/db.go
@@ -118,7 +118,9 @@ func (db *DB) mmap() error {
 	} else if int(info.Size()) < db.pageSize*2 {
 		return &Error{"file size too small", err}
 	}
-	size := int(info.Size())
+
+	// TEMP(benbjohnson): Set max size to 1MB.
+	size := 2 << 20
 
 	// Memory-map the data file as a byte slice.
 	if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
@@ -159,7 +161,9 @@ func (db *DB) init() error {
 		m.pageSize = uint32(db.pageSize)
 		m.version = Version
 		m.free = 2
-		m.sys.root = 3
+		m.sys = 3
+		m.pgid = 4
+		m.txnid = txnid(i)
 	}
 
 	// Write an empty freelist at page 3.
@@ -171,7 +175,7 @@ func (db *DB) init() error {
 	// Write an empty leaf page at page 4.
 	p = db.pageInBuffer(buf[:], pgid(3))
 	p.id = pgid(3)
-	p.flags = p_leaf
+	p.flags = p_sys
 	p.count = 0
 
 	// Write the buffer to our data file.
@@ -206,7 +210,7 @@ func (db *DB) Transaction() (*Transaction, error) {
 
 	// Create a transaction associated with the database.
 	t := &Transaction{}
-	t.init(db, db.meta())
+	t.init(db)
 
 	return t, nil
 }
@@ -230,7 +234,7 @@ func (db *DB) RWTransaction() (*RWTransaction, error) {
 		branches: make(map[pgid]*branch),
 		leafs:    make(map[pgid]*leaf),
 	}
-	t.init(db, db.meta())
+	t.init(db)
 
 	return t, nil
 }
diff --git a/leaf.go b/leaf.go
--- a/leaf.go
+++ b/leaf.go
@@ -8,6 +8,7 @@ import (
 
 // leaf represents an in-memory, deserialized leaf page.
 type leaf struct {
+	pgid   pgid
 	parent *branch
 	items  leafItems
 }
@@ -39,10 +40,10 @@ func (l *leaf) put(key []byte, value []byte) {
 
 // read initializes the item data from an on-disk page.
 func (l *leaf) read(p *page) {
-	ncount := int(p.count)
-	l.items = make(leafItems, ncount)
+	l.pgid = p.id
+	l.items = make(leafItems, int(p.count))
 	lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&p.ptr))
-	for i := 0; i < ncount; i++ {
+	for i := 0; i < int(p.count); i++ {
 		lnode := &lnodes[i]
 		item := &l.items[i]
 		item.key = lnode.key()
diff --git a/meta.go b/meta.go
--- a/meta.go
+++ b/meta.go
@@ -14,8 +14,8 @@ type meta struct {
 	pageSize uint32
 	pgid     pgid
 	free     pgid
+	sys      pgid
 	txnid    txnid
-	sys      bucket
 }
 
 // validate checks the marker bytes and version of the meta page to ensure it matches this binary.
@@ -30,8 +30,20 @@
 // copy copies one meta object to another.
 func (m *meta) copy(dest *meta) {
+	dest.magic = m.magic
+	dest.version = m.version
 	dest.pageSize = m.pageSize
 	dest.pgid = m.pgid
+	dest.free = m.free
 	dest.txnid = m.txnid
 	dest.sys = m.sys
 }
+
+// write writes the meta onto a page.
+func (m *meta) write(p *page) {
+	// Page id is either going to be 0 or 1 which we can determine by the Txn ID.
+	p.id = pgid(m.txnid % 2)
+	p.flags |= p_meta
+
+	m.copy(p.meta())
+}
diff --git a/page.go b/page.go
--- a/page.go
+++ b/page.go
@@ -14,7 +14,8 @@ const (
 	p_branch   = 0x01
 	p_leaf     = 0x02
 	p_meta     = 0x04
-	p_freelist = 0x08
+	p_sys      = 0x08
+	p_freelist = 0x10
 )
 
 type pgid uint64
@@ -56,3 +57,9 @@ func (p *page) bnodes() []bnode {
 func (p *page) freelist() []pgid {
 	return ((*[maxNodesPerPage]pgid)(unsafe.Pointer(&p.ptr)))[0:p.count]
 }
+
+type pages []*page
+
+func (s pages) Len() int           { return len(s) }
+func (s pages) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s pages) Less(i, j int) bool { return s[i].id < s[j].id }
diff --git a/rwtransaction.go b/rwtransaction.go
index 2c76dbd..1667609 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -1,6 +1,7 @@
 package bolt
 
 import (
+	"sort"
 	"unsafe"
 )
@@ -12,31 +13,43 @@ type RWTransaction struct {
 	leafs    map[pgid]*leaf
 }
 
+// init initializes the transaction.
+func (t *RWTransaction) init(db *DB) {
+	t.Transaction.init(db)
+	t.pages = make(map[pgid]*page)
+
+	// Copy the meta and increase the transaction id.
+	t.meta = &meta{}
+	db.meta().copy(t.meta)
+	t.meta.txnid += txnid(2)
+}
+
 // CreateBucket creates a new bucket.
 func (t *RWTransaction) CreateBucket(name string) error {
 	// Check if bucket already exists.
 	if b := t.Bucket(name); b != nil {
 		return &Error{"bucket already exists", nil}
+	} else if len(name) == 0 {
+		return &Error{"bucket name cannot be blank", nil}
+	} else if len(name) > MaxBucketNameSize {
+		return &Error{"bucket name too large", nil}
 	}
 
-	// Create a new bucket entry.
-	var buf [unsafe.Sizeof(bucket{})]byte
-	var raw = (*bucket)(unsafe.Pointer(&buf[0]))
-	raw.root = 0
+	// Create a blank root leaf page.
+	p := t.allocate(1)
+	p.flags = p_leaf
 
-	// Move cursor to insertion location.
-	c := t.sys.Cursor()
-	c.Goto([]byte(name))
-
-	// Load the leaf node from the cursor and insert the key/value.
-	t.leaf(c).put([]byte(name), buf[:])
+	// Add bucket to system page.
+	t.sys.put(name, &bucket{root: p.id})
 
 	return nil
 }
 
 // DropBucket deletes a bucket.
-func (t *RWTransaction) DeleteBucket(b *Bucket) error {
-	// TODO: Remove from main DB.
+func (t *RWTransaction) DeleteBucket(name string) error {
+	// Remove from system page.
+	t.sys.del(name)
+
 	// TODO: Delete entry from system bucket.
 	// TODO: Free all pages.
 	// TODO: Remove cursor.
@@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error {
 	return nil
 }
 
+// Commit writes all changes to disk.
 func (t *RWTransaction) Commit() error {
 	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
 
-	// TODO: Flush data.
+	// TODO: Rebalance.
+
+	// Spill data onto dirty pages.
+	t.spill()
 
-	// TODO: Update meta.
-	// TODO: Write meta.
+	// Spill system page.
+	p := t.allocate((t.sys.size() / t.db.pageSize) + 1)
+	t.sys.write(p)
+
+	// Write dirty pages to disk.
+	if err := t.write(); err != nil {
+		return err
+	}
+
+	// Update the meta.
+	t.meta.sys = p.id
+
+	// Write meta to disk.
+	if err := t.writeMeta(); err != nil {
+		return err
+	}
 
 	return nil
 }
@@ -99,10 +130,107 @@ func (t *RWTransaction) close() error {
 }
 
 // allocate returns a contiguous block of memory starting at a given page.
-func (t *RWTransaction) allocate(size int) (*page, error) {
-	// TODO: Find a continuous block of free pages.
-	// TODO: If no free pages are available, resize the mmap to allocate more.
-	return nil, nil
+func (t *RWTransaction) allocate(count int) *page {
+	// TODO(benbjohnson): Use pages from the freelist.
+
+	// Allocate a set of contiguous pages from the end of the file.
+	buf := make([]byte, count*t.db.pageSize)
+	p := (*page)(unsafe.Pointer(&buf[0]))
+	p.id = t.meta.pgid
+	p.overflow = uint32(count - 1)
+
+	// Increment the last page id.
+	t.meta.pgid += pgid(count)
+
+	// Save it in our page cache.
+	t.pages[p.id] = p
+
+	return p
+}
+
+// spill writes all the leafs and branches to dirty pages.
+func (t *RWTransaction) spill() {
+	// Spill leafs first.
+	for _, l := range t.leafs {
+		t.spillLeaf(l)
+	}
+
+	// Sort branches by highest depth first.
+	branches := make(branches, 0, len(t.branches))
+	for _, b := range t.branches {
+		branches = append(branches, b)
+	}
+	sort.Sort(branches)
+
+	// Spill branches by deepest first.
+	for _, b := range branches {
+		t.spillBranch(b)
+	}
+}
+
+// spillLeaf writes a leaf to one or more dirty pages.
+func (t *RWTransaction) spillLeaf(l *leaf) {
+	parent := l.parent
+
+	// Split leaf, if necessary.
+	leafs := l.split(t.db.pageSize)
+
+	// TODO: If this is a root leaf and we split then add a parent branch.
+
+	// Process each resulting leaf.
+	previd := leafs[0].pgid
+	for index, l := range leafs {
+		// Allocate contiguous space for the leaf.
+		p := t.allocate((l.size() / t.db.pageSize) + 1)
+
+		// Write the leaf to the page.
+		l.write(p)
+
+		// Insert or replace the node in the parent branch with the new pgid.
+		if parent != nil {
+			parent.put(previd, p.id, l.items[0].key, (index == 0))
+			previd = l.pgid
+		}
+	}
+}
+
+// spillBranch writes a branch to one or more dirty pages.
+func (t *RWTransaction) spillBranch(l *branch) {
+	warn("[pending] RWTransaction.spillBranch()") // TODO
+}
+
+// write writes any dirty pages to disk.
+func (t *RWTransaction) write() error {
+	// TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.
+
+	// Sort pages by id.
+	pages := make(pages, 0, len(t.pages))
+	for _, p := range t.pages {
+		pages = append(pages, p)
+	}
+	sort.Sort(pages)
+
+	// Write pages to disk in order.
+	for _, p := range pages {
+		size := (int(p.overflow) + 1) * t.db.pageSize
+		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
+		t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+	}
+
+	return nil
+}
+
+// writeMeta writes the meta to the disk.
+func (t *RWTransaction) writeMeta() error {
+	// Create a temporary buffer for the meta page.
+	buf := make([]byte, t.db.pageSize)
+	p := t.db.pageInBuffer(buf, 0)
+	t.meta.write(p)
+
+	// Write the meta page to file.
+	t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+
+	return nil
 }
 
 // leaf retrieves a leaf object based on the current position of a cursor.
@@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch {
 	// Otherwise create a branch and cache it.
 	b := &branch{}
 	b.read(t.page(id))
+	b.depth = len(stack) - 1
 	b.parent = t.branch(stack[:len(stack)-1])
 	t.branches[id] = b
 
diff --git a/rwtransaction_test.go b/rwtransaction_test.go
index 71edc64..2bcf252 100644
--- a/rwtransaction_test.go
+++ b/rwtransaction_test.go
@@ -27,18 +27,16 @@ func TestTransactionCreateBucket(t *testing.T) {
 		err = txn.Commit()
 		assert.NoError(t, err)
 
-		/*
-			// Open a separate read-only transaction.
-			rtxn, err := db.Transaction()
-			assert.NotNil(t, txn)
-			assert.NoError(t, err)
-
-			b, err := rtxn.Bucket("widgets")
-			assert.NoError(t, err)
-			if assert.NotNil(t, b) {
-				assert.Equal(t, b.Name(), "widgets")
-			}
-		*/
+		// Open a separate read-only transaction.
+		rtxn, err := db.Transaction()
+		assert.NotNil(t, txn)
+		assert.NoError(t, err)
+
+		b := rtxn.Bucket("widgets")
+		assert.NoError(t, err)
+		if assert.NotNil(t, b) {
+			assert.Equal(t, b.Name(), "widgets")
+		}
 	})
 }
diff --git a/sys.go b/sys.go
new file mode 100644
--- /dev/null
+++ b/sys.go
@@ -0,0 +1,95 @@
+package bolt
+
+import (
+	"sort"
+	"unsafe"
+)
+
+// sys represents a in-memory system page.
+type sys struct {
+	pgid    pgid
+	buckets map[string]*bucket
+}
+
+// size returns the size of the page after serialization.
+func (s *sys) size() int {
+	var size int = pageHeaderSize
+	for key, _ := range s.buckets {
+		size += int(unsafe.Sizeof(bucket{})) + len(key)
+	}
+	return size
+}
+
+// get retrieves a bucket by name.
+func (s *sys) get(key string) *bucket {
+	return s.buckets[key]
+}
+
+// put sets a new value for a bucket.
+func (s *sys) put(key string, b *bucket) {
+	s.buckets[key] = b
+}
+
+// del deletes a bucket by name.
+func (s *sys) del(key string) {
+	delete(s.buckets, key)
+}
+
+// read initializes the data from an on-disk page.
+func (s *sys) read(p *page) {
+	s.pgid = p.id
+	s.buckets = make(map[string]*bucket)
+
+	var buckets []*bucket
+	var keys []string
+
+	// Read buckets.
+	nodes := (*[maxNodesPerPage]bucket)(unsafe.Pointer(&p.ptr))
+	for i := 0; i < int(p.count); i++ {
+		node := &nodes[i]
+		buckets = append(buckets, node)
+	}
+
+	// Read keys.
+	buf := (*[maxAllocSize]byte)(unsafe.Pointer(&nodes[p.count]))[:]
+	for i := 0; i < int(p.count); i++ {
+		size := int(buf[0])
+		buf = buf[1:]
+		keys = append(keys, string(buf[:size]))
+		buf = buf[size:]
+	}
+
+	// Associate keys and buckets.
+	for index, key := range keys {
+		s.buckets[key] = buckets[index]
+	}
+}
+
+// write writes the items onto a page.
+func (s *sys) write(p *page) {
+	// Initialize page.
+	p.flags |= p_sys
+	p.count = uint16(len(s.buckets))
+
+	// Sort keys.
+	var keys []string
+	for key, _ := range s.buckets {
+		keys = append(keys, key)
+	}
+	sort.StringSlice(keys).Sort()
+
+	// Write each bucket to the page.
+	buckets := (*[maxNodesPerPage]bucket)(unsafe.Pointer(&p.ptr))
+	for index, key := range keys {
+		buckets[index] = *s.buckets[key]
+	}
+
+	// Write each key to the page.
+	buf := (*[maxAllocSize]byte)(unsafe.Pointer(&buckets[p.count]))[:]
+	for _, key := range keys {
+		buf[0] = byte(len(key))
+		buf = buf[1:]
+		copy(buf, []byte(key))
+		buf = buf[len(key):]
+	}
+}
diff --git a/sys_test.go b/sys_test.go
new file mode 100644
index 0000000..0dcae66
--- /dev/null
+++ b/sys_test.go
@@ -0,0 +1,70 @@
+package bolt
+
+import (
+	"testing"
+	"unsafe"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Ensure that a system page can set a bucket.
+func TestSysPut(t *testing.T) {
+	s := &sys{buckets: make(map[string]*bucket)}
+	s.put("foo", &bucket{root: 2})
+	s.put("bar", &bucket{root: 3})
+	s.put("foo", &bucket{root: 4})
+	assert.Equal(t, len(s.buckets), 2)
+	assert.Equal(t, s.get("foo").root, pgid(4))
+	assert.Equal(t, s.get("bar").root, pgid(3))
+	assert.Nil(t, s.get("no_such_bucket"))
+}
+
+// Ensure that a system page can deserialize from a page.
+func TestSysRead(t *testing.T) {
+	// Create a page.
+	var buf [4096]byte
+	page := (*page)(unsafe.Pointer(&buf[0]))
+	page.count = 2
+
+	// Insert 2 buckets at the beginning.
+	buckets := (*[3]bucket)(unsafe.Pointer(&page.ptr))
+	buckets[0] = bucket{root: 3}
+	buckets[1] = bucket{root: 4}
+
+	// Write data for the nodes at the end.
+	data := (*[4096]byte)(unsafe.Pointer(&buckets[2]))
+	data[0] = 3
+	copy(data[1:], []byte("bar"))
+	data[4] = 10
+	copy(data[5:], []byte("helloworld"))
+
+	// Deserialize page into a system page.
+	s := &sys{buckets: make(map[string]*bucket)}
+	s.read(page)
+
+	// Check that there are two items with correct data.
+	assert.Equal(t, len(s.buckets), 2)
+	assert.Equal(t, s.get("bar").root, pgid(3))
+	assert.Equal(t, s.get("helloworld").root, pgid(4))
+}
+
+// Ensure that a system page can serialize itself.
+func TestSysWrite(t *testing.T) {
+	s := &sys{buckets: make(map[string]*bucket)}
+	s.put("foo", &bucket{root: 2})
+	s.put("bar", &bucket{root: 3})
+
+	// Write it to a page.
+	var buf [4096]byte
+	p := (*page)(unsafe.Pointer(&buf[0]))
+	s.write(p)
+
+	// Read the page back in.
+	s2 := &sys{buckets: make(map[string]*bucket)}
+	s2.read(p)
+
+	// Check that the two pages are the same.
+	assert.Equal(t, len(s.buckets), 2)
+	assert.Equal(t, s.get("foo").root, pgid(2))
+	assert.Equal(t, s.get("bar").root, pgid(3))
+}
diff --git a/transaction.go b/transaction.go
index 3b03328..0dfb240 100644
--- a/transaction.go
+++ b/transaction.go
@@ -1,9 +1,5 @@
 package bolt
 
-import (
-	"unsafe"
-)
-
 var (
 	InvalidTransactionError  = &Error{"txn is invalid", nil}
 	BucketAlreadyExistsError = &Error{"bucket already exists", nil}
@@ -19,22 +15,21 @@ const (
 type txnid uint64
 
 type Transaction struct {
-	id      int
-	db      *DB
-	meta    *meta
-	sys     Bucket
-	buckets map[string]*Bucket
-	pages   map[pgid]*page
+	id    int
+	db    *DB
+	meta  *meta
+	sys   *sys
+	pages map[pgid]*page
 }
 
 // init initializes the transaction and associates it with a database.
-func (t *Transaction) init(db *DB, meta *meta) {
+func (t *Transaction) init(db *DB) {
 	t.db = db
-	t.meta = meta
-	t.buckets = make(map[string]*Bucket)
+	t.meta = db.meta()
 	t.pages = nil
-	t.sys.transaction = t
-	t.sys.bucket = &t.meta.sys
+
+	t.sys = &sys{}
+	t.sys.read(t.page(t.meta.sys))
 }
 
 func (t *Transaction) Close() error {
@@ -48,26 +43,17 @@ func (t *Transaction) DB() *DB {
 
 // Bucket retrieves a bucket by name.
 func (t *Transaction) Bucket(name string) *Bucket {
-	// Return cached reference if it's already been looked up.
-	if b := t.buckets[name]; b != nil {
-		return b
-	}
-
-	// Retrieve bucket data from the system bucket.
-	value := t.sys.Cursor().Get([]byte(name))
-	if value == nil {
+	// Lookup bucket from the system page.
+	b := t.sys.get(name)
+	if b == nil {
 		return nil
 	}
 
-	// Create a bucket that overlays the data.
-	b := &Bucket{
-		bucket: (*bucket)(unsafe.Pointer(&value[0])),
+	return &Bucket{
+		bucket:      b,
 		name:        name,
 		transaction: t,
 	}
-	t.buckets[name] = b
-
-	return b
 }
 
 // Cursor creates a cursor associated with a given bucket.
```