| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package wait
- import (
- "context"
- "errors"
- "math"
- "math/rand"
- "sync"
- "time"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/utils/clock"
- )
- // For any test of the style:
- //
- // ...
- // <- time.After(timeout):
- // t.Errorf("Timed out")
- //
- // The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s
- // is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine
- // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
- var ForeverTestTimeout = time.Second * 30
- // NeverStop may be passed to Until to make it never stop.
- var NeverStop <-chan struct{} = make(chan struct{})
- // Group allows to start a group of goroutines and wait for their completion.
- type Group struct {
- wg sync.WaitGroup
- }
- func (g *Group) Wait() {
- g.wg.Wait()
- }
- // StartWithChannel starts f in a new goroutine in the group.
- // stopCh is passed to f as an argument. f should stop when stopCh is available.
- func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
- g.Start(func() {
- f(stopCh)
- })
- }
- // StartWithContext starts f in a new goroutine in the group.
- // ctx is passed to f as an argument. f should stop when ctx.Done() is available.
- func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
- g.Start(func() {
- f(ctx)
- })
- }
- // Start starts f in a new goroutine in the group.
- func (g *Group) Start(f func()) {
- g.wg.Add(1)
- go func() {
- defer g.wg.Done()
- f()
- }()
- }
- // Forever calls f every period for ever.
- //
- // Forever is syntactic sugar on top of Until.
- func Forever(f func(), period time.Duration) {
- Until(f, period, NeverStop)
- }
- // Until loops until stop channel is closed, running f every period.
- //
- // Until is syntactic sugar on top of JitterUntil with zero jitter factor and
- // with sliding = true (which means the timer for period starts after the f
- // completes).
- func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
- JitterUntil(f, period, 0.0, true, stopCh)
- }
- // UntilWithContext loops until context is done, running f every period.
- //
- // UntilWithContext is syntactic sugar on top of JitterUntilWithContext
- // with zero jitter factor and with sliding = true (which means the timer
- // for period starts after the f completes).
- func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
- JitterUntilWithContext(ctx, f, period, 0.0, true)
- }
- // NonSlidingUntil loops until stop channel is closed, running f every
- // period.
- //
- // NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
- // factor, with sliding = false (meaning the timer for period starts at the same
- // time as the function starts).
- func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
- JitterUntil(f, period, 0.0, false, stopCh)
- }
- // NonSlidingUntilWithContext loops until context is done, running f every
- // period.
- //
- // NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
- // with zero jitter factor, with sliding = false (meaning the timer for period
- // starts at the same time as the function starts).
- func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
- JitterUntilWithContext(ctx, f, period, 0.0, false)
- }
- // JitterUntil loops until stop channel is closed, running f every period.
- //
- // If jitterFactor is positive, the period is jittered before every run of f.
- // If jitterFactor is not positive, the period is unchanged and not jittered.
- //
- // If sliding is true, the period is computed after f runs. If it is false then
- // period includes the runtime for f.
- //
- // Close stopCh to stop. f may not be invoked if stop channel is already
- // closed. Pass NeverStop to if you don't want it stop.
- func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
- BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
- }
- // BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
- //
- // If sliding is true, the period is computed after f runs. If it is false then
- // period includes the runtime for f.
- func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
- var t clock.Timer
- for {
- select {
- case <-stopCh:
- return
- default:
- }
- if !sliding {
- t = backoff.Backoff()
- }
- func() {
- defer runtime.HandleCrash()
- f()
- }()
- if sliding {
- t = backoff.Backoff()
- }
- // NOTE: b/c there is no priority selection in golang
- // it is possible for this to race, meaning we could
- // trigger t.C and stopCh, and t.C select falls through.
- // In order to mitigate we re-check stopCh at the beginning
- // of every loop to prevent extra executions of f().
- select {
- case <-stopCh:
- if !t.Stop() {
- <-t.C()
- }
- return
- case <-t.C():
- }
- }
- }
- // JitterUntilWithContext loops until context is done, running f every period.
- //
- // If jitterFactor is positive, the period is jittered before every run of f.
- // If jitterFactor is not positive, the period is unchanged and not jittered.
- //
- // If sliding is true, the period is computed after f runs. If it is false then
- // period includes the runtime for f.
- //
- // Cancel context to stop. f may not be invoked if context is already expired.
- func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
- JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
- }
- // Jitter returns a time.Duration between duration and duration + maxFactor *
- // duration.
- //
- // This allows clients to avoid converging on periodic behavior. If maxFactor
- // is 0.0, a suggested default value will be chosen.
- func Jitter(duration time.Duration, maxFactor float64) time.Duration {
- if maxFactor <= 0.0 {
- maxFactor = 1.0
- }
- wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
- return wait
- }
- // ErrWaitTimeout is returned when the condition exited without success.
- var ErrWaitTimeout = errors.New("timed out waiting for the condition")
- // ConditionFunc returns true if the condition is satisfied, or an error
- // if the loop should be aborted.
- type ConditionFunc func() (done bool, err error)
- // ConditionWithContextFunc returns true if the condition is satisfied, or an error
- // if the loop should be aborted.
- //
- // The caller passes along a context that can be used by the condition function.
- type ConditionWithContextFunc func(context.Context) (done bool, err error)
- // WithContext converts a ConditionFunc into a ConditionWithContextFunc
- func (cf ConditionFunc) WithContext() ConditionWithContextFunc {
- return func(context.Context) (done bool, err error) {
- return cf()
- }
- }
- // runConditionWithCrashProtection runs a ConditionFunc with crash protection
- func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) {
- return runConditionWithCrashProtectionWithContext(context.TODO(), condition.WithContext())
- }
- // runConditionWithCrashProtectionWithContext runs a
- // ConditionWithContextFunc with crash protection.
- func runConditionWithCrashProtectionWithContext(ctx context.Context, condition ConditionWithContextFunc) (bool, error) {
- defer runtime.HandleCrash()
- return condition(ctx)
- }
- // Backoff holds parameters applied to a Backoff function.
- type Backoff struct {
- // The initial duration.
- Duration time.Duration
- // Duration is multiplied by factor each iteration, if factor is not zero
- // and the limits imposed by Steps and Cap have not been reached.
- // Should not be negative.
- // The jitter does not contribute to the updates to the duration parameter.
- Factor float64
- // The sleep at each iteration is the duration plus an additional
- // amount chosen uniformly at random from the interval between
- // zero and `jitter*duration`.
- Jitter float64
- // The remaining number of iterations in which the duration
- // parameter may change (but progress can be stopped earlier by
- // hitting the cap). If not positive, the duration is not
- // changed. Used for exponential backoff in combination with
- // Factor and Cap.
- Steps int
- // A limit on revised values of the duration parameter. If a
- // multiplication by the factor parameter would make the duration
- // exceed the cap then the duration is set to the cap and the
- // steps parameter is set to zero.
- Cap time.Duration
- }
- // Step (1) returns an amount of time to sleep determined by the
- // original Duration and Jitter and (2) mutates the provided Backoff
- // to update its Steps and Duration.
- func (b *Backoff) Step() time.Duration {
- if b.Steps < 1 {
- if b.Jitter > 0 {
- return Jitter(b.Duration, b.Jitter)
- }
- return b.Duration
- }
- b.Steps--
- duration := b.Duration
- // calculate the next step
- if b.Factor != 0 {
- b.Duration = time.Duration(float64(b.Duration) * b.Factor)
- if b.Cap > 0 && b.Duration > b.Cap {
- b.Duration = b.Cap
- b.Steps = 0
- }
- }
- if b.Jitter > 0 {
- duration = Jitter(duration, b.Jitter)
- }
- return duration
- }
- // ContextForChannel derives a child context from a parent channel.
- //
- // The derived context's Done channel is closed when the returned cancel function
- // is called or when the parent channel is closed, whichever happens first.
- //
- // Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
- func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- select {
- case <-parentCh:
- cancel()
- case <-ctx.Done():
- }
- }()
- return ctx, cancel
- }
- // BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
- // an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
- // is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
- // undetermined behavior.
- // The BackoffManager is supposed to be called in a single-threaded environment.
- type BackoffManager interface {
- Backoff() clock.Timer
- }
- type exponentialBackoffManagerImpl struct {
- backoff *Backoff
- backoffTimer clock.Timer
- lastBackoffStart time.Time
- initialBackoff time.Duration
- backoffResetDuration time.Duration
- clock clock.Clock
- }
- // NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
- // backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
- // This backoff manager is used to reduce load during upstream unhealthiness.
- func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
- return &exponentialBackoffManagerImpl{
- backoff: &Backoff{
- Duration: initBackoff,
- Factor: backoffFactor,
- Jitter: jitter,
- // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
- // what we ideally need here, we set it to max int and assume we will never use up the steps
- Steps: math.MaxInt32,
- Cap: maxBackoff,
- },
- backoffTimer: nil,
- initialBackoff: initBackoff,
- lastBackoffStart: c.Now(),
- backoffResetDuration: resetDuration,
- clock: c,
- }
- }
- func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
- if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
- b.backoff.Steps = math.MaxInt32
- b.backoff.Duration = b.initialBackoff
- }
- b.lastBackoffStart = b.clock.Now()
- return b.backoff.Step()
- }
- // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
- // The returned timer must be drained before calling Backoff() the second time
- func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
- if b.backoffTimer == nil {
- b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
- } else {
- b.backoffTimer.Reset(b.getNextBackoff())
- }
- return b.backoffTimer
- }
- type jitteredBackoffManagerImpl struct {
- clock clock.Clock
- duration time.Duration
- jitter float64
- backoffTimer clock.Timer
- }
- // NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
- // is negative, backoff will not be jittered.
- func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
- return &jitteredBackoffManagerImpl{
- clock: c,
- duration: duration,
- jitter: jitter,
- backoffTimer: nil,
- }
- }
- func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
- jitteredPeriod := j.duration
- if j.jitter > 0.0 {
- jitteredPeriod = Jitter(j.duration, j.jitter)
- }
- return jitteredPeriod
- }
- // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
- // The returned timer must be drained before calling Backoff() the second time
- func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
- backoff := j.getNextBackoff()
- if j.backoffTimer == nil {
- j.backoffTimer = j.clock.NewTimer(backoff)
- } else {
- j.backoffTimer.Reset(backoff)
- }
- return j.backoffTimer
- }
- // ExponentialBackoff repeats a condition check with exponential backoff.
- //
- // It repeatedly checks the condition and then sleeps, using `backoff.Step()`
- // to determine the length of the sleep and adjust Duration and Steps.
- // Stops and returns as soon as:
- // 1. the condition check returns true or an error,
- // 2. `backoff.Steps` checks of the condition have been done, or
- // 3. a sleep truncated by the cap on duration has been completed.
- // In case (1) the returned error is what the condition function returned.
- // In all other cases, ErrWaitTimeout is returned.
- func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
- for backoff.Steps > 0 {
- if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
- return err
- }
- if backoff.Steps == 1 {
- break
- }
- time.Sleep(backoff.Step())
- }
- return ErrWaitTimeout
- }
- // Poll tries a condition func until it returns true, an error, or the timeout
- // is reached.
- //
- // Poll always waits the interval before the run of 'condition'.
- // 'condition' will always be invoked at least once.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- //
- // If you want to Poll something forever, see PollInfinite.
- func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
- return PollWithContext(context.Background(), interval, timeout, condition.WithContext())
- }
- // PollWithContext tries a condition func until it returns true, an error,
- // or when the context expires or the timeout is reached, whichever
- // happens first.
- //
- // PollWithContext always waits the interval before the run of 'condition'.
- // 'condition' will always be invoked at least once.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- //
- // If you want to Poll something forever, see PollInfinite.
- func PollWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
- return poll(ctx, false, poller(interval, timeout), condition)
- }
- // PollUntil tries a condition func until it returns true, an error or stopCh is
- // closed.
- //
- // PollUntil always waits interval before the first run of 'condition'.
- // 'condition' will always be invoked at least once.
- func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
- ctx, cancel := ContextForChannel(stopCh)
- defer cancel()
- return PollUntilWithContext(ctx, interval, condition.WithContext())
- }
- // PollUntilWithContext tries a condition func until it returns true,
- // an error or the specified context is cancelled or expired.
- //
- // PollUntilWithContext always waits interval before the first run of 'condition'.
- // 'condition' will always be invoked at least once.
- func PollUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
- return poll(ctx, false, poller(interval, 0), condition)
- }
- // PollInfinite tries a condition func until it returns true or an error
- //
- // PollInfinite always waits the interval before the run of 'condition'.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- func PollInfinite(interval time.Duration, condition ConditionFunc) error {
- return PollInfiniteWithContext(context.Background(), interval, condition.WithContext())
- }
- // PollInfiniteWithContext tries a condition func until it returns true or an error
- //
- // PollInfiniteWithContext always waits the interval before the run of 'condition'.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- func PollInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
- return poll(ctx, false, poller(interval, 0), condition)
- }
- // PollImmediate tries a condition func until it returns true, an error, or the timeout
- // is reached.
- //
- // PollImmediate always checks 'condition' before waiting for the interval. 'condition'
- // will always be invoked at least once.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- //
- // If you want to immediately Poll something forever, see PollImmediateInfinite.
- func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
- return PollImmediateWithContext(context.Background(), interval, timeout, condition.WithContext())
- }
- // PollImmediateWithContext tries a condition func until it returns true, an error,
- // or the timeout is reached or the specified context expires, whichever happens first.
- //
- // PollImmediateWithContext always checks 'condition' before waiting for the interval.
- // 'condition' will always be invoked at least once.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- //
- // If you want to immediately Poll something forever, see PollImmediateInfinite.
- func PollImmediateWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
- return poll(ctx, true, poller(interval, timeout), condition)
- }
- // PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
- //
- // PollImmediateUntil runs the 'condition' before waiting for the interval.
- // 'condition' will always be invoked at least once.
- func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
- ctx, cancel := ContextForChannel(stopCh)
- defer cancel()
- return PollImmediateUntilWithContext(ctx, interval, condition.WithContext())
- }
- // PollImmediateUntilWithContext tries a condition func until it returns true,
- // an error or the specified context is cancelled or expired.
- //
- // PollImmediateUntilWithContext runs the 'condition' before waiting for the interval.
- // 'condition' will always be invoked at least once.
- func PollImmediateUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
- return poll(ctx, true, poller(interval, 0), condition)
- }
- // PollImmediateInfinite tries a condition func until it returns true or an error
- //
- // PollImmediateInfinite runs the 'condition' before waiting for the interval.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error {
- return PollImmediateInfiniteWithContext(context.Background(), interval, condition.WithContext())
- }
- // PollImmediateInfiniteWithContext tries a condition func until it returns true
- // or an error or the specified context gets cancelled or expired.
- //
- // PollImmediateInfiniteWithContext runs the 'condition' before waiting for the interval.
- //
- // Some intervals may be missed if the condition takes too long or the time
- // window is too short.
- func PollImmediateInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
- return poll(ctx, true, poller(interval, 0), condition)
- }
- // Internally used, each of the public 'Poll*' function defined in this
- // package should invoke this internal function with appropriate parameters.
- // ctx: the context specified by the caller, for infinite polling pass
- // a context that never gets cancelled or expired.
- // immediate: if true, the 'condition' will be invoked before waiting for the interval,
- // in this case 'condition' will always be invoked at least once.
- // wait: user specified WaitFunc function that controls at what interval the condition
- // function should be invoked periodically and whether it is bound by a timeout.
- // condition: user specified ConditionWithContextFunc function.
- func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, condition ConditionWithContextFunc) error {
- if immediate {
- done, err := runConditionWithCrashProtectionWithContext(ctx, condition)
- if err != nil {
- return err
- }
- if done {
- return nil
- }
- }
- select {
- case <-ctx.Done():
- // returning ctx.Err() will break backward compatibility
- return ErrWaitTimeout
- default:
- return WaitForWithContext(ctx, wait, condition)
- }
- }
- // WaitFunc creates a channel that receives an item every time a test
- // should be executed and is closed when the last test should be invoked.
- type WaitFunc func(done <-chan struct{}) <-chan struct{}
- // WithContext converts the WaitFunc to an equivalent WaitWithContextFunc
- func (w WaitFunc) WithContext() WaitWithContextFunc {
- return func(ctx context.Context) <-chan struct{} {
- return w(ctx.Done())
- }
- }
- // WaitWithContextFunc creates a channel that receives an item every time a test
- // should be executed and is closed when the last test should be invoked.
- //
- // When the specified context gets cancelled or expires the function
- // stops sending item and returns immediately.
- type WaitWithContextFunc func(ctx context.Context) <-chan struct{}
- // WaitFor continually checks 'fn' as driven by 'wait'.
- //
- // WaitFor gets a channel from 'wait()”, and then invokes 'fn' once for every value
- // placed on the channel and once more when the channel is closed. If the channel is closed
- // and 'fn' returns false without error, WaitFor returns ErrWaitTimeout.
- //
- // If 'fn' returns an error the loop ends and that error is returned. If
- // 'fn' returns true the loop ends and nil is returned.
- //
- // ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever
- // returning true.
- //
- // When the done channel is closed, because the golang `select` statement is
- // "uniform pseudo-random", the `fn` might still run one or multiple time,
- // though eventually `WaitFor` will return.
- func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
- ctx, cancel := ContextForChannel(done)
- defer cancel()
- return WaitForWithContext(ctx, wait.WithContext(), fn.WithContext())
- }
- // WaitForWithContext continually checks 'fn' as driven by 'wait'.
- //
- // WaitForWithContext gets a channel from 'wait()”, and then invokes 'fn'
- // once for every value placed on the channel and once more when the
- // channel is closed. If the channel is closed and 'fn'
- // returns false without error, WaitForWithContext returns ErrWaitTimeout.
- //
- // If 'fn' returns an error the loop ends and that error is returned. If
- // 'fn' returns true the loop ends and nil is returned.
- //
- // context.Canceled will be returned if the ctx.Done() channel is closed
- // without fn ever returning true.
- //
- // When the ctx.Done() channel is closed, because the golang `select` statement is
- // "uniform pseudo-random", the `fn` might still run one or multiple times,
- // though eventually `WaitForWithContext` will return.
- func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error {
- waitCtx, cancel := context.WithCancel(context.Background())
- defer cancel()
- c := wait(waitCtx)
- for {
- select {
- case _, open := <-c:
- ok, err := runConditionWithCrashProtectionWithContext(ctx, fn)
- if err != nil {
- return err
- }
- if ok {
- return nil
- }
- if !open {
- return ErrWaitTimeout
- }
- case <-ctx.Done():
- // returning ctx.Err() will break backward compatibility
- return ErrWaitTimeout
- }
- }
- }
- // poller returns a WaitFunc that will send to the channel every interval until
- // timeout has elapsed and then closes the channel.
- //
- // Over very short intervals you may receive no ticks before the channel is
- // closed. A timeout of 0 is interpreted as an infinity, and in such a case
- // it would be the caller's responsibility to close the done channel.
- // Failure to do so would result in a leaked goroutine.
- //
- // Output ticks are not buffered. If the channel is not ready to receive an
- // item, the tick is skipped.
- func poller(interval, timeout time.Duration) WaitWithContextFunc {
- return WaitWithContextFunc(func(ctx context.Context) <-chan struct{} {
- ch := make(chan struct{})
- go func() {
- defer close(ch)
- tick := time.NewTicker(interval)
- defer tick.Stop()
- var after <-chan time.Time
- if timeout != 0 {
- // time.After is more convenient, but it
- // potentially leaves timers around much longer
- // than necessary if we exit early.
- timer := time.NewTimer(timeout)
- after = timer.C
- defer timer.Stop()
- }
- for {
- select {
- case <-tick.C:
- // If the consumer isn't ready for this signal drop it and
- // check the other channels.
- select {
- case ch <- struct{}{}:
- default:
- }
- case <-after:
- return
- case <-ctx.Done():
- return
- }
- }
- }()
- return ch
- })
- }
- // ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never
- // exceeds the deadline specified by the request context.
- func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error {
- for backoff.Steps > 0 {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
- return err
- }
- if backoff.Steps == 1 {
- break
- }
- waitBeforeRetry := backoff.Step()
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(waitBeforeRetry):
- }
- }
- return ErrWaitTimeout
- }
|