author	Ben Johnson <benbjohnson@yahoo.com>	2016-09-01 14:47:06 -0600
committer	Ben Johnson <benbjohnson@yahoo.com>	2016-09-01 14:47:06 -0600
commit	52d0f5e6a9539643cdfd94be2aed5003ad199866 (patch)
tree	6dde57d3f39e42b671c3622066332f7c7f74eb5f
parent	Merge pull request #579 from asdine/master (diff)
parent	compact: allow splitting transactions for large datasets (diff)
Merge branch 'compact-db' of https://github.com/vincent-petithory/bolt into vincent-petithory-compact-db
-rw-r--r--	cmd/bolt/main.go	180
-rw-r--r--	cmd/bolt/main_test.go	180
2 files changed, 360 insertions, 0 deletions
diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go
index b96e6f7..1f78e03 100644
--- a/cmd/bolt/main.go
+++ b/cmd/bolt/main.go
@@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
 		return newBenchCommand(m).Run(args[1:]...)
 	case "check":
 		return newCheckCommand(m).Run(args[1:]...)
+	case "compact":
+		return newCompactCommand(m).Run(args[1:]...)
 	case "dump":
 		return newDumpCommand(m).Run(args[1:]...)
 	case "info":
@@ -130,6 +132,7 @@ The commands are:
 
 	bench       run synthetic benchmark against bolt
 	check       verifies integrity of bolt database
+	compact     copies a bolt database, compacting it in the process
 	info        print basic info
 	help        print this screen
 	pages       print list of pages with their types
@@ -1530,3 +1533,180 @@ func (n *leafPageElement) value() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
 	return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
 }
+
+// CompactCommand represents the "compact" command execution.
+type CompactCommand struct {
+	Stdin  io.Reader
+	Stdout io.Writer
+	Stderr io.Writer
+}
+
+// newCompactCommand returns a CompactCommand.
+func newCompactCommand(m *Main) *CompactCommand {
+	return &CompactCommand{
+		Stdin:  m.Stdin,
+		Stdout: m.Stdout,
+		Stderr: m.Stderr,
+	}
+}
+
+// BucketWalkFunc is the type of the function called for each key (bucket or
+// "normal" value) discovered by Walk. keys is the sequence of bucket names
+// to descend to reach the bucket owning the discovered key/value pair k/v.
+type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error
+
+// Walk recursively walks the bolt database db, calling walkFn for each key it finds.
+func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error {
+	return db.View(func(tx *bolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return cmd.walkBucket(b, nil, name, nil, walkFn)
+		})
+	})
+}
+
+func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error {
+	if err := walkFn(keys, k, v); err != nil {
+		return err
+	}
+	// not a bucket, exit.
+	if v != nil {
+		return nil
+	}
+
+	keys2 := append(keys, k)
+	return b.ForEach(func(k, v []byte) error {
+		if v == nil {
+			return cmd.walkBucket(b.Bucket(k), keys2, k, nil, walkFn)
+		}
+		return cmd.walkBucket(b, keys2, k, v, walkFn)
+	})
+}
+
+// Run executes the command.
+func (cmd *CompactCommand) Run(args ...string) (err error) {
+	// Parse flags.
+	fs := flag.NewFlagSet("", flag.ContinueOnError)
+	fs.SetOutput(cmd.Stderr)
+	var txMaxSize int64
+	fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit tx when the sum of key/value sizes exceeds this value. If 0, only one transaction is used. When compacting a large database, set this to a value suited to the available memory.")
+	help := fs.Bool("h", false, "print this help")
+	if err := fs.Parse(args); err != nil {
+		return err
+	} else if *help {
+		fmt.Fprintln(cmd.Stderr, cmd.Usage())
+		fs.PrintDefaults()
+		return ErrUsage
+	}
+
+	// Require database path.
+	path := fs.Arg(0)
+	if path == "" {
+		return ErrPathRequired
+	}
+	fi, err := os.Stat(path)
+	if os.IsNotExist(err) {
+		return ErrFileNotFound
+	} else if err != nil {
+		return err
+	}
+	initialSize := fi.Size()
+
+	// Open database.
+	db, err := bolt.Open(path, 0444, nil)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	var dstPath string
+	if fs.NArg() < 2 {
+		f, err := ioutil.TempFile("", "bolt-compact-")
+		if err != nil {
+			return fmt.Errorf("temp file: %v", err)
+		}
+		_ = f.Close()
+		_ = os.Remove(f.Name())
+		dstPath = f.Name()
+		fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath)
+	} else {
+		dstPath = fs.Arg(1)
+	}
+
+	defer func() {
+		fi, err := os.Stat(dstPath)
+		if err != nil {
+			fmt.Fprintln(cmd.Stderr, err)
+			return
+		} else if fi.Size() == 0 {
+			fmt.Fprintln(cmd.Stderr, "db size is 0")
+			return
+		}
+		fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size()))
+	}()
+
+	dstdb, err := bolt.Open(dstPath, 0666, nil)
+	if err != nil {
+		return err
+	}
+	defer dstdb.Close()
+
+	// Commit regularly, or one huge transaction may exhaust memory on large datasets.
+	var size int64
+	tx, err := dstdb.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err != nil {
+			_ = tx.Rollback()
+		} else {
+			err = tx.Commit()
+		}
+	}()
+	return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
+		s := int64(len(k) + len(v))
+		if txMaxSize != 0 && size+s > txMaxSize {
+			if err := tx.Commit(); err != nil {
+				return err
+			}
+			tx, err = dstdb.Begin(true)
+			if err != nil {
+				return err
+			}
+			size = 0
+		}
+		size += s
+		nk := len(keys)
+		if nk == 0 {
+			_, err := tx.CreateBucket(k)
+			return err
+		}
+
+		b := tx.Bucket(keys[0])
+		if nk > 1 {
+			for _, k := range keys[1:] {
+				b = b.Bucket(k)
+			}
+		}
+		if v == nil {
+			_, err := b.CreateBucket(k)
+			return err
+		}
+		return b.Put(k, v)
+	})
+}
+
+// Usage returns the help message.
+func (cmd *CompactCommand) Usage() string {
+	return strings.TrimLeft(`
+usage: bolt compact PATH [DST_PATH]
+
+Compact opens the database at PATH, recursively walks all of its buckets, and
+copies every key it finds into a newly created database.
+
+If DST_PATH is given, the new database is created at DST_PATH; otherwise it is
+created in a temporary location.
+
+The original database is left untouched.
+`, "\n")
+}
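
A side note, not part of the patch: the Walk/BucketWalkFunc pair above is generic enough to drive other traversals. Below is a minimal sketch, assuming it lives in the same package as the code above; the printKeys name is made up for illustration. It prints every key with the bucket path leading to it, relying on the contract that a nil v marks a bucket:

// printKeys dumps every key in db prefixed by its bucket path.
// Hypothetical helper; not part of this commit.
func printKeys(cmd *CompactCommand, db *bolt.DB) error {
	return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
		// keys holds the enclosing bucket names, outermost first.
		for _, name := range keys {
			fmt.Printf("%s/", name)
		}
		if v == nil {
			// A nil value means k names a (sub)bucket.
			fmt.Printf("%s (bucket)\n", k)
		} else {
			fmt.Printf("%s = %q\n", k, v)
		}
		return nil
	})
}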
diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go
index c378b79..e8942a0 100644
--- a/cmd/bolt/main_test.go
+++ b/cmd/bolt/main_test.go
@@ -2,7 +2,12 @@ package main_test
 
 import (
 	"bytes"
+	crypto "crypto/rand"
+	"encoding/binary"
+	"fmt"
+	"io"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"strconv"
 	"testing"
@@ -183,3 +188,178 @@ func (db *DB) Close() error {
 	defer os.Remove(db.Path)
 	return db.DB.Close()
 }
+
+func TestCompactCommand_Run(t *testing.T) {
+	var s int64
+	if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
+		t.Fatal(err)
+	}
+	rand.Seed(s)
+
+	dstdb := MustOpen(0666, nil)
+	dstdb.Close()
+
+	// fill the db
+	db := MustOpen(0666, nil)
+	if err := db.Update(func(tx *bolt.Tx) error {
+		n := 2 + rand.Intn(5)
+		for i := 0; i < n; i++ {
+			k := []byte(fmt.Sprintf("b%d", i))
+			b, err := tx.CreateBucketIfNotExists(k)
+			if err != nil {
+				return err
+			}
+			if err := fillBucket(b, append(k, '.')); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+
+	// Make the db grow by adding large values, then delete them.
+	if err := db.Update(func(tx *bolt.Tx) error {
+		b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
+		if err != nil {
+			return err
+		}
+		n := 5 + rand.Intn(5)
+		for i := 0; i < n; i++ {
+			v := make([]byte, 1000*1000*(1+rand.Intn(5)))
+			_, err := crypto.Read(v)
+			if err != nil {
+				return err
+			}
+			if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+	if err := db.Update(func(tx *bolt.Tx) error {
+		c := tx.Bucket([]byte("large_vals")).Cursor()
+		for k, _ := c.First(); k != nil; k, _ = c.Next() {
+			if err := c.Delete(); err != nil {
+				return err
+			}
+		}
+		return tx.DeleteBucket([]byte("large_vals"))
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+	db.DB.Close()
+	defer db.Close()
+	defer dstdb.Close()
+
+	dbChk, err := chkdb(db.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	m := NewMain()
+	if err := m.Run("compact", db.Path, dstdb.Path); err != nil {
+		t.Fatal(err)
+	}
+
+	dbChkAfterCompact, err := chkdb(db.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dstdbChk, err := chkdb(dstdb.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !bytes.Equal(dbChk, dbChkAfterCompact) {
+		t.Error("the original db has been touched")
+	}
+	if !bytes.Equal(dbChk, dstdbChk) {
+		t.Error("the compacted db data isn't the same as the original db's")
+	}
+}
+
+func fillBucket(b *bolt.Bucket, prefix []byte) error {
+	n := 10 + rand.Intn(50)
+	for i := 0; i < n; i++ {
+		v := make([]byte, 10*(1+rand.Intn(4)))
+		_, err := crypto.Read(v)
+		if err != nil {
+			return err
+		}
+		k := append(prefix, []byte(fmt.Sprintf("k%d", i))...)
+		if err := b.Put(k, v); err != nil {
+			return err
+		}
+	}
+	// limit depth of subbuckets
+	s := 2 + rand.Intn(4)
+	if len(prefix) > (2*s + 1) {
+		return nil
+	}
+	n = 1 + rand.Intn(3)
+	for i := 0; i < n; i++ {
+		k := append(prefix, []byte(fmt.Sprintf("b%d", i))...)
+		sb, err := b.CreateBucket(k)
+		if err != nil {
+			return err
+		}
+		if err := fillBucket(sb, append(k, '.')); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func chkdb(path string) ([]byte, error) {
+	db, err := bolt.Open(path, 0666, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer db.Close()
+	var buf bytes.Buffer
+	err = db.View(func(tx *bolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return walkBucket(b, name, nil, &buf)
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
+	_, err := w.Write(k)
+	if err != nil {
+		return err
+	}
+	_, err = io.WriteString(w, ":")
+	if err != nil {
+		return err
+	}
+	_, err = w.Write(v)
+	if err != nil {
+		return err
+	}
+	_, err = fmt.Fprintln(w)
+	if err != nil {
+		return err
+	}
+	// not a bucket, exit.
+	if v != nil {
+		return nil
+	}
+	return parent.ForEach(func(k, v []byte) error {
+		if v == nil {
+			return walkBucket(parent.Bucket(k), k, nil, w)
+		}
+		return walkBucket(parent, k, v, w)
+	})
+}
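
Taken together, the heart of the change is the transaction-splitting copy: accumulate len(k)+len(v) and commit, then reopen, the destination transaction whenever the running total would cross tx-max-size. Here is a self-contained sketch of just that pattern, reduced to a single flat bucket; the copyBucket helper, the "data" bucket name, and the boltdb import path are assumptions for illustration, not code from this commit:

package main

import "github.com/boltdb/bolt"

// copyBucket copies every key of bucket "data" from src to dst, committing
// the destination transaction whenever the accumulated key+value size would
// exceed txMaxSize (0 disables splitting), so a large dataset never pins a
// single huge read-write transaction in memory.
func copyBucket(src, dst *bolt.DB, txMaxSize int64) error {
	tx, err := dst.Begin(true)
	if err != nil {
		return err
	}
	// Roll back whichever tx is still open if we bail out early; rolling
	// back an already-committed tx just returns an error we ignore.
	defer func() { _ = tx.Rollback() }()

	var size int64
	if err := src.View(func(stx *bolt.Tx) error {
		b := stx.Bucket([]byte("data"))
		if b == nil {
			return nil // nothing to copy
		}
		return b.ForEach(func(k, v []byte) error {
			s := int64(len(k) + len(v))
			if txMaxSize != 0 && size+s > txMaxSize {
				if err := tx.Commit(); err != nil {
					return err
				}
				if tx, err = dst.Begin(true); err != nil {
					return err
				}
				size = 0
			}
			size += s
			dstb, err := tx.CreateBucketIfNotExists([]byte("data"))
			if err != nil {
				return err
			}
			return dstb.Put(k, v)
		})
	}); err != nil {
		return err
	}
	return tx.Commit()
}

Each intermediate commit flushes dirty pages to disk, so peak memory stays bounded by roughly one transaction's worth of writes, which is exactly the atomicity the tx-max-size flag trades away.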