funcs.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package stm
  2. import (
  3. "math/rand"
  4. "runtime/pprof"
  5. "sync"
  6. "time"
  7. )
  8. var (
  9. txPool = sync.Pool{New: func() any {
  10. expvars.Add("new txs", 1)
  11. tx := &Tx{
  12. reads: make(map[txVar]VarValue),
  13. writes: make(map[txVar]any),
  14. watching: make(map[txVar]struct{}),
  15. }
  16. tx.cond.L = &tx.mu
  17. return tx
  18. }}
  19. failedCommitsProfile *pprof.Profile
  20. )
  // profileFailedCommits, when true, makes init create the "stmFailedCommits"
  // pprof profile and makes Atomically add an entry to it on every failed
  // commit.
  const profileFailedCommits = false

  // sleepBetweenRetries, when true, enables the randomized back-off sleep at
  // the start of each attempt in Atomically.
  const sleepBetweenRetries = false
  25. func init() {
  26. if profileFailedCommits {
  27. failedCommitsProfile = pprof.NewProfile("stmFailedCommits")
  28. }
  29. }
  30. func newTx() *Tx {
  31. tx := txPool.Get().(*Tx)
  32. tx.tries = 0
  33. tx.completed = false
  34. return tx
  35. }
  36. func WouldBlock[R any](fn Operation[R]) (block bool) {
  37. tx := newTx()
  38. tx.reset()
  39. _, block = catchRetry(fn, tx)
  40. if len(tx.watching) != 0 {
  41. panic("shouldn't have installed any watchers")
  42. }
  43. tx.recycle()
  44. return
  45. }
  46. // Atomically executes the atomic function fn.
  47. func Atomically[R any](op Operation[R]) R {
  48. expvars.Add("atomically", 1)
  49. // run the transaction
  50. tx := newTx()
  51. retry:
  52. tx.tries++
  53. tx.reset()
  54. if sleepBetweenRetries {
  55. shift := int64(tx.tries - 1)
  56. const maxShift = 30
  57. if shift > maxShift {
  58. shift = maxShift
  59. }
  60. ns := int64(1) << shift
  61. d := time.Duration(rand.Int63n(ns))
  62. if d > 100*time.Microsecond {
  63. tx.updateWatchers()
  64. time.Sleep(time.Duration(ns))
  65. }
  66. }
  67. tx.mu.Lock()
  68. ret, retry := catchRetry(op, tx)
  69. tx.mu.Unlock()
  70. if retry {
  71. expvars.Add("retries", 1)
  72. // wait for one of the variables we read to change before retrying
  73. tx.wait()
  74. goto retry
  75. }
  76. // verify the read log
  77. tx.lockAllVars()
  78. if tx.inputsChanged() {
  79. tx.unlock()
  80. expvars.Add("failed commits", 1)
  81. if profileFailedCommits {
  82. failedCommitsProfile.Add(new(int), 0)
  83. }
  84. goto retry
  85. }
  86. // commit the write log and broadcast that variables have changed
  87. tx.commit()
  88. tx.mu.Lock()
  89. tx.completed = true
  90. tx.cond.Broadcast()
  91. tx.mu.Unlock()
  92. tx.unlock()
  93. expvars.Add("commits", 1)
  94. tx.recycle()
  95. return ret
  96. }
  97. // AtomicGet is a helper function that atomically reads a value.
  98. func AtomicGet[T any](v *Var[T]) T {
  99. return v.value.Load().Get().(T)
  100. }
  101. // AtomicSet is a helper function that atomically writes a value.
  102. func AtomicSet[T any](v *Var[T], val T) {
  103. v.mu.Lock()
  104. v.changeValue(val)
  105. v.mu.Unlock()
  106. }
  107. // Compose is a helper function that composes multiple transactions into a
  108. // single transaction.
  109. func Compose[R any](fns ...Operation[R]) Operation[struct{}] {
  110. return VoidOperation(func(tx *Tx) {
  111. for _, f := range fns {
  112. f(tx)
  113. }
  114. })
  115. }
  116. // Select runs the supplied functions in order. Execution stops when a
  117. // function succeeds without calling Retry. If no functions succeed, the
  118. // entire selection will be retried.
  119. func Select[R any](fns ...Operation[R]) Operation[R] {
  120. return func(tx *Tx) R {
  121. switch len(fns) {
  122. case 0:
  123. // empty Select blocks forever
  124. tx.Retry()
  125. panic("unreachable")
  126. case 1:
  127. return fns[0](tx)
  128. default:
  129. oldWrites := tx.writes
  130. tx.writes = make(map[txVar]any, len(oldWrites))
  131. for k, v := range oldWrites {
  132. tx.writes[k] = v
  133. }
  134. ret, retry := catchRetry(fns[0], tx)
  135. if retry {
  136. tx.writes = oldWrites
  137. return Select(fns[1:]...)(tx)
  138. } else {
  139. return ret
  140. }
  141. }
  142. }
  143. }
  144. type Operation[R any] func(*Tx) R
  145. func VoidOperation(f func(*Tx)) Operation[struct{}] {
  146. return func(tx *Tx) struct{} {
  147. f(tx)
  148. return struct{}{}
  149. }
  150. }
  151. func AtomicModify[T any](v *Var[T], f func(T) T) {
  152. Atomically(VoidOperation(func(tx *Tx) {
  153. v.Set(tx, f(v.Get(tx)))
  154. }))
  155. }