| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package guest
- import (
- "context"
- "database/sql"
- "fmt"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/pkg/utils"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
- "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
- "yunion.io/x/onecloud/pkg/compute/models"
- taskutils "yunion.io/x/onecloud/pkg/compute/tasks/utils"
- "yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
- "yunion.io/x/onecloud/pkg/util/logclient"
- )
- type GuestMigrateTask struct {
- taskutils.SSchedTask
- }
- type GuestLiveMigrateTask struct {
- GuestMigrateTask
- }
- type ManagedGuestMigrateTask struct {
- SGuestBaseTask
- }
- type ManagedGuestLiveMigrateTask struct {
- SGuestBaseTask
- }
- func init() {
- taskman.RegisterTask(GuestLiveMigrateTask{})
- taskman.RegisterTask(GuestMigrateTask{})
- taskman.RegisterTask(ManagedGuestMigrateTask{})
- taskman.RegisterTask(ManagedGuestLiveMigrateTask{})
- }
- func (task *GuestMigrateTask) isLiveMigrate() bool {
- guestStatus, _ := task.Params.GetString("guest_status")
- if !task.isRescueMode() && (guestStatus == api.VM_RUNNING || guestStatus == api.VM_SUSPEND) {
- return true
- }
- return false
- }
- func (task *GuestMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
- taskutils.StartScheduleObjects(ctx, task, []db.IStandaloneModel{obj})
- }
- func (task *GuestMigrateTask) GetSchedParams() (*schedapi.ScheduleInput, error) {
- obj := task.GetObject()
- guest := obj.(*models.SGuest)
- input := new(api.ServerMigrateForecastInput)
- if task.Params.Contains("prefer_host_id") {
- preferHostId, _ := task.Params.GetString("prefer_host_id")
- input.PreferHostId = preferHostId
- }
- if jsonutils.QueryBoolean(task.Params, "reset_cpu_numa_pin", false) {
- input.ResetCpuNumaPin = true
- }
- if task.isLiveMigrate() {
- input.LiveMigrate = true
- skipCpuCheck := jsonutils.QueryBoolean(task.Params, "skip_cpu_check", false)
- skipKernelCheck := jsonutils.QueryBoolean(task.Params, "skip_kernel_check", false)
- input.SkipCpuCheck = skipCpuCheck
- input.SkipKernelCheck = skipKernelCheck
- }
- res := guest.GetSchedMigrateParams(task.GetUserCred(), input)
- if devs, _ := guest.GetIsolatedDevices(); len(devs) > 0 {
- preferNumaNodesSet := cpuset.NewBuilder()
- for i := range devs {
- if devs[i].NumaNode >= 0 {
- preferNumaNodesSet.Add(int(devs[i].NumaNode))
- }
- }
- res.PreferNumaNodes = preferNumaNodesSet.Result().ToSlice()
- }
- return res, nil
- }
- func (task *GuestMigrateTask) OnStartSchedule(obj taskutils.IScheduleModel) {
- guest := obj.(*models.SGuest)
- guestStatus, _ := task.Params.GetString("guest_status")
- if guestStatus != api.VM_RUNNING && guestStatus != api.VM_SUSPEND {
- guest.SetStatus(context.Background(), task.UserCred, api.VM_MIGRATING, "")
- }
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, "", task.UserCred)
- }
- func (task *GuestMigrateTask) OnScheduleFailCallback(ctx context.Context, obj taskutils.IScheduleModel, reason jsonutils.JSONObject, index int) {
- // do nothing
- }
- func (task *GuestMigrateTask) OnScheduleFailed(ctx context.Context, reason jsonutils.JSONObject) {
- obj := task.GetObject()
- guest := obj.(*models.SGuest)
- task.TaskFailed(ctx, guest, reason)
- }
- func (task *GuestMigrateTask) SaveScheduleResult(ctx context.Context, obj taskutils.IScheduleModel, target *schedapi.CandidateResource, index int) {
- guest := obj.(*models.SGuest)
- if jsonutils.QueryBoolean(task.Params, "reset_cpu_numa_pin", false) {
- guest.SetCpuNumaPin(ctx, task.UserCred, target.CpuNumaPin, nil)
- db.OpsLog.LogEvent(guest, db.ACT_RESET_CPU_NUMA_PIN, fmt.Sprintf("reset cpu numa pin %s", jsonutils.Marshal(target.CpuNumaPin)), task.UserCred)
- task.SetStageComplete(ctx, nil)
- return
- }
- targetHostId := target.HostId
- targetHost := models.HostManager.FetchHostById(targetHostId)
- if targetHost == nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString("target host not found?"))
- return
- }
- body := jsonutils.NewDict()
- body.Set("target_host_id", jsonutils.NewString(targetHostId))
- if len(target.CpuNumaPin) > 0 {
- body.Set("target_cpu_numa_pin", jsonutils.Marshal(target.CpuNumaPin))
- }
- // for params notes
- body.Set("target_host_name", jsonutils.NewString(targetHost.Name))
- srcHost := models.HostManager.FetchHostById(guest.HostId)
- body.Set("source_host_name", jsonutils.NewString(srcHost.Name))
- body.Set("source_host_id", jsonutils.NewString(srcHost.Id))
- disks, _ := guest.GetGuestDisks()
- disk := disks[0].GetDisk()
- storage, _ := disk.GetStorage()
- isLocalStorage := utils.IsInStringArray(storage.StorageType,
- api.STORAGE_LOCAL_TYPES)
- if isLocalStorage {
- targetStorages := jsonutils.NewArray()
- for i := 0; i < len(disks); i++ {
- var targetStroage string
- if len(target.Disks[i].StorageIds) == 0 {
- targetStroage = targetHost.GetLeastUsedStorage(storage.StorageType).Id
- } else {
- targetStroage = target.Disks[i].StorageIds[0]
- }
- targetStorages.Add(jsonutils.NewString(targetStroage))
- }
- body.Set("target_storages", targetStorages)
- body.Set("is_local_storage", jsonutils.JSONTrue)
- } else {
- body.Set("is_local_storage", jsonutils.JSONFalse)
- }
- // prepare disk for migration
- if len(disk.TemplateId) > 0 && isLocalStorage {
- templates := []string{}
- if sourceGuestId := guest.GetMetadata(ctx, api.SERVER_META_CONVERT_FROM_ESXI, task.UserCred); len(sourceGuestId) > 0 {
- // skip cache images
- } else if sourceGuestId := guest.GetMetadata(ctx, api.SERVER_META_CONVERT_FROM_CLOUDPODS, task.UserCred); len(sourceGuestId) > 0 {
- // skip cache images
- } else {
- guestdisks, _ := guest.GetDisks()
- for i := range guestdisks {
- if guestdisks[i].TemplateId != "" {
- templates = append(templates, guestdisks[i].TemplateId)
- }
- }
- }
- if len(templates) > 0 {
- body.Set("cache_templates", jsonutils.NewStringArray(templates))
- }
- }
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, fmt.Sprintf("guest start migrate from host %s to %s", guest.HostId, targetHostId), task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATING,
- fmt.Sprintf("guest start migrate from host %s to %s(%s)", guest.HostId, targetHostId, targetHost.GetName()), task.UserCred, true)
- task.SetStage("OnStartCacheImages", body)
- task.OnStartCacheImages(ctx, guest, nil)
- }
- func (task *GuestMigrateTask) tryRecoverImageCache(ctx context.Context, guest *models.SGuest, input *api.CacheImageInput) error {
- if _, err := models.CachedimageManager.FetchById(input.ImageId); err != nil {
- if err != sql.ErrNoRows {
- return err
- }
- if _, err := models.CachedimageManager.RecoverCachedImage(ctx, task.UserCred, input.ImageId); err != nil {
- log.Errorf("failed recache image %s: %s", input.ImageId, err)
- }
- srcHost, err := guest.GetHost()
- if err != nil {
- return err
- }
- srcStorageCache := srcHost.GetLocalStoragecache()
- if scImg := models.StoragecachedimageManager.GetStoragecachedimage(srcStorageCache.Id, input.ImageId); scImg == nil {
- _, err = models.StoragecachedimageManager.RecoverStoragecachedImage(ctx, task.UserCred, srcStorageCache.Id, input.ImageId)
- if err != nil {
- log.Errorf("failed RecoverStoragecachedImage %s:%s %s", srcStorageCache.Id, input.ImageId, err)
- }
- }
- }
- return nil
- }
- func (task *GuestMigrateTask) OnStartCacheImages(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- templates, _ := task.Params.GetArray("cache_templates")
- if len(templates) == 0 {
- task.OnCachedImageComplete(ctx, guest, nil)
- return
- }
- templateId, _ := templates[0].GetString()
- task.Params.Set("cache_templates", jsonutils.NewArray(templates[1:]...))
- task.SetStage("OnStartCacheImages", nil)
- targetHostId, _ := task.Params.GetString("target_host_id")
- targetHost := models.HostManager.FetchHostById(targetHostId)
- targetStorageCache := targetHost.GetLocalStoragecache()
- if targetStorageCache != nil {
- input := api.CacheImageInput{
- ImageId: templateId,
- IsForce: false,
- SourceHostId: guest.HostId,
- ParentTaskId: task.GetTaskId(),
- }
- if err := task.tryRecoverImageCache(ctx, guest, &input); err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- err := targetStorageCache.StartImageCacheTask(ctx, task.UserCred, input)
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- } else {
- task.OnStartCacheImages(ctx, guest, nil)
- }
- }
- func (task *GuestMigrateTask) OnStartCacheImagesFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- // For local storage get disk info
- func (task *GuestMigrateTask) OnCachedImageComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStage("OnCachedCdromComplete", nil)
- isLocalStorage, _ := task.Params.Bool("is_local_storage")
- if cdrom := guest.GetCdrom(); cdrom != nil && len(cdrom.ImageId) > 0 && isLocalStorage {
- targetHostId, _ := task.Params.GetString("target_host_id")
- targetHost := models.HostManager.FetchHostById(targetHostId)
- targetStorageCache := targetHost.GetLocalStoragecache()
- if targetStorageCache != nil {
- input := api.CacheImageInput{
- ImageId: cdrom.ImageId,
- Format: "iso",
- IsForce: false,
- ParentTaskId: task.GetTaskId(),
- SourceHostId: guest.HostId,
- }
- if err := task.tryRecoverImageCache(ctx, guest, &input); err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- err := targetStorageCache.StartImageCacheTask(ctx, task.UserCred, input)
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- }
- } else {
- task.OnCachedCdromComplete(ctx, guest, nil)
- }
- }
- func (task *GuestMigrateTask) OnCachedCdromComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- header := task.GetTaskRequestHeader()
- body := jsonutils.NewDict()
- if task.isLiveMigrate() {
- body.Set("live_migrate", jsonutils.JSONTrue)
- body.Set("enable_tls", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false)))
- }
- if !task.isRescueMode() {
- host, _ := guest.GetHost()
- url := fmt.Sprintf("%s/servers/%s/src-prepare-migrate", host.ManagerUri, guest.Id)
- task.SetStage("OnSrcPrepareComplete", body)
- _, _, err := httputils.JSONRequest(httputils.GetDefaultClient(), ctx, "POST",
- url, header, body, false)
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- } else {
- task.OnSrcPrepareComplete(ctx, guest, nil)
- }
- }
- func (task *GuestMigrateTask) OnCachedCdromCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) OnCachedImageCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) OnSrcPrepareCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) OnSrcPrepareComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- targetHostId, _ := task.Params.GetString("target_host_id")
- targetHost := models.HostManager.FetchHostById(targetHostId)
- var body *jsonutils.JSONDict
- var err error
- if jsonutils.QueryBoolean(task.Params, "is_local_storage", false) {
- body, err = task.localStorageMigrateConf(ctx, guest, targetHost, data)
- } else {
- body, err = task.sharedStorageMigrateConf(ctx, guest, targetHost)
- }
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(errors.Wrap(err, "get storage migrate conf").Error()))
- return
- }
- if task.isLiveMigrate() {
- srcDesc, err := data.Get("src_desc")
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(errors.Wrap(err, "get src_desc from data").Error()))
- return
- }
- body.Set("src_desc", srcDesc)
- body.Set("live_migrate", jsonutils.JSONTrue)
- }
- if jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false) {
- body.Set("enable_tls", jsonutils.JSONTrue)
- certsObj, err := data.Get("migrate_certs")
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(errors.Wrap(err, "get migrate_certs from data").Error()))
- return
- }
- body.Set("migrate_certs", certsObj)
- }
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- headers := task.GetTaskRequestHeader()
- url := fmt.Sprintf("%s/servers/%s/dest-prepare-migrate", targetHost.ManagerUri, guest.Id)
- task.SetStage("OnMigrateConfAndDiskComplete", nil)
- _, _, err = httputils.JSONRequest(httputils.GetDefaultClient(),
- ctx, "POST", url, headers, body, false)
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *GuestMigrateTask) OnMigrateConfAndDiskCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- targetHostId, _ := task.Params.GetString("target_host_id")
- err := jsonutils.NewDict()
- err.Set("MigrateConfAndDiskFailedReason", data)
- task.SetStage("OnUndeployTargetGuestSucc", err)
- guest.StartUndeployGuestTask(ctx, task.UserCred, task.GetTaskId(), targetHostId)
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) OnUndeployTargetGuestSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- err, _ := task.Params.Get("MigrateConfAndDiskFailedReason")
- task.TaskFailed(ctx, guest, err)
- }
- func (task *GuestMigrateTask) OnUndeployTargetGuestSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- prevErr, _ := task.Params.Get("MigrateConfAndDiskFailedReason")
- err := jsonutils.NewDict()
- err.Set("MigrateConfAndDiskFailedReason", prevErr)
- err.Set("UndeployTargetGuestFailedReason", data)
- task.TaskFailed(ctx, guest, err)
- }
- func (task *GuestMigrateTask) OnMigrateConfAndDiskComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- if data.Contains("dest_prepared_memory_snapshots") {
- msData, _ := data.Get("dest_prepared_memory_snapshots")
- task.Params.Set("dest_prepared_memory_snapshots", msData)
- }
- if task.isLiveMigrate() {
- // Live migrate
- task.SetStage("OnStartDestComplete", nil)
- } else {
- // Normal migrate
- task.OnNormalMigrateComplete(ctx, guest, data)
- }
- }
- func (task *GuestMigrateTask) OnNormalMigrateComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- oldHostId := guest.HostId
- task.setGuest(ctx, guest)
- guestStatus, _ := task.Params.GetString("guest_status")
- guest.SetStatus(ctx, task.UserCred, guestStatus, "")
- if task.isRescueMode() {
- guest.StartGueststartTask(ctx, task.UserCred, nil, "")
- task.TaskComplete(ctx, guest)
- } else {
- task.SetStage("OnUndeployOldHostSucc", nil)
- guest.StartUndeployGuestTask(ctx, task.UserCred, task.GetTaskId(), oldHostId)
- }
- }
- // Server migrate complete
- func (task *GuestMigrateTask) OnUndeployOldHostSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- if jsonutils.QueryBoolean(task.Params, "auto_start", false) {
- task.SetStage("OnGuestStartSucc", nil)
- guest.StartGueststartTask(ctx, task.UserCred, nil, task.GetId())
- } else {
- task.TaskComplete(ctx, guest)
- }
- }
- func (task *GuestMigrateTask) OnUndeployOldHostSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) OnGuestStartSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskComplete(ctx, guest)
- }
- func (task *GuestMigrateTask) OnGuestStartSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) isRescueMode() bool {
- return jsonutils.QueryBoolean(task.Params, "is_rescue_mode", false)
- }
- func (task *GuestMigrateTask) getInstanceSnapShotsWithMemory(guest *models.SGuest) ([]*models.SInstanceSnapshot, error) {
- isps, err := guest.GetInstanceSnapshots()
- if err != nil {
- return nil, errors.Wrap(err, "GetInstanceSnapshots")
- }
- ret := make([]*models.SInstanceSnapshot, 0)
- for idx := range isps {
- if isps[idx].WithMemory {
- if task.isRescueMode() {
- // do not copy memory snapshot in rescure mode, as it is not accessible
- // remove memory flag, because the memory snapshot will be lost after migration
- db.Update(&isps[idx], func() error {
- isps[idx].WithMemory = false
- return nil
- })
- } else {
- ret = append(ret, &isps[idx])
- }
- }
- }
- return ret, nil
- }
- func (task *GuestMigrateTask) getInstanceSnapShotIdsWithMemory(guest *models.SGuest) (*jsonutils.JSONArray, error) {
- isps, err := task.getInstanceSnapShotsWithMemory(guest)
- if err != nil {
- return nil, errors.Wrap(err, "getInstanceSnapshotsWithMemory")
- }
- ret := []string{}
- for _, isp := range isps {
- ret = append(ret, isp.GetId())
- }
- return jsonutils.Marshal(ret).(*jsonutils.JSONArray), nil
- }
- func (task *GuestMigrateTask) setBodyMemorySnapshotParams(guest *models.SGuest, srcHost *models.SHost, body *jsonutils.JSONDict) error {
- isps, err := task.getInstanceSnapShotIdsWithMemory(guest)
- if err != nil {
- return errors.Wrap(err, "getInstanceSnapShotsWithMemory")
- }
- memSnapshotUri := fmt.Sprintf("%s/download/memory_snapshots", srcHost.ManagerUri)
- body.Set("memory_snapshots_uri", jsonutils.NewString(memSnapshotUri))
- body.Set("src_memory_snapshots", isps)
- return nil
- }
- func (task *GuestMigrateTask) sharedStorageMigrateConf(ctx context.Context, guest *models.SGuest, targetHost *models.SHost) (*jsonutils.JSONDict, error) {
- body := jsonutils.NewDict()
- body.Set("is_local_storage", jsonutils.JSONFalse)
- body.Set("qemu_version", jsonutils.NewString(guest.GetQemuVersion(task.UserCred)))
- targetDesc := guest.GetJsonDescAtHypervisor(ctx, targetHost)
- if task.Params.Contains("target_cpu_numa_pin") {
- if err := task.setCpuNumaPin(targetDesc); err != nil {
- return nil, errors.Wrap(err, "setCpuNumaPin")
- }
- }
- body.Set("desc", jsonutils.Marshal(targetDesc))
- sourceHost, _ := guest.GetHost()
- if err := task.setBodyMemorySnapshotParams(guest, sourceHost, body); err != nil {
- return nil, errors.Wrap(err, "setBodyMemorySnapshotParams")
- }
- return body, nil
- }
- func (task *GuestMigrateTask) localStorageMigrateConf(ctx context.Context,
- guest *models.SGuest, targetHost *models.SHost, data jsonutils.JSONObject) (*jsonutils.JSONDict, error) {
- body := jsonutils.NewDict()
- if data != nil {
- body.Update(data.(*jsonutils.JSONDict))
- }
- params := jsonutils.NewDict()
- disks, _ := guest.GetGuestDisks()
- for i := 0; i < len(disks); i++ {
- snapChain := []string{}
- if body.Contains("disk_snaps_chain", disks[i].DiskId) {
- err := body.Unmarshal(&snapChain, "disk_snaps_chain", disks[i].DiskId)
- if err != nil {
- return nil, errors.Wrap(err, "unmarshal snap chain")
- }
- }
- snapshots := models.SnapshotManager.GetDiskSnapshots(disks[i].DiskId)
- outChainSnapshotIds := jsonutils.NewArray()
- for j := 0; j < len(snapshots); j++ {
- if !utils.IsInStringArray(snapshots[j].Id, snapChain) {
- outChainSnapshotIds.Add(jsonutils.NewString(snapshots[j].Id))
- }
- }
- params.Set(disks[i].DiskId, outChainSnapshotIds)
- }
- sourceHost, _ := guest.GetHost()
- snapshotsUri := fmt.Sprintf("%s/download/snapshots/", sourceHost.ManagerUri)
- disksUri := fmt.Sprintf("%s/download/disks/", sourceHost.ManagerUri)
- serverUrl := fmt.Sprintf("%s/download/servers/%s", sourceHost.ManagerUri, guest.Id)
- body.Set("out_chain_snapshots", params)
- body.Set("snapshots_uri", jsonutils.NewString(snapshotsUri))
- body.Set("disks_uri", jsonutils.NewString(disksUri))
- body.Set("server_url", jsonutils.NewString(serverUrl))
- body.Set("qemu_version", jsonutils.NewString(guest.GetQemuVersion(task.UserCred)))
- if err := task.setBodyMemorySnapshotParams(guest, sourceHost, body); err != nil {
- return nil, errors.Wrap(err, "setBodyMemorySnapshotParams")
- }
- targetDesc := guest.GetJsonDescAtHypervisor(ctx, targetHost)
- if len(targetDesc.Disks) == 0 {
- return nil, errors.Errorf("Get disksDesc error")
- }
- if task.Params.Contains("target_cpu_numa_pin") {
- if err := task.setCpuNumaPin(targetDesc); err != nil {
- return nil, errors.Wrap(err, "setCpuNumaPin")
- }
- }
- targetStorages, _ := task.Params.GetArray("target_storages")
- for i := 0; i < len(disks); i++ {
- targetStorageId, err := targetStorages[i].GetString()
- if err != nil {
- return nil, errors.Wrapf(err, "Get disk %d target storage id", i)
- }
- targetDesc.Disks[i].TargetStorageId = targetStorageId
- }
- body.Set("desc", jsonutils.Marshal(targetDesc))
- body.Set("rebase_disks", jsonutils.JSONTrue)
- body.Set("is_local_storage", jsonutils.JSONTrue)
- return body, nil
- }
- func (task *GuestMigrateTask) setCpuNumaPin(targetDesc *api.GuestJsonDesc) error {
- cpuNumaPin := make([]schedapi.SCpuNumaPin, 0)
- if err := task.Params.Unmarshal(&cpuNumaPin, "cpu_numa_pin"); err != nil {
- return errors.Wrap(err, "unmarshal cpu_numa_pin")
- }
- for i := range targetDesc.CpuNumaPin {
- for j := range targetDesc.CpuNumaPin[i].VcpuPin {
- targetDesc.CpuNumaPin[i].VcpuPin[j].Pcpu = cpuNumaPin[i].CpuPin[j]
- }
- }
- task.Params.Set("target_vcpu_numa_pin", jsonutils.Marshal(targetDesc.CpuNumaPin))
- return nil
- }
- func (task *GuestLiveMigrateTask) OnStartDestComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- liveMigrateDestPort, err := data.Get("live_migrate_dest_port")
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(fmt.Sprintf("Get migrate port error: %s", err)))
- return
- }
- var body = jsonutils.NewDict()
- var nbdServerPort jsonutils.JSONObject
- if !jsonutils.QueryBoolean(data, "nbd_server_disabled", false) {
- nbdServerPort, err = data.Get("nbd_server_port")
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(fmt.Sprintf("Get nbd server port error: %s", err)))
- return
- }
- body.Set("nbd_server_port", nbdServerPort)
- }
- targetHostId, _ := task.Params.GetString("target_host_id")
- targetHost := models.HostManager.FetchHostById(targetHostId)
- isLocalStorage, _ := task.Params.Get("is_local_storage")
- body.Set("is_local_storage", isLocalStorage)
- body.Set("live_migrate_dest_port", liveMigrateDestPort)
- body.Set("dest_ip", jsonutils.NewString(targetHost.AccessIp))
- body.Set("enable_tls", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false)))
- body.Set("quickly_finish", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "quickly_finish", false)))
- if task.Params.Contains("max_bandwidth_mb") {
- maxBandwidthMb, _ := task.Params.Get("max_bandwidth_mb")
- body.Set("max_bandwidth_mb", maxBandwidthMb)
- }
- headers := task.GetTaskRequestHeader()
- host, _ := guest.GetHost()
- url := fmt.Sprintf("%s/servers/%s/live-migrate", host.ManagerUri, guest.Id)
- task.SetStage("OnLiveMigrateComplete", nil)
- guest.SetStatus(ctx, task.UserCred, api.VM_LIVE_MIGRATING, "")
- _, _, err = httputils.JSONRequest(httputils.GetDefaultClient(),
- ctx, "POST", url, headers, body, false)
- if err != nil {
- task.OnLiveMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *GuestLiveMigrateTask) OnStartDestCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- if !jsonutils.QueryBoolean(task.Params, "keep_dest_guest_on_failed", false) {
- targetHostId, _ := task.Params.GetString("target_host_id")
- guest.StartUndeployGuestTask(ctx, task.UserCred, "", targetHostId)
- }
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestMigrateTask) setGuest(ctx context.Context, guest *models.SGuest) error {
- targetHostId, _ := task.Params.GetString("target_host_id")
- if jsonutils.QueryBoolean(task.Params, "is_local_storage", false) {
- targetStorages, _ := task.Params.GetArray("target_storages")
- disks, _ := guest.GetDisks()
- for i := 0; i < len(disks); i++ {
- disk := &disks[i]
- db.Update(disk, func() error {
- disk.Status = api.DISK_READY
- disk.StorageId, _ = targetStorages[i].GetString()
- return nil
- })
- snapshots := models.SnapshotManager.GetDiskSnapshots(disk.Id)
- for _, snapshot := range snapshots {
- db.Update(&snapshot, func() error {
- snapshot.StorageId, _ = targetStorages[i].GetString()
- return nil
- })
- }
- }
- }
- if task.Params.Contains("target_cpu_numa_pin") {
- var cpuNumaPinSrc []schedapi.SCpuNumaPin = nil
- var cpuNumaPin []api.SCpuNumaPin = nil
- val, _ := task.Params.Get("target_cpu_numa_pin")
- if !val.Equals(jsonutils.JSONNull) {
- cpuNumaPinSrc = make([]schedapi.SCpuNumaPin, 0)
- if err := task.Params.Unmarshal(&cpuNumaPinSrc, "target_cpu_numa_pin"); err != nil {
- return errors.Wrap(err, "unmarshal target_cpu_numa_pin")
- }
- cpuNumaPin = make([]api.SCpuNumaPin, 0)
- if err := task.Params.Unmarshal(&cpuNumaPin, "target_vcpu_numa_pin"); err != nil {
- return errors.Wrap(err, "unmarshal target_vcpu_numa_pin")
- }
- }
- if err := guest.SetCpuNumaPin(ctx, task.UserCred, cpuNumaPinSrc, cpuNumaPin); err != nil {
- return errors.Wrap(err, "SetCpuNumaPin")
- }
- }
- oldHost, _ := guest.GetHost()
- oldHost.ClearSchedDescCache()
- err := guest.OnScheduleToHost(ctx, task.UserCred, targetHostId)
- if err != nil {
- return err
- }
- return nil
- }
- func (task *GuestLiveMigrateTask) OnLiveMigrateCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- if reason, _ := data.GetString("__reason__"); reason == "cancelled" {
- task.Params.Set("migrate_cancelled", jsonutils.JSONTrue)
- }
- if !jsonutils.QueryBoolean(task.Params, "keep_dest_guest_on_failed", false) {
- targetHostId, _ := task.Params.GetString("target_host_id")
- task.SetStage("OnGuestUndeployed", nil)
- guest.StartUndeployGuestTask(ctx, task.UserCred, task.Id, targetHostId)
- } else {
- task.OnGuestUndeployed(ctx, guest, data)
- }
- }
- func (task *GuestLiveMigrateTask) OnGuestUndeployed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- if jsonutils.QueryBoolean(task.Params, "migrate_cancelled", false) {
- guest.StartSyncstatus(ctx, task.UserCred, "")
- }
- }
- func (task *GuestLiveMigrateTask) OnGuestUndeployedFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestLiveMigrateTask) OnLiveMigrateComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- if migInfo, err := data.Get("migration_info"); err != nil {
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, migInfo, task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, migInfo, task.UserCred, true)
- }
- headers := task.GetTaskRequestHeader()
- body := jsonutils.NewDict()
- body.Set("live_migrate", jsonutils.JSONTrue)
- body.Set("clean_tls", jsonutils.NewBool(jsonutils.QueryBoolean(task.GetParams(), "enable_tls", false)))
- targetHostId, _ := task.Params.GetString("target_host_id")
- task.SetStage("OnResumeDestGuestComplete", nil)
- targetHost := models.HostManager.FetchHostById(targetHostId)
- url := fmt.Sprintf("%s/servers/%s/resume", targetHost.ManagerUri, guest.Id)
- _, _, err := httputils.JSONRequest(httputils.GetDefaultClient(),
- ctx, "POST", url, headers, body, false)
- if err != nil {
- task.OnResumeDestGuestCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *GuestLiveMigrateTask) OnResumeDestGuestCompleteFailed(ctx context.Context,
- guest *models.SGuest, data jsonutils.JSONObject) {
- task.markFailed(ctx, guest, data)
- if !jsonutils.QueryBoolean(task.Params, "keep_dest_guest_on_failed", false) {
- targetHostId, _ := task.Params.GetString("target_host_id")
- guest.StartUndeployGuestTask(ctx, task.UserCred, "", targetHostId)
- }
- task.SetStage("OnResumeSourceGuestComplete", nil)
- sourceHost := models.HostManager.FetchHostById(guest.HostId)
- headers := task.GetTaskRequestHeader()
- body := jsonutils.NewDict()
- url := fmt.Sprintf("%s/servers/%s/resume", sourceHost.ManagerUri, guest.Id)
- _, _, err := httputils.JSONRequest(httputils.GetDefaultClient(),
- ctx, "POST", url, headers, body, false)
- if err != nil {
- task.OnResumeSourceGuestCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *GuestLiveMigrateTask) OnResumeSourceGuestCompleteFailed(ctx context.Context,
- guest *models.SGuest, data jsonutils.JSONObject) {
- db.OpsLog.LogEvent(guest, db.ACT_RESUME_FAIL, data, task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_VM_RESUME, data, task.UserCred, false)
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestLiveMigrateTask) OnResumeSourceGuestComplete(ctx context.Context,
- guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskFailed(ctx, guest, data)
- }
- func (task *GuestLiveMigrateTask) OnResumeDestGuestComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- oldHostId := guest.HostId
- err := task.setGuest(ctx, guest)
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- task.SetStage("OnUndeploySrcGuestComplete", nil)
- err = guest.StartUndeployGuestTask(ctx, task.UserCred, task.GetTaskId(), oldHostId)
- if err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *GuestLiveMigrateTask) OnUndeploySrcGuestComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, "OnUndeploySrcGuestComplete", task.UserCred)
- status, _ := task.Params.GetString("guest_status")
- if status != guest.Status {
- task.SetStage("OnGuestSyncStatus", nil)
- guest.StartSyncstatus(ctx, task.UserCred, task.GetTaskId())
- } else {
- task.OnGuestSyncStatus(ctx, guest, nil)
- }
- }
- // Server live migrate complete
- func (task *GuestLiveMigrateTask) OnGuestSyncStatus(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.TaskComplete(ctx, guest)
- }
- func (task *GuestMigrateTask) updateInstanceSnapshotMemory(ctx context.Context, guest *models.SGuest) error {
- if !task.Params.Contains("dest_prepared_memory_snapshots") {
- return nil
- }
- ms, err := task.Params.Get("dest_prepared_memory_snapshots")
- if err != nil {
- return errors.Wrap(err, "get dest_prepared_memory_snapshots from params")
- }
- isps, err := task.getInstanceSnapShotsWithMemory(guest)
- if err != nil {
- return errors.Wrap(err, "getInstanceSnapShotsWithMemory")
- }
- for _, isp := range isps {
- msPath, err := ms.GetString(isp.GetId())
- if err != nil {
- return errors.Wrapf(err, "get instance snapshot %s memory path from dest prepared", isp.GetId())
- }
- if _, err := db.Update(isp, func() error {
- isp.MemoryFilePath = msPath
- isp.MemoryFileHostId = guest.HostId
- return nil
- }); err != nil {
- return errors.Wrapf(err, "update instance snapshot %q memory_filie_path", isp.GetId())
- }
- }
- return nil
- }
- func (task *GuestMigrateTask) TaskComplete(ctx context.Context, guest *models.SGuest) {
- if err := task.updateInstanceSnapshotMemory(ctx, guest); err != nil {
- task.TaskFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- task.SetStageComplete(ctx, nil)
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, "Migrate success", task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
- }
- func (task *GuestMigrateTask) TaskFailed(ctx context.Context, guest *models.SGuest, reason jsonutils.JSONObject) {
- task.markFailed(ctx, guest, reason)
- task.SetStageFailed(ctx, reason)
- }
- func (task *GuestMigrateTask) markFailed(ctx context.Context, guest *models.SGuest, reason jsonutils.JSONObject) {
- guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATE_FAILED, reason.String())
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE_FAIL, reason, task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, reason, task.UserCred, false)
- notifyclient.NotifySystemErrorWithCtx(ctx, guest.Id, guest.Name, api.VM_MIGRATE_FAILED, reason.String())
- notifyclient.EventNotify(ctx, task.GetUserCred(), notifyclient.SEventNotifyParam{
- Obj: guest,
- Action: notifyclient.ActionMigrate,
- IsFail: true,
- })
- }
- // ManagedGuestMigrateTask
- func (task *ManagedGuestMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
- guest := obj.(*models.SGuest)
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, nil, task.UserCred)
- task.MigrateStart(ctx, guest, data)
- }
- func (task *ManagedGuestMigrateTask) MigrateStart(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStage("OnMigrateComplete", nil)
- guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATING, "")
- input := api.GuestMigrateInput{}
- task.GetParams().Unmarshal(&input)
- drv, err := guest.GetDriver()
- if err != nil {
- task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- if err := drv.RequestMigrate(ctx, guest, task.UserCred, input, task); err != nil {
- task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *ManagedGuestMigrateTask) OnMigrateComplete(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
- guest := obj.(*models.SGuest)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, guest.GetShortDesc(ctx), task.UserCred)
- if jsonutils.QueryBoolean(task.Params, "auto_start", false) {
- task.SetStage("OnGuestStartSucc", nil)
- guest.StartGueststartTask(ctx, task.UserCred, nil, task.GetId())
- } else {
- task.SetStage("OnGuestSyncStatus", nil)
- guest.StartSyncstatus(ctx, task.UserCred, task.GetTaskId())
- }
- }
- func (task *ManagedGuestMigrateTask) OnGuestStartSucc(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStageComplete(ctx, nil)
- }
- func (task *ManagedGuestMigrateTask) OnGuestSyncStatus(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStageComplete(ctx, nil)
- }
- func (task *ManagedGuestMigrateTask) OnGuestSyncStatusFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStageFailed(ctx, data)
- }
- func (task *ManagedGuestMigrateTask) OnGuestStartSuccFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStageFailed(ctx, data)
- }
- func (task *ManagedGuestMigrateTask) OnMigrateCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATE_FAILED, "")
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE_FAIL, data, task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, data, task.UserCred, false)
- task.SetStageFailed(ctx, data)
- notifyclient.NotifySystemErrorWithCtx(ctx, guest.Id, guest.Name, api.VM_MIGRATE_FAILED, data.String())
- }
- // ManagedGuestLiveMigrateTask
- func (task *ManagedGuestLiveMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
- guest := obj.(*models.SGuest)
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATING, nil, task.UserCred)
- task.MigrateStart(ctx, guest, data)
- }
- func (task *ManagedGuestLiveMigrateTask) MigrateStart(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStage("OnMigrateComplete", nil)
- guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATING, "")
- input := api.GuestLiveMigrateInput{}
- task.GetParams().Unmarshal(&input)
- drv, err := guest.GetDriver()
- if err != nil {
- task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- return
- }
- if err := drv.RequestLiveMigrate(ctx, guest, task.UserCred, input, task); err != nil {
- task.OnMigrateCompleteFailed(ctx, guest, jsonutils.NewString(err.Error()))
- }
- }
- func (task *ManagedGuestLiveMigrateTask) OnMigrateComplete(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStage("OnGuestSyncStatus", nil)
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE, guest.GetShortDesc(ctx), task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
- guest.StartSyncstatus(ctx, task.UserCred, task.GetTaskId())
- }
- func (task *ManagedGuestLiveMigrateTask) OnGuestSyncStatus(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStageComplete(ctx, nil)
- }
- func (task *ManagedGuestLiveMigrateTask) OnGuestSyncStatusFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- task.SetStageFailed(ctx, data)
- }
- func (task *ManagedGuestLiveMigrateTask) OnMigrateCompleteFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
- guest.SetStatus(ctx, task.UserCred, api.VM_MIGRATE_FAILED, "")
- db.OpsLog.LogEvent(guest, db.ACT_MIGRATE_FAIL, data, task.UserCred)
- logclient.AddActionLogWithContext(ctx, guest, logclient.ACT_MIGRATE, data, task.UserCred, false)
- task.SetStageFailed(ctx, data)
- notifyclient.NotifySystemErrorWithCtx(ctx, guest.Id, guest.Name, api.VM_MIGRATE_FAILED, data.String())
- }
|