generic.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pleg
  15. import (
  16. "fmt"
  17. "sync/atomic"
  18. "time"
  19. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
  20. "yunion.io/x/log"
  21. "yunion.io/x/pkg/util/clock"
  22. "yunion.io/x/pkg/util/sets"
  23. "yunion.io/x/pkg/util/wait"
  24. "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime"
  25. )
// plegContainerState has a one-to-one mapping to the
// kubecontainer.State except for the non-existent state. This state
// is introduced here to complete the state transition scenarios.
type plegContainerState string

const (
	// plegContainerRunning: the container is currently running.
	plegContainerRunning plegContainerState = "running"
	// plegContainerExited: the container has terminated.
	plegContainerExited plegContainerState = "exited"
	// plegContainerUnknown: the container state could not be determined
	// (also used for the "created" runtime state — see convertState).
	plegContainerUnknown plegContainerState = "unknown"
	// plegContainerNonExistent: the container is no longer visible in the
	// runtime at all; this state does not exist in kubecontainer.State.
	plegContainerNonExistent plegContainerState = "non-existent"
	// The threshold needs to be greater than the relisting period + the
	// relisting time, which can vary significantly. Set a conservative
	// threshold to avoid flipping between healthy and unhealthy.
	relistThreshold = 3 * time.Minute
)
  40. func convertState(state runtime.State) plegContainerState {
  41. switch state {
  42. case runtime.ContainerStateCreated:
  43. // kubelet doesn't use the "created" state yet, hence convert it to "unknown".
  44. return plegContainerUnknown
  45. case runtime.ContainerStateRunning:
  46. return plegContainerRunning
  47. case runtime.ContainerStateExited:
  48. return plegContainerExited
  49. case runtime.ContainerStateUnknown:
  50. return plegContainerUnknown
  51. default:
  52. panic(fmt.Sprintf("unrecognized container state: %v", state))
  53. }
  54. }
// GenericPLEG discovers pod lifecycle events by periodically relisting all
// pods/containers from the runtime and diffing against the previous relist.
type GenericPLEG struct {
	// The period for relisting.
	relistPeriod time.Duration
	// The container runtime.
	runtime runtime.Runtime
	// The channel from which the subscriber listens events.
	eventChannel chan *PodLifecycleEvent
	// podRecords tracks, per pod id, the previously-seen and currently-seen
	// pod so that relist() can compute state transitions.
	podRecords podRecords
	// Time of the last relisting. Holds a time.Time; accessed via
	// atomic.Value so readers don't race with the relist goroutine.
	relistTime atomic.Value
	// Cache for storing the runtime states required for syncing pods.
	// May be nil, in which case caching is disabled (see cacheEnabled).
	cache runtime.Cache
	// clock abstracts time to make the PLEG testable.
	clock clock.Clock
	// Pods that failed to have their status retrieved during a relist. These pods will be
	// retried during the next relisting.
	podsToReinspect map[string]*runtime.Pod
}
const (
	// Capacity of the channel for receiving pod lifecycle events. This number
	// is a bit arbitrary and may be adjusted in the future.
	ChannelCapacity = 1000
	// Generic PLEG relies on relisting for discovering container events.
	// A longer period means that kubelet will take longer to detect container
	// changes and to update pod status. On the other hand, a shorter period
	// will cause more frequent relisting (e.g., container runtime operations),
	// leading to higher cpu usage.
	// Note that even though we set the period to 1s, the relisting itself can
	// take more than 1s to finish if the container runtime responds slowly
	// and/or when there are many container changes in one cycle.
	RelistPeriod = time.Second * 1
)
  86. func NewGenericPLEG(runtimeManager runtime.Runtime, channelCapacity int,
  87. relistPeriod time.Duration, cache runtime.Cache, clock clock.Clock) PodLifecycleEventGenerator {
  88. return &GenericPLEG{
  89. relistPeriod: relistPeriod,
  90. runtime: runtimeManager,
  91. eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
  92. podRecords: make(podRecords),
  93. cache: cache,
  94. clock: clock,
  95. }
  96. }
