kafka.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "yunion.io/x/cloudmux/pkg/cloudprovider"
  21. "yunion.io/x/jsonutils"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/util/compare"
  25. "yunion.io/x/pkg/util/rbacscope"
  26. "yunion.io/x/sqlchemy"
  27. billing_api "yunion.io/x/onecloud/pkg/apis/billing"
  28. api "yunion.io/x/onecloud/pkg/apis/compute"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  30. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  33. "yunion.io/x/onecloud/pkg/compute/options"
  34. "yunion.io/x/onecloud/pkg/httperrors"
  35. "yunion.io/x/onecloud/pkg/mcclient"
  36. "yunion.io/x/onecloud/pkg/util/rbacutils"
  37. "yunion.io/x/onecloud/pkg/util/stringutils2"
  38. )
  39. type SKafkaManager struct {
  40. db.SVirtualResourceBaseManager
  41. db.SExternalizedResourceBaseManager
  42. SDeletePreventableResourceBaseManager
  43. SCloudregionResourceBaseManager
  44. SManagedResourceBaseManager
  45. SVpcResourceBaseManager
  46. }
  47. var KafkaManager *SKafkaManager
  48. func init() {
  49. KafkaManager = &SKafkaManager{
  50. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  51. SKafka{},
  52. "kafkas_tbl",
  53. "kafka",
  54. "kafkas",
  55. ),
  56. }
  57. KafkaManager.SetVirtualObject(KafkaManager)
  58. }
  59. type SKafka struct {
  60. db.SVirtualResourceBase
  61. db.SExternalizedResourceBase
  62. SManagedResourceBase
  63. SBillingResourceBase
  64. SCloudregionResourceBase
  65. SDeletePreventableResourceBase
  66. // 版本
  67. Version string `width:"16" charset:"ascii" nullable:"false" list:"user" create:"required"`
  68. // 套餐名称
  69. // example: elasticsearch.sn2ne.xlarge
  70. InstanceType string `width:"64" charset:"utf8" nullable:"true" list:"user" create:"optional"`
  71. // 存储类型
  72. // example: local_ssd
  73. StorageType string `nullable:"true" list:"user" create:"required"`
  74. // 存储大小
  75. // example: 1024
  76. DiskSizeGb int `nullable:"false" list:"user" create:"required"`
  77. // 带宽峰值
  78. // example: 1024
  79. BandwidthMb int `nullable:"false" list:"user" create:"required"`
  80. // 消息保留时长
  81. MsgRetentionMinute int `nullable:"false" list:"user" create:"required"`
  82. // 连接端点
  83. Endpoint string `width:"256" charset:"ascii" nullable:"false" list:"user" create:"optional"`
  84. VpcId string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" json:"vpc_id"`
  85. NetworkId string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" json:"network_id"`
  86. // 可用区Id
  87. ZoneId string `width:"36" charset:"ascii" nullable:"true" list:"user" create:"optional" json:"zone_id"`
  88. // 是否是多可用区部署
  89. IsMultiAz bool `nullable:"false" default:"false" list:"user" update:"user" create:"optional"`
  90. }
  91. func (manager *SKafkaManager) GetContextManagers() [][]db.IModelManager {
  92. return [][]db.IModelManager{
  93. {CloudregionManager},
  94. }
  95. }
  96. // Kafka实例列表
  97. func (man *SKafkaManager) ListItemFilter(
  98. ctx context.Context,
  99. q *sqlchemy.SQuery,
  100. userCred mcclient.TokenCredential,
  101. query api.KafkaListInput,
  102. ) (*sqlchemy.SQuery, error) {
  103. var err error
  104. q, err = man.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VirtualResourceListInput)
  105. if err != nil {
  106. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
  107. }
  108. q, err = man.SExternalizedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ExternalizedResourceBaseListInput)
  109. if err != nil {
  110. return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
  111. }
  112. q, err = man.SDeletePreventableResourceBaseManager.ListItemFilter(ctx, q, userCred, query.DeletePreventableResourceBaseListInput)
  113. if err != nil {
  114. return nil, errors.Wrap(err, "SDeletePreventableResourceBaseManager.ListItemFilter")
  115. }
  116. q, err = man.SManagedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ManagedResourceListInput)
  117. if err != nil {
  118. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
  119. }
  120. q, err = man.SCloudregionResourceBaseManager.ListItemFilter(ctx, q, userCred, query.RegionalFilterListInput)
  121. if err != nil {
  122. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemFilter")
  123. }
  124. q, err = man.SVpcResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VpcFilterListInput)
  125. if err != nil {
  126. return nil, errors.Wrap(err, "SVpcResourceBaseManager.ListItemFilter")
  127. }
  128. return q, nil
  129. }
  130. func (man *SKafkaManager) OrderByExtraFields(
  131. ctx context.Context,
  132. q *sqlchemy.SQuery,
  133. userCred mcclient.TokenCredential,
  134. query api.KafkaListInput,
  135. ) (*sqlchemy.SQuery, error) {
  136. q, err := man.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VirtualResourceListInput)
  137. if err != nil {
  138. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.OrderByExtraFields")
  139. }
  140. q, err = man.SCloudregionResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.RegionalFilterListInput)
  141. if err != nil {
  142. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.OrderByExtraFields")
  143. }
  144. q, err = man.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ManagedResourceListInput)
  145. if err != nil {
  146. return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
  147. }
  148. q, err = man.SVpcResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VpcFilterListInput)
  149. if err != nil {
  150. return nil, errors.Wrap(err, "SVpcResourceBaseManager.OrderByExtraFields")
  151. }
  152. return q, nil
  153. }
  154. func (man *SKafkaManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  155. q, err := man.SVirtualResourceBaseManager.QueryDistinctExtraField(q, field)
  156. if err == nil {
  157. return q, nil
  158. }
  159. q, err = man.SCloudregionResourceBaseManager.QueryDistinctExtraField(q, field)
  160. if err == nil {
  161. return q, nil
  162. }
  163. q, err = man.SManagedResourceBaseManager.QueryDistinctExtraField(q, field)
  164. if err == nil {
  165. return q, nil
  166. }
  167. q, err = man.SVpcResourceBaseManager.QueryDistinctExtraField(q, field)
  168. if err == nil {
  169. return q, nil
  170. }
  171. return q, httperrors.ErrNotFound
  172. }
  173. func (manager *SKafkaManager) QueryDistinctExtraFields(q *sqlchemy.SQuery, resource string, fields []string) (*sqlchemy.SQuery, error) {
  174. var err error
  175. q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraFields(q, resource, fields)
  176. if err == nil {
  177. return q, nil
  178. }
  179. return q, httperrors.ErrNotFound
  180. }
  181. func (man *SKafkaManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.KafkaCreateInput) (api.KafkaCreateInput, error) {
  182. return input, httperrors.NewNotImplementedError("Not Implemented")
  183. }
  184. func (manager *SKafkaManager) FetchCustomizeColumns(
  185. ctx context.Context,
  186. userCred mcclient.TokenCredential,
  187. query jsonutils.JSONObject,
  188. objs []interface{},
  189. fields stringutils2.SSortedStrings,
  190. isList bool,
  191. ) []api.KafkaDetails {
  192. rows := make([]api.KafkaDetails, len(objs))
  193. virtRows := manager.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  194. manRows := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  195. regRows := manager.SCloudregionResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  196. vpcIds := make([]string, len(objs))
  197. netIds := make([]string, len(objs))
  198. zoneIds := make([]string, len(objs))
  199. for i := range rows {
  200. rows[i] = api.KafkaDetails{
  201. VirtualResourceDetails: virtRows[i],
  202. ManagedResourceInfo: manRows[i],
  203. CloudregionResourceInfo: regRows[i],
  204. }
  205. kafka := objs[i].(*SKafka)
  206. vpcIds[i] = kafka.VpcId
  207. netIds[i] = kafka.NetworkId
  208. zoneIds[i] = kafka.ZoneId
  209. }
  210. vpcMaps, err := db.FetchIdNameMap2(VpcManager, vpcIds)
  211. if err != nil {
  212. return rows
  213. }
  214. netMaps, err := db.FetchIdNameMap2(NetworkManager, netIds)
  215. if err != nil {
  216. return rows
  217. }
  218. zoneMaps, err := db.FetchIdNameMap2(ZoneManager, zoneIds)
  219. if err != nil {
  220. return rows
  221. }
  222. for i := range rows {
  223. rows[i].Vpc, _ = vpcMaps[vpcIds[i]]
  224. rows[i].Network, _ = netMaps[netIds[i]]
  225. rows[i].Zone, _ = zoneMaps[zoneIds[i]]
  226. }
  227. return rows
  228. }
  229. func (self *SCloudregion) GetKafkas(managerId string) ([]SKafka, error) {
  230. q := KafkaManager.Query().Equals("cloudregion_id", self.Id)
  231. if len(managerId) > 0 {
  232. q = q.Equals("manager_id", managerId)
  233. }
  234. ret := []SKafka{}
  235. err := db.FetchModelObjects(KafkaManager, q, &ret)
  236. if err != nil {
  237. return nil, errors.Wrapf(err, "db.FetchModelObjects")
  238. }
  239. return ret, nil
  240. }
  241. func (self *SCloudregion) SyncKafkas(
  242. ctx context.Context,
  243. userCred mcclient.TokenCredential,
  244. provider *SCloudprovider,
  245. exts []cloudprovider.ICloudKafka,
  246. xor bool,
  247. ) compare.SyncResult {
  248. lockman.LockRawObject(ctx, KafkaManager.KeywordPlural(), fmt.Sprintf("%s-%s", provider.Id, self.Id))
  249. defer lockman.ReleaseRawObject(ctx, KafkaManager.KeywordPlural(), fmt.Sprintf("%s-%s", provider.Id, self.Id))
  250. result := compare.SyncResult{}
  251. dbEss, err := self.GetKafkas(provider.Id)
  252. if err != nil {
  253. result.Error(err)
  254. return result
  255. }
  256. removed := make([]SKafka, 0)
  257. commondb := make([]SKafka, 0)
  258. commonext := make([]cloudprovider.ICloudKafka, 0)
  259. added := make([]cloudprovider.ICloudKafka, 0)
  260. // 本地和云上资源列表进行比对
  261. err = compare.CompareSets(dbEss, exts, &removed, &commondb, &commonext, &added)
  262. if err != nil {
  263. result.Error(err)
  264. return result
  265. }
  266. // 删除云上没有的资源
  267. for i := 0; i < len(removed); i++ {
  268. err := removed[i].syncRemoveCloudKafka(ctx, userCred)
  269. if err != nil {
  270. result.DeleteError(err)
  271. continue
  272. }
  273. result.Delete()
  274. }
  275. if !xor {
  276. // 和云上资源属性进行同步
  277. for i := 0; i < len(commondb); i++ {
  278. err := commondb[i].SyncWithCloudKafka(ctx, userCred, commonext[i])
  279. if err != nil {
  280. result.UpdateError(err)
  281. continue
  282. }
  283. result.Update()
  284. }
  285. }
  286. // 创建本地没有的云上资源
  287. for i := 0; i < len(added); i++ {
  288. _, err := self.newFromCloudKafka(ctx, userCred, provider, added[i])
  289. if err != nil {
  290. result.AddError(err)
  291. continue
  292. }
  293. result.Add()
  294. }
  295. return result
  296. }
  297. type SKafkaCountStat struct {
  298. TotalKafkaCount int
  299. TotalDiskSizeGb int
  300. }
  301. func (man *SKafkaManager) TotalCount(
  302. ctx context.Context,
  303. scope rbacscope.TRbacScope,
  304. ownerId mcclient.IIdentityProvider,
  305. rangeObjs []db.IStandaloneModel,
  306. providers []string, brands []string, cloudEnv string,
  307. policyResult rbacutils.SPolicyResult,
  308. ) (SKafkaCountStat, error) {
  309. kq := man.Query()
  310. kq = scopeOwnerIdFilter(kq, scope, ownerId)
  311. kq = CloudProviderFilter(kq, kq.Field("manager_id"), providers, brands, cloudEnv)
  312. kq = RangeObjectsFilter(kq, rangeObjs, kq.Field("cloudregion_id"), nil, kq.Field("manager_id"), nil, nil)
  313. kq = db.ObjectIdQueryWithPolicyResult(ctx, kq, man, policyResult)
  314. sq := kq.SubQuery()
  315. q := sq.Query(sqlchemy.COUNT("total_kafka_count"),
  316. sqlchemy.SUM("total_disk_size_gb", sq.Field("disk_size_gb")))
  317. stat := SKafkaCountStat{}
  318. row := q.Row()
  319. err := q.Row2Struct(row, &stat)
  320. return stat, err
  321. }
  322. // 判断资源是否可以删除
  323. func (self *SKafka) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
  324. if self.DisableDelete.IsTrue() {
  325. return httperrors.NewInvalidStatusError("Kafka is locked, cannot delete")
  326. }
  327. return self.SStatusStandaloneResourceBase.ValidateDeleteCondition(ctx, nil)
  328. }
  329. func (self *SKafka) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
  330. return nil
  331. }
  332. func (self *SKafka) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
  333. return self.SVirtualResourceBase.Delete(ctx, userCred)
  334. }
  335. func (self *SKafka) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  336. return self.StartDeleteTask(ctx, userCred, "")
  337. }
  338. func (self *SKafka) StartDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
  339. task, err := taskman.TaskManager.NewTask(ctx, "KafkaDeleteTask", self, userCred, nil, parentTaskId, "", nil)
  340. if err != nil {
  341. return err
  342. }
  343. self.SetStatus(ctx, userCred, api.KAFKA_STATUS_DELETING, "")
  344. task.ScheduleRun(nil)
  345. return nil
  346. }
  347. func (self *SKafka) GetIRegion(ctx context.Context) (cloudprovider.ICloudRegion, error) {
  348. region, err := self.GetRegion()
  349. if err != nil {
  350. return nil, errors.Wrapf(err, "GetRegion")
  351. }
  352. provider, err := self.GetDriver(ctx)
  353. if err != nil {
  354. return nil, errors.Wrap(err, "self.GetDriver")
  355. }
  356. return provider.GetIRegionById(region.GetExternalId())
  357. }
  358. func (self *SKafka) GetIKafka(ctx context.Context) (cloudprovider.ICloudKafka, error) {
  359. if len(self.ExternalId) == 0 {
  360. return nil, errors.Wrapf(cloudprovider.ErrNotFound, "empty externalId")
  361. }
  362. iRegion, err := self.GetIRegion(ctx)
  363. if err != nil {
  364. return nil, errors.Wrapf(cloudprovider.ErrNotFound, "GetIRegion")
  365. }
  366. return iRegion.GetICloudKafkaById(self.ExternalId)
  367. }
  368. func (self *SKafka) syncRemoveCloudKafka(ctx context.Context, userCred mcclient.TokenCredential) error {
  369. err := self.RealDelete(ctx, userCred)
  370. if err != nil {
  371. return err
  372. }
  373. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  374. Obj: self,
  375. Action: notifyclient.ActionSyncDelete,
  376. })
  377. return nil
  378. }
  379. // 同步资源属性
  380. func (self *SKafka) SyncWithCloudKafka(ctx context.Context, userCred mcclient.TokenCredential, ext cloudprovider.ICloudKafka) error {
  381. diff, err := db.UpdateWithLock(ctx, self, func() error {
  382. if options.Options.EnableSyncName {
  383. newName, _ := db.GenerateAlterName(self, ext.GetName())
  384. if len(newName) > 0 {
  385. self.Name = newName
  386. }
  387. }
  388. self.Status = ext.GetStatus()
  389. self.InstanceType = ext.GetInstanceType()
  390. self.Version = ext.GetVersion()
  391. self.DiskSizeGb = ext.GetDiskSizeGb()
  392. self.StorageType = ext.GetStorageType()
  393. self.Version = ext.GetVersion()
  394. self.BandwidthMb = ext.GetBandwidthMb()
  395. self.Endpoint = ext.GetEndpoint()
  396. self.MsgRetentionMinute = ext.GetMsgRetentionMinute()
  397. self.IsMultiAz = ext.IsMultiAz()
  398. self.BillingType = billing_api.TBillingType(ext.GetBillingType())
  399. self.ExpiredAt = time.Time{}
  400. self.AutoRenew = false
  401. if self.BillingType == billing_api.BILLING_TYPE_PREPAID {
  402. self.ExpiredAt = ext.GetExpiredAt()
  403. self.AutoRenew = ext.IsAutoRenew()
  404. }
  405. if networkId := ext.GetNetworkId(); len(networkId) > 0 && len(self.NetworkId) == 0 {
  406. _network, err := db.FetchByExternalIdAndManagerId(NetworkManager, networkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  407. wire := WireManager.Query().SubQuery()
  408. vpc := VpcManager.Query().SubQuery()
  409. return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
  410. Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
  411. Filter(sqlchemy.Equals(vpc.Field("manager_id"), self.ManagerId))
  412. })
  413. if err != nil {
  414. log.Errorf("failed to found network for kafka %s by externalId: %s", self.Name, networkId)
  415. } else {
  416. network := _network.(*SNetwork)
  417. self.NetworkId = network.Id
  418. vpc, _ := network.GetVpc()
  419. self.VpcId = vpc.Id
  420. if zone, _ := network.GetZone(); zone != nil {
  421. self.ZoneId = zone.Id
  422. }
  423. }
  424. }
  425. if vpcId := ext.GetVpcId(); len(vpcId) > 0 && len(self.VpcId) == 0 {
  426. vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  427. return q.Equals("manager_id", self.ManagerId)
  428. })
  429. if err == nil {
  430. self.VpcId = vpc.GetId()
  431. }
  432. }
  433. if len(self.ZoneId) == 0 {
  434. zoneId := ext.GetZoneId()
  435. if len(zoneId) > 0 {
  436. region, err := self.GetRegion()
  437. if err != nil {
  438. return errors.Wrapf(err, "GetRegion")
  439. }
  440. zones, err := region.GetZones()
  441. if err != nil {
  442. return errors.Wrapf(err, "GetZones")
  443. }
  444. for _, zone := range zones {
  445. if strings.HasSuffix(zone.ExternalId, zoneId) {
  446. self.ZoneId = zone.Id
  447. break
  448. }
  449. }
  450. }
  451. }
  452. return nil
  453. })
  454. if err != nil {
  455. return errors.Wrapf(err, "db.Update")
  456. }
  457. if len(diff) > 0 {
  458. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  459. Obj: self,
  460. Action: notifyclient.ActionSyncUpdate,
  461. })
  462. }
  463. if account := self.GetCloudaccount(); account != nil {
  464. syncVirtualResourceMetadata(ctx, userCred, self, ext, account.ReadOnly)
  465. }
  466. if provider := self.GetCloudprovider(); provider != nil {
  467. SyncCloudProject(ctx, userCred, self, provider.GetOwnerId(), ext, provider)
  468. }
  469. db.OpsLog.LogSyncUpdate(self, diff, userCred)
  470. return nil
  471. }
  472. func (self *SCloudregion) newFromCloudKafka(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, ext cloudprovider.ICloudKafka) (*SKafka, error) {
  473. kafka := SKafka{}
  474. kafka.SetModelManager(KafkaManager, &kafka)
  475. kafka.ExternalId = ext.GetGlobalId()
  476. kafka.CloudregionId = self.Id
  477. kafka.ManagerId = provider.Id
  478. kafka.IsEmulated = ext.IsEmulated()
  479. kafka.Status = ext.GetStatus()
  480. kafka.InstanceType = ext.GetInstanceType()
  481. kafka.Version = ext.GetVersion()
  482. kafka.DiskSizeGb = ext.GetDiskSizeGb()
  483. kafka.StorageType = ext.GetStorageType()
  484. kafka.Version = ext.GetVersion()
  485. kafka.BandwidthMb = ext.GetBandwidthMb()
  486. kafka.Endpoint = ext.GetEndpoint()
  487. kafka.MsgRetentionMinute = ext.GetMsgRetentionMinute()
  488. kafka.IsMultiAz = ext.IsMultiAz()
  489. if createdAt := ext.GetCreatedAt(); !createdAt.IsZero() {
  490. kafka.CreatedAt = createdAt
  491. }
  492. kafka.BillingType = billing_api.TBillingType(ext.GetBillingType())
  493. kafka.ExpiredAt = time.Time{}
  494. kafka.AutoRenew = false
  495. if kafka.BillingType == billing_api.BILLING_TYPE_PREPAID {
  496. kafka.ExpiredAt = ext.GetExpiredAt()
  497. kafka.AutoRenew = ext.IsAutoRenew()
  498. }
  499. if networkId := ext.GetNetworkId(); len(networkId) > 0 {
  500. _network, err := db.FetchByExternalIdAndManagerId(NetworkManager, networkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  501. wire := WireManager.Query().SubQuery()
  502. vpc := VpcManager.Query().SubQuery()
  503. return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
  504. Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
  505. Filter(sqlchemy.Equals(vpc.Field("manager_id"), provider.Id))
  506. })
  507. if err != nil {
  508. log.Errorf("failed to found network for kafka %s by externalId: %s", kafka.Name, networkId)
  509. } else {
  510. network := _network.(*SNetwork)
  511. kafka.NetworkId = network.Id
  512. vpc, _ := network.GetVpc()
  513. kafka.VpcId = vpc.Id
  514. if zone, _ := network.GetZone(); zone != nil {
  515. kafka.ZoneId = zone.Id
  516. }
  517. }
  518. }
  519. if len(kafka.VpcId) == 0 {
  520. if vpcId := ext.GetVpcId(); len(vpcId) > 0 {
  521. vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
  522. return q.Equals("manager_id", provider.Id)
  523. })
  524. if err == nil {
  525. kafka.VpcId = vpc.GetId()
  526. }
  527. }
  528. }
  529. if len(kafka.ZoneId) == 0 {
  530. zoneId := ext.GetZoneId()
  531. if len(zoneId) > 0 {
  532. zones, _ := self.GetZones()
  533. for _, zone := range zones {
  534. if strings.HasSuffix(zone.ExternalId, zoneId) {
  535. kafka.ZoneId = zone.Id
  536. break
  537. }
  538. }
  539. }
  540. }
  541. var err error
  542. err = func() error {
  543. // 这里加锁是为了防止名称重复
  544. lockman.LockRawObject(ctx, KafkaManager.Keyword(), "name")
  545. defer lockman.ReleaseRawObject(ctx, KafkaManager.Keyword(), "name")
  546. kafka.Name, err = db.GenerateName(ctx, KafkaManager, provider.GetOwnerId(), ext.GetName())
  547. if err != nil {
  548. return errors.Wrapf(err, "db.GenerateName")
  549. }
  550. return KafkaManager.TableSpec().Insert(ctx, &kafka)
  551. }()
  552. if err != nil {
  553. return nil, errors.Wrapf(err, "newFromCloudKafka.Insert")
  554. }
  555. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  556. Obj: &kafka,
  557. Action: notifyclient.ActionSyncCreate,
  558. })
  559. // 同步标签
  560. syncVirtualResourceMetadata(ctx, userCred, &kafka, ext, false)
  561. // 同步项目归属
  562. SyncCloudProject(ctx, userCred, &kafka, provider.GetOwnerId(), ext, provider)
  563. db.OpsLog.LogEvent(&kafka, db.ACT_CREATE, kafka.GetShortDesc(ctx), userCred)
  564. return &kafka, nil
  565. }
  566. func (manager *SKafkaManager) ListItemExportKeys(ctx context.Context,
  567. q *sqlchemy.SQuery,
  568. userCred mcclient.TokenCredential,
  569. keys stringutils2.SSortedStrings,
  570. ) (*sqlchemy.SQuery, error) {
  571. var err error
  572. q, err = manager.SVirtualResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  573. if err != nil {
  574. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemExportKeys")
  575. }
  576. if keys.ContainsAny(manager.SManagedResourceBaseManager.GetExportKeys()...) {
  577. q, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  578. if err != nil {
  579. return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
  580. }
  581. }
  582. if keys.ContainsAny(manager.SCloudregionResourceBaseManager.GetExportKeys()...) {
  583. q, err = manager.SCloudregionResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  584. if err != nil {
  585. return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemExportKeys")
  586. }
  587. }
  588. return q, nil
  589. }
  590. // 同步Kafka实例状态
  591. func (self *SKafka) PerformSyncstatus(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  592. var openTask = true
  593. count, err := taskman.TaskManager.QueryTasksOfObject(self, time.Now().Add(-3*time.Minute), &openTask).CountWithError()
  594. if err != nil {
  595. return nil, err
  596. }
  597. if count > 0 {
  598. return nil, httperrors.NewBadRequestError("Kafka has %d task active, can't sync status", count)
  599. }
  600. return nil, StartResourceSyncStatusTask(ctx, userCred, self, "KafkaSyncstatusTask", "")
  601. }
  602. func (self *SKafka) StartKafkaSyncTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
  603. return StartResourceSyncStatusTask(ctx, userCred, self, "KafkaSyncstatusTask", parentTaskId)
  604. }
  605. // 获取Kafka Topic列表
  606. func (self *SKafka) GetDetailsTopics(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) ([]cloudprovider.SKafkaTopic, error) {
  607. iKafka, err := self.GetIKafka(ctx)
  608. if err != nil {
  609. return nil, httperrors.NewGeneralError(errors.Wrapf(err, "GetIKafka"))
  610. }
  611. return iKafka.GetTopics()
  612. }
  613. func (self *SKafka) StartRemoteUpdateTask(ctx context.Context, userCred mcclient.TokenCredential, replaceTags bool, parentTaskId string) error {
  614. data := jsonutils.NewDict()
  615. if replaceTags {
  616. data.Add(jsonutils.JSONTrue, "replace_tags")
  617. }
  618. if task, err := taskman.TaskManager.NewTask(ctx, "KafkaRemoteUpdateTask", self, userCred, data, parentTaskId, "", nil); err != nil {
  619. return errors.Wrap(err, "Start ElasticSearchRemoteUpdateTask")
  620. } else {
  621. self.SetStatus(ctx, userCred, api.ELASTIC_SEARCH_UPDATE_TAGS, "StartRemoteUpdateTask")
  622. task.ScheduleRun(nil)
  623. }
  624. return nil
  625. }
  626. func (self *SKafka) OnMetadataUpdated(ctx context.Context, userCred mcclient.TokenCredential) {
  627. if len(self.ExternalId) == 0 || options.Options.KeepTagLocalization {
  628. return
  629. }
  630. if account := self.GetCloudaccount(); account != nil && account.ReadOnly {
  631. return
  632. }
  633. err := self.StartRemoteUpdateTask(ctx, userCred, true, "")
  634. if err != nil {
  635. log.Errorf("StartRemoteUpdateTask fail: %s", err)
  636. }
  637. }