task.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package apsara
  15. import (
  16. "fmt"
  17. "strings"
  18. "time"
  19. "yunion.io/x/log"
  20. )
  21. type TaskStatusType string
  22. type TaskActionType string
  23. const (
  24. ImportImageTask = TaskActionType("ImportImage")
  25. ExportImageTask = TaskActionType("ExportImage")
  26. // Finished:已完成
  27. // Processing:运行中
  28. // Waiting:多任务排队中
  29. // Deleted:已取消
  30. // Paused:暂停
  31. // Failed:失败
  32. TaskStatusFinished = TaskStatusType("Finished")
  33. TaskStatusProcessing = TaskStatusType("Processing")
  34. TaskStatusWaiting = TaskStatusType("Waiting")
  35. TaskStatusDeleted = TaskStatusType("Deleted")
  36. TaskStatusPaused = TaskStatusType("Paused")
  37. TaskStatusFailed = TaskStatusType("Failed")
  38. )
  39. type STask struct {
  40. TaskId string
  41. TaskStatus TaskStatusType
  42. TaskAction string
  43. SupportCancel bool
  44. FinishedTime time.Time
  45. CreationTime time.Time
  46. }
  47. func (self *SRegion) waitTaskStatus(action TaskActionType, taskId string, targetStatus TaskStatusType, interval time.Duration, timeout time.Duration) error {
  48. start := time.Now()
  49. for time.Now().Sub(start) < timeout {
  50. status, err := self.GetTaskStatus(action, taskId)
  51. if err != nil {
  52. return err
  53. }
  54. if status == targetStatus {
  55. return nil
  56. }
  57. time.Sleep(interval)
  58. }
  59. return fmt.Errorf("timeout for waitting task %s(%s) after %f minutes", taskId, action, timeout.Minutes())
  60. }
  61. func (self *SRegion) GetTaskStatus(action TaskActionType, taskId string) (TaskStatusType, error) {
  62. task, err := self.GetTask(taskId)
  63. if err != nil {
  64. return "", err
  65. }
  66. return task.TaskStatus, nil
  67. }
  68. func (self *SRegion) GetTasks(action TaskActionType, taskId []string, taskStatus TaskStatusType, offset int, limit int) ([]STask, int, error) {
  69. if limit > 50 || limit <= 0 {
  70. limit = 50
  71. }
  72. params := make(map[string]string)
  73. params["RegionId"] = self.RegionId
  74. params["PageSize"] = fmt.Sprintf("%d", limit)
  75. params["PageNumber"] = fmt.Sprintf("%d", (offset/limit)+1)
  76. params["TaskAction"] = string(action)
  77. if taskId != nil && len(taskId) > 0 {
  78. params["TaskIds"] = strings.Join(taskId, ",")
  79. }
  80. if len(taskStatus) > 0 {
  81. params["TaskStatus"] = string(taskStatus)
  82. }
  83. body, err := self.ecsRequest("DescribeTasks", params)
  84. if err != nil {
  85. log.Errorf("GetTasks fail %s", err)
  86. return nil, 0, err
  87. }
  88. log.Infof("%s", body)
  89. tasks := make([]STask, 0)
  90. err = body.Unmarshal(&tasks, "TaskSet", "Task")
  91. if err != nil {
  92. log.Errorf("Unmarshal task fail %s", err)
  93. return nil, 0, err
  94. }
  95. total, _ := body.Int("TotalCount")
  96. return tasks, int(total), nil
  97. }
  98. type STaskError struct {
  99. ErrorCode string
  100. ErrorMsg string
  101. OperationStatus string
  102. }
  103. type STaskDetail struct {
  104. CreationTime time.Time
  105. FailedCount int
  106. FinishedTime time.Time
  107. RegionId string
  108. OperationProgressSet map[string][]STaskError
  109. RequestId string
  110. SuccessCount int
  111. SupportCancel bool
  112. TaskAction string
  113. TaskId string
  114. TaskProcess string
  115. TaskStatus TaskStatusType
  116. TotalCount int
  117. }
  118. func (self *SRegion) GetTask(taskId string) (*STaskDetail, error) {
  119. params := map[string]string{
  120. "RegionId": self.RegionId,
  121. "TaskId": taskId,
  122. }
  123. body, err := self.ecsRequest("DescribeTaskAttribute", params)
  124. if err != nil {
  125. return nil, err
  126. }
  127. log.Infof("%s", body)
  128. detail := &STaskDetail{}
  129. return detail, body.Unmarshal(detail)
  130. }
  131. func (self *SRegion) CancelTask(taskId string) error {
  132. params := map[string]string{
  133. "RegionId": self.RegionId,
  134. "TaskId": taskId,
  135. }
  136. _, err := self.ecsRequest("CancelTask", params)
  137. return err
  138. }
  139. func (region *SRegion) CancelImageImportTasks() error {
  140. tasks, _, _ := region.GetTasks(ImportImageTask, []string{}, TaskStatusProcessing, 0, 50)
  141. for i := 0; i < len(tasks); i++ {
  142. task, err := region.GetTask(tasks[i].TaskId)
  143. if err != nil {
  144. log.Errorf("failed get task %s %s error: %v", tasks[i].CreationTime, tasks[i].TaskId, err)
  145. }
  146. if task != nil {
  147. log.Debugf("task info: %s(%s) cancelable %t process %s", task.TaskId, task.CreationTime, task.SupportCancel, task.TaskProcess)
  148. } else {
  149. log.Debugf("task info: %s(%s) cancelable %t", tasks[i].TaskId, tasks[i].CreationTime, tasks[i].SupportCancel)
  150. }
  151. if tasks[i].SupportCancel {
  152. err := region.CancelTask(tasks[i].TaskId)
  153. if err != nil {
  154. return fmt.Errorf("failed to cancel task %s error: %v", tasks[i].TaskId, err)
  155. }
  156. }
  157. }
  158. return nil
  159. }