// Start launches the relist loop in a background goroutine. The loop runs
// every relistPeriod and never stops (wait.NeverStop) once started.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
// Watch returns the channel on which pod lifecycle events are delivered.
func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
	return g.eventChannel
}
  103. func (g *GenericPLEG) getRelistTime() time.Time {
  104. val := g.relistTime.Load()
  105. if val == nil {
  106. return time.Time{}
  107. }
  108. return val.(time.Time)
  109. }
// updateRelistTime atomically records the timestamp of the latest relist.
func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
	g.relistTime.Store(timestamp)
}
// relist queries the container runtime for list of pods/containers, compare
// with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
	// log.Debugf("GenericPLEG: Relisting")
	timestamp := g.clock.Now()
	// Get all the pods.
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		log.Errorf("Unable to retrieve pods: %v", err)
		return
	}
	// Only a successful relist updates the timestamp; on failure above the
	// previous timestamp is retained.
	g.updateRelistTime(timestamp)
	pods := runtime.Pods(podList)
	g.podRecords.setCurrent(pods)
	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[string][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}
	// needsReinspection collects pods whose cache update failed this cycle.
	// It is only allocated (and only written) when the cache is enabled.
	var needsReinspection map[string]*runtime.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[string]*runtime.Pod)
	}
	// If there are events associated with a pod, we should update the
	// podCache.
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detect again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				log.Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod
				continue
			} else {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			// Non-blocking send: if the subscriber is slow and the channel is
			// full, drop the event rather than stall the relist loop.
			select {
			case g.eventChannel <- events[i]:
			default:
				log.Errorf("event channel is full, discard this relist() cycle event")
			}
		}
	}
	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			log.Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					log.Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
					needsReinspection[pid] = pod
				}
			}
		}
		// Update the cache timestamp. This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}
	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}
