aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md42
-rw-r--r--batch.go135
-rw-r--r--batch_benchmark_test.go170
-rw-r--r--batch_example_test.go148
-rw-r--r--batch_test.go167
-rw-r--r--db.go29
-rw-r--r--db_test.go28
7 files changed, 719 insertions, 0 deletions
diff --git a/README.md b/README.md
index cb65702..bfaccfc 100644
--- a/README.md
+++ b/README.md
@@ -125,6 +125,48 @@ no mutating operations are allowed within a read-only transaction. You can only
retrieve buckets, retrieve values, and copy the database within a read-only
transaction.
+
+#### Batch read-write transactions
+
+Each `DB.Update()` waits for disk to commit the writes. This overhead
+can be minimized by combining multiple updates with the `DB.Batch()`
+function:
+
+```go
+err := db.Batch(func(tx *bolt.Tx) error {
+ ...
+ return nil
+})
+```
+
+Concurrent Batch calls are opportunistically combined into larger
+transactions. Batch is only useful when there are multiple goroutines
+calling it.
+
+The trade-off is that `Batch` can call the given
+function multiple times, if parts of the transaction fail. The
+function must be idempotent and side effects must take effect only
+after a successful return from `DB.Batch()`.
+
+For example: don't display messages from inside the function, instead
+set variables in the enclosing scope:
+
+```go
+var id uint64
+err := db.Batch(func(tx *bolt.Tx) error {
+ // Find last key in bucket, decode as bigendian uint64, increment
+ // by one, encode back to []byte, and add new key.
+ ...
+ id = newValue
+ return nil
+})
+if err != nil {
+ return ...
+}
+fmt.Println("Allocated ID %d", id)
+```
+
+
#### Managing transactions manually
The `DB.View()` and `DB.Update()` functions are wrappers around the `DB.Begin()`
diff --git a/batch.go b/batch.go
new file mode 100644
index 0000000..bef1f4a
--- /dev/null
+++ b/batch.go
@@ -0,0 +1,135 @@
+package bolt
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+)
+
+// Batch calls fn as part of a batch. It behaves similar to Update,
+// except:
+//
+// 1. concurrent Batch calls can be combined into a single Bolt
+// transaction.
+//
+// 2. the function passed to Batch may be called multiple times,
+// regardless of whether it returns error or not.
+//
+// This means that Batch function side effects must be idempotent and
+// take permanent effect only after a successful return is seen in
+// caller.
+//
+// Batch is only useful when there are multiple goroutines calling it.
+func (db *DB) Batch(fn func(*Tx) error) error {
+ errCh := make(chan error, 1)
+
+ db.batchMu.Lock()
+ if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
+ // There is no existing batch, or the existing batch is full; start a new one.
+ db.batch = &batch{
+ db: db,
+ }
+ db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
+ }
+ db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
+ if len(db.batch.calls) >= db.MaxBatchSize {
+ // wake up batch, it's ready to run
+ go db.batch.trigger()
+ }
+ db.batchMu.Unlock()
+
+ err := <-errCh
+ if err == trySolo {
+ err = db.Update(fn)
+ }
+ return err
+}
+
+type call struct {
+ fn func(*Tx) error
+ err chan<- error
+}
+
+type batch struct {
+ db *DB
+ timer *time.Timer
+ start sync.Once
+ calls []call
+}
+
+// trigger runs the batch if it hasn't already been run.
+func (b *batch) trigger() {
+ b.start.Do(b.run)
+}
+
+// run performs the transactions in the batch and communicates results
+// back to DB.Batch.
+func (b *batch) run() {
+ b.db.batchMu.Lock()
+ b.timer.Stop()
+ // Make sure no new work is added to this batch, but don't break
+ // other batches.
+ if b.db.batch == b {
+ b.db.batch = nil
+ }
+ b.db.batchMu.Unlock()
+
+retry:
+ for len(b.calls) > 0 {
+ var failIdx = -1
+ err := b.db.Update(func(tx *Tx) error {
+ for i, c := range b.calls {
+ if err := safelyCall(c.fn, tx); err != nil {
+ failIdx = i
+ return err
+ }
+ }
+ return nil
+ })
+
+ if failIdx >= 0 {
+ // take the failing transaction out of the batch. it's
+ // safe to shorten b.calls here because db.batch no longer
+ // points to us, and we hold the mutex anyway.
+ c := b.calls[failIdx]
+ b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
+ // tell the submitter re-run it solo, continue with the rest of the batch
+ c.err <- trySolo
+ continue retry
+ }
+
+ // pass success, or bolt internal errors, to all callers
+ for _, c := range b.calls {
+ if c.err != nil {
+ c.err <- err
+ }
+ }
+ break retry
+ }
+}
+
+// trySolo is a special sentinel error value used for signaling that a
+// transaction function should be re-run. It should never be seen by
+// callers.
+var trySolo = errors.New("batch function returned an error and should be re-run solo")
+
+type panicked struct {
+ reason interface{}
+}
+
+func (p panicked) Error() string {
+ if err, ok := p.reason.(error); ok {
+ return err.Error()
+ }
+ return fmt.Sprintf("panic: %v", p.reason)
+}
+
+func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
+ defer func() {
+ if p := recover(); p != nil {
+ err = panicked{p}
+ }
+ }()
+ return fn(tx)
+}
diff --git a/batch_benchmark_test.go b/batch_benchmark_test.go
new file mode 100644
index 0000000..b745a37
--- /dev/null
+++ b/batch_benchmark_test.go
@@ -0,0 +1,170 @@
+package bolt_test
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "hash/fnv"
+ "sync"
+ "testing"
+
+ "github.com/boltdb/bolt"
+)
+
+func validateBatchBench(b *testing.B, db *TestDB) {
+ var rollback = errors.New("sentinel error to cause rollback")
+ validate := func(tx *bolt.Tx) error {
+ bucket := tx.Bucket([]byte("bench"))
+ h := fnv.New32a()
+ buf := make([]byte, 4)
+ for id := uint32(0); id < 1000; id++ {
+ binary.LittleEndian.PutUint32(buf, id)
+ h.Reset()
+ h.Write(buf[:])
+ k := h.Sum(nil)
+ v := bucket.Get(k)
+ if v == nil {
+ b.Errorf("not found id=%d key=%x", id, k)
+ continue
+ }
+ if g, e := v, []byte("filler"); !bytes.Equal(g, e) {
+ b.Errorf("bad value for id=%d key=%x: %s != %q", id, k, g, e)
+ }
+ if err := bucket.Delete(k); err != nil {
+ return err
+ }
+ }
+ // should be empty now
+ c := bucket.Cursor()
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ b.Errorf("unexpected key: %x = %q", k, v)
+ }
+ return rollback
+ }
+ if err := db.Update(validate); err != nil && err != rollback {
+ b.Error(err)
+ }
+}
+
+func BenchmarkDBBatchAutomatic(b *testing.B) {
+ db := NewTestDB()
+ defer db.Close()
+ db.MustCreateBucket([]byte("bench"))
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ start := make(chan struct{})
+ var wg sync.WaitGroup
+
+ for round := 0; round < 1000; round++ {
+ wg.Add(1)
+
+ go func(id uint32) {
+ defer wg.Done()
+ <-start
+
+ h := fnv.New32a()
+ buf := make([]byte, 4)
+ binary.LittleEndian.PutUint32(buf, id)
+ h.Write(buf[:])
+ k := h.Sum(nil)
+ insert := func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte("bench"))
+ return b.Put(k, []byte("filler"))
+ }
+ if err := db.Batch(insert); err != nil {
+ b.Error(err)
+ return
+ }
+ }(uint32(round))
+ }
+ close(start)
+ wg.Wait()
+ }
+
+ b.StopTimer()
+ validateBatchBench(b, db)
+}
+
+func BenchmarkDBBatchSingle(b *testing.B) {
+ db := NewTestDB()
+ defer db.Close()
+ db.MustCreateBucket([]byte("bench"))
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ start := make(chan struct{})
+ var wg sync.WaitGroup
+
+ for round := 0; round < 1000; round++ {
+ wg.Add(1)
+ go func(id uint32) {
+ defer wg.Done()
+ <-start
+
+ h := fnv.New32a()
+ buf := make([]byte, 4)
+ binary.LittleEndian.PutUint32(buf, id)
+ h.Write(buf[:])
+ k := h.Sum(nil)
+ insert := func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte("bench"))
+ return b.Put(k, []byte("filler"))
+ }
+ if err := db.Update(insert); err != nil {
+ b.Error(err)
+ return
+ }
+ }(uint32(round))
+ }
+ close(start)
+ wg.Wait()
+ }
+
+ b.StopTimer()
+ validateBatchBench(b, db)
+}
+
+func BenchmarkDBBatchManual10x100(b *testing.B) {
+ db := NewTestDB()
+ defer db.Close()
+ db.MustCreateBucket([]byte("bench"))
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ start := make(chan struct{})
+ var wg sync.WaitGroup
+
+ for major := 0; major < 10; major++ {
+ wg.Add(1)
+ go func(id uint32) {
+ defer wg.Done()
+ <-start
+
+ insert100 := func(tx *bolt.Tx) error {
+ h := fnv.New32a()
+ buf := make([]byte, 4)
+ for minor := uint32(0); minor < 100; minor++ {
+ binary.LittleEndian.PutUint32(buf, uint32(id*100+minor))
+ h.Reset()
+ h.Write(buf[:])
+ k := h.Sum(nil)
+ b := tx.Bucket([]byte("bench"))
+ if err := b.Put(k, []byte("filler")); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ if err := db.Update(insert100); err != nil {
+ b.Fatal(err)
+ }
+ }(uint32(major))
+ }
+ close(start)
+ wg.Wait()
+ }
+
+ b.StopTimer()
+ validateBatchBench(b, db)
+}
diff --git a/batch_example_test.go b/batch_example_test.go
new file mode 100644
index 0000000..74eff8a
--- /dev/null
+++ b/batch_example_test.go
@@ -0,0 +1,148 @@
+package bolt_test
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "net/http"
+ "net/http/httptest"
+ "os"
+
+ "github.com/boltdb/bolt"
+)
+
+// Set this to see how the counts are actually updated.
+const verbose = false
+
+// Counter updates a counter in Bolt for every URL path requested.
+type counter struct {
+ db *bolt.DB
+}
+
+func (c counter) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
+ // Communicates the new count from a successful database
+ // transaction.
+ var result uint64
+
+ increment := func(tx *bolt.Tx) error {
+ b, err := tx.CreateBucketIfNotExists([]byte("hits"))
+ if err != nil {
+ return err
+ }
+ key := []byte(req.URL.String())
+ // Decode handles key not found for us.
+ count := decode(b.Get(key)) + 1
+ b.Put(key, encode(count))
+ // All good, communicate new count.
+ result = count
+ return nil
+ }
+ if err := c.db.Batch(increment); err != nil {
+ http.Error(rw, err.Error(), 500)
+ return
+ }
+
+ if verbose {
+ log.Printf("server: %s: %d", req.URL.String(), result)
+ }
+
+ rw.Header().Set("Content-Type", "application/octet-stream")
+ fmt.Fprintf(rw, "%d\n", result)
+}
+
+func client(id int, base string, paths []string) error {
+ // Process paths in random order.
+ rng := rand.New(rand.NewSource(int64(id)))
+ permutation := rng.Perm(len(paths))
+
+ for i := range paths {
+ path := paths[permutation[i]]
+ resp, err := http.Get(base + path)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ buf, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ if verbose {
+ log.Printf("client: %s: %s", path, buf)
+ }
+ }
+ return nil
+}
+
+func ExampleDB_Batch() {
+ // Open the database.
+ db, _ := bolt.Open(tempfile(), 0666, nil)
+ defer os.Remove(db.Path())
+ defer db.Close()
+
+ // Start our web server
+ count := counter{db}
+ srv := httptest.NewServer(count)
+ defer srv.Close()
+
+ // Decrease the batch size to make things more interesting.
+ db.MaxBatchSize = 3
+
+ // Get every path multiple times concurrently.
+ const clients = 10
+ paths := []string{
+ "/foo",
+ "/bar",
+ "/baz",
+ "/quux",
+ "/thud",
+ "/xyzzy",
+ }
+ errors := make(chan error, clients)
+ for i := 0; i < clients; i++ {
+ go func(id int) {
+ errors <- client(id, srv.URL, paths)
+ }(i)
+ }
+ // Check all responses to make sure there's no error.
+ for i := 0; i < clients; i++ {
+ if err := <-errors; err != nil {
+ fmt.Printf("client error: %v", err)
+ return
+ }
+ }
+
+ // Check the final result
+ db.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte("hits"))
+ c := b.Cursor()
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ fmt.Printf("hits to %s: %d\n", k, decode(v))
+ }
+ return nil
+ })
+
+ // Output:
+ // hits to /bar: 10
+ // hits to /baz: 10
+ // hits to /foo: 10
+ // hits to /quux: 10
+ // hits to /thud: 10
+ // hits to /xyzzy: 10
+}
+
+// encode marshals a counter.
+func encode(n uint64) []byte {
+ buf := make([]byte, 8)
+ binary.BigEndian.PutUint64(buf, n)
+ return buf
+}
+
+// decode unmarshals a counter. Nil buffers are decoded as 0.
+func decode(buf []byte) uint64 {
+ if buf == nil {
+ return 0
+ }
+ return binary.BigEndian.Uint64(buf)
+}
diff --git a/batch_test.go b/batch_test.go
new file mode 100644
index 0000000..0b5075f
--- /dev/null
+++ b/batch_test.go
@@ -0,0 +1,167 @@
+package bolt_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/boltdb/bolt"
+)
+
+// Ensure two functions can perform updates in a single batch.
+func TestDB_Batch(t *testing.T) {
+ db := NewTestDB()
+ defer db.Close()
+ db.MustCreateBucket([]byte("widgets"))
+
+ // Iterate over multiple updates in separate goroutines.
+ n := 2
+ ch := make(chan error)
+ for i := 0; i < n; i++ {
+ go func(i int) {
+ ch <- db.Batch(func(tx *bolt.Tx) error {
+ return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
+ })
+ }(i)
+ }
+
+ // Check all responses to make sure there's no error.
+ for i := 0; i < n; i++ {
+ if err := <-ch; err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Ensure data is correct.
+ db.MustView(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte("widgets"))
+ for i := 0; i < n; i++ {
+ if v := b.Get(u64tob(uint64(i))); v == nil {
+ t.Errorf("key not found: %d", i)
+ }
+ }
+ return nil
+ })
+}
+
+func TestDB_Batch_Panic(t *testing.T) {
+ db := NewTestDB()
+ defer db.Close()
+
+ var sentinel int
+ var bork = &sentinel
+ var problem interface{}
+ var err error
+
+ // Execute a function inside a batch that panics.
+ func() {
+ defer func() {
+ if p := recover(); p != nil {
+ problem = p
+ }
+ }()
+ err = db.Batch(func(tx *bolt.Tx) error {
+ panic(bork)
+ })
+ }()
+
+ // Verify there is no error.
+ if g, e := err, error(nil); g != e {
+ t.Fatalf("wrong error: %v != %v", g, e)
+ }
+ // Verify the panic was captured.
+ if g, e := problem, bork; g != e {
+ t.Fatalf("wrong error: %v != %v", g, e)
+ }
+}
+
+func TestDB_BatchFull(t *testing.T) {
+ db := NewTestDB()
+ defer db.Close()
+ db.MustCreateBucket([]byte("widgets"))
+
+ const size = 3
+ // buffered so we never leak goroutines
+ ch := make(chan error, size)
+ put := func(i int) {
+ ch <- db.Batch(func(tx *bolt.Tx) error {
+ return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
+ })
+ }
+
+ db.MaxBatchSize = size
+ // high enough to never trigger here
+ db.MaxBatchDelay = 1 * time.Hour
+
+ go put(1)
+ go put(2)
+
+ // Give the batch a chance to exhibit bugs.
+ time.Sleep(10 * time.Millisecond)
+
+ // not triggered yet
+ select {
+ case <-ch:
+ t.Fatalf("batch triggered too early")
+ default:
+ }
+
+ go put(3)
+
+ // Check all responses to make sure there's no error.
+ for i := 0; i < size; i++ {
+ if err := <-ch; err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Ensure data is correct.
+ db.MustView(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte("widgets"))
+ for i := 1; i <= size; i++ {
+ if v := b.Get(u64tob(uint64(i))); v == nil {
+ t.Errorf("key not found: %d", i)
+ }
+ }
+ return nil
+ })
+}
+
+func TestDB_BatchTime(t *testing.T) {
+ db := NewTestDB()
+ defer db.Close()
+ db.MustCreateBucket([]byte("widgets"))
+
+ const size = 1
+ // buffered so we never leak goroutines
+ ch := make(chan error, size)
+ put := func(i int) {
+ ch <- db.Batch(func(tx *bolt.Tx) error {
+ return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
+ })
+ }
+
+ db.MaxBatchSize = 1000
+ db.MaxBatchDelay = 0
+
+ go put(1)
+
+ // Batch must trigger by time alone.
+
+ // Check all responses to make sure there's no error.
+ for i := 0; i < size; i++ {
+ if err := <-ch; err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Ensure data is correct.
+ db.MustView(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte("widgets"))
+ for i := 1; i <= size; i++ {
+ if v := b.Get(u64tob(uint64(i))); v == nil {
+ t.Errorf("key not found: %d", i)
+ }
+ }
+ return nil
+ })
+}
diff --git a/db.go b/db.go
index 4775850..d4c85fb 100644
--- a/db.go
+++ b/db.go
@@ -27,6 +27,12 @@ const magic uint32 = 0xED0CDAED
// must be synchronzied using the msync(2) syscall.
const IgnoreNoSync = runtime.GOOS == "openbsd"
+// Default values if not set in a DB instance.
+const (
+ DefaultMaxBatchSize int = 1000
+ DefaultMaxBatchDelay = 10 * time.Millisecond
+)
+
// DB represents a collection of buckets persisted to a file on disk.
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
@@ -49,6 +55,22 @@ type DB struct {
// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
NoSync bool
+ // MaxBatchSize is the maximum size of a batch. Default value is
+ // copied from DefaultMaxBatchSize in Open.
+ //
+ // If <=0, disables batching.
+ //
+ // Do not change concurrently with calls to Batch.
+ MaxBatchSize int
+
+ // MaxBatchDelay is the maximum delay before a batch starts.
+ // Default value is copied from DefaultMaxBatchDelay in Open.
+ //
+ // If <=0, effectively disables batching.
+ //
+ // Do not change concurrently with calls to Batch.
+ MaxBatchDelay time.Duration
+
path string
file *os.File
dataref []byte
@@ -63,6 +85,9 @@ type DB struct {
freelist *freelist
stats Stats
+ batchMu sync.Mutex
+ batch *batch
+
rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping.
@@ -99,6 +124,10 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
options = DefaultOptions
}
+ // Set default values for later DB operations.
+ db.MaxBatchSize = DefaultMaxBatchSize
+ db.MaxBatchDelay = DefaultMaxBatchDelay
+
// Open data file and separate sync handler for metadata writes.
db.path = path
diff --git a/db_test.go b/db_test.go
index ecff7d8..ad17e87 100644
--- a/db_test.go
+++ b/db_test.go
@@ -618,6 +618,34 @@ func NewTestDB() *TestDB {
return &TestDB{db}
}
+// MustView executes a read-only function. Panic on error.
+func (db *TestDB) MustView(fn func(tx *bolt.Tx) error) {
+ if err := db.DB.View(func(tx *bolt.Tx) error {
+ return fn(tx)
+ }); err != nil {
+ panic(err.Error())
+ }
+}
+
+// MustUpdate executes a read-write function. Panic on error.
+func (db *TestDB) MustUpdate(fn func(tx *bolt.Tx) error) {
+ if err := db.DB.View(func(tx *bolt.Tx) error {
+ return fn(tx)
+ }); err != nil {
+ panic(err.Error())
+ }
+}
+
+// MustCreateBucket creates a new bucket. Panic on error.
+func (db *TestDB) MustCreateBucket(name []byte) {
+ if err := db.Update(func(tx *bolt.Tx) error {
+ _, err := tx.CreateBucket([]byte(name))
+ return err
+ }); err != nil {
+ panic(err.Error())
+ }
+}
+
// Close closes the database and deletes the underlying file.
func (db *TestDB) Close() {
// Log statistics.