diff options
Diffstat (limited to 'db.go')
-rw-r--r-- | db.go | 170 |
1 files changed, 163 insertions, 7 deletions
@@ -1,8 +1,10 @@ package bolt import ( + "errors" "fmt" "hash/fnv" + "log" "os" "runtime" "runtime/debug" @@ -63,6 +65,10 @@ type DB struct { // https://github.com/boltdb/bolt/issues/284 NoGrowSync bool + // If you want to read the entire database fast, you can set MmapFlag to + // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead. + MmapFlags int + // MaxBatchSize is the maximum size of a batch. Default value is // copied from DefaultMaxBatchSize in Open. // @@ -137,6 +143,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { options = DefaultOptions } db.NoGrowSync = options.NoGrowSync + db.MmapFlags = options.MmapFlags // Set default values for later DB operations. db.MaxBatchSize = DefaultMaxBatchSize @@ -173,7 +180,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { // Initialize the database if it doesn't exist. if info, err := db.file.Stat(); err != nil { - return nil, fmt.Errorf("stat error: %s", err) + return nil, err } else if info.Size() == 0 { // Initialize new files with meta pages. if err := db.init(); err != nil { @@ -185,14 +192,14 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { if _, err := db.file.ReadAt(buf[:], 0); err == nil { m := db.pageInBuffer(buf[:], 0).meta() if err := m.validate(); err != nil { - return nil, fmt.Errorf("meta0 error: %s", err) + return nil, err } db.pageSize = int(m.pageSize) } } // Memory map the data file. - if err := db.mmap(0); err != nil { + if err := db.mmap(options.InitialMmapSize); err != nil { _ = db.close() return nil, err } @@ -249,10 +256,10 @@ func (db *DB) mmap(minsz int) error { // Validate the meta pages. if err := db.meta0.validate(); err != nil { - return fmt.Errorf("meta0 error: %s", err) + return err } if err := db.meta1.validate(); err != nil { - return fmt.Errorf("meta1 error: %s", err) + return err } return nil @@ -267,7 +274,7 @@ func (db *DB) munmap() error { } // mmapSize determines the appropriate size for the mmap given the current size -// of the database. The minimum size is 1MB and doubles until it reaches 1GB. +// of the database. The minimum size is 32KB and doubles until it reaches 1GB. // Returns an error if the new mmap size is greater than the max allowed. func (db *DB) mmapSize(size int) (int, error) { // Double the size from 32KB until 1GB. @@ -383,7 +390,9 @@ func (db *DB) close() error { // No need to unlock read-only file. if !db.readOnly { // Unlock the file. - _ = funlock(db.file) + if err := funlock(db.file); err != nil { + log.Printf("bolt.Close(): funlock error: %s", err) + } } // Close the file descriptor. @@ -407,6 +416,10 @@ func (db *DB) close() error { // writer to deadlock because the database periodically needs to re-mmap itself // as it grows and it cannot do that while a read transaction is open. // +// If a long running read transaction (for example, a snapshot transaction) is +// needed, you might want to set DB.InitialMmapSize to a large enough value +// to avoid potential blocking of write transaction. +// // IMPORTANT: You must close read-only transactions after you are finished or // else the database will not reclaim old pages. func (db *DB) Begin(writable bool) (*Tx, error) { @@ -590,6 +603,136 @@ func (db *DB) View(fn func(*Tx) error) error { return nil } +// Batch calls fn as part of a batch. It behaves similar to Update, +// except: +// +// 1. concurrent Batch calls can be combined into a single Bolt +// transaction. +// +// 2. the function passed to Batch may be called multiple times, +// regardless of whether it returns error or not. +// +// This means that Batch function side effects must be idempotent and +// take permanent effect only after a successful return is seen in +// caller. +// +// The maximum batch size and delay can be adjusted with DB.MaxBatchSize +// and DB.MaxBatchDelay, respectively. +// +// Batch is only useful when there are multiple goroutines calling it. +func (db *DB) Batch(fn func(*Tx) error) error { + errCh := make(chan error, 1) + + db.batchMu.Lock() + if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) { + // There is no existing batch, or the existing batch is full; start a new one. + db.batch = &batch{ + db: db, + } + db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) + } + db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) + if len(db.batch.calls) >= db.MaxBatchSize { + // wake up batch, it's ready to run + go db.batch.trigger() + } + db.batchMu.Unlock() + + err := <-errCh + if err == trySolo { + err = db.Update(fn) + } + return err +} + +type call struct { + fn func(*Tx) error + err chan<- error +} + +type batch struct { + db *DB + timer *time.Timer + start sync.Once + calls []call +} + +// trigger runs the batch if it hasn't already been run. +func (b *batch) trigger() { + b.start.Do(b.run) +} + +// run performs the transactions in the batch and communicates results +// back to DB.Batch. +func (b *batch) run() { + b.db.batchMu.Lock() + b.timer.Stop() + // Make sure no new work is added to this batch, but don't break + // other batches. + if b.db.batch == b { + b.db.batch = nil + } + b.db.batchMu.Unlock() + +retry: + for len(b.calls) > 0 { + var failIdx = -1 + err := b.db.Update(func(tx *Tx) error { + for i, c := range b.calls { + if err := safelyCall(c.fn, tx); err != nil { + failIdx = i + return err + } + } + return nil + }) + + if failIdx >= 0 { + // take the failing transaction out of the batch. it's + // safe to shorten b.calls here because db.batch no longer + // points to us, and we hold the mutex anyway. + c := b.calls[failIdx] + b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1] + // tell the submitter re-run it solo, continue with the rest of the batch + c.err <- trySolo + continue retry + } + + // pass success, or bolt internal errors, to all callers + for _, c := range b.calls { + if c.err != nil { + c.err <- err + } + } + break retry + } +} + +// trySolo is a special sentinel error value used for signaling that a +// transaction function should be re-run. It should never be seen by +// callers. +var trySolo = errors.New("batch function returned an error and should be re-run solo") + +type panicked struct { + reason interface{} +} + +func (p panicked) Error() string { + if err, ok := p.reason.(error); ok { + return err.Error() + } + return fmt.Sprintf("panic: %v", p.reason) +} + +func safelyCall(fn func(*Tx) error, tx *Tx) (err error) { + defer func() { + if p := recover(); p != nil { + err = panicked{p} + } + }() + return fn(tx) +} + // Sync executes fdatasync() against the database file handle. // // This is not necessary under normal operation, however, if you use NoSync @@ -704,6 +847,19 @@ type Options struct { // Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to // grab a shared lock (UNIX). ReadOnly bool + + // Sets the DB.MmapFlags flag before memory mapping the file. + MmapFlags int + + // InitialMmapSize is the initial mmap size of the database + // in bytes. Read transactions won't block write transaction + // if the InitialMmapSize is large enough to hold database mmap + // size. (See DB.Begin for more information) + // + // If <=0, the initial map size is 0. + // If initialMmapSize is smaller than the previous database size, + // it takes no effect. + InitialMmapSize int } // DefaultOptions represent the options used if nil options are passed into Open(). |