aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile5
-rw-r--r--bucket.go2
-rw-r--r--buckets.go2
-rw-r--r--db.go6
-rw-r--r--db_test.go7
-rw-r--r--functional_test.go124
-rw-r--r--rwtransaction.go4
-rw-r--r--transaction.go6
8 files changed, 145 insertions, 11 deletions
diff --git a/Makefile b/Makefile
index a7cd5e7..a6ad051 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,11 @@ fmt:
@go fmt ./...
test: fmt
+ @echo "=== TESTS ==="
@go test -v -cover -test.run=$(TEST)
+ @echo ""
+ @echo ""
+ @echo "=== RACE DETECTOR ==="
+ @go test -v -race -test.run=Parallel
.PHONY: bench cover fmt test
diff --git a/bucket.go b/bucket.go
index 9c9ee29..a973f0e 100644
--- a/bucket.go
+++ b/bucket.go
@@ -12,7 +12,7 @@ type Bucket struct {
// bucket represents the on-file representation of a bucket.
type bucket struct {
- root pgid
+ root pgid
sequence uint64
}
diff --git a/buckets.go b/buckets.go
index 6d9f27c..3226873 100644
--- a/buckets.go
+++ b/buckets.go
@@ -64,7 +64,7 @@ func (b *buckets) read(p *page) {
// Associate keys and items.
for index, key := range keys {
b.items[key] = &bucket{
- root: items[index].root,
+ root: items[index].root,
sequence: items[index].sequence,
}
}
diff --git a/db.go b/db.go
index 34de9f3..57ffa7b 100644
--- a/db.go
+++ b/db.go
@@ -239,10 +239,6 @@ func (db *DB) Close() {
}
func (db *DB) close() {
- // Wait for pending transactions before closing and unmapping the data.
- // db.mmaplock.Lock()
- // defer db.mmaplock.Unlock()
-
// TODO(benbjohnson): Undo everything in Open().
db.freelist = nil
db.path = ""
@@ -391,7 +387,7 @@ func (db *DB) DeleteBucket(name string) error {
// NextSequence returns an autoincrementing integer for the bucket.
// This function can return an error if the bucket does not exist.
func (db *DB) NextSequence(name string) (int, error) {
- var seq int
+ var seq int
err := db.Do(func(t *RWTransaction) error {
var err error
seq, err = t.NextSequence(name)
diff --git a/db_test.go b/db_test.go
index 1a9aa02..666ba4a 100644
--- a/db_test.go
+++ b/db_test.go
@@ -259,3 +259,10 @@ func withOpenDB(fn func(*DB, string)) {
fn(db, path)
})
}
+
+func trunc(b []byte, length int) []byte {
+ if length < len(b) {
+ return b[:length]
+ }
+ return b
+}
diff --git a/functional_test.go b/functional_test.go
new file mode 100644
index 0000000..e155bc3
--- /dev/null
+++ b/functional_test.go
@@ -0,0 +1,124 @@
+package bolt
+
+import (
+ "fmt"
+ "os"
+ "sync"
+ "testing"
+ "testing/quick"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// Ensure that multiple threads can use the DB without race detector errors.
+func TestParallelTransactions(t *testing.T) {
+ var mutex sync.RWMutex
+
+ err := quick.Check(func(numReaders, batchSize uint, items testdata) bool {
+ // Limit the readers & writers to something reasonable.
+ numReaders = (numReaders % 10) + 1
+ batchSize = (batchSize % 50) + 1
+
+ // Maintain the current dataset.
+ var current testdata
+
+ withOpenDB(func(db *DB, path string) {
+ db.CreateBucket("widgets")
+
+ // Maintain a set of concurrent readers.
+ var wg sync.WaitGroup
+ var c = make(chan bool, 0)
+ go func() {
+ var readers = make(chan int, numReaders)
+ for {
+ wg.Add(1)
+
+ // Attempt to start a new reader unless we're stopped.
+ select {
+ case readers <- 0:
+ case <-c:
+ wg.Done()
+ return
+ }
+
+ go func() {
+ mutex.RLock()
+ local := current
+ txn, err := db.Transaction()
+ mutex.RUnlock()
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+
+ // Verify all data is in for local data list.
+ for _, item := range local {
+ value, err := txn.Get("widgets", item.Key)
+ if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) {
+ txn.Close()
+ wg.Done()
+ t.FailNow()
+ }
+ }
+
+ txn.Close()
+ wg.Done()
+ <-readers
+ }()
+ }
+ }()
+
+ // Batch insert items.
+ pending := items
+ for {
+ // Determine next batch.
+ currentBatchSize := int(batchSize)
+ if currentBatchSize > len(pending) {
+ currentBatchSize = len(pending)
+ }
+ batchItems := pending[0:currentBatchSize]
+ pending = pending[currentBatchSize:]
+
+ // Start write transaction.
+ txn, err := db.RWTransaction()
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+
+ // Insert whole batch.
+ for _, item := range batchItems {
+ err := txn.Put("widgets", item.Key, item.Value)
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+ }
+
+ // Commit and update the current list.
+ mutex.Lock()
+ err = txn.Commit()
+ current = append(current, batchItems...)
+ mutex.Unlock()
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+
+ // If there are no more left then exit.
+ if len(pending) == 0 {
+ break
+ }
+
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ // Notify readers to stop.
+ close(c)
+
+ // Wait for readers to finish.
+ wg.Wait()
+ })
+ fmt.Fprint(os.Stderr, ".")
+ return true
+ }, qconfig())
+ assert.NoError(t, err)
+ fmt.Fprint(os.Stderr, "\n")
+}
diff --git a/rwtransaction.go b/rwtransaction.go
index 568960a..afb55c8 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -20,9 +20,7 @@ func (t *RWTransaction) init(db *DB) {
t.Transaction.init(db)
t.pages = make(map[pgid]*page)
- // Copy the meta and increase the transaction id.
- t.meta = &meta{}
- db.meta().copy(t.meta)
+ // Increment the transaction id.
t.meta.txnid += txnid(1)
}
diff --git a/transaction.go b/transaction.go
index 713a019..46adefe 100644
--- a/transaction.go
+++ b/transaction.go
@@ -20,9 +20,13 @@ type txnid uint64
// init initializes the transaction and associates it with a database.
func (t *Transaction) init(db *DB) {
t.db = db
- t.meta = db.meta()
t.pages = nil
+ // Copy the meta page since it can be changed by the writer.
+ t.meta = &meta{}
+ db.meta().copy(t.meta)
+
+ // Read in the buckets page.
t.buckets = &buckets{}
t.buckets.read(t.page(t.meta.buckets))
}