aboutsummaryrefslogtreecommitdiff
path: root/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'db.go')
-rw-r--r--db.go170
1 files changed, 163 insertions, 7 deletions
diff --git a/db.go b/db.go
index fd920a0..a4b8efb 100644
--- a/db.go
+++ b/db.go
@@ -1,8 +1,10 @@
package bolt
import (
+ "errors"
"fmt"
"hash/fnv"
+ "log"
"os"
"runtime"
"runtime/debug"
@@ -63,6 +65,10 @@ type DB struct {
// https://github.com/boltdb/bolt/issues/284
NoGrowSync bool
+ // If you want to read the entire database fast, you can set MmapFlag to
+ // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
+ MmapFlags int
+
// MaxBatchSize is the maximum size of a batch. Default value is
// copied from DefaultMaxBatchSize in Open.
//
@@ -137,6 +143,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
options = DefaultOptions
}
db.NoGrowSync = options.NoGrowSync
+ db.MmapFlags = options.MmapFlags
// Set default values for later DB operations.
db.MaxBatchSize = DefaultMaxBatchSize
@@ -173,7 +180,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// Initialize the database if it doesn't exist.
if info, err := db.file.Stat(); err != nil {
- return nil, fmt.Errorf("stat error: %s", err)
+ return nil, err
} else if info.Size() == 0 {
// Initialize new files with meta pages.
if err := db.init(); err != nil {
@@ -185,14 +192,14 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
m := db.pageInBuffer(buf[:], 0).meta()
if err := m.validate(); err != nil {
- return nil, fmt.Errorf("meta0 error: %s", err)
+ return nil, err
}
db.pageSize = int(m.pageSize)
}
}
// Memory map the data file.
- if err := db.mmap(0); err != nil {
+ if err := db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
@@ -249,10 +256,10 @@ func (db *DB) mmap(minsz int) error {
// Validate the meta pages.
if err := db.meta0.validate(); err != nil {
- return fmt.Errorf("meta0 error: %s", err)
+ return err
}
if err := db.meta1.validate(); err != nil {
- return fmt.Errorf("meta1 error: %s", err)
+ return err
}
return nil
@@ -267,7 +274,7 @@ func (db *DB) munmap() error {
}
// mmapSize determines the appropriate size for the mmap given the current size
-// of the database. The minimum size is 1MB and doubles until it reaches 1GB.
+// of the database. The minimum size is 32KB and doubles until it reaches 1GB.
// Returns an error if the new mmap size is greater than the max allowed.
func (db *DB) mmapSize(size int) (int, error) {
// Double the size from 32KB until 1GB.
@@ -383,7 +390,9 @@ func (db *DB) close() error {
// No need to unlock read-only file.
if !db.readOnly {
// Unlock the file.
- _ = funlock(db.file)
+ if err := funlock(db.file); err != nil {
+ log.Printf("bolt.Close(): funlock error: %s", err)
+ }
}
// Close the file descriptor.
@@ -407,6 +416,10 @@ func (db *DB) close() error {
// writer to deadlock because the database periodically needs to re-mmap itself
// as it grows and it cannot do that while a read transaction is open.
//
+// If a long running read transaction (for example, a snapshot transaction) is
+// needed, you might want to set DB.InitialMmapSize to a large enough value
+// to avoid potential blocking of write transaction.
+//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error) {
@@ -590,6 +603,136 @@ func (db *DB) View(fn func(*Tx) error) error {
return nil
}
+// Batch calls fn as part of a batch. It behaves similar to Update,
+// except:
+//
+// 1. concurrent Batch calls can be combined into a single Bolt
+// transaction.
+//
+// 2. the function passed to Batch may be called multiple times,
+// regardless of whether it returns error or not.
+//
+// This means that Batch function side effects must be idempotent and
+// take permanent effect only after a successful return is seen in
+// caller.
+//
+// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
+// and DB.MaxBatchDelay, respectively.
+//
+// Batch is only useful when there are multiple goroutines calling it.
+func (db *DB) Batch(fn func(*Tx) error) error {
+ errCh := make(chan error, 1)
+
+ db.batchMu.Lock()
+ if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
+ // There is no existing batch, or the existing batch is full; start a new one.
+ db.batch = &batch{
+ db: db,
+ }
+ db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
+ }
+ db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
+ if len(db.batch.calls) >= db.MaxBatchSize {
+ // wake up batch, it's ready to run
+ go db.batch.trigger()
+ }
+ db.batchMu.Unlock()
+
+ err := <-errCh
+ if err == trySolo {
+ err = db.Update(fn)
+ }
+ return err
+}
+
+type call struct {
+ fn func(*Tx) error
+ err chan<- error
+}
+
+type batch struct {
+ db *DB
+ timer *time.Timer
+ start sync.Once
+ calls []call
+}
+
+// trigger runs the batch if it hasn't already been run.
+func (b *batch) trigger() {
+ b.start.Do(b.run)
+}
+
+// run performs the transactions in the batch and communicates results
+// back to DB.Batch.
+func (b *batch) run() {
+ b.db.batchMu.Lock()
+ b.timer.Stop()
+ // Make sure no new work is added to this batch, but don't break
+ // other batches.
+ if b.db.batch == b {
+ b.db.batch = nil
+ }
+ b.db.batchMu.Unlock()
+
+retry:
+ for len(b.calls) > 0 {
+ var failIdx = -1
+ err := b.db.Update(func(tx *Tx) error {
+ for i, c := range b.calls {
+ if err := safelyCall(c.fn, tx); err != nil {
+ failIdx = i
+ return err
+ }
+ }
+ return nil
+ })
+
+ if failIdx >= 0 {
+ // take the failing transaction out of the batch. it's
+ // safe to shorten b.calls here because db.batch no longer
+ // points to us, and we hold the mutex anyway.
+ c := b.calls[failIdx]
+ b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
+ // tell the submitter re-run it solo, continue with the rest of the batch
+ c.err <- trySolo
+ continue retry
+ }
+
+ // pass success, or bolt internal errors, to all callers
+ for _, c := range b.calls {
+ if c.err != nil {
+ c.err <- err
+ }
+ }
+ break retry
+ }
+}
+
+// trySolo is a special sentinel error value used for signaling that a
+// transaction function should be re-run. It should never be seen by
+// callers.
+var trySolo = errors.New("batch function returned an error and should be re-run solo")
+
+type panicked struct {
+ reason interface{}
+}
+
+func (p panicked) Error() string {
+ if err, ok := p.reason.(error); ok {
+ return err.Error()
+ }
+ return fmt.Sprintf("panic: %v", p.reason)
+}
+
+func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
+ defer func() {
+ if p := recover(); p != nil {
+ err = panicked{p}
+ }
+ }()
+ return fn(tx)
+}
+
// Sync executes fdatasync() against the database file handle.
//
// This is not necessary under normal operation, however, if you use NoSync
@@ -704,6 +847,19 @@ type Options struct {
// Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to
// grab a shared lock (UNIX).
ReadOnly bool
+
+ // Sets the DB.MmapFlags flag before memory mapping the file.
+ MmapFlags int
+
+ // InitialMmapSize is the initial mmap size of the database
+ // in bytes. Read transactions won't block write transaction
+ // if the InitialMmapSize is large enough to hold database mmap
+ // size. (See DB.Begin for more information)
+ //
+ // If <=0, the initial map size is 0.
+ // If initialMmapSize is smaller than the previous database size,
+ // it takes no effect.
+ InitialMmapSize int
}
// DefaultOptions represent the options used if nil options are passed into Open().