prober_manager.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package prober
  27. import (
  28. "fmt"
  29. "sync"
  30. "yunion.io/x/log"
  31. "yunion.io/x/pkg/util/sets"
  32. "yunion.io/x/pkg/util/wait"
  33. "yunion.io/x/onecloud/pkg/apis"
  34. "yunion.io/x/onecloud/pkg/apis/host"
  35. "yunion.io/x/onecloud/pkg/hostman/container/prober/results"
  36. "yunion.io/x/onecloud/pkg/hostman/container/status"
  37. "yunion.io/x/onecloud/pkg/hostman/guestman/container"
  38. "yunion.io/x/onecloud/pkg/hostman/guestman/desc"
  39. )
  40. // Key uniquely identifying container probes
  41. type probeKey struct {
  42. podUid string
  43. containerName string
  44. probeType apis.ContainerProbeType
  45. }
  46. type IPod interface {
  47. GetId() string
  48. GetName() string
  49. GetDesc() *desc.SGuestDesc
  50. GetContainers() []*host.ContainerDesc
  51. IsRunning() bool
  52. }
  53. // Manager manages pod probing. It creates a probe "worker" for every container that specifies a
  54. // probe (AddPod). The worker periodically probes its assigned container and caches the results. The
  55. // manager use the cached probe results to set the appropriate Ready state in the PodStatus when
  56. // requested (UpdatePodStatus). Updating probe parameters is not currently supported.
  57. // TODO: Move liveness probing out of the runtime, to here.
  58. type Manager interface {
  59. // AddPod creates new probe workers for every container probe. This should be called for every
  60. // pod created.
  61. AddPod(pod IPod)
  62. // RemovePod handles cleaning up the removed pod state, including terminating probe workers and
  63. // deleting cached results.
  64. RemovePod(pod IPod)
  65. // CleanupPods handles cleaning up pods which should no longer be running.
  66. // It takes a map of "desired pods" which should not be cleaned up.
  67. CleanupPods(desiredPods map[string]sets.Empty)
  68. // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
  69. // container based on container running status, cached probe results and worker states.
  70. UpdatePodStatus(podId string)
  71. // Start starts the Manager sync loops.
  72. Start()
  73. SetDirtyContainer(ctrId string, reason string)
  74. }
  75. type manager struct {
  76. // Map of active workers for probes
  77. workers map[probeKey]*worker
  78. // Lock for accessing & mutating workers
  79. workerLock sync.RWMutex
  80. statusManager status.Manager
  81. // readinessManager manages the results of readiness probes
  82. // readinessManager results.Manager
  83. // livenessManager manages the results of liveness probes
  84. livenessManager results.Manager
  85. // startupManager manages the results of startup probes
  86. startupManager results.Manager
  87. // prober executes the probe actions
  88. prober *prober
  89. dirtyContainers sync.Map
  90. }
  91. func NewManager(
  92. statusManager status.Manager,
  93. livenessManager results.Manager,
  94. startupManager results.Manager,
  95. runner container.CommandRunner) Manager {
  96. prober := newProber(runner)
  97. return &manager{
  98. statusManager: statusManager,
  99. prober: prober,
  100. livenessManager: livenessManager,
  101. startupManager: startupManager,
  102. workers: make(map[probeKey]*worker),
  103. workerLock: sync.RWMutex{},
  104. dirtyContainers: sync.Map{},
  105. }
  106. }
  107. func (m *manager) SetDirtyContainer(ctrId string, reason string) {
  108. log.Infof("[set dirty container] %s: %s", ctrId, reason)
  109. m.dirtyContainers.Store(ctrId, true)
  110. }
  111. func (m *manager) cleanDirtyContainer(ctrId string) {
  112. m.dirtyContainers.Delete(ctrId)
  113. }
  114. // Start syncing probe status. This should only be called once.
  115. func (m *manager) Start() {
  116. // start syncing readiness.
  117. //go wait.Forever(m.updateReadiness, 0)
  118. // start syncing startup.
  119. go wait.Forever(m.updateStartup, 0)
  120. }
  121. func (m *manager) AddPod(pod IPod) {
  122. m.workerLock.Lock()
  123. defer m.workerLock.Unlock()
  124. key := probeKey{podUid: pod.GetId()}
  125. for _, c := range pod.GetContainers() {
  126. key.containerName = c.Name
  127. if c.Spec.StartupProbe != nil {
  128. key.probeType = apis.ContainerProbeTypeStartup
  129. if _, ok := m.workers[key]; ok {
  130. log.Errorf("Startup probe already exists: %s:%s", pod.GetName(), c.Name)
  131. return
  132. }
  133. w := newWorker(m, key.probeType, pod, c)
  134. m.workers[key] = w
  135. go w.run()
  136. }
  137. /*if c.Spec.LivenessProbe != nil {
  138. key.probeType = apis.ContainerProbeTypeLiveness
  139. if _, ok := m.workers[key]; ok {
  140. log.Errorf("Liveness probe already exists: %s:%s", pod.Name, c.Name)
  141. return
  142. }
  143. w := newWorker(m, key.probeType, pod, c)
  144. m.workers[key] = w
  145. go w.run()
  146. }*/
  147. }
  148. }
  149. func (m *manager) RemovePod(pod IPod) {
  150. m.workerLock.RLock()
  151. defer m.workerLock.RUnlock()
  152. key := probeKey{podUid: pod.GetId()}
  153. for _, c := range pod.GetContainers() {
  154. key.containerName = c.Name
  155. for _, probeType := range []apis.ContainerProbeType{apis.ContainerProbeTypeLiveness, apis.ContainerProbeTypeReadiness, apis.ContainerProbeTypeStartup} {
  156. key.probeType = probeType
  157. if worker, ok := m.workers[key]; ok {
  158. worker.stop()
  159. }
  160. }
  161. }
  162. }
  163. func (m *manager) CleanupPods(desiredPods map[string]sets.Empty) {
  164. m.workerLock.RLock()
  165. defer m.workerLock.RUnlock()
  166. for key, worker := range m.workers {
  167. if _, ok := desiredPods[key.podUid]; !ok {
  168. worker.stop()
  169. }
  170. }
  171. }
  172. func (m *manager) UpdatePodStatus(status string) {}
  173. func (m *manager) getWorker(podId string, containerName string, probeType apis.ContainerProbeType) (*worker, bool) {
  174. m.workerLock.RLock()
  175. defer m.workerLock.RUnlock()
  176. worker, ok := m.workers[probeKey{podId, containerName, probeType}]
  177. return worker, ok
  178. }
  179. // Called by the worker after exiting
  180. func (m *manager) removeWorker(podId string, containerName string, probeType apis.ContainerProbeType) {
  181. m.workerLock.Lock()
  182. defer m.workerLock.Unlock()
  183. delete(m.workers, probeKey{podUid: podId, containerName: containerName, probeType: probeType})
  184. }
  185. func (m *manager) workerCount() int {
  186. m.workerLock.RLock()
  187. defer m.workerLock.RUnlock()
  188. return len(m.workers)
  189. }
  190. /*func (m *manager) updateReadiness() {
  191. update := <-m.readinessManager.Updates()
  192. ready := update.Result == results.Success
  193. m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
  194. }*/
  195. func (m *manager) updateStartup() {
  196. update := <-m.startupManager.Updates()
  197. started := update.Result.Result == results.Success
  198. if err := m.statusManager.SetContainerStartup(
  199. update.PodUID,
  200. update.ContainerID,
  201. started,
  202. update.Result,
  203. update.Pod,
  204. ); err != nil {
  205. reason := fmt.Sprintf("set container %s/%s startup error: %v", update.PodUID, update.ContainerID, err)
  206. m.SetDirtyContainer(update.ContainerID, reason)
  207. }
  208. }