// cacheEnabled reports whether a runtime cache was supplied at construction.
func (g *GenericPLEG) cacheEnabled() bool {
	return g.cache != nil
}
  208. func (g *GenericPLEG) updateCache(pod *runtime.Pod, pid string) error {
  209. if pod == nil {
  210. // The pod is missing in the current relist. This means that
  211. // the pod has no visible (active or inactive) containers.
  212. log.Infof("PLEG: Delete status for pod %q", string(pid))
  213. g.cache.Delete(pid)
  214. return nil
  215. }
  216. timestamp := g.clock.Now()
  217. // TODO: Consider adding a new runtime method
  218. // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
  219. // all containers again.
  220. status, err := g.runtime.GetPodStatus(pod.Id, pod.Name, pod.Namespace)
  221. log.Debugf("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
  222. if err == nil {
  223. // Preserve the pod IP across cache updates if the new IP is empty.
  224. // When a pod is torn down, kubelet may race with PLEG and retrieve
  225. // a pod status after network teardown, but the kubernetes API expects
  226. // the completed pod's IP to be available after the pod is dead.
  227. status.IPs = g.getPodIPs(pid, status)
  228. }
  229. g.cache.Set(pod.Id, status, err, timestamp)
  230. return err
  231. }
  232. // getPodIP preserves an older cached status' pod IP if the new status has no pod IPs
  233. // and its sandboxes have exited
  234. func (g *GenericPLEG) getPodIPs(pid string, status *runtime.PodStatus) []string {
  235. if len(status.IPs) != 0 {
  236. return status.IPs
  237. }
  238. oldStatus, err := g.cache.Get(pid)
  239. if err != nil || len(oldStatus.IPs) == 0 {
  240. return nil
  241. }
  242. for _, sandboxStatus := range status.SandboxStatuses {
  243. // If at least one sandbox is ready, then use this status update's pod IP
  244. if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
  245. return status.IPs
  246. }
  247. }
  248. // For pods with no ready containers or sandboxes (like exited pods)
  249. // use the old status' pod IP
  250. return oldStatus.IPs
  251. }
  252. func updateEvents(eventsByPodID map[string][]*PodLifecycleEvent, e *PodLifecycleEvent) {
  253. if e == nil {
  254. return
  255. }
  256. eventsByPodID[e.Id] = append(eventsByPodID[e.Id], e)
  257. }
  258. func getContainersFromPods(pods ...*runtime.Pod) []*runtime.Container {
  259. cidSet := sets.NewString()
  260. var containers []*runtime.Container
  261. for _, p := range pods {
  262. if p == nil {
  263. continue
  264. }
  265. for _, c := range p.Containers {
  266. cid := string(c.ID.ID)
  267. if cidSet.Has(cid) {
  268. continue
  269. }
  270. cidSet.Insert(cid)
  271. containers = append(containers, c)
  272. }
  273. // Update sandboxes as containers
  274. // TODO: keep track of sandboxes explicitly.
  275. for _, c := range p.Sandboxes {
  276. cid := string(c.ID.ID)
  277. if cidSet.Has(cid) {
  278. continue
  279. }
  280. cidSet.Insert(cid)
  281. containers = append(containers, c)
  282. }
  283. }
  284. return containers
  285. }
  286. func computeEvents(oldPod, newPod *runtime.Pod, cid *runtime.ContainerID) []*PodLifecycleEvent {
  287. var pid string
  288. if oldPod != nil {
  289. pid = oldPod.Id
  290. } else if newPod != nil {
  291. pid = newPod.Id
  292. }
  293. oldState := getContainerState(oldPod, cid)
  294. newState := getContainerState(newPod, cid)
  295. return generateEvents(pid, cid.ID, oldState, newState)
  296. }
  297. func generateEvents(podID string, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
  298. if newState == oldState {
  299. return nil
  300. }
  301. log.Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
  302. switch newState {
  303. case plegContainerRunning:
  304. return []*PodLifecycleEvent{{Id: podID, Type: ContainerStarted, Data: cid}}
  305. case plegContainerExited:
  306. return []*PodLifecycleEvent{{Id: podID, Type: ContainerDied, Data: cid}}
  307. case plegContainerUnknown:
  308. return []*PodLifecycleEvent{{Id: podID, Type: ContainerChanged, Data: cid}}
  309. case plegContainerNonExistent:
  310. switch oldState {
  311. case plegContainerExited:
  312. // We already reported that the container died before.
  313. return []*PodLifecycleEvent{{Id: podID, Type: ContainerRemoved, Data: cid}}
  314. default:
  315. return []*PodLifecycleEvent{{Id: podID, Type: ContainerDied, Data: cid}, {Id: podID, Type: ContainerRemoved, Data: cid}}
  316. }
  317. default:
  318. panic(fmt.Sprintf("unrecognized container state: %v", newState))
  319. }
  320. }
  321. func getContainerState(pod *runtime.Pod, cid *runtime.ContainerID) plegContainerState {
  322. // Default to the non-existent state.
  323. state := plegContainerNonExistent
  324. if pod == nil {
  325. return state
  326. }
  327. c := pod.FindContainerByID(*cid)
  328. if c != nil {
  329. return convertState(c.State)
  330. }
  331. // Search through sandboxes too.
  332. c = pod.FindSandboxByID(*cid)
  333. if c != nil {
  334. return convertState(c.State)
  335. }
  336. return state
  337. }
// podRecord pairs the previously-observed (old) and currently-observed
// (current) versions of a pod across consecutive relists.
type podRecord struct {
	old     *runtime.Pod
	current *runtime.Pod
}

// podRecords maps pod id to its record; it is the PLEG's diffing state.
type podRecords map[string]*podRecord
  343. func (pr podRecords) getOld(id string) *runtime.Pod {
  344. r, ok := pr[id]
  345. if !ok {
  346. return nil
  347. }
  348. return r.old
  349. }
  350. func (pr podRecords) getCurrent(id string) *runtime.Pod {
  351. r, ok := pr[id]
  352. if !ok {
  353. return nil
  354. }
  355. return r.current
  356. }
  357. func (pr podRecords) setCurrent(pods []*runtime.Pod) {
  358. for i := range pr {
  359. pr[i].current = nil
  360. }
  361. for _, pod := range pods {
  362. if r, ok := pr[pod.Id]; ok {
  363. r.current = pod
  364. } else {
  365. pr[pod.Id] = &podRecord{current: pod}
  366. }
  367. }
  368. }
  369. func (pr podRecords) update(id string) {
  370. r, ok := pr[id]
  371. if !ok {
  372. return
  373. }
  374. pr.updateInternal(id, r)
  375. }
  376. func (pr podRecords) updateInternal(id string, r *podRecord) {
  377. if r.current == nil {
  378. // Pod no longer exists; delete the entry.
  379. delete(pr, id)
  380. return
  381. }
  382. r.old = r.current
  383. r.current = nil
  384. }