1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
package stm
import (
"runtime/pprof"
"sync"
)
var (
txPool = sync.Pool{New: func() interface{} {
expvars.Add("new txs", 1)
tx := &Tx{
reads: make(map[*Var]uint64),
writes: make(map[*Var]interface{}),
}
tx.cond.L = &tx.mu
return tx
}}
failedCommitsProfile *pprof.Profile
)
const profileFailedCommits = false
func init() {
if profileFailedCommits {
failedCommitsProfile = pprof.NewProfile("stmFailedCommits")
}
}
func newTx() *Tx {
return txPool.Get().(*Tx)
}
func WouldBlock(fn Operation) (block bool) {
tx := newTx()
tx.reset()
_, block = catchRetry(fn, tx)
return
}
// Atomically executes the atomic function fn.
func Atomically(op Operation) interface{} {
expvars.Add("atomically", 1)
// run the transaction
tx := newTx()
retry:
tx.reset()
ret, retry := catchRetry(op, tx)
if retry {
expvars.Add("retries", 1)
// wait for one of the variables we read to change before retrying
tx.wait()
goto retry
}
// verify the read log
tx.lockAllVars()
if !tx.verify() {
tx.unlock()
expvars.Add("failed commits", 1)
if profileFailedCommits {
failedCommitsProfile.Add(new(int), 0)
}
goto retry
}
// commit the write log and broadcast that variables have changed
tx.commit()
tx.unlock()
expvars.Add("commits", 1)
tx.recycle()
return ret
}
// AtomicGet is a helper function that atomically reads a value.
func AtomicGet(v *Var) interface{} {
return v.loadState().val
}
// AtomicSet is a helper function that atomically writes a value.
func AtomicSet(v *Var, val interface{}) {
v.mu.Lock()
v.changeValue(val)
v.mu.Unlock()
}
// Compose is a helper function that composes multiple transactions into a
// single transaction.
func Compose(fns ...Operation) Operation {
return func(tx *Tx) interface{} {
for _, f := range fns {
f(tx)
}
return nil
}
}
// Select runs the supplied functions in order. Execution stops when a
// function succeeds without calling Retry. If no functions succeed, the
// entire selection will be retried.
func Select(fns ...Operation) Operation {
return func(tx *Tx) interface{} {
switch len(fns) {
case 0:
// empty Select blocks forever
tx.Retry()
panic("unreachable")
case 1:
return fns[0](tx)
default:
oldWrites := tx.writes
tx.writes = make(map[*Var]interface{}, len(oldWrites))
for k, v := range oldWrites {
tx.writes[k] = v
}
ret, retry := catchRetry(fns[0], tx)
if retry {
tx.writes = oldWrites
return Select(fns[1:]...)(tx)
} else {
return ret
}
}
}
}
type Operation func(*Tx) interface{}
func VoidOperation(f func(*Tx)) Operation {
return func(tx *Tx) interface{} {
f(tx)
return nil
}
}
|