aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Johnson <benbjohnson@yahoo.com>2014-07-23 15:08:59 -0600
committerBen Johnson <benbjohnson@yahoo.com>2014-07-23 15:08:59 -0600
commitaa66291b9b1e91773afc9c7d2081e570de742521 (patch)
treed9a6eb8f34baf289c8cb931620bb4e8d32b704a0
parentFix double spill. (diff)
downloaddedo-aa66291b9b1e91773afc9c7d2081e570de742521.tar.gz
dedo-aa66291b9b1e91773afc9c7d2081e570de742521.tar.xz
Fix root split on very large append.
This commit fixes a bug where a root split on a very large insert would cause an overflow of the root node. The problem was that the new root was not split when it was created so a new root with more than 64K child nodes would overflow the page.count (uint16).
-rw-r--r--bucket_test.go25
-rw-r--r--db_test.go37
-rw-r--r--node.go21
3 files changed, 61 insertions, 22 deletions
diff --git a/bucket_test.go b/bucket_test.go
index 13e6a8a..99fdad8 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -107,6 +107,31 @@ func TestBucket_Put_Large(t *testing.T) {
})
}
+// Ensure that a database can perform multiple large appends safely.
+func TestDB_Put_VeryLarge(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ n, batchN := 400000, 200000
+ ksize, vsize := 8, 500
+
+ withOpenDB(func(db *DB, path string) {
+ for i := 0; i < n; i += batchN {
+ err := db.Update(func(tx *Tx) error {
+ b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
+ for j := 0; j < batchN; j++ {
+ k, v := make([]byte, ksize), make([]byte, vsize)
+ binary.BigEndian.PutUint32(k, uint32(i+j))
+ assert.NoError(t, b.Put(k, v))
+ }
+ return nil
+ })
+ assert.NoError(t, err)
+ }
+ })
+}
+
// Ensure that a setting a value on a key with a bucket value returns an error.
func TestBucket_Put_IncompatibleValue(t *testing.T) {
withOpenDB(func(db *DB, path string) {
diff --git a/db_test.go b/db_test.go
index 0809bcb..2063249 100644
--- a/db_test.go
+++ b/db_test.go
@@ -608,15 +608,36 @@ func withOpenDB(fn func(*DB, string)) {
// mustCheck runs a consistency check on the database and panics if any errors are found.
func mustCheck(db *DB) {
- err := db.View(func(tx *Tx) error {
- return <-tx.Check()
+ db.View(func(tx *Tx) error {
+ // Collect all the errors.
+ var errors []error
+ for err := range tx.Check() {
+ errors = append(errors, err)
+ if len(errors) > 10 {
+ break
+ }
+ }
+
+ // If errors occurred, copy the DB and print the errors.
+ if len(errors) > 0 {
+ var path = tempfile()
+ tx.CopyFile(path, 0600)
+
+ // Print errors.
+ fmt.Print("\n\n")
+ fmt.Printf("consistency check failed (%d errors)\n", len(errors))
+ for _, err := range errors {
+ fmt.Println(err)
+ }
+ fmt.Println("")
+ fmt.Println("db saved to:")
+ fmt.Println(path)
+ fmt.Print("\n\n")
+ os.Exit(-1)
+ }
+
+ return nil
})
- if err != nil {
- // Copy db off first.
- var path = tempfile()
- db.View(func(tx *Tx) error { return tx.CopyFile(path, 0600) })
- panic("check failure: " + err.Error() + ": " + path)
- }
}
// mustContainKeys checks that a bucket contains a given set of keys.
diff --git a/node.go b/node.go
index cda5201..75433ad 100644
--- a/node.go
+++ b/node.go
@@ -188,6 +188,8 @@ func (n *node) write(p *page) {
} else {
p.flags |= branchPageFlag
}
+
+ _assert(len(n.inodes) < 0xFFFF, "inode overflow: %d (pgid=%d)", len(n.inodes), p.id)
p.count = uint16(len(n.inodes))
// Loop over each item and write it to the page.
@@ -367,20 +369,11 @@ func (n *node) spill() error {
tx.stats.Spill++
}
- // This is a special case where we need to write the parent if it is new
- // and caused by a split in the root.
- var parent = n.parent
- if parent != nil && parent.pgid == 0 {
- // Allocate contiguous space for the node.
- p, err := tx.allocate((parent.size() / tx.db.pageSize) + 1)
- if err != nil {
- return err
- }
-
- // Write the new root.
- _assert(p.id < tx.meta.pgid, "pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid)
- parent.pgid = p.id
- parent.write(p)
+ // If the root node split and created a new root then we need to spill that
+ // as well. We'll clear out the children to make sure it doesn't try to respill.
+ if n.parent != nil && n.parent.pgid == 0 {
+ n.children = nil
+ return n.parent.spill()
}
return nil