elastic_search.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "yunion.io/x/cloudmux/pkg/cloudprovider"
  21. "yunion.io/x/jsonutils"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/util/compare"
  25. "yunion.io/x/pkg/util/rbacscope"
  26. "yunion.io/x/sqlchemy"
  27. billing_api "yunion.io/x/onecloud/pkg/apis/billing"
  28. api "yunion.io/x/onecloud/pkg/apis/compute"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  30. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  33. "yunion.io/x/onecloud/pkg/compute/options"
  34. "yunion.io/x/onecloud/pkg/httperrors"
  35. "yunion.io/x/onecloud/pkg/mcclient"
  36. "yunion.io/x/onecloud/pkg/util/rbacutils"
  37. "yunion.io/x/onecloud/pkg/util/stringutils2"
  38. )
  39. type SElasticSearchManager struct {
  40. db.SVirtualResourceBaseManager
  41. db.SExternalizedResourceBaseManager
  42. SDeletePreventableResourceBaseManager
  43. SCloudregionResourceBaseManager
  44. SManagedResourceBaseManager
  45. SVpcResourceBaseManager
  46. }
  47. var ElasticSearchManager *SElasticSearchManager
  48. func init() {
  49. ElasticSearchManager = &SElasticSearchManager{
  50. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  51. SElasticSearch{},
  52. "elastic_searchs_tbl",
  53. "elastic_search",
  54. "elastic_searchs",
  55. ),
  56. }
  57. ElasticSearchManager.SetVirtualObject(ElasticSearchManager)
  58. notifyclient.AddNotifyDBHookResources(ElasticSearchManager.KeywordPlural())
  59. }
  60. type SElasticSearch struct {
  61. db.SVirtualResourceBase
  62. db.SExternalizedResourceBase
  63. SManagedResourceBase
  64. SBillingResourceBase
  65. SCloudregionResourceBase
  66. SDeletePreventableResourceBase
  67. // 版本
  68. Version string `width:"16" charset:"ascii" nullable:"false" list:"user" create:"required"`
  69. // 套餐名称
  70. // example: elasticsearch.sn2ne.xlarge
  71. InstanceType string `width:"64" charset:"utf8" nullable:"true" list:"user" create:"optional"`
  72. // CPU数量
  73. // example: 1
  74. VcpuCount int `nullable:"false" default:"1" list:"user" create:"optional"`
  75. // 内存大小
  76. // example: 1024
  77. VmemSizeGb int `nullable:"false" list:"user" create:"optional"`
  78. // 存储类型
  79. // example: local_ssd
  80. StorageType string `width:"16" charset:"utf8" nullable:"false" list:"user" create:"required"`
  81. // 存储大小
  82. // example: 1024
  83. DiskSizeGb int `nullable:"false" list:"user" create:"required"`
  84. // 实例类型
  85. // example: ha
  86. Category string `width:"16" charset:"ascii" nullable:"false" list:"user" create:"optional"`
  87. VpcId string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" json:"vpc_id"`
  88. NetworkId string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" json:"network_id"`
  89. // 可用区Id
  90. ZoneId string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" json:"zone_id"`
  91. // 是否是多可用区部署
  92. IsMultiAz bool `nullable:"false" default:"false" list:"user" update:"user" create:"optional"`
  93. }
  94. func (manager *SElasticSearchManager) GetContextManagers() [][]db.IModelManager {
  95. return [][]db.IModelManager{
  96. {CloudregionManager},
  97. }
  98. }
  99. // ElasticSearch实例列表
  100. func (man *SElasticSearchManager) ListItemFilter(
  101. ctx context.Context,
  102. q *sqlchemy.SQuery,
  103. userCred mcclient.TokenCredential,
  104. query api.ElasticSearchListInput,
  105. ) (*sqlchemy.SQuery, error) {
  106. var err error
  107. q, err = man.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VirtualResourceListInput)
  108. if err != nil {
  109. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
  110. }
  111. q, err = man.SExternalizedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ExternalizedResourceBaseListInput)
  112. if err != nil {
  113. return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
  114. }
  115. q, err = man.SDeletePreventableResourceBaseManager.ListItemFilter(ctx, q, userCred, query.DeletePreventableResourceBaseListInput)
  116. if err != nil {
  117. return nil, errors.Wrap(err, "SDeletePreventableResourceBaseManager.ListItemFilter")
  118. }
  119. q, err = man.SManagedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ManagedResourceListInput)
  120. if err != nil {
  121. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
  122. }
  123. q, err = man.SCloudregionResourceBaseManager.ListItemFilter(ctx, q, userCred, query.RegionalFilterListInput)
  124. if err != nil {
  125. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemFilter")
  126. }
  127. q, err = man.SVpcResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VpcFilterListInput)
  128. if err != nil {
  129. return nil, errors.Wrap(err, "SVpcResourceBaseManager.ListItemFilter")
  130. }
  131. return q, nil
  132. }
  133. func (man *SElasticSearchManager) OrderByExtraFields(
  134. ctx context.Context,
  135. q *sqlchemy.SQuery,
  136. userCred mcclient.TokenCredential,
  137. query api.ElasticSearchListInput,
  138. ) (*sqlchemy.SQuery, error) {
  139. q, err := man.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VirtualResourceListInput)
  140. if err != nil {
  141. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.OrderByExtraFields")
  142. }
  143. q, err = man.SCloudregionResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.RegionalFilterListInput)
  144. if err != nil {
  145. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.OrderByExtraFields")
  146. }
  147. q, err = man.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ManagedResourceListInput)
  148. if err != nil {
  149. return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
  150. }
  151. q, err = man.SVpcResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VpcFilterListInput)
  152. if err != nil {
  153. return nil, errors.Wrap(err, "SVpcResourceBaseManager.OrderByExtraFields")
  154. }
  155. return q, nil
  156. }
  157. func (man *SElasticSearchManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  158. q, err := man.SVirtualResourceBaseManager.QueryDistinctExtraField(q, field)
  159. if err == nil {
  160. return q, nil
  161. }
  162. q, err = man.SCloudregionResourceBaseManager.QueryDistinctExtraField(q, field)
  163. if err == nil {
  164. return q, nil
  165. }
  166. q, err = man.SManagedResourceBaseManager.QueryDistinctExtraField(q, field)
  167. if err == nil {
  168. return q, nil
  169. }
  170. q, err = man.SVpcResourceBaseManager.QueryDistinctExtraField(q, field)
  171. if err == nil {
  172. return q, nil
  173. }
  174. return q, httperrors.ErrNotFound
  175. }
  176. func (manager *SElasticSearchManager) QueryDistinctExtraFields(q *sqlchemy.SQuery, resource string, fields []string) (*sqlchemy.SQuery, error) {
  177. var err error
  178. q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraFields(q, resource, fields)
  179. if err == nil {
  180. return q, nil
  181. }
  182. return q, httperrors.ErrNotFound
  183. }
  184. func (man *SElasticSearchManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.ElasticSearchCreateInput) (api.ElasticSearchCreateInput, error) {
  185. return input, httperrors.NewNotImplementedError("Not Implemented")
  186. }
  187. func (manager *SElasticSearchManager) FetchCustomizeColumns(
  188. ctx context.Context,
  189. userCred mcclient.TokenCredential,
  190. query jsonutils.JSONObject,
  191. objs []interface{},
  192. fields stringutils2.SSortedStrings,
  193. isList bool,
  194. ) []api.ElasticSearchDetails {
  195. rows := make([]api.ElasticSearchDetails, len(objs))
  196. virtRows := manager.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  197. manRows := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  198. regRows := manager.SCloudregionResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  199. vpcIds := make([]string, len(objs))
  200. netIds := make([]string, len(objs))
  201. zoneIds := make([]string, len(objs))
  202. for i := range rows {
  203. rows[i] = api.ElasticSearchDetails{
  204. VirtualResourceDetails: virtRows[i],
  205. ManagedResourceInfo: manRows[i],
  206. CloudregionResourceInfo: regRows[i],
  207. }
  208. es := objs[i].(*SElasticSearch)
  209. vpcIds[i] = es.VpcId
  210. netIds[i] = es.NetworkId
  211. zoneIds[i] = es.ZoneId
  212. }
  213. vpcMaps, err := db.FetchIdNameMap2(VpcManager, vpcIds)
  214. if err != nil {
  215. return rows
  216. }
  217. netMaps, err := db.FetchIdNameMap2(NetworkManager, netIds)
  218. if err != nil {
  219. return rows
  220. }
  221. zoneMaps, err := db.FetchIdNameMap2(ZoneManager, zoneIds)
  222. if err != nil {
  223. return rows
  224. }
  225. for i := range rows {
  226. rows[i].Vpc, _ = vpcMaps[vpcIds[i]]
  227. rows[i].Network, _ = netMaps[netIds[i]]
  228. rows[i].Zone, _ = zoneMaps[zoneIds[i]]
  229. }
  230. return rows
  231. }
  232. func (self *SCloudregion) GetElasticSearchs(managerId string) ([]SElasticSearch, error) {
  233. q := ElasticSearchManager.Query().Equals("cloudregion_id", self.Id)
  234. if len(managerId) > 0 {
  235. q = q.Equals("manager_id", managerId)
  236. }
  237. ret := []SElasticSearch{}
  238. err := db.FetchModelObjects(ElasticSearchManager, q, &ret)
  239. if err != nil {
  240. return nil, errors.Wrapf(err, "db.FetchModelObjects")
  241. }
  242. return ret, nil
  243. }
  244. func (self *SCloudregion) SyncElasticSearchs(
  245. ctx context.Context,
  246. userCred mcclient.TokenCredential,
  247. provider *SCloudprovider,
  248. exts []cloudprovider.ICloudElasticSearch,
  249. xor bool,
  250. ) compare.SyncResult {
  251. // 加锁防止重入
  252. lockman.LockRawObject(ctx, ElasticSearchManager.KeywordPlural(), fmt.Sprintf("%s-%s", provider.Id, self.Id))
  253. defer lockman.ReleaseRawObject(ctx, ElasticSearchManager.KeywordPlural(), fmt.Sprintf("%s-%s", provider.Id, self.Id))
  254. result := compare.SyncResult{}
  255. dbEss, err := self.GetElasticSearchs(provider.Id)
  256. if err != nil {
  257. result.Error(err)
  258. return result
  259. }
  260. removed := make([]SElasticSearch, 0)
  261. commondb := make([]SElasticSearch, 0)
  262. commonext := make([]cloudprovider.ICloudElasticSearch, 0)
  263. added := make([]cloudprovider.ICloudElasticSearch, 0)
  264. // 本地和云上资源列表进行比对
  265. err = compare.CompareSets(dbEss, exts, &removed, &commondb, &commonext, &added)
  266. if err != nil {
  267. result.Error(err)
  268. return result
  269. }
  270. // 删除云上没有的资源
  271. for i := 0; i < len(removed); i++ {
  272. err := removed[i].syncRemoveCloudElasticSearch(ctx, userCred)
  273. if err != nil {
  274. result.DeleteError(err)
  275. continue
  276. }
  277. result.Delete()
  278. }
  279. if !xor {
  280. // 和云上资源属性进行同步
  281. for i := 0; i < len(commondb); i++ {
  282. err := commondb[i].SyncWithCloudElasticSearch(ctx, userCred, commonext[i])
  283. if err != nil {
  284. result.UpdateError(err)
  285. continue
  286. }
  287. result.Update()
  288. }
  289. }
  290. // 创建本地没有的云上资源
  291. for i := 0; i < len(added); i++ {
  292. _, err := self.newFromCloudElasticSearch(ctx, userCred, provider, added[i])
  293. if err != nil {
  294. result.AddError(err)
  295. continue
  296. }
  297. result.Add()
  298. }
  299. return result
  300. }
  301. type SEsCountStat struct {
  302. TotalEsCount int
  303. TotalCpuCount int
  304. TotalMemSizeGb int
  305. }
  306. func (man *SElasticSearchManager) TotalCount(
  307. ctx context.Context,
  308. scope rbacscope.TRbacScope,
  309. ownerId mcclient.IIdentityProvider,
  310. rangeObjs []db.IStandaloneModel,
  311. providers []string, brands []string, cloudEnv string,
  312. policyResult rbacutils.SPolicyResult,
  313. ) (SEsCountStat, error) {
  314. esq := man.Query()
  315. esq = scopeOwnerIdFilter(esq, scope, ownerId)
  316. esq = CloudProviderFilter(esq, esq.Field("manager_id"), providers, brands, cloudEnv)
  317. esq = RangeObjectsFilter(esq, rangeObjs, esq.Field("cloudregion_id"), nil, esq.Field("manager_id"), nil, nil)
  318. esq = db.ObjectIdQueryWithPolicyResult(ctx, esq, man, policyResult)
  319. sq := esq.SubQuery()
  320. q := sq.Query(sqlchemy.COUNT("total_es_count"),
  321. sqlchemy.SUM("total_cpu_count", sq.Field("vcpu_count")),
  322. sqlchemy.SUM("total_mem_size_gb", sq.Field("vmem_size_gb")))
  323. stat := SEsCountStat{}
  324. row := q.Row()
  325. err := q.Row2Struct(row, &stat)
  326. return stat, err
  327. }
  328. // 判断资源是否可以删除
  329. func (self *SElasticSearch) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
  330. if self.DisableDelete.IsTrue() {
  331. return httperrors.NewInvalidStatusError("ElasticSearch is locked, cannot delete")
  332. }
  333. return self.SStatusStandaloneResourceBase.ValidateDeleteCondition(ctx, nil)
  334. }
  335. func (self *SElasticSearch) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
  336. return nil
  337. }
  338. func (self *SElasticSearch) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
  339. return self.SVirtualResourceBase.Delete(ctx, userCred)
  340. }
  341. func (self *SElasticSearch) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  342. return self.StartDeleteTask(ctx, userCred, "")
  343. }
  344. func (self *SElasticSearch) StartDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
  345. task, err := taskman.TaskManager.NewTask(ctx, "ElasticSearchDeleteTask", self, userCred, nil, parentTaskId, "", nil)
  346. if err != nil {
  347. return err
  348. }
  349. self.SetStatus(ctx, userCred, api.ELASTIC_SEARCH_STATUS_DELETING, "")
  350. task.ScheduleRun(nil)
  351. return nil
  352. }
  353. func (self *SElasticSearch) GetIRegion(ctx context.Context) (cloudprovider.ICloudRegion, error) {
  354. region, err := self.GetRegion()
  355. if err != nil {
  356. return nil, errors.Wrapf(err, "GetRegion")
  357. }
  358. provider, err := self.GetDriver(ctx)
  359. if err != nil {
  360. return nil, errors.Wrap(err, "self.GetDriver")
  361. }
  362. return provider.GetIRegionById(region.GetExternalId())
  363. }
  364. func (self *SElasticSearch) GetIElasticSearch(ctx context.Context) (cloudprovider.ICloudElasticSearch, error) {
  365. if len(self.ExternalId) == 0 {
  366. return nil, errors.Wrapf(cloudprovider.ErrNotFound, "empty externalId")
  367. }
  368. iRegion, err := self.GetIRegion(ctx)
  369. if err != nil {
  370. return nil, errors.Wrapf(cloudprovider.ErrNotFound, "GetIRegion")
  371. }
  372. return iRegion.GetIElasticSearchById(self.ExternalId)
  373. }
  374. func (self *SElasticSearch) syncRemoveCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential) error {
  375. err := self.RealDelete(ctx, userCred)
  376. if err != nil {
  377. return err
  378. }
  379. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  380. Obj: self,
  381. Action: notifyclient.ActionSyncDelete,
  382. })
  383. return nil
  384. }
  385. // 同步资源属性
  386. func (self *SElasticSearch) SyncWithCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential, ext cloudprovider.ICloudElasticSearch) error {
  387. diff, err := db.UpdateWithLock(ctx, self, func() error {
  388. if options.Options.EnableSyncName {
  389. newName, _ := db.GenerateAlterName(self, ext.GetName())
  390. if len(newName) > 0 {
  391. self.Name = newName
  392. }
  393. }
  394. self.Status = ext.GetStatus()
  395. self.Version = ext.GetVersion()
  396. self.StorageType = ext.GetStorageType()
  397. self.DiskSizeGb = ext.GetDiskSizeGb()
  398. self.Category = ext.GetCategory()
  399. self.InstanceType = ext.GetInstanceType()
  400. self.VcpuCount = ext.GetVcpuCount()
  401. self.VmemSizeGb = ext.GetVmemSizeGb()
  402. self.IsMultiAz = ext.IsMultiAz()
  403. self.BillingType = billing_api.TBillingType(ext.GetBillingType())
  404. self.ExpiredAt = time.Time{}
  405. self.AutoRenew = false
  406. if self.BillingType == billing_api.BILLING_TYPE_PREPAID {
  407. self.ExpiredAt = ext.GetExpiredAt()
  408. self.AutoRenew = ext.IsAutoRenew()
  409. }
  410. if networkId := ext.GetNetworkId(); len(networkId) > 0 && len(self.NetworkId) == 0 {
  411. _network, err := db.FetchByExternalIdAndManagerId(NetworkManager, networkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  412. wire := WireManager.Query().SubQuery()
  413. vpc := VpcManager.Query().SubQuery()
  414. return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
  415. Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
  416. Filter(sqlchemy.Equals(vpc.Field("manager_id"), self.ManagerId))
  417. })
  418. if err != nil {
  419. log.Errorf("failed to found network for elastic search %s by externalId: %s", self.Name, networkId)
  420. } else {
  421. network := _network.(*SNetwork)
  422. self.NetworkId = network.Id
  423. vpc, _ := network.GetVpc()
  424. self.VpcId = vpc.Id
  425. if zone, _ := network.GetZone(); zone != nil {
  426. self.ZoneId = zone.Id
  427. }
  428. }
  429. }
  430. if vpcId := ext.GetVpcId(); len(vpcId) > 0 && len(self.VpcId) == 0 {
  431. vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  432. return q.Equals("manager_id", self.ManagerId)
  433. })
  434. if err == nil {
  435. self.VpcId = vpc.GetId()
  436. }
  437. }
  438. if len(self.ZoneId) == 0 {
  439. zoneId := ext.GetZoneId()
  440. if len(zoneId) > 0 {
  441. region, err := self.GetRegion()
  442. if err != nil {
  443. return errors.Wrapf(err, "GetRegion")
  444. }
  445. zones, err := region.GetZones()
  446. if err != nil {
  447. return errors.Wrapf(err, "GetZone")
  448. }
  449. for _, zone := range zones {
  450. if strings.HasSuffix(zone.ExternalId, zoneId) {
  451. self.ZoneId = zone.Id
  452. break
  453. }
  454. }
  455. }
  456. }
  457. return nil
  458. })
  459. if err != nil {
  460. return errors.Wrapf(err, "db.Update")
  461. }
  462. if len(diff) > 0 {
  463. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  464. Obj: self,
  465. Action: notifyclient.ActionSyncUpdate,
  466. })
  467. }
  468. if account := self.GetCloudaccount(); account != nil {
  469. syncVirtualResourceMetadata(ctx, userCred, self, ext, account.ReadOnly)
  470. }
  471. if provider := self.GetCloudprovider(); provider != nil {
  472. SyncCloudProject(ctx, userCred, self, provider.GetOwnerId(), ext, provider)
  473. }
  474. db.OpsLog.LogSyncUpdate(self, diff, userCred)
  475. return nil
  476. }
  477. func (self *SCloudregion) newFromCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, ext cloudprovider.ICloudElasticSearch) (*SElasticSearch, error) {
  478. es := SElasticSearch{}
  479. es.SetModelManager(ElasticSearchManager, &es)
  480. es.ExternalId = ext.GetGlobalId()
  481. es.CloudregionId = self.Id
  482. es.ManagerId = provider.Id
  483. es.IsEmulated = ext.IsEmulated()
  484. es.Status = ext.GetStatus()
  485. es.Version = ext.GetVersion()
  486. es.StorageType = ext.GetStorageType()
  487. es.DiskSizeGb = ext.GetDiskSizeGb()
  488. es.Category = ext.GetCategory()
  489. es.InstanceType = ext.GetInstanceType()
  490. es.VcpuCount = ext.GetVcpuCount()
  491. es.VmemSizeGb = ext.GetVmemSizeGb()
  492. es.IsMultiAz = ext.IsMultiAz()
  493. if networkId := ext.GetNetworkId(); len(networkId) > 0 {
  494. _network, err := db.FetchByExternalIdAndManagerId(NetworkManager, networkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  495. wire := WireManager.Query().SubQuery()
  496. vpc := VpcManager.Query().SubQuery()
  497. return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
  498. Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
  499. Filter(sqlchemy.Equals(vpc.Field("manager_id"), provider.Id))
  500. })
  501. if err != nil {
  502. log.Errorf("failed to found network for elastic search %s by externalId: %s", es.Name, networkId)
  503. } else {
  504. network := _network.(*SNetwork)
  505. es.NetworkId = network.Id
  506. vpc, _ := network.GetVpc()
  507. es.VpcId = vpc.Id
  508. if zone, _ := network.GetZone(); zone != nil {
  509. es.ZoneId = zone.Id
  510. }
  511. }
  512. }
  513. if len(es.VpcId) == 0 {
  514. if vpcId := ext.GetVpcId(); len(vpcId) > 0 {
  515. vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  516. return q.Equals("manager_id", provider.Id)
  517. })
  518. if err == nil {
  519. es.VpcId = vpc.GetId()
  520. }
  521. }
  522. }
  523. if len(es.ZoneId) == 0 {
  524. zoneId := ext.GetZoneId()
  525. if len(zoneId) > 0 {
  526. zones, _ := self.GetZones()
  527. for _, zone := range zones {
  528. if strings.HasSuffix(zone.ExternalId, zoneId) {
  529. es.ZoneId = zone.Id
  530. break
  531. }
  532. }
  533. }
  534. }
  535. if createdAt := ext.GetCreatedAt(); !createdAt.IsZero() {
  536. es.CreatedAt = createdAt
  537. }
  538. es.BillingType = billing_api.TBillingType(ext.GetBillingType())
  539. es.ExpiredAt = time.Time{}
  540. es.AutoRenew = false
  541. if es.BillingType == billing_api.BILLING_TYPE_PREPAID {
  542. es.ExpiredAt = ext.GetExpiredAt()
  543. es.AutoRenew = ext.IsAutoRenew()
  544. }
  545. var err error
  546. err = func() error {
  547. // 这里加锁是为了防止名称重复
  548. lockman.LockRawObject(ctx, ElasticSearchManager.Keyword(), "name")
  549. defer lockman.ReleaseRawObject(ctx, ElasticSearchManager.Keyword(), "name")
  550. es.Name, err = db.GenerateName(ctx, ElasticSearchManager, provider.GetOwnerId(), ext.GetName())
  551. if err != nil {
  552. return errors.Wrapf(err, "db.GenerateName")
  553. }
  554. return ElasticSearchManager.TableSpec().Insert(ctx, &es)
  555. }()
  556. if err != nil {
  557. return nil, errors.Wrapf(err, "newFromCloudElasticSearch.Insert")
  558. }
  559. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  560. Obj: &es,
  561. Action: notifyclient.ActionSyncCreate,
  562. })
  563. // 同步标签
  564. syncVirtualResourceMetadata(ctx, userCred, &es, ext, false)
  565. // 同步项目归属
  566. SyncCloudProject(ctx, userCred, &es, provider.GetOwnerId(), ext, provider)
  567. db.OpsLog.LogEvent(&es, db.ACT_CREATE, es.GetShortDesc(ctx), userCred)
  568. return &es, nil
  569. }
  570. func (manager *SElasticSearchManager) ListItemExportKeys(ctx context.Context,
  571. q *sqlchemy.SQuery,
  572. userCred mcclient.TokenCredential,
  573. keys stringutils2.SSortedStrings,
  574. ) (*sqlchemy.SQuery, error) {
  575. var err error
  576. q, err = manager.SVirtualResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  577. if err != nil {
  578. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemExportKeys")
  579. }
  580. if keys.ContainsAny(manager.SManagedResourceBaseManager.GetExportKeys()...) {
  581. q, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  582. if err != nil {
  583. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
  584. }
  585. }
  586. if keys.ContainsAny(manager.SCloudregionResourceBaseManager.GetExportKeys()...) {
  587. q, err = manager.SCloudregionResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  588. if err != nil {
  589. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemExportKeys")
  590. }
  591. }
  592. return q, nil
  593. }
  594. // 同步ElasticSearch实例状态
  595. func (self *SElasticSearch) PerformSyncstatus(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  596. var openTask = true
  597. count, err := taskman.TaskManager.QueryTasksOfObject(self, time.Now().Add(-3*time.Minute), &openTask).CountWithError()
  598. if err != nil {
  599. return nil, err
  600. }
  601. if count > 0 {
  602. return nil, httperrors.NewBadRequestError("ElasticSearch has %d task active, can't sync status", count)
  603. }
  604. return nil, StartResourceSyncStatusTask(ctx, userCred, self, "ElasticSearchSyncstatusTask", "")
  605. }
  606. func (self *SElasticSearch) GetDetailsAccessInfo(ctx context.Context, userCred mcclient.TokenCredential, input api.ElasticSearchAccessInfoInput) (*cloudprovider.ElasticSearchAccessInfo, error) {
  607. iEs, err := self.GetIElasticSearch(ctx)
  608. if err != nil {
  609. return nil, err
  610. }
  611. return iEs.GetAccessInfo()
  612. }
  613. func (es *SElasticSearch) PostUpdate(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) {
  614. es.SVirtualResourceBase.PostUpdate(ctx, userCred, query, data)
  615. }
  616. func (self *SElasticSearch) StartSElasticSearchSyncTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
  617. return StartResourceSyncStatusTask(ctx, userCred, self, "ElasticSearchSyncstatusTask", parentTaskId)
  618. }
  619. func (self *SElasticSearch) StartRemoteUpdateTask(ctx context.Context, userCred mcclient.TokenCredential, replaceTags bool, parentTaskId string) error {
  620. data := jsonutils.NewDict()
  621. if replaceTags {
  622. data.Add(jsonutils.JSONTrue, "replace_tags")
  623. }
  624. if task, err := taskman.TaskManager.NewTask(ctx, "ElasticSearchRemoteUpdateTask", self, userCred, data, parentTaskId, "", nil); err != nil {
  625. return errors.Wrap(err, "Start ElasticSearchRemoteUpdateTask")
  626. } else {
  627. self.SetStatus(ctx, userCred, api.ELASTIC_SEARCH_UPDATE_TAGS, "StartRemoteUpdateTask")
  628. task.ScheduleRun(nil)
  629. }
  630. return nil
  631. }
  632. func (self *SElasticSearch) OnMetadataUpdated(ctx context.Context, userCred mcclient.TokenCredential) {
  633. if len(self.ExternalId) == 0 || options.Options.KeepTagLocalization {
  634. return
  635. }
  636. if account := self.GetCloudaccount(); account != nil && account.ReadOnly {
  637. return
  638. }
  639. err := self.StartRemoteUpdateTask(ctx, userCred, true, "")
  640. if err != nil {
  641. log.Errorf("StartRemoteUpdateTask fail: %s", err)
  642. }
  643. }