schedule.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package utils
  15. import (
  16. "context"
  17. "fmt"
  18. "sort"
  19. "yunion.io/x/jsonutils"
  20. api "yunion.io/x/onecloud/pkg/apis/compute"
  21. schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
  22. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  23. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  24. "yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  27. "yunion.io/x/onecloud/pkg/compute/models"
  28. "yunion.io/x/onecloud/pkg/compute/options"
  29. "yunion.io/x/onecloud/pkg/mcclient"
  30. "yunion.io/x/onecloud/pkg/mcclient/auth"
  31. "yunion.io/x/onecloud/pkg/mcclient/modules/scheduler"
  32. "yunion.io/x/onecloud/pkg/util/logclient"
  33. )
  34. type IScheduleModel interface {
  35. db.IStandaloneModel
  36. SetStatus(ctx context.Context, userCred mcclient.TokenCredential, status string, reason string) error
  37. }
  38. type IScheduleTask interface {
  39. GetUserCred() mcclient.TokenCredential
  40. GetSchedParams() (*schedapi.ScheduleInput, error)
  41. GetPendingUsage(quota quotas.IQuota, index int) error
  42. SetStage(stageName string, data *jsonutils.JSONDict) error
  43. SetStageFailed(ctx context.Context, reason jsonutils.JSONObject)
  44. OnStartSchedule(obj IScheduleModel)
  45. OnScheduleFailCallback(ctx context.Context, obj IScheduleModel, reason jsonutils.JSONObject, index int)
  46. // OnScheduleComplete(ctx context.Context, items []db.IStandaloneModel, data *jsonutils.JSONDict)
  47. SaveScheduleResult(ctx context.Context, obj IScheduleModel, candidate *schedapi.CandidateResource, index int)
  48. SaveScheduleResultWithBackup(ctx context.Context, obj IScheduleModel, master, slave *schedapi.CandidateResource, index int)
  49. OnScheduleFailed(ctx context.Context, reason jsonutils.JSONObject)
  50. }
  51. type SSchedTask struct {
  52. taskman.STask
  53. input *schedapi.ScheduleInput
  54. }
  55. func (self *SSchedTask) OnStartSchedule(obj IScheduleModel) {
  56. db.OpsLog.LogEvent(obj, db.ACT_ALLOCATING, nil, self.GetUserCred())
  57. obj.SetStatus(context.Background(), self.GetUserCred(), api.VM_SCHEDULE, "")
  58. }
  59. func (self *SSchedTask) OnScheduleFailCallback(ctx context.Context, obj IScheduleModel, reason jsonutils.JSONObject, index int) {
  60. obj.SetStatus(ctx, self.GetUserCred(), api.VM_SCHEDULE_FAILED, reason.String())
  61. db.OpsLog.LogEvent(obj, db.ACT_ALLOCATE_FAIL, reason, self.GetUserCred())
  62. logclient.AddActionLogWithStartable(self, obj, logclient.ACT_ALLOCATE, reason, self.GetUserCred(), false)
  63. notifyclient.EventNotify(ctx, self.GetUserCred(), notifyclient.SEventNotifyParam{
  64. Obj: obj,
  65. Action: notifyclient.ActionCreate,
  66. IsFail: true,
  67. })
  68. notifyclient.NotifySystemErrorWithCtx(ctx, obj.GetId(), obj.GetName(), api.VM_SCHEDULE_FAILED, reason.String())
  69. }
  70. func (self *SSchedTask) OnScheduleComplete(ctx context.Context, items []db.IStandaloneModel, data *jsonutils.JSONDict) {
  71. self.SetStageComplete(ctx, nil)
  72. }
  73. func (self *SSchedTask) SaveScheduleResult(ctx context.Context, obj IScheduleModel, candidate *schedapi.CandidateResource, index int) {
  74. // ...
  75. }
  76. func (self *SSchedTask) SaveScheduleResultWithBackup(ctx context.Context, obj IScheduleModel, master, slave *schedapi.CandidateResource, index int) {
  77. // ...
  78. }
  79. func (self *SSchedTask) OnScheduleFailed(ctx context.Context, reason jsonutils.JSONObject) {
  80. self.SetStageFailed(ctx, reason)
  81. }
  82. func StartScheduleObjects(
  83. ctx context.Context,
  84. task IScheduleTask,
  85. objs []db.IStandaloneModel,
  86. ) {
  87. schedObjs := make([]IScheduleModel, len(objs))
  88. for i, obj := range objs {
  89. schedObj := obj.(IScheduleModel)
  90. schedObjs[i] = schedObj
  91. task.OnStartSchedule(schedObj)
  92. }
  93. doScheduleObjects(ctx, task, schedObjs)
  94. }
  95. func doScheduleWithInput(
  96. ctx context.Context,
  97. task IScheduleTask,
  98. schedInput *schedapi.ScheduleInput,
  99. count int,
  100. ) (*schedapi.ScheduleOutput, error) {
  101. computeUsage := models.SQuota{}
  102. task.GetPendingUsage(&computeUsage, 0)
  103. regionUsage := models.SRegionQuota{}
  104. task.GetPendingUsage(&regionUsage, 1)
  105. schedInput.PendingUsages = []jsonutils.JSONObject{
  106. jsonutils.Marshal(&computeUsage),
  107. jsonutils.Marshal(&regionUsage),
  108. }
  109. var params *jsonutils.JSONDict
  110. if count > 0 {
  111. // if object count <=0, don't need update schedule params
  112. params = jsonutils.Marshal(schedInput).(*jsonutils.JSONDict)
  113. }
  114. task.SetStage("OnScheduleComplete", params)
  115. s := auth.GetSession(ctx, task.GetUserCred(), options.Options.Region)
  116. return scheduler.SchedManager.DoSchedule(s, schedInput, count)
  117. }
  118. func doScheduleObjects(
  119. ctx context.Context,
  120. task IScheduleTask,
  121. objs []IScheduleModel,
  122. ) {
  123. schedInput, err := task.GetSchedParams()
  124. if err != nil {
  125. onSchedulerRequestFail(ctx, task, objs, jsonutils.NewString(fmt.Sprintf("GetSchedParams fail: %s", err)))
  126. return
  127. }
  128. sort.Sort(sortedIScheduleModelList(objs))
  129. schedInput.GuestIds = make([]string, len(objs))
  130. for i := range objs {
  131. schedInput.GuestIds[i] = objs[i].GetId()
  132. }
  133. output, err := doScheduleWithInput(ctx, task, schedInput, len(objs))
  134. if err != nil {
  135. onSchedulerRequestFail(ctx, task, objs, jsonutils.NewString(err.Error()))
  136. return
  137. }
  138. onSchedulerResults(ctx, task, objs, output.Candidates)
  139. }
  140. func cancelPendingUsage(ctx context.Context, task IScheduleTask) {
  141. ClearTaskPendingUsage(ctx, task.(taskman.ITask))
  142. ClearTaskPendingRegionUsage(ctx, task.(taskman.ITask))
  143. }
  144. func onSchedulerRequestFail(
  145. ctx context.Context,
  146. task IScheduleTask,
  147. objs []IScheduleModel,
  148. reason jsonutils.JSONObject,
  149. ) {
  150. for i, obj := range objs {
  151. onObjScheduleFail(ctx, task, obj, reason, i)
  152. }
  153. task.OnScheduleFailed(ctx, reason)
  154. cancelPendingUsage(ctx, task)
  155. }
  156. func onObjScheduleFail(
  157. ctx context.Context,
  158. task IScheduleTask,
  159. obj IScheduleModel,
  160. msg jsonutils.JSONObject,
  161. idx int,
  162. ) {
  163. lockman.LockObject(ctx, obj)
  164. defer lockman.ReleaseObject(ctx, obj)
  165. var reason jsonutils.JSONObject
  166. reason = jsonutils.NewString("No matching resources")
  167. if msg != nil {
  168. reason = jsonutils.NewArray(reason, msg)
  169. }
  170. task.OnScheduleFailCallback(ctx, obj, reason, idx)
  171. }
  172. type sortedIScheduleModelList []IScheduleModel
  173. func (a sortedIScheduleModelList) Len() int { return len(a) }
  174. func (a sortedIScheduleModelList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  175. func (a sortedIScheduleModelList) Less(i, j int) bool { return a[i].GetName() < a[j].GetName() }
  176. func onSchedulerResults(
  177. ctx context.Context,
  178. task IScheduleTask,
  179. objs []IScheduleModel,
  180. results []*schedapi.CandidateResource,
  181. ) {
  182. if len(objs) == 0 {
  183. // sched with out object can't clean sched cache immediately
  184. task.SaveScheduleResult(ctx, nil, results[0], 0)
  185. return
  186. }
  187. succCount := 0
  188. for idx := 0; idx < len(objs); idx += 1 {
  189. obj := objs[idx]
  190. result := results[idx]
  191. if len(result.Error) != 0 {
  192. onObjScheduleFail(ctx, task, obj, jsonutils.NewString(result.Error), idx)
  193. continue
  194. }
  195. if result.BackupCandidate == nil {
  196. // normal schedule
  197. onScheduleSucc(ctx, task, obj, result, idx)
  198. } else {
  199. // backup schedule
  200. onMasterSlaveScheduleSucc(ctx, task, obj, result, result.BackupCandidate, idx)
  201. }
  202. succCount += 1
  203. }
  204. if succCount == 0 {
  205. task.OnScheduleFailed(ctx, jsonutils.NewString("Schedule failed"))
  206. }
  207. cancelPendingUsage(ctx, task)
  208. }
  209. func onMasterSlaveScheduleSucc(
  210. ctx context.Context,
  211. task IScheduleTask,
  212. obj IScheduleModel,
  213. master, slave *schedapi.CandidateResource,
  214. index int,
  215. ) {
  216. lockman.LockObject(ctx, obj)
  217. defer lockman.ReleaseObject(ctx, obj)
  218. task.SaveScheduleResultWithBackup(ctx, obj, master, slave, index)
  219. models.HostManager.ClearSchedDescSessionCache(master.HostId, master.SessionId)
  220. models.HostManager.ClearSchedDescSessionCache(slave.HostId, slave.SessionId)
  221. }
  222. func onScheduleSucc(
  223. ctx context.Context,
  224. task IScheduleTask,
  225. obj IScheduleModel,
  226. candidate *schedapi.CandidateResource,
  227. index int,
  228. ) {
  229. hostId := candidate.HostId
  230. lockman.LockRawObject(ctx, models.HostManager.KeywordPlural(), hostId)
  231. defer lockman.ReleaseRawObject(ctx, models.HostManager.KeywordPlural(), hostId)
  232. task.SaveScheduleResult(ctx, obj, candidate, index)
  233. models.HostManager.ClearSchedDescSessionCache(candidate.HostId, candidate.SessionId)
  234. }
  235. func GetBatchParamsAtIndex(task taskman.ITask, index int) *jsonutils.JSONDict {
  236. var data *jsonutils.JSONDict
  237. params := task.GetParams()
  238. paramsArray, _ := params.GetArray("data")
  239. if len(paramsArray) > 0 {
  240. if len(paramsArray) > index {
  241. data = paramsArray[index].(*jsonutils.JSONDict)
  242. } else {
  243. data = paramsArray[0].(*jsonutils.JSONDict)
  244. }
  245. } else {
  246. data = params
  247. }
  248. return data
  249. }