aboutsummaryrefslogtreecommitdiff
path: root/functional_test.go
diff options
context:
space:
mode:
authorBen Johnson <benbjohnson@yahoo.com>2014-02-15 21:50:34 -0700
committerBen Johnson <benbjohnson@yahoo.com>2014-02-15 21:50:34 -0700
commit7fb06feea4012543b6a1b0726a09e9a4c4a84933 (patch)
tree057b2bf9d7487343f477f62726919421dc66b6b6 /functional_test.go
parentMerge pull request #32 from benbjohnson/blocks (diff)
downloaddedo-7fb06feea4012543b6a1b0726a09e9a4c4a84933.tar.gz
dedo-7fb06feea4012543b6a1b0726a09e9a4c4a84933.tar.xz
Add parallel usage test and race detector.
Diffstat (limited to 'functional_test.go')
-rw-r--r--functional_test.go124
1 files changed, 124 insertions, 0 deletions
diff --git a/functional_test.go b/functional_test.go
new file mode 100644
index 0000000..e155bc3
--- /dev/null
+++ b/functional_test.go
@@ -0,0 +1,124 @@
+package bolt
+
+import (
+ "fmt"
+ "os"
+ "sync"
+ "testing"
+ "testing/quick"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// Ensure that multiple threads can use the DB without race detector errors.
+func TestParallelTransactions(t *testing.T) {
+ var mutex sync.RWMutex
+
+ err := quick.Check(func(numReaders, batchSize uint, items testdata) bool {
+ // Limit the readers & writers to something reasonable.
+ numReaders = (numReaders % 10) + 1
+ batchSize = (batchSize % 50) + 1
+
+ // Maintain the current dataset.
+ var current testdata
+
+ withOpenDB(func(db *DB, path string) {
+ db.CreateBucket("widgets")
+
+ // Maintain a set of concurrent readers.
+ var wg sync.WaitGroup
+ var c = make(chan bool, 0)
+ go func() {
+ var readers = make(chan int, numReaders)
+ for {
+ wg.Add(1)
+
+ // Attempt to start a new reader unless we're stopped.
+ select {
+ case readers <- 0:
+ case <-c:
+ wg.Done()
+ return
+ }
+
+ go func() {
+ mutex.RLock()
+ local := current
+ txn, err := db.Transaction()
+ mutex.RUnlock()
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+
+ // Verify all data is in for local data list.
+ for _, item := range local {
+ value, err := txn.Get("widgets", item.Key)
+ if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) {
+ txn.Close()
+ wg.Done()
+ t.FailNow()
+ }
+ }
+
+ txn.Close()
+ wg.Done()
+ <-readers
+ }()
+ }
+ }()
+
+ // Batch insert items.
+ pending := items
+ for {
+ // Determine next batch.
+ currentBatchSize := int(batchSize)
+ if currentBatchSize > len(pending) {
+ currentBatchSize = len(pending)
+ }
+ batchItems := pending[0:currentBatchSize]
+ pending = pending[currentBatchSize:]
+
+ // Start write transaction.
+ txn, err := db.RWTransaction()
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+
+ // Insert whole batch.
+ for _, item := range batchItems {
+ err := txn.Put("widgets", item.Key, item.Value)
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+ }
+
+ // Commit and update the current list.
+ mutex.Lock()
+ err = txn.Commit()
+ current = append(current, batchItems...)
+ mutex.Unlock()
+ if !assert.NoError(t, err) {
+ t.FailNow()
+ }
+
+ // If there are no more left then exit.
+ if len(pending) == 0 {
+ break
+ }
+
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ // Notify readers to stop.
+ close(c)
+
+ // Wait for readers to finish.
+ wg.Wait()
+ })
+ fmt.Fprint(os.Stderr, ".")
+ return true
+ }, qconfig())
+ assert.NoError(t, err)
+ fmt.Fprint(os.Stderr, "\n")
+}