 README.md             |  10 +-
 bolt_386.go           |   3 +
 bolt_amd64.go         |   3 +
 bolt_arm.go           |  21 +
 bolt_arm64.go         |   3 +
 bolt_ppc64le.go       |   3 +
 bolt_s390x.go         |   3 +
 bucket.go             |  32 +-
 bucket_test.go        |  42 +
 cmd/bolt/main.go      | 212 +-
 cmd/bolt/main_test.go | 171 +
 freelist.go           |   2 +-
 12 files changed, 496 insertions(+), 9 deletions(-)
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -15,11 +15,11 @@ and setting values. That's it.
 
 ## Project Status
 
-Bolt is stable and the API is fixed. Full unit test coverage and randomized
-black box testing are used to ensure database consistency and thread safety.
-Bolt is currently in high-load production environments serving databases as
-large as 1TB. Many companies such as Shopify and Heroku use Bolt-backed
-services every day.
+Bolt is stable, the API is fixed, and the file format is fixed. Full unit
+test coverage and randomized black box testing are used to ensure database
+consistency and thread safety. Bolt is currently in high-load production
+environments serving databases as large as 1TB. Many companies such as
+Shopify and Heroku use Bolt-backed services every day.
 
 ## Table of Contents
 
diff --git a/bolt_386.go b/bolt_386.go
index e659bfb..820d533 100644
--- a/bolt_386.go
+++ b/bolt_386.go
@@ -5,3 +5,6 @@ const maxMapSize = 0x7FFFFFFF // 2GB
 
 // maxAllocSize is the size used when creating array pointers.
 const maxAllocSize = 0xFFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_amd64.go b/bolt_amd64.go
index cca6b7e..98fafdb 100644
--- a/bolt_amd64.go
+++ b/bolt_amd64.go
@@ -5,3 +5,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
 
 // maxAllocSize is the size used when creating array pointers.
 const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_arm.go b/bolt_arm.go
index e659bfb..7e5cb4b 100644
--- a/bolt_arm.go
+++ b/bolt_arm.go
@@ -1,7 +1,28 @@
 package bolt
 
+import "unsafe"
+
 // maxMapSize represents the largest mmap size supported by Bolt.
 const maxMapSize = 0x7FFFFFFF // 2GB
 
 // maxAllocSize is the size used when creating array pointers.
 const maxAllocSize = 0xFFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned bool
+
+func init() {
+	// Simple check to see whether this arch handles unaligned load/stores
+	// correctly.
+
+	// ARM9 and older devices require load/stores to be from/to aligned
+	// addresses. If not, the lower 2 bits are cleared and that address is
+	// read in a jumbled up order.
+
+	// See http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.faqs/ka15414.html
+
+	raw := [6]byte{0xfe, 0xef, 0x11, 0x22, 0x22, 0x11}
+	val := *(*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(&raw)) + 2))
+
+	brokenUnaligned = val != 0x11222211
+}
diff --git a/bolt_arm64.go b/bolt_arm64.go
index 6d23093..b26d84f 100644
--- a/bolt_arm64.go
+++ b/bolt_arm64.go
@@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
 
 // maxAllocSize is the size used when creating array pointers.
 const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_ppc64le.go b/bolt_ppc64le.go
index 8351e12..8c143bc 100644
--- a/bolt_ppc64le.go
+++ b/bolt_ppc64le.go
@@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
 
 // maxAllocSize is the size used when creating array pointers.
 const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_s390x.go b/bolt_s390x.go
index f4dd26b..d7c39af 100644
--- a/bolt_s390x.go
+++ b/bolt_s390x.go
@@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
 
 // maxAllocSize is the size used when creating array pointers.
 const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
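A note on the probe in bolt_arm.go: init() stores a known six-byte pattern and reads a uint32 back from an address two bytes into the buffer. On a little-endian core with working unaligned loads the read yields 0x11222211; on ARM9-class cores the address is rounded down and the bytes come back rotated, so the comparison fails and brokenUnaligned is set. A minimal standalone sketch of the same check, assuming a little-endian target (the helper name is made up):

package main

import (
	"fmt"
	"unsafe"
)

// unalignedLoadWorks mirrors the probe in bolt_arm.go's init(): write a known
// pattern, then load a uint32 from an address offset by 2 bytes into it. On a
// little-endian core with working unaligned loads this yields 0x11222211.
func unalignedLoadWorks() bool {
	raw := [6]byte{0xfe, 0xef, 0x11, 0x22, 0x22, 0x11}
	val := *(*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(&raw)) + 2))
	return val == 0x11222211
}

func main() {
	fmt.Println("unaligned 32-bit loads OK:", unalignedLoadWorks())
}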
diff --git a/bucket.go b/bucket.go
--- a/bucket.go
+++ b/bucket.go
@@ -130,9 +130,17 @@ func (b *Bucket) Bucket(name []byte) *Bucket {
 func (b *Bucket) openBucket(value []byte) *Bucket {
 	var child = newBucket(b.tx)
 
+	// If unaligned load/stores are broken on this arch and value is
+	// unaligned simply clone to an aligned byte array.
+	unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
+
+	if unaligned {
+		value = cloneBytes(value)
+	}
+
 	// If this is a writable transaction then we need to copy the bucket entry.
 	// Read-only transactions can point directly at the mmap entry.
-	if b.tx.writable {
+	if b.tx.writable && !unaligned {
 		child.bucket = &bucket{}
 		*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
 	} else {
@@ -329,6 +337,28 @@ func (b *Bucket) Delete(key []byte) error {
 	return nil
 }
 
+// Sequence returns the current integer for the bucket without incrementing it.
+func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
+
+// SetSequence updates the sequence number for the bucket.
+func (b *Bucket) SetSequence(v uint64) error {
+	if b.tx.db == nil {
+		return ErrTxClosed
+	} else if !b.Writable() {
+		return ErrTxNotWritable
+	}
+
+	// Materialize the root node if it hasn't been already so that the
+	// bucket will be saved during commit.
+	if b.rootNode == nil {
+		_ = b.node(b.root, nil)
+	}
+
+	// Set the sequence.
+	b.bucket.sequence = v
+	return nil
+}
+
 // NextSequence returns an autoincrementing integer for the bucket.
 func (b *Bucket) NextSequence() (uint64, error) {
 	if b.tx.db == nil {
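The Sequence/SetSequence pair gives callers direct read and write access to the bucket's autoincrement counter, which NextSequence() otherwise only bumps. A minimal usage sketch, assuming the github.com/boltdb/bolt import path; the database path, bucket name, and helper are hypothetical:

package main

import (
	"log"

	"github.com/boltdb/bolt"
)

// seedBucketSequence is a hypothetical helper showing the new API: it reads
// the counter with Sequence() and, if untouched, seeds it with SetSequence()
// so the first NextSequence() call returns start+1.
func seedBucketSequence(db *bolt.DB, name []byte, start uint64) error {
	return db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(name)
		if err != nil {
			return err
		}
		if b.Sequence() != 0 {
			return nil // counter already in use; leave it alone
		}
		return b.SetSequence(start)
	})
}

func main() {
	db, err := bolt.Open("seq.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := seedBucketSequence(db, []byte("events"), 1000); err != nil {
		log.Fatal(err)
	}
}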
diff --git a/bucket_test.go b/bucket_test.go
index 528fec2..cddbe27 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -782,6 +782,48 @@ func TestBucket_DeleteBucket_IncompatibleValue(t *testing.T) {
 	}
 }
 
+// Ensure bucket can set and update its sequence number.
+func TestBucket_Sequence(t *testing.T) {
+	db := MustOpenDB()
+	defer db.MustClose()
+
+	if err := db.Update(func(tx *bolt.Tx) error {
+		bkt, err := tx.CreateBucket([]byte("0"))
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Retrieve sequence.
+		if v := bkt.Sequence(); v != 0 {
+			t.Fatalf("unexpected sequence: %d", v)
+		}
+
+		// Update sequence.
+		if err := bkt.SetSequence(1000); err != nil {
+			t.Fatal(err)
+		}
+
+		// Read sequence again.
+		if v := bkt.Sequence(); v != 1000 {
+			t.Fatalf("unexpected sequence: %d", v)
+		}
+
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify sequence in separate transaction.
+	if err := db.View(func(tx *bolt.Tx) error {
+		if v := tx.Bucket([]byte("0")).Sequence(); v != 1000 {
+			t.Fatalf("unexpected sequence: %d", v)
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
+
 // Ensure that a bucket can return an autoincrementing sequence.
 func TestBucket_NextSequence(t *testing.T) {
 	db := MustOpenDB()
diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go
index b96e6f7..29e393f 100644
--- a/cmd/bolt/main.go
+++ b/cmd/bolt/main.go
@@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
 		return newBenchCommand(m).Run(args[1:]...)
 	case "check":
 		return newCheckCommand(m).Run(args[1:]...)
+	case "compact":
+		return newCompactCommand(m).Run(args[1:]...)
 	case "dump":
 		return newDumpCommand(m).Run(args[1:]...)
 	case "info":
@@ -130,6 +132,7 @@ The commands are:
 
     bench       run synthetic benchmark against bolt
     check       verifies integrity of bolt database
+    compact     copies a bolt database, compacting it in the process
     info        print basic info
     help        print this screen
     pages       print list of pages with their types
@@ -539,9 +542,9 @@ func (cmd *PageCommand) PrintLeaf(w io.Writer, buf []byte) error {
 			b := (*bucket)(unsafe.Pointer(&e.value()[0]))
 			v = fmt.Sprintf("<pgid=%d,seq=%d>", b.root, b.sequence)
 		} else if isPrintable(string(e.value())) {
-			k = fmt.Sprintf("%q", string(e.value()))
+			v = fmt.Sprintf("%q", string(e.value()))
 		} else {
-			k = fmt.Sprintf("%x", string(e.value()))
+			v = fmt.Sprintf("%x", string(e.value()))
 		}
 
 		fmt.Fprintf(w, "%s: %s\n", k, v)
@@ -1530,3 +1533,208 @@ func (n *leafPageElement) value() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
 	return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
 }
+
+// CompactCommand represents the "compact" command execution.
+type CompactCommand struct {
+	Stdin  io.Reader
+	Stdout io.Writer
+	Stderr io.Writer
+
+	SrcPath   string
+	DstPath   string
+	TxMaxSize int64
+}
+
+// newCompactCommand returns a CompactCommand.
+func newCompactCommand(m *Main) *CompactCommand {
+	return &CompactCommand{
+		Stdin:  m.Stdin,
+		Stdout: m.Stdout,
+		Stderr: m.Stderr,
+	}
+}
+
+// Run executes the command.
+func (cmd *CompactCommand) Run(args ...string) (err error) {
+	// Parse flags.
+	fs := flag.NewFlagSet("", flag.ContinueOnError)
+	fs.SetOutput(ioutil.Discard)
+	fs.StringVar(&cmd.DstPath, "o", "", "")
+	fs.Int64Var(&cmd.TxMaxSize, "tx-max-size", 65536, "")
+	if err := fs.Parse(args); err == flag.ErrHelp {
+		fmt.Fprintln(cmd.Stderr, cmd.Usage())
+		return ErrUsage
+	} else if err != nil {
+		return err
+	} else if cmd.DstPath == "" {
+		return fmt.Errorf("output file required")
+	}
+
+	// Require database paths.
+	cmd.SrcPath = fs.Arg(0)
+	if cmd.SrcPath == "" {
+		return ErrPathRequired
+	}
+
+	// Ensure source file exists.
+	fi, err := os.Stat(cmd.SrcPath)
+	if os.IsNotExist(err) {
+		return ErrFileNotFound
+	} else if err != nil {
+		return err
+	}
+	initialSize := fi.Size()
+
+	// Open source database.
+	src, err := bolt.Open(cmd.SrcPath, 0444, nil)
+	if err != nil {
+		return err
+	}
+	defer src.Close()
+
+	// Open destination database.
+	dst, err := bolt.Open(cmd.DstPath, fi.Mode(), nil)
+	if err != nil {
+		return err
+	}
+	defer dst.Close()
+
+	// Run compaction.
+	if err := cmd.compact(dst, src); err != nil {
+		return err
+	}
+
+	// Report stats on new size.
+	fi, err = os.Stat(cmd.DstPath)
+	if err != nil {
+		return err
+	} else if fi.Size() == 0 {
+		return fmt.Errorf("zero db size")
+	}
+	fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size()))
+
+	return nil
+}
+
+func (cmd *CompactCommand) compact(dst, src *bolt.DB) error {
+	// Commit regularly, or we'll run out of memory for large datasets if
+	// using one transaction.
+	var size int64
+	tx, err := dst.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
+		// On each key/value, check if we have exceeded tx size.
+		sz := int64(len(k) + len(v))
+		if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 {
+			// Commit previous transaction.
+			if err := tx.Commit(); err != nil {
+				return err
+			}
+
+			// Start new transaction.
+			tx, err = dst.Begin(true)
+			if err != nil {
+				return err
+			}
+			size = 0
+		}
+		size += sz
+
+		// Create bucket on the root transaction if this is the first level.
+		nk := len(keys)
+		if nk == 0 {
+			bkt, err := tx.CreateBucket(k)
+			if err != nil {
+				return err
+			}
+			if err := bkt.SetSequence(seq); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		// Create buckets on subsequent levels, if necessary.
+		b := tx.Bucket(keys[0])
+		if nk > 1 {
+			for _, k := range keys[1:] {
+				b = b.Bucket(k)
+			}
+		}
+
+		// If there is no value then this is a bucket call.
+		if v == nil {
+			bkt, err := b.CreateBucket(k)
+			if err != nil {
+				return err
+			}
+			if err := bkt.SetSequence(seq); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		// Otherwise treat it as a key/value pair.
+		return b.Put(k, v)
+	}); err != nil {
+		return err
+	}
+
+	return tx.Commit()
+}
+
+// walkFunc is the type of the function called for keys (buckets and "normal"
+// values) discovered by walk. keys is the sequence of bucket keys leading to
+// the bucket that owns the discovered key/value pair k/v.
+type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
+
+// walk recursively walks the bolt database db, calling walkFn for each key
+// it finds.
+func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error {
+	return db.View(func(tx *bolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
+		})
+	})
+}
+
+func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
+	// Execute callback.
+	if err := fn(keypath, k, v, seq); err != nil {
+		return err
+	}
+
+	// If this is not a bucket then stop.
+	if v != nil {
+		return nil
+	}
+
+	// Iterate over each child key/value.
+	keypath = append(keypath, k)
+	return b.ForEach(func(k, v []byte) error {
+		if v == nil {
+			bkt := b.Bucket(k)
+			return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
+		}
+		return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn)
+	})
+}
+
+// Usage returns the help message.
+func (cmd *CompactCommand) Usage() string {
+	return strings.TrimLeft(`
+usage: bolt compact [options] -o DST SRC
+
+Compact opens a database at SRC path and walks it recursively, copying keys
+as they are found from all buckets, to a newly created database at DST path.
+
+The original database is left untouched.
+
+Additional options include:
+
+	-tx-max-size NUM
+		Specifies the maximum size of individual transactions.
+		Defaults to 64KB.
+`, "\n")
+}
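Once built, compaction is invoked as "bolt compact -o DST SRC", per the usage text above. The interesting part of compact() is the bounded-memory batching: writes accumulate in one transaction, which is committed and replaced whenever the running key+value byte count would exceed -tx-max-size. A stripped-down sketch of that same pattern on its own, with hypothetical helper and argument names:

package main

import (
	"log"

	"github.com/boltdb/bolt"
)

// putBatched is a hypothetical helper using the batching idea from compact():
// accumulate writes in one transaction and commit whenever the running byte
// count would exceed txMaxSize, keeping memory use bounded.
func putBatched(db *bolt.DB, bucket []byte, pairs map[string][]byte, txMaxSize int64) error {
	tx, err := db.Begin(true)
	if err != nil {
		return err
	}
	// Roll back whichever transaction is current if we bail out early;
	// after the final Commit this is a harmless no-op.
	defer func() { _ = tx.Rollback() }()

	var size int64
	for k, v := range pairs {
		sz := int64(len(k) + len(v))
		if txMaxSize != 0 && size+sz > txMaxSize {
			if err := tx.Commit(); err != nil {
				return err
			}
			if tx, err = db.Begin(true); err != nil {
				return err
			}
			size = 0
		}
		size += sz

		// Re-resolve the bucket each time; bucket handles are only
		// valid for the life of their transaction.
		b, err := tx.CreateBucketIfNotExists(bucket)
		if err != nil {
			return err
		}
		if err := b.Put([]byte(k), v); err != nil {
			return err
		}
	}
	return tx.Commit()
}

func main() {
	db, err := bolt.Open("batched.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	pairs := map[string][]byte{"a": []byte("1"), "b": []byte("2")}
	if err := putBatched(db, []byte("copy"), pairs, 65536); err != nil {
		log.Fatal(err)
	}
}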
diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go
index c378b79..0a11ff3 100644
--- a/cmd/bolt/main_test.go
+++ b/cmd/bolt/main_test.go
@@ -2,7 +2,12 @@ package main_test
 
 import (
 	"bytes"
+	crypto "crypto/rand"
+	"encoding/binary"
+	"fmt"
+	"io"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"strconv"
 	"testing"
@@ -183,3 +188,169 @@ func (db *DB) Close() error {
 	defer os.Remove(db.Path)
 	return db.DB.Close()
 }
+
+func TestCompactCommand_Run(t *testing.T) {
+	var s int64
+	if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
+		t.Fatal(err)
+	}
+	rand.Seed(s)
+
+	dstdb := MustOpen(0666, nil)
+	dstdb.Close()
+
+	// fill the db
+	db := MustOpen(0666, nil)
+	if err := db.Update(func(tx *bolt.Tx) error {
+		n := 2 + rand.Intn(5)
+		for i := 0; i < n; i++ {
+			k := []byte(fmt.Sprintf("b%d", i))
+			b, err := tx.CreateBucketIfNotExists(k)
+			if err != nil {
+				return err
+			}
+			if err := b.SetSequence(uint64(i)); err != nil {
+				return err
+			}
+			if err := fillBucket(b, append(k, '.')); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+
+	// make the db grow by adding large values, and delete them.
+	if err := db.Update(func(tx *bolt.Tx) error {
+		b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
+		if err != nil {
+			return err
+		}
+		n := 5 + rand.Intn(5)
+		for i := 0; i < n; i++ {
+			v := make([]byte, 1000*1000*(1+rand.Intn(5)))
+			_, err := crypto.Read(v)
+			if err != nil {
+				return err
+			}
+			if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+	if err := db.Update(func(tx *bolt.Tx) error {
+		c := tx.Bucket([]byte("large_vals")).Cursor()
+		for k, _ := c.First(); k != nil; k, _ = c.Next() {
+			if err := c.Delete(); err != nil {
+				return err
+			}
+		}
+		return tx.DeleteBucket([]byte("large_vals"))
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+	db.DB.Close()
+	defer db.Close()
+	defer dstdb.Close()
+
+	dbChk, err := chkdb(db.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	m := NewMain()
+	if err := m.Run("compact", "-o", dstdb.Path, db.Path); err != nil {
+		t.Fatal(err)
+	}
+
+	dbChkAfterCompact, err := chkdb(db.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dstdbChk, err := chkdb(dstdb.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !bytes.Equal(dbChk, dbChkAfterCompact) {
+		t.Error("the original db has been touched")
+	}
+	if !bytes.Equal(dbChk, dstdbChk) {
+		t.Error("the compacted db data isn't the same as the original db")
+	}
+}
+
+func fillBucket(b *bolt.Bucket, prefix []byte) error {
+	n := 10 + rand.Intn(50)
+	for i := 0; i < n; i++ {
+		v := make([]byte, 10*(1+rand.Intn(4)))
+		_, err := crypto.Read(v)
+		if err != nil {
+			return err
+		}
+		k := append(prefix, []byte(fmt.Sprintf("k%d", i))...)
+		if err := b.Put(k, v); err != nil {
+			return err
+		}
+	}
+	// limit depth of subbuckets
+	s := 2 + rand.Intn(4)
+	if len(prefix) > (2*s + 1) {
+		return nil
+	}
+	n = 1 + rand.Intn(3)
+	for i := 0; i < n; i++ {
+		k := append(prefix, []byte(fmt.Sprintf("b%d", i))...)
+		sb, err := b.CreateBucket(k)
+		if err != nil {
+			return err
+		}
+		if err := fillBucket(sb, append(k, '.')); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func chkdb(path string) ([]byte, error) {
+	db, err := bolt.Open(path, 0666, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer db.Close()
+	var buf bytes.Buffer
+	err = db.View(func(tx *bolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return walkBucket(b, name, nil, &buf)
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
+	if _, err := fmt.Fprintf(w, "%d:%x=%x\n", parent.Sequence(), k, v); err != nil {
+		return err
+	}
+
+	// not a bucket, exit.
+	if v != nil {
+		return nil
+	}
+	return parent.ForEach(func(k, v []byte) error {
+		if v == nil {
+			return walkBucket(parent.Bucket(k), k, nil, w)
+		}
+		return walkBucket(parent, k, v, w)
+	})
+}
diff --git a/freelist.go b/freelist.go
index 1b7ba91..d32f6cd 100644
--- a/freelist.go
+++ b/freelist.go
@@ -236,7 +236,7 @@ func (f *freelist) reload(p *page) {
 
 // reindex rebuilds the free cache based on available and pending free lists.
 func (f *freelist) reindex() {
-	f.cache = make(map[pgid]bool)
+	f.cache = make(map[pgid]bool, len(f.ids))
 	for _, id := range f.ids {
 		f.cache[id] = true
 	}
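The freelist change is a small allocation win: reindex() already knows how many page ids it is about to insert, so giving make a capacity hint avoids the incremental growth and rehashing an unhinted map would incur while it is filled. A tiny illustration of the same idiom; pgid mirrors bolt's page-id type and the ids slice is a stand-in:

package main

import "fmt"

// pgid mirrors bolt's internal page-id type for illustration only.
type pgid uint64

// buildCache pre-sizes the map with the expected element count, as the
// patched reindex() does, so no rehashing happens during the fill loop.
func buildCache(ids []pgid) map[pgid]bool {
	cache := make(map[pgid]bool, len(ids)) // capacity hint, as in the patch
	for _, id := range ids {
		cache[id] = true
	}
	return cache
}

func main() {
	fmt.Println(len(buildCache([]pgid{1, 2, 3})))
}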