pod_state.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package statusman
  2. import (
  3. "context"
  4. "time"
  5. "yunion.io/x/pkg/errors"
  6. "yunion.io/x/onecloud/pkg/apis"
  7. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  8. "yunion.io/x/onecloud/pkg/hostman/hostutils"
  9. )
  10. var (
  11. statusManager IPodStatusManager
  12. )
  13. func init() {
  14. statusManager = newPodStatusManager()
  15. }
  16. func GetManager() IPodStatusManager {
  17. return statusManager
  18. }
  19. type IPodStatusManager interface {
  20. UpdateStatus(req *PodStatusUpdateRequest) error
  21. Start()
  22. Stop()
  23. }
  24. type ContainerStatus struct {
  25. Status string
  26. RestartCount int
  27. StartedAt *time.Time
  28. LastFinishedAt *time.Time
  29. }
  30. type IPod interface {
  31. MarkContainerProbeDirty(ctrStatus string, ctrId string, reason string)
  32. }
  33. type PodStatusUpdateRequest struct {
  34. Id string
  35. Pod IPod
  36. Status string
  37. ContainerStatuses map[string]*ContainerStatus
  38. Reason string
  39. Result chan error `json:"-"`
  40. }
  41. func (r PodStatusUpdateRequest) ToServerPerformStatusInput() *computeapi.ServerPerformStatusInput {
  42. powerState := computeapi.VM_POWER_STATES_OFF
  43. if r.Status == computeapi.VM_RUNNING {
  44. powerState = computeapi.VM_POWER_STATES_ON
  45. }
  46. guestStatus := &computeapi.ServerPerformStatusInput{
  47. PerformStatusInput: apis.PerformStatusInput{
  48. Status: r.Status,
  49. PowerStates: powerState,
  50. Reason: r.Reason,
  51. },
  52. Containers: make(map[string]*computeapi.ContainerPerformStatusInput),
  53. }
  54. for ctrId, ctrStatus := range r.ContainerStatuses {
  55. guestStatus.Containers[ctrId] = &computeapi.ContainerPerformStatusInput{
  56. PerformStatusInput: apis.PerformStatusInput{
  57. Status: ctrStatus.Status,
  58. Reason: r.Reason,
  59. },
  60. RestartCount: ctrStatus.RestartCount,
  61. StartedAt: ctrStatus.StartedAt,
  62. LastFinishedAt: ctrStatus.LastFinishedAt,
  63. }
  64. }
  65. return guestStatus
  66. }
  67. func (r PodStatusUpdateRequest) ToHostUploadGuestsStatusInput() *computeapi.HostUploadGuestsStatusInput {
  68. id := r.Id
  69. guestStatus := &computeapi.HostUploadGuestStatusInput{
  70. PerformStatusInput: apis.PerformStatusInput{
  71. Status: r.Status,
  72. Reason: r.Reason,
  73. },
  74. Containers: make(map[string]*computeapi.ContainerPerformStatusInput),
  75. }
  76. for ctrId, ctrStatus := range r.ContainerStatuses {
  77. guestStatus.Containers[ctrId] = &computeapi.ContainerPerformStatusInput{
  78. PerformStatusInput: apis.PerformStatusInput{
  79. Status: ctrStatus.Status,
  80. Reason: r.Reason,
  81. },
  82. RestartCount: ctrStatus.RestartCount,
  83. StartedAt: ctrStatus.StartedAt,
  84. LastFinishedAt: ctrStatus.LastFinishedAt,
  85. }
  86. }
  87. return &computeapi.HostUploadGuestsStatusInput{
  88. Guests: map[string]*computeapi.HostUploadGuestStatusInput{
  89. id: guestStatus,
  90. },
  91. }
  92. }
  93. type podStatusManager struct {
  94. updateChan chan *PodStatusUpdateRequest
  95. stopChan chan struct{}
  96. }
  97. func newPodStatusManager() IPodStatusManager {
  98. return &podStatusManager{
  99. updateChan: make(chan *PodStatusUpdateRequest),
  100. stopChan: make(chan struct{}),
  101. }
  102. }
  103. func (m *podStatusManager) Start() {
  104. go m.processLoop()
  105. }
  106. func (m *podStatusManager) Stop() {
  107. close(m.stopChan)
  108. }
  109. func (m *podStatusManager) UpdateStatus(req *PodStatusUpdateRequest) error {
  110. result := make(chan error, 1)
  111. req.Result = result
  112. m.updateChan <- req
  113. return <-result
  114. }
  115. func (m *podStatusManager) processLoop() {
  116. for {
  117. select {
  118. case <-m.stopChan:
  119. return
  120. case req := <-m.updateChan:
  121. err := m.handleUpdate(req)
  122. req.Result <- err
  123. }
  124. }
  125. }
  126. func (m *podStatusManager) handleUpdate(req *PodStatusUpdateRequest) error {
  127. input := req.ToServerPerformStatusInput()
  128. if _, err := hostutils.UpdateServerContainersStatus(context.Background(), req.Id, input); err != nil {
  129. return errors.Wrapf(err, "update server containers status")
  130. }
  131. for ctrId, ctrStatus := range input.Containers {
  132. // 同步容器状态可能会出现 probing 状态,所以需要 mark 成 dirty,等待 probe manager 重新探测容器状态
  133. req.Pod.MarkContainerProbeDirty(ctrStatus.Status, ctrId, input.Reason)
  134. }
  135. return nil
  136. }