Diffstat:
 README.md             |  10
 bolt_386.go           |   3
 bolt_amd64.go         |   3
 bolt_arm.go           |  21
 bolt_arm64.go         |   3
 bolt_ppc64le.go       |   3
 bolt_s390x.go         |   3
 bucket.go             |  32
 bucket_test.go        |  42
 cmd/bolt/main.go      | 212
 cmd/bolt/main_test.go | 171
 freelist.go           |   2
 12 files changed, 496 insertions(+), 9 deletions(-)
diff --git a/README.md b/README.md
index 9df9319..0d3b4bd 100644
--- a/README.md
+++ b/README.md
@@ -15,11 +15,11 @@ and setting values. That's it.
## Project Status
-Bolt is stable and the API is fixed. Full unit test coverage and randomized
-black box testing are used to ensure database consistency and thread safety.
-Bolt is currently in high-load production environments serving databases as
-large as 1TB. Many companies such as Shopify and Heroku use Bolt-backed
-services every day.
+Bolt is stable, the API is fixed, and the file format is fixed. Full unit
+test coverage and randomized black box testing are used to ensure database
+consistency and thread safety. Bolt is currently in high-load production
+environments serving databases as large as 1TB. Many companies such as
+Shopify and Heroku use Bolt-backed services every day.
## Table of Contents
diff --git a/bolt_386.go b/bolt_386.go
index e659bfb..820d533 100644
--- a/bolt_386.go
+++ b/bolt_386.go
@@ -5,3 +5,6 @@ const maxMapSize = 0x7FFFFFFF // 2GB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0xFFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_amd64.go b/bolt_amd64.go
index cca6b7e..98fafdb 100644
--- a/bolt_amd64.go
+++ b/bolt_amd64.go
@@ -5,3 +5,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_arm.go b/bolt_arm.go
index e659bfb..7e5cb4b 100644
--- a/bolt_arm.go
+++ b/bolt_arm.go
@@ -1,7 +1,28 @@
package bolt
+import "unsafe"
+
// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0x7FFFFFFF // 2GB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0xFFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned bool
+
+func init() {
+ // Simple check to see whether this arch handles unaligned load/stores
+ // correctly.
+
+ // ARM9 and older devices require load/stores to be from/to aligned
+ // addresses. If not, the lower 2 bits are cleared and that address is
+ // read in a jumbled up order.
+
+ // See http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.faqs/ka15414.html
+
+ raw := [6]byte{0xfe, 0xef, 0x11, 0x22, 0x22, 0x11}
+ val := *(*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(&raw)) + 2))
+
+ brokenUnaligned = val != 0x11222211
+}
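
[Editor's note: the init probe above can be run in isolation. The following is a
minimal standalone sketch, not part of the patch; it performs the same 32-bit
load from an address two bytes past an aligned array and checks whether the
bytes come back in the expected little-endian order.]

    package main

    import (
        "fmt"
        "unsafe"
    )

    func main() {
        // Bytes chosen so a correct unaligned little-endian load at offset 2
        // yields 0x11222211; a CPU that masks the low address bits will not.
        raw := [6]byte{0xfe, 0xef, 0x11, 0x22, 0x22, 0x11}
        val := *(*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(&raw)) + 2))
        fmt.Println("unaligned loads broken:", val != 0x11222211)
    }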
diff --git a/bolt_arm64.go b/bolt_arm64.go
index 6d23093..b26d84f 100644
--- a/bolt_arm64.go
+++ b/bolt_arm64.go
@@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_ppc64le.go b/bolt_ppc64le.go
index 8351e12..8c143bc 100644
--- a/bolt_ppc64le.go
+++ b/bolt_ppc64le.go
@@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bolt_s390x.go b/bolt_s390x.go
index f4dd26b..d7c39af 100644
--- a/bolt_s390x.go
+++ b/bolt_s390x.go
@@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
+
+// Are unaligned load/stores broken on this arch?
+var brokenUnaligned = false
diff --git a/bucket.go b/bucket.go
index d2f8c52..511ce72 100644
--- a/bucket.go
+++ b/bucket.go
@@ -130,9 +130,17 @@ func (b *Bucket) Bucket(name []byte) *Bucket {
func (b *Bucket) openBucket(value []byte) *Bucket {
var child = newBucket(b.tx)
+ // If unaligned load/stores are broken on this arch and value is
+ // unaligned, simply clone to an aligned byte array.
+ unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
+
+ if unaligned {
+ value = cloneBytes(value)
+ }
+
// If this is a writable transaction then we need to copy the bucket entry.
// Read-only transactions can point directly at the mmap entry.
- if b.tx.writable {
+ if b.tx.writable && !unaligned {
child.bucket = &bucket{}
*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
} else {
@@ -329,6 +337,28 @@ func (b *Bucket) Delete(key []byte) error {
return nil
}
+// Sequence returns the current integer for the bucket without incrementing it.
+func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
+
+// SetSequence updates the sequence number for the bucket.
+func (b *Bucket) SetSequence(v uint64) error {
+ if b.tx.db == nil {
+ return ErrTxClosed
+ } else if !b.Writable() {
+ return ErrTxNotWritable
+ }
+
+ // Materialize the root node if it hasn't been already so that the
+ // bucket will be saved during commit.
+ if b.rootNode == nil {
+ _ = b.node(b.root, nil)
+ }
+
+ // Set the sequence.
+ b.bucket.sequence = v
+ return nil
+}
+
// NextSequence returns an autoincrementing integer for the bucket.
func (b *Bucket) NextSequence() (uint64, error) {
if b.tx.db == nil {
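
[Editor's note: a short usage sketch for the new accessors, not from the patch;
the path "my.db" and bucket name "widgets" are illustrative. SetSequence lets a
caller restore a bucket's autoincrement counter (e.g. when copying a database),
and Sequence reads it without bumping it.]

    package main

    import (
        "fmt"
        "log"

        "github.com/boltdb/bolt"
    )

    func main() {
        db, err := bolt.Open("my.db", 0600, nil)
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        if err := db.Update(func(tx *bolt.Tx) error {
            b, err := tx.CreateBucketIfNotExists([]byte("widgets"))
            if err != nil {
                return err
            }
            // Restore a previously saved counter...
            if err := b.SetSequence(1000); err != nil {
                return err
            }
            // ...and read it back without incrementing it.
            fmt.Println("sequence:", b.Sequence()) // 1000
            return nil
        }); err != nil {
            log.Fatal(err)
        }
    }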
diff --git a/bucket_test.go b/bucket_test.go
index 528fec2..cddbe27 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -782,6 +782,48 @@ func TestBucket_DeleteBucket_IncompatibleValue(t *testing.T) {
}
}
+// Ensure bucket can set and update its sequence number.
+func TestBucket_Sequence(t *testing.T) {
+ db := MustOpenDB()
+ defer db.MustClose()
+
+ if err := db.Update(func(tx *bolt.Tx) error {
+ bkt, err := tx.CreateBucket([]byte("0"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Retrieve sequence.
+ if v := bkt.Sequence(); v != 0 {
+ t.Fatalf("unexpected sequence: %d", v)
+ }
+
+ // Update sequence.
+ if err := bkt.SetSequence(1000); err != nil {
+ t.Fatal(err)
+ }
+
+ // Read sequence again.
+ if v := bkt.Sequence(); v != 1000 {
+ t.Fatalf("unexpected sequence: %d", v)
+ }
+
+ return nil
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify sequence in separate transaction.
+ if err := db.View(func(tx *bolt.Tx) error {
+ if v := tx.Bucket([]byte("0")).Sequence(); v != 1000 {
+ t.Fatalf("unexpected sequence: %d", v)
+ }
+ return nil
+ }); err != nil {
+ t.Fatal(err)
+ }
+}
+
// Ensure that a bucket can return an autoincrementing sequence.
func TestBucket_NextSequence(t *testing.T) {
db := MustOpenDB()
diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go
index b96e6f7..29e393f 100644
--- a/cmd/bolt/main.go
+++ b/cmd/bolt/main.go
@@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
return newBenchCommand(m).Run(args[1:]...)
case "check":
return newCheckCommand(m).Run(args[1:]...)
+ case "compact":
+ return newCompactCommand(m).Run(args[1:]...)
case "dump":
return newDumpCommand(m).Run(args[1:]...)
case "info":
@@ -130,6 +132,7 @@ The commands are:
bench run synthetic benchmark against bolt
check verifies integrity of bolt database
+ compact copies a bolt database, compacting it in the process
info print basic info
help print this screen
pages print list of pages with their types
@@ -539,9 +542,9 @@ func (cmd *PageCommand) PrintLeaf(w io.Writer, buf []byte) error {
b := (*bucket)(unsafe.Pointer(&e.value()[0]))
v = fmt.Sprintf("<pgid=%d,seq=%d>", b.root, b.sequence)
} else if isPrintable(string(e.value())) {
- k = fmt.Sprintf("%q", string(e.value()))
+ v = fmt.Sprintf("%q", string(e.value()))
} else {
- k = fmt.Sprintf("%x", string(e.value()))
+ v = fmt.Sprintf("%x", string(e.value()))
}
fmt.Fprintf(w, "%s: %s\n", k, v)
@@ -1530,3 +1533,208 @@ func (n *leafPageElement) value() []byte {
buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
}
+
+// CompactCommand represents the "compact" command execution.
+type CompactCommand struct {
+ Stdin io.Reader
+ Stdout io.Writer
+ Stderr io.Writer
+
+ SrcPath string
+ DstPath string
+ TxMaxSize int64
+}
+
+// newCompactCommand returns a CompactCommand.
+func newCompactCommand(m *Main) *CompactCommand {
+ return &CompactCommand{
+ Stdin: m.Stdin,
+ Stdout: m.Stdout,
+ Stderr: m.Stderr,
+ }
+}
+
+// Run executes the command.
+func (cmd *CompactCommand) Run(args ...string) (err error) {
+ // Parse flags.
+ fs := flag.NewFlagSet("", flag.ContinueOnError)
+ fs.SetOutput(ioutil.Discard)
+ fs.StringVar(&cmd.DstPath, "o", "", "")
+ fs.Int64Var(&cmd.TxMaxSize, "tx-max-size", 65536, "")
+ if err := fs.Parse(args); err == flag.ErrHelp {
+ fmt.Fprintln(cmd.Stderr, cmd.Usage())
+ return ErrUsage
+ } else if err != nil {
+ return err
+ } else if cmd.DstPath == "" {
+ return fmt.Errorf("output file required")
+ }
+
+ // Require database paths.
+ cmd.SrcPath = fs.Arg(0)
+ if cmd.SrcPath == "" {
+ return ErrPathRequired
+ }
+
+ // Ensure source file exists.
+ fi, err := os.Stat(cmd.SrcPath)
+ if os.IsNotExist(err) {
+ return ErrFileNotFound
+ } else if err != nil {
+ return err
+ }
+ initialSize := fi.Size()
+
+ // Open source database.
+ src, err := bolt.Open(cmd.SrcPath, 0444, nil)
+ if err != nil {
+ return err
+ }
+ defer src.Close()
+
+ // Open destination database.
+ dst, err := bolt.Open(cmd.DstPath, fi.Mode(), nil)
+ if err != nil {
+ return err
+ }
+ defer dst.Close()
+
+ // Run compaction.
+ if err := cmd.compact(dst, src); err != nil {
+ return err
+ }
+
+ // Report stats on new size.
+ fi, err = os.Stat(cmd.DstPath)
+ if err != nil {
+ return err
+ } else if fi.Size() == 0 {
+ return fmt.Errorf("zero db size")
+ }
+ fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size()))
+
+ return nil
+}
+
+func (cmd *CompactCommand) compact(dst, src *bolt.DB) error {
+ // Commit regularly, or we'll run out of memory for large datasets when using one transaction.
+ var size int64
+ tx, err := dst.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
+ // On each key/value, check if we have exceeded tx size.
+ sz := int64(len(k) + len(v))
+ if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 {
+ // Commit previous transaction.
+ if err := tx.Commit(); err != nil {
+ return err
+ }
+
+ // Start new transaction.
+ tx, err = dst.Begin(true)
+ if err != nil {
+ return err
+ }
+ size = 0
+ }
+ size += sz
+
+ // Create bucket on the root transaction if this is the first level.
+ nk := len(keys)
+ if nk == 0 {
+ bkt, err := tx.CreateBucket(k)
+ if err != nil {
+ return err
+ }
+ if err := bkt.SetSequence(seq); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ // Create buckets on subsequent levels, if necessary.
+ b := tx.Bucket(keys[0])
+ if nk > 1 {
+ for _, k := range keys[1:] {
+ b = b.Bucket(k)
+ }
+ }
+
+ // If there is no value then this is a bucket call.
+ if v == nil {
+ bkt, err := b.CreateBucket(k)
+ if err != nil {
+ return err
+ }
+ if err := bkt.SetSequence(seq); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ // Otherwise treat it as a key/value pair.
+ return b.Put(k, v)
+ }); err != nil {
+ return err
+ }
+
+ return tx.Commit()
+}
+
+// walkFunc is the type of the function called for keys (buckets and "normal"
+// values) discovered by walk. keys is the list of keys used to descend to the
+// bucket owning the discovered key/value pair k/v.
+type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
+
+// walk recursively walks the bolt database db, calling walkFn for each key it finds.
+func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error {
+ return db.View(func(tx *bolt.Tx) error {
+ return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+ return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
+ })
+ })
+}
+
+func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
+ // Execute callback.
+ if err := fn(keypath, k, v, seq); err != nil {
+ return err
+ }
+
+ // If this is not a bucket then stop.
+ if v != nil {
+ return nil
+ }
+
+ // Iterate over each child key/value.
+ keypath = append(keypath, k)
+ return b.ForEach(func(k, v []byte) error {
+ if v == nil {
+ bkt := b.Bucket(k)
+ return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
+ }
+ return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn)
+ })
+}
+
+// Usage returns the help message.
+func (cmd *CompactCommand) Usage() string {
+ return strings.TrimLeft(`
+usage: bolt compact [options] -o DST SRC
+
+Compact opens a database at SRC path and walks it recursively, copying keys
+as they are found from all buckets, to a newly created database at DST path.
+
+The original database is left untouched.
+
+Additional options include:
+
+ -tx-max-size NUM
+ Specifies the maximum size of individual transactions.
+ Defaults to 64KB.
+`, "\n")
+}
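
[Editor's note: a hypothetical invocation for reference; the file names and
byte counts below are invented, but the output format matches the Fprintf in
Run above. The source database is opened read-only and is never modified,
while writes to the destination are batched into transactions of at most
tx-max-size bytes.]

    $ bolt compact -o compacted.db original.db
    16777216 -> 4194304 bytes (gain=4.00x)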
diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go
index c378b79..0a11ff3 100644
--- a/cmd/bolt/main_test.go
+++ b/cmd/bolt/main_test.go
@@ -2,7 +2,12 @@ package main_test
import (
"bytes"
+ crypto "crypto/rand"
+ "encoding/binary"
+ "fmt"
+ "io"
"io/ioutil"
+ "math/rand"
"os"
"strconv"
"testing"
@@ -183,3 +188,169 @@ func (db *DB) Close() error {
defer os.Remove(db.Path)
return db.DB.Close()
}
+
+func TestCompactCommand_Run(t *testing.T) {
+ var s int64
+ if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
+ t.Fatal(err)
+ }
+ rand.Seed(s)
+
+ dstdb := MustOpen(0666, nil)
+ dstdb.Close()
+
+ // fill the db
+ db := MustOpen(0666, nil)
+ if err := db.Update(func(tx *bolt.Tx) error {
+ n := 2 + rand.Intn(5)
+ for i := 0; i < n; i++ {
+ k := []byte(fmt.Sprintf("b%d", i))
+ b, err := tx.CreateBucketIfNotExists(k)
+ if err != nil {
+ return err
+ }
+ if err := b.SetSequence(uint64(i)); err != nil {
+ return err
+ }
+ if err := fillBucket(b, append(k, '.')); err != nil {
+ return err
+ }
+ }
+ return nil
+ }); err != nil {
+ db.Close()
+ t.Fatal(err)
+ }
+
+ // make the db grow by adding large values, and delete them.
+ if err := db.Update(func(tx *bolt.Tx) error {
+ b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
+ if err != nil {
+ return err
+ }
+ n := 5 + rand.Intn(5)
+ for i := 0; i < n; i++ {
+ v := make([]byte, 1000*1000*(1+rand.Intn(5)))
+ _, err := crypto.Read(v)
+ if err != nil {
+ return err
+ }
+ if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
+ return err
+ }
+ }
+ return nil
+ }); err != nil {
+ db.Close()
+ t.Fatal(err)
+ }
+ if err := db.Update(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte("large_vals")).Cursor()
+ for k, _ := c.First(); k != nil; k, _ = c.Next() {
+ if err := c.Delete(); err != nil {
+ return err
+ }
+ }
+ return tx.DeleteBucket([]byte("large_vals"))
+ }); err != nil {
+ db.Close()
+ t.Fatal(err)
+ }
+ db.DB.Close()
+ defer db.Close()
+ defer dstdb.Close()
+
+ dbChk, err := chkdb(db.Path)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m := NewMain()
+ if err := m.Run("compact", "-o", dstdb.Path, db.Path); err != nil {
+ t.Fatal(err)
+ }
+
+ dbChkAfterCompact, err := chkdb(db.Path)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ dstdbChk, err := chkdb(dstdb.Path)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !bytes.Equal(dbChk, dbChkAfterCompact) {
+ t.Error("the original db has been touched")
+ }
+ if !bytes.Equal(dbChk, dstdbChk) {
+ t.Error("the compacted db data isn't the same than the original db")
+ }
+}
+
+func fillBucket(b *bolt.Bucket, prefix []byte) error {
+ n := 10 + rand.Intn(50)
+ for i := 0; i < n; i++ {
+ v := make([]byte, 10*(1+rand.Intn(4)))
+ _, err := crypto.Read(v)
+ if err != nil {
+ return err
+ }
+ k := append(prefix, []byte(fmt.Sprintf("k%d", i))...)
+ if err := b.Put(k, v); err != nil {
+ return err
+ }
+ }
+ // limit depth of subbuckets
+ s := 2 + rand.Intn(4)
+ if len(prefix) > (2*s + 1) {
+ return nil
+ }
+ n = 1 + rand.Intn(3)
+ for i := 0; i < n; i++ {
+ k := append(prefix, []byte(fmt.Sprintf("b%d", i))...)
+ sb, err := b.CreateBucket(k)
+ if err != nil {
+ return err
+ }
+ if err := fillBucket(sb, append(k, '.')); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func chkdb(path string) ([]byte, error) {
+ db, err := bolt.Open(path, 0666, nil)
+ if err != nil {
+ return nil, err
+ }
+ defer db.Close()
+ var buf bytes.Buffer
+ err = db.View(func(tx *bolt.Tx) error {
+ return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+ return walkBucket(b, name, nil, &buf)
+ })
+ })
+ if err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
+ if _, err := fmt.Fprintf(w, "%d:%x=%x\n", parent.Sequence(), k, v); err != nil {
+ return err
+ }
+
+ // not a bucket, exit.
+ if v != nil {
+ return nil
+ }
+ return parent.ForEach(func(k, v []byte) error {
+ if v == nil {
+ return walkBucket(parent.Bucket(k), k, nil, w)
+ }
+ return walkBucket(parent, k, v, w)
+ })
+}
diff --git a/freelist.go b/freelist.go
index 1b7ba91..d32f6cd 100644
--- a/freelist.go
+++ b/freelist.go
@@ -236,7 +236,7 @@ func (f *freelist) reload(p *page) {
// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
- f.cache = make(map[pgid]bool)
+ f.cache = make(map[pgid]bool, len(f.ids))
for _, id := range f.ids {
f.cache[id] = true
}
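
[Editor's note on the freelist change: the capacity hint sizes the map's
storage up front, so rebuilding the cache from len(f.ids) entries never
triggers incremental map growth. A contrived sketch of the pattern, with
illustrative names:]

    package main

    import "fmt"

    func main() {
        // A size hint preallocates the map, avoiding rehashing as it fills.
        ids := []uint64{4, 9, 12, 13}
        cache := make(map[uint64]bool, len(ids))
        for _, id := range ids {
            cache[id] = true
        }
        fmt.Println(len(cache)) // 4
    }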