task.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package aliyun
  15. import (
  16. "fmt"
  17. "strconv"
  18. "strings"
  19. "time"
  20. "yunion.io/x/log"
  21. )
  22. type TaskStatusType string
  23. type TaskActionType string
  24. const (
  25. ImportImageTask = TaskActionType("ImportImage")
  26. ExportImageTask = TaskActionType("ExportImage")
  27. // Finished:已完成
  28. // Processing:运行中
  29. // Waiting:多任务排队中
  30. // Deleted:已取消
  31. // Paused:暂停
  32. // Failed:失败
  33. TaskStatusFinished = TaskStatusType("Finished")
  34. TaskStatusProcessing = TaskStatusType("Processing")
  35. TaskStatusWaiting = TaskStatusType("Waiting")
  36. TaskStatusDeleted = TaskStatusType("Deleted")
  37. TaskStatusPaused = TaskStatusType("Paused")
  38. TaskStatusFailed = TaskStatusType("Failed")
  39. )
  40. type STask struct {
  41. TaskId string
  42. TaskStatus TaskStatusType
  43. TaskAction string
  44. SupportCancel bool
  45. FinishedTime time.Time
  46. CreationTime time.Time
  47. }
  48. func (self *SRegion) WaitTaskStatus(action TaskActionType, taskId string, targetStatus TaskStatusType, interval time.Duration, timeout time.Duration, min, max int8, progress func(float32)) error {
  49. start := time.Now()
  50. for time.Now().Sub(start) < timeout {
  51. status, percent, err := self.GetTaskStatus(action, taskId)
  52. if err != nil {
  53. return err
  54. }
  55. if progress != nil {
  56. progress(float32(min) + float32(float64(max-min)*float64(percent)/100))
  57. }
  58. if status == targetStatus {
  59. if progress != nil {
  60. progress(float32(max))
  61. }
  62. return nil
  63. }
  64. time.Sleep(interval)
  65. }
  66. return fmt.Errorf("timeout for waitting task %s(%s) after %f minutes", taskId, action, timeout.Minutes())
  67. }
  68. func (self *SRegion) waitTaskStatus(action TaskActionType, taskId string, targetStatus TaskStatusType, interval time.Duration, timeout time.Duration) error {
  69. return self.WaitTaskStatus(action, taskId, targetStatus, interval, timeout, 0, 0, nil)
  70. }
  71. func (self *SRegion) GetTaskStatus(action TaskActionType, taskId string) (TaskStatusType, int8, error) {
  72. task, err := self.GetTask(taskId)
  73. if err != nil {
  74. return "", 0, err
  75. }
  76. progress, _ := strconv.Atoi(strings.TrimSuffix(task.TaskProcess, "%"))
  77. return task.TaskStatus, int8(progress), nil
  78. }
  79. func (self *SRegion) GetTasks(action TaskActionType, taskId []string, taskStatus TaskStatusType, offset int, limit int) ([]STask, int, error) {
  80. if limit > 50 || limit <= 0 {
  81. limit = 50
  82. }
  83. params := make(map[string]string)
  84. params["RegionId"] = self.RegionId
  85. params["PageSize"] = fmt.Sprintf("%d", limit)
  86. params["PageNumber"] = fmt.Sprintf("%d", (offset/limit)+1)
  87. params["TaskAction"] = string(action)
  88. if taskId != nil && len(taskId) > 0 {
  89. params["TaskIds"] = strings.Join(taskId, ",")
  90. }
  91. if len(taskStatus) > 0 {
  92. params["TaskStatus"] = string(taskStatus)
  93. }
  94. body, err := self.ecsRequest("DescribeTasks", params)
  95. if err != nil {
  96. log.Errorf("GetTasks fail %s", err)
  97. return nil, 0, err
  98. }
  99. log.Infof("%s", body)
  100. tasks := make([]STask, 0)
  101. err = body.Unmarshal(&tasks, "TaskSet", "Task")
  102. if err != nil {
  103. log.Errorf("Unmarshal task fail %s", err)
  104. return nil, 0, err
  105. }
  106. total, _ := body.Int("TotalCount")
  107. return tasks, int(total), nil
  108. }
  109. type STaskError struct {
  110. ErrorCode string
  111. ErrorMsg string
  112. OperationStatus string
  113. }
  114. type STaskDetail struct {
  115. CreationTime time.Time
  116. FailedCount int
  117. FinishedTime time.Time
  118. RegionId string
  119. OperationProgressSet map[string][]STaskError
  120. RequestId string
  121. SuccessCount int
  122. SupportCancel bool
  123. TaskAction string
  124. TaskId string
  125. TaskProcess string
  126. TaskStatus TaskStatusType
  127. TotalCount int
  128. }
  129. func (self *SRegion) GetTask(taskId string) (*STaskDetail, error) {
  130. params := map[string]string{
  131. "RegionId": self.RegionId,
  132. "TaskId": taskId,
  133. }
  134. body, err := self.ecsRequest("DescribeTaskAttribute", params)
  135. if err != nil {
  136. return nil, err
  137. }
  138. log.Infof("%s", body)
  139. detail := &STaskDetail{}
  140. return detail, body.Unmarshal(detail)
  141. }
  142. func (self *SRegion) CancelTask(taskId string) error {
  143. params := map[string]string{
  144. "RegionId": self.RegionId,
  145. "TaskId": taskId,
  146. }
  147. _, err := self.ecsRequest("CancelTask", params)
  148. return err
  149. }
  150. func (region *SRegion) CancelImageImportTasks() error {
  151. tasks, _, _ := region.GetTasks(ImportImageTask, []string{}, TaskStatusProcessing, 0, 50)
  152. for i := 0; i < len(tasks); i++ {
  153. task, err := region.GetTask(tasks[i].TaskId)
  154. if err != nil {
  155. log.Errorf("failed get task %s %s error: %v", tasks[i].CreationTime, tasks[i].TaskId, err)
  156. }
  157. if task != nil {
  158. log.Debugf("task info: %s(%s) cancelable %t process %s", task.TaskId, task.CreationTime, task.SupportCancel, task.TaskProcess)
  159. } else {
  160. log.Debugf("task info: %s(%s) cancelable %t", tasks[i].TaskId, tasks[i].CreationTime, tasks[i].SupportCancel)
  161. }
  162. if tasks[i].SupportCancel {
  163. err := region.CancelTask(tasks[i].TaskId)
  164. if err != nil {
  165. return fmt.Errorf("failed to cancel task %s error: %v", tasks[i].TaskId, err)
  166. }
  167. }
  168. }
  169. return nil
  170. }