storagecaches.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "strings"
  20. "github.com/serialx/hashring"
  21. "yunion.io/x/cloudmux/pkg/cloudprovider"
  22. "yunion.io/x/jsonutils"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/pkg/util/compare"
  26. "yunion.io/x/pkg/util/imagetools"
  27. "yunion.io/x/sqlchemy"
  28. api "yunion.io/x/onecloud/pkg/apis/compute"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  30. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  32. "yunion.io/x/onecloud/pkg/httperrors"
  33. "yunion.io/x/onecloud/pkg/mcclient"
  34. "yunion.io/x/onecloud/pkg/util/stringutils2"
  35. )
  36. type SStoragecacheManager struct {
  37. db.SStandaloneResourceBaseManager
  38. db.SExternalizedResourceBaseManager
  39. SManagedResourceBaseManager
  40. }
  41. var StoragecacheManager *SStoragecacheManager
  42. func init() {
  43. StoragecacheManager = &SStoragecacheManager{
  44. SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
  45. SStoragecache{},
  46. "storagecaches_tbl",
  47. "storagecache",
  48. "storagecaches",
  49. ),
  50. }
  51. StoragecacheManager.SetVirtualObject(StoragecacheManager)
  52. }
  53. type SStoragecache struct {
  54. db.SStandaloneResourceBase
  55. db.SExternalizedResourceBase
  56. SManagedResourceBase
  57. // 镜像存储地址
  58. Path string `width:"256" charset:"utf8" nullable:"true" list:"user" update:"admin" create:"admin_optional"` // = Column(VARCHAR(256, charset='utf8'), nullable=True)
  59. // master host id
  60. MasterHost string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" update:"user" json:"master_host"`
  61. }
  62. func (sc *SStoragecache) getStorages() []SStorage {
  63. storages := make([]SStorage, 0)
  64. q := StorageManager.Query().Equals("storagecache_id", sc.Id)
  65. err := db.FetchModelObjects(StorageManager, q, &storages)
  66. if err != nil {
  67. return nil
  68. }
  69. return storages
  70. }
  71. func (sc *SStoragecache) getValidStorages() []SStorage {
  72. storages := []SStorage{}
  73. q := StorageManager.Query()
  74. zones := ZoneManager.Query().Equals("status", api.ZONE_ENABLE).SubQuery()
  75. q = q.Equals("storagecache_id", sc.Id).
  76. Filter(sqlchemy.In(q.Field("status"), []string{api.STORAGE_ENABLED, api.STORAGE_ONLINE})).
  77. Filter(sqlchemy.IsTrue(q.Field("enabled"))).
  78. Filter(sqlchemy.IsFalse(q.Field("deleted")))
  79. q = q.Join(zones, sqlchemy.Equals(q.Field("zone_id"), zones.Field("id")))
  80. err := db.FetchModelObjects(StorageManager, q, &storages)
  81. if err != nil {
  82. return nil
  83. }
  84. return storages
  85. }
  86. func (sc *SStoragecache) getStorageNames() []string {
  87. storages := sc.getStorages()
  88. if storages == nil {
  89. return nil
  90. }
  91. names := make([]string, len(storages))
  92. for i := 0; i < len(storages); i += 1 {
  93. names[i] = storages[i].Name
  94. }
  95. return names
  96. }
  97. func (sc *SStoragecache) GetEsxiAgentHostDesc() (*jsonutils.JSONDict, error) {
  98. if !strings.Contains(sc.Name, "esxiagent") {
  99. return nil, nil
  100. }
  101. obj, err := BaremetalagentManager.FetchById(sc.ExternalId)
  102. if err != nil {
  103. return nil, errors.Wrapf(err, "unable to fetch baremetalagent %s", obj.GetId())
  104. }
  105. agent := obj.(*SBaremetalagent)
  106. host := &SHost{}
  107. host.Id = agent.Id
  108. host.Name = agent.Name
  109. host.ZoneId = agent.ZoneId
  110. host.SetModelManager(HostManager, host)
  111. ret := host.GetShortDesc(context.Background())
  112. ret.Set("provider", jsonutils.NewString(api.CLOUD_PROVIDER_VMWARE))
  113. ret.Set("brand", jsonutils.NewString(api.CLOUD_PROVIDER_VMWARE))
  114. return ret, nil
  115. }
  116. func (sc *SStoragecache) GetMasterHost() (*SHost, error) {
  117. if sc.MasterHost != "" {
  118. host, err := HostManager.FetchById(sc.MasterHost)
  119. if err != nil {
  120. return nil, errors.Wrap(err, "HostManager.FetchById")
  121. }
  122. return host.(*SHost), nil
  123. }
  124. hostId, err := sc.getHostId()
  125. if err != nil {
  126. return nil, errors.Wrap(err, "sc.getHostId")
  127. }
  128. if len(hostId) == 0 {
  129. return nil, errors.Errorf("failed to get any available host for storagecache %s", sc.Name)
  130. }
  131. host, err := HostManager.FetchById(hostId)
  132. if err != nil {
  133. return nil, errors.Wrap(err, "HostManager.FetchById")
  134. }
  135. return host.(*SHost), nil
  136. }
  137. func (sc *SStoragecache) GetRegion() (*SCloudregion, error) {
  138. host, err := sc.GetMasterHost()
  139. if err != nil {
  140. return nil, errors.Wrapf(err, "GetHost")
  141. }
  142. region, err := host.GetRegion()
  143. if err != nil {
  144. return nil, errors.Wrapf(err, "GetRegion")
  145. }
  146. return region, nil
  147. }
  148. func (sc *SStoragecache) GetHosts() ([]SHost, error) {
  149. hoststorages := HoststorageManager.Query().SubQuery()
  150. storages := StorageManager.Query().SubQuery()
  151. hosts := make([]SHost, 0)
  152. host := HostManager.Query().SubQuery()
  153. q := host.Query(host.Field("id"))
  154. err := q.Join(hoststorages, sqlchemy.AND(
  155. sqlchemy.Equals(hoststorages.Field("host_id"), host.Field("id")),
  156. sqlchemy.OR(
  157. sqlchemy.Equals(host.Field("host_status"), api.HOST_ONLINE),
  158. sqlchemy.Equals(host.Field("host_type"), api.HOST_TYPE_BAREMETAL),
  159. ),
  160. sqlchemy.IsTrue(host.Field("enabled")),
  161. )).
  162. Join(storages, sqlchemy.AND(sqlchemy.Equals(storages.Field("storagecache_id"), sc.Id),
  163. sqlchemy.In(storages.Field("status"), []string{api.STORAGE_ENABLED, api.STORAGE_ONLINE}),
  164. sqlchemy.IsTrue(storages.Field("enabled")))).
  165. Filter(sqlchemy.Equals(hoststorages.Field("storage_id"), storages.Field("id"))).All(&hosts)
  166. if err != nil {
  167. return nil, err
  168. }
  169. return hosts, nil
  170. }
  171. func (sc *SStoragecache) getHostId() (string, error) {
  172. hosts, err := sc.GetHosts()
  173. if err != nil {
  174. return "", errors.Wrap(err, "GetHosts")
  175. }
  176. hostIds := make([]string, 0)
  177. for _, h := range hosts {
  178. hostIds = append(hostIds, h.Id)
  179. }
  180. if len(hostIds) == 0 {
  181. return "", nil
  182. }
  183. ring := hashring.New(hostIds)
  184. ret, _ := ring.GetNode(sc.Id)
  185. return ret, nil
  186. }
  187. func (manager *SStoragecacheManager) SyncWithCloudStoragecache(ctx context.Context, userCred mcclient.TokenCredential, cloudCache cloudprovider.ICloudStoragecache, provider *SCloudprovider, xor bool) (*SStoragecache, bool, error) {
  188. lockman.LockClass(ctx, manager, db.GetLockClassKey(manager, userCred))
  189. defer lockman.ReleaseClass(ctx, manager, db.GetLockClassKey(manager, userCred))
  190. localCacheObj, err := db.FetchByExternalIdAndManagerId(manager, cloudCache.GetGlobalId(), func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  191. return q.Equals("manager_id", provider.Id)
  192. })
  193. if err != nil {
  194. if err == sql.ErrNoRows {
  195. localCache, err := manager.newFromCloudStoragecache(ctx, userCred, cloudCache, provider)
  196. if err != nil {
  197. return nil, false, err
  198. } else {
  199. return localCache, true, nil
  200. }
  201. } else {
  202. return nil, false, errors.Wrapf(err, "db.FetchByExternalIdAndManagerId(%s)", cloudCache.GetGlobalId())
  203. }
  204. } else {
  205. localCache := localCacheObj.(*SStoragecache)
  206. if !xor {
  207. localCache.syncWithCloudStoragecache(ctx, userCred, cloudCache, provider)
  208. }
  209. return localCache, false, nil
  210. }
  211. }
  212. func (manager *SStoragecacheManager) newFromCloudStoragecache(ctx context.Context, userCred mcclient.TokenCredential, cloudCache cloudprovider.ICloudStoragecache, provider *SCloudprovider) (*SStoragecache, error) {
  213. local := SStoragecache{}
  214. local.SetModelManager(manager, &local)
  215. local.ExternalId = cloudCache.GetGlobalId()
  216. local.IsEmulated = cloudCache.IsEmulated()
  217. local.ManagerId = provider.Id
  218. local.Path = cloudCache.GetPath()
  219. var err = func() error {
  220. lockman.LockRawObject(ctx, manager.Keyword(), "name")
  221. defer lockman.ReleaseRawObject(ctx, manager.Keyword(), "name")
  222. newName, err := db.GenerateName(ctx, manager, userCred, cloudCache.GetName())
  223. if err != nil {
  224. return err
  225. }
  226. local.Name = newName
  227. return manager.TableSpec().Insert(ctx, &local)
  228. }()
  229. if err != nil {
  230. return nil, err
  231. }
  232. db.OpsLog.LogEvent(&local, db.ACT_CREATE, local.GetShortDesc(ctx), userCred)
  233. return &local, nil
  234. }
  235. func (sc *SStoragecache) syncWithCloudStoragecache(ctx context.Context, userCred mcclient.TokenCredential, cloudCache cloudprovider.ICloudStoragecache, provider *SCloudprovider) error {
  236. diff, err := db.UpdateWithLock(ctx, sc, func() error {
  237. sc.Name = cloudCache.GetName()
  238. sc.Path = cloudCache.GetPath()
  239. sc.IsEmulated = cloudCache.IsEmulated()
  240. sc.ManagerId = provider.Id
  241. return nil
  242. })
  243. if err != nil {
  244. return err
  245. }
  246. db.OpsLog.LogSyncUpdate(sc, diff, userCred)
  247. return nil
  248. }
  249. func (manager *SStoragecacheManager) FetchCustomizeColumns(
  250. ctx context.Context,
  251. userCred mcclient.TokenCredential,
  252. query jsonutils.JSONObject,
  253. objs []interface{},
  254. fields stringutils2.SSortedStrings,
  255. isList bool,
  256. ) []api.StoragecacheDetails {
  257. rows := make([]api.StoragecacheDetails, len(objs))
  258. stdRows := manager.SStandaloneResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  259. manRows := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  260. for i := range rows {
  261. rows[i] = api.StoragecacheDetails{
  262. StandaloneResourceDetails: stdRows[i],
  263. ManagedResourceInfo: manRows[i],
  264. }
  265. rows[i] = objs[i].(*SStoragecache).getMoreDetails(ctx, rows[i])
  266. }
  267. return rows
  268. }
  269. func (sc *SStoragecache) getMoreDetails(ctx context.Context, out api.StoragecacheDetails) api.StoragecacheDetails {
  270. out.Storages = sc.getStorageNames()
  271. out.Size = sc.getCachedImageSize()
  272. out.Count = sc.getCachedImageCount()
  273. host, _ := sc.GetMasterHost()
  274. if host != nil {
  275. out.Host = host.GetShortDesc(ctx)
  276. }
  277. return out
  278. }
  279. func (sc *SStoragecache) getCachedImageList(excludeIds []string, imageType string, status []string) []SCachedimage {
  280. images := make([]SCachedimage, 0)
  281. cachedImages := CachedimageManager.Query().SubQuery()
  282. storagecachedImages := StoragecachedimageManager.Query().SubQuery()
  283. q := cachedImages.Query()
  284. q = q.Join(storagecachedImages, sqlchemy.Equals(cachedImages.Field("id"), storagecachedImages.Field("cachedimage_id")))
  285. q = q.Filter(sqlchemy.Equals(storagecachedImages.Field("storagecache_id"), sc.Id))
  286. if len(excludeIds) > 0 {
  287. q = q.Filter(sqlchemy.NotIn(cachedImages.Field("id"), excludeIds))
  288. }
  289. if len(imageType) > 0 {
  290. q = q.Filter(sqlchemy.Equals(cachedImages.Field("image_type"), imageType))
  291. }
  292. if len(status) > 0 {
  293. q = q.Filter(sqlchemy.In(storagecachedImages.Field("status"), status))
  294. }
  295. err := db.FetchModelObjects(CachedimageManager, q, &images)
  296. if err != nil {
  297. if err != sql.ErrNoRows {
  298. log.Errorf("%s", err)
  299. }
  300. return nil
  301. }
  302. return images
  303. }
  304. func (sc *SStoragecache) getCachedImages() ([]SStoragecachedimage, error) {
  305. images := make([]SStoragecachedimage, 0)
  306. q := StoragecachedimageManager.Query().Equals("storagecache_id", sc.Id)
  307. err := db.FetchModelObjects(StoragecachedimageManager, q, &images)
  308. if err != nil {
  309. return nil, errors.Wrapf(err, "db.FetchModelObjects")
  310. }
  311. return images, nil
  312. }
  313. func (sc *SStoragecache) getCustomdCachedImages() ([]SStoragecachedimage, error) {
  314. images := make([]SStoragecachedimage, 0)
  315. sq := CachedimageManager.Query("id").Equals("image_type", "customized").SubQuery()
  316. q := StoragecachedimageManager.Query().Equals("storagecache_id", sc.Id).In("cachedimage_id", sq)
  317. err := db.FetchModelObjects(StoragecachedimageManager, q, &images)
  318. if err != nil {
  319. return nil, errors.Wrapf(err, "db.FetchModelObjects")
  320. }
  321. return images, nil
  322. }
  323. func (sc *SStoragecache) getCachedImageCount() int {
  324. images, _ := sc.getCachedImages()
  325. return len(images)
  326. }
  327. func (sc *SStoragecache) getCachedImageSize() int64 {
  328. images, _ := sc.getCachedImages()
  329. if images == nil {
  330. return 0
  331. }
  332. var size int64 = 0
  333. for _, img := range images {
  334. imginfo := img.GetCachedimage()
  335. if imginfo != nil {
  336. size += imginfo.Size
  337. }
  338. }
  339. return size
  340. }
  341. func (manager *SStoragecacheManager) StartImageCacheTask(ctx context.Context, userCred mcclient.TokenCredential, scs []SStoragecache, input api.CacheImageInput) error {
  342. objs := make([]db.IStandaloneModel, len(scs))
  343. inputs := make([]api.CacheImageInput, len(scs))
  344. for i := range scs {
  345. objs[i] = &scs[i]
  346. inputs[i] = input
  347. }
  348. params := jsonutils.NewDict()
  349. params.Add(jsonutils.Marshal(inputs), "params")
  350. task, err := taskman.TaskManager.NewParallelTask(ctx, "StorageBatchCacheImageTask", objs, userCred, params, input.ParentTaskId, "", nil)
  351. if err != nil {
  352. return errors.Wrapf(err, "NewParallelTask")
  353. }
  354. return task.ScheduleRun(nil)
  355. }
  356. func (sc *SStoragecache) StartImageCacheTask(ctx context.Context, userCred mcclient.TokenCredential, input api.CacheImageInput) error {
  357. StoragecachedimageManager.Register(ctx, userCred, sc.Id, input.ImageId, "")
  358. image, _ := CachedimageManager.GetImageById(ctx, userCred, input.ImageId, false)
  359. if image != nil {
  360. imgInfo := imagetools.NormalizeImageInfo(image.Name, image.Properties["os_arch"], image.Properties["os_type"],
  361. image.Properties["os_distribution"], image.Properties["os_version"])
  362. input.OsType = imgInfo.OsType
  363. input.OsArch = imgInfo.OsArch
  364. input.OsDistribution = imgInfo.OsDistro
  365. input.OsVersion = imgInfo.OsVersion
  366. input.OsFullVersion = imgInfo.OsFullVersion
  367. input.ImageName = image.Name
  368. }
  369. data := jsonutils.Marshal(input).(*jsonutils.JSONDict)
  370. task, err := taskman.TaskManager.NewTask(ctx, "StorageCacheImageTask", sc, userCred, data, input.ParentTaskId, "", nil)
  371. if err != nil {
  372. return errors.Wrapf(err, "NewTask")
  373. }
  374. return task.ScheduleRun(nil)
  375. }
  376. func (sc *SStoragecache) StartImageUncacheTask(ctx context.Context, userCred mcclient.TokenCredential, imageId string, isPurge bool, parentTaskId string) error {
  377. data := jsonutils.NewDict()
  378. data.Add(jsonutils.NewString(imageId), "image_id")
  379. if isPurge {
  380. data.Add(jsonutils.JSONTrue, "is_purge")
  381. }
  382. task, err := taskman.TaskManager.NewTask(ctx, "StorageUncacheImageTask", sc, userCred, data, parentTaskId, "", nil)
  383. if err != nil {
  384. return err
  385. }
  386. task.ScheduleRun(nil)
  387. return nil
  388. }
  389. func (sc *SStoragecache) GetIStorageCache(ctx context.Context) (cloudprovider.ICloudStoragecache, error) {
  390. storages := sc.getValidStorages()
  391. if len(storages) == 0 {
  392. msg := fmt.Sprintf("no storages for this storagecache %s(%s)???", sc.Name, sc.Id)
  393. log.Errorf("%v", msg)
  394. return nil, fmt.Errorf("%v", msg)
  395. }
  396. istorage, err := storages[0].GetIStorage(ctx)
  397. if err != nil {
  398. return nil, errors.Wrapf(err, "GetIStorages")
  399. }
  400. return istorage.GetIStoragecache(), nil
  401. }
  402. // 镜像缓存存储列表
  403. func (manager *SStoragecacheManager) ListItemFilter(
  404. ctx context.Context,
  405. q *sqlchemy.SQuery,
  406. userCred mcclient.TokenCredential,
  407. query api.StoragecacheListInput,
  408. ) (*sqlchemy.SQuery, error) {
  409. var err error
  410. q, err = manager.SStandaloneResourceBaseManager.ListItemFilter(ctx, q, userCred, query.StandaloneResourceListInput)
  411. if err != nil {
  412. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.ListItemFilter")
  413. }
  414. q, err = manager.SExternalizedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ExternalizedResourceBaseListInput)
  415. if err != nil {
  416. return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
  417. }
  418. q, err = manager.SManagedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ManagedResourceListInput)
  419. if err != nil {
  420. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
  421. }
  422. if len(query.Path) > 0 {
  423. q = q.In("path", query.Path)
  424. }
  425. return q, nil
  426. }
  427. func (manager *SStoragecacheManager) OrderByExtraFields(
  428. ctx context.Context,
  429. q *sqlchemy.SQuery,
  430. userCred mcclient.TokenCredential,
  431. query api.StoragecacheListInput,
  432. ) (*sqlchemy.SQuery, error) {
  433. var err error
  434. q, err = manager.SStandaloneResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.StandaloneResourceListInput)
  435. if err != nil {
  436. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.OrderByExtraFields")
  437. }
  438. q, err = manager.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ManagedResourceListInput)
  439. if err != nil {
  440. return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
  441. }
  442. return q, nil
  443. }
  444. func (manager *SStoragecacheManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  445. var err error
  446. q, err = manager.SStandaloneResourceBaseManager.QueryDistinctExtraField(q, field)
  447. if err == nil {
  448. return q, nil
  449. }
  450. q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraField(q, field)
  451. if err == nil {
  452. return q, nil
  453. }
  454. return q, httperrors.ErrNotFound
  455. }
  456. func (manager *SStoragecacheManager) QueryDistinctExtraFields(q *sqlchemy.SQuery, resource string, fields []string) (*sqlchemy.SQuery, error) {
  457. var err error
  458. q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraFields(q, resource, fields)
  459. if err == nil {
  460. return q, nil
  461. }
  462. return q, httperrors.ErrNotFound
  463. }
  464. func (manager *SStoragecacheManager) FetchStoragecacheById(storageCacheId string) *SStoragecache {
  465. iStorageCache, _ := manager.FetchById(storageCacheId)
  466. if iStorageCache == nil {
  467. return nil
  468. }
  469. return iStorageCache.(*SStoragecache)
  470. }
  471. func (manager *SStoragecacheManager) GetCachePathById(storageCacheId string) string {
  472. iStorageCache, _ := manager.FetchById(storageCacheId)
  473. if iStorageCache == nil {
  474. return ""
  475. }
  476. sc := iStorageCache.(*SStoragecache)
  477. return sc.Path
  478. }
  479. func (sc *SStoragecache) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
  480. if sc.getCachedImageCount() > 0 {
  481. return httperrors.NewNotEmptyError("storage cache not empty")
  482. }
  483. storages := sc.getStorages()
  484. if len(storages) > 0 {
  485. return httperrors.NewNotEmptyError("referered by storages")
  486. }
  487. return sc.SStandaloneResourceBase.ValidateDeleteCondition(ctx, nil)
  488. }
  489. func (sc *SStoragecache) PerformUncacheImage(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  490. imageStr, _ := data.GetString("image")
  491. if len(imageStr) == 0 {
  492. return nil, httperrors.NewMissingParameterError("image")
  493. }
  494. isForce := jsonutils.QueryBoolean(data, "is_force", false)
  495. var imageId string
  496. imgObj, err := CachedimageManager.FetchByIdOrName(ctx, nil, imageStr)
  497. if err != nil {
  498. if err == sql.ErrNoRows {
  499. return nil, httperrors.NewResourceNotFoundError2(CachedimageManager.Keyword(), imageStr)
  500. } else {
  501. return nil, httperrors.NewGeneralError(err)
  502. }
  503. } else {
  504. cachedImage := imgObj.(*SCachedimage)
  505. if cloudprovider.TImageType(cachedImage.ImageType) != cloudprovider.ImageTypeCustomized && !isForce {
  506. return nil, httperrors.NewForbiddenError("cannot uncache non-customized images")
  507. }
  508. imageId = imgObj.GetId()
  509. _, err := CachedimageManager.getImageInfo(ctx, userCred, imageStr, isForce)
  510. if err != nil && errors.Cause(err) != httperrors.ErrNotFound {
  511. log.Infof("get image %s info error %s", imageStr, err)
  512. if !isForce {
  513. return nil, errors.Wrapf(err, "get image %s info", imageStr)
  514. }
  515. }
  516. }
  517. scimg := StoragecachedimageManager.GetStoragecachedimage(sc.Id, imageId)
  518. if scimg == nil {
  519. return nil, httperrors.NewResourceNotFoundError("storage not cache image")
  520. }
  521. if scimg.Status == api.CACHED_IMAGE_STATUS_INIT || isForce {
  522. err = scimg.Detach(ctx, userCred)
  523. return nil, err
  524. }
  525. err = scimg.markDeleting(ctx, userCred, isForce)
  526. if err != nil {
  527. return nil, httperrors.NewInvalidStatusError("Fail to mark cache status: %s", err)
  528. }
  529. err = sc.StartImageUncacheTask(ctx, userCred, imageId, isForce, "")
  530. return nil, err
  531. }
  532. func (sc *SStoragecache) PerformCacheImage(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.CacheImageInput) (jsonutils.JSONObject, error) {
  533. if len(input.ImageId) == 0 {
  534. return nil, httperrors.NewMissingParameterError("image_id")
  535. }
  536. image, err := CachedimageManager.getImageInfo(ctx, userCred, input.ImageId, input.IsForce)
  537. if err != nil {
  538. return nil, httperrors.NewImageNotFoundError(input.ImageId)
  539. }
  540. if len(image.Checksum) == 0 {
  541. return nil, httperrors.NewInvalidStatusError("Cannot cache image with no checksum")
  542. }
  543. input.ImageId = image.Id
  544. return nil, sc.StartImageCacheTask(ctx, userCred, input)
  545. }
  546. func (sc *SStoragecache) SyncCloudImages(
  547. ctx context.Context,
  548. userCred mcclient.TokenCredential,
  549. iStoragecache cloudprovider.ICloudStoragecache,
  550. region *SCloudregion,
  551. xor bool,
  552. ) compare.SyncResult {
  553. lockman.LockObject(ctx, sc)
  554. defer lockman.ReleaseObject(ctx, sc)
  555. lockman.LockRawObject(ctx, CachedimageManager.Keyword(), sc.Id)
  556. defer lockman.ReleaseRawObject(ctx, CachedimageManager.Keyword(), sc.Id)
  557. result := compare.SyncResult{}
  558. driver, err := sc.GetProviderFactory()
  559. if err != nil {
  560. result.Error(errors.Wrapf(err, "GetProviderFactory(%s)", region.Provider))
  561. return result
  562. }
  563. if driver.IsPublicCloud() {
  564. err = func() error {
  565. err := region.SyncCloudImages(ctx, userCred, false, xor)
  566. if err != nil {
  567. return errors.Wrapf(err, "SyncCloudImages")
  568. }
  569. err = sc.CheckCloudimages(ctx, userCred, region.Name, region.Id)
  570. if err != nil {
  571. return errors.Wrapf(err, "CheckCloudimages")
  572. }
  573. return nil
  574. }()
  575. if err != nil {
  576. log.Errorf("sync public image error: %v", err)
  577. }
  578. log.Debugln("localCachedImages started")
  579. localCachedImages, err := sc.getCustomdCachedImages()
  580. if err != nil {
  581. result.Error(errors.Wrapf(err, "getCustomdCachedImages"))
  582. return result
  583. }
  584. log.Debugf("localCachedImages %d", len(localCachedImages))
  585. remoteImages, err := iStoragecache.GetICustomizedCloudImages()
  586. if err != nil {
  587. result.Error(errors.Wrapf(err, "GetICustomizedCloudImages"))
  588. return result
  589. }
  590. result = sc.syncCloudImages(ctx, userCred, localCachedImages, remoteImages, xor)
  591. } else {
  592. log.Debugln("localCachedImages started")
  593. localCachedImages, err := sc.getCachedImages()
  594. if err != nil {
  595. result.Error(errors.Wrapf(err, "getCachedImages"))
  596. return result
  597. }
  598. log.Debugf("localCachedImages %d", len(localCachedImages))
  599. remoteImages, err := iStoragecache.GetICloudImages()
  600. if err != nil {
  601. result.Error(errors.Wrapf(err, "GetICloudImages"))
  602. return result
  603. }
  604. result = sc.syncCloudImages(ctx, userCred, localCachedImages, remoteImages, xor)
  605. }
  606. return result
  607. }
  608. func (cache *SStoragecache) syncCloudImages(
  609. ctx context.Context,
  610. userCred mcclient.TokenCredential,
  611. localCachedImages []SStoragecachedimage,
  612. remoteImages []cloudprovider.ICloudImage,
  613. xor bool,
  614. ) compare.SyncResult {
  615. syncResult := compare.SyncResult{}
  616. var syncOwnerId mcclient.IIdentityProvider
  617. provider := cache.GetCloudprovider()
  618. if provider != nil {
  619. syncOwnerId = provider.GetOwnerId()
  620. }
  621. removed := make([]SStoragecachedimage, 0)
  622. commondb := make([]SStoragecachedimage, 0)
  623. commonext := make([]cloudprovider.ICloudImage, 0)
  624. added := make([]cloudprovider.ICloudImage, 0)
  625. err := compare.CompareSets(localCachedImages, remoteImages, &removed, &commondb, &commonext, &added)
  626. if err != nil {
  627. syncResult.Error(errors.Wrapf(err, "compare.CompareSets"))
  628. return syncResult
  629. }
  630. for i := 0; i < len(removed); i += 1 {
  631. err := removed[i].syncRemoveCloudImage(ctx, userCred)
  632. if err != nil {
  633. syncResult.DeleteError(err)
  634. } else {
  635. syncResult.Delete()
  636. }
  637. }
  638. if !xor {
  639. for i := 0; i < len(commondb); i += 1 {
  640. err = commondb[i].syncWithCloudImage(ctx, userCred, syncOwnerId, commonext[i], cache.ManagerId)
  641. if err != nil {
  642. syncResult.UpdateError(err)
  643. } else {
  644. syncResult.Update()
  645. }
  646. }
  647. }
  648. for i := 0; i < len(added); i += 1 {
  649. err = StoragecachedimageManager.newFromCloudImage(ctx, userCred, syncOwnerId, added[i], cache)
  650. if err != nil {
  651. syncResult.AddError(err)
  652. } else {
  653. syncResult.Add()
  654. }
  655. }
  656. return syncResult
  657. }
  658. func (sc *SStoragecache) IsReachCapacityLimit(imageId string) bool {
  659. imgObj, _ := CachedimageManager.FetchById(imageId)
  660. if imgObj == nil {
  661. log.Debugf("no such cached image %s", imageId)
  662. return false
  663. }
  664. cachedImage := imgObj.(*SCachedimage)
  665. if cloudprovider.TImageType(cachedImage.ImageType) != cloudprovider.ImageTypeCustomized {
  666. // no need to cache
  667. log.Debugf("image %s is not a customized image, no need to cache", imageId)
  668. return false
  669. }
  670. cachedImages := sc.getCachedImageList(nil, string(cloudprovider.ImageTypeCustomized), []string{api.CACHED_IMAGE_STATUS_ACTIVE})
  671. for i := range cachedImages {
  672. if cachedImages[i].Id == imageId {
  673. // already cached
  674. log.Debugf("image %s has been cached in storage cache %s(%s)", imageId, sc.Id, sc.Name)
  675. return false
  676. }
  677. }
  678. host, _ := sc.GetMasterHost()
  679. if host == nil {
  680. return false
  681. }
  682. driver, _ := host.GetHostDriver()
  683. if driver == nil {
  684. return false
  685. }
  686. return driver.IsReachStoragecacheCapacityLimit(host, cachedImages)
  687. }
  688. func (sc *SStoragecache) GetStoragecachedimages() ([]SStoragecachedimage, error) {
  689. q := StoragecachedimageManager.Query().Equals("storagecache_id", sc.Id)
  690. ret := []SStoragecachedimage{}
  691. return ret, db.FetchModelObjects(StoragecachedimageManager, q, &ret)
  692. }
  693. func (sc *SStoragecache) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
  694. scis, err := sc.GetStoragecachedimages()
  695. if err != nil {
  696. return errors.Wrapf(err, "GetStoragecachedimages")
  697. }
  698. for i := range scis {
  699. err := scis[i].Delete(ctx, userCred)
  700. if err != nil {
  701. return errors.Wrapf(err, "delete storagecached images %d", scis[i].RowId)
  702. }
  703. }
  704. if len(sc.ManagerId) > 0 {
  705. return db.RealDeleteModel(ctx, userCred, sc)
  706. }
  707. return sc.SStandaloneResourceBase.Delete(ctx, userCred)
  708. }
  709. func (sc *SStoragecache) StartRelinquishLeastUsedCachedImageTask(ctx context.Context, userCred mcclient.TokenCredential, imageId string, parentTaskId string) error {
  710. cachedImages := sc.getCachedImageList([]string{imageId}, string(cloudprovider.ImageTypeCustomized), []string{api.CACHED_IMAGE_STATUS_ACTIVE})
  711. leastUsedIdx := -1
  712. leastRefCount := -1
  713. for i := range cachedImages {
  714. if leastRefCount < 0 || leastRefCount > cachedImages[i].RefCount {
  715. leastRefCount = cachedImages[i].RefCount
  716. leastUsedIdx = i
  717. }
  718. }
  719. return sc.StartImageUncacheTask(ctx, userCred, cachedImages[leastUsedIdx].GetId(), false, parentTaskId)
  720. }
  721. func (cache *SStoragecache) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  722. err := cache.SStandaloneResourceBase.CustomizeDelete(ctx, userCred, query, data)
  723. if err != nil {
  724. return err
  725. }
  726. if len(cache.ExternalId) > 0 {
  727. agentObj, err := BaremetalagentManager.FetchById(cache.ExternalId)
  728. if err == nil {
  729. agentObj.(*SBaremetalagent).setStoragecacheId("")
  730. } else if err != sql.ErrNoRows {
  731. return err
  732. }
  733. }
  734. return nil
  735. }
  736. func (manager *SStoragecacheManager) ListItemExportKeys(ctx context.Context,
  737. q *sqlchemy.SQuery,
  738. userCred mcclient.TokenCredential,
  739. keys stringutils2.SSortedStrings,
  740. ) (*sqlchemy.SQuery, error) {
  741. q, err := manager.SStandaloneResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  742. if err != nil {
  743. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.ListItemExportKeys")
  744. }
  745. if keys.ContainsAny(manager.SManagedResourceBaseManager.GetExportKeys()...) {
  746. q, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  747. if err != nil {
  748. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
  749. }
  750. }
  751. return q, nil
  752. }
  753. func (sc *SStoragecache) linkCloudimages(ctx context.Context, regionName, regionId string) (int, error) {
  754. cloudimages := CloudimageManager.Query("external_id").Equals("cloudregion_id", regionId).SubQuery()
  755. sq := StoragecachedimageManager.Query("cachedimage_id").Equals("storagecache_id", sc.Id).SubQuery()
  756. q := CachedimageManager.Query().Equals("image_type", cloudprovider.ImageTypeSystem).In("external_id", cloudimages).NotIn("id", sq)
  757. images := []SCachedimage{}
  758. err := db.FetchModelObjects(CachedimageManager, q, &images)
  759. if err != nil {
  760. return 0, errors.Wrapf(err, "db.FetchModelObjects")
  761. }
  762. for i := range images {
  763. sci := &SStoragecachedimage{}
  764. sci.SetModelManager(StoragecachedimageManager, sci)
  765. sci.StoragecacheId = sc.Id
  766. sci.CachedimageId = images[i].Id
  767. sci.Status = api.CACHED_IMAGE_STATUS_ACTIVE
  768. err = StoragecachedimageManager.TableSpec().Insert(ctx, sci)
  769. if err != nil {
  770. return 0, errors.Wrapf(err, "Insert")
  771. }
  772. }
  773. return len(images), nil
  774. }
  775. func (sc *SStoragecache) unlinkCloudimages(ctx context.Context, userCred mcclient.TokenCredential, regionName, regionId string) (int, error) {
  776. cloudimages := CloudimageManager.Query("external_id").Equals("cloudregion_id", regionId).SubQuery()
  777. sq := CachedimageManager.Query("id").Equals("image_type", cloudprovider.ImageTypeSystem).NotIn("external_id", cloudimages).SubQuery()
  778. q := StoragecachedimageManager.Query().Equals("storagecache_id", sc.Id).In("cachedimage_id", sq)
  779. scis := []SStoragecachedimage{}
  780. err := db.FetchModelObjects(StoragecachedimageManager, q, &scis)
  781. if err != nil {
  782. return 0, errors.Wrapf(err, "db.FetchModelObjects")
  783. }
  784. for i := range scis {
  785. err = scis[i].Delete(ctx, userCred)
  786. if err != nil {
  787. log.Warningf("detach image %v error: %v", scis[i].GetCachedimage(), err)
  788. }
  789. }
  790. return len(scis), nil
  791. }
  792. func (sc *SStoragecache) updateSystemImageStatus() (int, error) {
  793. sq := CachedimageManager.Query("id").Equals("image_type", cloudprovider.ImageTypeSystem)
  794. q := StoragecachedimageManager.Query().
  795. Equals("storagecache_id", sc.Id).In("cachedimage_id", sq.SubQuery()).
  796. NotEquals("status", api.CACHED_IMAGE_STATUS_ACTIVE)
  797. scis := []SStoragecachedimage{}
  798. err := db.FetchModelObjects(StoragecachedimageManager, q, &scis)
  799. if err != nil {
  800. return 0, errors.Wrapf(err, "db.FetchModelObjects")
  801. }
  802. for i := range scis {
  803. db.Update(&scis[i], func() error {
  804. scis[i].Status = api.CACHED_IMAGE_STATUS_ACTIVE
  805. return nil
  806. })
  807. }
  808. return len(scis), nil
  809. }
  810. func (sc *SStoragecache) getSystemImageCount() (int, error) {
  811. sq := StoragecachedimageManager.Query("cachedimage_id").Equals("storagecache_id", sc.Id)
  812. q := CachedimageManager.Query().Equals("image_type", cloudprovider.ImageTypeSystem).In("id", sq.SubQuery())
  813. return q.CountWithError()
  814. }
  815. func (sc *SStoragecache) CheckCloudimages(ctx context.Context, userCred mcclient.TokenCredential, regionName, regionId string) error {
  816. lockman.LockRawObject(ctx, CachedimageManager.Keyword(), regionId)
  817. defer lockman.ReleaseRawObject(ctx, CachedimageManager.Keyword(), regionId)
  818. result := compare.SyncResult{}
  819. var err error
  820. result.DelCnt, err = sc.unlinkCloudimages(ctx, userCred, regionName, regionId)
  821. if err != nil {
  822. return errors.Wrapf(err, "unlinkCloudimages")
  823. }
  824. sc.updateSystemImageStatus()
  825. result.UpdateCnt, err = sc.getSystemImageCount()
  826. if err != nil {
  827. log.Errorf("getSystemImageCount error: %v", err)
  828. }
  829. result.AddCnt, err = sc.linkCloudimages(ctx, regionName, regionId)
  830. if err != nil {
  831. return errors.Wrapf(err, "linkCloudimages")
  832. }
  833. log.Infof("SycSystemImages for region %s(%s) storagecache %s result: %s", regionName, regionId, sc.Name, result.Result())
  834. return nil
  835. }
  836. func (manager *SStoragecacheManager) FetchStoragecaches(filter func(q *sqlchemy.SQuery) *sqlchemy.SQuery) ([]SStoragecache, error) {
  837. q := manager.Query()
  838. q = filter(q)
  839. storageCaches := make([]SStoragecache, 0)
  840. err := db.FetchModelObjects(manager, q, &storageCaches)
  841. if err != nil {
  842. return nil, errors.Wrap(err, "FetchModelObjects")
  843. }
  844. return storageCaches, nil
  845. }
  846. func (manager *SStoragecacheManager) FetchStoragecachesByFilters(ctx context.Context, filters api.SStorageCacheFilters) ([]SStoragecache, error) {
  847. return manager.FetchStoragecaches(func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  848. storagesQ := StorageManager.Query().IsTrue("enabled").Equals("status", api.STORAGE_ONLINE)
  849. if len(filters.StorageType) > 0 || !filters.StorageTags.IsEmpty() {
  850. if len(filters.StorageType) > 0 {
  851. storagesQ = storagesQ.In("storage_type", filters.StorageType)
  852. }
  853. if !filters.StorageTags.IsEmpty() {
  854. storagesQ = db.ObjectIdQueryWithTagFilters(ctx, storagesQ, "id", StorageManager.Keyword(), filters.StorageTags)
  855. }
  856. }
  857. hostsQ := HostManager.Query().IsTrue("enabled").Equals("status", api.HOST_STATUS_RUNNING).Equals("host_status", api.HOST_ONLINE)
  858. if len(filters.HostType) > 0 || !filters.HostTags.IsEmpty() {
  859. if len(filters.HostType) > 0 {
  860. hostsQ = hostsQ.In("host_type", filters.HostType)
  861. }
  862. if !filters.HostTags.IsEmpty() {
  863. hostsQ = db.ObjectIdQueryWithTagFilters(ctx, hostsQ, "id", HostManager.Keyword(), filters.HostTags)
  864. }
  865. }
  866. hosts := hostsQ.SubQuery()
  867. hostStorages := HoststorageManager.Query().SubQuery()
  868. storages := storagesQ.SubQuery()
  869. q = q.Join(storages, sqlchemy.Equals(storages.Field("storagecache_id"), q.Field("id")))
  870. q = q.Join(hostStorages, sqlchemy.Equals(storages.Field("id"), hostStorages.Field("storage_id")))
  871. q = q.Join(hosts, sqlchemy.Equals(hostStorages.Field("host_id"), hosts.Field("id")))
  872. return q.Distinct()
  873. })
  874. }