aboutsummaryrefslogtreecommitdiff
path: root/db.go
diff options
context:
space:
mode:
authorBen Johnson <benbjohnson@yahoo.com>2016-01-02 22:34:41 -0700
committerBen Johnson <benbjohnson@yahoo.com>2016-01-02 22:34:41 -0700
commit34a0fa5307f7562980fb8e7ff4723f7987edf49b (patch)
tree26a01c0d5a3da53f45c7f54062c3af11479c4a2f /db.go
parentMerge pull request #474 from elithrar/patch-1 (diff)
parenttest suite refactoring (diff)
downloaddedo-34a0fa5307f7562980fb8e7ff4723f7987edf49b.tar.gz
dedo-34a0fa5307f7562980fb8e7ff4723f7987edf49b.tar.xz
Merge pull request #477 from benbjohnson/testing
Test suite refactoring
Diffstat (limited to 'db.go')
-rw-r--r--db.go136
1 files changed, 135 insertions, 1 deletions
diff --git a/db.go b/db.go
index 44ebabb..f559ee5 100644
--- a/db.go
+++ b/db.go
@@ -1,8 +1,10 @@
package bolt
import (
+ "errors"
"fmt"
"hash/fnv"
+ "log"
"os"
"runtime"
"runtime/debug"
@@ -387,7 +389,9 @@ func (db *DB) close() error {
// No need to unlock read-only file.
if !db.readOnly {
// Unlock the file.
- _ = funlock(db.file)
+ if err := funlock(db.file); err != nil {
+ log.Printf("bolt.Close(): funlock error: %s", err)
+ }
}
// Close the file descriptor.
@@ -598,6 +602,136 @@ func (db *DB) View(fn func(*Tx) error) error {
return nil
}
+// Batch calls fn as part of a batch. It behaves similar to Update,
+// except:
+//
+// 1. concurrent Batch calls can be combined into a single Bolt
+// transaction.
+//
+// 2. the function passed to Batch may be called multiple times,
+// regardless of whether it returns error or not.
+//
+// This means that Batch function side effects must be idempotent and
+// take permanent effect only after a successful return is seen in
+// caller.
+//
+// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
+// and DB.MaxBatchDelay, respectively.
+//
+// Batch is only useful when there are multiple goroutines calling it.
+func (db *DB) Batch(fn func(*Tx) error) error {
+ errCh := make(chan error, 1)
+
+ db.batchMu.Lock()
+ if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
+ // There is no existing batch, or the existing batch is full; start a new one.
+ db.batch = &batch{
+ db: db,
+ }
+ db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
+ }
+ db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
+ if len(db.batch.calls) >= db.MaxBatchSize {
+ // wake up batch, it's ready to run
+ go db.batch.trigger()
+ }
+ db.batchMu.Unlock()
+
+ err := <-errCh
+ if err == trySolo {
+ err = db.Update(fn)
+ }
+ return err
+}
+
+type call struct {
+ fn func(*Tx) error
+ err chan<- error
+}
+
+type batch struct {
+ db *DB
+ timer *time.Timer
+ start sync.Once
+ calls []call
+}
+
+// trigger runs the batch if it hasn't already been run.
+func (b *batch) trigger() {
+ b.start.Do(b.run)
+}
+
+// run performs the transactions in the batch and communicates results
+// back to DB.Batch.
+func (b *batch) run() {
+ b.db.batchMu.Lock()
+ b.timer.Stop()
+ // Make sure no new work is added to this batch, but don't break
+ // other batches.
+ if b.db.batch == b {
+ b.db.batch = nil
+ }
+ b.db.batchMu.Unlock()
+
+retry:
+ for len(b.calls) > 0 {
+ var failIdx = -1
+ err := b.db.Update(func(tx *Tx) error {
+ for i, c := range b.calls {
+ if err := safelyCall(c.fn, tx); err != nil {
+ failIdx = i
+ return err
+ }
+ }
+ return nil
+ })
+
+ if failIdx >= 0 {
+ // take the failing transaction out of the batch. it's
+ // safe to shorten b.calls here because db.batch no longer
+ // points to us, and we hold the mutex anyway.
+ c := b.calls[failIdx]
+ b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
+ // tell the submitter re-run it solo, continue with the rest of the batch
+ c.err <- trySolo
+ continue retry
+ }
+
+ // pass success, or bolt internal errors, to all callers
+ for _, c := range b.calls {
+ if c.err != nil {
+ c.err <- err
+ }
+ }
+ break retry
+ }
+}
+
+// trySolo is a special sentinel error value used for signaling that a
+// transaction function should be re-run. It should never be seen by
+// callers.
+var trySolo = errors.New("batch function returned an error and should be re-run solo")
+
+type panicked struct {
+ reason interface{}
+}
+
+func (p panicked) Error() string {
+ if err, ok := p.reason.(error); ok {
+ return err.Error()
+ }
+ return fmt.Sprintf("panic: %v", p.reason)
+}
+
+func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
+ defer func() {
+ if p := recover(); p != nil {
+ err = panicked{p}
+ }
+ }()
+ return fn(tx)
+}
+
// Sync executes fdatasync() against the database file handle.
//
// This is not necessary under normal operation, however, if you use NoSync