guest_live_migrate_task.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package guest
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "yunion.io/x/jsonutils"
  20. "yunion.io/x/log"
  21. "yunion.io/x/pkg/errors"
  22. "yunion.io/x/pkg/util/httputils"
  23. "yunion.io/x/pkg/utils"
  24. api "yunion.io/x/onecloud/pkg/apis/compute"
  25. schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  29. "yunion.io/x/onecloud/pkg/compute/models"
  30. taskutils "yunion.io/x/onecloud/pkg/compute/tasks/utils"
  31. "yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
  32. "yunion.io/x/onecloud/pkg/util/logclient"
  33. )
  34. type GuestMigrateTask struct {
  35. taskutils.SSchedTask
  36. }
  37. type GuestLiveMigrateTask struct {
  38. GuestMigrateTask
  39. }
  40. type ManagedGuestMigrateTask struct {
  41. SGuestBaseTask
  42. }
  43. type ManagedGuestLiveMigrateTask struct {
  44. SGuestBaseTask
  45. }
  46. func init() {
  47. taskman.RegisterTask(GuestLiveMigrateTask{})
  48. taskman.RegisterTask(GuestMigrateTask{})
  49. taskman.RegisterTask(ManagedGuestMigrateTask{})
  50. taskman.RegisterTask(ManagedGuestLiveMigrateTask{})
  51. }
  52. func (task *GuestMigrateTask) isLiveMigrate() bool {
  53. guestStatus, _ := task.Params.GetString("guest_status")
  54. if !task.isRescueMode() && (guestStatus == api.VM_RUNNING || guestStatus == api.VM_SUSPEND) {
  55. return true
  56. }
  57. return false
  58. }
  59. func (task *GuestMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
  60. taskutils.StartScheduleObjects(ctx, task, []db.IStandaloneModel{obj})
  61. }
  62. func (task *GuestMigrateTask) GetSchedParams() (*schedapi.ScheduleInput, error) {
  63. obj := task.GetObject()
  64. guest := obj.(*models.SGuest)
  65. input := new(api.ServerMigrateForecastInput)
  66. if task.Params.Contains("prefer_host_id") {
  67. preferHostId, _ := task.Params.GetString("prefer_host_id")
  68. input.PreferHostId = preferHostId
  69. }
  70. if jsonutils.QueryBoolean(task.Params, "reset_cpu_numa_pin", false) {
  71. input.ResetCpuNumaPin = true
  72. }
  73. if task.isLiveMigrate() {
  74. input.LiveMigrate = true
  75. skipCpuCheck := jsonutils.QueryBoolean(task.Params, "skip_cpu_check", false)
  76. skipKernelCheck := jsonutils.QueryBoolean(task.Params, "skip_kernel_check", false)
  77. input.SkipCpuCheck = skipCpuCheck
  78. input.SkipKernelCheck = skipKernelCheck
  79. }
  80. res := guest.GetSchedMigrateParams(task.GetUserCred(), input)
  81. if devs, _ := guest.GetIsolatedDevices(); len(devs) > 0 {
  82. preferNumaNodesSet := cpuset.NewBuilder()
  83. for i := range devs {
  84. if devs[i].NumaNode >= 0 {
  85. preferNumaNodesSet.Add(int(devs[i].NumaNode))
  86. }
  87. }
  88. res.PreferNumaNodes = preferNumaNodesSet.Result().ToSlice()
  89. }
  90. return res, nil
  91. }
  92. func (task *GuestMigrateTask) OnStartSchedule(obj taskutils.IScheduleModel) {
  93. guest := obj.(*models.SGuest)
  94. guestStatus, _ := task.Params.GetString("guest_status")
  95. if guestStatus != api.VM_RUNNING && guestStatus != api.VM_SUSPEND {
  96. guest.SetStatus(context.Background(), task.UserCred, api.VM_MIGRATING, "")
  97. }
  98. db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, "", task.UserCred)
  99. }
  100. func (task *GuestMigrateTask) OnScheduleFailCallback(ctx context.Context, obj taskutils.IScheduleModel, reason jsonutils.JSONObject, index int) {
  101. // do nothing
  102. }
  103. func (task *GuestMigrateTask) OnScheduleFailed(ctx context.Context, reason jsonutils.JSONObject) {
  104. obj := task.GetObject()
  105. guest := obj.(*models.SGuest)
  106. task.TaskFailed(ctx, guest, reason)
  107. }
  108. func (task *GuestMigrateTask) SaveScheduleResult(ctx context.Context, obj taskutils.IScheduleModel, target *schedapi.CandidateResource, index int) {
  109. guest := obj.(*models.SGuest)
  110. if jsonutils.QueryBoolean(task.Params, "reset_cpu_numa_pin", false) {
  111. guest.SetCpuNumaPin(ctx, task.UserCred, target.CpuNumaPin, nil)
  112. db.OpsLog.LogEvent(guest, db.ACT_RESET_CPU_NUMA_PIN, fmt.Sprintf("reset cpu numa pin %s", jsonutils.Marshal(target.CpuNumaPin)), task.UserCred)
  113. task.SetStageComplete(ctx, nil)
  114. return
  115. }
  116. targetHostId := target.HostId
  117. targetHost := models.HostManager.FetchHostById(targetHostId)
  118. if targetHost == nil {
  119. task.TaskFailed(ctx, guest, jsonutils.NewString("target host not found?"))
  120. return
  121. }
  122. body := jsonutils.NewDict()
  123. body.Set("target_host_id", jsonutils.NewString(targetHostId))
  124. if len(target.CpuNumaPin) > 0 {
  125. body.Set("target_cpu_numa_pin", jsonutils.Marshal(target.CpuNumaPin))
  126. }
  127. // for params notes
  128. body.Set("target_host_name", jsonutils.NewString(targetHost.Name))
  129. srcHost := models.HostManager.FetchHostById(guest.HostId)
  130. body.Set("source_host_name", jsonutils.NewString(srcHost.Name))
  131. body.Set("source_host_id", jsonutils.NewString(srcHost.Id))
  132. disks, _ := guest.GetGuestDisks()
  133. disk := disks[0].GetDisk()
  134. storage, _ := disk.GetStorage()
  135. isLocalStorage := utils.IsInStringArray(storage.StorageType,
  136. api.STORAGE_LOCAL_TYPES)
  137. if isLocalStorage {
  138. targetStorages := jsonutils.NewArray()
  139. for i := 0; i < len(disks); i++ {
  140. var targetStroage string
  141. if len(target.Disks[i].StorageIds) == 0 {
  142. targetStroage = targetHost.GetLeastUsedStorage(storage.StorageType).Id
  143. } else {
  144. targetStroage = target.Disks[i].StorageIds[0]
  145. }
  146. targetStorages.Add(jsonutils.NewString(targetStroage))
  147. }
  148. body.Set("target_storages", targetStorages)
  149. body.Set("is_local_storage", jsonutils.JSONTrue)
  150. } else {
  151. body.Set("is_local_storage", jsonutils.JSONFalse)
  152. }
  153. // prepare disk for migration
  154. if len(disk.TemplateId) > 0 && isLocalStorage {
  155. templates := []string{}
  156. if sourceGuestId := guest.GetMetadata(ctx, api.SERVER_META_CONVERT_FROM_ESXI, task.UserCred); len(sourceGuestId) > 0 {
  157. // skip cache images
  158. } else if sourceGuestId := guest.GetMetadata(ctx, api.SERVER_META_CONVERT_FROM_CLOUDPODS, task.UserCred); len(sourceGuestId) > 0 {
  159. // skip cache images
  160. } else {
  161. guestdisks, _ := guest.GetDisks()
  162. for i := range guestdisks {
  163. if guestdisks[i].TemplateId != "" {
  164. templates = append(templates, guestdisks[i].TemplateId)
  165. }
  166. }
  167. }
  168. if len(templates) > 0 {
  169. body.Set("cache_templates", jsonutils.NewStringArray(templates))
  170. }
  171. }
  172. db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, fmt.Sprintf("guest start migrate from host %s to %s", guest.HostId, targetHostId), task.UserCred)
  173. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATING,
  174. fmt.Sprintf("guest start migrate from host %s to %s(%s)", guest.HostId, targetHostId, targetHost.GetName()), task.UserCred, true)
  175. task.SetStage("OnStartCacheImages", body)
  176. task.OnStartCacheImages(ctx, guest, nil)
  177. }
  178. func (task *GuestMigrateTask) tryRecoverImageCache(ctx context.Context, guest *models.SGuest, input *api.CacheImageInput) error {
  179. if _, err := models.CachedimageManager.FetchById(input.ImageId); err != nil {
  180. if err != sql.ErrNoRows {
  181. return err
  182. }
  183. if _, err := models.CachedimageManager.RecoverCachedImage(ctx, task.UserCred, input.ImageId); err != nil {
  184. log.Errorf("failed recache image %s: %s", input.ImageId, err)
  185. }
  186. srcHost, err := guest.GetHost()
  187. if err != nil {
  188. return err
  189. }
  190. srcStorageCache := srcHost.GetLocalStoragecache()
  191. if scImg := models.StoragecachedimageManager.GetStoragecachedimage(srcStorageCache.Id, input.ImageId); scImg == nil {
  192. _, err = models.StoragecachedimageManager.RecoverStoragecachedImage(ctx, task.UserCred, srcStorageCache.Id, input.ImageId)
  193. if err != nil {
  194. log.Errorf("failed RecoverStoragecachedImage %s:%s %s", srcStorageCache.Id, input.ImageId, err)
  195. }
  196. }
  197. }
  198. return nil
  199. }
  200. func (task *GuestMigrateTask) OnStartCacheImages(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  201. templates, _ := task.Params.GetArray("cache_templates")
  202. if len(templates) == 0 {
  203. task.OnCachedImageComplete(ctx, guest, nil)
  204. return
  205. }
  206. templateId, _ := templates[0].GetString()
  207. task.Params.Set("cache_templates", jsonutils.NewArray(templates[1:]...))
  208. task.SetStage("OnStartCacheImages", nil)
  209. targetHostId, _ := task.Params.GetString("target_host_id")
  210. targetHost := models.HostManager.FetchHostById(targetHostId)
  211. targetStorageCache := targetHost.GetLocalStoragecache()
  212. if targetStorageCache != nil {
  213. input := api.CacheImageInput{
  214. ImageId: templateId,
  215. IsForce: false,
  216. SourceHostId: guest.HostId,
  217. ParentTaskId: task.GetTaskId(),
  218. }
  219. if err := task.tryRecoverImageCache(ctx, guest, &input); err != nil {
  220. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  221. return
  222. }
  223. err := targetStorageCache.StartImageCacheTask(ctx, task.UserCred, input)
  224. if err != nil {
  225. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  226. return
  227. }
  228. } else {
  229. task.OnStartCacheImages(ctx, guest, nil)
  230. }
  231. }
  232. func (task *GuestMigrateTask) OnStartCacheImagesFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  233. task.TaskFailed(ctx, guest, data)
  234. }
  235. // For local storage get disk info
  236. func (task *GuestMigrateTask) OnCachedImageComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  237. task.SetStage("OnCachedCdromComplete", nil)
  238. isLocalStorage, _ := task.Params.Bool("is_local_storage")
  239. if cdrom := guest.GetCdrom(); cdrom != nil && len(cdrom.ImageId) > 0 && isLocalStorage {
  240. targetHostId, _ := task.Params.GetString("target_host_id")
  241. targetHost := models.HostManager.FetchHostById(targetHostId)
  242. targetStorageCache := targetHost.GetLocalStoragecache()
  243. if targetStorageCache != nil {
  244. input := api.CacheImageInput{
  245. ImageId: cdrom.ImageId,
  246. Format: "iso",
  247. IsForce: false,
  248. ParentTaskId: task.GetTaskId(),
  249. SourceHostId: guest.HostId,
  250. }
  251. if err := task.tryRecoverImageCache(ctx, guest, &input); err != nil {
  252. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  253. return
  254. }
  255. err := targetStorageCache.StartImageCacheTask(ctx, task.UserCred, input)
  256. if err != nil {
  257. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  258. return
  259. }
  260. }
  261. } else {
  262. task.OnCachedCdromComplete(ctx, guest, nil)
  263. }
  264. }
  265. func (task *GuestMigrateTask) OnCachedCdromComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  266. header := task.GetTaskRequestHeader()
  267. body := jsonutils.NewDict()
  268. if task.isLiveMigrate() {
  269. body.Set("live_migrate", jsonutils.JSONTrue)
  270. body.Set("enable_tls", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false)))
  271. }
  272. if !task.isRescueMode() {
  273. host, _ := guest.GetHost()
  274. url := fmt.Sprintf("%s/servers/%s/src-prepare-migrate", host.ManagerUri, guest.Id)
  275. task.SetStage("OnSrcPrepareComplete", body)
  276. _, _, err := httputils.JSONRequest(httputils.GetDefaultClient(), ctx, "POST",
  277. url, header, body, false)
  278. if err != nil {
  279. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  280. return
  281. }
  282. } else {
  283. task.OnSrcPrepareComplete(ctx, guest, nil)
  284. }
  285. }
  286. func (task *GuestMigrateTask) OnCachedCdromCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  287. task.TaskFailed(ctx, guest, data)
  288. }
  289. func (task *GuestMigrateTask) OnCachedImageCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  290. task.TaskFailed(ctx, guest, data)
  291. }
  292. func (task *GuestMigrateTask) OnSrcPrepareCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  293. task.TaskFailed(ctx, guest, data)
  294. }
  295. func (task *GuestMigrateTask) OnSrcPrepareComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  296. targetHostId, _ := task.Params.GetString("target_host_id")
  297. targetHost := models.HostManager.FetchHostById(targetHostId)
  298. var body *jsonutils.JSONDict
  299. var err error
  300. if jsonutils.QueryBoolean(task.Params, "is_local_storage", false) {
  301. body, err = task.localStorageMigrateConf(ctx, guest, targetHost, data)
  302. } else {
  303. body, err = task.sharedStorageMigrateConf(ctx, guest, targetHost)
  304. }
  305. if err != nil {
  306. task.TaskFailed(ctx, guest, jsonutils.NewString(errors.Wrap(err, "get storage migrate conf").Error()))
  307. return
  308. }
  309. if task.isLiveMigrate() {
  310. srcDesc, err := data.Get("src_desc")
  311. if err != nil {
  312. task.TaskFailed(ctx, guest, jsonutils.NewString(errors.Wrap(err, "get src_desc from data").Error()))
  313. return
  314. }
  315. body.Set("src_desc", srcDesc)
  316. body.Set("live_migrate", jsonutils.JSONTrue)
  317. }
  318. if jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false) {
  319. body.Set("enable_tls", jsonutils.JSONTrue)
  320. certsObj, err := data.Get("migrate_certs")
  321. if err != nil {
  322. task.TaskFailed(ctx, guest, jsonutils.NewString(errors.Wrap(err, "get migrate_certs from data").Error()))
  323. return
  324. }
  325. body.Set("migrate_certs", certsObj)
  326. }
  327. if err != nil {
  328. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  329. return
  330. }
  331. headers := task.GetTaskRequestHeader()
  332. url := fmt.Sprintf("%s/servers/%s/dest-prepare-migrate", targetHost.ManagerUri, guest.Id)
  333. task.SetStage("OnMigrateConfAndDiskComplete", nil)
  334. _, _, err = httputils.JSONRequest(httputils.GetDefaultClient(),
  335. ctx, "POST", url, headers, body, false)
  336. if err != nil {
  337. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  338. }
  339. }
  340. func (task *GuestMigrateTask) OnMigrateConfAndDiskCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  341. targetHostId, _ := task.Params.GetString("target_host_id")
  342. err := jsonutils.NewDict()
  343. err.Set("MigrateConfAndDiskFailedReason", data)
  344. task.SetStage("OnUndeployTargetGuestSucc", err)
  345. guest.StartUndeployGuestTask(ctx, task.UserCred, task.GetTaskId(), targetHostId)
  346. task.TaskFailed(ctx, guest, data)
  347. }
  348. func (task *GuestMigrateTask) OnUndeployTargetGuestSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  349. err, _ := task.Params.Get("MigrateConfAndDiskFailedReason")
  350. task.TaskFailed(ctx, guest, err)
  351. }
  352. func (task *GuestMigrateTask) OnUndeployTargetGuestSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  353. prevErr, _ := task.Params.Get("MigrateConfAndDiskFailedReason")
  354. err := jsonutils.NewDict()
  355. err.Set("MigrateConfAndDiskFailedReason", prevErr)
  356. err.Set("UndeployTargetGuestFailedReason", data)
  357. task.TaskFailed(ctx, guest, err)
  358. }
  359. func (task *GuestMigrateTask) OnMigrateConfAndDiskComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  360. if data.Contains("dest_prepared_memory_snapshots") {
  361. msData, _ := data.Get("dest_prepared_memory_snapshots")
  362. task.Params.Set("dest_prepared_memory_snapshots", msData)
  363. }
  364. if task.isLiveMigrate() {
  365. // Live migrate
  366. task.SetStage("OnStartDestComplete", nil)
  367. } else {
  368. // Normal migrate
  369. task.OnNormalMigrateComplete(ctx, guest, data)
  370. }
  371. }
  372. func (task *GuestMigrateTask) OnNormalMigrateComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  373. oldHostId := guest.HostId
  374. task.setGuest(ctx, guest)
  375. guestStatus, _ := task.Params.GetString("guest_status")
  376. guest.SetStatus(ctx, task.UserCred, guestStatus, "")
  377. if task.isRescueMode() {
  378. guest.StartGueststartTask(ctx, task.UserCred, nil, "")
  379. task.TaskComplete(ctx, guest)
  380. } else {
  381. task.SetStage("OnUndeployOldHostSucc", nil)
  382. guest.StartUndeployGuestTask(ctx, task.UserCred, task.GetTaskId(), oldHostId)
  383. }
  384. }
  385. // Server migrate complete
  386. func (task *GuestMigrateTask) OnUndeployOldHostSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  387. if jsonutils.QueryBoolean(task.Params, "auto_start", false) {
  388. task.SetStage("OnGuestStartSucc", nil)
  389. guest.StartGueststartTask(ctx, task.UserCred, nil, task.GetId())
  390. } else {
  391. task.TaskComplete(ctx, guest)
  392. }
  393. }
  394. func (task *GuestMigrateTask) OnUndeployOldHostSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  395. task.TaskFailed(ctx, guest, data)
  396. }
  397. func (task *GuestMigrateTask) OnGuestStartSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  398. task.TaskComplete(ctx, guest)
  399. }
  400. func (task *GuestMigrateTask) OnGuestStartSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  401. task.TaskFailed(ctx, guest, data)
  402. }
  403. func (task *GuestMigrateTask) isRescueMode() bool {
  404. return jsonutils.QueryBoolean(task.Params, "is_rescue_mode", false)
  405. }
  406. func (task *GuestMigrateTask) getInstanceSnapShotsWithMemory(guest *models.SGuest) ([]*models.SInstanceSnapshot, error) {
  407. isps, err := guest.GetInstanceSnapshots()
  408. if err != nil {
  409. return nil, errors.Wrap(err, "GetInstanceSnapshots")
  410. }
  411. ret := make([]*models.SInstanceSnapshot, 0)
  412. for idx := range isps {
  413. if isps[idx].WithMemory {
  414. if task.isRescueMode() {
  415. // do not copy memory snapshot in rescure mode, as it is not accessible
  416. // remove memory flag, because the memory snapshot will be lost after migration
  417. db.Update(&isps[idx], func() error {
  418. isps[idx].WithMemory = false
  419. return nil
  420. })
  421. } else {
  422. ret = append(ret, &isps[idx])
  423. }
  424. }
  425. }
  426. return ret, nil
  427. }
  428. func (task *GuestMigrateTask) getInstanceSnapShotIdsWithMemory(guest *models.SGuest) (*jsonutils.JSONArray, error) {
  429. isps, err := task.getInstanceSnapShotsWithMemory(guest)
  430. if err != nil {
  431. return nil, errors.Wrap(err, "getInstanceSnapshotsWithMemory")
  432. }
  433. ret := []string{}
  434. for _, isp := range isps {
  435. ret = append(ret, isp.GetId())
  436. }
  437. return jsonutils.Marshal(ret).(*jsonutils.JSONArray), nil
  438. }
  439. func (task *GuestMigrateTask) setBodyMemorySnapshotParams(guest *models.SGuest, srcHost *models.SHost, body *jsonutils.JSONDict) error {
  440. isps, err := task.getInstanceSnapShotIdsWithMemory(guest)
  441. if err != nil {
  442. return errors.Wrap(err, "getInstanceSnapShotsWithMemory")
  443. }
  444. memSnapshotUri := fmt.Sprintf("%s/download/memory_snapshots", srcHost.ManagerUri)
  445. body.Set("memory_snapshots_uri", jsonutils.NewString(memSnapshotUri))
  446. body.Set("src_memory_snapshots", isps)
  447. return nil
  448. }
  449. func (task *GuestMigrateTask) sharedStorageMigrateConf(ctx context.Context, guest *models.SGuest, targetHost *models.SHost) (*jsonutils.JSONDict, error) {
  450. body := jsonutils.NewDict()
  451. body.Set("is_local_storage", jsonutils.JSONFalse)
  452. body.Set("qemu_version", jsonutils.NewString(guest.GetQemuVersion(task.UserCred)))
  453. targetDesc := guest.GetJsonDescAtHypervisor(ctx, targetHost)
  454. if task.Params.Contains("target_cpu_numa_pin") {
  455. if err := task.setCpuNumaPin(targetDesc); err != nil {
  456. return nil, errors.Wrap(err, "setCpuNumaPin")
  457. }
  458. }
  459. body.Set("desc", jsonutils.Marshal(targetDesc))
  460. sourceHost, _ := guest.GetHost()
  461. if err := task.setBodyMemorySnapshotParams(guest, sourceHost, body); err != nil {
  462. return nil, errors.Wrap(err, "setBodyMemorySnapshotParams")
  463. }
  464. return body, nil
  465. }
  466. func (task *GuestMigrateTask) localStorageMigrateConf(ctx context.Context,
  467. guest *models.SGuest, targetHost *models.SHost, data jsonutils.JSONObject) (*jsonutils.JSONDict, error) {
  468. body := jsonutils.NewDict()
  469. if data != nil {
  470. body.Update(data.(*jsonutils.JSONDict))
  471. }
  472. params := jsonutils.NewDict()
  473. disks, _ := guest.GetGuestDisks()
  474. for i := 0; i < len(disks); i++ {
  475. snapChain := []string{}
  476. if body.Contains("disk_snaps_chain", disks[i].DiskId) {
  477. err := body.Unmarshal(&snapChain, "disk_snaps_chain", disks[i].DiskId)
  478. if err != nil {
  479. return nil, errors.Wrap(err, "unmarshal snap chain")
  480. }
  481. }
  482. snapshots := models.SnapshotManager.GetDiskSnapshots(disks[i].DiskId)
  483. outChainSnapshotIds := jsonutils.NewArray()
  484. for j := 0; j < len(snapshots); j++ {
  485. if !utils.IsInStringArray(snapshots[j].Id, snapChain) {
  486. outChainSnapshotIds.Add(jsonutils.NewString(snapshots[j].Id))
  487. }
  488. }
  489. params.Set(disks[i].DiskId, outChainSnapshotIds)
  490. }
  491. sourceHost, _ := guest.GetHost()
  492. snapshotsUri := fmt.Sprintf("%s/download/snapshots/", sourceHost.ManagerUri)
  493. disksUri := fmt.Sprintf("%s/download/disks/", sourceHost.ManagerUri)
  494. serverUrl := fmt.Sprintf("%s/download/servers/%s", sourceHost.ManagerUri, guest.Id)
  495. body.Set("out_chain_snapshots", params)
  496. body.Set("snapshots_uri", jsonutils.NewString(snapshotsUri))
  497. body.Set("disks_uri", jsonutils.NewString(disksUri))
  498. body.Set("server_url", jsonutils.NewString(serverUrl))
  499. body.Set("qemu_version", jsonutils.NewString(guest.GetQemuVersion(task.UserCred)))
  500. if err := task.setBodyMemorySnapshotParams(guest, sourceHost, body); err != nil {
  501. return nil, errors.Wrap(err, "setBodyMemorySnapshotParams")
  502. }
  503. targetDesc := guest.GetJsonDescAtHypervisor(ctx, targetHost)
  504. if len(targetDesc.Disks) == 0 {
  505. return nil, errors.Errorf("Get disksDesc error")
  506. }
  507. if task.Params.Contains("target_cpu_numa_pin") {
  508. if err := task.setCpuNumaPin(targetDesc); err != nil {
  509. return nil, errors.Wrap(err, "setCpuNumaPin")
  510. }
  511. }
  512. targetStorages, _ := task.Params.GetArray("target_storages")
  513. for i := 0; i < len(disks); i++ {
  514. targetStorageId, err := targetStorages[i].GetString()
  515. if err != nil {
  516. return nil, errors.Wrapf(err, "Get disk %d target storage id", i)
  517. }
  518. targetDesc.Disks[i].TargetStorageId = targetStorageId
  519. }
  520. body.Set("desc", jsonutils.Marshal(targetDesc))
  521. body.Set("rebase_disks", jsonutils.JSONTrue)
  522. body.Set("is_local_storage", jsonutils.JSONTrue)
  523. return body, nil
  524. }
  525. func (task *GuestMigrateTask) setCpuNumaPin(targetDesc *api.GuestJsonDesc) error {
  526. cpuNumaPin := make([]schedapi.SCpuNumaPin, 0)
  527. if err := task.Params.Unmarshal(&cpuNumaPin, "cpu_numa_pin"); err != nil {
  528. return errors.Wrap(err, "unmarshal cpu_numa_pin")
  529. }
  530. for i := range targetDesc.CpuNumaPin {
  531. for j := range targetDesc.CpuNumaPin[i].VcpuPin {
  532. targetDesc.CpuNumaPin[i].VcpuPin[j].Pcpu = cpuNumaPin[i].CpuPin[j]
  533. }
  534. }
  535. task.Params.Set("target_vcpu_numa_pin", jsonutils.Marshal(targetDesc.CpuNumaPin))
  536. return nil
  537. }
  538. func (task *GuestLiveMigrateTask) OnStartDestComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  539. liveMigrateDestPort, err := data.Get("live_migrate_dest_port")
  540. if err != nil {
  541. task.TaskFailed(ctx, guest, jsonutils.NewString(fmt.Sprintf("Get migrate port error: %s", err)))
  542. return
  543. }
  544. var body = jsonutils.NewDict()
  545. var nbdServerPort jsonutils.JSONObject
  546. if !jsonutils.QueryBoolean(data, "nbd_server_disabled", false) {
  547. nbdServerPort, err = data.Get("nbd_server_port")
  548. if err != nil {
  549. task.TaskFailed(ctx, guest, jsonutils.NewString(fmt.Sprintf("Get nbd server port error: %s", err)))
  550. return
  551. }
  552. body.Set("nbd_server_port", nbdServerPort)
  553. }
  554. targetHostId, _ := task.Params.GetString("target_host_id")
  555. targetHost := models.HostManager.FetchHostById(targetHostId)
  556. isLocalStorage, _ := task.Params.Get("is_local_storage")
  557. body.Set("is_local_storage", isLocalStorage)
  558. body.Set("live_migrate_dest_port", liveMigrateDestPort)
  559. body.Set("dest_ip", jsonutils.NewString(targetHost.AccessIp))
  560. body.Set("enable_tls", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false)))
  561. body.Set("quickly_finish", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "quickly_finish", false)))
  562. if task.Params.Contains("max_bandwidth_mb") {
  563. maxBandwidthMb, _ := task.Params.Get("max_bandwidth_mb")
  564. body.Set("max_bandwidth_mb", maxBandwidthMb)
  565. }
  566. headers := task.GetTaskRequestHeader()
  567. host, _ := guest.GetHost()
  568. url := fmt.Sprintf("%s/servers/%s/live-migrate", host.ManagerUri, guest.Id)
  569. task.SetStage("OnLiveMigrateComplete", nil)
  570. guest.SetStatus(ctx, task.UserCred, api.VM_LIVE_MIGRATING, "")
  571. _, _, err = httputils.JSONRequest(httputils.GetDefaultClient(),
  572. ctx, "POST", url, headers, body, false)
  573. if err != nil {
  574. task.OnLiveMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  575. }
  576. }
  577. func (task *GuestLiveMigrateTask) OnStartDestCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  578. if !jsonutils.QueryBoolean(task.Params, "keep_dest_guest_on_failed", false) {
  579. targetHostId, _ := task.Params.GetString("target_host_id")
  580. guest.StartUndeployGuestTask(ctx, task.UserCred, "", targetHostId)
  581. }
  582. task.TaskFailed(ctx, guest, data)
  583. }
  584. func (task *GuestMigrateTask) setGuest(ctx context.Context, guest *models.SGuest) error {
  585. targetHostId, _ := task.Params.GetString("target_host_id")
  586. if jsonutils.QueryBoolean(task.Params, "is_local_storage", false) {
  587. targetStorages, _ := task.Params.GetArray("target_storages")
  588. disks, _ := guest.GetDisks()
  589. for i := 0; i < len(disks); i++ {
  590. disk := &disks[i]
  591. db.Update(disk, func() error {
  592. disk.Status = api.DISK_READY
  593. disk.StorageId, _ = targetStorages[i].GetString()
  594. return nil
  595. })
  596. snapshots := models.SnapshotManager.GetDiskSnapshots(disk.Id)
  597. for _, snapshot := range snapshots {
  598. db.Update(&snapshot, func() error {
  599. snapshot.StorageId, _ = targetStorages[i].GetString()
  600. return nil
  601. })
  602. }
  603. }
  604. }
  605. if task.Params.Contains("target_cpu_numa_pin") {
  606. var cpuNumaPinSrc []schedapi.SCpuNumaPin = nil
  607. var cpuNumaPin []api.SCpuNumaPin = nil
  608. val, _ := task.Params.Get("target_cpu_numa_pin")
  609. if !val.Equals(jsonutils.JSONNull) {
  610. cpuNumaPinSrc = make([]schedapi.SCpuNumaPin, 0)
  611. if err := task.Params.Unmarshal(&cpuNumaPinSrc, "target_cpu_numa_pin"); err != nil {
  612. return errors.Wrap(err, "unmarshal target_cpu_numa_pin")
  613. }
  614. cpuNumaPin = make([]api.SCpuNumaPin, 0)
  615. if err := task.Params.Unmarshal(&cpuNumaPin, "target_vcpu_numa_pin"); err != nil {
  616. return errors.Wrap(err, "unmarshal target_vcpu_numa_pin")
  617. }
  618. }
  619. if err := guest.SetCpuNumaPin(ctx, task.UserCred, cpuNumaPinSrc, cpuNumaPin); err != nil {
  620. return errors.Wrap(err, "SetCpuNumaPin")
  621. }
  622. }
  623. oldHost, _ := guest.GetHost()
  624. oldHost.ClearSchedDescCache()
  625. err := guest.OnScheduleToHost(ctx, task.UserCred, targetHostId)
  626. if err != nil {
  627. return err
  628. }
  629. return nil
  630. }
  631. func (task *GuestLiveMigrateTask) OnLiveMigrateCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  632. if reason, _ := data.GetString("__reason__"); reason == "cancelled" {
  633. task.Params.Set("migrate_cancelled", jsonutils.JSONTrue)
  634. }
  635. if !jsonutils.QueryBoolean(task.Params, "keep_dest_guest_on_failed", false) {
  636. targetHostId, _ := task.Params.GetString("target_host_id")
  637. task.SetStage("OnGuestUndeployed", nil)
  638. guest.StartUndeployGuestTask(ctx, task.UserCred, task.Id, targetHostId)
  639. } else {
  640. task.OnGuestUndeployed(ctx, guest, data)
  641. }
  642. }
  643. func (task *GuestLiveMigrateTask) OnGuestUndeployed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  644. task.TaskFailed(ctx, guest, data)
  645. if jsonutils.QueryBoolean(task.Params, "migrate_cancelled", false) {
  646. guest.StartSyncstatus(ctx, task.UserCred, "")
  647. }
  648. }
  649. func (task *GuestLiveMigrateTask) OnGuestUndeployedFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  650. task.TaskFailed(ctx, guest, data)
  651. }
  652. func (task *GuestLiveMigrateTask) OnLiveMigrateComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  653. if migInfo, err := data.Get("migration_info"); err != nil {
  654. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, migInfo, task.UserCred)
  655. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, migInfo, task.UserCred, true)
  656. }
  657. headers := task.GetTaskRequestHeader()
  658. body := jsonutils.NewDict()
  659. body.Set("live_migrate", jsonutils.JSONTrue)
  660. body.Set("clean_tls", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false)))
  661. targetHostId, _ := task.Params.GetString("target_host_id")
  662. task.SetStage("OnResumeDestGuestComplete", nil)
  663. targetHost := models.HostManager.FetchHostById(targetHostId)
  664. url := fmt.Sprintf("%s/servers/%s/resume", targetHost.ManagerUri, guest.Id)
  665. _, _, err := httputils.JSONRequest(httputils.GetDefaultClient(),
  666. ctx, "POST", url, headers, body, false)
  667. if err != nil {
  668. task.OnResumeDestGuestCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  669. }
  670. }
  671. func (task *GuestLiveMigrateTask) OnResumeDestGuestCompleteFailed(ctx context.Context,
  672. guest *models.SGuest, data jsonutils.JSONObject) {
  673. task.markFailed(ctx, guest, data)
  674. if !jsonutils.QueryBoolean(task.Params, "keep_dest_guest_on_failed", false) {
  675. targetHostId, _ := task.Params.GetString("target_host_id")
  676. guest.StartUndeployGuestTask(ctx, task.UserCred, "", targetHostId)
  677. }
  678. task.SetStage("OnResumeSourceGuestComplete", nil)
  679. sourceHost := models.HostManager.FetchHostById(guest.HostId)
  680. headers := task.GetTaskRequestHeader()
  681. body := jsonutils.NewDict()
  682. url := fmt.Sprintf("%s/servers/%s/resume", sourceHost.ManagerUri, guest.Id)
  683. _, _, err := httputils.JSONRequest(httputils.GetDefaultClient(),
  684. ctx, "POST", url, headers, body, false)
  685. if err != nil {
  686. task.OnResumeSourceGuestCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  687. }
  688. }
  689. func (task *GuestLiveMigrateTask) OnResumeSourceGuestCompleteFailed(ctx context.Context,
  690. guest *models.SGuest, data jsonutils.JSONObject) {
  691. db.OpsLog.LogEvent(guest, db.ACT_RESUME_FAIL, data, task.UserCred)
  692. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_VM_RESUME, data, task.UserCred, false)
  693. task.TaskFailed(ctx, guest, data)
  694. }
  695. func (task *GuestLiveMigrateTask) OnResumeSourceGuestComplete(ctx context.Context,
  696. guest *models.SGuest, data jsonutils.JSONObject) {
  697. task.TaskFailed(ctx, guest, data)
  698. }
  699. func (task *GuestLiveMigrateTask) OnResumeDestGuestComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  700. oldHostId := guest.HostId
  701. err := task.setGuest(ctx, guest)
  702. if err != nil {
  703. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  704. }
  705. task.SetStage("OnUndeploySrcGuestComplete", nil)
  706. err = guest.StartUndeployGuestTask(ctx, task.UserCred, task.GetTaskId(), oldHostId)
  707. if err != nil {
  708. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  709. }
  710. }
  711. func (task *GuestLiveMigrateTask) OnUndeploySrcGuestComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  712. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, "OnUndeploySrcGuestComplete", task.UserCred)
  713. status, _ := task.Params.GetString("guest_status")
  714. if status != guest.Status {
  715. task.SetStage("OnGuestSyncStatus", nil)
  716. guest.StartSyncstatus(ctx, task.UserCred, task.GetTaskId())
  717. } else {
  718. task.OnGuestSyncStatus(ctx, guest, nil)
  719. }
  720. }
  721. // Server live migrate complete
  722. func (task *GuestLiveMigrateTask) OnGuestSyncStatus(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  723. task.TaskComplete(ctx, guest)
  724. }
  725. func (task *GuestMigrateTask) updateInstanceSnapshotMemory(ctx context.Context, guest *models.SGuest) error {
  726. if !task.Params.Contains("dest_prepared_memory_snapshots") {
  727. return nil
  728. }
  729. ms, err := task.Params.Get("dest_prepared_memory_snapshots")
  730. if err != nil {
  731. return errors.Wrap(err, "get dest_prepared_memory_snapshots from params")
  732. }
  733. isps, err := task.getInstanceSnapShotsWithMemory(guest)
  734. if err != nil {
  735. return errors.Wrap(err, "getInstanceSnapShotsWithMemory")
  736. }
  737. for _, isp := range isps {
  738. msPath, err := ms.GetString(isp.GetId())
  739. if err != nil {
  740. return errors.Wrapf(err, "get instance snapshot %s memory path from dest prepared", isp.GetId())
  741. }
  742. if _, err := db.Update(isp, func() error {
  743. isp.MemoryFilePath = msPath
  744. isp.MemoryFileHostId = guest.HostId
  745. return nil
  746. }); err != nil {
  747. return errors.Wrapf(err, "update instance snapshot %q memory_filie_path", isp.GetId())
  748. }
  749. }
  750. return nil
  751. }
  752. func (task *GuestMigrateTask) TaskComplete(ctx context.Context, guest *models.SGuest) {
  753. if err := task.updateInstanceSnapshotMemory(ctx, guest); err != nil {
  754. task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
  755. return
  756. }
  757. task.SetStageComplete(ctx, nil)
  758. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, "Migrate success", task.UserCred)
  759. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
  760. }
  761. func (task *GuestMigrateTask) TaskFailed(ctx context.Context, guest *models.SGuest, reason jsonutils.JSONObject) {
  762. task.markFailed(ctx, guest, reason)
  763. task.SetStageFailed(ctx, reason)
  764. }
  765. func (task *GuestMigrateTask) markFailed(ctx context.Context, guest *models.SGuest, reason jsonutils.JSONObject) {
  766. guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATE_FAILED, reason.String())
  767. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE_FAIL, reason, task.UserCred)
  768. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, reason, task.UserCred, false)
  769. notifyclient.NotifySystemErrorWithCtx(ctx, guest.Id, guest.Name, api.VM_MIGRATE_FAILED, reason.String())
  770. notifyclient.EventNotify(ctx, task.GetUserCred(), notifyclient.SEventNotifyParam{
  771. Obj: guest,
  772. Action: notifyclient.ActionMigrate,
  773. IsFail: true,
  774. })
  775. }
  776. // ManagedGuestMigrateTask
  777. func (task *ManagedGuestMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
  778. guest := obj.(*models.SGuest)
  779. db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, nil, task.UserCred)
  780. task.MigrateStart(ctx, guest, data)
  781. }
  782. func (task *ManagedGuestMigrateTask) MigrateStart(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  783. task.SetStage("OnMigrateComplete", nil)
  784. guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATING, "")
  785. input := api.GuestMigrateInput{}
  786. task.GetParams().Unmarshal(&input)
  787. drv, err := guest.GetDriver()
  788. if err != nil {
  789. task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  790. return
  791. }
  792. if err := drv.RequestMigrate(ctx, guest, task.UserCred, input, task); err != nil {
  793. task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  794. }
  795. }
  796. func (task *ManagedGuestMigrateTask) OnMigrateComplete(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
  797. guest := obj.(*models.SGuest)
  798. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
  799. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, guest.GetShortDesc(ctx), task.UserCred)
  800. if jsonutils.QueryBoolean(task.Params, "auto_start", false) {
  801. task.SetStage("OnGuestStartSucc", nil)
  802. guest.StartGueststartTask(ctx, task.UserCred, nil, task.GetId())
  803. } else {
  804. task.SetStage("OnGuestSyncStatus", nil)
  805. guest.StartSyncstatus(ctx, task.UserCred, task.GetTaskId())
  806. }
  807. }
  808. func (task *ManagedGuestMigrateTask) OnGuestStartSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  809. task.SetStageComplete(ctx, nil)
  810. }
  811. func (task *ManagedGuestMigrateTask) OnGuestSyncStatus(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  812. task.SetStageComplete(ctx, nil)
  813. }
  814. func (task *ManagedGuestMigrateTask) OnGuestSyncStatusFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  815. task.SetStageFailed(ctx, data)
  816. }
  817. func (task *ManagedGuestMigrateTask) OnGuestStartSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  818. task.SetStageFailed(ctx, data)
  819. }
  820. func (task *ManagedGuestMigrateTask) OnMigrateCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  821. guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATE_FAILED, "")
  822. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE_FAIL, data, task.UserCred)
  823. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, data, task.UserCred, false)
  824. task.SetStageFailed(ctx, data)
  825. notifyclient.NotifySystemErrorWithCtx(ctx, guest.Id, guest.Name, api.VM_MIGRATE_FAILED, data.String())
  826. }
  827. // ManagedGuestLiveMigrateTask
  828. func (task *ManagedGuestLiveMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
  829. guest := obj.(*models.SGuest)
  830. db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, nil, task.UserCred)
  831. task.MigrateStart(ctx, guest, data)
  832. }
  833. func (task *ManagedGuestLiveMigrateTask) MigrateStart(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  834. task.SetStage("OnMigrateComplete", nil)
  835. guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATING, "")
  836. input := api.GuestLiveMigrateInput{}
  837. task.GetParams().Unmarshal(&input)
  838. drv, err := guest.GetDriver()
  839. if err != nil {
  840. task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  841. return
  842. }
  843. if err := drv.RequestLiveMigrate(ctx, guest, task.UserCred, input, task); err != nil {
  844. task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
  845. }
  846. }
  847. func (task *ManagedGuestLiveMigrateTask) OnMigrateComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  848. task.SetStage("OnGuestSyncStatus", nil)
  849. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, guest.GetShortDesc(ctx), task.UserCred)
  850. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
  851. guest.StartSyncstatus(ctx, task.UserCred, task.GetTaskId())
  852. }
  853. func (task *ManagedGuestLiveMigrateTask) OnGuestSyncStatus(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  854. task.SetStageComplete(ctx, nil)
  855. }
  856. func (task *ManagedGuestLiveMigrateTask) OnGuestSyncStatusFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  857. task.SetStageFailed(ctx, data)
  858. }
  859. func (task *ManagedGuestLiveMigrateTask) OnMigrateCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
  860. guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATE_FAILED, "")
  861. db.OpsLog.LogEvent(guest, db.ACT_MIGRATE_FAIL, data, task.UserCred)
  862. logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, data, task.UserCred, false)
  863. task.SetStageFailed(ctx, data)
  864. notifyclient.NotifySystemErrorWithCtx(ctx, guest.Id, guest.Name, api.VM_MIGRATE_FAILED, data.String())
  865. }