pod_sync_loop.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package guestman
  15. import (
  16. "context"
  17. "fmt"
  18. "path/filepath"
  19. "sync"
  20. "time"
  21. "yunion.io/x/jsonutils"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/util/sets"
  25. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  26. hostapi "yunion.io/x/onecloud/pkg/apis/host"
  27. "yunion.io/x/onecloud/pkg/hostman/guestman/pod/pleg"
  28. "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime"
  29. "yunion.io/x/onecloud/pkg/hostman/hostutils"
  30. "yunion.io/x/onecloud/pkg/util/fileutils2"
  31. )
  32. func (m *SGuestManager) reconcileContainerLoop(cache runtime.Cache) {
  33. log.Infof("start reconcile container loop")
  34. for {
  35. m.Servers.Range(func(id, obj interface{}) bool {
  36. podObj, ok := obj.(*sPodGuestInstance)
  37. if !ok {
  38. return true
  39. }
  40. if podObj.isPodDirtyShutdown() {
  41. log.Infof("pod %s is dirty shutdown, using dirty shutdown manager to start it", podObj.GetName())
  42. return true
  43. }
  44. if err := m.reconcileContainer(podObj, cache); err != nil {
  45. log.Warningf("reconcile pod %s: %v", podObj.GetId(), err)
  46. }
  47. return true
  48. })
  49. time.Sleep(5 * time.Second)
  50. }
  51. }
  52. func (m *SGuestManager) reconcileContainer(obj *sPodGuestInstance, cache runtime.Cache) error {
  53. ps, err := cache.Get(obj.GetId())
  54. if err != nil {
  55. return errors.Wrapf(err, "get pod status")
  56. }
  57. getContainerStatus := func(name string) *runtime.Status {
  58. for i := range ps.ContainerStatuses {
  59. cs := ps.ContainerStatuses[i]
  60. if cs.Name == name {
  61. return cs
  62. }
  63. }
  64. return nil
  65. }
  66. ctrs := obj.GetContainers()
  67. var errs []error
  68. for i := range ctrs {
  69. ctr := ctrs[i]
  70. cs := getContainerStatus(ctr.Name)
  71. if cs == nil {
  72. // container is deleted
  73. continue
  74. }
  75. if cs.State == runtime.ContainerStateExited && (cs.ExitCode != 0 || ctr.Spec.AlwaysRestart) {
  76. if err := m.startContainer(obj, ctr, cs); err != nil {
  77. errs = append(errs, errors.Wrapf(err, "start container %s", ctr.Name))
  78. }
  79. }
  80. }
  81. return errors.NewAggregate(errs)
  82. }
  83. func (m *SGuestManager) startContainer(obj *sPodGuestInstance, ctr *hostapi.ContainerDesc, cs *runtime.Status) error {
  84. _, isInternalStopped := obj.IsInternalStopped(cs.ID.ID)
  85. if isInternalStopped {
  86. return nil
  87. }
  88. finishedAt := ctr.StartedAt
  89. if !ctr.LastFinishedAt.IsZero() {
  90. finishedAt = ctr.LastFinishedAt
  91. }
  92. attempt := ctr.RestartCount
  93. step := 5 * time.Second
  94. internal := time.Duration(int(step) * (attempt * attempt))
  95. curInternal := time.Now().Sub(finishedAt)
  96. if !ctr.Spec.AlwaysRestart {
  97. if curInternal < internal {
  98. log.Infof("current internal time (%s) < crash_back_off time (%s), skipping restart container(%s/%s)", curInternal, internal, obj.GetId(), ctr.Name)
  99. return nil
  100. } else {
  101. log.Infof("current internal time (%s | %s) > crash_back_off time (%s), restart container(%s/%s)", finishedAt, curInternal, internal, obj.GetId(), ctr.Name)
  102. }
  103. } else {
  104. log.Infof("always restart container(%s/%s) ...", obj.GetId(), ctr.Name)
  105. }
  106. reason := fmt.Sprintf("start died container %s when exit code is %d", ctr.Id, cs.ExitCode)
  107. ctx := context.Background()
  108. userCred := hostutils.GetComputeSession(ctx).GetToken()
  109. if obj.ShouldRestartPodOnCrash() {
  110. // FIXME: 目前不用 worker 来后台异步运行 pod restart task
  111. // 这里异步运行会导致容器如果在 10s 没启动完成,又会进行新一轮排队
  112. // 所以改成同步串行执行
  113. //obj.RestartLocalPodAndContainers(ctx, userCred)
  114. newLocalPodRestartTask(ctx, userCred, obj).Run()
  115. } else {
  116. _, err := obj.StartLocalContainer(ctx, userCred, ctr.Id)
  117. if err != nil {
  118. return errors.Wrap(err, reason)
  119. } else {
  120. log.Infof("%s: start local container (%s/%s) success", reason, obj.GetId(), ctr.Name)
  121. }
  122. }
  123. return nil
  124. }
  125. func (m *SGuestManager) GetPleg() pleg.PodLifecycleEventGenerator {
  126. return m.pleg
  127. }
  128. func (m *SGuestManager) syncContainerLoop(plegCh chan *pleg.PodLifecycleEvent) {
  129. log.Infof("start sync container loop")
  130. for {
  131. m.syncContainerLoopIteration(plegCh)
  132. }
  133. }
  134. func (m *SGuestManager) syncContainerLoopIteration(plegCh chan *pleg.PodLifecycleEvent) {
  135. select {
  136. case e := <-plegCh:
  137. podMan := m.getPodByEvent(e)
  138. if podMan == nil {
  139. log.Warningf("can not find pod manager by %s", jsonutils.Marshal(e))
  140. return
  141. }
  142. if podMan.(*sPodGuestInstance).isPodDirtyShutdown() {
  143. log.Infof("pod %s is dirty shutdown, waiting it to started", podMan.GetName())
  144. return
  145. }
  146. podInstance := podMan.(*sPodGuestInstance)
  147. if e.Type == pleg.ContainerStarted {
  148. // 防止读取 podMan.GetCRIId 还没有刷新的问题
  149. podInstance.startPodLock.Lock()
  150. defer podInstance.startPodLock.Unlock()
  151. log.Infof("pod container started: %s", jsonutils.Marshal(e))
  152. ctrId := e.Data.(string)
  153. if ctrId == podMan.GetCRIId() {
  154. log.Infof("pod %s(%s) is started", podMan.GetId(), ctrId)
  155. } else {
  156. ctrObj, _ := podMan.GetContainerByCRIId(ctrId)
  157. if ctrObj != nil {
  158. podMan.SyncStatus(fmt.Sprintf("pod container started: %s(%s)", ctrObj.Name, ctrObj.Id), ctrObj.Id)
  159. } else {
  160. podMan.SyncStatus("pod container started", "")
  161. }
  162. }
  163. }
  164. if e.Type == pleg.ContainerRemoved {
  165. /*isInternalRemoved := podMan.IsInternalRemoved(e)
  166. if !isInternalRemoved {
  167. log.Infof("pod container removed: %s, try recreated", jsonutils.Marshal(e))
  168. } else {
  169. log.Infof("pod container removed: %s", jsonutils.Marshal(e))
  170. }*/
  171. log.Infof("pod container removed: %s", jsonutils.Marshal(e))
  172. }
  173. if e.Type == pleg.ContainerDied {
  174. ctrCriId := e.Data.(string)
  175. ctr, isInternalStopped := podMan.IsInternalStopped(ctrCriId)
  176. ctx := context.Background()
  177. ctrObj, _ := podMan.GetContainerByCRIId(ctrCriId)
  178. ccStatus := computeapi.CONTAINER_STATUS_EXITED
  179. if ctrObj != nil {
  180. ccStatus, _, _ = podMan.GetContainerStatus(ctx, ctrObj.Id)
  181. }
  182. if !isInternalStopped && sets.NewString(computeapi.CONTAINER_STATUS_EXITED, computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF).Has(ccStatus) {
  183. podStatus, err := m.podCache.Get(e.Id)
  184. if err != nil {
  185. log.Errorf("get pod %s status error: %v", e.Id, err)
  186. return
  187. }
  188. log.Infof("pod container exited: %s", jsonutils.Marshal(e))
  189. // start container again
  190. ctrStatus := podStatus.GetContainerStatus(ctrCriId)
  191. var reason string
  192. if ctrStatus == nil {
  193. log.Errorf("can't get container %s status", ctrCriId)
  194. reason = "container not exist"
  195. } else {
  196. if ctrStatus.ExitCode == 0 {
  197. log.Infof("container %s exited", ctrCriId)
  198. reason = fmt.Sprintf("container %s exited", ctrCriId)
  199. } else {
  200. ctrId := ctrCriId
  201. if ctr != nil {
  202. ctrId = ctr.Id
  203. }
  204. reason = fmt.Sprintf("exit code of died container %s is %d", ctrId, ctrStatus.ExitCode)
  205. }
  206. }
  207. log.Infof("sync pod %s container %s status: %s", e.Id, ctrCriId, reason)
  208. // 如果是 primary container 退出,就退出其他容器
  209. syncCtrId := ""
  210. if ctrObj != nil && !isInternalStopped && podMan.IsPrimaryContainer(ctrObj.Id) && ccStatus == computeapi.CONTAINER_STATUS_EXITED {
  211. reason = fmt.Sprintf("stop all containers when primary container %s exited", ctrObj.Name)
  212. if err := podMan.StopAll(context.Background()); err != nil {
  213. log.Errorf("stop all pod containers error: %s", err.Error())
  214. }
  215. }
  216. if ctrObj != nil && !isInternalStopped && !podMan.IsPrimaryContainer(ctrObj.Id) {
  217. syncCtrId = ctrObj.Id
  218. }
  219. podMan.SyncStatus(reason, syncCtrId)
  220. } else {
  221. log.Infof("pod container exited: %s", jsonutils.Marshal(e))
  222. }
  223. }
  224. }
  225. }
  226. func (m *SGuestManager) getPodByEvent(event *pleg.PodLifecycleEvent) PodInstance {
  227. obj, ok := m.GetServer(event.Id)
  228. if !ok {
  229. return nil
  230. }
  231. return obj.(PodInstance)
  232. }
  233. func (s *sPodGuestInstance) IsInternalStopped(ctrCriId string) (*ContainerExpectedStatus, bool) {
  234. ctr, ok := s.expectedStatus.Containers[ctrCriId]
  235. if !ok {
  236. return nil, true
  237. }
  238. if ctr.Status == computeapi.CONTAINER_STATUS_EXITED {
  239. return ctr, true
  240. }
  241. return ctr, false
  242. }
  243. func (s *sPodGuestInstance) IsInternalRemoved(ctrCriId string) bool {
  244. _, ok := s.expectedStatus.Containers[ctrCriId]
  245. if !ok {
  246. return true
  247. }
  248. return false
  249. }
  250. type ContainerExpectedStatus struct {
  251. Id string `json:"id"`
  252. Name string `json:"name"`
  253. Status string `json:"status"`
  254. }
  255. type PodExpectedStatus struct {
  256. lock sync.RWMutex
  257. homeDir string
  258. Status string `json:"status"`
  259. Containers map[string]*ContainerExpectedStatus `json:"containers"`
  260. }
  261. func NewPodExpectedStatus(homeDir string, status string) (*PodExpectedStatus, error) {
  262. ps := &PodExpectedStatus{
  263. homeDir: homeDir,
  264. Status: status,
  265. Containers: make(map[string]*ContainerExpectedStatus),
  266. }
  267. if fileutils2.Exists(ps.getFilePath()) {
  268. content, err := fileutils2.FileGetContents(ps.getFilePath())
  269. if content == "" {
  270. return ps, nil
  271. }
  272. if err != nil {
  273. return nil, errors.Wrapf(err, "get %s content", ps.getFilePath())
  274. }
  275. obj, err := jsonutils.ParseString(content)
  276. if err != nil {
  277. return nil, errors.Wrapf(err, "parse %s content: %s", ps.getFilePath(), content)
  278. }
  279. if err := obj.Unmarshal(ps); err != nil {
  280. return nil, errors.Wrapf(err, "unmarshal to expected status %s", ps.getFilePath())
  281. }
  282. }
  283. return ps, nil
  284. }
  285. func (s *PodExpectedStatus) getFilePath() string {
  286. return filepath.Join(s.homeDir, "expected_status.json")
  287. }
  288. func (s *PodExpectedStatus) updateFile() error {
  289. content := jsonutils.Marshal(s).PrettyString()
  290. if err := fileutils2.FilePutContents(s.getFilePath(), content, false); err != nil {
  291. return errors.Wrapf(err, "put %s content: %s", s.getFilePath(), content)
  292. }
  293. return nil
  294. }
  295. func (s *PodExpectedStatus) SetStatus(status string) error {
  296. s.lock.Lock()
  297. defer s.lock.Unlock()
  298. s.Status = status
  299. if err := s.updateFile(); err != nil {
  300. return errors.Wrapf(err, "update file")
  301. }
  302. return nil
  303. }
  304. func (s *PodExpectedStatus) SetContainerStatus(criId string, id string, status string) error {
  305. s.lock.Lock()
  306. defer s.lock.Unlock()
  307. s.Containers[criId] = &ContainerExpectedStatus{
  308. Id: id,
  309. Status: status,
  310. }
  311. if err := s.updateFile(); err != nil {
  312. return errors.Wrapf(err, "update file")
  313. }
  314. return nil
  315. }
  316. func (s *PodExpectedStatus) RemoveContainer(id string) error {
  317. s.lock.Lock()
  318. defer s.lock.Unlock()
  319. delete(s.Containers, id)
  320. if err := s.updateFile(); err != nil {
  321. return errors.Wrapf(err, "update file")
  322. }
  323. return nil
  324. }