controller.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package backoff
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. )
  7. type controller struct {
  8. ctx context.Context
  9. cancel func()
  10. ig IntervalGenerator
  11. maxRetries int
  12. mu *sync.RWMutex
  13. next chan struct{} // user-facing channel
  14. resetTimer chan time.Duration
  15. retries int
  16. timer *time.Timer
  17. }
  18. func newController(ctx context.Context, ig IntervalGenerator, options ...ControllerOption) *controller {
  19. cctx, cancel := context.WithCancel(ctx) // DO NOT fire this cancel here
  20. maxRetries := 10
  21. for _, option := range options {
  22. switch option.Ident() {
  23. case identMaxRetries{}:
  24. maxRetries = option.Value().(int)
  25. }
  26. }
  27. c := &controller{
  28. cancel: cancel,
  29. ctx: cctx,
  30. ig: ig,
  31. maxRetries: maxRetries,
  32. mu: &sync.RWMutex{},
  33. next: make(chan struct{}, 1),
  34. resetTimer: make(chan time.Duration, 1),
  35. timer: time.NewTimer(ig.Next()),
  36. }
  37. // enqueue a single fake event so the user gets to retry once
  38. c.next <- struct{}{}
  39. go c.loop()
  40. return c
  41. }
  42. func (c *controller) loop() {
  43. for {
  44. select {
  45. case <-c.ctx.Done():
  46. return
  47. case d := <-c.resetTimer:
  48. if !c.timer.Stop() {
  49. select {
  50. case <-c.timer.C:
  51. default:
  52. }
  53. }
  54. c.timer.Reset(d)
  55. case <-c.timer.C:
  56. select {
  57. case <-c.ctx.Done():
  58. return
  59. case c.next <- struct{}{}:
  60. }
  61. if c.maxRetries > 0 {
  62. c.retries++
  63. }
  64. if !c.check() {
  65. c.cancel()
  66. return
  67. }
  68. c.resetTimer <- c.ig.Next()
  69. }
  70. }
  71. }
  72. func (c *controller) check() bool {
  73. if c.maxRetries > 0 && c.retries >= c.maxRetries {
  74. return false
  75. }
  76. return true
  77. }
  78. func (c *controller) Done() <-chan struct{} {
  79. c.mu.RLock()
  80. defer c.mu.RUnlock()
  81. return c.ctx.Done()
  82. }
  83. func (c *controller) Next() <-chan struct{} {
  84. c.mu.RLock()
  85. defer c.mu.RUnlock()
  86. return c.next
  87. }