tasks.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package taskman
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "net/http"
  20. "reflect"
  21. "runtime/debug"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "yunion.io/x/jsonutils"
  27. "yunion.io/x/log"
  28. "yunion.io/x/pkg/appctx"
  29. "yunion.io/x/pkg/errors"
  30. "yunion.io/x/pkg/gotypes"
  31. "yunion.io/x/pkg/util/httputils"
  32. "yunion.io/x/pkg/util/rbacscope"
  33. "yunion.io/x/pkg/util/reflectutils"
  34. "yunion.io/x/pkg/util/stringutils"
  35. "yunion.io/x/pkg/util/timeutils"
  36. "yunion.io/x/pkg/util/version"
  37. "yunion.io/x/pkg/utils"
  38. "yunion.io/x/sqlchemy"
  39. "yunion.io/x/onecloud/pkg/apis"
  40. "yunion.io/x/onecloud/pkg/appsrv"
  41. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  42. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  43. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  44. "yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
  45. "yunion.io/x/onecloud/pkg/httperrors"
  46. "yunion.io/x/onecloud/pkg/mcclient"
  47. "yunion.io/x/onecloud/pkg/mcclient/auth"
  48. "yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
  49. "yunion.io/x/onecloud/pkg/util/ctx"
  50. "yunion.io/x/onecloud/pkg/util/logclient"
  51. "yunion.io/x/onecloud/pkg/util/stringutils2"
  52. )
  53. const (
  54. PARENT_TASK_ID_KEY = "parent_task_id"
  55. PENDING_USAGE_KEY = "__pending_usage__"
  56. PARENT_TASK_NOTIFY_KEY = "__parent_task_notifyurl"
  57. REQUEST_CONTEXT_KEY = "__request_context"
  58. TASK_STAGE_FAILED = "failed"
  59. TASK_STAGE_COMPLETE = "complete"
  60. MAX_REMOTE_NOTIFY_TRIES = 5
  61. MULTI_OBJECTS_ID = "[--MULTI_OBJECTS--]"
  62. TASK_INIT_STAGE = "on_init"
  63. CONVERT_TASK = "convert_task"
  64. LANG = "lang"
  65. taskStatusDone = "done"
  66. TASK_STATUS_QUEUE = "queue"
  67. )
  68. type STaskManager struct {
  69. db.SModelBaseManager
  70. db.SProjectizedResourceBaseManager
  71. db.SStatusResourceBaseManager
  72. }
  73. var TaskManager *STaskManager
  74. var userCredWidthLimit = 0
  75. func init() {
  76. TaskManager = &STaskManager{
  77. SModelBaseManager: db.NewModelBaseManager(STask{}, "tasks_tbl", "task", "tasks"),
  78. }
  79. TaskManager.SetVirtualObject(TaskManager)
  80. if field, ok := reflect.TypeOf(&STask{}).Elem().FieldByName("UserCred"); ok {
  81. if widthStr := field.Tag.Get(sqlchemy.TAG_WIDTH); len(widthStr) > 0 {
  82. userCredWidthLimit, _ = strconv.Atoi(widthStr)
  83. }
  84. }
  85. TaskManager.TableSpec().AddIndex(true, "id", "created_at", "parent_task_id", "stage")
  86. }
  87. type STask struct {
  88. db.SModelBase
  89. // 资源创建时间
  90. CreatedAt time.Time `nullable:"false" created_at:"true" index:"true" get:"user" list:"user" json:"created_at"`
  91. // 资源更新时间
  92. UpdatedAt time.Time `nullable:"false" updated_at:"true" list:"user" json:"updated_at"`
  93. // 资源被更新次数
  94. UpdateVersion int `default:"0" nullable:"false" auto_version:"true" list:"user" json:"update_version"`
  95. Id string `width:"36" charset:"ascii" primary:"true" list:"user"` // Column(VARCHAR(36, charset='ascii'), primary_key=True, default=get_uuid)
  96. STaskBase
  97. db.SProjectizedResourceBase
  98. taskObject db.IStandaloneModel `ignore:"true"`
  99. taskObjects []db.IStandaloneModel `ignore:"true"`
  100. SubTaskCount int `ignore:"true" json:"sub_task_count"`
  101. FailSubTaskCnt int `ignore:"true" json:"fail_sub_task_cnt"`
  102. SuccSubTaskCnt int `ignore:"true" json:"succ_sub_task_cnt"`
  103. }
  104. func (manager *STaskManager) CreateByInsertOrUpdate() bool {
  105. return false
  106. }
  107. func (manager *STaskManager) AllowListItems(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) bool {
  108. return true
  109. }
  110. func (manager *STaskManager) AllowCreateItem(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) bool {
  111. return false
  112. }
  113. func (manager *STaskManager) FilterById(q *sqlchemy.SQuery, idStr string) *sqlchemy.SQuery {
  114. return q.Equals("id", idStr)
  115. }
  116. func (manager *STaskManager) FilterByNotId(q *sqlchemy.SQuery, idStr string) *sqlchemy.SQuery {
  117. return q.NotEquals("id", idStr)
  118. }
  119. func (manager *STaskManager) FilterByName(q *sqlchemy.SQuery, name string) *sqlchemy.SQuery {
  120. return q.Equals("id", name)
  121. }
  122. func (manager *STaskManager) PerformAction(ctx context.Context, userCred mcclient.TokenCredential, taskId string, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  123. err := runTask(taskId, data)
  124. if err != nil {
  125. return nil, errors.Wrapf(err, "runTask")
  126. }
  127. resp := jsonutils.NewDict()
  128. resp.Add(jsonutils.NewString("ok"), "result")
  129. return resp, nil
  130. }
  131. func (manager *STask) PreCheckPerformAction(
  132. ctx context.Context, userCred mcclient.TokenCredential,
  133. action string, query jsonutils.JSONObject, data jsonutils.JSONObject,
  134. ) error {
  135. return nil
  136. }
  137. func (task *STask) GetOwnerId() mcclient.IIdentityProvider {
  138. return task.SProjectizedResourceBase.GetOwnerId()
  139. }
  140. func (manager *STaskManager) FilterByOwner(ctx context.Context, q *sqlchemy.SQuery, man db.FilterByOwnerProvider, userCred mcclient.TokenCredential, owner mcclient.IIdentityProvider, scope rbacscope.TRbacScope) *sqlchemy.SQuery {
  141. taskQ := TaskObjectManager.Query("task_id")
  142. taskQ = taskQ.Snapshot()
  143. taskQ = manager.SProjectizedResourceBaseManager.FilterByOwner(ctx, taskQ, man, userCred, owner, scope)
  144. if taskQ.IsAltered() {
  145. taskSubQ := taskQ.SubQuery()
  146. q = q.Join(taskSubQ, sqlchemy.Equals(q.Field("id"), taskSubQ.Field("task_id")))
  147. }
  148. return q
  149. }
  150. func (manager *STaskManager) FetchOwnerId(ctx context.Context, data jsonutils.JSONObject) (mcclient.IIdentityProvider, error) {
  151. return manager.SProjectizedResourceBaseManager.FetchOwnerId(ctx, data)
  152. }
  153. func (manager *STaskManager) FetchTaskById(taskId string) *STask {
  154. return manager.fetchTask(taskId)
  155. }
  156. func (task *STask) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
  157. return httperrors.NewForbiddenError("forbidden")
  158. }
  159. func (task *STask) ValidateUpdateCondition(ctx context.Context) error {
  160. return httperrors.NewForbiddenError("forbidden")
  161. }
  162. func (task *STask) BeforeInsert() {
  163. if len(task.Id) == 0 {
  164. task.Id = stringutils.UUID4()
  165. }
  166. }
  167. func (task *STask) GetId() string {
  168. return task.Id
  169. }
  170. func (task *STask) GetName() string {
  171. return task.TaskName
  172. }
  173. func (task *STask) saveStartAt() {
  174. if !task.StartAt.IsZero() {
  175. return
  176. }
  177. _, err := db.Update(task, func() error {
  178. task.StartAt = timeutils.UtcNow()
  179. return nil
  180. })
  181. if err != nil {
  182. log.Errorf("task %s save start_at fail: %s", task.String(), err)
  183. }
  184. }
  185. func fetchTaskParams(
  186. ctx context.Context,
  187. taskName string,
  188. taskData *jsonutils.JSONDict,
  189. parentTaskId string,
  190. parentTaskNotifyUrl string,
  191. pendingUsages []quotas.IQuota,
  192. ) *jsonutils.JSONDict {
  193. var data *jsonutils.JSONDict
  194. if taskData != nil {
  195. excludeKeys := []string{
  196. PARENT_TASK_ID_KEY, PARENT_TASK_NOTIFY_KEY, PENDING_USAGE_KEY,
  197. }
  198. for i := 1; taskData.Contains(pendingUsageKey(i)); i += 1 {
  199. excludeKeys = append(excludeKeys, pendingUsageKey(i))
  200. }
  201. data = taskData.CopyExcludes(excludeKeys...)
  202. } else {
  203. data = jsonutils.NewDict()
  204. }
  205. reqContext := appctx.FetchAppContextData(ctx)
  206. if !reqContext.IsZero() {
  207. data.Add(jsonutils.Marshal(&reqContext), REQUEST_CONTEXT_KEY)
  208. }
  209. if len(parentTaskId) > 0 || len(parentTaskNotifyUrl) > 0 {
  210. if len(parentTaskId) > 0 {
  211. data.Add(jsonutils.NewString(parentTaskId), PARENT_TASK_ID_KEY)
  212. }
  213. if len(parentTaskNotifyUrl) > 0 {
  214. data.Add(jsonutils.NewString(parentTaskNotifyUrl), PARENT_TASK_NOTIFY_KEY)
  215. log.Infof("%s notify parent url: %s", taskName, parentTaskNotifyUrl)
  216. }
  217. } else {
  218. if !reqContext.IsZero() {
  219. if len(reqContext.TaskId) > 0 && len(reqContext.TaskNotifyUrl) == 0 {
  220. data.Add(jsonutils.NewString(reqContext.TaskId), PARENT_TASK_ID_KEY)
  221. }
  222. if len(reqContext.TaskNotifyUrl) > 0 {
  223. data.Add(jsonutils.NewString(reqContext.TaskNotifyUrl), PARENT_TASK_NOTIFY_KEY)
  224. log.Infof("%s notify parent url: %s", taskName, reqContext.TaskNotifyUrl)
  225. }
  226. }
  227. }
  228. if len(pendingUsages) > 0 {
  229. for i := range pendingUsages {
  230. pendingUsage := pendingUsages[i]
  231. if gotypes.IsNil(pendingUsage) {
  232. continue
  233. }
  234. key := pendingUsageKey(i)
  235. data.Add(jsonutils.Marshal(pendingUsage), key)
  236. }
  237. }
  238. return data
  239. }
  240. func (manager *STaskManager) NewTask(
  241. ctx context.Context,
  242. taskName string,
  243. obj db.IStandaloneModel,
  244. userCred mcclient.TokenCredential,
  245. taskData *jsonutils.JSONDict,
  246. parentTaskId string,
  247. parentTaskNotifyUrl string,
  248. pendingUsage ...quotas.IQuota,
  249. ) (*STask, error) {
  250. if userCredWidthLimit > 0 && len(userCred.String()) > userCredWidthLimit {
  251. return nil, fmt.Errorf("Too many permissions for user %s", userCred.GetUserName())
  252. }
  253. if !isTaskExist(taskName) {
  254. return nil, fmt.Errorf("task %s not found", taskName)
  255. }
  256. data := fetchTaskParams(ctx, taskName, taskData, parentTaskId, parentTaskNotifyUrl, pendingUsage)
  257. task := &STask{
  258. STaskBase: STaskBase{
  259. ObjType: obj.Keyword(),
  260. ObjId: obj.GetId(),
  261. Object: obj.GetName(),
  262. TaskName: taskName,
  263. UserCred: userCred,
  264. Params: data,
  265. Stage: TASK_INIT_STAGE,
  266. ParentTaskId: parentTaskId,
  267. },
  268. }
  269. task.SetModelManager(manager, task)
  270. err := manager.TableSpec().Insert(ctx, task)
  271. if err != nil {
  272. log.Errorf("Task insert error %s", err)
  273. return nil, err
  274. }
  275. task.SetProgressAndStatus(0, TASK_STATUS_QUEUE)
  276. {
  277. to, err := TaskObjectManager.insertObject(ctx, task.Id, obj)
  278. if err != nil {
  279. log.Errorf("Taskobject insert error %s", err)
  280. return nil, errors.Wrap(err, "TaskObjectManager.insertObject")
  281. }
  282. db.Update(task, func() error {
  283. task.ProjectId = to.ProjectId
  284. task.DomainId = to.DomainId
  285. return nil
  286. })
  287. }
  288. parentTask := task.GetParentTask()
  289. if parentTask != nil {
  290. st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
  291. st.SetModelManager(SubTaskManager, st)
  292. err := SubTaskManager.TableSpec().Insert(ctx, st)
  293. if err != nil {
  294. log.Errorf("Subtask insert error %s", err)
  295. return nil, err
  296. }
  297. }
  298. return task, nil
  299. }
  300. func (manager *STaskManager) NewParallelTask(
  301. ctx context.Context,
  302. taskName string,
  303. objs []db.IStandaloneModel,
  304. userCred mcclient.TokenCredential,
  305. taskData *jsonutils.JSONDict,
  306. parentTaskId string,
  307. parentTaskNotifyUrl string,
  308. pendingUsage ...quotas.IQuota,
  309. ) (*STask, error) {
  310. if !isTaskExist(taskName) {
  311. return nil, fmt.Errorf("task %s not found", taskName)
  312. }
  313. if len(objs) == 0 {
  314. return nil, fmt.Errorf("failed to do task %s with zero objs", taskName)
  315. }
  316. log.Debugf("number of objs: %d", len(objs))
  317. data := fetchTaskParams(ctx, taskName, taskData, parentTaskId, parentTaskNotifyUrl, pendingUsage)
  318. task := &STask{
  319. STaskBase: STaskBase{
  320. ObjType: objs[0].Keyword(),
  321. Object: MULTI_OBJECTS_ID,
  322. ObjId: MULTI_OBJECTS_ID,
  323. TaskName: taskName,
  324. UserCred: userCred,
  325. Params: data,
  326. Stage: TASK_INIT_STAGE,
  327. ParentTaskId: parentTaskId,
  328. },
  329. }
  330. task.SetModelManager(manager, task)
  331. err := manager.TableSpec().Insert(ctx, task)
  332. if err != nil {
  333. log.Errorf("Task insert error %s", err)
  334. return nil, err
  335. }
  336. task.SetProgressAndStatus(0, TASK_STATUS_QUEUE)
  337. domainIds := stringutils2.NewSortedStrings(nil)
  338. tenantIds := stringutils2.NewSortedStrings(nil)
  339. for i := range objs {
  340. to, err := TaskObjectManager.insertObject(ctx, task.Id, objs[i])
  341. if err != nil {
  342. log.Errorf("Taskobject insert error %s", err)
  343. return nil, errors.Wrap(err, "insert task object")
  344. }
  345. tenantIds = tenantIds.Append(to.ProjectId)
  346. domainIds = domainIds.Append(to.DomainId)
  347. }
  348. db.Update(task, func() error {
  349. task.DomainId = strings.Join(domainIds, ",")
  350. task.ProjectId = strings.Join(tenantIds, ",")
  351. return nil
  352. })
  353. parentTask := task.GetParentTask()
  354. if parentTask != nil {
  355. st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
  356. st.SetModelManager(SubTaskManager, st)
  357. err := SubTaskManager.TableSpec().Insert(ctx, st)
  358. if err != nil {
  359. log.Errorf("Subtask insert error %s", err)
  360. return nil, err
  361. }
  362. }
  363. return task, nil
  364. }
  365. func (manager *STaskManager) fetchTask(idStr string) *STask {
  366. iTask, err := db.NewModelObject(manager)
  367. if err != nil {
  368. log.Errorf("New task object fail: %s", err)
  369. return nil
  370. }
  371. err = manager.Query().Equals("id", idStr).First(iTask)
  372. if err != nil {
  373. log.Errorf("GetTask %s fail: %s", idStr, err)
  374. return nil
  375. }
  376. task := iTask.(*STask)
  377. task.fixParams()
  378. return task
  379. }
  380. func (task *STask) fixParams() {
  381. if task.Params == nil {
  382. task.Params = jsonutils.NewDict()
  383. }
  384. }
  385. func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) {
  386. baseTask := manager.fetchTask(taskId)
  387. if baseTask == nil {
  388. return
  389. }
  390. manager.execTaskObject(baseTask, data)
  391. }
  392. func (manager *STaskManager) execTaskObject(baseTask *STask, data jsonutils.JSONObject) {
  393. taskType, ok := taskTable[baseTask.TaskName]
  394. if !ok {
  395. log.Errorf("Cannot find task %s", baseTask.TaskName)
  396. return
  397. }
  398. log.Debugf("Do task %s(%s) with data %s at stage %s", taskType, baseTask.Id, data, baseTask.Stage)
  399. taskValue := reflect.New(taskType)
  400. if taskValue.Type().Implements(ITaskType) {
  401. execITask(taskValue, baseTask, data, false)
  402. } else if taskValue.Type().Implements(IBatchTaskType) {
  403. execITask(taskValue, baseTask, data, true)
  404. } else {
  405. log.Errorf("Unsupported task type?? %s", taskValue.Type())
  406. }
  407. }
  408. type sSortedObjects []db.IStandaloneModel
  409. func (a sSortedObjects) Len() int { return len(a) }
  410. func (a sSortedObjects) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  411. func (a sSortedObjects) Less(i, j int) bool { return a[i].GetId() < a[j].GetId() }
  412. func execITask(taskValue reflect.Value, task *STask, odata jsonutils.JSONObject, isMulti bool) {
  413. ctxData := task.GetRequestContext()
  414. ctx := ctxData.GetContext()
  415. task.saveStartAt()
  416. taskFailed := false
  417. var data jsonutils.JSONObject
  418. if odata != nil {
  419. switch dictdata := odata.(type) {
  420. case *jsonutils.JSONDict:
  421. taskStatus, _ := odata.GetString("__status__")
  422. if len(taskStatus) > 0 && taskStatus != "OK" {
  423. taskFailed = true
  424. dictdata.Set("__stage__", jsonutils.NewString(task.Stage))
  425. if !dictdata.Contains("__reason__") {
  426. reasonJson := dictdata.CopyExcludes("__status__", "__stage__")
  427. dictdata.Set("__reason__", reasonJson)
  428. }
  429. /*if vdata, ok := data.(*jsonutils.JSONDict); ok {
  430. reason, err := vdata.Get("__reason__") // only dict support Get
  431. if err != nil {
  432. reason = jsonutils.NewString(fmt.Sprintf("Task failed due to unknown remote errors! %s", odata))
  433. vdata.Set("__reason__", reason)
  434. }
  435. }*/
  436. }
  437. data = dictdata
  438. default:
  439. data = odata
  440. }
  441. } else {
  442. data = jsonutils.NewDict()
  443. }
  444. stageName := task.Stage
  445. if taskFailed {
  446. stageName = fmt.Sprintf("%sFailed", task.Stage)
  447. if strings.Contains(stageName, "_") {
  448. stageName = fmt.Sprintf("%s_failed", task.Stage)
  449. }
  450. }
  451. if strings.Contains(stageName, "_") {
  452. stageName = utils.Kebab2Camel(stageName, "_")
  453. }
  454. funcValue := taskValue.MethodByName(stageName)
  455. if !funcValue.IsValid() || funcValue.IsNil() {
  456. msg := fmt.Sprintf("Stage %s not found", stageName)
  457. if taskFailed {
  458. // failed handler is optional, ignore the error
  459. log.Warningf("%s", msg)
  460. msg, _ = data.GetString()
  461. } else {
  462. log.Errorf("%s", msg)
  463. }
  464. task.SetStageFailed(ctx, jsonutils.NewString(msg))
  465. task.SaveRequestContext(&ctxData)
  466. return
  467. }
  468. objManager := db.GetModelManager(task.ObjType)
  469. if objManager == nil {
  470. msg := fmt.Sprintf("model %s %s(%s) not found??? ...", task.ObjType, task.Object, task.ObjId)
  471. log.Errorf("%s", msg)
  472. task.SetStageFailed(ctx, jsonutils.NewString(msg))
  473. task.SaveRequestContext(&ctxData)
  474. return
  475. }
  476. // log.Debugf("objManager: %s", objManager)
  477. objResManager, ok := objManager.(db.IStandaloneModelManager)
  478. if !ok {
  479. msg := fmt.Sprintf("model %s %s(%s) is not a resource??? ...", task.ObjType, task.Object, task.ObjId)
  480. log.Errorf("%s", msg)
  481. task.SetStageFailed(ctx, jsonutils.NewString(msg))
  482. task.SaveRequestContext(&ctxData)
  483. return
  484. }
  485. params := make([]reflect.Value, 3)
  486. params[0] = reflect.ValueOf(ctx)
  487. if isMulti {
  488. objIds := TaskObjectManager.GetObjectIds(task)
  489. objs := make([]db.IStandaloneModel, len(objIds))
  490. for i, objId := range objIds {
  491. obj, err := objResManager.FetchById(objId)
  492. if err != nil {
  493. msg := fmt.Sprintf("fail to find %s object %s", task.ObjType, objId)
  494. log.Errorf("%s", msg)
  495. task.SetStageFailed(ctx, jsonutils.NewString(msg))
  496. task.SaveRequestContext(&ctxData)
  497. return
  498. }
  499. objs[i] = obj.(db.IStandaloneModel)
  500. }
  501. task.taskObjects = objs
  502. // sort objects by ids to avoid deadlock
  503. sort.Sort(sSortedObjects(objs))
  504. for i := range objs {
  505. lockman.LockObject(ctx, objs[i])
  506. defer lockman.ReleaseObject(ctx, objs[i])
  507. }
  508. params[1] = reflect.ValueOf(objs)
  509. } else {
  510. obj, err := objResManager.FetchById(task.ObjId)
  511. if err != nil {
  512. msg := fmt.Sprintf("fail to find %s object %s", task.ObjType, task.ObjId)
  513. log.Errorf("%s", msg)
  514. task.SetStageFailed(ctx, jsonutils.NewString(msg))
  515. task.SaveRequestContext(&ctxData)
  516. return
  517. }
  518. task.taskObject = obj.(db.IStandaloneModel)
  519. lockman.LockObject(ctx, obj)
  520. defer lockman.ReleaseObject(ctx, obj)
  521. params[1] = reflect.ValueOf(obj)
  522. }
  523. params[2] = reflect.ValueOf(data)
  524. filled := reflectutils.FillEmbededStructValue(taskValue.Elem(), reflect.Indirect(reflect.ValueOf(task)))
  525. if !filled {
  526. log.Errorf("Cannot locate baseTask embedded struct, give up...")
  527. return
  528. }
  529. defer func() {
  530. if r := recover(); r != nil {
  531. // call set stage failed, should not call task.SetStageFailed
  532. // func SetStageFailed may be overloading
  533. yunionconf.BugReport.SendBugReport(ctx, version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", r))
  534. log.Errorf("Task %s PANIC on stage %s: %v \n%s", task.TaskName, stageName, r, debug.Stack())
  535. SetStageFailedFuncValue := taskValue.MethodByName("SetStageFailed")
  536. SetStageFailedFuncValue.Call(
  537. []reflect.Value{
  538. reflect.ValueOf(ctx),
  539. reflect.ValueOf(jsonutils.NewString(fmt.Sprintf("%v", r))),
  540. },
  541. )
  542. obj, err := objResManager.FetchById(task.ObjId)
  543. if err != nil {
  544. return
  545. }
  546. statusObj, ok := obj.(db.IStatusStandaloneModel)
  547. if ok {
  548. db.StatusBaseSetStatus(ctx, statusObj, task.GetUserCred(), apis.STATUS_UNKNOWN, fmt.Sprintf("%v", r))
  549. }
  550. notes := map[string]interface{}{
  551. "Stack": string(debug.Stack()),
  552. "Version": version.GetShortString(),
  553. "Task": task.TaskName,
  554. "Stage": stageName,
  555. "Message": fmt.Sprintf("%v", r),
  556. }
  557. logclient.AddSimpleActionLog(obj, logclient.ACT_PANIC, notes, task.GetUserCred(), false)
  558. }
  559. }()
  560. log.Debugf("Call %s(%s) %s %#v", task.TaskName, task.Id, stageName, params)
  561. funcValue.Call(params)
  562. // call save request context
  563. saveRequestContextFuncValue := taskValue.MethodByName("SaveRequestContext")
  564. saveRequestContextFuncValue.Call([]reflect.Value{reflect.ValueOf(&ctxData)})
  565. }
  566. func (task *STask) ScheduleRun(data jsonutils.JSONObject) error {
  567. return runTask(task.Id, data)
  568. }
  569. func (task *STask) IsSubtask() bool {
  570. return task.HasParentTask()
  571. }
  572. func (task *STask) GetParentTaskId() string {
  573. if len(task.ParentTaskId) > 0 {
  574. return task.ParentTaskId
  575. }
  576. parentTaskId, _ := task.Params.GetString(PARENT_TASK_ID_KEY)
  577. if len(parentTaskId) > 0 {
  578. return parentTaskId
  579. }
  580. return ""
  581. }
  582. func (task *STask) HasParentTask() bool {
  583. parentTaskId := task.GetParentTaskId()
  584. if len(parentTaskId) > 0 {
  585. return true
  586. }
  587. return false
  588. }
  589. func (task *STask) GetParentTask() *STask {
  590. parentTaskId := task.GetParentTaskId()
  591. if len(parentTaskId) > 0 {
  592. return TaskManager.fetchTask(parentTaskId)
  593. }
  594. return nil
  595. }
  596. func (task *STask) GetRequestContext() appctx.AppContextData {
  597. ctxData := appctx.AppContextData{}
  598. if task.Params != nil {
  599. ctxJson, _ := task.Params.Get(REQUEST_CONTEXT_KEY)
  600. if ctxJson != nil {
  601. ctxJson.Unmarshal(&ctxData)
  602. }
  603. }
  604. // clear parentTaskId
  605. ctxData.TaskId = ""
  606. ctxData.TaskNotifyUrl = ""
  607. return ctxData
  608. }
  609. func (task *STask) SaveRequestContext(data *appctx.AppContextData) {
  610. jsonData := jsonutils.Marshal(data)
  611. log.Debugf("SaveRequestContext %s(%s) %s param %s", task.TaskName, task.Id, jsonData, task.Params)
  612. _, err := db.Update(task, func() error {
  613. params := task.Params.CopyExcludes(REQUEST_CONTEXT_KEY)
  614. params.Add(jsonData, REQUEST_CONTEXT_KEY)
  615. task.Params = params
  616. task.EndAt = timeutils.UtcNow()
  617. return nil
  618. })
  619. log.Debugf("Params: %s(%s) %s", task.TaskName, task.Id, task.Params)
  620. if err != nil {
  621. log.Errorf("save_request_context fail %s", err)
  622. }
  623. }
  624. func (task *STask) SaveParams(data *jsonutils.JSONDict) error {
  625. return task.SetStage("", data)
  626. }
  627. func (task *STask) SetStage(stageName string, data *jsonutils.JSONDict) error {
  628. _, err := db.Update(task, func() error {
  629. params := jsonutils.NewDict()
  630. params.Update(task.Params)
  631. if data != nil {
  632. params.Update(data)
  633. }
  634. if len(stageName) > 0 {
  635. stages, _ := params.Get("__stages")
  636. if stages == nil {
  637. stages = jsonutils.NewArray()
  638. params.Add(stages, "__stages")
  639. }
  640. stageList := stages.(*jsonutils.JSONArray)
  641. stageData := jsonutils.NewDict()
  642. stageData.Add(jsonutils.NewString(task.Stage), "name")
  643. stageData.Add(jsonutils.NewTimeString(time.Now()), "complete_at")
  644. stageList.Add(stageData)
  645. task.Stage = stageName
  646. }
  647. task.Params = params
  648. return nil
  649. })
  650. if err != nil {
  651. log.Errorf("Task %s(%s) set_stage %s fail %s", task.TaskName, task.Id, stageName, err)
  652. }
  653. return err
  654. }
  655. func (task *STask) GetObjectIdStr() string {
  656. if task.ObjId == MULTI_OBJECTS_ID {
  657. return strings.Join(TaskObjectManager.GetObjectIds(task), ",")
  658. } else {
  659. return task.ObjId
  660. }
  661. }
  662. func (task *STask) GetObjectStr() string {
  663. if task.ObjId == MULTI_OBJECTS_ID {
  664. return strings.Join(TaskObjectManager.GetObjectNames(task), ",")
  665. } else {
  666. return task.Object
  667. }
  668. }
  669. func (task *STask) SetStageComplete(ctx context.Context, data *jsonutils.JSONDict) {
  670. log.Infof("XXX TASK %s(%s) complete", task.TaskName, task.Id)
  671. task.SetStage(TASK_STAGE_COMPLETE, data)
  672. task.SetProgressAndStatus(100, taskStatusDone)
  673. if data == nil {
  674. data = jsonutils.NewDict()
  675. }
  676. if data.Size() == 0 {
  677. data.Add(jsonutils.NewString(task.GetObjectIdStr()), "id")
  678. data.Add(jsonutils.NewString(task.GetObjectStr()), "name")
  679. data.Add(jsonutils.NewString(task.ObjType), "type")
  680. }
  681. task.NotifyParentTaskComplete(ctx, data, false)
  682. }
  683. func (task *STask) SetStageFailed(ctx context.Context, reason jsonutils.JSONObject) {
  684. if task.Stage == TASK_STAGE_FAILED {
  685. log.Warningf("Task %s(%s) has been failed", task.TaskName, task.Id)
  686. return
  687. }
  688. log.Infof("XXX TASK %s(%s) failed: %s on stage %s", task.TaskName, task.Id, reason, task.Stage)
  689. reasonDict := jsonutils.NewDict()
  690. reasonDict.Add(jsonutils.NewString(task.Stage), "stage")
  691. if reason != nil {
  692. reasonDict.Add(reason, "reason")
  693. }
  694. reason = reasonDict
  695. prevFailed, _ := task.Params.Get("__failed_reason")
  696. if prevFailed != nil {
  697. switch prevFailed2 := prevFailed.(type) {
  698. case *jsonutils.JSONArray:
  699. prevFailed2.Add(reason)
  700. reason = prevFailed
  701. default:
  702. reason = jsonutils.NewArray(prevFailed, reason)
  703. }
  704. }
  705. data := jsonutils.NewDict()
  706. data.Add(reason, "__failed_reason")
  707. task.SetStage(TASK_STAGE_FAILED, data)
  708. task.SetProgressAndStatus(100, taskStatusDone)
  709. task.NotifyParentTaskFailure(ctx, reason)
  710. }
  711. func (task *STask) NotifyParentTaskComplete(ctx context.Context, body *jsonutils.JSONDict, failed bool) {
  712. log.Infof("notify_parent_task_complete: %s(%s) params %s", task.TaskName, task.Id, task.Params)
  713. parentTaskId := task.GetParentTaskId()
  714. parentTaskNotify, _ := task.Params.GetString(PARENT_TASK_NOTIFY_KEY)
  715. if len(parentTaskId) > 0 {
  716. subTask := SubTaskManager.GetSubTask(parentTaskId, task.Id)
  717. if subTask != nil {
  718. subTask.SaveResults(failed, body)
  719. }
  720. func() {
  721. lockman.LockRawObject(ctx, "tasks", parentTaskId)
  722. defer lockman.ReleaseRawObject(ctx, "tasks", parentTaskId)
  723. pTask := TaskManager.fetchTask(parentTaskId)
  724. if pTask == nil {
  725. saveTaskUpCallStatus(task.GetId(), parentTaskId, false, errors.Wrap(errors.ErrNotFound, "Parent task not found").Error())
  726. log.Errorf("Parent task %s not found", parentTaskId)
  727. return
  728. }
  729. if pTask.IsCurrentStageComplete() {
  730. err := pTask.ScheduleRun(body)
  731. if err != nil {
  732. saveTaskUpCallStatus(task.GetId(), parentTaskId, false, err.Error())
  733. } else {
  734. saveTaskUpCallStatus(task.GetId(), parentTaskId, true, "")
  735. }
  736. }
  737. }()
  738. }
  739. if len(parentTaskNotify) > 0 {
  740. header := task.getTaskHeader()
  741. go notifyRemoteTask(ctx, parentTaskNotify, task.GetId(), header, body, 0)
  742. }
  743. }
  744. func notifyRemoteTask(ctx context.Context, notifyUrl string, taskId string, header http.Header, body jsonutils.JSONObject, tried uint) {
  745. client := httputils.GetDefaultClient()
  746. _, body, err := httputils.JSONRequest(client, ctx, "POST", notifyUrl, header, body, false)
  747. if err != nil {
  748. log.Errorf("notifyRemoteTask fail %s", err)
  749. if tried > MAX_REMOTE_NOTIFY_TRIES {
  750. log.Errorf("notifyRemoteTask max tried reached, give up...")
  751. saveTaskUpCallStatus(taskId, notifyUrl, false, fmt.Sprintf("notifyRemoteTask max tried reached, give up... %s", err.Error()))
  752. } else {
  753. time.Sleep(time.Second * time.Duration(1<<tried))
  754. notifyRemoteTask(ctx, notifyUrl, taskId, header, body, tried+1)
  755. }
  756. return
  757. } else {
  758. log.Infof("Notify remote URL %s get acked: %s, taskId: %s", notifyUrl, body.String(), taskId)
  759. saveTaskUpCallStatus(taskId, notifyUrl, true, body.String())
  760. }
  761. }
  762. func (task *STask) NotifyParentTaskFailure(ctx context.Context, reason jsonutils.JSONObject) {
  763. body := jsonutils.NewDict()
  764. body.Add(jsonutils.NewString("error"), "__status__")
  765. body.Add(jsonutils.NewString(task.TaskName), "__task_name__")
  766. body.Add(reason, "__reason__")
  767. task.NotifyParentTaskComplete(ctx, body, true)
  768. }
  769. func (task *STask) IsCurrentStageComplete() bool {
  770. totalSubtasksCnt, _ := SubTaskManager.GetTotalSubtasksCount(task.Id, task.Stage)
  771. initSubtasksCnt, _ := SubTaskManager.GetInitSubtasksCount(task.Id, task.Stage)
  772. log.Debugf("Task %s IsCurrentStageComplete totalSubtasks %d initSubtasks %d ", task.String(), totalSubtasksCnt, initSubtasksCnt)
  773. task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) * 100 / float32(totalSubtasksCnt))
  774. if totalSubtasksCnt > 0 && initSubtasksCnt == 0 {
  775. return true
  776. } else {
  777. return false
  778. }
  779. }
  780. func (task *STask) GetPendingUsage(quota quotas.IQuota, index int) error {
  781. key := pendingUsageKey(index)
  782. if task.Params.Contains(key) {
  783. quotaJson, err := task.Params.Get(key)
  784. if err != nil {
  785. return errors.Wrapf(err, "task.Params.Get %s", key)
  786. }
  787. err = quotaJson.Unmarshal(quota)
  788. if err != nil {
  789. return errors.Wrap(err, "quotaJson.Unmarshal")
  790. }
  791. }
  792. return nil
  793. }
  794. func pendingUsageKey(index int) string {
  795. key := PENDING_USAGE_KEY
  796. if index > 0 {
  797. key += "." + strconv.FormatInt(int64(index), 10)
  798. }
  799. return key
  800. }
  801. func (task *STask) SetPendingUsage(quota quotas.IQuota, index int) error {
  802. _, err := db.Update(task, func() error {
  803. key := pendingUsageKey(index)
  804. params := task.Params.CopyExcludes(key)
  805. params.Add(jsonutils.Marshal(quota), key)
  806. task.Params = params
  807. return nil
  808. })
  809. if err != nil {
  810. log.Errorf("set_pending_usage fail %s", err)
  811. }
  812. return err
  813. }
  814. func (task *STask) ClearPendingUsage(index int) error {
  815. _, err := db.Update(task, func() error {
  816. key := pendingUsageKey(index)
  817. params := task.Params.CopyExcludes(key)
  818. task.Params = params
  819. return nil
  820. })
  821. if err != nil {
  822. log.Errorf("clear_pending_usage fail %s", err)
  823. }
  824. return err
  825. }
  826. func (task *STask) GetParams() *jsonutils.JSONDict {
  827. return task.Params
  828. }
  829. func (task *STask) GetUserCred() mcclient.TokenCredential {
  830. return task.UserCred
  831. }
  832. func (task *STask) GetTaskId() string {
  833. return task.GetId()
  834. }
  835. func (task *STask) GetObject() db.IStandaloneModel {
  836. return task.taskObject
  837. }
  838. func (task *STask) GetObjects() []db.IStandaloneModel {
  839. return task.taskObjects
  840. }
  841. func (task *STask) getTaskHeader() http.Header {
  842. userCred := task.GetUserCred()
  843. if !userCred.IsValid() {
  844. userCred = auth.AdminCredential()
  845. }
  846. header := mcclient.GetTokenHeaders(userCred)
  847. header.Set(mcclient.TASK_ID, task.GetTaskId())
  848. return header
  849. }
  850. func (task *STask) GetTaskRequestHeader() http.Header {
  851. header := task.getTaskHeader()
  852. if len(serviceUrl) > 0 {
  853. notifyUrl := fmt.Sprintf("%s/tasks/%s", serviceUrl, task.GetTaskId())
  854. header.Set(mcclient.TASK_NOTIFY_URL, notifyUrl)
  855. }
  856. return header
  857. }
  858. func (task *STask) String() string {
  859. return fmt.Sprintf("%s(%s,%s)", task.Id, task.TaskName, task.Stage)
  860. }
  861. var serviceUrl string
  862. func SetServiceUrl(url string) {
  863. serviceUrl = url
  864. }
  865. func (task *STask) GetStartTime() time.Time {
  866. return task.CreatedAt
  867. }
  868. func (manager *STaskManager) QueryTasksOfObject(obj db.IStandaloneModel, since time.Time, isOpen *bool) *sqlchemy.SQuery {
  869. subq1 := manager.Query()
  870. {
  871. subq1 = subq1.Equals("obj_id", obj.GetId())
  872. subq1 = subq1.Equals("obj_type", obj.Keyword())
  873. if !since.IsZero() {
  874. subq1 = subq1.GE("created_at", since)
  875. }
  876. if isOpen != nil {
  877. if *isOpen {
  878. subq1 = subq1.Filter(sqlchemy.NOT(
  879. sqlchemy.In(subq1.Field("stage"), []string{"complete", "failed"}),
  880. ))
  881. } else if !*isOpen {
  882. subq1 = subq1.In("stage", []string{"complete", "failed"})
  883. }
  884. }
  885. }
  886. subq2 := manager.Query()
  887. {
  888. taskObjs := TaskObjectManager.TableSpec().Instance()
  889. subq2 = subq2.Join(taskObjs, sqlchemy.AND(
  890. sqlchemy.Equals(taskObjs.Field("task_id"), subq2.Field("id")),
  891. sqlchemy.Equals(taskObjs.Field("obj_id"), obj.GetId()),
  892. ))
  893. subq2 = subq2.Filter(sqlchemy.Equals(subq2.Field("obj_id"), MULTI_OBJECTS_ID))
  894. subq2 = subq2.Filter(sqlchemy.Equals(subq2.Field("obj_type"), obj.Keyword()))
  895. if !since.IsZero() {
  896. subq2 = subq2.Filter(sqlchemy.GE(subq2.Field("created_at"), since))
  897. }
  898. if isOpen != nil {
  899. if *isOpen {
  900. subq2 = subq2.Filter(sqlchemy.NOT(
  901. sqlchemy.In(subq2.Field("stage"), []string{"complete", "failed"}),
  902. ))
  903. } else if !*isOpen {
  904. subq2 = subq2.In("stage", []string{"complete", "failed"})
  905. }
  906. }
  907. }
  908. // subq1 and subq2 do not overlap for the fact that they have
  909. // different conditions on tasks_tbl.obj_id field
  910. return sqlchemy.Union(subq1, subq2).Query().Desc("created_at")
  911. }
  912. func (manager *STaskManager) IsInTask(obj db.IStandaloneModel) bool {
  913. tasks, err := manager.FetchIncompleteTasksOfObject(obj)
  914. if err == nil && len(tasks) == 0 {
  915. return false
  916. }
  917. return true
  918. }
  919. func (manager *STaskManager) FetchIncompleteTasksOfObject(obj db.IStandaloneModel) ([]STask, error) {
  920. isOpen := true
  921. return manager.FetchTasksOfObjectLatest(obj, 1*time.Hour, &isOpen)
  922. }
  923. func (manager *STaskManager) FetchTasksOfObjectLatest(obj db.IStandaloneModel, interval time.Duration, isOpen *bool) ([]STask, error) {
  924. since := timeutils.UtcNow().Add(-1 * interval)
  925. return manager.FetchTasksOfObject(obj, since, isOpen)
  926. }
  927. func (manager *STaskManager) FetchTasksOfObject(obj db.IStandaloneModel, since time.Time, isOpen *bool) ([]STask, error) {
  928. q := manager.QueryTasksOfObject(obj, since, isOpen)
  929. tasks := make([]STask, 0)
  930. err := db.FetchModelObjects(manager, q, &tasks)
  931. if err != nil && err != sql.ErrNoRows {
  932. return nil, err
  933. }
  934. return tasks, nil
  935. }
  936. // 操作日志列表
  937. func (manager *STaskManager) ListItemFilter(
  938. ctx context.Context,
  939. q *sqlchemy.SQuery,
  940. userCred mcclient.TokenCredential,
  941. input apis.TaskListInput,
  942. ) (*sqlchemy.SQuery, error) {
  943. var err error
  944. q, err = manager.SModelBaseManager.ListItemFilter(ctx, q, userCred, input.ModelBaseListInput)
  945. if err != nil {
  946. return q, errors.Wrap(err, "SResourceBaseManager.ListItemFilter")
  947. }
  948. q, err = manager.SStatusResourceBaseManager.ListItemFilter(ctx, q, userCred, input.StatusResourceBaseListInput)
  949. if err != nil {
  950. return q, errors.Wrap(err, "SStatusResourceBaseManager.ListItemFilter")
  951. }
  952. if len(input.Id) > 0 {
  953. q = q.In("id", input.Id)
  954. }
  955. if len(input.ObjId) > 0 {
  956. taskObjQ := TaskObjectManager.Query("task_id").In("obj_id", input.ObjId).Distinct().SubQuery()
  957. q = q.Join(taskObjQ, sqlchemy.Equals(q.Field("id"), taskObjQ.Field("task_id")))
  958. }
  959. if len(input.ObjName) > 0 {
  960. q = q.In("object", input.ObjName)
  961. }
  962. if len(input.ObjType) > 0 {
  963. q = q.In("obj_type", input.ObjType)
  964. }
  965. if len(input.TaskName) > 0 {
  966. q = q.In("task_name", input.TaskName)
  967. }
  968. if len(input.Stage) > 0 {
  969. q = q.In("stage", input.Stage)
  970. }
  971. if len(input.NotStage) > 0 {
  972. q = q.NotIn("stage", input.NotStage)
  973. }
  974. if len(input.ParentId) > 0 {
  975. q = q.In("parent_task_id", input.ParentId)
  976. }
  977. if input.IsComplete != nil {
  978. if *input.IsComplete {
  979. q = q.In("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
  980. } else {
  981. q = q.NotIn("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
  982. }
  983. }
  984. if input.IsInit != nil {
  985. if *input.IsInit {
  986. q = q.Equals("stage", TASK_INIT_STAGE)
  987. } else {
  988. q = q.NotEquals("stage", TASK_INIT_STAGE)
  989. }
  990. }
  991. if input.IsMulti != nil {
  992. if *input.IsMulti {
  993. q = q.Equals("obj_id", MULTI_OBJECTS_ID)
  994. } else {
  995. q = q.NotEquals("obj_id", MULTI_OBJECTS_ID)
  996. }
  997. }
  998. if input.IsRoot != nil {
  999. if *input.IsRoot {
  1000. q = q.IsNullOrEmpty("parent_task_id")
  1001. } else {
  1002. q = q.IsNotEmpty("parent_task_id")
  1003. }
  1004. }
  1005. if len(input.ParentTaskId) > 0 {
  1006. q = q.Equals("parent_task_id", input.ParentTaskId)
  1007. }
  1008. if input.SubTask != nil && *input.SubTask {
  1009. subSQFunc := func(status string, cntField string) *sqlchemy.SSubQuery {
  1010. subQ := SubTaskManager.Query()
  1011. if len(status) > 0 {
  1012. subQ = subQ.Equals("status", status)
  1013. }
  1014. subQ = subQ.GroupBy(subQ.Field("task_id"))
  1015. subQ = subQ.AppendField(subQ.Field("task_id"))
  1016. subQ = subQ.AppendField(sqlchemy.COUNT(cntField))
  1017. return subQ.SubQuery()
  1018. }
  1019. {
  1020. subSQ := subSQFunc("", "sub_task_count")
  1021. q = q.LeftJoin(subSQ, sqlchemy.Equals(subSQ.Field("task_id"), q.Field("id")))
  1022. q = q.AppendField(subSQ.Field("sub_task_count"))
  1023. }
  1024. {
  1025. failSubSQ := subSQFunc(SUBTASK_FAIL, "fail_sub_task_cnt")
  1026. q = q.LeftJoin(failSubSQ, sqlchemy.Equals(failSubSQ.Field("task_id"), q.Field("id")))
  1027. q = q.AppendField(failSubSQ.Field("fail_sub_task_cnt"))
  1028. }
  1029. {
  1030. succSubSQ := subSQFunc(SUBTASK_SUCC, "succ_sub_task_cnt")
  1031. q = q.LeftJoin(succSubSQ, sqlchemy.Equals(succSubSQ.Field("task_id"), q.Field("id")))
  1032. q = q.AppendField(succSubSQ.Field("succ_sub_task_cnt"))
  1033. }
  1034. for _, c := range manager.TableSpec().Columns() {
  1035. q = q.AppendField(q.Field(c.Name()))
  1036. }
  1037. }
  1038. // q.DebugQuery2("taskQuery")
  1039. return q, nil
  1040. }
  1041. func (manager *STaskManager) ListItemExportKeys(ctx context.Context, q *sqlchemy.SQuery, userCred mcclient.TokenCredential, keys stringutils2.SSortedStrings) (*sqlchemy.SQuery, error) {
  1042. var err error
  1043. q, err = manager.SModelBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  1044. if err != nil {
  1045. return nil, errors.Wrap(err, "SModelBaseManager.ListItemExportKeys")
  1046. }
  1047. // q, err = manager.SProjectizedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  1048. // if err != nil {
  1049. // return nil, errors.Wrap(err, "SProjectizedResourceBaseManager.ListItemExportKeys")
  1050. // }
  1051. return q, nil
  1052. }
  1053. func (manager *STaskManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  1054. var err error
  1055. q, err = manager.SModelBaseManager.QueryDistinctExtraField(q, field)
  1056. if err == nil {
  1057. return q, nil
  1058. }
  1059. // q, err = manager.SProjectizedResourceBaseManager.QueryDistinctExtraField(q, field)
  1060. // if err == nil {
  1061. // return q, nil
  1062. // }
  1063. return q, httperrors.ErrNotFound
  1064. }
  1065. func (manager *STaskManager) ResourceScope() rbacscope.TRbacScope {
  1066. return manager.SProjectizedResourceBaseManager.ResourceScope()
  1067. }
  1068. func (manager *STaskManager) FetchCustomizeColumns(
  1069. ctx context.Context,
  1070. userCred mcclient.TokenCredential,
  1071. query jsonutils.JSONObject,
  1072. objs []interface{},
  1073. fields stringutils2.SSortedStrings,
  1074. isList bool,
  1075. ) []apis.TaskDetails {
  1076. rows := make([]apis.TaskDetails, len(objs))
  1077. bases := manager.SModelBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  1078. projs := manager.SProjectizedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  1079. for i := range objs {
  1080. rows[i] = apis.TaskDetails{
  1081. ModelBaseDetails: bases[i],
  1082. ProjectizedResourceInfo: projs[i],
  1083. }
  1084. }
  1085. return rows
  1086. }
  1087. func (manager *STaskManager) OrderByExtraFields(
  1088. ctx context.Context,
  1089. q *sqlchemy.SQuery,
  1090. userCred mcclient.TokenCredential,
  1091. query apis.TaskListInput,
  1092. ) (*sqlchemy.SQuery, error) {
  1093. var err error
  1094. q, err = manager.SModelBaseManager.OrderByExtraFields(ctx, q, userCred, query.ModelBaseListInput)
  1095. if err != nil {
  1096. return q, errors.Wrap(err, "SModelBaseManager.OrderByExtraField")
  1097. }
  1098. // q, err = manager.SProjectizedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ProjectizedResourceListInput)
  1099. // if err != nil {
  1100. // return q, errors.Wrap(err, "SProjectizedResourceBaseManager.OrderByExtraField")
  1101. // }
  1102. return q, nil
  1103. }
  1104. func (task *STask) SetProgressAndStatus(progress float32, status string) error {
  1105. _, err := db.Update(task, func() error {
  1106. task.SetStatusValue(status)
  1107. task.SetProgressValue(progress)
  1108. return nil
  1109. })
  1110. if err != nil {
  1111. return errors.Wrap(err, "Update")
  1112. }
  1113. return nil
  1114. }
  1115. func (task *STask) SetProgress(progress float32) error {
  1116. _, err := db.Update(task, func() error {
  1117. task.SetProgressValue(progress)
  1118. return nil
  1119. })
  1120. if err != nil {
  1121. return errors.Wrap(err, "Update")
  1122. }
  1123. return nil
  1124. }
  1125. func (manager *STaskManager) InitializeData() error {
  1126. {
  1127. err := manager.failTimeoutTasks()
  1128. if err != nil {
  1129. return errors.Wrap(err, "failTimeoutTasks")
  1130. }
  1131. }
  1132. {
  1133. err := manager.migrateObjectInfo()
  1134. if err != nil {
  1135. return errors.Wrap(err, "migrateObjectInfo")
  1136. }
  1137. }
  1138. {
  1139. err := manager.clearnUpSubtasks()
  1140. if err != nil {
  1141. return errors.Wrap(err, "clearnUpSubtasks")
  1142. }
  1143. }
  1144. {
  1145. err := manager.clearnUpTaskObjects()
  1146. if err != nil {
  1147. return errors.Wrap(err, "clearnUpTaskObjects")
  1148. }
  1149. }
  1150. return nil
  1151. }
  1152. func (manager *STaskManager) failTimeoutTasks() error {
  1153. q := manager.Query().NotIn("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
  1154. tasks := make([]STask, 0)
  1155. err := db.FetchModelObjects(manager, q, &tasks)
  1156. if err != nil {
  1157. return errors.Wrap(err, "FetchModelObjects")
  1158. }
  1159. reason := jsonutils.NewDict()
  1160. reason.Add(jsonutils.NewString("service restart"), "__reason__")
  1161. reason.Add(jsonutils.NewString("error"), "__status__")
  1162. for i := range tasks {
  1163. task := &tasks[i]
  1164. task.fixParams()
  1165. manager.execTaskObject(task, reason)
  1166. }
  1167. return nil
  1168. }
  1169. func (manager *STaskManager) clearnUpSubtasks() error {
  1170. start := time.Now()
  1171. log.Infof("start clearnUpSubtasks")
  1172. defer func() {
  1173. log.Infof("end clearnUpSubtasks, takes %f seconds", time.Since(start).Seconds())
  1174. }()
  1175. q := SubTaskManager.Query("task_id")
  1176. tasksQ := TaskManager.Query().SubQuery()
  1177. q = q.LeftJoin(tasksQ, sqlchemy.Equals(q.Field("task_id"), tasksQ.Field("id")))
  1178. q = q.Filter(sqlchemy.IsNull(tasksQ.Field("id")))
  1179. q = q.Distinct()
  1180. rows, err := q.Rows()
  1181. if err != nil {
  1182. return errors.Wrap(err, "q.Rows")
  1183. }
  1184. defer rows.Close()
  1185. for rows.Next() {
  1186. val, err := q.Row2Map(rows)
  1187. if err != nil {
  1188. return errors.Wrap(err, "Row2Map")
  1189. }
  1190. sql := fmt.Sprintf("delete from `%s` where task_id = '%s'", SubTaskManager.TableSpec().Name(), val["task_id"])
  1191. log.Infof("%s", sql)
  1192. _, err = SubTaskManager.TableSpec().GetTableSpec().Database().Exec(sql)
  1193. if err != nil {
  1194. return errors.Wrap(err, "exec")
  1195. }
  1196. sql = fmt.Sprintf("delete from `%s` where task_id = '%s'", TaskObjectManager.TableSpec().Name(), val["task_id"])
  1197. log.Infof("%s", sql)
  1198. _, err = TaskObjectManager.TableSpec().GetTableSpec().Database().Exec(sql)
  1199. if err != nil {
  1200. return errors.Wrap(err, "exec")
  1201. }
  1202. }
  1203. return nil
  1204. }
  1205. func (manager *STaskManager) clearnUpTaskObjects() error {
  1206. start := time.Now()
  1207. log.Infof("start clearnUpTaskObjects")
  1208. defer func() {
  1209. log.Infof("end clearnUpTaskObjects, takes %f seconds", time.Since(start).Seconds())
  1210. }()
  1211. q := TaskObjectManager.Query("task_id")
  1212. tasksQ := TaskManager.Query().SubQuery()
  1213. q = q.LeftJoin(tasksQ, sqlchemy.Equals(q.Field("task_id"), tasksQ.Field("id")))
  1214. q = q.Filter(sqlchemy.IsNull(tasksQ.Field("id")))
  1215. q = q.Distinct()
  1216. rows, err := q.Rows()
  1217. if err != nil {
  1218. return errors.Wrap(err, "q.Rows")
  1219. }
  1220. defer rows.Close()
  1221. for rows.Next() {
  1222. val, err := q.Row2Map(rows)
  1223. if err != nil {
  1224. return errors.Wrap(err, "Row2Map")
  1225. }
  1226. sql := fmt.Sprintf("delete from `%s` where task_id = '%s'", TaskObjectManager.TableSpec().Name(), val["task_id"])
  1227. log.Infof("%s", sql)
  1228. _, err = TaskObjectManager.TableSpec().GetTableSpec().Database().Exec(sql)
  1229. if err != nil {
  1230. return errors.Wrap(err, "exec")
  1231. }
  1232. }
  1233. return nil
  1234. }
  1235. func (manager *STaskManager) migrateObjectInfo() error {
  1236. start := time.Now()
  1237. log.Infof("start migrateObjectInfo")
  1238. defer func() {
  1239. log.Infof("end migrateObjectInfo, takes %f seconds", time.Since(start).Seconds())
  1240. }()
  1241. q := manager.Query().NotEquals("obj_id", MULTI_OBJECTS_ID)
  1242. taskObj := TaskObjectManager.Query().SubQuery()
  1243. q = q.LeftJoin(taskObj, sqlchemy.Equals(q.Field("id"), taskObj.Field("task_id")))
  1244. q = q.Filter(sqlchemy.IsNull(taskObj.Field("task_id")))
  1245. q = q.Asc("created_at")
  1246. // q.DebugQuery2("migrateObjectInfo")
  1247. rows, err := q.Rows()
  1248. if err != nil {
  1249. return errors.Wrap(err, "query.Rows")
  1250. }
  1251. defer rows.Close()
  1252. for rows.Next() {
  1253. task := STask{}
  1254. task.SetModelManager(manager, &task)
  1255. err := q.Row2Struct(rows, &task)
  1256. if err != nil {
  1257. return errors.Wrap(err, "row2struct")
  1258. }
  1259. taskObj := STaskObject{}
  1260. taskObj.ObjId = task.ObjId
  1261. taskObj.Object = task.Object
  1262. taskObj.TaskId = task.Id
  1263. taskObj.DomainId = task.DomainId
  1264. taskObj.ProjectId = task.ProjectId
  1265. taskObj.SetModelManager(TaskObjectManager, &taskObj)
  1266. err = TaskObjectManager.TableSpec().Insert(ctx.CtxWithTime(), &taskObj)
  1267. if err != nil {
  1268. return errors.Wrap(err, "Insert taskObject")
  1269. }
  1270. }
  1271. return nil
  1272. }
  1273. var (
  1274. taskCleanuoWorkerManager = appsrv.NewWorkerManager(
  1275. "taskCleanupWorkerManager",
  1276. 1,
  1277. 1024,
  1278. true,
  1279. )
  1280. )
  1281. func (manager *STaskManager) TaskCleanupJob(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  1282. taskCleanuoWorkerManager.Run(&sTaskCleanupJob{}, nil, nil)
  1283. }
  1284. type sTaskCleanupJob struct{}
  1285. func (job *sTaskCleanupJob) Run() {
  1286. count, err := TaskManager.doTaskCleanupJob()
  1287. if err != nil {
  1288. log.Errorf("doTaskCleanupJob fail %s", err)
  1289. return
  1290. }
  1291. if count > 0 {
  1292. taskCleanuoWorkerManager.Run(&sTaskCleanupJob{}, nil, nil)
  1293. }
  1294. }
  1295. func (job *sTaskCleanupJob) Dump() string {
  1296. return "TaskCleanupJob"
  1297. }
  1298. func (manager *STaskManager) doTaskCleanupJob() (int, error) {
  1299. ctx := context.WithValue(context.Background(), "task_cleanup_job", true)
  1300. q := manager.Query().LT("created_at", time.Now().Add(-time.Duration(consts.TaskArchiveThresholdHours())*time.Hour)).Asc("created_at")
  1301. if consts.TaskArchiveBatchLimit() > 0 {
  1302. q = q.Limit(consts.TaskArchiveBatchLimit())
  1303. }
  1304. rows, err := q.Rows()
  1305. if err != nil {
  1306. log.Errorf("query rows fail %s", err)
  1307. return 0, errors.Wrap(err, "query rows")
  1308. }
  1309. defer rows.Close()
  1310. taskStart := time.Now()
  1311. count := 0
  1312. for rows.Next() {
  1313. task := STask{}
  1314. err := q.Row2Struct(rows, &task)
  1315. if err != nil {
  1316. log.Errorf("Row2Struct fail %s", err)
  1317. return 0, errors.Wrap(err, "row2struct")
  1318. }
  1319. task.SetModelManager(ArchivedTaskManager, &task)
  1320. err = ArchivedTaskManager.Insert(ctx, &task)
  1321. if err != nil {
  1322. log.Errorf("insert archive fail %s", err)
  1323. return 0, errors.Wrap(err, "insert archive")
  1324. }
  1325. // cleanup
  1326. for _, sql := range []string{
  1327. fmt.Sprintf("DELETE FROM `%s` WHERE id = ?", manager.TableSpec().Name()),
  1328. fmt.Sprintf("DELETE FROM `%s` WHERE task_id = ?", TaskObjectManager.TableSpec().Name()),
  1329. fmt.Sprintf("DELETE FROM `%s` WHERE task_id = ?", SubTaskManager.TableSpec().Name()),
  1330. } {
  1331. _, err := manager.TableSpec().GetTableSpec().Database().Exec(sql, task.Id)
  1332. if err != nil {
  1333. log.Errorf("exec %s %s fail: %s", sql, task.Id, err)
  1334. return 0, errors.Wrap(err, "exec")
  1335. }
  1336. }
  1337. count++
  1338. }
  1339. log.Infof("TaskCleanupJob migrate %d tasks, takes %f seconds, batch limit=%d threshold hours=%d", count, time.Since(taskStart).Seconds(), consts.TaskArchiveBatchLimit(), consts.TaskArchiveThresholdHours())
  1340. return count, nil
  1341. }
  1342. func (task *STask) PerformCancel(
  1343. ctx context.Context,
  1344. userCred mcclient.TokenCredential,
  1345. query jsonutils.JSONObject,
  1346. input apis.TaskCancelInput,
  1347. ) (jsonutils.JSONObject, error) {
  1348. if utils.IsInArray(task.Stage, []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE}) {
  1349. return nil, errors.Wrapf(errors.ErrInvalidStatus, "cannot cancel stage in %s", task.Stage)
  1350. }
  1351. err := task.cancel(ctx, nil)
  1352. if err != nil {
  1353. return nil, errors.Wrap(err, "cancel")
  1354. }
  1355. return nil, nil
  1356. }
  1357. func (task *STask) fetchSubTasks() ([]STask, error) {
  1358. q := task.GetModelManager().Query().Equals("parent_task_id", task.Id)
  1359. tasks := make([]STask, 0)
  1360. err := db.FetchModelObjects(task.GetModelManager(), q, &tasks)
  1361. if err != nil {
  1362. return nil, errors.Wrap(err, "db.FetchModelObjects")
  1363. }
  1364. return tasks, nil
  1365. }
  1366. func (task *STask) cancel(ctx context.Context, reason *jsonutils.JSONDict) error {
  1367. if utils.IsInArray(task.Stage, []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE}) {
  1368. return nil
  1369. }
  1370. if reason == nil {
  1371. reason = jsonutils.NewDict()
  1372. reason.Add(jsonutils.NewString("cancel"), "__reason__")
  1373. reason.Add(jsonutils.NewString("error"), "__status__")
  1374. }
  1375. subtasks, err := task.fetchSubTasks()
  1376. if err != nil {
  1377. return errors.Wrap(err, "fetchSubTasks")
  1378. }
  1379. for i := range subtasks {
  1380. err := subtasks[i].cancel(ctx, reason)
  1381. if err != nil {
  1382. return errors.Wrap(err, "cancelTask")
  1383. }
  1384. }
  1385. task.fixParams()
  1386. TaskManager.execTask(task.GetTaskId(), reason)
  1387. return nil
  1388. }