diff options
-rw-r--r-- | README.md | 22 | ||||
-rw-r--r-- | bolt_unix.go | 24 | ||||
-rw-r--r-- | bolt_windows.go | 10 | ||||
-rw-r--r-- | db.go | 61 | ||||
-rw-r--r-- | db_test.go | 115 | ||||
-rw-r--r-- | errors.go | 4 | ||||
-rw-r--r-- | node.go | 13 | ||||
-rw-r--r-- | page.go | 6 | ||||
-rw-r--r-- | tx.go | 36 | ||||
-rw-r--r-- | tx_test.go | 32 |
10 files changed, 294 insertions, 29 deletions
@@ -87,6 +87,11 @@ are not thread safe. To work with data in multiple goroutines you must start a transaction for each one or use locking to ensure only one goroutine accesses a transaction at a time. Creating transaction from the `DB` is thread safe. +Read-only transactions and read-write transactions should not depend on one +another and generally shouldn't be opened simultaneously in the same goroutine. +This can cause a deadlock as the read-write transaction needs to periodically +re-map the data file but it cannot do so while a read-only transaction is open. + #### Read-write transactions @@ -446,6 +451,21 @@ It's also useful to pipe these stats to a service such as statsd for monitoring or to provide an HTTP endpoint that will perform a fixed-length sample. +### Read-Only Mode + +Sometimes it is useful to create a shared, read-only Bolt database. To this, +set the `Options.ReadOnly` flag when opening your database. Read-only mode +uses a shared lock to allow multiple processes to read from the database but +it will block any processes from opening the database in read-write mode. + +```go +db, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true}) +if err != nil { + log.Fatal(err) +} +``` + + ## Resources For more information on getting started with Bolt, check out the following articles: @@ -593,5 +613,7 @@ Below is a list of public, open source projects that use Bolt: * [Seaweed File System](https://github.com/chrislusf/weed-fs) - Highly scalable distributed key~file system with O(1) disk read. * [InfluxDB](http://influxdb.com) - Scalable datastore for metrics, events, and real-time analytics. * [Freehold](http://tshannon.bitbucket.org/freehold/) - An open, secure, and lightweight platform for your files and data. +* [Prometheus Annotation Server](https://github.com/oliver006/prom_annotation_server) - Annotation server for PromDash & Prometheus service monitoring system. +* [Consul](https://github.com/hashicorp/consul) - Consul is service discovery and configuration made easy. Distributed, highly available, and datacenter-aware. If you are using Bolt in a project please send a pull request to add it to the list. diff --git a/bolt_unix.go b/bolt_unix.go index 35dce08..17ca318 100644 --- a/bolt_unix.go +++ b/bolt_unix.go @@ -11,7 +11,7 @@ import ( ) // flock acquires an advisory lock on a file descriptor. -func flock(f *os.File, timeout time.Duration) error { +func flock(f *os.File, exclusive bool, timeout time.Duration) error { var t time.Time for { // If we're beyond our timeout then return an error. @@ -21,9 +21,13 @@ func flock(f *os.File, timeout time.Duration) error { } else if timeout > 0 && time.Since(t) > timeout { return ErrTimeout } + flag := syscall.LOCK_SH + if exclusive { + flag = syscall.LOCK_EX + } // Otherwise attempt to obtain an exclusive lock. - err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + err := syscall.Flock(int(f.Fd()), flag|syscall.LOCK_NB) if err == nil { return nil } else if err != syscall.EWOULDBLOCK { @@ -44,7 +48,7 @@ func funlock(f *os.File) error { func mmap(db *DB, sz int) error { // Truncate and fsync to ensure file size metadata is flushed. // https://github.com/boltdb/bolt/issues/284 - if !db.NoGrowSync { + if !db.NoGrowSync && !db.readOnly { if err := db.file.Truncate(int64(sz)); err != nil { return fmt.Errorf("file resize error: %s", err) } @@ -59,6 +63,11 @@ func mmap(db *DB, sz int) error { return err } + // Advise the kernel that the mmap is accessed randomly. + if err := madvise(b, syscall.MADV_RANDOM); err != nil { + return fmt.Errorf("madvise: %s", err) + } + // Save the original byte slice and convert to a byte array pointer. db.dataref = b db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0])) @@ -80,3 +89,12 @@ func munmap(db *DB) error { db.datasz = 0 return err } + +// NOTE: This function is copied from stdlib because it is not available on darwin. +func madvise(b []byte, advice int) (err error) { + _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice)) + if e1 != 0 { + err = e1 + } + return +} diff --git a/bolt_windows.go b/bolt_windows.go index c8539d4..8b782be 100644 --- a/bolt_windows.go +++ b/bolt_windows.go @@ -16,7 +16,7 @@ func fdatasync(db *DB) error { } // flock acquires an advisory lock on a file descriptor. -func flock(f *os.File, _ time.Duration) error { +func flock(f *os.File, _ bool, _ time.Duration) error { return nil } @@ -28,9 +28,11 @@ func funlock(f *os.File) error { // mmap memory maps a DB's data file. // Based on: https://github.com/edsrzf/mmap-go func mmap(db *DB, sz int) error { - // Truncate the database to the size of the mmap. - if err := db.file.Truncate(int64(sz)); err != nil { - return fmt.Errorf("truncate: %s", err) + if !db.readOnly { + // Truncate the database to the size of the mmap. + if err := db.file.Truncate(int64(sz)); err != nil { + return fmt.Errorf("truncate: %s", err) + } } // Open a file mapping handle. @@ -104,6 +104,10 @@ type DB struct { ops struct { writeAt func(b []byte, off int64) (n int, err error) } + + // Read only mode. + // When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately. + readOnly bool } // Path returns the path to currently open database file. @@ -137,19 +141,28 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { db.MaxBatchSize = DefaultMaxBatchSize db.MaxBatchDelay = DefaultMaxBatchDelay + flag := os.O_RDWR + if options.ReadOnly { + flag = os.O_RDONLY + db.readOnly = true + } + // Open data file and separate sync handler for metadata writes. db.path = path - var err error - if db.file, err = os.OpenFile(db.path, os.O_RDWR|os.O_CREATE, mode); err != nil { + if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil { _ = db.close() return nil, err } - // Lock file so that other processes using Bolt cannot use the database - // at the same time. This would cause corruption since the two processes - // would write meta pages and free pages separately. - if err := flock(db.file, options.Timeout); err != nil { + // Lock file so that other processes using Bolt in read-write mode cannot + // use the database at the same time. This would cause corruption since + // the two processes would write meta pages and free pages separately. + // The database file is locked exclusively (only one process can grab the lock) + // if !options.ReadOnly. + // The database file is locked using the shared lock (more than one process may + // hold a lock at the same time) otherwise (options.ReadOnly is set). + if err := flock(db.file, !db.readOnly, options.Timeout); err != nil { _ = db.close() return nil, err } @@ -256,8 +269,8 @@ func (db *DB) munmap() error { // of the database. The minimum size is 1MB and doubles until it reaches 1GB. // Returns an error if the new mmap size is greater than the max allowed. func (db *DB) mmapSize(size int) (int, error) { - // Double the size from 1MB until 1GB. - for i := uint(20); i <= 30; i++ { + // Double the size from 32KB until 1GB. + for i := uint(15); i <= 30; i++ { if size <= 1<<i { return 1 << i, nil } @@ -338,8 +351,15 @@ func (db *DB) init() error { // Close releases all database resources. // All transactions must be closed before closing the database. func (db *DB) Close() error { + db.rwlock.Lock() + defer db.rwlock.Unlock() + db.metalock.Lock() defer db.metalock.Unlock() + + db.mmaplock.RLock() + defer db.mmaplock.RUnlock() + return db.close() } @@ -359,8 +379,11 @@ func (db *DB) close() error { // Close file handles. if db.file != nil { - // Unlock the file. - _ = funlock(db.file) + // No need to unlock read-only file. + if !db.readOnly { + // Unlock the file. + _ = funlock(db.file) + } // Close the file descriptor. if err := db.file.Close(); err != nil { @@ -378,6 +401,11 @@ func (db *DB) close() error { // will cause the calls to block and be serialized until the current write // transaction finishes. // +// Transactions should not be depedent on one another. Opening a read +// transaction and a write transaction in the same goroutine can cause the +// writer to deadlock because the database periodically needs to re-mmap itself +// as it grows and it cannot do that while a read transaction is open. +// // IMPORTANT: You must close read-only transactions after you are finished or // else the database will not reclaim old pages. func (db *DB) Begin(writable bool) (*Tx, error) { @@ -426,6 +454,11 @@ func (db *DB) beginTx() (*Tx, error) { } func (db *DB) beginRWTx() (*Tx, error) { + // If the database was opened with Options.ReadOnly, return an error. + if db.readOnly { + return nil, ErrDatabaseReadOnly + } + // Obtain writer lock. This is released by the transaction when it closes. // This enforces only one writer transaction at a time. db.rwlock.Lock() @@ -622,6 +655,10 @@ func (db *DB) allocate(count int) (*page, error) { return p, nil } +func (db *DB) IsReadOnly() bool { + return db.readOnly +} + // Options represents the options that can be set when opening a database. type Options struct { // Timeout is the amount of time to wait to obtain a file lock. @@ -631,6 +668,10 @@ type Options struct { // Sets the DB.NoGrowSync flag before memory mapping the file. NoGrowSync bool + + // Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to + // grab a shared lock (UNIX). + ReadOnly bool } // DefaultOptions represent the options used if nil options are passed into Open(). @@ -224,6 +224,76 @@ func TestDB_Open_FileTooSmall(t *testing.T) { equals(t, errors.New("file size too small"), err) } +// Ensure that a database can be opened in read-only mode by multiple processes +// and that a database can not be opened in read-write mode and in read-only +// mode at the same time. +func TestOpen_ReadOnly(t *testing.T) { + bucket, key, value := []byte(`bucket`), []byte(`key`), []byte(`value`) + + path := tempfile() + defer os.Remove(path) + + // Open in read-write mode. + db, err := bolt.Open(path, 0666, nil) + ok(t, db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket(bucket) + if err != nil { + return err + } + return b.Put(key, value) + })) + assert(t, db != nil, "") + assert(t, !db.IsReadOnly(), "") + ok(t, err) + ok(t, db.Close()) + + // Open in read-only mode. + db0, err := bolt.Open(path, 0666, &bolt.Options{ReadOnly: true}) + ok(t, err) + defer db0.Close() + + // Opening in read-write mode should return an error. + _, err = bolt.Open(path, 0666, &bolt.Options{Timeout: time.Millisecond * 100}) + assert(t, err != nil, "") + + // And again (in read-only mode). + db1, err := bolt.Open(path, 0666, &bolt.Options{ReadOnly: true}) + ok(t, err) + defer db1.Close() + + // Verify both read-only databases are accessible. + for _, db := range []*bolt.DB{db0, db1} { + // Verify is is in read only mode indeed. + assert(t, db.IsReadOnly(), "") + + // Read-only databases should not allow updates. + assert(t, + bolt.ErrDatabaseReadOnly == db.Update(func(*bolt.Tx) error { + panic(`should never get here`) + }), + "") + + // Read-only databases should not allow beginning writable txns. + _, err = db.Begin(true) + assert(t, bolt.ErrDatabaseReadOnly == err, "") + + // Verify the data. + ok(t, db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + if b == nil { + return fmt.Errorf("expected bucket `%s`", string(bucket)) + } + + got := string(b.Get(key)) + expected := string(value) + if got != expected { + return fmt.Errorf("expected `%s`, got `%s`", expected, got) + } + return nil + })) + } +} + // TODO(benbjohnson): Test corruption at every byte of the first two pages. // Ensure that a database cannot open a transaction when it's not open. @@ -254,6 +324,49 @@ func TestDB_BeginRW_Closed(t *testing.T) { assert(t, tx == nil, "") } +func TestDB_Close_PendingTx_RW(t *testing.T) { testDB_Close_PendingTx(t, true) } +func TestDB_Close_PendingTx_RO(t *testing.T) { testDB_Close_PendingTx(t, false) } + +// Ensure that a database cannot close while transactions are open. +func testDB_Close_PendingTx(t *testing.T, writable bool) { + db := NewTestDB() + defer db.Close() + + // Start transaction. + tx, err := db.Begin(true) + if err != nil { + t.Fatal(err) + } + + // Open update in separate goroutine. + done := make(chan struct{}) + go func() { + db.Close() + close(done) + }() + + // Ensure database hasn't closed. + time.Sleep(100 * time.Millisecond) + select { + case <-done: + t.Fatal("database closed too early") + default: + } + + // Commit transaction. + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + // Ensure database closed now. + time.Sleep(100 * time.Millisecond) + select { + case <-done: + default: + t.Fatal("database did not close") + } +} + // Ensure a database can provide a transactional block. func TestDB_Update(t *testing.T) { db := NewTestDB() @@ -678,7 +791,7 @@ func (db *TestDB) PrintStats() { // MustCheck runs a consistency check on the database and panics if any errors are found. func (db *TestDB) MustCheck() { - db.View(func(tx *bolt.Tx) error { + db.Update(func(tx *bolt.Tx) error { // Collect all the errors. var errors []error for err := range tx.Check() { @@ -36,6 +36,10 @@ var ( // ErrTxClosed is returned when committing or rolling back a transaction // that has already been committed or rolled back. ErrTxClosed = errors.New("tx closed") + + // ErrDatabaseReadOnly is returned when a mutating transaction is started on a + // read-only database. + ErrDatabaseReadOnly = errors.New("database is in read-only mode") ) // These errors can occur when putting or deleting a value or a bucket. @@ -221,11 +221,20 @@ func (n *node) write(p *page) { _assert(elem.pgid != p.id, "write: circular dependency occurred") } + // If the length of key+value is larger than the max allocation size + // then we need to reallocate the byte array pointer. + // + // See: https://github.com/boltdb/bolt/pull/335 + klen, vlen := len(item.key), len(item.value) + if len(b) < klen+vlen { + b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:] + } + // Write data for the element to the end of the page. copy(b[0:], item.key) - b = b[len(item.key):] + b = b[klen:] copy(b[0:], item.value) - b = b[len(item.value):] + b = b[vlen:] } // DEBUG ONLY: n.dump() @@ -96,7 +96,7 @@ type branchPageElement struct { // key returns a byte slice of the node key. func (n *branchPageElement) key() []byte { buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) - return buf[n.pos : n.pos+n.ksize] + return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize] } // leafPageElement represents a node on a leaf page. @@ -110,13 +110,13 @@ type leafPageElement struct { // key returns a byte slice of the node key. func (n *leafPageElement) key() []byte { buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) - return buf[n.pos : n.pos+n.ksize] + return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize] } // value returns a byte slice of the node value. func (n *leafPageElement) value() []byte { buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) - return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize] + return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos+n.ksize]))[:n.vsize] } // PageInfo represents human readable information about a page. @@ -423,15 +423,39 @@ func (tx *Tx) write() error { // Write pages to disk in order. for _, p := range pages { size := (int(p.overflow) + 1) * tx.db.pageSize - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] offset := int64(p.id) * int64(tx.db.pageSize) - if _, err := tx.db.ops.writeAt(buf, offset); err != nil { - return err - } - // Update statistics. - tx.stats.Write++ + // Write out page in "max allocation" sized chunks. + ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) + for { + // Limit our write to our max allocation size. + sz := size + if sz > maxAllocSize-1 { + sz = maxAllocSize - 1 + } + + // Write chunk to disk. + buf := ptr[:sz] + if _, err := tx.db.ops.writeAt(buf, offset); err != nil { + return err + } + + // Update statistics. + tx.stats.Write++ + + // Exit inner for loop if we've written all the chunks. + size -= sz + if size == 0 { + break + } + + // Otherwise move offset forward and move pointer to next chunk. + offset += int64(sz) + ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) + } } + + // Ignore file sync if flag is set on DB. if !tx.db.NoSync || IgnoreNoSync { if err := fdatasync(tx.db); err != nil { return err @@ -252,6 +252,38 @@ func TestTx_DeleteBucket_NotFound(t *testing.T) { }) } +// Ensure that no error is returned when a tx.ForEach function does not return +// an error. +func TestTx_ForEach_NoError(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucket([]byte("widgets")) + tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar")) + + equals(t, nil, tx.ForEach(func(name []byte, b *bolt.Bucket) error { + return nil + })) + return nil + }) +} + +// Ensure that an error is returned when a tx.ForEach function returns an error. +func TestTx_ForEach_WithError(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucket([]byte("widgets")) + tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar")) + + err := errors.New("foo") + equals(t, err, tx.ForEach(func(name []byte, b *bolt.Bucket) error { + return err + })) + return nil + }) +} + // Ensure that Tx commit handlers are called after a transaction successfully commits. func TestTx_OnCommit(t *testing.T) { var x int |