worker.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package prober
  27. import (
  28. "math/rand"
  29. "time"
  30. "yunion.io/x/log"
  31. "yunion.io/x/pkg/util/runtime"
  32. "yunion.io/x/onecloud/pkg/apis"
  33. hostapi "yunion.io/x/onecloud/pkg/apis/host"
  34. "yunion.io/x/onecloud/pkg/hostman/container/prober/results"
  35. )
  36. // worker handles the periodic probing of its assigned container. Each worker has a go-routine
  37. // associated with it which runs the probe loop until the container permanently terminates, or the
  38. // stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
  39. // container IDs.
  40. type worker struct {
  41. // Channel for stopping the probe.
  42. stopCh chan struct{}
  43. // The pod containing this probe (read-only)
  44. pod IPod
  45. // The container to probe (read-only)
  46. container *hostapi.ContainerDesc
  47. // Describes the probe configuration (read-only)
  48. spec *apis.ContainerProbe
  49. // The type of the worker.
  50. probeType apis.ContainerProbeType
  51. // The probe value during the initial delay.
  52. initialValue results.Result
  53. // Where to store this workers results.
  54. resultsManager results.Manager
  55. probeManager *manager
  56. // The last known container ID for this worker.
  57. containerId string
  58. // The last probe result for this worker.
  59. lastResult results.Result
  60. // How many times in a row the probe has returned the same result.
  61. resultRun int
  62. // If set, skip probing
  63. onHold bool
  64. }
  65. // Creates and starts a new probe worker.
  66. func newWorker(
  67. m *manager,
  68. probeType apis.ContainerProbeType,
  69. pod IPod,
  70. container *hostapi.ContainerDesc) *worker {
  71. w := &worker{
  72. stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
  73. pod: pod,
  74. container: container,
  75. probeType: probeType,
  76. probeManager: m,
  77. containerId: container.Id,
  78. }
  79. switch probeType {
  80. //case apis.ContainerProbeTypeLiveness:
  81. // w.spec = container.Spec.LivenessProbe
  82. // w.resultsManager = m.livenessManager
  83. // w.initialValue = results.Success
  84. case apis.ContainerProbeTypeStartup:
  85. w.spec = container.Spec.StartupProbe
  86. w.resultsManager = m.startupManager
  87. w.initialValue = results.Unknown
  88. }
  89. return w
  90. }
  91. // run periodically probes the container.
  92. func (w *worker) run() {
  93. probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
  94. // If host restarted the probes could be started in rapid succession.
  95. // Let the worker wait for a random portion of tickerPeriod before probing.
  96. time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
  97. probeTicker := time.NewTicker(probeTickerPeriod)
  98. defer func() {
  99. // Clean up.
  100. probeTicker.Stop()
  101. if len(w.containerId) != 0 {
  102. w.resultsManager.Remove(w.containerId)
  103. }
  104. w.probeManager.removeWorker(w.pod.GetId(), w.container.Name, w.probeType)
  105. }()
  106. probeLoop:
  107. for w.doProbe() {
  108. // Wait for next probe tick.
  109. select {
  110. case <-w.stopCh:
  111. break probeLoop
  112. case <-probeTicker.C:
  113. // continue
  114. }
  115. }
  116. }
  117. // stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
  118. // It is safe to call stop multiple times.
  119. func (w *worker) stop() {
  120. select {
  121. case w.stopCh <- struct{}{}:
  122. default: // Non-blocking.
  123. }
  124. }
  125. // doProbe probes the container once and records the result.
  126. // Returns whether the worker should continue.
  127. func (w *worker) doProbe() (keepGoing bool) {
  128. // Actually eat panics (HandleCrash takes care of logging)
  129. defer func() { recover() }()
  130. defer runtime.HandleCrash(func(_ interface{}) {
  131. keepGoing = true
  132. })
  133. result, err := w.probeManager.prober.probe(w.probeType, w.pod, w.container)
  134. if err != nil {
  135. log.Errorf("probe: %s, pod: %s, container: %s, error: %v", w.probeType, w.pod.GetId(), w.container.Id, err)
  136. // prober error, throw away the result.
  137. return true
  138. }
  139. if w.lastResult == result.Result {
  140. w.resultRun++
  141. } else {
  142. w.lastResult = result.Result
  143. w.resultRun = 1
  144. }
  145. _, isContainerDirty := w.probeManager.dirtyContainers.Load(w.container.Id)
  146. if (result.Result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
  147. (result.Result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
  148. return true
  149. }
  150. w.resultsManager.Set(w.containerId, result, w.pod, isContainerDirty)
  151. if isContainerDirty {
  152. log.Infof("clean dirty container %s of probe manager", w.container.Id)
  153. w.probeManager.cleanDirtyContainer(w.container.Id)
  154. }
  155. if (w.probeType == apis.ContainerProbeTypeLiveness || w.probeType == apis.ContainerProbeTypeStartup) && result.Result == results.Failure {
  156. // The container fails a liveness/startup check, it will need to be restarted.
  157. // Stop probing until we see a new container ID. This is to reduce the
  158. // chance of hitting #21751, where running `docker exec` when a
  159. // container is being stopped may lead to corrupted container state.
  160. w.onHold = true
  161. w.resultRun = 0
  162. }
  163. return true
  164. }