kafka.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package aliyun
  15. import (
  16. "fmt"
  17. "strings"
  18. "time"
  19. "yunion.io/x/jsonutils"
  20. "yunion.io/x/pkg/errors"
  21. billing_api "yunion.io/x/cloudmux/pkg/apis/billing"
  22. api "yunion.io/x/cloudmux/pkg/apis/compute"
  23. "yunion.io/x/cloudmux/pkg/cloudprovider"
  24. "yunion.io/x/cloudmux/pkg/multicloud"
  25. )
// SKafka describes one Aliyun Kafka instance. Its exported fields are decoded
// from the InstanceVO entries of a GetInstanceList response (see GetKafkas).
type SKafka struct {
	multicloud.SBillingBase
	multicloud.SVirtualResourceBase
	AliyunTags

	// region is the region this instance was listed from; the SRegion lookup
	// helpers assign it and the methods use it for follow-up API calls.
	region *SRegion

	AllConfig  string `json:"AllConfig"`
	DeployType int    `json:"DeployType"`
	// SpecType is returned as the instance type by GetInstanceType.
	SpecType string `json:"SpecType"`
	// PaidType selects the billing type: GetBillingType reports 0 as prepaid
	// and every other value as postpaid.
	PaidType   int    `json:"PaidType"`
	InstanceId string `json:"InstanceId"`
	// MsgRetain is multiplied by 60 to obtain minutes in
	// GetMsgRetentionMinute, so it is presumably expressed in hours.
	MsgRetain int `json:"MsgRetain"`
	// ZoneId has a leading "zone" stripped and is prefixed with RegionId by
	// GetZoneId.
	ZoneId string `json:"ZoneId"`
	// IoMax is the fallback bandwidth value used by GetBandwidthMb when EipMax
	// is not positive.
	IoMax     int    `json:"IoMax"`
	VSwitchId string `json:"VSwitchId"`
	VpcId     string `json:"VpcId"`

	UpgradeServiceDetailInfo struct {
		// Current2OpenSourceVersion is what GetVersion returns.
		Current2OpenSourceVersion string `json:"Current2OpenSourceVersion"`
	} `json:"UpgradeServiceDetailInfo"`

	// ServiceStatus is the numeric state that GetStatus maps onto the
	// KAFKA_STATUS_* constants.
	ServiceStatus int    `json:"ServiceStatus"`
	Name          string `json:"Name"`
	TopicNumLimit int    `json:"TopicNumLimit"`
	// DiskSize is returned unchanged by GetDiskSizeGb.
	DiskSize int    `json:"DiskSize"`
	RegionId string `json:"RegionId"`
	// CreateTime is treated by GetCreatedAt as a Unix timestamp in
	// milliseconds (it is divided by 1000 to obtain seconds).
	CreateTime  int64  `json:"CreateTime"`
	SslEndPoint string `json:"SslEndPoint"`
	// EipMax takes precedence over IoMax in GetBandwidthMb when positive.
	EipMax   int    `json:"EipMax"`
	EndPoint string `json:"EndPoint"`
	// NOTE(review): ExpiredTime presumably uses the same unit as CreateTime;
	// nothing in this file reads it, so confirm before relying on it.
	ExpiredTime int64 `json:"ExpiredTime"`
	// DiskType selects the storage type in GetStorageType: 0 maps to
	// STORAGE_CLOUD_EFFICIENCY and 1 to STORAGE_CLOUD_SSD.
	DiskType      int    `json:"DiskType"`
	SecurityGroup string `json:"SecurityGroup"`
}
  57. func (self *SKafka) SetTags(tags map[string]string, replace bool) error {
  58. return self.region.SetResourceTags(ALIYUN_SERVICE_KAFKA, "INSTANCE", self.InstanceId, tags, replace)
  59. }
  60. func (self *SKafka) GetName() string {
  61. return self.Name
  62. }
  63. func (self *SKafka) GetGlobalId() string {
  64. return self.InstanceId
  65. }
  66. func (self *SKafka) GetId() string {
  67. return self.InstanceId
  68. }
  69. func (self *SKafka) GetVpcId() string {
  70. return self.VpcId
  71. }
  72. func (self *SKafka) GetNetworkId() string {
  73. return self.VSwitchId
  74. }
  75. func (self *SKafka) GetCreatedAt() time.Time {
  76. return time.Unix(self.CreateTime/1000, self.CreateTime%1000)
  77. }
  78. func (self *SKafka) GetBillingType() string {
  79. if self.PaidType == 0 {
  80. return billing_api.BILLING_TYPE_PREPAID
  81. }
  82. return billing_api.BILLING_TYPE_POSTPAID
  83. }
  84. func (self *SKafka) GetInstanceType() string {
  85. return self.SpecType
  86. }
  87. func (self *SKafka) GetDiskSizeGb() int {
  88. return self.DiskSize
  89. }
  90. func (self *SKafka) GetVersion() string {
  91. return self.UpgradeServiceDetailInfo.Current2OpenSourceVersion
  92. }
  93. func (self *SKafka) IsMultiAz() bool {
  94. return false
  95. }
  96. func (self *SKafka) GetStorageType() string {
  97. switch self.DiskType {
  98. case 0:
  99. return api.STORAGE_CLOUD_EFFICIENCY
  100. case 1:
  101. return api.STORAGE_CLOUD_SSD
  102. }
  103. return ""
  104. }
  105. func (self *SKafka) GetBandwidthMb() int {
  106. if self.EipMax > 0 {
  107. return self.EipMax
  108. }
  109. return self.IoMax
  110. }
  111. func (self *SKafka) GetEndpoint() string {
  112. ret := []string{}
  113. if len(self.EndPoint) > 0 {
  114. ret = append(ret, self.EndPoint)
  115. }
  116. if len(self.SslEndPoint) > 0 {
  117. ret = append(ret, self.SslEndPoint)
  118. }
  119. return strings.Join(ret, ",")
  120. }
  121. func (self *SKafka) GetMsgRetentionMinute() int {
  122. return self.MsgRetain * 60
  123. }
  124. func (self *SKafka) GetZoneId() string {
  125. if len(self.ZoneId) > 0 {
  126. return fmt.Sprintf("%s-%s", self.RegionId, strings.TrimPrefix(self.ZoneId, "zone"))
  127. }
  128. return ""
  129. }
  130. func (self *SKafka) GetStatus() string {
  131. switch self.ServiceStatus {
  132. case 0, 1, 2:
  133. return api.KAFKA_STATUS_CREATING
  134. case 5:
  135. return api.KAFKA_STATUS_AVAILABLE
  136. case 15:
  137. return api.KAFKA_STATUS_UNAVAILABLE
  138. }
  139. return api.KAFKA_STATUS_UNKNOWN
  140. }
  141. func (self *SKafka) Refresh() error {
  142. kafka, err := self.region.GetKafka(self.InstanceId)
  143. if err != nil {
  144. return errors.Wrapf(err, "GetKafka")
  145. }
  146. return jsonutils.Update(self, kafka)
  147. }
  148. func (self *SKafka) Delete() error {
  149. return self.region.DeleteKafka(self.InstanceId)
  150. }
  151. func (self *SRegion) GetICloudKafkaById(id string) (cloudprovider.ICloudKafka, error) {
  152. kafka, err := self.GetKafka(id)
  153. if err != nil {
  154. return nil, errors.Wrapf(err, "GetKafka(%s)", id)
  155. }
  156. return kafka, nil
  157. }
  158. func (self *SRegion) GetICloudKafkas() ([]cloudprovider.ICloudKafka, error) {
  159. kafkas, err := self.GetKafkas(nil)
  160. if err != nil {
  161. return nil, errors.Wrapf(err, "GetKafkas")
  162. }
  163. ret := []cloudprovider.ICloudKafka{}
  164. for i := range kafkas {
  165. kafkas[i].region = self
  166. ret = append(ret, &kafkas[i])
  167. }
  168. return ret, nil
  169. }
  170. func (self *SRegion) GetKafka(id string) (*SKafka, error) {
  171. kafkas, err := self.GetKafkas([]string{id})
  172. if err != nil {
  173. return nil, errors.Wrapf(err, "GetKafkas")
  174. }
  175. for i := range kafkas {
  176. if kafkas[i].GetGlobalId() == id {
  177. kafkas[i].region = self
  178. return &kafkas[i], nil
  179. }
  180. }
  181. return nil, errors.Wrapf(cloudprovider.ErrNotFound, "%s", id)
  182. }
  183. func (self *SRegion) GetKafkas(ids []string) ([]SKafka, error) {
  184. params := map[string]string{}
  185. for idx, id := range ids {
  186. params[fmt.Sprintf("InstanceId.%d", idx)] = id
  187. }
  188. resp, err := self.kafkaRequest("GetInstanceList", params)
  189. if err != nil {
  190. return nil, errors.Wrapf(err, "GetInstanceList")
  191. }
  192. ret := struct {
  193. Code int
  194. Message string
  195. RequestId string
  196. Success bool
  197. InstanceList struct {
  198. InstanceVO []SKafka
  199. }
  200. }{}
  201. err = resp.Unmarshal(&ret)
  202. if err != nil {
  203. return nil, errors.Wrapf(err, "resp.Unmarshal")
  204. }
  205. if ret.Code != 200 {
  206. return nil, errors.Errorf("message: %s requestId: %s", ret.Message, ret.RequestId)
  207. }
  208. for i := 0; i < len(ret.InstanceList.InstanceVO); i++ {
  209. ret.InstanceList.InstanceVO[i].region = self
  210. }
  211. return ret.InstanceList.InstanceVO, nil
  212. }
  213. func (self *SRegion) DeleteKafka(id string) error {
  214. params := map[string]string{
  215. "RegionId": self.RegionId,
  216. "InstanceId": id,
  217. }
  218. _, err := self.kafkaRequest("DeleteInstance", params)
  219. return errors.Wrapf(err, "DeleteInstance")
  220. }
  221. func (self *SRegion) ReleaseKafka(id string) error {
  222. params := map[string]string{
  223. "RegionId": self.RegionId,
  224. "InstanceId": id,
  225. "ForceDeleteInstance": "true",
  226. }
  227. _, err := self.kafkaRequest("ReleaseInstance", params)
  228. return errors.Wrapf(err, "ReleaseInstance")
  229. }
  230. func (self *SKafka) GetTopics() ([]cloudprovider.SKafkaTopic, error) {
  231. ret := []cloudprovider.SKafkaTopic{}
  232. pageSize := 100
  233. for {
  234. part, total, err := self.region.GetKafkaTopics(self.InstanceId, len(ret)/pageSize+1, pageSize)
  235. if err != nil {
  236. return nil, errors.Wrapf(err, "GetKafkaTopics")
  237. }
  238. ret = append(ret, part...)
  239. if len(ret) >= total {
  240. break
  241. }
  242. }
  243. return ret, nil
  244. }
  245. func (self *SRegion) GetKafkaTopics(id string, page int, pageSize int) ([]cloudprovider.SKafkaTopic, int, error) {
  246. if page < 1 {
  247. page = 1
  248. }
  249. if pageSize < 1 {
  250. pageSize = 50
  251. }
  252. params := map[string]string{
  253. "InstanceId": id,
  254. "CurrentPage": fmt.Sprintf("%d", page),
  255. "PageSize": fmt.Sprintf("%d", pageSize),
  256. }
  257. resp, err := self.kafkaRequest("GetTopicList", params)
  258. if err != nil {
  259. return nil, 0, errors.Wrapf(err, "GetTopicList")
  260. }
  261. result := struct {
  262. Code int
  263. Total int
  264. Message string
  265. TopicList struct {
  266. TopicVO []struct {
  267. CompactTopic bool
  268. CreateTime int
  269. InstanceId string
  270. LocalTopic bool
  271. PartitionNum int
  272. RegionId string
  273. Remark string
  274. Status int
  275. StatusName string
  276. Topic string
  277. }
  278. }
  279. }{}
  280. err = resp.Unmarshal(&result)
  281. if err != nil {
  282. return nil, 0, errors.Wrapf(err, "resp.Unmarshal")
  283. }
  284. ret := []cloudprovider.SKafkaTopic{}
  285. for _, topic := range result.TopicList.TopicVO {
  286. ret = append(ret, cloudprovider.SKafkaTopic{
  287. Id: topic.Topic,
  288. Name: topic.Topic,
  289. Description: topic.Remark,
  290. })
  291. }
  292. return ret, result.Total, nil
  293. }