snapshotpolicy.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "yunion.io/x/cloudmux/pkg/cloudprovider"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/compare"
  24. "yunion.io/x/pkg/utils"
  25. "yunion.io/x/sqlchemy"
  26. "yunion.io/x/onecloud/pkg/apis"
  27. api "yunion.io/x/onecloud/pkg/apis/compute"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  30. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/validators"
  33. "yunion.io/x/onecloud/pkg/compute/options"
  34. "yunion.io/x/onecloud/pkg/httperrors"
  35. "yunion.io/x/onecloud/pkg/mcclient"
  36. "yunion.io/x/onecloud/pkg/util/logclient"
  37. "yunion.io/x/onecloud/pkg/util/stringutils2"
  38. )
  39. // +onecloud:swagger-gen-model-singular=snapshotpolicy
  40. // +onecloud:swagger-gen-model-plural=snapshotpolicies
  41. type SSnapshotPolicyManager struct {
  42. db.SVirtualResourceBaseManager
  43. db.SExternalizedResourceBaseManager
  44. SManagedResourceBaseManager
  45. SCloudregionResourceBaseManager
  46. }
  47. type SSnapshotPolicy struct {
  48. db.SVirtualResourceBase
  49. db.SExternalizedResourceBase
  50. SManagedResourceBase
  51. SCloudregionResourceBase `width:"36" charset:"ascii" nullable:"false" list:"domain" create:"domain_required" default:"default"`
  52. // 快照保留天数, -1: 表示永久保留
  53. RetentionDays int `nullable:"false" list:"user" get:"user" update:"user" create:"required"`
  54. // 快照保留数量, 优先级高于 RetentionDays, 且仅对本地IDC资源有效
  55. RetentionCount int `nullable:"true" list:"user" get:"user" update:"user" create:"optional"`
  56. // 快照类型, 目前支持 disk, server
  57. // disk: 自动磁盘快照策略, 只能关联磁盘
  58. // server: 自动主机快照策略, 只能关联主机
  59. Type string `width:"36" charset:"ascii" default:"disk" list:"user" create:"required"`
  60. // 1~7, 1 is Monday, 7 is Sunday
  61. RepeatWeekdays api.RepeatWeekdays `charset:"utf8" create:"required" list:"user" get:"user" update:"user"`
  62. // 0~23, 每小时
  63. // 创建自动快照策略的时间必须与 RepeatWeekdays 对应的创建周期相一致
  64. TimePoints api.TimePoints `charset:"utf8" create:"required" list:"user" get:"user" update:"user"`
  65. }
  66. var SnapshotPolicyManager *SSnapshotPolicyManager
  67. func init() {
  68. SnapshotPolicyManager = &SSnapshotPolicyManager{
  69. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  70. SSnapshotPolicy{},
  71. "snapshot_policies_tbl",
  72. "snapshotpolicy",
  73. "snapshotpolicies",
  74. ),
  75. }
  76. SnapshotPolicyManager.SetVirtualObject(SnapshotPolicyManager)
  77. }
  78. // 创建自动快照策略
  79. func (manager *SSnapshotPolicyManager) ValidateCreateData(
  80. ctx context.Context,
  81. userCred mcclient.TokenCredential,
  82. ownerId mcclient.IIdentityProvider,
  83. query jsonutils.JSONObject,
  84. input *api.SSnapshotPolicyCreateInput,
  85. ) (*api.SSnapshotPolicyCreateInput, error) {
  86. if input.RetentionDays < -1 || input.RetentionDays == 0 || input.RetentionDays > options.Options.RetentionDaysLimit {
  87. return nil, httperrors.NewInputParameterError("Retention days must in 1~%d or -1", options.Options.RetentionDaysLimit)
  88. }
  89. if input.RetentionCount > options.Options.RetentionCountLimit {
  90. return nil, httperrors.NewInputParameterError("Retention count must less than %d", options.Options.RetentionCountLimit)
  91. }
  92. var err error
  93. input.VirtualResourceCreateInput, err = manager.SVirtualResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input.VirtualResourceCreateInput)
  94. if err != nil {
  95. return nil, err
  96. }
  97. if len(input.Type) == 0 {
  98. input.Type = api.SNAPSHOT_POLICY_TYPE_DISK
  99. }
  100. input.Status = apis.STATUS_CREATING
  101. if len(input.CloudregionId) == 0 {
  102. input.CloudregionId = api.DEFAULT_REGION_ID
  103. }
  104. regionObj, err := validators.ValidateModel(ctx, userCred, CloudregionManager, &input.CloudregionId)
  105. if err != nil {
  106. return nil, err
  107. }
  108. region := regionObj.(*SCloudregion)
  109. input, err = region.GetDriver().ValidateCreateSnapshotPolicy(ctx, userCred, region, input)
  110. if err != nil {
  111. return nil, err
  112. }
  113. err = input.Validate()
  114. if err != nil {
  115. return nil, err
  116. }
  117. return input, nil
  118. }
  119. func (sp *SSnapshotPolicy) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
  120. sp.StartCreateTask(ctx, userCred)
  121. }
  122. func (sp *SSnapshotPolicy) StartCreateTask(ctx context.Context, userCred mcclient.TokenCredential) error {
  123. task, err := taskman.TaskManager.NewTask(ctx, "SnapshotPolicyCreateTask", sp, userCred, nil, "", "", nil)
  124. if err != nil {
  125. return errors.Wrapf(err, "NewTask")
  126. }
  127. return task.ScheduleRun(nil)
  128. }
  129. // 更新自动快照策略
  130. func (self *SSnapshotPolicy) ValidateUpdateData(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input *api.SSnapshotPolicyUpdateInput) (*api.SSnapshotPolicyUpdateInput, error) {
  131. var err error
  132. input.VirtualResourceBaseUpdateInput, err = self.SVirtualResourceBase.ValidateUpdateData(ctx, userCred, query, input.VirtualResourceBaseUpdateInput)
  133. if err != nil {
  134. return input, errors.Wrap(err, "SVirtualResourceBase.ValidateUpdateData")
  135. }
  136. if input.RetentionDays != nil {
  137. if *input.RetentionDays < -1 || *input.RetentionDays == 0 || *input.RetentionDays > options.Options.RetentionDaysLimit {
  138. return nil, httperrors.NewInputParameterError("Retention days must in 1~%d or -1", options.Options.RetentionDaysLimit)
  139. }
  140. }
  141. if input.RetentionCount != nil {
  142. if *input.RetentionCount > options.Options.RetentionCountLimit {
  143. return nil, httperrors.NewInputParameterError("Retention count must less than %d", options.Options.RetentionCountLimit)
  144. }
  145. }
  146. err = input.Validate()
  147. if err != nil {
  148. return nil, err
  149. }
  150. return input, nil
  151. }
  152. func (sp *SSnapshotPolicy) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  153. return sp.StartDeleteTask(ctx, userCred)
  154. }
  155. func (sp *SSnapshotPolicy) StartDeleteTask(ctx context.Context, userCred mcclient.TokenCredential) error {
  156. sp.SetStatus(ctx, userCred, apis.STATUS_DELETING, "")
  157. task, err := taskman.TaskManager.NewTask(ctx, "SnapshotPolicyDeleteTask", sp, userCred, nil, "", "", nil)
  158. if err != nil {
  159. return errors.Wrapf(err, "NewTask")
  160. }
  161. return task.ScheduleRun(nil)
  162. }
  163. func (manager *SSnapshotPolicyManager) FetchCustomizeColumns(
  164. ctx context.Context,
  165. userCred mcclient.TokenCredential,
  166. query jsonutils.JSONObject,
  167. objs []interface{},
  168. fields stringutils2.SSortedStrings,
  169. isList bool,
  170. ) []api.SnapshotPolicyDetails {
  171. rows := make([]api.SnapshotPolicyDetails, len(objs))
  172. virtRows := manager.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  173. manRows := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  174. regionRows := manager.SCloudregionResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  175. policyIds := make([]string, len(objs))
  176. for i := range rows {
  177. rows[i] = api.SnapshotPolicyDetails{
  178. VirtualResourceDetails: virtRows[i],
  179. ManagedResourceInfo: manRows[i],
  180. CloudregionResourceInfo: regionRows[i],
  181. }
  182. policy := objs[i].(*SSnapshotPolicy)
  183. policyIds[i] = policy.Id
  184. }
  185. diskIds := []string{}
  186. q := SnapshotPolicyResourceManager.Query().In("snapshotpolicy_id", policyIds)
  187. sprs := []SSnapshotPolicyResource{}
  188. err := q.All(&sprs)
  189. if err != nil {
  190. log.Errorf("query snapshot policy resources error: %v", err)
  191. return rows
  192. }
  193. sprmap := map[string][]SSnapshotPolicyResource{}
  194. for _, sp := range sprs {
  195. _, ok := sprmap[sp.SnapshotpolicyId]
  196. if !ok {
  197. sprmap[sp.SnapshotpolicyId] = []SSnapshotPolicyResource{}
  198. }
  199. if sp.ResourceType == api.SNAPSHOT_POLICY_TYPE_DISK {
  200. diskIds = append(diskIds, sp.ResourceId)
  201. }
  202. sprmap[sp.SnapshotpolicyId] = append(sprmap[sp.SnapshotpolicyId], sp)
  203. }
  204. sq := SnapshotManager.Query().In("disk_id", diskIds).SubQuery()
  205. q = sq.Query(
  206. sq.Field("disk_id"),
  207. sqlchemy.COUNT("count", sq.Field("id")),
  208. ).GroupBy(sq.Field("disk_id"))
  209. snapshotCounts := []struct {
  210. DiskId string
  211. Count int
  212. }{}
  213. err = q.All(&snapshotCounts)
  214. if err != nil {
  215. log.Errorf("query snapshot counts error: %v", err)
  216. return rows
  217. }
  218. snapshotCountMap := map[string]int{}
  219. for _, snapshotCount := range snapshotCounts {
  220. snapshotCountMap[snapshotCount.DiskId] = snapshotCount.Count
  221. }
  222. for i := range rows {
  223. resources := sprmap[policyIds[i]]
  224. rows[i].BindingResourceCount = len(resources)
  225. for _, resource := range resources {
  226. if resource.ResourceType == api.SNAPSHOT_POLICY_TYPE_DISK {
  227. cnt, ok := snapshotCountMap[resource.ResourceId]
  228. if ok {
  229. rows[i].SnapshotCount += cnt
  230. }
  231. }
  232. }
  233. sp := objs[i].(*SSnapshotPolicy)
  234. if sp.Type == api.SNAPSHOT_POLICY_TYPE_DISK {
  235. rows[i].BindingDiskCount = len(resources)
  236. }
  237. }
  238. return rows
  239. }
  240. func (sp *SSnapshotPolicy) ExecuteNotify(ctx context.Context, userCred mcclient.TokenCredential, diskName string) {
  241. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  242. Obj: sp,
  243. Action: notifyclient.ActionExecute,
  244. ObjDetailsDecorator: func(ctx context.Context, details *jsonutils.JSONDict) {
  245. details.Set("disk", jsonutils.NewString(diskName))
  246. },
  247. })
  248. }
  249. func (self *SCloudregion) GetSnapshotPolicies(managerId string) ([]SSnapshotPolicy, error) {
  250. q := SnapshotPolicyManager.Query().Equals("cloudregion_id", self.Id)
  251. if len(managerId) > 0 {
  252. q = q.Equals("manager_id", managerId)
  253. }
  254. ret := []SSnapshotPolicy{}
  255. err := db.FetchModelObjects(SnapshotPolicyManager, q, &ret)
  256. if err != nil {
  257. return nil, err
  258. }
  259. return ret, nil
  260. }
  261. func (region *SCloudregion) SyncSnapshotPolicies(
  262. ctx context.Context,
  263. userCred mcclient.TokenCredential,
  264. provider *SCloudprovider,
  265. policies []cloudprovider.ICloudSnapshotPolicy,
  266. syncOwnerId mcclient.IIdentityProvider,
  267. xor bool,
  268. ) compare.SyncResult {
  269. lockman.LockRawObject(ctx, SnapshotPolicyManager.Keyword(), fmt.Sprintf("%s-%s", provider.Id, region.Id))
  270. defer lockman.ReleaseRawObject(ctx, SnapshotPolicyManager.Keyword(), fmt.Sprintf("%s-%s", provider.Id, region.Id))
  271. result := compare.SyncResult{}
  272. dbPolicies, err := region.GetSnapshotPolicies(provider.Id)
  273. if err != nil {
  274. result.Error(err)
  275. return result
  276. }
  277. added := make([]cloudprovider.ICloudSnapshotPolicy, 0, 1)
  278. commonext := make([]cloudprovider.ICloudSnapshotPolicy, 0, 1)
  279. commondb := make([]SSnapshotPolicy, 0, 1)
  280. removed := make([]SSnapshotPolicy, 0, 1)
  281. err = compare.CompareSets(dbPolicies, policies, &removed, &commondb, &commonext, &added)
  282. if err != nil {
  283. result.Error(err)
  284. return result
  285. }
  286. for i := 0; i < len(removed); i += 1 {
  287. err = removed[i].RealDelete(ctx, userCred)
  288. if err != nil {
  289. result.DeleteError(err)
  290. continue
  291. }
  292. result.Delete()
  293. }
  294. for i := 0; i < len(commondb); i += 1 {
  295. if !xor {
  296. err = commondb[i].SyncWithCloudPolicy(ctx, userCred, provider, commonext[i])
  297. if err != nil {
  298. result.UpdateError(err)
  299. continue
  300. }
  301. }
  302. result.Update()
  303. }
  304. for i := 0; i < len(added); i += 1 {
  305. _, err := region.newFromCloudPolicy(ctx, userCred, provider, added[i])
  306. if err != nil {
  307. result.AddError(err)
  308. continue
  309. }
  310. result.Add()
  311. }
  312. return result
  313. }
  314. func (self *SSnapshotPolicy) SyncWithCloudPolicy(
  315. ctx context.Context, userCred mcclient.TokenCredential,
  316. provider *SCloudprovider,
  317. ext cloudprovider.ICloudSnapshotPolicy,
  318. ) error {
  319. _, err := db.Update(self, func() error {
  320. if options.Options.EnableSyncName {
  321. newName, _ := db.GenerateAlterName(self, ext.GetName())
  322. if len(newName) > 0 {
  323. self.Name = newName
  324. }
  325. }
  326. self.RetentionDays = ext.GetRetentionDays()
  327. var err error
  328. self.RepeatWeekdays, err = ext.GetRepeatWeekdays()
  329. if err != nil {
  330. return errors.Wrapf(err, "GetRepeatWeekdays")
  331. }
  332. self.TimePoints, err = ext.GetTimePoints()
  333. if err != nil {
  334. return errors.Wrapf(err, "GetTimePoints")
  335. }
  336. self.Status = ext.GetStatus()
  337. return nil
  338. })
  339. if err != nil {
  340. return errors.Wrapf(err, "Update")
  341. }
  342. syncOwnerId := provider.GetOwnerId()
  343. SyncCloudProject(ctx, userCred, self, syncOwnerId, ext, provider)
  344. if account, _ := provider.GetCloudaccount(); account != nil {
  345. syncVirtualResourceMetadata(ctx, userCred, self, ext, account.ReadOnly)
  346. }
  347. err = self.SyncDisks(ctx, userCred, ext)
  348. if err != nil {
  349. return errors.Wrapf(err, "SyncDisks")
  350. }
  351. return nil
  352. }
  353. func (self *SCloudregion) newFromCloudPolicy(
  354. ctx context.Context, userCred mcclient.TokenCredential,
  355. provider *SCloudprovider,
  356. ext cloudprovider.ICloudSnapshotPolicy,
  357. ) (*SSnapshotPolicy, error) {
  358. policy := &SSnapshotPolicy{}
  359. policy.SetModelManager(SnapshotPolicyManager, policy)
  360. policy.CloudregionId = self.Id
  361. policy.ManagerId = provider.Id
  362. policy.ExternalId = ext.GetGlobalId()
  363. policy.RetentionDays = ext.GetRetentionDays()
  364. var err error
  365. policy.RepeatWeekdays, err = ext.GetRepeatWeekdays()
  366. if err != nil {
  367. return nil, errors.Wrapf(err, "GetRepeatWeekdays")
  368. }
  369. policy.TimePoints, err = ext.GetTimePoints()
  370. if err != nil {
  371. return nil, errors.Wrapf(err, "GetTimePoints")
  372. }
  373. policy.Status = ext.GetStatus()
  374. policy.Name = ext.GetName()
  375. syncOwnerId := provider.GetOwnerId()
  376. err = func() error {
  377. lockman.LockRawObject(ctx, SnapshotPolicyManager.Keyword(), "name")
  378. defer lockman.ReleaseRawObject(ctx, SnapshotPolicyManager.Keyword(), "name")
  379. newName, err := db.GenerateName(ctx, SnapshotPolicyManager, syncOwnerId, policy.Name)
  380. if err != nil {
  381. return err
  382. }
  383. policy.Name = newName
  384. return SnapshotPolicyManager.TableSpec().Insert(ctx, policy)
  385. }()
  386. if err != nil {
  387. return nil, errors.Wrapf(err, "Insert")
  388. }
  389. SyncCloudProject(ctx, userCred, policy, syncOwnerId, ext, provider)
  390. syncVirtualResourceMetadata(ctx, userCred, policy, ext, false)
  391. err = policy.SyncDisks(ctx, userCred, ext)
  392. if err != nil {
  393. return nil, errors.Wrapf(err, "SyncDisks")
  394. }
  395. return policy, nil
  396. }
  397. func (sp *SSnapshotPolicy) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
  398. return nil
  399. }
  400. func (sp *SSnapshotPolicy) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
  401. err := SnapshotPolicyResourceManager.RemoveBySnapshotpolicy(sp.Id)
  402. if err != nil {
  403. return errors.Wrapf(err, "RemoveBySnapshotpolicy for policy %s", sp.Name)
  404. }
  405. return sp.SVirtualResourceBase.Delete(ctx, userCred)
  406. }
  407. func (sp *SSnapshotPolicy) StartBindDisksTask(ctx context.Context, userCred mcclient.TokenCredential, diskIds []string) error {
  408. sp.SetStatus(ctx, userCred, api.SNAPSHOT_POLICY_APPLY, jsonutils.Marshal(diskIds).String())
  409. params := jsonutils.Marshal(map[string]interface{}{"disk_ids": diskIds}).(*jsonutils.JSONDict)
  410. task, err := taskman.TaskManager.NewTask(ctx, "SnapshotpolicyBindDisksTask", sp, userCred, params, "", "", nil)
  411. if err != nil {
  412. return errors.Wrapf(err, "NewTask")
  413. }
  414. return task.ScheduleRun(nil)
  415. }
  416. // 绑定磁盘
  417. func (sp *SSnapshotPolicy) PerformBindDisks(
  418. ctx context.Context,
  419. userCred mcclient.TokenCredential,
  420. query jsonutils.JSONObject,
  421. input *api.SnapshotPolicyDisksInput,
  422. ) (jsonutils.JSONObject, error) {
  423. if sp.Type != api.SNAPSHOT_POLICY_TYPE_DISK {
  424. return nil, httperrors.NewBadRequestError("The snapshot policy %s is not a disk snapshot policy", sp.Name)
  425. }
  426. if len(input.Disks) == 0 {
  427. return nil, httperrors.NewMissingParameterError("disks")
  428. }
  429. diskIds := []string{}
  430. for i := range input.Disks {
  431. diskObj, err := validators.ValidateModel(ctx, userCred, DiskManager, &input.Disks[i])
  432. if err != nil {
  433. return nil, err
  434. }
  435. disk := diskObj.(*SDisk)
  436. // 磁盘只能绑定一个快照策略
  437. cnt, err := SnapshotPolicyResourceManager.GetBindingCount(disk.Id, api.SNAPSHOT_POLICY_TYPE_DISK)
  438. if err != nil {
  439. return nil, errors.Wrap(err, "GetBindingCount")
  440. }
  441. if cnt > 0 {
  442. return nil, httperrors.NewConflictError("disk %s already bound to a snapshot policy", disk.Name)
  443. }
  444. // 若磁盘所属主机已绑定主机快照策略,则磁盘不能再绑定
  445. if guest := disk.GetGuest(); guest != nil {
  446. guestCnt, err := SnapshotPolicyResourceManager.GetBindingCount(guest.Id, api.SNAPSHOT_POLICY_TYPE_SERVER)
  447. if err != nil {
  448. return nil, errors.Wrap(err, "GetBindingCount for guest")
  449. }
  450. if guestCnt > 0 {
  451. return nil, httperrors.NewConflictError("guest %s already has server snapshot policy, disk cannot bind snapshot policy", guest.Name)
  452. }
  453. }
  454. if len(sp.ManagerId) > 0 {
  455. storage, err := disk.GetStorage()
  456. if err != nil {
  457. return nil, errors.Wrapf(err, "GetStorage for disk %s", disk.Name)
  458. }
  459. if storage.ManagerId != sp.ManagerId {
  460. return nil, httperrors.NewConflictError("The snapshot policy %s and disk account are different", sp.Name)
  461. }
  462. zone, err := storage.GetZone()
  463. if err != nil {
  464. return nil, errors.Wrapf(err, "GetZone")
  465. }
  466. if sp.CloudregionId != zone.CloudregionId {
  467. return nil, httperrors.NewConflictError("The snapshot policy %s and the disk are in different region", sp.Name)
  468. }
  469. }
  470. if !utils.IsInStringArray(disk.Id, diskIds) {
  471. diskIds = append(diskIds, disk.Id)
  472. }
  473. }
  474. return nil, sp.StartBindDisksTask(ctx, userCred, diskIds)
  475. }
  476. // 绑定资源
  477. // 目前仅支持绑定主机和磁盘
  478. // 磁盘只能绑定一个快照策略,已绑定时报错
  479. // 若磁盘所属主机已绑定主机快照策略,则磁盘不能再绑定快照策略
  480. // 主机只能绑定一个快照策略,已绑定时报错
  481. // 若主机下任意磁盘已绑定快照策略,则主机不能再绑定主机快照策略
  482. func (sp *SSnapshotPolicy) PerformBindResources(
  483. ctx context.Context,
  484. userCred mcclient.TokenCredential,
  485. query jsonutils.JSONObject,
  486. input *api.SnapshotPolicyResourcesInput,
  487. ) (jsonutils.JSONObject, error) {
  488. if len(input.Resources) == 0 {
  489. return nil, httperrors.NewMissingParameterError("resources")
  490. }
  491. for i := range input.Resources {
  492. switch input.Resources[i].Type {
  493. case api.SNAPSHOT_POLICY_TYPE_DISK:
  494. if sp.Type != api.SNAPSHOT_POLICY_TYPE_DISK {
  495. return nil, httperrors.NewBadRequestError("The snapshot policy %s is not a disk snapshot policy", sp.Name)
  496. }
  497. diskObj, err := validators.ValidateModel(ctx, userCred, DiskManager, &input.Resources[i].Id)
  498. if err != nil {
  499. return nil, err
  500. }
  501. disk := diskObj.(*SDisk)
  502. // 磁盘只能绑定一个快照策略
  503. cnt, err := SnapshotPolicyResourceManager.GetBindingCount(disk.Id, api.SNAPSHOT_POLICY_TYPE_DISK)
  504. if err != nil {
  505. return nil, errors.Wrap(err, "GetBindingCount")
  506. }
  507. if cnt > 0 {
  508. return nil, httperrors.NewConflictError("disk %s already bound to a snapshot policy", disk.Name)
  509. }
  510. // 若磁盘所属主机已绑定主机快照策略,则磁盘不能再绑定
  511. if guest := disk.GetGuest(); guest != nil {
  512. guestCnt, err := SnapshotPolicyResourceManager.GetBindingCount(guest.Id, api.SNAPSHOT_POLICY_TYPE_SERVER)
  513. if err != nil {
  514. return nil, errors.Wrap(err, "GetBindingCount for guest")
  515. }
  516. if guestCnt > 0 {
  517. return nil, httperrors.NewConflictError("guest %s already has server snapshot policy, disk cannot bind snapshot policy", guest.Name)
  518. }
  519. }
  520. case api.SNAPSHOT_POLICY_TYPE_SERVER:
  521. if sp.Type != api.SNAPSHOT_POLICY_TYPE_SERVER {
  522. return nil, httperrors.NewBadRequestError("The snapshot policy %s is not a server snapshot policy", sp.Name)
  523. }
  524. guestObj, err := validators.ValidateModel(ctx, userCred, GuestManager, &input.Resources[i].Id)
  525. if err != nil {
  526. return nil, err
  527. }
  528. guest := guestObj.(*SGuest)
  529. // 主机只能绑定一个快照策略
  530. cnt, err := SnapshotPolicyResourceManager.GetBindingCount(guest.Id, api.SNAPSHOT_POLICY_TYPE_SERVER)
  531. if err != nil {
  532. return nil, errors.Wrap(err, "GetBindingCount")
  533. }
  534. if cnt > 0 {
  535. return nil, httperrors.NewConflictError("guest %s already bound to a snapshot policy", guest.Name)
  536. }
  537. // 若主机下任意磁盘已绑定快照策略,则主机不能再绑定主机快照策略
  538. disks, err := guest.GetDisks()
  539. if err != nil {
  540. return nil, errors.Wrap(err, "guest.GetDisks")
  541. }
  542. for _, d := range disks {
  543. diskCnt, err := SnapshotPolicyResourceManager.GetBindingCount(d.Id, api.SNAPSHOT_POLICY_TYPE_DISK)
  544. if err != nil {
  545. return nil, errors.Wrap(err, "GetBindingCount for disk")
  546. }
  547. if diskCnt > 0 {
  548. return nil, httperrors.NewConflictError("guest %s has disk %s bound to snapshot policy, guest cannot bind server snapshot policy", guest.Name, d.Name)
  549. }
  550. }
  551. default:
  552. return nil, httperrors.NewBadRequestError("Invalid resource type: %s", input.Resources[i].Type)
  553. }
  554. }
  555. for i := range input.Resources {
  556. sr := &SSnapshotPolicyResource{}
  557. sr.SetModelManager(SnapshotPolicyResourceManager, sr)
  558. sr.SnapshotpolicyId = sp.Id
  559. sr.ResourceId = input.Resources[i].Id
  560. sr.ResourceType = input.Resources[i].Type
  561. err := SnapshotPolicyResourceManager.TableSpec().Insert(ctx, sr)
  562. if err != nil {
  563. return nil, errors.Wrapf(err, "Insert")
  564. }
  565. }
  566. logclient.AddActionLogWithContext(ctx, sp, logclient.ACT_BIND, input, userCred, true)
  567. return nil, nil
  568. }
  569. // 解绑主机
  570. func (sp *SSnapshotPolicy) PerformUnbindResources(
  571. ctx context.Context,
  572. userCred mcclient.TokenCredential,
  573. query jsonutils.JSONObject,
  574. input *api.SnapshotPolicyResourcesInput,
  575. ) (jsonutils.JSONObject, error) {
  576. for i := range input.Resources {
  577. switch input.Resources[i].Type {
  578. case api.SNAPSHOT_POLICY_TYPE_DISK:
  579. _, err := validators.ValidateModel(ctx, userCred, DiskManager, &input.Resources[i].Id)
  580. if err != nil {
  581. return nil, err
  582. }
  583. case api.SNAPSHOT_POLICY_TYPE_SERVER:
  584. _, err := validators.ValidateModel(ctx, userCred, GuestManager, &input.Resources[i].Id)
  585. if err != nil {
  586. return nil, err
  587. }
  588. default:
  589. return nil, httperrors.NewBadRequestError("Invalid resource type: %s", input.Resources[i].Type)
  590. }
  591. }
  592. for i := range input.Resources {
  593. err := SnapshotPolicyResourceManager.RemoveByResource(input.Resources[i].Id, input.Resources[i].Type)
  594. if err != nil {
  595. return nil, errors.Wrapf(err, "RemoveByResource")
  596. }
  597. }
  598. logclient.AddActionLogWithContext(ctx, sp, logclient.ACT_UNBIND, input, userCred, true)
  599. return nil, nil
  600. }
  601. func (sp *SSnapshotPolicy) StartUnbindDisksTask(ctx context.Context, userCred mcclient.TokenCredential, diskIds []string) error {
  602. sp.SetStatus(ctx, userCred, api.SNAPSHOT_POLICY_CANCEL, jsonutils.Marshal(diskIds).String())
  603. params := jsonutils.Marshal(map[string]interface{}{"disk_ids": diskIds}).(*jsonutils.JSONDict)
  604. task, err := taskman.TaskManager.NewTask(ctx, "SnapshotpolicyUnbindDisksTask", sp, userCred, params, "", "", nil)
  605. if err != nil {
  606. return errors.Wrapf(err, "NewTask")
  607. }
  608. return task.ScheduleRun(nil)
  609. }
  610. // 解绑磁盘
  611. func (sp *SSnapshotPolicy) PerformUnbindDisks(
  612. ctx context.Context,
  613. userCred mcclient.TokenCredential,
  614. query jsonutils.JSONObject,
  615. input *api.SnapshotPolicyDisksInput,
  616. ) (jsonutils.JSONObject, error) {
  617. if sp.Type != api.SNAPSHOT_POLICY_TYPE_DISK {
  618. return nil, httperrors.NewBadRequestError("The snapshot policy %s is not a disk snapshot policy", sp.Name)
  619. }
  620. if len(input.Disks) == 0 {
  621. return nil, httperrors.NewMissingParameterError("disks")
  622. }
  623. diskIds := []string{}
  624. for i := range input.Disks {
  625. diskObj, err := validators.ValidateModel(ctx, userCred, DiskManager, &input.Disks[i])
  626. if err != nil {
  627. return nil, err
  628. }
  629. disk := diskObj.(*SDisk)
  630. if !utils.IsInStringArray(disk.Id, diskIds) {
  631. diskIds = append(diskIds, disk.Id)
  632. }
  633. }
  634. return nil, sp.StartUnbindDisksTask(ctx, userCred, diskIds)
  635. }
  636. // 同步快照策略状态
  637. func (self *SSnapshotPolicy) PerformSyncstatus(
  638. ctx context.Context,
  639. userCred mcclient.TokenCredential,
  640. query jsonutils.JSONObject,
  641. input jsonutils.JSONObject,
  642. ) (jsonutils.JSONObject, error) {
  643. if self.CloudregionId == api.DEFAULT_REGION_ID {
  644. return nil, self.SetStatus(ctx, userCred, apis.STATUS_AVAILABLE, "")
  645. }
  646. return nil, self.StartSyncstatusTask(ctx, userCred, "")
  647. }
  648. func (sp *SSnapshotPolicy) StartSyncstatusTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
  649. return StartResourceSyncStatusTask(ctx, userCred, sp, "SnapshotpolicySyncstatusTask", parentTaskId)
  650. }
  651. // 快照策略列表
  652. func (manager *SSnapshotPolicyManager) ListItemFilter(
  653. ctx context.Context,
  654. q *sqlchemy.SQuery,
  655. userCred mcclient.TokenCredential,
  656. input api.SnapshotPolicyListInput,
  657. ) (*sqlchemy.SQuery, error) {
  658. q, err := manager.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, input.VirtualResourceListInput)
  659. if err != nil {
  660. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
  661. }
  662. q, err = manager.SExternalizedResourceBaseManager.ListItemFilter(ctx, q, userCred, input.ExternalizedResourceBaseListInput)
  663. if err != nil {
  664. return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
  665. }
  666. q, err = manager.SManagedResourceBaseManager.ListItemFilter(ctx, q, userCred, input.ManagedResourceListInput)
  667. if err != nil {
  668. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
  669. }
  670. q, err = manager.SCloudregionResourceBaseManager.ListItemFilter(ctx, q, userCred, input.RegionalFilterListInput)
  671. if err != nil {
  672. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemFilter")
  673. }
  674. if len(input.Type) > 0 {
  675. q = q.Equals("type", input.Type)
  676. }
  677. if len(input.ResourceId) > 0 {
  678. sq := SnapshotPolicyResourceManager.Query("snapshotpolicy_id").In("resource_id", input.ResourceId).SubQuery()
  679. q = q.In("id", sq)
  680. }
  681. return q, nil
  682. }
  683. func (manager *SSnapshotPolicyManager) OrderByExtraFields(
  684. ctx context.Context,
  685. q *sqlchemy.SQuery,
  686. userCred mcclient.TokenCredential,
  687. input api.SnapshotPolicyListInput,
  688. ) (*sqlchemy.SQuery, error) {
  689. var err error
  690. q, err = manager.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, input.VirtualResourceListInput)
  691. if err != nil {
  692. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.OrderByExtraFields")
  693. }
  694. q, err = manager.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, input.ManagedResourceListInput)
  695. if err != nil {
  696. return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
  697. }
  698. q, err = manager.SCloudregionResourceBaseManager.OrderByExtraFields(ctx, q, userCred, input.RegionalFilterListInput)
  699. if err != nil {
  700. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.OrderByExtraFields")
  701. }
  702. if db.NeedOrderQuery([]string{input.OrderByBindDiskCount}) {
  703. sdQ := SnapshotPolicyResourceManager.Query().Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK)
  704. sdSQ := sdQ.AppendField(sdQ.Field("snapshotpolicy_id"), sqlchemy.COUNT("disk_count")).GroupBy(sdQ.Field("snapshotpolicy_id")).SubQuery()
  705. q = q.LeftJoin(sdSQ, sqlchemy.Equals(sdSQ.Field("snapshotpolicy_id"), q.Field("id")))
  706. q = q.AppendField(q.QueryFields()...)
  707. q = q.AppendField(sdSQ.Field("disk_count"))
  708. q = db.OrderByFields(q, []string{input.OrderByBindDiskCount}, []sqlchemy.IQueryField{q.Field("disk_count")})
  709. }
  710. if db.NeedOrderQuery([]string{input.OrderBySnapshotCount}) {
  711. spSQ := SnapshotPolicyResourceManager.Query().Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK).SubQuery()
  712. ssq := SnapshotManager.Query().SubQuery()
  713. pQ := spSQ.Query(
  714. spSQ.Field("snapshotpolicy_id"),
  715. sqlchemy.COUNT("snapshot_count", ssq.Field("id")),
  716. ).Join(ssq, sqlchemy.Equals(ssq.Field("disk_id"), spSQ.Field("resource_id"))).GroupBy(spSQ.Field("snapshotpolicy_id"))
  717. pq := pQ.SubQuery()
  718. q = q.LeftJoin(pq, sqlchemy.Equals(pq.Field("snapshotpolicy_id"), q.Field("id")))
  719. q = q.AppendField(q.QueryFields()...)
  720. q = q.AppendField(pq.Field("snapshot_count"))
  721. q = db.OrderByFields(q, []string{input.OrderBySnapshotCount}, []sqlchemy.IQueryField{q.Field("snapshot_count")})
  722. }
  723. return q, nil
  724. }
  725. func (manager *SSnapshotPolicyManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  726. var err error
  727. q, err = manager.SVirtualResourceBaseManager.QueryDistinctExtraField(q, field)
  728. if err == nil {
  729. return q, nil
  730. }
  731. q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraField(q, field)
  732. if err == nil {
  733. return q, nil
  734. }
  735. q, err = manager.SCloudregionResourceBaseManager.QueryDistinctExtraField(q, field)
  736. if err == nil {
  737. return q, nil
  738. }
  739. return q, httperrors.ErrNotFound
  740. }
  741. func (manager *SSnapshotPolicyManager) QueryDistinctExtraFields(q *sqlchemy.SQuery, resource string, fields []string) (*sqlchemy.SQuery, error) {
  742. var err error
  743. q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraFields(q, resource, fields)
  744. if err == nil {
  745. return q, nil
  746. }
  747. return q, httperrors.ErrNotFound
  748. }
  749. func (manager *SSnapshotPolicyManager) ListItemExportKeys(ctx context.Context,
  750. q *sqlchemy.SQuery,
  751. userCred mcclient.TokenCredential,
  752. keys stringutils2.SSortedStrings,
  753. ) (*sqlchemy.SQuery, error) {
  754. var err error
  755. q, err = manager.SVirtualResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  756. if err != nil {
  757. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemExportKeys")
  758. }
  759. if keys.ContainsAny(manager.SCloudregionResourceBaseManager.GetExportKeys()...) {
  760. q, err = manager.SCloudregionResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  761. if err != nil {
  762. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemExportKeys")
  763. }
  764. }
  765. if keys.ContainsAny(manager.SManagedResourceBaseManager.GetExportKeys()...) {
  766. q, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  767. if err != nil {
  768. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
  769. }
  770. }
  771. return q, nil
  772. }
  773. func (self *SSnapshotPolicy) GetISnapshotPolicy(ctx context.Context) (cloudprovider.ICloudSnapshotPolicy, error) {
  774. if len(self.ExternalId) == 0 {
  775. return nil, errors.Wrapf(cloudprovider.ErrNotFound, "empty external id")
  776. }
  777. iRegion, err := self.GetIRegion(ctx)
  778. if err != nil {
  779. return nil, errors.Wrapf(err, "GetIRegion")
  780. }
  781. return iRegion.GetISnapshotPolicyById(self.ExternalId)
  782. }
  783. func (self *SSnapshotPolicy) GetIRegion(ctx context.Context) (cloudprovider.ICloudRegion, error) {
  784. region, err := self.GetRegion()
  785. if err != nil {
  786. return nil, errors.Wrapf(err, "GetRegion")
  787. }
  788. provider, err := self.GetProvider(ctx)
  789. if err != nil {
  790. return nil, errors.Wrapf(err, "GetProvider")
  791. }
  792. return provider.GetIRegionById(region.ExternalId)
  793. }
  794. func (self *SSnapshotPolicy) GetCloudprovider() (*SCloudprovider, error) {
  795. providerObj, err := CloudproviderManager.FetchById(self.ManagerId)
  796. if err != nil {
  797. return nil, errors.Wrapf(err, "FetchById")
  798. }
  799. return providerObj.(*SCloudprovider), nil
  800. }
  801. func (self *SSnapshotPolicy) GetProvider(ctx context.Context) (cloudprovider.ICloudProvider, error) {
  802. manager, err := self.GetCloudprovider()
  803. if err != nil {
  804. return nil, errors.Wrapf(err, "GetProvider")
  805. }
  806. return manager.GetProvider(ctx)
  807. }
  808. func (self *SSnapshotPolicy) GetUnbindDisks(diskIds []string) ([]SDisk, error) {
  809. sq := SnapshotPolicyResourceManager.Query("resource_id").Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK).Equals("snapshotpolicy_id", self.Id).SubQuery()
  810. q := DiskManager.Query().In("id", diskIds)
  811. q = q.Filter(sqlchemy.NotIn(q.Field("id"), sq))
  812. ret := []SDisk{}
  813. err := db.FetchModelObjects(DiskManager, q, &ret)
  814. if err != nil {
  815. return nil, err
  816. }
  817. return ret, nil
  818. }
  819. func (self *SSnapshotPolicy) GetBindDisks(diskIds []string) ([]SDisk, error) {
  820. sq := SnapshotPolicyResourceManager.Query("resource_id").Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK).Equals("snapshotpolicy_id", self.Id).SubQuery()
  821. q := DiskManager.Query().In("id", diskIds)
  822. q = q.Filter(sqlchemy.In(q.Field("id"), sq))
  823. ret := []SDisk{}
  824. err := db.FetchModelObjects(DiskManager, q, &ret)
  825. if err != nil {
  826. return nil, err
  827. }
  828. return ret, nil
  829. }
  830. func (self *SSnapshotPolicy) GetDisks() ([]SDisk, error) {
  831. sq := SnapshotPolicyResourceManager.Query().Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK).Equals("snapshotpolicy_id", self.Id).SubQuery()
  832. q := DiskManager.Query()
  833. q = q.Join(sq, sqlchemy.Equals(q.Field("id"), sq.Field("resource_id")))
  834. ret := []SDisk{}
  835. err := db.FetchModelObjects(DiskManager, q, &ret)
  836. if err != nil {
  837. return nil, err
  838. }
  839. return ret, nil
  840. }
  841. func (sp *SSnapshotPolicy) BindDisks(ctx context.Context, disks []SDisk) error {
  842. for i := range disks {
  843. spd := &SSnapshotPolicyResource{}
  844. spd.SetModelManager(SnapshotPolicyResourceManager, spd)
  845. spd.ResourceId = disks[i].Id
  846. spd.ResourceType = api.SNAPSHOT_POLICY_TYPE_DISK
  847. spd.SnapshotpolicyId = sp.Id
  848. err := SnapshotPolicyResourceManager.TableSpec().Insert(ctx, spd)
  849. if err != nil {
  850. return err
  851. }
  852. }
  853. return nil
  854. }
  855. func (sp *SSnapshotPolicy) UnbindDisks(diskIds []string) error {
  856. vars := []interface{}{sp.Id}
  857. placeholders := make([]string, len(diskIds))
  858. for i := range placeholders {
  859. placeholders[i] = "?"
  860. vars = append(vars, diskIds[i])
  861. }
  862. _, err := sqlchemy.GetDB().Exec(
  863. fmt.Sprintf(
  864. "delete from %s where snapshotpolicy_id = ? and resource_id in (%s)",
  865. SnapshotPolicyResourceManager.TableSpec().Name(), strings.Join(placeholders, ","),
  866. ), vars...,
  867. )
  868. return err
  869. }
  870. func (sp *SSnapshotPolicy) SyncDisks(ctx context.Context, userCred mcclient.TokenCredential, ext cloudprovider.ICloudSnapshotPolicy) error {
  871. extIds, err := ext.GetApplyDiskIds()
  872. if err != nil {
  873. if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
  874. return nil
  875. }
  876. return errors.Wrapf(err, "GetApplyDiskIds")
  877. }
  878. {
  879. sq := SnapshotPolicyResourceManager.Query("resource_id").Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK).Equals("snapshotpolicy_id", sp.Id).SubQuery()
  880. q := DiskManager.Query().In("id", sq).NotIn("external_id", extIds)
  881. needCancel := []SDisk{}
  882. err = db.FetchModelObjects(DiskManager, q, &needCancel)
  883. if err != nil {
  884. return errors.Wrapf(err, "db.FetchModelObjects")
  885. }
  886. for _, disk := range needCancel {
  887. err = SnapshotPolicyResourceManager.RemoveByResource(disk.Id, api.SNAPSHOT_POLICY_TYPE_DISK)
  888. if err != nil {
  889. return errors.Wrapf(err, "RemoveByResource")
  890. }
  891. }
  892. }
  893. {
  894. sq := SnapshotPolicyResourceManager.Query("resource_id").Equals("resource_type", api.SNAPSHOT_POLICY_TYPE_DISK).Equals("snapshotpolicy_id", sp.Id).SubQuery()
  895. storages := StorageManager.Query().Equals("manager_id", sp.ManagerId).SubQuery()
  896. q := DiskManager.Query()
  897. q = q.Join(storages, sqlchemy.Equals(q.Field("storage_id"), storages.Field("id")))
  898. q = q.Filter(
  899. sqlchemy.AND(
  900. sqlchemy.NotIn(q.Field("id"), sq),
  901. sqlchemy.In(q.Field("external_id"), extIds),
  902. ),
  903. )
  904. needApply := []SDisk{}
  905. err = db.FetchModelObjects(DiskManager, q, &needApply)
  906. if err != nil {
  907. return errors.Wrapf(err, "db.FetchModelObjects")
  908. }
  909. for _, disk := range needApply {
  910. spd := &SSnapshotPolicyResource{}
  911. spd.SetModelManager(SnapshotPolicyResourceManager, spd)
  912. spd.SnapshotpolicyId = sp.Id
  913. spd.ResourceId = disk.Id
  914. spd.ResourceType = api.SNAPSHOT_POLICY_TYPE_DISK
  915. err := SnapshotPolicyResourceManager.TableSpec().Insert(ctx, spd)
  916. if err != nil {
  917. return errors.Wrapf(err, "Insert")
  918. }
  919. }
  920. }
  921. return nil
  922. }