managedvirtual.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package hostdrivers
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "time"
  20. "github.com/pkg/errors"
  21. "yunion.io/x/cloudmux/pkg/cloudprovider"
  22. "yunion.io/x/jsonutils"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/utils"
  25. api "yunion.io/x/onecloud/pkg/apis/compute"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  29. "yunion.io/x/onecloud/pkg/compute/models"
  30. "yunion.io/x/onecloud/pkg/compute/options"
  31. "yunion.io/x/onecloud/pkg/httperrors"
  32. "yunion.io/x/onecloud/pkg/mcclient"
  33. "yunion.io/x/onecloud/pkg/mcclient/auth"
  34. modules "yunion.io/x/onecloud/pkg/mcclient/modules/image"
  35. "yunion.io/x/onecloud/pkg/util/logclient"
  36. )
  37. type SManagedVirtualizationHostDriver struct {
  38. SVirtualizationHostDriver
  39. }
  40. func (self *SManagedVirtualizationHostDriver) CheckAndSetCacheImage(ctx context.Context, userCred mcclient.TokenCredential, host *models.SHost, storageCache *models.SStoragecache, task taskman.ITask) error {
  41. input := api.CacheImageInput{}
  42. task.GetParams().Unmarshal(&input)
  43. image := &cloudprovider.SImageCreateOption{}
  44. task.GetParams().Unmarshal(&image)
  45. if len(image.ImageId) == 0 {
  46. return fmt.Errorf("no image_id params")
  47. }
  48. providerName := storageCache.GetProviderName()
  49. if utils.IsInStringArray(providerName, []string{api.CLOUD_PROVIDER_HUAWEI, api.CLOUD_PROVIDER_HCSO, api.CLOUD_PROVIDER_HCS, api.CLOUD_PROVIDER_UCLOUD}) {
  50. image.OsVersion = input.OsFullVersion
  51. }
  52. size := int64(0)
  53. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  54. lockman.LockRawObject(ctx, models.CachedimageManager.Keyword(), fmt.Sprintf("%s-%s", storageCache.Id, image.ImageId))
  55. defer lockman.ReleaseRawObject(ctx, models.CachedimageManager.Keyword(), fmt.Sprintf("%s-%s", storageCache.Id, image.ImageId))
  56. log.Debugf("XXX Hold lockman key %p cachedimages %s-%s", ctx, storageCache.Id, image.ImageId)
  57. scimg := models.StoragecachedimageManager.Register(ctx, task.GetUserCred(), storageCache.Id, image.ImageId, "")
  58. cachedImage := scimg.GetCachedimage()
  59. if cachedImage == nil {
  60. return nil, errors.Wrap(httperrors.ErrImageNotFound, "cached image not found???")
  61. }
  62. iStorageCache, err := storageCache.GetIStorageCache(ctx)
  63. if err != nil {
  64. return nil, errors.Wrap(err, "storageCache.GetIStorageCache")
  65. }
  66. image.ExternalId = scimg.ExternalId
  67. if cloudprovider.TImageType(cachedImage.ImageType) == cloudprovider.ImageTypeCustomized {
  68. var guest *models.SGuest
  69. if len(input.ServerId) > 0 {
  70. server, _ := models.GuestManager.FetchById(input.ServerId)
  71. if server != nil {
  72. guest = server.(*models.SGuest)
  73. }
  74. }
  75. callback := func(progress float32) {
  76. guestInfo := ""
  77. if guest != nil {
  78. guest.SetProgress(progress)
  79. guestInfo = fmt.Sprintf(" for server %s ", guest.Name)
  80. }
  81. log.Infof("Upload image %s from storagecache %s%s status: %.2f%%", image.ImageName, storageCache.Name, guestInfo, progress)
  82. }
  83. image.ExternalId, err = func() (string, error) {
  84. if len(image.ExternalId) > 0 {
  85. log.Debugf("UploadImage: Image external ID exists %s", image.ExternalId)
  86. iImg, err := iStorageCache.GetIImageById(image.ExternalId)
  87. if err != nil {
  88. if errors.Cause(err) != cloudprovider.ErrNotFound {
  89. return "", errors.Wrapf(err, "GetIImageById(%s)", image.ExternalId)
  90. }
  91. return iStorageCache.UploadImage(ctx, image, callback)
  92. }
  93. if iImg.GetImageStatus() == cloudprovider.IMAGE_STATUS_ACTIVE && !input.IsForce {
  94. return image.ExternalId, nil
  95. }
  96. log.Debugf("UploadImage: %s status: %s is_force: %v", image.ExternalId, iImg.GetStatus(), input.IsForce)
  97. err = iImg.Delete(ctx)
  98. if err != nil {
  99. log.Warningf("delete image %s(%s) error: %v", iImg.GetName(), iImg.GetGlobalId(), err)
  100. }
  101. }
  102. s := auth.GetAdminSession(ctx, options.Options.Region)
  103. info, err := modules.Images.Get(s, image.ImageId, nil)
  104. if err != nil {
  105. return "", errors.Wrapf(err, "Images.Get(%s)", image.ImageId)
  106. }
  107. image.Description, _ = info.GetString("description")
  108. image.Checksum, _ = info.GetString("checksum")
  109. minDiskMb, _ := info.Int("min_disk")
  110. image.MinDiskMb = int(minDiskMb)
  111. minRamMb, _ := info.Int("min_ram")
  112. image.MinRamMb = int(minRamMb)
  113. image.TmpPath = options.Options.TempPath
  114. image.GetReader = func(imageId, format string) (io.Reader, int64, error) {
  115. _, reader, sizeByte, err := modules.Images.Download(s, imageId, format, false)
  116. return reader, sizeByte, err
  117. }
  118. log.Debugf("UploadImage: no external ID")
  119. return iStorageCache.UploadImage(ctx, image, callback)
  120. }()
  121. if err != nil {
  122. return nil, err
  123. }
  124. log.Infof("upload image %s id: %s", image.ImageName, image.ExternalId)
  125. } else {
  126. _, err := iStorageCache.GetIImageById(cachedImage.ExternalId)
  127. if err != nil {
  128. return nil, errors.Wrapf(err, "iStorageCache.GetIImageById(%s) for %s", cachedImage.ExternalId, iStorageCache.GetGlobalId())
  129. }
  130. image.ExternalId = cachedImage.ExternalId
  131. size = cachedImage.Size
  132. }
  133. // should record the externalId immediately
  134. // so the waiting goroutine could pick the new externalId
  135. // and avoid duplicate uploading
  136. scimg.SetExternalId(image.ExternalId)
  137. ret := jsonutils.NewDict()
  138. ret.Add(jsonutils.NewString(image.ExternalId), "image_id")
  139. ret.Add(jsonutils.NewInt(size), "size")
  140. return ret, nil
  141. })
  142. return nil
  143. }
  144. func (self *SManagedVirtualizationHostDriver) RequestUncacheImage(ctx context.Context, host *models.SHost, storageCache *models.SStoragecache, task taskman.ITask, deactivateImage bool) error {
  145. params := task.GetParams()
  146. imageId, err := params.GetString("image_id")
  147. if err != nil {
  148. return err
  149. }
  150. scimg := models.StoragecachedimageManager.Register(ctx, task.GetUserCred(), storageCache.Id, imageId, "")
  151. if scimg == nil {
  152. task.ScheduleRun(nil)
  153. return nil
  154. }
  155. if len(scimg.ExternalId) == 0 {
  156. log.Errorf("cached image has not external ID???")
  157. task.ScheduleRun(nil)
  158. return nil
  159. }
  160. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  161. lockman.LockRawObject(ctx, "cachedimages", fmt.Sprintf("%s-%s", storageCache.Id, imageId))
  162. defer lockman.ReleaseRawObject(ctx, "cachedimages", fmt.Sprintf("%s-%s", storageCache.Id, imageId))
  163. iStorageCache, err := storageCache.GetIStorageCache(ctx)
  164. if err != nil {
  165. return nil, errors.Wrapf(err, "GetIStorageCache")
  166. }
  167. iImage, err := iStorageCache.GetIImageById(scimg.ExternalId)
  168. if err != nil {
  169. if errors.Cause(err) == cloudprovider.ErrNotFound {
  170. return nil, nil
  171. }
  172. return nil, errors.Wrap(err, "iStorageCache.GetIImageById")
  173. }
  174. err = iImage.Delete(ctx)
  175. if err != nil {
  176. return nil, errors.Wrap(err, "iImage.Delete")
  177. }
  178. return nil, nil
  179. })
  180. return nil
  181. }
  182. func (self *SManagedVirtualizationHostDriver) RequestPrepareSaveDiskOnHost(ctx context.Context, host *models.SHost, disk *models.SDisk, imageId string, task taskman.ITask) error {
  183. task.ScheduleRun(nil)
  184. return nil
  185. }
  186. func (self *SManagedVirtualizationHostDriver) RequestSaveUploadImageOnHost(ctx context.Context, host *models.SHost, disk *models.SDisk, imageId string, task taskman.ITask, data jsonutils.JSONObject) error {
  187. return cloudprovider.ErrNotSupported
  188. }
  189. func (self *SManagedVirtualizationHostDriver) RequestResizeDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, sizeMb int64, task taskman.ITask) error {
  190. iDisk, err := disk.GetIDisk(ctx)
  191. if err != nil {
  192. return errors.Wrapf(err, "GetIDisk")
  193. }
  194. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  195. err = iDisk.Resize(ctx, sizeMb)
  196. if err != nil {
  197. return nil, errors.Wrapf(err, "iDisk.Resize")
  198. }
  199. err = cloudprovider.WaitStatus(iDisk, api.DISK_READY, time.Second*5, time.Minute*3)
  200. if err != nil {
  201. return nil, errors.Wrapf(err, "Wait disk ready")
  202. }
  203. return jsonutils.Marshal(map[string]int64{"disk_size": sizeMb}), nil
  204. })
  205. return nil
  206. }
  207. func (self *SManagedVirtualizationHostDriver) RequestAllocateDiskOnStorage(ctx context.Context, userCred mcclient.TokenCredential, host *models.SHost, storage *models.SStorage, disk *models.SDisk, task taskman.ITask, input api.DiskAllocateInput) error {
  208. iCloudStorage, err := storage.GetIStorage(ctx)
  209. if err != nil {
  210. return err
  211. }
  212. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  213. _cloudprovider := storage.GetCloudprovider()
  214. if _cloudprovider == nil {
  215. return nil, fmt.Errorf("invalid cloudprovider for storage %s(%s)", storage.Name, storage.Id)
  216. }
  217. projectId, err := _cloudprovider.SyncProject(ctx, userCred, disk.ProjectId)
  218. if err != nil {
  219. if errors.Cause(err) != cloudprovider.ErrNotSupported && errors.Cause(err) != cloudprovider.ErrNotImplemented {
  220. logclient.AddSimpleActionLog(disk, logclient.ACT_SYNC_CLOUD_PROJECT, err, userCred, false)
  221. }
  222. }
  223. opts := cloudprovider.DiskCreateConfig{
  224. Name: disk.GetName(),
  225. SizeGb: input.DiskSizeMb >> 10,
  226. ProjectId: projectId,
  227. Iops: disk.Iops,
  228. Throughput: disk.Throughput,
  229. Desc: disk.Description,
  230. }
  231. opts.Tags, _ = disk.GetAllUserMetadata()
  232. iDisk, err := iCloudStorage.CreateIDisk(&opts)
  233. if err != nil {
  234. return nil, err
  235. }
  236. err = db.SetExternalId(disk, task.GetUserCred(), iDisk.GetGlobalId())
  237. if err != nil {
  238. return nil, errors.Wrapf(err, "db.SetExternalId")
  239. }
  240. cloudprovider.WaitStatus(iDisk, api.DISK_READY, time.Second*5, time.Minute*5)
  241. if account := host.GetCloudaccount(); account != nil {
  242. models.SyncVirtualResourceMetadata(ctx, task.GetUserCred(), disk, iDisk, account.ReadOnly)
  243. }
  244. data := jsonutils.NewDict()
  245. data.Add(jsonutils.NewInt(int64(iDisk.GetDiskSizeMB())), "disk_size")
  246. data.Add(jsonutils.NewString(iDisk.GetDiskFormat()), "disk_format")
  247. data.Add(jsonutils.NewString(iDisk.GetAccessPath()), "disk_path")
  248. return data, nil
  249. })
  250. return nil
  251. }
  252. func (self *SManagedVirtualizationHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, cleanSnapshots bool, task taskman.ITask) error {
  253. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  254. idisk, err := disk.GetIDisk(ctx)
  255. if err != nil {
  256. if errors.Cause(err) == cloudprovider.ErrNotFound {
  257. return nil, nil
  258. }
  259. return nil, err
  260. }
  261. return nil, idisk.Delete(ctx)
  262. })
  263. return nil
  264. }
  265. func (self *SManagedVirtualizationHostDriver) ValidateResetDisk(ctx context.Context, userCred mcclient.TokenCredential, disk *models.SDisk, snapshot *models.SSnapshot, guests []models.SGuest, input *api.DiskResetInput) (*api.DiskResetInput, error) {
  266. return input, nil
  267. }
  268. func (self *SManagedVirtualizationHostDriver) RequestResetDisk(ctx context.Context, host *models.SHost, disk *models.SDisk, params *jsonutils.JSONDict, task taskman.ITask) error {
  269. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  270. iDisk, err := disk.GetIDisk(ctx)
  271. if err != nil {
  272. return nil, errors.Wrapf(err, "GetIDisk")
  273. }
  274. snapshotId, err := params.GetString("snapshot_id")
  275. if err != nil {
  276. return nil, errors.Wrapf(err, "get snapshot_id")
  277. }
  278. exteranlId, err := iDisk.Reset(ctx, snapshotId)
  279. if err != nil {
  280. return nil, errors.Wrapf(err, "Reset")
  281. }
  282. _, err = db.Update(disk, func() error {
  283. if len(exteranlId) > 0 {
  284. disk.ExternalId = exteranlId
  285. }
  286. return nil
  287. })
  288. if err != nil {
  289. return nil, errors.Wrapf(err, "db.Update")
  290. }
  291. iDisk, err = disk.GetIDisk(ctx)
  292. if err != nil {
  293. return nil, errors.Wrapf(err, "GetIDisk")
  294. }
  295. _, err = db.Update(disk, func() error {
  296. if len(exteranlId) > 0 {
  297. disk.DiskSize = iDisk.GetDiskSizeMB()
  298. return nil
  299. }
  300. return nil
  301. })
  302. return nil, err
  303. })
  304. return nil
  305. }
  306. func (self *SManagedVirtualizationHostDriver) RequestRebuildDiskOnStorage(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, task taskman.ITask, input api.DiskAllocateInput) error {
  307. iDisk, err := disk.GetIDisk(ctx)
  308. if err != nil {
  309. return err
  310. }
  311. taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
  312. err := iDisk.Rebuild(ctx)
  313. if err != nil {
  314. return nil, err
  315. }
  316. data := jsonutils.NewDict()
  317. data.Add(jsonutils.NewInt(int64(iDisk.GetDiskSizeMB())), "disk_size")
  318. data.Add(jsonutils.NewString(iDisk.GetDiskFormat()), "disk_format")
  319. data.Add(jsonutils.NewString(iDisk.GetAccessPath()), "disk_path")
  320. return data, nil
  321. })
  322. return nil
  323. }
  324. func (driver *SManagedVirtualizationHostDriver) IsReachStoragecacheCapacityLimit(host *models.SHost, cachedImages []models.SCachedimage) bool {
  325. hostDriver, err := host.GetHostDriver()
  326. if err != nil {
  327. return false
  328. }
  329. quota := hostDriver.GetStoragecacheQuota(host)
  330. log.Debugf("Cached image total: %d quota: %d", len(cachedImages), quota)
  331. if quota > 0 && len(cachedImages) >= quota {
  332. return true
  333. }
  334. return false
  335. }