// Copyright 2019 Yunion // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pleg import ( "fmt" "sync/atomic" "time" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "yunion.io/x/log" "yunion.io/x/pkg/util/clock" "yunion.io/x/pkg/util/sets" "yunion.io/x/pkg/util/wait" "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime" ) // plegContainerState has a one-to-one mapping to the // kubecontainer.State except for the non-existent state. This state // is introduced here to complete the state transition scenarios. type plegContainerState string const ( plegContainerRunning plegContainerState = "running" plegContainerExited plegContainerState = "exited" plegContainerUnknown plegContainerState = "unknown" plegContainerNonExistent plegContainerState = "non-existent" // The threshold needs to be greater than the relisting period + the // relisting time, which can vary significantly. Set a conservative // threshold to avoid flipping between healthy and unhealthy. relistThreshold = 3 * time.Minute ) func convertState(state runtime.State) plegContainerState { switch state { case runtime.ContainerStateCreated: // kubelet doesn't use the "created" state yet, hence convert it to "unknown". return plegContainerUnknown case runtime.ContainerStateRunning: return plegContainerRunning case runtime.ContainerStateExited: return plegContainerExited case runtime.ContainerStateUnknown: return plegContainerUnknown default: panic(fmt.Sprintf("unrecognized container state: %v", state)) } } type GenericPLEG struct { // The period for relisting. relistPeriod time.Duration // The container runtime. runtime runtime.Runtime // The channel from which the subscriber listens events. eventChannel chan *PodLifecycleEvent podRecords podRecords // Time of the last relisting. relistTime atomic.Value // Cache for storing the runtime states required for syncing pods. cache runtime.Cache clock clock.Clock // Pods that failed to have their status retrieved during a relist. These pods will be // retried during the next relisting. podsToReinspect map[string]*runtime.Pod } const ( // Capacity of the channel for receiving pod lifecycle events. This number // is a bit arbitrary and may be adjusted in the future. ChannelCapacity = 1000 // Generic PLEG relies on relisting for discovering container events. // A longer period means that kubelet will take longer to detect container // changes and to update pod status. On the other hand, a shorter period // will cause more frequent relisting (e.g., container runtime operations), // leading to higher cpu usage. // Note that even though we set the period to 1s, the relisting itself can // take more than 1s to finish if the container runtime responds slowly // and/or when there are many container changes in one cycle. RelistPeriod = time.Second * 1 ) func NewGenericPLEG(runtimeManager runtime.Runtime, channelCapacity int, relistPeriod time.Duration, cache runtime.Cache, clock clock.Clock) PodLifecycleEventGenerator { return &GenericPLEG{ relistPeriod: relistPeriod, runtime: runtimeManager, eventChannel: make(chan *PodLifecycleEvent, channelCapacity), podRecords: make(podRecords), cache: cache, clock: clock, } } func (g *GenericPLEG) Start() { go wait.Until(g.relist, g.relistPeriod, wait.NeverStop) } func (g *GenericPLEG) Watch() chan *PodLifecycleEvent { return g.eventChannel } func (g *GenericPLEG) getRelistTime() time.Time { val := g.relistTime.Load() if val == nil { return time.Time{} } return val.(time.Time) } func (g *GenericPLEG) updateRelistTime(timestamp time.Time) { g.relistTime.Store(timestamp) } // relist queries the container runtime for list of pods/containers, compare // with the internal pods/containers, and generates events accordingly. func (g *GenericPLEG) relist() { // log.Debugf("GenericPLEG: Relisting") timestamp := g.clock.Now() // Get all the pods. podList, err := g.runtime.GetPods(true) if err != nil { log.Errorf("Unable to retrieve pods: %v", err) return } g.updateRelistTime(timestamp) pods := runtime.Pods(podList) g.podRecords.setCurrent(pods) // Compare the old and the current pods, and generate events. eventsByPodID := map[string][]*PodLifecycleEvent{} for pid := range g.podRecords { oldPod := g.podRecords.getOld(pid) pod := g.podRecords.getCurrent(pid) // Get all containers in the old and the new pod. allContainers := getContainersFromPods(oldPod, pod) for _, container := range allContainers { events := computeEvents(oldPod, pod, &container.ID) for _, e := range events { updateEvents(eventsByPodID, e) } } } var needsReinspection map[string]*runtime.Pod if g.cacheEnabled() { needsReinspection = make(map[string]*runtime.Pod) } // If there are events associated with a pod, we should update the // podCache. for pid, events := range eventsByPodID { pod := g.podRecords.getCurrent(pid) if g.cacheEnabled() { // updateCache() will inspect the pod and update the cache. If an // error occurs during the inspection, we want PLEG to retry again // in the next relist. To achieve this, we do not update the // associated podRecord of the pod, so that the change will be // detect again in the next relist. // TODO: If many pods changed during the same relist period, // inspecting the pod and getting the PodStatus to update the cache // serially may take a while. We should be aware of this and // parallelize if needed. if err := g.updateCache(pod, pid); err != nil { // Rely on updateCache calling GetPodStatus to log the actual error. log.Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err) // make sure we try to reinspect the pod during the next relisting needsReinspection[pid] = pod continue } else { // this pod was in the list to reinspect and we did so because it had events, so remove it // from the list (we don't want the reinspection code below to inspect it a second time in // this relist execution) delete(g.podsToReinspect, pid) } } // Update the internal storage and send out the events. g.podRecords.update(pid) for i := range events { // Filter out events that are not reliable and no other components use yet. if events[i].Type == ContainerChanged { continue } select { case g.eventChannel <- events[i]: default: log.Errorf("event channel is full, discard this relist() cycle event") } } } if g.cacheEnabled() { // reinspect any pods that failed inspection during the previous relist if len(g.podsToReinspect) > 0 { log.Infof("GenericPLEG: Reinspecting pods that previously failed inspection") for pid, pod := range g.podsToReinspect { if err := g.updateCache(pod, pid); err != nil { // Rely on updateCache calling GetPodStatus to log the actual error. log.Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err) needsReinspection[pid] = pod } } } // Update the cache timestamp. This needs to happen *after* // all pods have been properly updated in the cache. g.cache.UpdateTime(timestamp) } // make sure we retain the list of pods that need reinspecting the next time relist is called g.podsToReinspect = needsReinspection } func (g *GenericPLEG) cacheEnabled() bool { return g.cache != nil } func (g *GenericPLEG) updateCache(pod *runtime.Pod, pid string) error { if pod == nil { // The pod is missing in the current relist. This means that // the pod has no visible (active or inactive) containers. log.Infof("PLEG: Delete status for pod %q", string(pid)) g.cache.Delete(pid) return nil } timestamp := g.clock.Now() // TODO: Consider adding a new runtime method // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing // all containers again. status, err := g.runtime.GetPodStatus(pod.Id, pod.Name, pod.Namespace) log.Debugf("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err) if err == nil { // Preserve the pod IP across cache updates if the new IP is empty. // When a pod is torn down, kubelet may race with PLEG and retrieve // a pod status after network teardown, but the kubernetes API expects // the completed pod's IP to be available after the pod is dead. status.IPs = g.getPodIPs(pid, status) } g.cache.Set(pod.Id, status, err, timestamp) return err } // getPodIP preserves an older cached status' pod IP if the new status has no pod IPs // and its sandboxes have exited func (g *GenericPLEG) getPodIPs(pid string, status *runtime.PodStatus) []string { if len(status.IPs) != 0 { return status.IPs } oldStatus, err := g.cache.Get(pid) if err != nil || len(oldStatus.IPs) == 0 { return nil } for _, sandboxStatus := range status.SandboxStatuses { // If at least one sandbox is ready, then use this status update's pod IP if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY { return status.IPs } } // For pods with no ready containers or sandboxes (like exited pods) // use the old status' pod IP return oldStatus.IPs } func updateEvents(eventsByPodID map[string][]*PodLifecycleEvent, e *PodLifecycleEvent) { if e == nil { return } eventsByPodID[e.Id] = append(eventsByPodID[e.Id], e) } func getContainersFromPods(pods ...*runtime.Pod) []*runtime.Container { cidSet := sets.NewString() var containers []*runtime.Container for _, p := range pods { if p == nil { continue } for _, c := range p.Containers { cid := string(c.ID.ID) if cidSet.Has(cid) { continue } cidSet.Insert(cid) containers = append(containers, c) } // Update sandboxes as containers // TODO: keep track of sandboxes explicitly. for _, c := range p.Sandboxes { cid := string(c.ID.ID) if cidSet.Has(cid) { continue } cidSet.Insert(cid) containers = append(containers, c) } } return containers } func computeEvents(oldPod, newPod *runtime.Pod, cid *runtime.ContainerID) []*PodLifecycleEvent { var pid string if oldPod != nil { pid = oldPod.Id } else if newPod != nil { pid = newPod.Id } oldState := getContainerState(oldPod, cid) newState := getContainerState(newPod, cid) return generateEvents(pid, cid.ID, oldState, newState) } func generateEvents(podID string, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent { if newState == oldState { return nil } log.Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) switch newState { case plegContainerRunning: return []*PodLifecycleEvent{{Id: podID, Type: ContainerStarted, Data: cid}} case plegContainerExited: return []*PodLifecycleEvent{{Id: podID, Type: ContainerDied, Data: cid}} case plegContainerUnknown: return []*PodLifecycleEvent{{Id: podID, Type: ContainerChanged, Data: cid}} case plegContainerNonExistent: switch oldState { case plegContainerExited: // We already reported that the container died before. return []*PodLifecycleEvent{{Id: podID, Type: ContainerRemoved, Data: cid}} default: return []*PodLifecycleEvent{{Id: podID, Type: ContainerDied, Data: cid}, {Id: podID, Type: ContainerRemoved, Data: cid}} } default: panic(fmt.Sprintf("unrecognized container state: %v", newState)) } } func getContainerState(pod *runtime.Pod, cid *runtime.ContainerID) plegContainerState { // Default to the non-existent state. state := plegContainerNonExistent if pod == nil { return state } c := pod.FindContainerByID(*cid) if c != nil { return convertState(c.State) } // Search through sandboxes too. c = pod.FindSandboxByID(*cid) if c != nil { return convertState(c.State) } return state } type podRecord struct { old *runtime.Pod current *runtime.Pod } type podRecords map[string]*podRecord func (pr podRecords) getOld(id string) *runtime.Pod { r, ok := pr[id] if !ok { return nil } return r.old } func (pr podRecords) getCurrent(id string) *runtime.Pod { r, ok := pr[id] if !ok { return nil } return r.current } func (pr podRecords) setCurrent(pods []*runtime.Pod) { for i := range pr { pr[i].current = nil } for _, pod := range pods { if r, ok := pr[pod.Id]; ok { r.current = pod } else { pr[pod.Id] = &podRecord{current: pod} } } } func (pr podRecords) update(id string) { r, ok := pr[id] if !ok { return } pr.updateInternal(id, r) } func (pr podRecords) updateInternal(id string, r *podRecord) { if r.current == nil { // Pod no longer exists; delete the entry. delete(pr, id) return } r.old = r.current r.current = nil }