disk_batch_create_task.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package disk
  15. import (
  16. "context"
  17. "fmt"
  18. "yunion.io/x/jsonutils"
  19. "yunion.io/x/log"
  20. api "yunion.io/x/onecloud/pkg/apis/compute"
  21. schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
  22. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  23. "yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
  24. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  26. "yunion.io/x/onecloud/pkg/compute/models"
  27. "yunion.io/x/onecloud/pkg/compute/tasks/utils"
  28. "yunion.io/x/onecloud/pkg/util/logclient"
  29. )
  30. type DiskBatchCreateTask struct {
  31. utils.SSchedTask
  32. }
  33. func init() {
  34. taskman.RegisterTask(DiskBatchCreateTask{})
  35. }
  36. func (task *DiskBatchCreateTask) getNeedScheduleDisks(objs []db.IStandaloneModel) []db.IStandaloneModel {
  37. toSchedDisks := make([]db.IStandaloneModel, 0)
  38. for _, obj := range objs {
  39. disk := obj.(*models.SDisk)
  40. if disk.StorageId == "" {
  41. toSchedDisks = append(toSchedDisks, disk)
  42. }
  43. }
  44. return toSchedDisks
  45. }
  46. func (task *DiskBatchCreateTask) clearPendingUsage(ctx context.Context, disk *models.SDisk) {
  47. utils.ClearTaskPendingUsage(ctx, task)
  48. utils.ClearTaskPendingRegionUsage(ctx, task)
  49. }
  50. func (task *DiskBatchCreateTask) OnInit(ctx context.Context, objs []db.IStandaloneModel, body jsonutils.JSONObject) {
  51. toSchedDisks := task.getNeedScheduleDisks(objs)
  52. if len(toSchedDisks) == 0 {
  53. task.SetStage("OnScheduleComplete", nil)
  54. // create not need schedule disks directly
  55. for _, disk := range objs {
  56. task.startCreateDisk(ctx, disk.(*models.SDisk))
  57. }
  58. return
  59. }
  60. utils.StartScheduleObjects(ctx, task, toSchedDisks)
  61. }
  62. func (task *DiskBatchCreateTask) GetCreateInput(data jsonutils.JSONObject) (*api.DiskCreateInput, error) {
  63. input := new(api.DiskCreateInput)
  64. err := data.Unmarshal(input)
  65. return input, err
  66. }
  67. func (task *DiskBatchCreateTask) GetSchedParams() (*schedapi.ScheduleInput, error) {
  68. data := utils.GetBatchParamsAtIndex(task, 0)
  69. return task.getSchedParamsInternal(data)
  70. }
  71. func (task *DiskBatchCreateTask) getSchedParamsInternal(data jsonutils.JSONObject) (*schedapi.ScheduleInput, error) {
  72. input, err := task.GetCreateInput(data)
  73. if err != nil {
  74. return nil, err
  75. }
  76. ret := new(schedapi.ScheduleInput)
  77. srvInput := input.ToServerCreateInput()
  78. err = srvInput.JSON(srvInput).Unmarshal(ret)
  79. return ret, err
  80. }
  81. func (task *DiskBatchCreateTask) GetDisks(data jsonutils.JSONObject) ([]*api.DiskConfig, error) {
  82. input, err := task.getSchedParamsInternal(data)
  83. if err != nil {
  84. return nil, err
  85. }
  86. return input.Disks, nil
  87. }
  88. func (task *DiskBatchCreateTask) GetFirstDisk(data jsonutils.JSONObject) (*api.DiskConfig, error) {
  89. disks, err := task.GetDisks(data)
  90. if err != nil {
  91. return nil, err
  92. }
  93. if len(disks) == 0 {
  94. return nil, fmt.Errorf("Empty disks to schedule")
  95. }
  96. return disks[0], nil
  97. }
  98. func (task *DiskBatchCreateTask) OnScheduleFailCallback(ctx context.Context, obj utils.IScheduleModel, reason jsonutils.JSONObject, index int) {
  99. task.SSchedTask.OnScheduleFailCallback(ctx, obj, reason, index)
  100. disk := obj.(*models.SDisk)
  101. log.Errorf("Schedule disk %s failed", disk.Name)
  102. task.clearPendingUsage(ctx, disk)
  103. }
  104. func (task *DiskBatchCreateTask) SaveScheduleResult(ctx context.Context, obj utils.IScheduleModel, candidate *schedapi.CandidateResource, index int) {
  105. var err error
  106. disk := obj.(*models.SDisk)
  107. // pendingUsage := models.SQuota{}
  108. // err = task.GetPendingUsage(&pendingUsage, 0)
  109. // if err != nil {
  110. // log.Errorf("GetPendingUsage fail %s", err)
  111. // }
  112. // input, _ := task.GetCreateInput()
  113. // quotaPlatform := models.GetQuotaPlatformID(input.Hypervisor)
  114. // quotaStorage := models.SQuota{Storage: disk.DiskSize}
  115. onError := func(err error) {
  116. task.clearPendingUsage(ctx, disk)
  117. disk.SetStatus(ctx, task.UserCred, api.DISK_ALLOC_FAILED, "")
  118. logclient.AddActionLogWithStartable(task, disk, logclient.ACT_ALLOCATE, err, task.UserCred, false)
  119. task.SetStageFailed(ctx, jsonutils.NewString(err.Error()))
  120. db.OpsLog.LogEvent(disk, db.ACT_ALLOCATE_FAIL, err, task.UserCred)
  121. notifyclient.NotifySystemErrorWithCtx(ctx, disk.Id, disk.Name, api.DISK_ALLOC_FAILED, err.Error())
  122. }
  123. data := utils.GetBatchParamsAtIndex(task, index)
  124. diskConfig, err := task.GetFirstDisk(data)
  125. if err != nil {
  126. onError(err)
  127. return
  128. }
  129. storageIds := []string{}
  130. var hostId string
  131. if candidate != nil && len(candidate.Disks) != 0 {
  132. hostId = candidate.HostId
  133. storageIds = candidate.Disks[0].StorageIds
  134. }
  135. err = disk.SetStorageByHost(hostId, diskConfig, storageIds)
  136. if err != nil {
  137. onError(err)
  138. return
  139. }
  140. task.startCreateDisk(ctx, disk)
  141. }
  142. func (task *DiskBatchCreateTask) startCreateDisk(ctx context.Context, disk *models.SDisk) {
  143. pendingUsage := models.SQuota{}
  144. err := task.GetPendingUsage(&pendingUsage, 0)
  145. if err != nil {
  146. log.Warningf("GetPendingUsage fail %s", err)
  147. }
  148. quotaStorage := models.SQuota{Storage: disk.DiskSize}
  149. keys, err := disk.GetQuotaKeys()
  150. if err != nil {
  151. log.Warningf("disk.GetQuotaKeys fail %s", err)
  152. }
  153. quotaStorage.SetKeys(keys)
  154. quotas.CancelPendingUsage(ctx, task.UserCred, &pendingUsage, &quotaStorage, true) // success
  155. task.SetPendingUsage(&pendingUsage, 0)
  156. disk.StartDiskCreateTask(ctx, task.GetUserCred(), false, disk.SnapshotId, task.GetTaskId())
  157. }
  158. func (task *DiskBatchCreateTask) OnScheduleComplete(ctx context.Context, items []db.IStandaloneModel, data *jsonutils.JSONDict) {
  159. task.SetStageComplete(ctx, nil)
  160. }