wait.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wait
  15. import (
  16. "errors"
  17. "math/rand"
  18. "time"
  19. "yunion.io/x/pkg/util/runtime"
  20. )
  21. // For any test of the style:
  22. // ...
  23. // <- time.After(timeout):
  24. // t.Errorf("Timed out")
  25. // The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s
  26. // is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine
  27. // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
  28. var ForeverTestTimeout = time.Second * 30
  29. // NeverStop may be passed to Until to make it never stop.
  30. var NeverStop <-chan struct{} = make(chan struct{})
  31. // Forever calls f every period for ever.
  32. //
  33. // Forever is syntactic sugar on top of Until.
  34. func Forever(f func(), period time.Duration) {
  35. Until(f, period, NeverStop)
  36. }
  37. // Until loops until stop channel is closed, running f every period.
  38. //
  39. // Until is syntactic sugar on top of JitterUntil with zero jitter factor and
  40. // with sliding = true (which means the timer for period starts after the f
  41. // completes).
  42. func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
  43. JitterUntil(f, period, 0.0, true, stopCh)
  44. }
  45. // NonSlidingUntil loops until stop channel is closed, running f every
  46. // period.
  47. //
  48. // NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
  49. // factor, with sliding = false (meaning the timer for period starts at the same
  50. // time as the function starts).
  51. func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
  52. JitterUntil(f, period, 0.0, false, stopCh)
  53. }
  54. // JitterUntil loops until stop channel is closed, running f every period.
  55. //
  56. // If jitterFactor is positive, the period is jittered before every run of f.
  57. // If jitterFactor is not positive, the period is unchanged and not jitterd.
  58. //
  59. // If sliding is true, the period is computed after f runs. If it is false then
  60. // period includes the runtime for f.
  61. //
  62. // Close stopCh to stop. f may not be invoked if stop channel is already
  63. // closed. Pass NeverStop to if you don't want it stop.
  64. func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
  65. var t *time.Timer
  66. var sawTimeout bool
  67. for {
  68. select {
  69. case <-stopCh:
  70. return
  71. default:
  72. }
  73. jitteredPeriod := period
  74. if jitterFactor > 0.0 {
  75. jitteredPeriod = Jitter(period, jitterFactor)
  76. }
  77. if !sliding {
  78. t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
  79. }
  80. func() {
  81. defer runtime.HandleCrash()
  82. f()
  83. }()
  84. if sliding {
  85. t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
  86. }
  87. // NOTE: b/c there is no priority selection in golang
  88. // it is possible for this to race, meaning we could
  89. // trigger t.C and stopCh, and t.C select falls through.
  90. // In order to mitigate we re-check stopCh at the beginning
  91. // of every loop to prevent extra executions of f().
  92. select {
  93. case <-stopCh:
  94. return
  95. case <-t.C:
  96. sawTimeout = true
  97. }
  98. }
  99. }
  100. // Jitter returns a time.Duration between duration and duration + maxFactor *
  101. // duration.
  102. //
  103. // This allows clients to avoid converging on periodic behavior. If maxFactor
  104. // is 0.0, a suggested default value will be chosen.
  105. func Jitter(duration time.Duration, maxFactor float64) time.Duration {
  106. if maxFactor <= 0.0 {
  107. maxFactor = 1.0
  108. }
  109. wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
  110. return wait
  111. }
  112. // ErrWaitTimeout is returned when the condition exited without success.
  113. var ErrWaitTimeout = errors.New("timed out waiting for the condition")
  114. // ConditionFunc returns true if the condition is satisfied, or an error
  115. // if the loop should be aborted.
  116. type ConditionFunc func() (done bool, err error)
  117. // Backoff holds parameters applied to a Backoff function.
  118. type Backoff struct {
  119. Duration time.Duration // the base duration
  120. Factor float64 // Duration is multipled by factor each iteration
  121. Jitter float64 // The amount of jitter applied each iteration
  122. Steps int // Exit with error after this many steps
  123. }
  124. // ExponentialBackoff repeats a condition check with exponential backoff.
  125. //
  126. // It checks the condition up to Steps times, increasing the wait by multipling
  127. // the previous duration by Factor.
  128. //
  129. // If Jitter is greater than zero, a random amount of each duration is added
  130. // (between duration and duration*(1+jitter)).
  131. //
  132. // If the condition never returns true, ErrWaitTimeout is returned. All other
  133. // errors terminate immediately.
  134. func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
  135. duration := backoff.Duration
  136. for i := 0; i < backoff.Steps; i++ {
  137. if i != 0 {
  138. adjusted := duration
  139. if backoff.Jitter > 0.0 {
  140. adjusted = Jitter(duration, backoff.Jitter)
  141. }
  142. time.Sleep(adjusted)
  143. duration = time.Duration(float64(duration) * backoff.Factor)
  144. }
  145. if ok, err := condition(); err != nil || ok {
  146. return err
  147. }
  148. }
  149. return ErrWaitTimeout
  150. }
  151. // Poll tries a condition func until it returns true, an error, or the timeout
  152. // is reached.
  153. //
  154. // Poll always waits the interval before the run of 'condition'.
  155. // 'condition' will always be invoked at least once.
  156. //
  157. // Some intervals may be missed if the condition takes too long or the time
  158. // window is too short.
  159. //
  160. // If you want to Poll something forever, see PollInfinite.
  161. func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
  162. return pollInternal(poller(interval, timeout), condition)
  163. }
  164. func pollInternal(wait WaitFunc, condition ConditionFunc) error {
  165. return WaitFor(wait, condition, NeverStop)
  166. }
  167. // PollImmediate tries a condition func until it returns true, an error, or the timeout
  168. // is reached.
  169. //
  170. // Poll always checks 'condition' before waiting for the interval. 'condition'
  171. // will always be invoked at least once.
  172. //
  173. // Some intervals may be missed if the condition takes too long or the time
  174. // window is too short.
  175. //
  176. // If you want to Poll something forever, see PollInfinite.
  177. func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
  178. return pollImmediateInternal(poller(interval, timeout), condition)
  179. }
  180. func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error {
  181. done, err := condition()
  182. if err != nil {
  183. return err
  184. }
  185. if done {
  186. return nil
  187. }
  188. return pollInternal(wait, condition)
  189. }
  190. // PollInfinite tries a condition func until it returns true or an error
  191. //
  192. // PollInfinite always waits the interval before the run of 'condition'.
  193. //
  194. // Some intervals may be missed if the condition takes too long or the time
  195. // window is too short.
  196. func PollInfinite(interval time.Duration, condition ConditionFunc) error {
  197. done := make(chan struct{})
  198. defer close(done)
  199. return PollUntil(interval, condition, done)
  200. }
  201. // PollImmediateInfinite tries a condition func until it returns true or an error
  202. //
  203. // PollImmediateInfinite runs the 'condition' before waiting for the interval.
  204. //
  205. // Some intervals may be missed if the condition takes too long or the time
  206. // window is too short.
  207. func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error {
  208. done, err := condition()
  209. if err != nil {
  210. return err
  211. }
  212. if done {
  213. return nil
  214. }
  215. return PollInfinite(interval, condition)
  216. }
  217. // PollUntil tries a condition func until it returns true, an error or stopCh is
  218. // closed.
  219. //
  220. // PolUntil always waits interval before the first run of 'condition'.
  221. // 'condition' will always be invoked at least once.
  222. func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
  223. return WaitFor(poller(interval, 0), condition, stopCh)
  224. }
  225. // WaitFunc creates a channel that receives an item every time a test
  226. // should be executed and is closed when the last test should be invoked.
  227. type WaitFunc func(done <-chan struct{}) <-chan struct{}
  228. // WaitFor continually checks 'fn' as driven by 'wait'.
  229. //
  230. // WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value
  231. // placed on the channel and once more when the channel is closed.
  232. //
  233. // If 'fn' returns an error the loop ends and that error is returned, and if
  234. // 'fn' returns true the loop ends and nil is returned.
  235. //
  236. // ErrWaitTimeout will be returned if the channel is closed without fn ever
  237. // returning true.
  238. func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
  239. c := wait(done)
  240. for {
  241. _, open := <-c
  242. ok, err := fn()
  243. if err != nil {
  244. return err
  245. }
  246. if ok {
  247. return nil
  248. }
  249. if !open {
  250. break
  251. }
  252. }
  253. return ErrWaitTimeout
  254. }
  255. // poller returns a WaitFunc that will send to the channel every interval until
  256. // timeout has elapsed and then closes the channel.
  257. //
  258. // Over very short intervals you may receive no ticks before the channel is
  259. // closed. A timeout of 0 is interpreted as an infinity.
  260. //
  261. // Output ticks are not buffered. If the channel is not ready to receive an
  262. // item, the tick is skipped.
  263. func poller(interval, timeout time.Duration) WaitFunc {
  264. return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
  265. ch := make(chan struct{})
  266. go func() {
  267. defer close(ch)
  268. tick := time.NewTicker(interval)
  269. defer tick.Stop()
  270. var after <-chan time.Time
  271. if timeout != 0 {
  272. // time.After is more convenient, but it
  273. // potentially leaves timers around much longer
  274. // than necessary if we exit early.
  275. timer := time.NewTimer(timeout)
  276. after = timer.C
  277. defer timer.Stop()
  278. }
  279. for {
  280. select {
  281. case <-tick.C:
  282. // If the consumer isn't ready for this signal drop it and
  283. // check the other channels.
  284. select {
  285. case ch <- struct{}{}:
  286. default:
  287. }
  288. case <-after:
  289. return
  290. case <-done:
  291. return
  292. }
  293. }
  294. }()
  295. return ch
  296. })
  297. }
  298. // resetOrReuseTimer avoids allocating a new timer if one is already in use.
  299. // Not safe for multiple threads.
  300. func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
  301. if t == nil {
  302. return time.NewTimer(d)
  303. }
  304. if !t.Stop() && !sawTimeout {
  305. <-t.C
  306. }
  307. t.Reset(d)
  308. return t
  309. }