aboutsummaryrefslogtreecommitdiff
path: root/funcs.go
blob: b8c8464601d244e15415d1190898f33e30167a5b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package stm

import (
	"math/rand"
	"runtime/pprof"
	"sync"
	"time"
)

var (
	// failedCommitsProfile is the pprof profile that Atomically adds to on
	// a failed commit. It is only created (in init) when
	// profileFailedCommits is true; otherwise it stays nil.
	failedCommitsProfile *pprof.Profile

	// txPool supplies Tx values to newTx. When the pool has nothing to
	// offer, New builds a Tx with empty read, write and watch maps and
	// points its condition variable at the Tx's own mutex.
	txPool = sync.Pool{
		New: func() interface{} {
			// Count every Tx that had to be allocated from scratch.
			expvars.Add("new txs", 1)
			fresh := new(Tx)
			fresh.reads = make(map[*Var]VarValue)
			fresh.writes = make(map[*Var]interface{})
			fresh.watching = make(map[*Var]struct{})
			fresh.cond.L = &fresh.mu
			return fresh
		},
	}
)

// profileFailedCommits is a compile-time switch. When true, init creates
// the "stmFailedCommits" pprof profile and Atomically adds an entry to it
// for every failed commit.
const profileFailedCommits = false

// sleepBetweenRetries is a compile-time switch. When true, Atomically may
// sleep at the start of an attempt, with a bound that grows with the
// number of tries.
const sleepBetweenRetries = false

// init registers the failed-commit pprof profile, but only when profiling
// of failed commits has been switched on at compile time.
func init() {
	if !profileFailedCommits {
		return
	}
	failedCommitsProfile = pprof.NewProfile("stmFailedCommits")
}

// newTx fetches a Tx from txPool. The returned Tx is not reset here;
// callers in this file call tx.reset() before using it.
func newTx() *Tx {
	pooled := txPool.Get()
	return pooled.(*Tx)
}

// WouldBlock runs fn once against a fresh transaction and reports whether
// fn asked to retry (the second result of catchRetry), i.e. whether
// running it through Atomically right now would have to wait. The
// transaction is never committed, so writes made by fn are discarded.
func WouldBlock(fn Operation) (block bool) {
	tx := newTx()
	tx.reset()
	_, block = catchRetry(fn, tx)
	// Hand the Tx back the same way Atomically does once it is finished
	// with it; previously the Tx obtained from the pool was simply dropped.
	tx.recycle()
	return block
}

// Atomically runs op as a transaction and returns op's result once the
// transaction has been committed.
//
// Every attempt resets the Tx and runs op against it. If op asks to
// retry, Atomically waits via tx.wait and then starts a new attempt.
// Otherwise it locks the variables, verifies the read log, and starts over
// if an input changed in the meantime. On success the write log is
// committed, the Tx is marked completed, anything waiting on tx.cond is
// woken, and the Tx is recycled.
func Atomically(op Operation) interface{} {
	expvars.Add("atomically", 1)
	// run the transaction
	tx := newTx()
	tx.tries = 0
	for {
		tx.tries++
		tx.reset()
		if sleepBetweenRetries {
			// Randomised exponential backoff: the upper bound doubles with
			// each attempt and is capped at 2^30 ns.
			shift := int64(tx.tries - 1)
			const maxShift = 30
			if shift > maxShift {
				shift = maxShift
			}
			ns := int64(1) << shift
			d := time.Duration(rand.Int63n(ns))
			// Delays at or below 100µs are skipped entirely.
			if d > 100*time.Microsecond {
				tx.updateWatchers()
				// Sleep for the jittered delay rather than the upper
				// bound; sleeping for ns would discard the randomisation.
				time.Sleep(d)
			}
		}
		ret, retry := catchRetry(op, tx)
		if retry {
			expvars.Add("retries", 1)
			// wait for one of the variables we read to change before retrying
			tx.wait()
			continue
		}
		// verify the read log
		tx.lockAllVars()
		if tx.inputsChanged() {
			tx.unlock()
			expvars.Add("failed commits", 1)
			if profileFailedCommits {
				failedCommitsProfile.Add(new(int), 0)
			}
			continue
		}
		// commit the write log and broadcast that variables have changed
		tx.commit()
		tx.mu.Lock()
		tx.completed = true
		tx.cond.Broadcast()
		tx.mu.Unlock()
		tx.unlock()
		expvars.Add("commits", 1)
		tx.recycle()
		return ret
	}
}

// AtomicGet reads the current value of v outside of any transaction: it
// loads the VarValue currently stored in v and returns the result of its
// Get method.
func AtomicGet(v *Var) interface{} {
	current := v.value.Load().(VarValue)
	return current.Get()
}

// AtomicSet is a helper function that writes val to v outside of any
// transaction. It holds v's mutex for the duration of the change.
func AtomicSet(v *Var, val interface{}) {
	v.mu.Lock()
	// Deferred so the mutex is released even if changeValue panics;
	// otherwise v would stay locked for every later user.
	defer v.mu.Unlock()
	v.changeValue(val)
}

// Compose bundles several operations into one. The returned Operation
// runs every element of fns, in order, against the same Tx. The results
// of the individual operations are discarded and the combined operation
// always returns nil.
func Compose(fns ...Operation) Operation {
	composed := func(tx *Tx) interface{} {
		for i := range fns {
			fns[i](tx)
		}
		return nil
	}
	return composed
}

// Select builds an Operation that tries the supplied functions in order
// and returns the result of the first one that finishes without calling
// Retry. Writes made by an alternative that retried are rolled back
// before the next one is tried. The last alternative runs directly, so a
// Retry from it retries the selection as a whole. With no functions at
// all, the operation simply calls Retry.
func Select(fns ...Operation) Operation {
	return func(tx *Tx) interface{} {
		if len(fns) == 0 {
			// empty Select blocks forever
			tx.Retry()
			panic("unreachable")
		}
		final := len(fns) - 1
		for _, candidate := range fns[:final] {
			// Run the candidate against a copy of the write log so that
			// the log can be put back untouched if it retries.
			snapshot := tx.writes
			tx.writes = make(map[*Var]interface{}, len(snapshot))
			for key, val := range snapshot {
				tx.writes[key] = val
			}
			ret, retry := catchRetry(candidate, tx)
			if !retry {
				return ret
			}
			tx.writes = snapshot
		}
		// The last alternative is not guarded by catchRetry.
		return fns[final](tx)
	}
}

// Operation is a function executed against a transaction. It receives
// the *Tx it runs in and returns a result value; Atomically returns that
// value to its caller once the transaction commits.
type Operation func(*Tx) interface{}

// VoidOperation adapts a function that has no result into an Operation.
// The returned Operation calls f with the transaction and then returns
// nil.
func VoidOperation(f func(*Tx)) Operation {
	adapted := func(tx *Tx) interface{} {
		f(tx)
		return nil
	}
	return adapted
}