author    Ben Johnson <benbjohnson@yahoo.com>  2016-09-01 15:34:35 -0600
committer Ben Johnson <benbjohnson@yahoo.com>  2016-09-05 15:43:02 -0600
commit    f5d275b53730adc5a0c988cf79d4fac84c0a3210
tree      3402b7e1d9ddcc83102b385181d52bbac81e70d0
parent    Merge branch 'compact-db' of https://github.com/vincent-petithory/bolt into v...
Minor bolt compact revisions
-rw-r--r--  bucket.go                22
-rw-r--r--  bucket_test.go           42
-rw-r--r--  cmd/bolt/main.go        224
-rw-r--r--  cmd/bolt/main_test.go    21
4 files changed, 196 insertions, 113 deletions
diff --git a/bucket.go b/bucket.go
index d2f8c52..8e00380 100644
--- a/bucket.go
+++ b/bucket.go
@@ -329,6 +329,28 @@ func (b *Bucket) Delete(key []byte) error {
return nil
}
+// Sequence returns the current integer for the bucket without incrementing it.
+func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
+
+// SetSequence updates the sequence number for the bucket.
+func (b *Bucket) SetSequence(v uint64) error {
+ if b.tx.db == nil {
+ return ErrTxClosed
+ } else if !b.Writable() {
+ return ErrTxNotWritable
+ }
+
+ // Materialize the root node if it hasn't been already so that the
+ // bucket will be saved during commit.
+ if b.rootNode == nil {
+ _ = b.node(b.root, nil)
+ }
+
+ // Set the sequence.
+ b.bucket.sequence = v
+ return nil
+}
+
// NextSequence returns an autoincrementing integer for the bucket.
func (b *Bucket) NextSequence() (uint64, error) {
if b.tx.db == nil {
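
The new accessors above read and seed the per-bucket counter that NextSequence already increments; after SetSequence(v), the next NextSequence call returns v+1. A minimal usage sketch, not part of the commit (the import path, database path, and bucket name are placeholders):

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt" // assumed upstream import path; a fork's path may differ
)

func main() {
	db, err := bolt.Open("example.db", 0600, nil) // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("widgets")) // placeholder bucket name
		if err != nil {
			return err
		}

		// Seed the counter, e.g. when restoring or copying a database.
		if err := b.SetSequence(1000); err != nil {
			return err
		}

		// NextSequence increments first, then returns, so this yields 1001.
		next, err := b.NextSequence()
		if err != nil {
			return err
		}
		fmt.Println(b.Sequence(), next) // 1001 1001
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}
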
diff --git a/bucket_test.go b/bucket_test.go
index 528fec2..cddbe27 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -782,6 +782,48 @@ func TestBucket_DeleteBucket_IncompatibleValue(t *testing.T) {
}
}
+// Ensure that a bucket can set and update its sequence number.
+func TestBucket_Sequence(t *testing.T) {
+ db := MustOpenDB()
+ defer db.MustClose()
+
+ if err := db.Update(func(tx *bolt.Tx) error {
+ bkt, err := tx.CreateBucket([]byte("0"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Retrieve sequence.
+ if v := bkt.Sequence(); v != 0 {
+ t.Fatalf("unexpected sequence: %d", v)
+ }
+
+ // Update sequence.
+ if err := bkt.SetSequence(1000); err != nil {
+ t.Fatal(err)
+ }
+
+ // Read sequence again.
+ if v := bkt.Sequence(); v != 1000 {
+ t.Fatalf("unexpected sequence: %d", v)
+ }
+
+ return nil
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify sequence in separate transaction.
+ if err := db.View(func(tx *bolt.Tx) error {
+ if v := tx.Bucket([]byte("0")).Sequence(); v != 1000 {
+ t.Fatalf("unexpected sequence: %d", v)
+ }
+ return nil
+ }); err != nil {
+ t.Fatal(err)
+ }
+}
+
// Ensure that a bucket can return an autoincrementing sequence.
func TestBucket_NextSequence(t *testing.T) {
db := MustOpenDB()
diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go
index 1f78e03..a132ec0 100644
--- a/cmd/bolt/main.go
+++ b/cmd/bolt/main.go
@@ -1539,6 +1539,10 @@ type CompactCommand struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
+
+ SrcPath string
+ DstPath string
+ TxMaxSize int64
}
// newCompactCommand returns a CompactCommand.
@@ -1550,163 +1554,187 @@ func newCompactCommand(m *Main) *CompactCommand {
}
}
-// BucketWalkFunc is the type of the function called for keys (buckets and "normal" values)
-// discovered by Walk.
-// keys is the list of keys to descend to the bucket owning the discovered key/value pair k/v.
-type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error
-
-// Walk walks recursively the bolt database db, calling walkFn for each key it finds.
-func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error {
- return db.View(func(tx *bolt.Tx) error {
- return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
- return cmd.walkBucket(b, nil, name, nil, walkFn)
- })
- })
-}
-
-func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error {
- if err := walkFn(keys, k, v); err != nil {
- return err
- }
- // not a bucket, exit.
- if v != nil {
- return nil
- }
-
- keys2 := append(keys, k)
- return b.ForEach(func(k, v []byte) error {
- if v == nil {
- return cmd.walkBucket(b.Bucket(k), keys2, k, nil, walkFn)
- }
- return cmd.walkBucket(b, keys2, k, v, walkFn)
- })
-}
-
// Run executes the command.
func (cmd *CompactCommand) Run(args ...string) (err error) {
// Parse flags.
fs := flag.NewFlagSet("", flag.ContinueOnError)
- fs.SetOutput(cmd.Stderr)
- var txMaxSize int64
- fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit tx when key/value size sum exceed this value. If 0, only one transaction is used. If you are compacting a large database, set this to a value appropriate for the available memory.")
- help := fs.Bool("h", false, "print this help")
- if err := fs.Parse(args); err != nil {
- return err
- } else if *help {
+ fs.SetOutput(ioutil.Discard)
+ fs.StringVar(&cmd.DstPath, "o", "", "")
+ fs.Int64Var(&cmd.TxMaxSize, "tx-max-size", 65536, "")
+ if err := fs.Parse(args); err == flag.ErrHelp {
fmt.Fprintln(cmd.Stderr, cmd.Usage())
- fs.PrintDefaults()
return ErrUsage
+ } else if err != nil {
+ return err
+ } else if cmd.DstPath == "" {
+ return fmt.Errorf("output file required")
}
- // Require database path.
- path := fs.Arg(0)
- if path == "" {
+ // Require database paths.
+ cmd.SrcPath = fs.Arg(0)
+ if cmd.SrcPath == "" {
return ErrPathRequired
- } else if _, err := os.Stat(path); os.IsNotExist(err) {
- return ErrFileNotFound
}
- fi, err := os.Stat(path)
- if err != nil {
+
+ // Ensure source file exists.
+ fi, err := os.Stat(cmd.SrcPath)
+ if os.IsNotExist(err) {
+ return ErrFileNotFound
+ } else if err != nil {
return err
}
initialSize := fi.Size()
- // Open database.
- db, err := bolt.Open(path, 0444, nil)
+ // Open source database.
+ src, err := bolt.Open(cmd.SrcPath, 0444, nil)
if err != nil {
return err
}
- defer db.Close()
+ defer src.Close()
- var dstPath string
- if fs.NArg() < 2 {
- f, err := ioutil.TempFile("", "bolt-compact-")
- if err != nil {
- return fmt.Errorf("temp file: %v", err)
- }
- _ = f.Close()
- _ = os.Remove(f.Name())
- dstPath = f.Name()
- fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath)
- } else {
- dstPath = fs.Arg(1)
+ // Open destination database.
+ dst, err := bolt.Open(cmd.DstPath, fi.Mode(), nil)
+ if err != nil {
+ return err
}
+ defer dst.Close()
- defer func() {
- fi, err := os.Stat(dstPath)
- if err != nil {
- fmt.Fprintln(cmd.Stderr, err)
- }
- newSize := fi.Size()
- if newSize == 0 {
- fmt.Fprintln(cmd.Stderr, "db size is 0")
- }
- fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, newSize, float64(initialSize)/float64(newSize))
- }()
+ // Run compaction.
+ if err := cmd.compact(dst, src); err != nil {
+ return err
+ }
- dstdb, err := bolt.Open(dstPath, 0666, nil)
+ // Report stats on new size.
+ fi, err = os.Stat(cmd.DstPath)
if err != nil {
return err
+ } else if fi.Size() == 0 {
+ return fmt.Errorf("zero db size")
}
- defer dstdb.Close()
+ fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size()))
+ return nil
+}
+
+func (cmd *CompactCommand) compact(dst, src *bolt.DB) error {
// commit regularly, or we'll run out of memory for large datasets if using one transaction.
var size int64
- tx, err := dstdb.Begin(true)
+ tx, err := dst.Begin(true)
if err != nil {
return err
}
- defer func() {
- if err != nil {
- _ = tx.Rollback()
- } else {
- err = tx.Commit()
- }
- }()
- return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
- s := int64(len(k) + len(v))
- if size+s > txMaxSize && txMaxSize != 0 {
+ defer tx.Rollback()
+
+ if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
+ // On each key/value, check if we have exceeded tx size.
+ sz := int64(len(k) + len(v))
+ if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 {
+ // Commit previous transaction.
if err := tx.Commit(); err != nil {
return err
}
- tx, err = dstdb.Begin(true)
+
+ // Start new transaction.
+ tx, err = dst.Begin(true)
if err != nil {
return err
}
size = 0
}
- size += s
+ size += sz
+
+ // Create bucket on the root transaction if this is the first level.
nk := len(keys)
if nk == 0 {
- _, err := tx.CreateBucket(k)
- return err
+ bkt, err := tx.CreateBucket(k)
+ if err != nil {
+ return err
+ }
+ if err := bkt.SetSequence(seq); err != nil {
+ return err
+ }
+ return nil
}
+ // Create buckets on subsequent levels, if necessary.
b := tx.Bucket(keys[0])
if nk > 1 {
for _, k := range keys[1:] {
b = b.Bucket(k)
}
}
+
+ // If there is no value then this is a bucket call.
if v == nil {
- _, err := b.CreateBucket(k)
- return err
+ bkt, err := b.CreateBucket(k)
+ if err != nil {
+ return err
+ }
+ if err := bkt.SetSequence(seq); err != nil {
+ return err
+ }
+ return nil
}
+
+ // Otherwise treat it as a key/value pair.
return b.Put(k, v)
+ }); err != nil {
+ return err
+ }
+
+ return tx.Commit()
+}
+
+// walkFunc is the type of the function called for keys (buckets and "normal"
+// values) discovered by walk. keys is the list of keys required to descend to
+// the bucket that owns the discovered key/value pair k/v.
+type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
+
+// walk recursively walks the bolt database db, calling walkFn for each key it finds.
+func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error {
+ return db.View(func(tx *bolt.Tx) error {
+ return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+ return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
+ })
+ })
+}
+
+func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
+ // Execute callback.
+ if err := fn(keypath, k, v, seq); err != nil {
+ return err
+ }
+
+ // If this is not a bucket then stop.
+ if v != nil {
+ return nil
+ }
+
+ // Iterate over each child key/value.
+ keypath = append(keypath, k)
+ return b.ForEach(func(k, v []byte) error {
+ if v == nil {
+ bkt := b.Bucket(k)
+ return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
+ }
+ return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn)
})
}
// Usage returns the help message.
func (cmd *CompactCommand) Usage() string {
return strings.TrimLeft(`
-usage: bolt compact PATH [DST_PATH]
+usage: bolt compact [options] -o DST SRC
+
+Compact opens a database at SRC path and walks it recursively, copying keys
+from all buckets, as they are found, to a newly created database at DST path.
-Compact opens a database at PATH and walks it recursively entirely,
-copying keys as they are found from all buckets, to a newly created db.
+The original database is left untouched.
-If DST_PATH is non-empty, the new db is created at DST_PATH, else it will be
-in a temporary location.
+Additional options include:
-The original db is left untouched.
+ -tx-max-size NUM
+ Specifies the maximum size of individual transactions.
+ Defaults to 64KB.
`, "\n")
}
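
For reference, the reworked command now takes its destination via -o rather than a positional argument. A small sketch that shells out to the CLI, not part of the commit (it assumes a built bolt binary on PATH, whose name may differ in a fork, and uses placeholder database paths):

package main

import (
	"log"
	"os"
	"os/exec"
)

func main() {
	// Equivalent to: bolt compact -tx-max-size 65536 -o new.db old.db
	// On success the command prints the size change in the form "<old> -> <new> bytes (gain=...x)".
	cmd := exec.Command("bolt", "compact", "-tx-max-size", "65536", "-o", "new.db", "old.db")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		log.Fatal(err)
	}
}

Passing -tx-max-size caps how much key/value data is copied per destination transaction, which keeps memory bounded when compacting large databases.
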
diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go
index e8942a0..0a11ff3 100644
--- a/cmd/bolt/main_test.go
+++ b/cmd/bolt/main_test.go
@@ -209,6 +209,9 @@ func TestCompactCommand_Run(t *testing.T) {
if err != nil {
return err
}
+ if err := b.SetSequence(uint64(i)); err != nil {
+ return err
+ }
if err := fillBucket(b, append(k, '.')); err != nil {
return err
}
@@ -263,7 +266,7 @@ func TestCompactCommand_Run(t *testing.T) {
}
m := NewMain()
- if err := m.Run("compact", db.Path, dstdb.Path); err != nil {
+ if err := m.Run("compact", "-o", dstdb.Path, db.Path); err != nil {
t.Fatal(err)
}
@@ -336,22 +339,10 @@ func chkdb(path string) ([]byte, error) {
}
func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
- _, err := w.Write(k)
- if err != nil {
- return err
- }
- _, err = io.WriteString(w, ":")
- if err != nil {
- return err
- }
- _, err = w.Write(v)
- if err != nil {
- return err
- }
- _, err = fmt.Fprintln(w)
- if err != nil {
+ if _, err := fmt.Fprintf(w, "%d:%x=%x\n", parent.Sequence(), k, v); err != nil {
return err
}
+
// not a bucket, exit.
if v != nil {
return nil