kafka.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package qcloud
  15. import (
  16. "fmt"
  17. "strings"
  18. "time"
  19. "yunion.io/x/jsonutils"
  20. "yunion.io/x/pkg/errors"
  21. "yunion.io/x/pkg/utils"
  22. billing_api "yunion.io/x/cloudmux/pkg/apis/billing"
  23. api "yunion.io/x/cloudmux/pkg/apis/compute"
  24. "yunion.io/x/cloudmux/pkg/cloudprovider"
  25. "yunion.io/x/cloudmux/pkg/multicloud"
  26. )
  27. type SKafka struct {
  28. multicloud.SVirtualResourceBase
  29. multicloud.SBillingBase
  30. QcloudTags
  31. region *SRegion
  32. InstanceId string
  33. InstanceName string
  34. Vip string
  35. Vport string
  36. VipList []struct {
  37. Vip string
  38. Vport string
  39. }
  40. Status int
  41. Bandwidth int
  42. DiskSize int
  43. ZoneId int
  44. VpcId string
  45. SubnetId string
  46. RenewFlag int
  47. Healthy int
  48. HealthyMessage string
  49. CreateTime int
  50. ExpireTime int
  51. IsInternal int
  52. TopicNum int
  53. Version string
  54. ZoneIds []int
  55. Cvm int
  56. InstanceType string
  57. DiskType string
  58. MaxTopicNumber int
  59. MaxPartitionNubmer int
  60. RebalanceTime string
  61. MsgRetentionTime int
  62. }
  63. func (self *SKafka) GetName() string {
  64. if len(self.InstanceName) > 0 {
  65. return self.InstanceName
  66. }
  67. return self.InstanceId
  68. }
  69. func (self *SKafka) GetId() string {
  70. return self.InstanceId
  71. }
  72. func (self *SKafka) SetTags(tags map[string]string, replace bool) error {
  73. return self.region.SetResourceTags("ckafka", "instance", []string{self.InstanceId}, tags, replace)
  74. }
  75. func (self *SKafka) GetGlobalId() string {
  76. return self.InstanceId
  77. }
  78. func (self *SKafka) GetVpcId() string {
  79. return self.VpcId
  80. }
  81. func (self *SKafka) GetNetworkId() string {
  82. return self.SubnetId
  83. }
  84. func (self *SKafka) IsAutoRenew() bool {
  85. return self.RenewFlag == 1
  86. }
  87. func (self *SKafka) GetBillingType() string {
  88. return billing_api.BILLING_TYPE_PREPAID
  89. }
  90. func (self *SKafka) GetCreatedAt() time.Time {
  91. return time.Unix(int64(self.CreateTime), 0)
  92. }
  93. func (self *SKafka) GetExpiredAt() time.Time {
  94. return time.Unix(int64(self.ExpireTime), 0)
  95. }
  96. func (self *SKafka) GetInstanceType() string {
  97. return self.InstanceType
  98. }
  99. func (self *SKafka) GetDiskSizeGb() int {
  100. return self.DiskSize
  101. }
  102. func (self *SKafka) GetVersion() string {
  103. return self.Version
  104. }
  105. func (self *SKafka) IsMultiAz() bool {
  106. return len(self.ZoneIds) > 1
  107. }
  108. func (self *SKafka) GetBandwidthMb() int {
  109. return self.Bandwidth / 8
  110. }
  111. func (self *SKafka) GetStorageType() string {
  112. return self.DiskType
  113. }
  114. func (self *SKafka) GetEndpoint() string {
  115. endpoints := []string{}
  116. var add = func(vip, vport string) {
  117. endpoint := fmt.Sprintf("%s:%s", vip, vport)
  118. if len(vip) > 0 && len(vport) > 0 && !utils.IsInStringArray(endpoint, endpoints) {
  119. endpoints = append(endpoints, endpoint)
  120. }
  121. }
  122. for _, ed := range self.VipList {
  123. add(ed.Vip, ed.Vport)
  124. }
  125. add(self.Vip, self.Vport)
  126. return strings.Join(endpoints, ",")
  127. }
  128. func (self *SKafka) GetStatus() string {
  129. switch self.Status {
  130. case 0:
  131. return api.KAFKA_STATUS_CREATING
  132. case 1:
  133. return api.KAFKA_STATUS_AVAILABLE
  134. case 2:
  135. return api.KAFKA_STATUS_DELETING
  136. default:
  137. return fmt.Sprintf("%d", self.Status)
  138. }
  139. }
  140. func (self *SKafka) GetMsgRetentionMinute() int {
  141. if self.MsgRetentionTime == 0 {
  142. self.Refresh()
  143. }
  144. return self.MsgRetentionTime
  145. }
  146. func (self *SKafka) GetZoneId() string {
  147. return fmt.Sprintf("%s-%d", self.region.Region, self.ZoneId)
  148. }
  149. func (self *SKafka) Refresh() error {
  150. kafka, err := self.region.GetKafka(self.InstanceId)
  151. if err != nil {
  152. return err
  153. }
  154. return jsonutils.Update(self, kafka)
  155. }
  156. func (self *SKafka) Delete() error {
  157. return cloudprovider.ErrNotSupported
  158. }
  159. func (self *SRegion) GetICloudKafkaById(id string) (cloudprovider.ICloudKafka, error) {
  160. kafka, err := self.GetKafka(id)
  161. if err != nil {
  162. return nil, errors.Wrapf(err, "GetKafka(%s)", id)
  163. }
  164. return kafka, nil
  165. }
  166. func (self *SRegion) GetICloudKafkas() ([]cloudprovider.ICloudKafka, error) {
  167. kafkas := []SKafka{}
  168. for {
  169. part, total, err := self.GetKafkas("", 20, len(kafkas))
  170. if err != nil {
  171. return nil, errors.Wrapf(err, "GetKafkas")
  172. }
  173. kafkas = append(kafkas, part...)
  174. if len(kafkas) >= total {
  175. break
  176. }
  177. }
  178. ret := []cloudprovider.ICloudKafka{}
  179. for i := range kafkas {
  180. kafkas[i].region = self
  181. ret = append(ret, &kafkas[i])
  182. }
  183. return ret, nil
  184. }
  185. func (self *SRegion) GetKafka(id string) (*SKafka, error) {
  186. params := map[string]string{
  187. "InstanceId": id,
  188. }
  189. resp, err := self.kafkaRequest("DescribeInstanceAttributes", params)
  190. if err != nil {
  191. return nil, errors.Wrapf(err, "DescribeInstanceAttributes")
  192. }
  193. ret := SKafka{region: self}
  194. err = resp.Unmarshal(&ret, "Result")
  195. if err != nil {
  196. return nil, errors.Wrapf(err, "resp.Unmarshal")
  197. }
  198. return &ret, nil
  199. }
  200. func (self *SRegion) GetKafkas(id string, limit, offset int) ([]SKafka, int, error) {
  201. if limit < 1 || limit > 20 {
  202. limit = 20
  203. }
  204. params := map[string]string{
  205. "Limit": fmt.Sprintf("%d", limit),
  206. "Offset": fmt.Sprintf("%d", offset),
  207. }
  208. if len(id) > 0 {
  209. params["InstanceId"] = id
  210. }
  211. resp, err := self.kafkaRequest("DescribeInstancesDetail", params)
  212. if err != nil {
  213. return nil, 0, errors.Wrapf(err, "DescribeInstancesDetail")
  214. }
  215. ret := []SKafka{}
  216. err = resp.Unmarshal(&ret, "Result", "InstanceList")
  217. if err != nil {
  218. return nil, 0, errors.Wrapf(err, "resp.Unmarshal")
  219. }
  220. totalCount, _ := resp.Int("Result", "TotalCount")
  221. return ret, int(totalCount), nil
  222. }