var.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package stm
  2. import (
  3. "sync"
  4. "github.com/alecthomas/atomic"
  5. )
  6. // Holds an STM variable.
  7. type Var[T any] struct {
  8. value atomic.Value[VarValue]
  9. watchers sync.Map
  10. mu sync.Mutex
  11. }
  12. func (v *Var[T]) getValue() *atomic.Value[VarValue] {
  13. return &v.value
  14. }
  15. func (v *Var[T]) getWatchers() *sync.Map {
  16. return &v.watchers
  17. }
  18. func (v *Var[T]) getLock() *sync.Mutex {
  19. return &v.mu
  20. }
  21. func (v *Var[T]) changeValue(new any) {
  22. old := v.value.Load()
  23. newVarValue := old.Set(new)
  24. v.value.Store(newVarValue)
  25. if old.Changed(newVarValue) {
  26. go v.wakeWatchers(newVarValue)
  27. }
  28. }
  29. func (v *Var[T]) wakeWatchers(new VarValue) {
  30. v.watchers.Range(func(k, _ any) bool {
  31. tx := k.(*Tx)
  32. // We have to lock here to ensure that the Tx is waiting before we signal it. Otherwise we
  33. // could signal it before it goes to sleep and it will miss the notification.
  34. tx.mu.Lock()
  35. if read := tx.reads[v]; read != nil && read.Changed(new) {
  36. tx.cond.Broadcast()
  37. for !tx.waiting && !tx.completed {
  38. tx.cond.Wait()
  39. }
  40. }
  41. tx.mu.Unlock()
  42. return !v.value.Load().Changed(new)
  43. })
  44. }
  45. // Returns a new STM variable.
  46. func NewVar[T any](val T) *Var[T] {
  47. v := &Var[T]{}
  48. v.value.Store(versionedValue[T]{
  49. value: val,
  50. })
  51. return v
  52. }
  53. func NewCustomVar[T any](val T, changed func(T, T) bool) *Var[T] {
  54. v := &Var[T]{}
  55. v.value.Store(customVarValue[T]{
  56. value: val,
  57. changed: changed,
  58. })
  59. return v
  60. }
  61. func NewBuiltinEqVar[T comparable](val T) *Var[T] {
  62. return NewCustomVar(val, func(a, b T) bool {
  63. return a != b
  64. })
  65. }