pubsub.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package pubsub
  2. import (
  3. "sync"
  4. )
  5. type PubSub[T any] struct {
  6. mu sync.Mutex
  7. next chan item[T]
  8. closed bool
  9. }
  10. type item[T any] struct {
  11. value T
  12. next chan item[T]
  13. }
  14. type Subscription[T any] struct {
  15. next chan item[T]
  16. Values chan T
  17. mu sync.Mutex
  18. closed chan struct{}
  19. }
  20. func (me *PubSub[T]) init() {
  21. me.next = make(chan item[T], 1)
  22. }
  23. func (me *PubSub[T]) lazyInit() {
  24. me.mu.Lock()
  25. defer me.mu.Unlock()
  26. if me.closed {
  27. return
  28. }
  29. if me.next == nil {
  30. me.init()
  31. }
  32. }
  33. func (me *PubSub[T]) Publish(v T) {
  34. me.lazyInit()
  35. next := make(chan item[T], 1)
  36. i := item[T]{v, next}
  37. me.mu.Lock()
  38. if !me.closed {
  39. me.next <- i
  40. me.next = next
  41. }
  42. me.mu.Unlock()
  43. }
  44. func (me *Subscription[T]) Close() {
  45. me.mu.Lock()
  46. defer me.mu.Unlock()
  47. select {
  48. case <-me.closed:
  49. default:
  50. close(me.closed)
  51. }
  52. }
  53. func (me *Subscription[T]) runner() {
  54. defer close(me.Values)
  55. for {
  56. select {
  57. case i, ok := <-me.next:
  58. if !ok {
  59. me.Close()
  60. return
  61. }
  62. // Send the value back into the channel for someone else. This
  63. // won't block because the channel has a capacity of 1, and this
  64. // is currently the only copy of this value being sent to this
  65. // channel.
  66. me.next <- i
  67. // The next value comes from the channel given to us by the value
  68. // we just got.
  69. me.next = i.next
  70. select {
  71. case me.Values <- i.value:
  72. case <-me.closed:
  73. return
  74. }
  75. case <-me.closed:
  76. return
  77. }
  78. }
  79. }
  80. func (me *PubSub[T]) Subscribe() (ret *Subscription[T]) {
  81. me.lazyInit()
  82. ret = &Subscription[T]{
  83. closed: make(chan struct{}),
  84. Values: make(chan T),
  85. }
  86. me.mu.Lock()
  87. ret.next = me.next
  88. me.mu.Unlock()
  89. go ret.runner()
  90. return
  91. }
  92. func (me *PubSub[T]) Close() {
  93. me.mu.Lock()
  94. defer me.mu.Unlock()
  95. if me.closed {
  96. return
  97. }
  98. if me.next != nil {
  99. close(me.next)
  100. }
  101. me.closed = true
  102. }