aboutsummaryrefslogtreecommitdiff
path: root/functional_test.go
blob: e155bc3de0c3c79f38fb0bcef0457f813081a068 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package bolt

import (
	"fmt"
	"os"
	"sync"
	"testing"
	"testing/quick"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestParallelTransactions ensures that multiple goroutines can use the DB
// concurrently without race detector errors.
//
// For every dataset generated by quick.Check, one writer (the test goroutine)
// inserts the items into the "widgets" bucket in batches while a bounded pool
// of reader goroutines repeatedly opens read transactions and verifies that
// every item recorded as committed is readable with the expected value.
func TestParallelTransactions(t *testing.T) {
	// Guards `current` and keeps "commit + record" atomic with respect to
	// "snapshot current + open read transaction".
	var mutex sync.RWMutex

	err := quick.Check(func(numReaders, batchSize uint, items testdata) bool {
		// Limit the readers & writers to something reasonable.
		numReaders = (numReaders % 10) + 1
		batchSize = (batchSize % 50) + 1

		// Items that have been successfully committed so far.
		var current testdata

		withOpenDB(func(db *DB, path string) {
			// NOTE(review): any result of CreateBucket is discarded here;
			// confirm whether it reports an error that should be checked.
			db.CreateBucket("widgets")

			var wg sync.WaitGroup
			stop := make(chan struct{})

			// Always stop and drain the readers before handing the DB back,
			// including when the writer below aborts through t.FailNow
			// (which unwinds this goroutine and runs deferred calls).
			defer func() {
				close(stop)
				wg.Wait()
			}()

			// The spawner is registered before it starts, so the counter
			// stays above zero for as long as it may still add readers.
			// That makes its wg.Add calls safe to overlap with wg.Wait.
			wg.Add(1)
			go func() {
				defer wg.Done()

				// Buffered channel used as a semaphore bounding the number
				// of concurrently running readers.
				readers := make(chan int, numReaders)
				for {
					// Acquire a reader slot unless we're stopped.
					select {
					case readers <- 0:
					case <-stop:
						return
					}

					// select chooses randomly when both cases are ready, so
					// re-check to avoid starting a reader after the stop.
					select {
					case <-stop:
						return
					default:
					}

					wg.Add(1)
					go func() {
						defer wg.Done()
						defer func() { <-readers }()

						// Snapshot the committed list and open the read
						// transaction under the same lock so both describe
						// the same point in time.
						mutex.RLock()
						local := current
						txn, err := db.Transaction()
						mutex.RUnlock()

						// t.FailNow must only be called from the test
						// goroutine; assert has already marked the test
						// as failed, so just stop this reader.
						if !assert.NoError(t, err) {
							return
						}
						defer txn.Close()

						// Verify all data is in for local data list.
						for _, item := range local {
							value, err := txn.Get("widgets", item.Key)
							if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) {
								return
							}
						}
					}()
				}
			}()

			// Batch insert items.
			pending := items
			for {
				// Determine next batch.
				currentBatchSize := int(batchSize)
				if currentBatchSize > len(pending) {
					currentBatchSize = len(pending)
				}
				batchItems := pending[0:currentBatchSize]
				pending = pending[currentBatchSize:]

				// Start write transaction.
				txn, err := db.RWTransaction()
				if !assert.NoError(t, err) {
					t.FailNow()
				}

				// Insert whole batch.
				for _, item := range batchItems {
					err := txn.Put("widgets", item.Key, item.Value)
					if !assert.NoError(t, err) {
						t.FailNow()
					}
				}

				// Commit and, only on success, publish the batch to readers.
				mutex.Lock()
				err = txn.Commit()
				if err == nil {
					current = append(current, batchItems...)
				}
				mutex.Unlock()
				if !assert.NoError(t, err) {
					t.FailNow()
				}

				// If there are no more left then exit.
				if len(pending) == 0 {
					break
				}

				// Give the readers a chance to run between batches.
				time.Sleep(1 * time.Millisecond)
			}
		})
		fmt.Fprint(os.Stderr, ".")
		return true
	}, qconfig())
	assert.NoError(t, err)
	fmt.Fprint(os.Stderr, "\n")
}