| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package taskman
- import (
- "context"
- "database/sql"
- "fmt"
- "net/http"
- "reflect"
- "runtime/debug"
- "sort"
- "strconv"
- "strings"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/appctx"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/gotypes"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/pkg/util/rbacscope"
- "yunion.io/x/pkg/util/reflectutils"
- "yunion.io/x/pkg/util/stringutils"
- "yunion.io/x/pkg/util/timeutils"
- "yunion.io/x/pkg/util/version"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/sqlchemy"
- "yunion.io/x/onecloud/pkg/apis"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/cloudcommon/consts"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
- "yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
- "yunion.io/x/onecloud/pkg/util/ctx"
- "yunion.io/x/onecloud/pkg/util/logclient"
- "yunion.io/x/onecloud/pkg/util/stringutils2"
- )
- const (
- PARENT_TASK_ID_KEY = "parent_task_id"
- PENDING_USAGE_KEY = "__pending_usage__"
- PARENT_TASK_NOTIFY_KEY = "__parent_task_notifyurl"
- REQUEST_CONTEXT_KEY = "__request_context"
- TASK_STAGE_FAILED = "failed"
- TASK_STAGE_COMPLETE = "complete"
- MAX_REMOTE_NOTIFY_TRIES = 5
- MULTI_OBJECTS_ID = "[--MULTI_OBJECTS--]"
- TASK_INIT_STAGE = "on_init"
- CONVERT_TASK = "convert_task"
- LANG = "lang"
- taskStatusDone = "done"
- TASK_STATUS_QUEUE = "queue"
- )
- type STaskManager struct {
- db.SModelBaseManager
- db.SProjectizedResourceBaseManager
- db.SStatusResourceBaseManager
- }
- var TaskManager *STaskManager
- var userCredWidthLimit = 0
- func init() {
- TaskManager = &STaskManager{
- SModelBaseManager: db.NewModelBaseManager(STask{}, "tasks_tbl", "task", "tasks"),
- }
- TaskManager.SetVirtualObject(TaskManager)
- if field, ok := reflect.TypeOf(&STask{}).Elem().FieldByName("UserCred"); ok {
- if widthStr := field.Tag.Get(sqlchemy.TAG_WIDTH); len(widthStr) > 0 {
- userCredWidthLimit, _ = strconv.Atoi(widthStr)
- }
- }
- TaskManager.TableSpec().AddIndex(true, "id", "created_at", "parent_task_id", "stage")
- }
- type STask struct {
- db.SModelBase
- // 资源创建时间
- CreatedAt time.Time `nullable:"false" created_at:"true" index:"true" get:"user" list:"user" json:"created_at"`
- // 资源更新时间
- UpdatedAt time.Time `nullable:"false" updated_at:"true" list:"user" json:"updated_at"`
- // 资源被更新次数
- UpdateVersion int `default:"0" nullable:"false" auto_version:"true" list:"user" json:"update_version"`
- Id string `width:"36" charset:"ascii" primary:"true" list:"user"` // Column(VARCHAR(36, charset='ascii'), primary_key=True, default=get_uuid)
- STaskBase
- db.SProjectizedResourceBase
- taskObject db.IStandaloneModel `ignore:"true"`
- taskObjects []db.IStandaloneModel `ignore:"true"`
- SubTaskCount int `ignore:"true" json:"sub_task_count"`
- FailSubTaskCnt int `ignore:"true" json:"fail_sub_task_cnt"`
- SuccSubTaskCnt int `ignore:"true" json:"succ_sub_task_cnt"`
- }
- func (manager *STaskManager) CreateByInsertOrUpdate() bool {
- return false
- }
- func (manager *STaskManager) AllowListItems(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) bool {
- return true
- }
- func (manager *STaskManager) AllowCreateItem(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) bool {
- return false
- }
- func (manager *STaskManager) FilterById(q *sqlchemy.SQuery, idStr string) *sqlchemy.SQuery {
- return q.Equals("id", idStr)
- }
- func (manager *STaskManager) FilterByNotId(q *sqlchemy.SQuery, idStr string) *sqlchemy.SQuery {
- return q.NotEquals("id", idStr)
- }
- func (manager *STaskManager) FilterByName(q *sqlchemy.SQuery, name string) *sqlchemy.SQuery {
- return q.Equals("id", name)
- }
- func (manager *STaskManager) PerformAction(ctx context.Context, userCred mcclient.TokenCredential, taskId string, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- err := runTask(taskId, data)
- if err != nil {
- return nil, errors.Wrapf(err, "runTask")
- }
- resp := jsonutils.NewDict()
- resp.Add(jsonutils.NewString("ok"), "result")
- return resp, nil
- }
- func (manager *STask) PreCheckPerformAction(
- ctx context.Context, userCred mcclient.TokenCredential,
- action string, query jsonutils.JSONObject, data jsonutils.JSONObject,
- ) error {
- return nil
- }
- func (task *STask) GetOwnerId() mcclient.IIdentityProvider {
- return task.SProjectizedResourceBase.GetOwnerId()
- }
- func (manager *STaskManager) FilterByOwner(ctx context.Context, q *sqlchemy.SQuery, man db.FilterByOwnerProvider, userCred mcclient.TokenCredential, owner mcclient.IIdentityProvider, scope rbacscope.TRbacScope) *sqlchemy.SQuery {
- taskQ := TaskObjectManager.Query("task_id")
- taskQ = taskQ.Snapshot()
- taskQ = manager.SProjectizedResourceBaseManager.FilterByOwner(ctx, taskQ, man, userCred, owner, scope)
- if taskQ.IsAltered() {
- taskSubQ := taskQ.SubQuery()
- q = q.Join(taskSubQ, sqlchemy.Equals(q.Field("id"), taskSubQ.Field("task_id")))
- }
- return q
- }
- func (manager *STaskManager) FetchOwnerId(ctx context.Context, data jsonutils.JSONObject) (mcclient.IIdentityProvider, error) {
- return manager.SProjectizedResourceBaseManager.FetchOwnerId(ctx, data)
- }
- func (manager *STaskManager) FetchTaskById(taskId string) *STask {
- return manager.fetchTask(taskId)
- }
- func (task *STask) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
- return httperrors.NewForbiddenError("forbidden")
- }
- func (task *STask) ValidateUpdateCondition(ctx context.Context) error {
- return httperrors.NewForbiddenError("forbidden")
- }
- func (task *STask) BeforeInsert() {
- if len(task.Id) == 0 {
- task.Id = stringutils.UUID4()
- }
- }
- func (task *STask) GetId() string {
- return task.Id
- }
- func (task *STask) GetName() string {
- return task.TaskName
- }
- func (task *STask) saveStartAt() {
- if !task.StartAt.IsZero() {
- return
- }
- _, err := db.Update(task, func() error {
- task.StartAt = timeutils.UtcNow()
- return nil
- })
- if err != nil {
- log.Errorf("task %s save start_at fail: %s", task.String(), err)
- }
- }
- func fetchTaskParams(
- ctx context.Context,
- taskName string,
- taskData *jsonutils.JSONDict,
- parentTaskId string,
- parentTaskNotifyUrl string,
- pendingUsages []quotas.IQuota,
- ) *jsonutils.JSONDict {
- var data *jsonutils.JSONDict
- if taskData != nil {
- excludeKeys := []string{
- PARENT_TASK_ID_KEY, PARENT_TASK_NOTIFY_KEY, PENDING_USAGE_KEY,
- }
- for i := 1; taskData.Contains(pendingUsageKey(i)); i += 1 {
- excludeKeys = append(excludeKeys, pendingUsageKey(i))
- }
- data = taskData.CopyExcludes(excludeKeys...)
- } else {
- data = jsonutils.NewDict()
- }
- reqContext := appctx.FetchAppContextData(ctx)
- if !reqContext.IsZero() {
- data.Add(jsonutils.Marshal(&reqContext), REQUEST_CONTEXT_KEY)
- }
- if len(parentTaskId) > 0 || len(parentTaskNotifyUrl) > 0 {
- if len(parentTaskId) > 0 {
- data.Add(jsonutils.NewString(parentTaskId), PARENT_TASK_ID_KEY)
- }
- if len(parentTaskNotifyUrl) > 0 {
- data.Add(jsonutils.NewString(parentTaskNotifyUrl), PARENT_TASK_NOTIFY_KEY)
- log.Infof("%s notify parent url: %s", taskName, parentTaskNotifyUrl)
- }
- } else {
- if !reqContext.IsZero() {
- if len(reqContext.TaskId) > 0 && len(reqContext.TaskNotifyUrl) == 0 {
- data.Add(jsonutils.NewString(reqContext.TaskId), PARENT_TASK_ID_KEY)
- }
- if len(reqContext.TaskNotifyUrl) > 0 {
- data.Add(jsonutils.NewString(reqContext.TaskNotifyUrl), PARENT_TASK_NOTIFY_KEY)
- log.Infof("%s notify parent url: %s", taskName, reqContext.TaskNotifyUrl)
- }
- }
- }
- if len(pendingUsages) > 0 {
- for i := range pendingUsages {
- pendingUsage := pendingUsages[i]
- if gotypes.IsNil(pendingUsage) {
- continue
- }
- key := pendingUsageKey(i)
- data.Add(jsonutils.Marshal(pendingUsage), key)
- }
- }
- return data
- }
- func (manager *STaskManager) NewTask(
- ctx context.Context,
- taskName string,
- obj db.IStandaloneModel,
- userCred mcclient.TokenCredential,
- taskData *jsonutils.JSONDict,
- parentTaskId string,
- parentTaskNotifyUrl string,
- pendingUsage ...quotas.IQuota,
- ) (*STask, error) {
- if userCredWidthLimit > 0 && len(userCred.String()) > userCredWidthLimit {
- return nil, fmt.Errorf("Too many permissions for user %s", userCred.GetUserName())
- }
- if !isTaskExist(taskName) {
- return nil, fmt.Errorf("task %s not found", taskName)
- }
- data := fetchTaskParams(ctx, taskName, taskData, parentTaskId, parentTaskNotifyUrl, pendingUsage)
- task := &STask{
- STaskBase: STaskBase{
- ObjType: obj.Keyword(),
- ObjId: obj.GetId(),
- Object: obj.GetName(),
- TaskName: taskName,
- UserCred: userCred,
- Params: data,
- Stage: TASK_INIT_STAGE,
- ParentTaskId: parentTaskId,
- },
- }
- task.SetModelManager(manager, task)
- err := manager.TableSpec().Insert(ctx, task)
- if err != nil {
- log.Errorf("Task insert error %s", err)
- return nil, err
- }
- task.SetProgressAndStatus(0, TASK_STATUS_QUEUE)
- {
- to, err := TaskObjectManager.insertObject(ctx, task.Id, obj)
- if err != nil {
- log.Errorf("Taskobject insert error %s", err)
- return nil, errors.Wrap(err, "TaskObjectManager.insertObject")
- }
- db.Update(task, func() error {
- task.ProjectId = to.ProjectId
- task.DomainId = to.DomainId
- return nil
- })
- }
- parentTask := task.GetParentTask()
- if parentTask != nil {
- st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
- st.SetModelManager(SubTaskManager, st)
- err := SubTaskManager.TableSpec().Insert(ctx, st)
- if err != nil {
- log.Errorf("Subtask insert error %s", err)
- return nil, err
- }
- }
- return task, nil
- }
- func (manager *STaskManager) NewParallelTask(
- ctx context.Context,
- taskName string,
- objs []db.IStandaloneModel,
- userCred mcclient.TokenCredential,
- taskData *jsonutils.JSONDict,
- parentTaskId string,
- parentTaskNotifyUrl string,
- pendingUsage ...quotas.IQuota,
- ) (*STask, error) {
- if !isTaskExist(taskName) {
- return nil, fmt.Errorf("task %s not found", taskName)
- }
- if len(objs) == 0 {
- return nil, fmt.Errorf("failed to do task %s with zero objs", taskName)
- }
- log.Debugf("number of objs: %d", len(objs))
- data := fetchTaskParams(ctx, taskName, taskData, parentTaskId, parentTaskNotifyUrl, pendingUsage)
- task := &STask{
- STaskBase: STaskBase{
- ObjType: objs[0].Keyword(),
- Object: MULTI_OBJECTS_ID,
- ObjId: MULTI_OBJECTS_ID,
- TaskName: taskName,
- UserCred: userCred,
- Params: data,
- Stage: TASK_INIT_STAGE,
- ParentTaskId: parentTaskId,
- },
- }
- task.SetModelManager(manager, task)
- err := manager.TableSpec().Insert(ctx, task)
- if err != nil {
- log.Errorf("Task insert error %s", err)
- return nil, err
- }
- task.SetProgressAndStatus(0, TASK_STATUS_QUEUE)
- domainIds := stringutils2.NewSortedStrings(nil)
- tenantIds := stringutils2.NewSortedStrings(nil)
- for i := range objs {
- to, err := TaskObjectManager.insertObject(ctx, task.Id, objs[i])
- if err != nil {
- log.Errorf("Taskobject insert error %s", err)
- return nil, errors.Wrap(err, "insert task object")
- }
- tenantIds = tenantIds.Append(to.ProjectId)
- domainIds = domainIds.Append(to.DomainId)
- }
- db.Update(task, func() error {
- task.DomainId = strings.Join(domainIds, ",")
- task.ProjectId = strings.Join(tenantIds, ",")
- return nil
- })
- parentTask := task.GetParentTask()
- if parentTask != nil {
- st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
- st.SetModelManager(SubTaskManager, st)
- err := SubTaskManager.TableSpec().Insert(ctx, st)
- if err != nil {
- log.Errorf("Subtask insert error %s", err)
- return nil, err
- }
- }
- return task, nil
- }
- func (manager *STaskManager) fetchTask(idStr string) *STask {
- iTask, err := db.NewModelObject(manager)
- if err != nil {
- log.Errorf("New task object fail: %s", err)
- return nil
- }
- err = manager.Query().Equals("id", idStr).First(iTask)
- if err != nil {
- log.Errorf("GetTask %s fail: %s", idStr, err)
- return nil
- }
- task := iTask.(*STask)
- task.fixParams()
- return task
- }
- func (task *STask) fixParams() {
- if task.Params == nil {
- task.Params = jsonutils.NewDict()
- }
- }
- func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) {
- baseTask := manager.fetchTask(taskId)
- if baseTask == nil {
- return
- }
- manager.execTaskObject(baseTask, data)
- }
- func (manager *STaskManager) execTaskObject(baseTask *STask, data jsonutils.JSONObject) {
- taskType, ok := taskTable[baseTask.TaskName]
- if !ok {
- log.Errorf("Cannot find task %s", baseTask.TaskName)
- return
- }
- log.Debugf("Do task %s(%s) with data %s at stage %s", taskType, baseTask.Id, data, baseTask.Stage)
- taskValue := reflect.New(taskType)
- if taskValue.Type().Implements(ITaskType) {
- execITask(taskValue, baseTask, data, false)
- } else if taskValue.Type().Implements(IBatchTaskType) {
- execITask(taskValue, baseTask, data, true)
- } else {
- log.Errorf("Unsupported task type?? %s", taskValue.Type())
- }
- }
- type sSortedObjects []db.IStandaloneModel
- func (a sSortedObjects) Len() int { return len(a) }
- func (a sSortedObjects) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a sSortedObjects) Less(i, j int) bool { return a[i].GetId() < a[j].GetId() }
- func execITask(taskValue reflect.Value, task *STask, odata jsonutils.JSONObject, isMulti bool) {
- ctxData := task.GetRequestContext()
- ctx := ctxData.GetContext()
- task.saveStartAt()
- taskFailed := false
- var data jsonutils.JSONObject
- if odata != nil {
- switch dictdata := odata.(type) {
- case *jsonutils.JSONDict:
- taskStatus, _ := odata.GetString("__status__")
- if len(taskStatus) > 0 && taskStatus != "OK" {
- taskFailed = true
- dictdata.Set("__stage__", jsonutils.NewString(task.Stage))
- if !dictdata.Contains("__reason__") {
- reasonJson := dictdata.CopyExcludes("__status__", "__stage__")
- dictdata.Set("__reason__", reasonJson)
- }
- /*if vdata, ok := data.(*jsonutils.JSONDict); ok {
- reason, err := vdata.Get("__reason__") // only dict support Get
- if err != nil {
- reason = jsonutils.NewString(fmt.Sprintf("Task failed due to unknown remote errors! %s", odata))
- vdata.Set("__reason__", reason)
- }
- }*/
- }
- data = dictdata
- default:
- data = odata
- }
- } else {
- data = jsonutils.NewDict()
- }
- stageName := task.Stage
- if taskFailed {
- stageName = fmt.Sprintf("%sFailed", task.Stage)
- if strings.Contains(stageName, "_") {
- stageName = fmt.Sprintf("%s_failed", task.Stage)
- }
- }
- if strings.Contains(stageName, "_") {
- stageName = utils.Kebab2Camel(stageName, "_")
- }
- funcValue := taskValue.MethodByName(stageName)
- if !funcValue.IsValid() || funcValue.IsNil() {
- msg := fmt.Sprintf("Stage %s not found", stageName)
- if taskFailed {
- // failed handler is optional, ignore the error
- log.Warningf("%s", msg)
- msg, _ = data.GetString()
- } else {
- log.Errorf("%s", msg)
- }
- task.SetStageFailed(ctx, jsonutils.NewString(msg))
- task.SaveRequestContext(&ctxData)
- return
- }
- objManager := db.GetModelManager(task.ObjType)
- if objManager == nil {
- msg := fmt.Sprintf("model %s %s(%s) not found??? ...", task.ObjType, task.Object, task.ObjId)
- log.Errorf("%s", msg)
- task.SetStageFailed(ctx, jsonutils.NewString(msg))
- task.SaveRequestContext(&ctxData)
- return
- }
- // log.Debugf("objManager: %s", objManager)
- objResManager, ok := objManager.(db.IStandaloneModelManager)
- if !ok {
- msg := fmt.Sprintf("model %s %s(%s) is not a resource??? ...", task.ObjType, task.Object, task.ObjId)
- log.Errorf("%s", msg)
- task.SetStageFailed(ctx, jsonutils.NewString(msg))
- task.SaveRequestContext(&ctxData)
- return
- }
- params := make([]reflect.Value, 3)
- params[0] = reflect.ValueOf(ctx)
- if isMulti {
- objIds := TaskObjectManager.GetObjectIds(task)
- objs := make([]db.IStandaloneModel, len(objIds))
- for i, objId := range objIds {
- obj, err := objResManager.FetchById(objId)
- if err != nil {
- msg := fmt.Sprintf("fail to find %s object %s", task.ObjType, objId)
- log.Errorf("%s", msg)
- task.SetStageFailed(ctx, jsonutils.NewString(msg))
- task.SaveRequestContext(&ctxData)
- return
- }
- objs[i] = obj.(db.IStandaloneModel)
- }
- task.taskObjects = objs
- // sort objects by ids to avoid deadlock
- sort.Sort(sSortedObjects(objs))
- for i := range objs {
- lockman.LockObject(ctx, objs[i])
- defer lockman.ReleaseObject(ctx, objs[i])
- }
- params[1] = reflect.ValueOf(objs)
- } else {
- obj, err := objResManager.FetchById(task.ObjId)
- if err != nil {
- msg := fmt.Sprintf("fail to find %s object %s", task.ObjType, task.ObjId)
- log.Errorf("%s", msg)
- task.SetStageFailed(ctx, jsonutils.NewString(msg))
- task.SaveRequestContext(&ctxData)
- return
- }
- task.taskObject = obj.(db.IStandaloneModel)
- lockman.LockObject(ctx, obj)
- defer lockman.ReleaseObject(ctx, obj)
- params[1] = reflect.ValueOf(obj)
- }
- params[2] = reflect.ValueOf(data)
- filled := reflectutils.FillEmbededStructValue(taskValue.Elem(), reflect.Indirect(reflect.ValueOf(task)))
- if !filled {
- log.Errorf("Cannot locate baseTask embedded struct, give up...")
- return
- }
- defer func() {
- if r := recover(); r != nil {
- // call set stage failed, should not call task.SetStageFailed
- // func SetStageFailed may be overloading
- yunionconf.BugReport.SendBugReport(ctx, version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", r))
- log.Errorf("Task %s PANIC on stage %s: %v \n%s", task.TaskName, stageName, r, debug.Stack())
- SetStageFailedFuncValue := taskValue.MethodByName("SetStageFailed")
- SetStageFailedFuncValue.Call(
- []reflect.Value{
- reflect.ValueOf(ctx),
- reflect.ValueOf(jsonutils.NewString(fmt.Sprintf("%v", r))),
- },
- )
- obj, err := objResManager.FetchById(task.ObjId)
- if err != nil {
- return
- }
- statusObj, ok := obj.(db.IStatusStandaloneModel)
- if ok {
- db.StatusBaseSetStatus(ctx, statusObj, task.GetUserCred(), apis.STATUS_UNKNOWN, fmt.Sprintf("%v", r))
- }
- notes := map[string]interface{}{
- "Stack": string(debug.Stack()),
- "Version": version.GetShortString(),
- "Task": task.TaskName,
- "Stage": stageName,
- "Message": fmt.Sprintf("%v", r),
- }
- logclient.AddSimpleActionLog(obj, logclient.ACT_PANIC, notes, task.GetUserCred(), false)
- }
- }()
- log.Debugf("Call %s(%s) %s %#v", task.TaskName, task.Id, stageName, params)
- funcValue.Call(params)
- // call save request context
- saveRequestContextFuncValue := taskValue.MethodByName("SaveRequestContext")
- saveRequestContextFuncValue.Call([]reflect.Value{reflect.ValueOf(&ctxData)})
- }
- func (task *STask) ScheduleRun(data jsonutils.JSONObject) error {
- return runTask(task.Id, data)
- }
- func (task *STask) IsSubtask() bool {
- return task.HasParentTask()
- }
- func (task *STask) GetParentTaskId() string {
- if len(task.ParentTaskId) > 0 {
- return task.ParentTaskId
- }
- parentTaskId, _ := task.Params.GetString(PARENT_TASK_ID_KEY)
- if len(parentTaskId) > 0 {
- return parentTaskId
- }
- return ""
- }
- func (task *STask) HasParentTask() bool {
- parentTaskId := task.GetParentTaskId()
- if len(parentTaskId) > 0 {
- return true
- }
- return false
- }
- func (task *STask) GetParentTask() *STask {
- parentTaskId := task.GetParentTaskId()
- if len(parentTaskId) > 0 {
- return TaskManager.fetchTask(parentTaskId)
- }
- return nil
- }
- func (task *STask) GetRequestContext() appctx.AppContextData {
- ctxData := appctx.AppContextData{}
- if task.Params != nil {
- ctxJson, _ := task.Params.Get(REQUEST_CONTEXT_KEY)
- if ctxJson != nil {
- ctxJson.Unmarshal(&ctxData)
- }
- }
- // clear parentTaskId
- ctxData.TaskId = ""
- ctxData.TaskNotifyUrl = ""
- return ctxData
- }
- func (task *STask) SaveRequestContext(data *appctx.AppContextData) {
- jsonData := jsonutils.Marshal(data)
- log.Debugf("SaveRequestContext %s(%s) %s param %s", task.TaskName, task.Id, jsonData, task.Params)
- _, err := db.Update(task, func() error {
- params := task.Params.CopyExcludes(REQUEST_CONTEXT_KEY)
- params.Add(jsonData, REQUEST_CONTEXT_KEY)
- task.Params = params
- task.EndAt = timeutils.UtcNow()
- return nil
- })
- log.Debugf("Params: %s(%s) %s", task.TaskName, task.Id, task.Params)
- if err != nil {
- log.Errorf("save_request_context fail %s", err)
- }
- }
- func (task *STask) SaveParams(data *jsonutils.JSONDict) error {
- return task.SetStage("", data)
- }
- func (task *STask) SetStage(stageName string, data *jsonutils.JSONDict) error {
- _, err := db.Update(task, func() error {
- params := jsonutils.NewDict()
- params.Update(task.Params)
- if data != nil {
- params.Update(data)
- }
- if len(stageName) > 0 {
- stages, _ := params.Get("__stages")
- if stages == nil {
- stages = jsonutils.NewArray()
- params.Add(stages, "__stages")
- }
- stageList := stages.(*jsonutils.JSONArray)
- stageData := jsonutils.NewDict()
- stageData.Add(jsonutils.NewString(task.Stage), "name")
- stageData.Add(jsonutils.NewTimeString(time.Now()), "complete_at")
- stageList.Add(stageData)
- task.Stage = stageName
- }
- task.Params = params
- return nil
- })
- if err != nil {
- log.Errorf("Task %s(%s) set_stage %s fail %s", task.TaskName, task.Id, stageName, err)
- }
- return err
- }
- func (task *STask) GetObjectIdStr() string {
- if task.ObjId == MULTI_OBJECTS_ID {
- return strings.Join(TaskObjectManager.GetObjectIds(task), ",")
- } else {
- return task.ObjId
- }
- }
- func (task *STask) GetObjectStr() string {
- if task.ObjId == MULTI_OBJECTS_ID {
- return strings.Join(TaskObjectManager.GetObjectNames(task), ",")
- } else {
- return task.Object
- }
- }
- func (task *STask) SetStageComplete(ctx context.Context, data *jsonutils.JSONDict) {
- log.Infof("XXX TASK %s(%s) complete", task.TaskName, task.Id)
- task.SetStage(TASK_STAGE_COMPLETE, data)
- task.SetProgressAndStatus(100, taskStatusDone)
- if data == nil {
- data = jsonutils.NewDict()
- }
- if data.Size() == 0 {
- data.Add(jsonutils.NewString(task.GetObjectIdStr()), "id")
- data.Add(jsonutils.NewString(task.GetObjectStr()), "name")
- data.Add(jsonutils.NewString(task.ObjType), "type")
- }
- task.NotifyParentTaskComplete(ctx, data, false)
- }
- func (task *STask) SetStageFailed(ctx context.Context, reason jsonutils.JSONObject) {
- if task.Stage == TASK_STAGE_FAILED {
- log.Warningf("Task %s(%s) has been failed", task.TaskName, task.Id)
- return
- }
- log.Infof("XXX TASK %s(%s) failed: %s on stage %s", task.TaskName, task.Id, reason, task.Stage)
- reasonDict := jsonutils.NewDict()
- reasonDict.Add(jsonutils.NewString(task.Stage), "stage")
- if reason != nil {
- reasonDict.Add(reason, "reason")
- }
- reason = reasonDict
- prevFailed, _ := task.Params.Get("__failed_reason")
- if prevFailed != nil {
- switch prevFailed2 := prevFailed.(type) {
- case *jsonutils.JSONArray:
- prevFailed2.Add(reason)
- reason = prevFailed
- default:
- reason = jsonutils.NewArray(prevFailed, reason)
- }
- }
- data := jsonutils.NewDict()
- data.Add(reason, "__failed_reason")
- task.SetStage(TASK_STAGE_FAILED, data)
- task.SetProgressAndStatus(100, taskStatusDone)
- task.NotifyParentTaskFailure(ctx, reason)
- }
- func (task *STask) NotifyParentTaskComplete(ctx context.Context, body *jsonutils.JSONDict, failed bool) {
- log.Infof("notify_parent_task_complete: %s(%s) params %s", task.TaskName, task.Id, task.Params)
- parentTaskId := task.GetParentTaskId()
- parentTaskNotify, _ := task.Params.GetString(PARENT_TASK_NOTIFY_KEY)
- if len(parentTaskId) > 0 {
- subTask := SubTaskManager.GetSubTask(parentTaskId, task.Id)
- if subTask != nil {
- subTask.SaveResults(failed, body)
- }
- func() {
- lockman.LockRawObject(ctx, "tasks", parentTaskId)
- defer lockman.ReleaseRawObject(ctx, "tasks", parentTaskId)
- pTask := TaskManager.fetchTask(parentTaskId)
- if pTask == nil {
- saveTaskUpCallStatus(task.GetId(), parentTaskId, false, errors.Wrap(errors.ErrNotFound, "Parent task not found").Error())
- log.Errorf("Parent task %s not found", parentTaskId)
- return
- }
- if pTask.IsCurrentStageComplete() {
- err := pTask.ScheduleRun(body)
- if err != nil {
- saveTaskUpCallStatus(task.GetId(), parentTaskId, false, err.Error())
- } else {
- saveTaskUpCallStatus(task.GetId(), parentTaskId, true, "")
- }
- }
- }()
- }
- if len(parentTaskNotify) > 0 {
- header := task.getTaskHeader()
- go notifyRemoteTask(ctx, parentTaskNotify, task.GetId(), header, body, 0)
- }
- }
- func notifyRemoteTask(ctx context.Context, notifyUrl string, taskId string, header http.Header, body jsonutils.JSONObject, tried uint) {
- client := httputils.GetDefaultClient()
- _, body, err := httputils.JSONRequest(client, ctx, "POST", notifyUrl, header, body, false)
- if err != nil {
- log.Errorf("notifyRemoteTask fail %s", err)
- if tried > MAX_REMOTE_NOTIFY_TRIES {
- log.Errorf("notifyRemoteTask max tried reached, give up...")
- saveTaskUpCallStatus(taskId, notifyUrl, false, fmt.Sprintf("notifyRemoteTask max tried reached, give up... %s", err.Error()))
- } else {
- time.Sleep(time.Second * time.Duration(1<<tried))
- notifyRemoteTask(ctx, notifyUrl, taskId, header, body, tried+1)
- }
- return
- } else {
- log.Infof("Notify remote URL %s get acked: %s, taskId: %s", notifyUrl, body.String(), taskId)
- saveTaskUpCallStatus(taskId, notifyUrl, true, body.String())
- }
- }
- func (task *STask) NotifyParentTaskFailure(ctx context.Context, reason jsonutils.JSONObject) {
- body := jsonutils.NewDict()
- body.Add(jsonutils.NewString("error"), "__status__")
- body.Add(jsonutils.NewString(task.TaskName), "__task_name__")
- body.Add(reason, "__reason__")
- task.NotifyParentTaskComplete(ctx, body, true)
- }
- func (task *STask) IsCurrentStageComplete() bool {
- totalSubtasksCnt, _ := SubTaskManager.GetTotalSubtasksCount(task.Id, task.Stage)
- initSubtasksCnt, _ := SubTaskManager.GetInitSubtasksCount(task.Id, task.Stage)
- log.Debugf("Task %s IsCurrentStageComplete totalSubtasks %d initSubtasks %d ", task.String(), totalSubtasksCnt, initSubtasksCnt)
- task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) * 100 / float32(totalSubtasksCnt))
- if totalSubtasksCnt > 0 && initSubtasksCnt == 0 {
- return true
- } else {
- return false
- }
- }
- func (task *STask) GetPendingUsage(quota quotas.IQuota, index int) error {
- key := pendingUsageKey(index)
- if task.Params.Contains(key) {
- quotaJson, err := task.Params.Get(key)
- if err != nil {
- return errors.Wrapf(err, "task.Params.Get %s", key)
- }
- err = quotaJson.Unmarshal(quota)
- if err != nil {
- return errors.Wrap(err, "quotaJson.Unmarshal")
- }
- }
- return nil
- }
- func pendingUsageKey(index int) string {
- key := PENDING_USAGE_KEY
- if index > 0 {
- key += "." + strconv.FormatInt(int64(index), 10)
- }
- return key
- }
- func (task *STask) SetPendingUsage(quota quotas.IQuota, index int) error {
- _, err := db.Update(task, func() error {
- key := pendingUsageKey(index)
- params := task.Params.CopyExcludes(key)
- params.Add(jsonutils.Marshal(quota), key)
- task.Params = params
- return nil
- })
- if err != nil {
- log.Errorf("set_pending_usage fail %s", err)
- }
- return err
- }
- func (task *STask) ClearPendingUsage(index int) error {
- _, err := db.Update(task, func() error {
- key := pendingUsageKey(index)
- params := task.Params.CopyExcludes(key)
- task.Params = params
- return nil
- })
- if err != nil {
- log.Errorf("clear_pending_usage fail %s", err)
- }
- return err
- }
- func (task *STask) GetParams() *jsonutils.JSONDict {
- return task.Params
- }
- func (task *STask) GetUserCred() mcclient.TokenCredential {
- return task.UserCred
- }
- func (task *STask) GetTaskId() string {
- return task.GetId()
- }
- func (task *STask) GetObject() db.IStandaloneModel {
- return task.taskObject
- }
- func (task *STask) GetObjects() []db.IStandaloneModel {
- return task.taskObjects
- }
- func (task *STask) getTaskHeader() http.Header {
- userCred := task.GetUserCred()
- if !userCred.IsValid() {
- userCred = auth.AdminCredential()
- }
- header := mcclient.GetTokenHeaders(userCred)
- header.Set(mcclient.TASK_ID, task.GetTaskId())
- return header
- }
- func (task *STask) GetTaskRequestHeader() http.Header {
- header := task.getTaskHeader()
- if len(serviceUrl) > 0 {
- notifyUrl := fmt.Sprintf("%s/tasks/%s", serviceUrl, task.GetTaskId())
- header.Set(mcclient.TASK_NOTIFY_URL, notifyUrl)
- }
- return header
- }
- func (task *STask) String() string {
- return fmt.Sprintf("%s(%s,%s)", task.Id, task.TaskName, task.Stage)
- }
- var serviceUrl string
- func SetServiceUrl(url string) {
- serviceUrl = url
- }
- func (task *STask) GetStartTime() time.Time {
- return task.CreatedAt
- }
- func (manager *STaskManager) QueryTasksOfObject(obj db.IStandaloneModel, since time.Time, isOpen *bool) *sqlchemy.SQuery {
- subq1 := manager.Query()
- {
- subq1 = subq1.Equals("obj_id", obj.GetId())
- subq1 = subq1.Equals("obj_type", obj.Keyword())
- if !since.IsZero() {
- subq1 = subq1.GE("created_at", since)
- }
- if isOpen != nil {
- if *isOpen {
- subq1 = subq1.Filter(sqlchemy.NOT(
- sqlchemy.In(subq1.Field("stage"), []string{"complete", "failed"}),
- ))
- } else if !*isOpen {
- subq1 = subq1.In("stage", []string{"complete", "failed"})
- }
- }
- }
- subq2 := manager.Query()
- {
- taskObjs := TaskObjectManager.TableSpec().Instance()
- subq2 = subq2.Join(taskObjs, sqlchemy.AND(
- sqlchemy.Equals(taskObjs.Field("task_id"), subq2.Field("id")),
- sqlchemy.Equals(taskObjs.Field("obj_id"), obj.GetId()),
- ))
- subq2 = subq2.Filter(sqlchemy.Equals(subq2.Field("obj_id"), MULTI_OBJECTS_ID))
- subq2 = subq2.Filter(sqlchemy.Equals(subq2.Field("obj_type"), obj.Keyword()))
- if !since.IsZero() {
- subq2 = subq2.Filter(sqlchemy.GE(subq2.Field("created_at"), since))
- }
- if isOpen != nil {
- if *isOpen {
- subq2 = subq2.Filter(sqlchemy.NOT(
- sqlchemy.In(subq2.Field("stage"), []string{"complete", "failed"}),
- ))
- } else if !*isOpen {
- subq2 = subq2.In("stage", []string{"complete", "failed"})
- }
- }
- }
- // subq1 and subq2 do not overlap for the fact that they have
- // different conditions on tasks_tbl.obj_id field
- return sqlchemy.Union(subq1, subq2).Query().Desc("created_at")
- }
- func (manager *STaskManager) IsInTask(obj db.IStandaloneModel) bool {
- tasks, err := manager.FetchIncompleteTasksOfObject(obj)
- if err == nil && len(tasks) == 0 {
- return false
- }
- return true
- }
- func (manager *STaskManager) FetchIncompleteTasksOfObject(obj db.IStandaloneModel) ([]STask, error) {
- isOpen := true
- return manager.FetchTasksOfObjectLatest(obj, 1*time.Hour, &isOpen)
- }
- func (manager *STaskManager) FetchTasksOfObjectLatest(obj db.IStandaloneModel, interval time.Duration, isOpen *bool) ([]STask, error) {
- since := timeutils.UtcNow().Add(-1 * interval)
- return manager.FetchTasksOfObject(obj, since, isOpen)
- }
- func (manager *STaskManager) FetchTasksOfObject(obj db.IStandaloneModel, since time.Time, isOpen *bool) ([]STask, error) {
- q := manager.QueryTasksOfObject(obj, since, isOpen)
- tasks := make([]STask, 0)
- err := db.FetchModelObjects(manager, q, &tasks)
- if err != nil && err != sql.ErrNoRows {
- return nil, err
- }
- return tasks, nil
- }
- // 操作日志列表
- func (manager *STaskManager) ListItemFilter(
- ctx context.Context,
- q *sqlchemy.SQuery,
- userCred mcclient.TokenCredential,
- input apis.TaskListInput,
- ) (*sqlchemy.SQuery, error) {
- var err error
- q, err = manager.SModelBaseManager.ListItemFilter(ctx, q, userCred, input.ModelBaseListInput)
- if err != nil {
- return q, errors.Wrap(err, "SResourceBaseManager.ListItemFilter")
- }
- q, err = manager.SStatusResourceBaseManager.ListItemFilter(ctx, q, userCred, input.StatusResourceBaseListInput)
- if err != nil {
- return q, errors.Wrap(err, "SStatusResourceBaseManager.ListItemFilter")
- }
- if len(input.Id) > 0 {
- q = q.In("id", input.Id)
- }
- if len(input.ObjId) > 0 {
- taskObjQ := TaskObjectManager.Query("task_id").In("obj_id", input.ObjId).Distinct().SubQuery()
- q = q.Join(taskObjQ, sqlchemy.Equals(q.Field("id"), taskObjQ.Field("task_id")))
- }
- if len(input.ObjName) > 0 {
- q = q.In("object", input.ObjName)
- }
- if len(input.ObjType) > 0 {
- q = q.In("obj_type", input.ObjType)
- }
- if len(input.TaskName) > 0 {
- q = q.In("task_name", input.TaskName)
- }
- if len(input.Stage) > 0 {
- q = q.In("stage", input.Stage)
- }
- if len(input.NotStage) > 0 {
- q = q.NotIn("stage", input.NotStage)
- }
- if len(input.ParentId) > 0 {
- q = q.In("parent_task_id", input.ParentId)
- }
- if input.IsComplete != nil {
- if *input.IsComplete {
- q = q.In("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
- } else {
- q = q.NotIn("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
- }
- }
- if input.IsInit != nil {
- if *input.IsInit {
- q = q.Equals("stage", TASK_INIT_STAGE)
- } else {
- q = q.NotEquals("stage", TASK_INIT_STAGE)
- }
- }
- if input.IsMulti != nil {
- if *input.IsMulti {
- q = q.Equals("obj_id", MULTI_OBJECTS_ID)
- } else {
- q = q.NotEquals("obj_id", MULTI_OBJECTS_ID)
- }
- }
- if input.IsRoot != nil {
- if *input.IsRoot {
- q = q.IsNullOrEmpty("parent_task_id")
- } else {
- q = q.IsNotEmpty("parent_task_id")
- }
- }
- if len(input.ParentTaskId) > 0 {
- q = q.Equals("parent_task_id", input.ParentTaskId)
- }
- if input.SubTask != nil && *input.SubTask {
- subSQFunc := func(status string, cntField string) *sqlchemy.SSubQuery {
- subQ := SubTaskManager.Query()
- if len(status) > 0 {
- subQ = subQ.Equals("status", status)
- }
- subQ = subQ.GroupBy(subQ.Field("task_id"))
- subQ = subQ.AppendField(subQ.Field("task_id"))
- subQ = subQ.AppendField(sqlchemy.COUNT(cntField))
- return subQ.SubQuery()
- }
- {
- subSQ := subSQFunc("", "sub_task_count")
- q = q.LeftJoin(subSQ, sqlchemy.Equals(subSQ.Field("task_id"), q.Field("id")))
- q = q.AppendField(subSQ.Field("sub_task_count"))
- }
- {
- failSubSQ := subSQFunc(SUBTASK_FAIL, "fail_sub_task_cnt")
- q = q.LeftJoin(failSubSQ, sqlchemy.Equals(failSubSQ.Field("task_id"), q.Field("id")))
- q = q.AppendField(failSubSQ.Field("fail_sub_task_cnt"))
- }
- {
- succSubSQ := subSQFunc(SUBTASK_SUCC, "succ_sub_task_cnt")
- q = q.LeftJoin(succSubSQ, sqlchemy.Equals(succSubSQ.Field("task_id"), q.Field("id")))
- q = q.AppendField(succSubSQ.Field("succ_sub_task_cnt"))
- }
- for _, c := range manager.TableSpec().Columns() {
- q = q.AppendField(q.Field(c.Name()))
- }
- }
- // q.DebugQuery2("taskQuery")
- return q, nil
- }
- func (manager *STaskManager) ListItemExportKeys(ctx context.Context, q *sqlchemy.SQuery, userCred mcclient.TokenCredential, keys stringutils2.SSortedStrings) (*sqlchemy.SQuery, error) {
- var err error
- q, err = manager.SModelBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
- if err != nil {
- return nil, errors.Wrap(err, "SModelBaseManager.ListItemExportKeys")
- }
- // q, err = manager.SProjectizedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
- // if err != nil {
- // return nil, errors.Wrap(err, "SProjectizedResourceBaseManager.ListItemExportKeys")
- // }
- return q, nil
- }
- func (manager *STaskManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
- var err error
- q, err = manager.SModelBaseManager.QueryDistinctExtraField(q, field)
- if err == nil {
- return q, nil
- }
- // q, err = manager.SProjectizedResourceBaseManager.QueryDistinctExtraField(q, field)
- // if err == nil {
- // return q, nil
- // }
- return q, httperrors.ErrNotFound
- }
- func (manager *STaskManager) ResourceScope() rbacscope.TRbacScope {
- return manager.SProjectizedResourceBaseManager.ResourceScope()
- }
- func (manager *STaskManager) FetchCustomizeColumns(
- ctx context.Context,
- userCred mcclient.TokenCredential,
- query jsonutils.JSONObject,
- objs []interface{},
- fields stringutils2.SSortedStrings,
- isList bool,
- ) []apis.TaskDetails {
- rows := make([]apis.TaskDetails, len(objs))
- bases := manager.SModelBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
- projs := manager.SProjectizedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
- for i := range objs {
- rows[i] = apis.TaskDetails{
- ModelBaseDetails: bases[i],
- ProjectizedResourceInfo: projs[i],
- }
- }
- return rows
- }
- func (manager *STaskManager) OrderByExtraFields(
- ctx context.Context,
- q *sqlchemy.SQuery,
- userCred mcclient.TokenCredential,
- query apis.TaskListInput,
- ) (*sqlchemy.SQuery, error) {
- var err error
- q, err = manager.SModelBaseManager.OrderByExtraFields(ctx, q, userCred, query.ModelBaseListInput)
- if err != nil {
- return q, errors.Wrap(err, "SModelBaseManager.OrderByExtraField")
- }
- // q, err = manager.SProjectizedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ProjectizedResourceListInput)
- // if err != nil {
- // return q, errors.Wrap(err, "SProjectizedResourceBaseManager.OrderByExtraField")
- // }
- return q, nil
- }
- func (task *STask) SetProgressAndStatus(progress float32, status string) error {
- _, err := db.Update(task, func() error {
- task.SetStatusValue(status)
- task.SetProgressValue(progress)
- return nil
- })
- if err != nil {
- return errors.Wrap(err, "Update")
- }
- return nil
- }
- func (task *STask) SetProgress(progress float32) error {
- _, err := db.Update(task, func() error {
- task.SetProgressValue(progress)
- return nil
- })
- if err != nil {
- return errors.Wrap(err, "Update")
- }
- return nil
- }
- func (manager *STaskManager) InitializeData() error {
- {
- err := manager.failTimeoutTasks()
- if err != nil {
- return errors.Wrap(err, "failTimeoutTasks")
- }
- }
- {
- err := manager.migrateObjectInfo()
- if err != nil {
- return errors.Wrap(err, "migrateObjectInfo")
- }
- }
- {
- err := manager.clearnUpSubtasks()
- if err != nil {
- return errors.Wrap(err, "clearnUpSubtasks")
- }
- }
- {
- err := manager.clearnUpTaskObjects()
- if err != nil {
- return errors.Wrap(err, "clearnUpTaskObjects")
- }
- }
- return nil
- }
- func (manager *STaskManager) failTimeoutTasks() error {
- q := manager.Query().NotIn("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
- tasks := make([]STask, 0)
- err := db.FetchModelObjects(manager, q, &tasks)
- if err != nil {
- return errors.Wrap(err, "FetchModelObjects")
- }
- reason := jsonutils.NewDict()
- reason.Add(jsonutils.NewString("service restart"), "__reason__")
- reason.Add(jsonutils.NewString("error"), "__status__")
- for i := range tasks {
- task := &tasks[i]
- task.fixParams()
- manager.execTaskObject(task, reason)
- }
- return nil
- }
- func (manager *STaskManager) clearnUpSubtasks() error {
- start := time.Now()
- log.Infof("start clearnUpSubtasks")
- defer func() {
- log.Infof("end clearnUpSubtasks, takes %f seconds", time.Since(start).Seconds())
- }()
- q := SubTaskManager.Query("task_id")
- tasksQ := TaskManager.Query().SubQuery()
- q = q.LeftJoin(tasksQ, sqlchemy.Equals(q.Field("task_id"), tasksQ.Field("id")))
- q = q.Filter(sqlchemy.IsNull(tasksQ.Field("id")))
- q = q.Distinct()
- rows, err := q.Rows()
- if err != nil {
- return errors.Wrap(err, "q.Rows")
- }
- defer rows.Close()
- for rows.Next() {
- val, err := q.Row2Map(rows)
- if err != nil {
- return errors.Wrap(err, "Row2Map")
- }
- sql := fmt.Sprintf("delete from `%s` where task_id = '%s'", SubTaskManager.TableSpec().Name(), val["task_id"])
- log.Infof("%s", sql)
- _, err = SubTaskManager.TableSpec().GetTableSpec().Database().Exec(sql)
- if err != nil {
- return errors.Wrap(err, "exec")
- }
- sql = fmt.Sprintf("delete from `%s` where task_id = '%s'", TaskObjectManager.TableSpec().Name(), val["task_id"])
- log.Infof("%s", sql)
- _, err = TaskObjectManager.TableSpec().GetTableSpec().Database().Exec(sql)
- if err != nil {
- return errors.Wrap(err, "exec")
- }
- }
- return nil
- }
- func (manager *STaskManager) clearnUpTaskObjects() error {
- start := time.Now()
- log.Infof("start clearnUpTaskObjects")
- defer func() {
- log.Infof("end clearnUpTaskObjects, takes %f seconds", time.Since(start).Seconds())
- }()
- q := TaskObjectManager.Query("task_id")
- tasksQ := TaskManager.Query().SubQuery()
- q = q.LeftJoin(tasksQ, sqlchemy.Equals(q.Field("task_id"), tasksQ.Field("id")))
- q = q.Filter(sqlchemy.IsNull(tasksQ.Field("id")))
- q = q.Distinct()
- rows, err := q.Rows()
- if err != nil {
- return errors.Wrap(err, "q.Rows")
- }
- defer rows.Close()
- for rows.Next() {
- val, err := q.Row2Map(rows)
- if err != nil {
- return errors.Wrap(err, "Row2Map")
- }
- sql := fmt.Sprintf("delete from `%s` where task_id = '%s'", TaskObjectManager.TableSpec().Name(), val["task_id"])
- log.Infof("%s", sql)
- _, err = TaskObjectManager.TableSpec().GetTableSpec().Database().Exec(sql)
- if err != nil {
- return errors.Wrap(err, "exec")
- }
- }
- return nil
- }
- func (manager *STaskManager) migrateObjectInfo() error {
- start := time.Now()
- log.Infof("start migrateObjectInfo")
- defer func() {
- log.Infof("end migrateObjectInfo, takes %f seconds", time.Since(start).Seconds())
- }()
- q := manager.Query().NotEquals("obj_id", MULTI_OBJECTS_ID)
- taskObj := TaskObjectManager.Query().SubQuery()
- q = q.LeftJoin(taskObj, sqlchemy.Equals(q.Field("id"), taskObj.Field("task_id")))
- q = q.Filter(sqlchemy.IsNull(taskObj.Field("task_id")))
- q = q.Asc("created_at")
- // q.DebugQuery2("migrateObjectInfo")
- rows, err := q.Rows()
- if err != nil {
- return errors.Wrap(err, "query.Rows")
- }
- defer rows.Close()
- for rows.Next() {
- task := STask{}
- task.SetModelManager(manager, &task)
- err := q.Row2Struct(rows, &task)
- if err != nil {
- return errors.Wrap(err, "row2struct")
- }
- taskObj := STaskObject{}
- taskObj.ObjId = task.ObjId
- taskObj.Object = task.Object
- taskObj.TaskId = task.Id
- taskObj.DomainId = task.DomainId
- taskObj.ProjectId = task.ProjectId
- taskObj.SetModelManager(TaskObjectManager, &taskObj)
- err = TaskObjectManager.TableSpec().Insert(ctx.CtxWithTime(), &taskObj)
- if err != nil {
- return errors.Wrap(err, "Insert taskObject")
- }
- }
- return nil
- }
- var (
- taskCleanuoWorkerManager = appsrv.NewWorkerManager(
- "taskCleanupWorkerManager",
- 1,
- 1024,
- true,
- )
- )
- func (manager *STaskManager) TaskCleanupJob(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
- taskCleanuoWorkerManager.Run(&sTaskCleanupJob{}, nil, nil)
- }
- type sTaskCleanupJob struct{}
- func (job *sTaskCleanupJob) Run() {
- count, err := TaskManager.doTaskCleanupJob()
- if err != nil {
- log.Errorf("doTaskCleanupJob fail %s", err)
- return
- }
- if count > 0 {
- taskCleanuoWorkerManager.Run(&sTaskCleanupJob{}, nil, nil)
- }
- }
- func (job *sTaskCleanupJob) Dump() string {
- return "TaskCleanupJob"
- }
- func (manager *STaskManager) doTaskCleanupJob() (int, error) {
- ctx := context.WithValue(context.Background(), "task_cleanup_job", true)
- q := manager.Query().LT("created_at", time.Now().Add(-time.Duration(consts.TaskArchiveThresholdHours())*time.Hour)).Asc("created_at")
- if consts.TaskArchiveBatchLimit() > 0 {
- q = q.Limit(consts.TaskArchiveBatchLimit())
- }
- rows, err := q.Rows()
- if err != nil {
- log.Errorf("query rows fail %s", err)
- return 0, errors.Wrap(err, "query rows")
- }
- defer rows.Close()
- taskStart := time.Now()
- count := 0
- for rows.Next() {
- task := STask{}
- err := q.Row2Struct(rows, &task)
- if err != nil {
- log.Errorf("Row2Struct fail %s", err)
- return 0, errors.Wrap(err, "row2struct")
- }
- task.SetModelManager(ArchivedTaskManager, &task)
- err = ArchivedTaskManager.Insert(ctx, &task)
- if err != nil {
- log.Errorf("insert archive fail %s", err)
- return 0, errors.Wrap(err, "insert archive")
- }
- // cleanup
- for _, sql := range []string{
- fmt.Sprintf("DELETE FROM `%s` WHERE id = ?", manager.TableSpec().Name()),
- fmt.Sprintf("DELETE FROM `%s` WHERE task_id = ?", TaskObjectManager.TableSpec().Name()),
- fmt.Sprintf("DELETE FROM `%s` WHERE task_id = ?", SubTaskManager.TableSpec().Name()),
- } {
- _, err := manager.TableSpec().GetTableSpec().Database().Exec(sql, task.Id)
- if err != nil {
- log.Errorf("exec %s %s fail: %s", sql, task.Id, err)
- return 0, errors.Wrap(err, "exec")
- }
- }
- count++
- }
- log.Infof("TaskCleanupJob migrate %d tasks, takes %f seconds, batch limit=%d threshold hours=%d", count, time.Since(taskStart).Seconds(), consts.TaskArchiveBatchLimit(), consts.TaskArchiveThresholdHours())
- return count, nil
- }
- func (task *STask) PerformCancel(
- ctx context.Context,
- userCred mcclient.TokenCredential,
- query jsonutils.JSONObject,
- input apis.TaskCancelInput,
- ) (jsonutils.JSONObject, error) {
- if utils.IsInArray(task.Stage, []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE}) {
- return nil, errors.Wrapf(errors.ErrInvalidStatus, "cannot cancel stage in %s", task.Stage)
- }
- err := task.cancel(ctx, nil)
- if err != nil {
- return nil, errors.Wrap(err, "cancel")
- }
- return nil, nil
- }
- func (task *STask) fetchSubTasks() ([]STask, error) {
- q := task.GetModelManager().Query().Equals("parent_task_id", task.Id)
- tasks := make([]STask, 0)
- err := db.FetchModelObjects(task.GetModelManager(), q, &tasks)
- if err != nil {
- return nil, errors.Wrap(err, "db.FetchModelObjects")
- }
- return tasks, nil
- }
- func (task *STask) cancel(ctx context.Context, reason *jsonutils.JSONDict) error {
- if utils.IsInArray(task.Stage, []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE}) {
- return nil
- }
- if reason == nil {
- reason = jsonutils.NewDict()
- reason.Add(jsonutils.NewString("cancel"), "__reason__")
- reason.Add(jsonutils.NewString("error"), "__status__")
- }
- subtasks, err := task.fetchSubTasks()
- if err != nil {
- return errors.Wrap(err, "fetchSubTasks")
- }
- for i := range subtasks {
- err := subtasks[i].cancel(ctx, reason)
- if err != nil {
- return errors.Wrap(err, "cancelTask")
- }
- }
- task.fixParams()
- TaskManager.execTask(task.GetTaskId(), reason)
- return nil
- }
|