| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package guestman
- import (
- "context"
- "fmt"
- "path/filepath"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/sets"
- computeapi "yunion.io/x/onecloud/pkg/apis/compute"
- hostapi "yunion.io/x/onecloud/pkg/apis/host"
- "yunion.io/x/onecloud/pkg/hostman/guestman/pod/pleg"
- "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime"
- "yunion.io/x/onecloud/pkg/hostman/hostutils"
- "yunion.io/x/onecloud/pkg/util/fileutils2"
- )
- func (m *SGuestManager) reconcileContainerLoop(cache runtime.Cache) {
- log.Infof("start reconcile container loop")
- for {
- m.Servers.Range(func(id, obj interface{}) bool {
- podObj, ok := obj.(*sPodGuestInstance)
- if !ok {
- return true
- }
- if podObj.isPodDirtyShutdown() {
- log.Infof("pod %s is dirty shutdown, using dirty shutdown manager to start it", podObj.GetName())
- return true
- }
- if err := m.reconcileContainer(podObj, cache); err != nil {
- log.Warningf("reconcile pod %s: %v", podObj.GetId(), err)
- }
- return true
- })
- time.Sleep(5 * time.Second)
- }
- }
- func (m *SGuestManager) reconcileContainer(obj *sPodGuestInstance, cache runtime.Cache) error {
- ps, err := cache.Get(obj.GetId())
- if err != nil {
- return errors.Wrapf(err, "get pod status")
- }
- getContainerStatus := func(name string) *runtime.Status {
- for i := range ps.ContainerStatuses {
- cs := ps.ContainerStatuses[i]
- if cs.Name == name {
- return cs
- }
- }
- return nil
- }
- ctrs := obj.GetContainers()
- var errs []error
- for i := range ctrs {
- ctr := ctrs[i]
- cs := getContainerStatus(ctr.Name)
- if cs == nil {
- // container is deleted
- continue
- }
- if cs.State == runtime.ContainerStateExited && (cs.ExitCode != 0 || ctr.Spec.AlwaysRestart) {
- if err := m.startContainer(obj, ctr, cs); err != nil {
- errs = append(errs, errors.Wrapf(err, "start container %s", ctr.Name))
- }
- }
- }
- return errors.NewAggregate(errs)
- }
- func (m *SGuestManager) startContainer(obj *sPodGuestInstance, ctr *hostapi.ContainerDesc, cs *runtime.Status) error {
- _, isInternalStopped := obj.IsInternalStopped(cs.ID.ID)
- if isInternalStopped {
- return nil
- }
- finishedAt := ctr.StartedAt
- if !ctr.LastFinishedAt.IsZero() {
- finishedAt = ctr.LastFinishedAt
- }
- attempt := ctr.RestartCount
- step := 5 * time.Second
- internal := time.Duration(int(step) * (attempt * attempt))
- curInternal := time.Now().Sub(finishedAt)
- if !ctr.Spec.AlwaysRestart {
- if curInternal < internal {
- log.Infof("current internal time (%s) < crash_back_off time (%s), skipping restart container(%s/%s)", curInternal, internal, obj.GetId(), ctr.Name)
- return nil
- } else {
- log.Infof("current internal time (%s | %s) > crash_back_off time (%s), restart container(%s/%s)", finishedAt, curInternal, internal, obj.GetId(), ctr.Name)
- }
- } else {
- log.Infof("always restart container(%s/%s) ...", obj.GetId(), ctr.Name)
- }
- reason := fmt.Sprintf("start died container %s when exit code is %d", ctr.Id, cs.ExitCode)
- ctx := context.Background()
- userCred := hostutils.GetComputeSession(ctx).GetToken()
- if obj.ShouldRestartPodOnCrash() {
- // FIXME: 目前不用 worker 来后台异步运行 pod restart task
- // 这里异步运行会导致容器如果在 10s 没启动完成,又会进行新一轮排队
- // 所以改成同步串行执行
- //obj.RestartLocalPodAndContainers(ctx, userCred)
- newLocalPodRestartTask(ctx, userCred, obj).Run()
- } else {
- _, err := obj.StartLocalContainer(ctx, userCred, ctr.Id)
- if err != nil {
- return errors.Wrap(err, reason)
- } else {
- log.Infof("%s: start local container (%s/%s) success", reason, obj.GetId(), ctr.Name)
- }
- }
- return nil
- }
// GetPleg returns the pod lifecycle event generator (PLEG) owned by the
// guest manager.
func (m *SGuestManager) GetPleg() pleg.PodLifecycleEventGenerator {
	return m.pleg
}
// syncContainerLoop consumes pod lifecycle events from plegCh forever,
// handling one event per iteration. Intended to run as a goroutine.
func (m *SGuestManager) syncContainerLoop(plegCh chan *pleg.PodLifecycleEvent) {
	log.Infof("start sync container loop")
	for {
		m.syncContainerLoopIteration(plegCh)
	}
}
// syncContainerLoopIteration blocks until one pod lifecycle event arrives on
// plegCh and synchronizes the affected pod's status accordingly:
//   - ContainerStarted: refresh pod/container status once the CRI id is known;
//   - ContainerRemoved: log only;
//   - ContainerDied: if the stop was not requested internally and the
//     container is observed as exited/crash-looping, resync status and, when
//     the primary container exited, stop the whole pod.
// Pods in dirty-shutdown state are skipped until they have been started.
func (m *SGuestManager) syncContainerLoopIteration(plegCh chan *pleg.PodLifecycleEvent) {
	select {
	case e := <-plegCh:
		podMan := m.getPodByEvent(e)
		if podMan == nil {
			log.Warningf("can not find pod manager by %s", jsonutils.Marshal(e))
			return
		}
		if podMan.(*sPodGuestInstance).isPodDirtyShutdown() {
			log.Infof("pod %s is dirty shutdown, waiting it to started", podMan.GetName())
			return
		}
		podInstance := podMan.(*sPodGuestInstance)
		if e.Type == pleg.ContainerStarted {
			// Hold the start lock to avoid reading podMan.GetCRIId before it
			// has been refreshed by the start path.
			podInstance.startPodLock.Lock()
			defer podInstance.startPodLock.Unlock()
			log.Infof("pod container started: %s", jsonutils.Marshal(e))
			ctrId := e.Data.(string)
			if ctrId == podMan.GetCRIId() {
				// The started container is the pod sandbox itself.
				log.Infof("pod %s(%s) is started", podMan.GetId(), ctrId)
			} else {
				ctrObj, _ := podMan.GetContainerByCRIId(ctrId)
				if ctrObj != nil {
					podMan.SyncStatus(fmt.Sprintf("pod container started: %s(%s)", ctrObj.Name, ctrObj.Id), ctrObj.Id)
				} else {
					podMan.SyncStatus("pod container started", "")
				}
			}
		}
		if e.Type == pleg.ContainerRemoved {
			/*isInternalRemoved := podMan.IsInternalRemoved(e)
			if !isInternalRemoved {
				log.Infof("pod container removed: %s, try recreated", jsonutils.Marshal(e))
			} else {
				log.Infof("pod container removed: %s", jsonutils.Marshal(e))
			}*/
			log.Infof("pod container removed: %s", jsonutils.Marshal(e))
		}
		if e.Type == pleg.ContainerDied {
			ctrCriId := e.Data.(string)
			ctr, isInternalStopped := podMan.IsInternalStopped(ctrCriId)
			ctx := context.Background()
			ctrObj, _ := podMan.GetContainerByCRIId(ctrCriId)
			// Default to EXITED when the container descriptor is unknown.
			ccStatus := computeapi.CONTAINER_STATUS_EXITED
			if ctrObj != nil {
				ccStatus, _, _ = podMan.GetContainerStatus(ctx, ctrObj.Id)
			}
			if !isInternalStopped && sets.NewString(computeapi.CONTAINER_STATUS_EXITED, computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF).Has(ccStatus) {
				podStatus, err := m.podCache.Get(e.Id)
				if err != nil {
					log.Errorf("get pod %s status error: %v", e.Id, err)
					return
				}
				log.Infof("pod container exited: %s", jsonutils.Marshal(e))
				// start container again
				ctrStatus := podStatus.GetContainerStatus(ctrCriId)
				var reason string
				if ctrStatus == nil {
					log.Errorf("can't get container %s status", ctrCriId)
					reason = "container not exist"
				} else {
					if ctrStatus.ExitCode == 0 {
						log.Infof("container %s exited", ctrCriId)
						reason = fmt.Sprintf("container %s exited", ctrCriId)
					} else {
						// Prefer the onecloud container id over the CRI id in
						// the reason when the expected status knows it.
						ctrId := ctrCriId
						if ctr != nil {
							ctrId = ctr.Id
						}
						reason = fmt.Sprintf("exit code of died container %s is %d", ctrId, ctrStatus.ExitCode)
					}
				}
				log.Infof("sync pod %s container %s status: %s", e.Id, ctrCriId, reason)
				// If the primary container exited, stop the other containers
				// of the pod as well.
				syncCtrId := ""
				if ctrObj != nil && !isInternalStopped && podMan.IsPrimaryContainer(ctrObj.Id) && ccStatus == computeapi.CONTAINER_STATUS_EXITED {
					reason = fmt.Sprintf("stop all containers when primary container %s exited", ctrObj.Name)
					if err := podMan.StopAll(context.Background()); err != nil {
						log.Errorf("stop all pod containers error: %s", err.Error())
					}
				}
				if ctrObj != nil && !isInternalStopped && !podMan.IsPrimaryContainer(ctrObj.Id) {
					syncCtrId = ctrObj.Id
				}
				podMan.SyncStatus(reason, syncCtrId)
			} else {
				log.Infof("pod container exited: %s", jsonutils.Marshal(e))
			}
		}
	}
}
- func (m *SGuestManager) getPodByEvent(event *pleg.PodLifecycleEvent) PodInstance {
- obj, ok := m.GetServer(event.Id)
- if !ok {
- return nil
- }
- return obj.(PodInstance)
- }
- func (s *sPodGuestInstance) IsInternalStopped(ctrCriId string) (*ContainerExpectedStatus, bool) {
- ctr, ok := s.expectedStatus.Containers[ctrCriId]
- if !ok {
- return nil, true
- }
- if ctr.Status == computeapi.CONTAINER_STATUS_EXITED {
- return ctr, true
- }
- return ctr, false
- }
- func (s *sPodGuestInstance) IsInternalRemoved(ctrCriId string) bool {
- _, ok := s.expectedStatus.Containers[ctrCriId]
- if !ok {
- return true
- }
- return false
- }
// ContainerExpectedStatus is the persisted desired state of one container,
// stored in PodExpectedStatus.Containers keyed by the container's CRI id.
type ContainerExpectedStatus struct {
	// Id is the onecloud container id.
	Id string `json:"id"`
	// Name is the container name. NOTE(review): SetContainerStatus does not
	// populate it, so it may be empty in persisted files — confirm intent.
	Name string `json:"name"`
	// Status is the expected container status (e.g. exited).
	Status string `json:"status"`
}
// PodExpectedStatus records the desired status of a pod and its containers,
// persisted as expected_status.json under the pod's home directory so that
// intentional stops/removals can be told apart from crashes after a restart.
type PodExpectedStatus struct {
	// lock guards Status/Containers and the backing file writes.
	lock sync.RWMutex
	// homeDir is the pod's state directory holding expected_status.json.
	homeDir string
	// Status is the expected pod status.
	Status string `json:"status"`
	// Containers maps a container CRI id to its expected status.
	Containers map[string]*ContainerExpectedStatus `json:"containers"`
}
- func NewPodExpectedStatus(homeDir string, status string) (*PodExpectedStatus, error) {
- ps := &PodExpectedStatus{
- homeDir: homeDir,
- Status: status,
- Containers: make(map[string]*ContainerExpectedStatus),
- }
- if fileutils2.Exists(ps.getFilePath()) {
- content, err := fileutils2.FileGetContents(ps.getFilePath())
- if content == "" {
- return ps, nil
- }
- if err != nil {
- return nil, errors.Wrapf(err, "get %s content", ps.getFilePath())
- }
- obj, err := jsonutils.ParseString(content)
- if err != nil {
- return nil, errors.Wrapf(err, "parse %s content: %s", ps.getFilePath(), content)
- }
- if err := obj.Unmarshal(ps); err != nil {
- return nil, errors.Wrapf(err, "unmarshal to expected status %s", ps.getFilePath())
- }
- }
- return ps, nil
- }
// getFilePath returns the path of the persisted expected-status file inside
// the pod's home directory.
func (s *PodExpectedStatus) getFilePath() string {
	return filepath.Join(s.homeDir, "expected_status.json")
}
- func (s *PodExpectedStatus) updateFile() error {
- content := jsonutils.Marshal(s).PrettyString()
- if err := fileutils2.FilePutContents(s.getFilePath(), content, false); err != nil {
- return errors.Wrapf(err, "put %s content: %s", s.getFilePath(), content)
- }
- return nil
- }
- func (s *PodExpectedStatus) SetStatus(status string) error {
- s.lock.Lock()
- defer s.lock.Unlock()
- s.Status = status
- if err := s.updateFile(); err != nil {
- return errors.Wrapf(err, "update file")
- }
- return nil
- }
- func (s *PodExpectedStatus) SetContainerStatus(criId string, id string, status string) error {
- s.lock.Lock()
- defer s.lock.Unlock()
- s.Containers[criId] = &ContainerExpectedStatus{
- Id: id,
- Status: status,
- }
- if err := s.updateFile(); err != nil {
- return errors.Wrapf(err, "update file")
- }
- return nil
- }
- func (s *PodExpectedStatus) RemoveContainer(id string) error {
- s.lock.Lock()
- defer s.lock.Unlock()
- delete(s.Containers, id)
- if err := s.updateFile(); err != nil {
- return errors.Wrapf(err, "update file")
- }
- return nil
- }
|