base.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package providerdriver
  15. import (
  16. "context"
  17. "strconv"
  18. "sync"
  19. "time"
  20. "yunion.io/x/cloudmux/pkg/cloudprovider"
  21. "yunion.io/x/jsonutils"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/gotypes"
  25. api "yunion.io/x/onecloud/pkg/apis/compute"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/tsdb"
  27. "yunion.io/x/onecloud/pkg/cloudmon/options"
  28. "yunion.io/x/onecloud/pkg/mcclient/auth"
  29. "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  30. "yunion.io/x/onecloud/pkg/util/hashcache"
  31. "yunion.io/x/onecloud/pkg/util/influxdb"
  32. )
  33. type SBaseCollectDriver struct {
  34. }
  35. func (self *SBaseCollectDriver) GetDelayDuration() time.Duration {
  36. return 6 * time.Minute
  37. }
  38. func (self *SBaseCollectDriver) IsSupportMetrics() bool {
  39. return false
  40. }
  41. func (self *SBaseCollectDriver) CollectAccountMetrics(ctx context.Context, account api.CloudaccountDetail) (influxdb.SMetricData, error) {
  42. metric := influxdb.SMetricData{
  43. Name: string(cloudprovider.METRIC_RESOURCE_TYPE_CLOUD_ACCOUNT),
  44. Timestamp: time.Now(),
  45. Tags: []influxdb.SKeyValue{},
  46. Metrics: []influxdb.SKeyValue{},
  47. }
  48. for k, v := range account.GetMetricTags() {
  49. metric.Tags = append([]influxdb.SKeyValue{
  50. {
  51. Key: k,
  52. Value: v,
  53. },
  54. }, metric.Tags...)
  55. }
  56. for k, v := range account.GetMetricPairs() {
  57. metric.Metrics = append([]influxdb.SKeyValue{
  58. {
  59. Key: k,
  60. Value: v,
  61. },
  62. }, metric.Metrics...)
  63. }
  64. return metric, nil
  65. }
  66. func (self *SBaseCollectDriver) CollectDBInstanceMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.DBInstanceDetails, start, end time.Time) error {
  67. return cloudprovider.ErrNotImplemented
  68. }
  69. func (self *SBaseCollectDriver) CollectServerMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ServerDetails, start, end time.Time) error {
  70. return cloudprovider.ErrNotImplemented
  71. }
  72. func (self *SBaseCollectDriver) CollectHostMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.HostDetails, start, end time.Time) error {
  73. return cloudprovider.ErrNotImplemented
  74. }
  75. func (self *SBaseCollectDriver) CollectWireMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.WireDetails, start, end time.Time) error {
  76. return cloudprovider.ErrNotImplemented
  77. }
  78. func (self *SBaseCollectDriver) CollectEipMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ElasticipDetails, start, end time.Time) error {
  79. return cloudprovider.ErrNotImplemented
  80. }
  81. func (self *SBaseCollectDriver) CollectStorageMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.StorageDetails, start, end time.Time) error {
  82. metrics := []influxdb.SMetricData{}
  83. for _, storage := range res {
  84. metric := influxdb.SMetricData{
  85. Name: string(cloudprovider.METRIC_RESOURCE_TYPE_STORAGE),
  86. Timestamp: time.Now(),
  87. Tags: []influxdb.SKeyValue{},
  88. Metrics: []influxdb.SKeyValue{},
  89. }
  90. for k, v := range storage.GetMetricTags() {
  91. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  92. Key: k,
  93. Value: v,
  94. })
  95. }
  96. for k, v := range storage.GetMetricPairs() {
  97. metric.Metrics = append(metric.Metrics, influxdb.SKeyValue{
  98. Key: k,
  99. Value: v,
  100. })
  101. }
  102. metrics = append(metrics, metric)
  103. }
  104. return self.sendMetrics(ctx, manager, "storage", len(res), metrics)
  105. }
  106. func (self *SBaseCollectDriver) sendMetrics(ctx context.Context, manager api.CloudproviderDetails, resName string, resCnt int, metrics []influxdb.SMetricData) error {
  107. s := auth.GetAdminSession(ctx, options.Options.Region)
  108. urls, err := tsdb.GetDefaultServiceSourceURLs(s, options.Options.SessionEndpointType)
  109. if err != nil {
  110. return errors.Wrap(err, "GetServiceURLs")
  111. }
  112. log.Infof("send %d %s with %d metrics for %s(%s)", resCnt, resName, len(metrics), manager.Name, manager.Id)
  113. return influxdb.BatchSendMetrics(urls, options.Options.InfluxDatabase, metrics, false)
  114. }
  115. func (self *SBaseCollectDriver) CollectRedisMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ElasticcacheDetails, start, end time.Time) error {
  116. return cloudprovider.ErrNotImplemented
  117. }
  118. func (self *SBaseCollectDriver) CollectLoadbalancerMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.LoadbalancerDetails, start, end time.Time) error {
  119. return cloudprovider.ErrNotImplemented
  120. }
  121. func (self *SBaseCollectDriver) CollectBucketMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.BucketDetails, start, end time.Time) error {
  122. return cloudprovider.ErrNotImplemented
  123. }
  124. func (self *SBaseCollectDriver) CollectK8sMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.KubeClusterDetails, start, end time.Time) error {
  125. return cloudprovider.ErrNotImplemented
  126. }
  127. func (self *SBaseCollectDriver) CollectModelartsPoolMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ModelartsPoolDetails, start, end time.Time) error {
  128. return cloudprovider.ErrNotImplemented
  129. }
  130. type SCollectByResourceIdDriver struct {
  131. SBaseCollectDriver
  132. }
  133. func (self *SCollectByResourceIdDriver) CollectDBInstanceMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.DBInstanceDetails, start, end time.Time) error {
  134. ch := make(chan struct{}, options.Options.CloudResourceCollectMetricsBatchCount)
  135. defer close(ch)
  136. metrics := []influxdb.SMetricData{}
  137. var wg sync.WaitGroup
  138. var mu sync.Mutex
  139. for i := range res {
  140. ch <- struct{}{}
  141. wg.Add(1)
  142. go func(rds api.DBInstanceDetails) {
  143. defer func() {
  144. wg.Done()
  145. <-ch
  146. }()
  147. opts := &cloudprovider.MetricListOptions{
  148. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_RDS,
  149. StartTime: start,
  150. EndTime: end,
  151. }
  152. opts.ResourceId = rds.ExternalId
  153. opts.RegionExtId = rds.RegionExtId
  154. opts.Engine = rds.Engine
  155. tags := []influxdb.SKeyValue{}
  156. for k, v := range rds.GetMetricTags() {
  157. tags = append(tags, influxdb.SKeyValue{
  158. Key: k,
  159. Value: v,
  160. })
  161. }
  162. data, err := provider.GetMetrics(opts)
  163. if err != nil {
  164. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  165. log.Errorf("get rds %s(%s) error: %v", rds.Name, rds.Id, err)
  166. return
  167. }
  168. return
  169. }
  170. for _, values := range data {
  171. for _, value := range values.Values {
  172. metric := influxdb.SMetricData{
  173. Name: values.MetricType.Name(),
  174. Timestamp: value.Timestamp,
  175. Tags: tags,
  176. Metrics: []influxdb.SKeyValue{
  177. {
  178. Key: values.MetricType.Key(),
  179. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  180. },
  181. },
  182. }
  183. for k, v := range value.Tags {
  184. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  185. Key: k,
  186. Value: v,
  187. })
  188. }
  189. mu.Lock()
  190. metrics = append(metrics, metric)
  191. mu.Unlock()
  192. }
  193. }
  194. }(res[i])
  195. }
  196. wg.Wait()
  197. return self.sendMetrics(ctx, manager, "rds", len(res), metrics)
  198. }
  199. func (self *SCollectByResourceIdDriver) CollectServerMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ServerDetails, start, end time.Time) error {
  200. cnt := options.Options.CloudResourceCollectMetricsBatchCount
  201. if manager.Provider == api.CLOUD_PROVIDER_ORACLE { // oracle 限速
  202. cnt = options.Options.OracleCloudResourceCollectMetricsBatchCount
  203. }
  204. ch := make(chan struct{}, cnt)
  205. defer close(ch)
  206. metrics := []influxdb.SMetricData{}
  207. var wg sync.WaitGroup
  208. var mu sync.Mutex
  209. for i := range res {
  210. ch <- struct{}{}
  211. wg.Add(1)
  212. go func(vm api.ServerDetails) {
  213. defer func() {
  214. wg.Done()
  215. <-ch
  216. }()
  217. opts := &cloudprovider.MetricListOptions{
  218. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_SERVER,
  219. RegionExtId: vm.RegionExtId,
  220. StartTime: start,
  221. EndTime: end,
  222. OsType: vm.OsType,
  223. IsSupportAzureTableStorageMetric: options.Options.SupportAzureTableStorageMetric,
  224. }
  225. opts.ResourceId = vm.ExternalId
  226. tags := []influxdb.SKeyValue{}
  227. for k, v := range vm.GetMetricTags() {
  228. tags = append(tags, influxdb.SKeyValue{
  229. Key: k,
  230. Value: v,
  231. })
  232. }
  233. pairs := []influxdb.SKeyValue{}
  234. for k, v := range vm.GetMetricPairs() {
  235. pairs = append(pairs, influxdb.SKeyValue{
  236. Key: k,
  237. Value: v,
  238. })
  239. }
  240. data, err := provider.GetMetrics(opts)
  241. if err != nil {
  242. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  243. log.Errorf("get server %s(%s) error: %v", vm.Name, vm.Id, err)
  244. return
  245. }
  246. return
  247. }
  248. for _, values := range data {
  249. metricKey := values.MetricType.Key()
  250. for _, value := range values.Values {
  251. metric := influxdb.SMetricData{
  252. Name: values.MetricType.Name(),
  253. Timestamp: value.Timestamp,
  254. Tags: []influxdb.SKeyValue{},
  255. Metrics: []influxdb.SKeyValue{
  256. {
  257. Key: metricKey,
  258. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  259. },
  260. },
  261. }
  262. for k, v := range value.Tags {
  263. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  264. Key: k,
  265. Value: v,
  266. })
  267. }
  268. metric.Tags = append(metric.Tags, tags...)
  269. metric.Metrics = append(metric.Metrics, pairs...)
  270. mu.Lock()
  271. metrics = append(metrics, metric)
  272. mu.Unlock()
  273. }
  274. }
  275. }(res[i])
  276. }
  277. wg.Wait()
  278. return self.sendMetrics(ctx, manager, "server", len(res), metrics)
  279. }
  280. var (
  281. hostMemoryCache = hashcache.NewCache(1024, time.Minute*5)
  282. hostCpuCache = hashcache.NewCache(1024, time.Minute*5)
  283. hostIdCache = hashcache.NewCache(1024, time.Minute*5)
  284. )
  285. func (self *SCollectByResourceIdDriver) CollectHostMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.HostDetails, start, end time.Time) error {
  286. ch := make(chan struct{}, options.Options.CloudResourceCollectMetricsBatchCount)
  287. defer close(ch)
  288. metrics := []influxdb.SMetricData{}
  289. var wg sync.WaitGroup
  290. var mu sync.Mutex
  291. s := auth.GetAdminSession(ctx, options.Options.Region)
  292. for i := range res {
  293. ch <- struct{}{}
  294. wg.Add(1)
  295. go func(host api.HostDetails) {
  296. defer func() {
  297. wg.Done()
  298. <-ch
  299. }()
  300. opts := &cloudprovider.MetricListOptions{
  301. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_HOST,
  302. ResourceId: host.ExternalId,
  303. RegionExtId: host.RegionExtId,
  304. StartTime: start,
  305. EndTime: end,
  306. }
  307. tags := []influxdb.SKeyValue{}
  308. for k, v := range host.GetMetricTags() {
  309. tags = append(tags, influxdb.SKeyValue{
  310. Key: k,
  311. Value: v,
  312. })
  313. }
  314. pairs := []influxdb.SKeyValue{}
  315. for k, v := range host.GetMetricPairs() {
  316. pairs = append(pairs, influxdb.SKeyValue{
  317. Key: k,
  318. Value: v,
  319. })
  320. }
  321. data, err := provider.GetMetrics(opts)
  322. if err != nil {
  323. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  324. log.Errorf("get host %s(%s) error: %v", host.Name, host.Id, err)
  325. return
  326. }
  327. return
  328. }
  329. cpu, cpuCnt, memory, memoryCnt := 0.0, 0.0, 0.0, 0.0
  330. for _, values := range data {
  331. for _, value := range values.Values {
  332. switch values.MetricType {
  333. case cloudprovider.HOST_METRIC_TYPE_CPU_USAGE:
  334. cpu += value.Value
  335. cpuCnt++
  336. case cloudprovider.HOST_METRIC_TYPE_MEM_USAGE:
  337. memory += value.Value
  338. memoryCnt++
  339. }
  340. metric := influxdb.SMetricData{
  341. Name: values.MetricType.Name(),
  342. Timestamp: value.Timestamp,
  343. Tags: []influxdb.SKeyValue{},
  344. Metrics: []influxdb.SKeyValue{
  345. {
  346. Key: values.MetricType.Key(),
  347. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  348. },
  349. },
  350. }
  351. for k, v := range value.Tags {
  352. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  353. Key: k,
  354. Value: v,
  355. })
  356. }
  357. metric.Tags = append(metric.Tags, tags...)
  358. metric.Metrics = append(metric.Metrics, pairs...)
  359. mu.Lock()
  360. metrics = append(metrics, metric)
  361. mu.Unlock()
  362. }
  363. }
  364. avgCpu := cpu / float64(cpuCnt)
  365. avgMemory := memory / float64(memoryCnt)
  366. info := hostIdCache.AtomicGet(host.Id)
  367. if gotypes.IsNil(info) {
  368. pingInfo := api.SHostPingInput{}
  369. pingInfo.WithData = true
  370. pingInfo.MemoryUsedMb = int(float64(host.MemSize)*avgMemory) / 100
  371. pingInfo.CpuUsagePercent = avgCpu
  372. _, err := compute.Hosts.PerformAction(s, host.Id, "ping", jsonutils.Marshal(pingInfo))
  373. if err != nil {
  374. log.Errorf("perform ping %s(%s) error: %v", host.Name, host.Id, err)
  375. return
  376. }
  377. hostIdCache.AtomicSet(host.Id, host.Id)
  378. }
  379. }(res[i])
  380. }
  381. wg.Wait()
  382. return self.sendMetrics(ctx, manager, "host", len(res), metrics)
  383. }
  384. func (self *SCollectByResourceIdDriver) CollectRedisMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ElasticcacheDetails, start, end time.Time) error {
  385. ch := make(chan struct{}, options.Options.CloudResourceCollectMetricsBatchCount)
  386. defer close(ch)
  387. metrics := []influxdb.SMetricData{}
  388. var wg sync.WaitGroup
  389. var mu sync.Mutex
  390. for i := range res {
  391. ch <- struct{}{}
  392. wg.Add(1)
  393. go func(vm api.ElasticcacheDetails) {
  394. defer func() {
  395. wg.Done()
  396. <-ch
  397. }()
  398. opts := &cloudprovider.MetricListOptions{
  399. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_REDIS,
  400. RegionExtId: vm.RegionExtId,
  401. StartTime: start,
  402. EndTime: end,
  403. }
  404. opts.ResourceId = vm.ExternalId
  405. tags := []influxdb.SKeyValue{}
  406. for k, v := range vm.GetMetricTags() {
  407. tags = append(tags, influxdb.SKeyValue{
  408. Key: k,
  409. Value: v,
  410. })
  411. }
  412. pairs := []influxdb.SKeyValue{}
  413. for k, v := range vm.GetMetricPairs() {
  414. pairs = append(pairs, influxdb.SKeyValue{
  415. Key: k,
  416. Value: v,
  417. })
  418. }
  419. data, err := provider.GetMetrics(opts)
  420. if err != nil {
  421. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  422. log.Errorf("get %s %s(%s) error: %v", opts.ResourceType, vm.Name, vm.Id, err)
  423. return
  424. }
  425. return
  426. }
  427. for _, values := range data {
  428. for _, value := range values.Values {
  429. metric := influxdb.SMetricData{
  430. Name: values.MetricType.Name(),
  431. Timestamp: value.Timestamp,
  432. Tags: []influxdb.SKeyValue{},
  433. Metrics: []influxdb.SKeyValue{
  434. {
  435. Key: values.MetricType.Key(),
  436. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  437. },
  438. },
  439. }
  440. for k, v := range value.Tags {
  441. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  442. Key: k,
  443. Value: v,
  444. })
  445. }
  446. metric.Tags = append(metric.Tags, tags...)
  447. metric.Metrics = append(metric.Metrics, pairs...)
  448. mu.Lock()
  449. metrics = append(metrics, metric)
  450. mu.Unlock()
  451. }
  452. }
  453. }(res[i])
  454. }
  455. wg.Wait()
  456. return self.sendMetrics(ctx, manager, "redis", len(res), metrics)
  457. }
  458. func (self *SCollectByResourceIdDriver) CollectBucketMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.BucketDetails, start, end time.Time) error {
  459. ch := make(chan struct{}, options.Options.CloudResourceCollectMetricsBatchCount)
  460. defer close(ch)
  461. metrics := []influxdb.SMetricData{}
  462. var wg sync.WaitGroup
  463. var mu sync.Mutex
  464. for i := range res {
  465. ch <- struct{}{}
  466. wg.Add(1)
  467. go func(vm api.BucketDetails) {
  468. defer func() {
  469. wg.Done()
  470. <-ch
  471. }()
  472. opts := &cloudprovider.MetricListOptions{
  473. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_BUCKET,
  474. RegionExtId: vm.RegionExtId,
  475. StartTime: start,
  476. EndTime: end,
  477. }
  478. opts.ResourceId = vm.ExternalId
  479. tags := []influxdb.SKeyValue{}
  480. for k, v := range vm.GetMetricTags() {
  481. tags = append(tags, influxdb.SKeyValue{
  482. Key: k,
  483. Value: v,
  484. })
  485. }
  486. pairs := []influxdb.SKeyValue{}
  487. for k, v := range vm.GetMetricPairs() {
  488. pairs = append(pairs, influxdb.SKeyValue{
  489. Key: k,
  490. Value: v,
  491. })
  492. }
  493. data, err := provider.GetMetrics(opts)
  494. if err != nil {
  495. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  496. log.Errorf("get %s %s(%s) error: %v", opts.ResourceType, vm.Name, vm.Id, err)
  497. return
  498. }
  499. return
  500. }
  501. for _, values := range data {
  502. for _, value := range values.Values {
  503. metric := influxdb.SMetricData{
  504. Name: values.MetricType.Name(),
  505. Timestamp: value.Timestamp,
  506. Tags: []influxdb.SKeyValue{},
  507. Metrics: []influxdb.SKeyValue{
  508. {
  509. Key: values.MetricType.Key(),
  510. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  511. },
  512. },
  513. }
  514. for k, v := range value.Tags {
  515. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  516. Key: k,
  517. Value: v,
  518. })
  519. }
  520. metric.Metrics = append(metric.Metrics, pairs...)
  521. mu.Lock()
  522. metrics = append(metrics, metric)
  523. mu.Unlock()
  524. }
  525. }
  526. }(res[i])
  527. }
  528. wg.Wait()
  529. return self.sendMetrics(ctx, manager, "bucket", len(res), metrics)
  530. }
  531. func (self *SCollectByResourceIdDriver) CollectK8sMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.KubeClusterDetails, start, end time.Time) error {
  532. ch := make(chan struct{}, options.Options.CloudResourceCollectMetricsBatchCount)
  533. defer close(ch)
  534. metrics := []influxdb.SMetricData{}
  535. var wg sync.WaitGroup
  536. var mu sync.Mutex
  537. for i := range res {
  538. ch <- struct{}{}
  539. wg.Add(1)
  540. go func(vm api.KubeClusterDetails) {
  541. defer func() {
  542. wg.Done()
  543. <-ch
  544. }()
  545. // 未同步到本地k8s集群
  546. if len(vm.ExternalClusterId) == 0 {
  547. log.Infof("skip collect %s %s(%s) metric, because not with local kubeserver", vm.Name, manager.Name, manager.Id)
  548. return
  549. }
  550. opts := &cloudprovider.MetricListOptions{
  551. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_K8S,
  552. ResourceId: vm.ExternalId,
  553. RegionExtId: vm.RegionExtId,
  554. StartTime: start,
  555. EndTime: end,
  556. }
  557. data, err := provider.GetMetrics(opts)
  558. if err != nil {
  559. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  560. log.Errorf("get %s %s(%s) error: %v", opts.ResourceType, vm.Name, vm.Id, err)
  561. return
  562. }
  563. return
  564. }
  565. tags := []influxdb.SKeyValue{}
  566. for k, v := range vm.GetMetricTags() {
  567. tags = append(tags, influxdb.SKeyValue{
  568. Key: k,
  569. Value: v,
  570. })
  571. }
  572. pairs := []influxdb.SKeyValue{}
  573. for k, v := range vm.GetMetricPairs() {
  574. pairs = append(pairs, influxdb.SKeyValue{
  575. Key: k,
  576. Value: v,
  577. })
  578. }
  579. for _, values := range data {
  580. for _, value := range values.Values {
  581. metric := influxdb.SMetricData{
  582. Name: values.MetricType.Name(),
  583. Timestamp: value.Timestamp,
  584. Tags: []influxdb.SKeyValue{},
  585. Metrics: []influxdb.SKeyValue{
  586. {
  587. Key: values.MetricType.Key(),
  588. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  589. },
  590. },
  591. }
  592. for k, v := range value.Tags {
  593. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  594. Key: k,
  595. Value: v,
  596. })
  597. }
  598. metric.Metrics = append(metric.Metrics, pairs...)
  599. mu.Lock()
  600. metrics = append(metrics, metric)
  601. mu.Unlock()
  602. }
  603. }
  604. }(res[i])
  605. }
  606. wg.Wait()
  607. return self.sendMetrics(ctx, manager, "k8s", len(res), metrics)
  608. }
  609. func (self *SCollectByResourceIdDriver) CollectLoadbalancerMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.LoadbalancerDetails, start, end time.Time) error {
  610. metrics := []influxdb.SMetricData{}
  611. var wg sync.WaitGroup
  612. var mu sync.Mutex
  613. for i := range res {
  614. wg.Add(1)
  615. go func(lb api.LoadbalancerDetails) {
  616. defer func() {
  617. wg.Done()
  618. }()
  619. opts := &cloudprovider.MetricListOptions{
  620. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_LB,
  621. StartTime: start,
  622. EndTime: end,
  623. }
  624. opts.ResourceId = lb.ExternalId
  625. opts.RegionExtId = lb.RegionExtId
  626. tags := []influxdb.SKeyValue{}
  627. for k, v := range lb.GetMetricTags() {
  628. tags = append(tags, influxdb.SKeyValue{
  629. Key: k,
  630. Value: v,
  631. })
  632. }
  633. data, err := provider.GetMetrics(opts)
  634. if err != nil {
  635. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  636. log.Errorf("get loadbalancers %s(%s) error: %v", lb.Name, lb.Id, err)
  637. return
  638. }
  639. return
  640. }
  641. for _, values := range data {
  642. for _, value := range values.Values {
  643. metric := influxdb.SMetricData{
  644. Name: values.MetricType.Name(),
  645. Timestamp: value.Timestamp,
  646. Tags: tags,
  647. Metrics: []influxdb.SKeyValue{
  648. {
  649. Key: values.MetricType.Key(),
  650. Value: strconv.FormatFloat(value.Value, 'E', -1, 64),
  651. },
  652. },
  653. }
  654. for k, v := range value.Tags {
  655. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  656. Key: k,
  657. Value: v,
  658. })
  659. }
  660. mu.Lock()
  661. metrics = append(metrics, metric)
  662. mu.Unlock()
  663. }
  664. }
  665. }(res[i])
  666. }
  667. wg.Wait()
  668. return self.sendMetrics(ctx, manager, "slb", len(res), metrics)
  669. }
  670. type SCollectByMetricTypeDriver struct {
  671. SBaseCollectDriver
  672. }
  673. func (self *SCollectByMetricTypeDriver) CollectDBInstanceMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.DBInstanceDetails, start, end time.Time) error {
  674. metrics := []influxdb.SMetricData{}
  675. var wg sync.WaitGroup
  676. var mu sync.Mutex
  677. for _, _metricType := range cloudprovider.ALL_RDS_METRIC_TYPES {
  678. wg.Add(1)
  679. go func(metricType cloudprovider.TMetricType) {
  680. defer func() {
  681. wg.Done()
  682. }()
  683. opts := &cloudprovider.MetricListOptions{
  684. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_RDS,
  685. MetricType: metricType,
  686. StartTime: start,
  687. EndTime: end,
  688. }
  689. data, err := provider.GetMetrics(opts)
  690. if err != nil {
  691. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  692. log.Errorf("get rds %s(%s) %s error: %v", manager.Name, manager.Id, metricType, err)
  693. return
  694. }
  695. return
  696. }
  697. for _, value := range data {
  698. rds, ok := res[value.Id]
  699. if !ok {
  700. continue
  701. }
  702. tags := []influxdb.SKeyValue{}
  703. for k, v := range rds.GetMetricTags() {
  704. tags = append(tags, influxdb.SKeyValue{
  705. Key: k,
  706. Value: v,
  707. })
  708. }
  709. for _, v := range value.Values {
  710. metric := influxdb.SMetricData{
  711. Name: value.MetricType.Name(),
  712. Timestamp: v.Timestamp,
  713. Tags: []influxdb.SKeyValue{},
  714. Metrics: []influxdb.SKeyValue{
  715. {
  716. Key: value.MetricType.Key(),
  717. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  718. },
  719. },
  720. }
  721. for k, v := range v.Tags {
  722. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  723. Key: k,
  724. Value: v,
  725. })
  726. }
  727. metric.Tags = append(metric.Tags, tags...)
  728. mu.Lock()
  729. metrics = append(metrics, metric)
  730. mu.Unlock()
  731. }
  732. }
  733. }(_metricType)
  734. }
  735. wg.Wait()
  736. return self.sendMetrics(ctx, manager, "rds", len(res), metrics)
  737. }
  738. func (self *SCollectByMetricTypeDriver) CollectServerMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ServerDetails, start, end time.Time) error {
  739. metrics := []influxdb.SMetricData{}
  740. var wg sync.WaitGroup
  741. var mu sync.Mutex
  742. for _, _metricType := range cloudprovider.ALL_VM_METRIC_TYPES {
  743. wg.Add(1)
  744. go func(metricType cloudprovider.TMetricType) {
  745. defer func() {
  746. wg.Done()
  747. }()
  748. opts := &cloudprovider.MetricListOptions{
  749. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_SERVER,
  750. MetricType: metricType,
  751. StartTime: start,
  752. EndTime: end,
  753. }
  754. data, err := provider.GetMetrics(opts)
  755. if err != nil {
  756. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  757. log.Errorf("get server metric %s for %s(%s) error: %v", metricType, manager.Name, manager.Id, err)
  758. return
  759. }
  760. return
  761. }
  762. for _, value := range data {
  763. vm, ok := res[value.Id]
  764. if !ok {
  765. continue
  766. }
  767. tags := []influxdb.SKeyValue{}
  768. for k, v := range vm.GetMetricTags() {
  769. tags = append(tags, influxdb.SKeyValue{
  770. Key: k,
  771. Value: v,
  772. })
  773. }
  774. pairs := []influxdb.SKeyValue{}
  775. for k, v := range vm.GetMetricPairs() {
  776. pairs = append(pairs, influxdb.SKeyValue{
  777. Key: k,
  778. Value: v,
  779. })
  780. }
  781. for _, v := range value.Values {
  782. metric := influxdb.SMetricData{
  783. Name: value.MetricType.Name(),
  784. Timestamp: v.Timestamp,
  785. Tags: []influxdb.SKeyValue{},
  786. Metrics: []influxdb.SKeyValue{
  787. {
  788. Key: value.MetricType.Key(),
  789. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  790. },
  791. },
  792. }
  793. for k, v := range v.Tags {
  794. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  795. Key: k,
  796. Value: v,
  797. })
  798. }
  799. metric.Metrics = append(metric.Metrics, pairs...)
  800. metric.Tags = append(metric.Tags, tags...)
  801. mu.Lock()
  802. metrics = append(metrics, metric)
  803. mu.Unlock()
  804. }
  805. }
  806. }(_metricType)
  807. }
  808. wg.Wait()
  809. return self.sendMetrics(ctx, manager, "server", len(res), metrics)
  810. }
  811. func (self *SCollectByMetricTypeDriver) CollectHostMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.HostDetails, start, end time.Time) error {
  812. metrics := []influxdb.SMetricData{}
  813. var wg sync.WaitGroup
  814. var mu sync.Mutex
  815. s := auth.GetAdminSession(ctx, options.Options.Region)
  816. for _, _metricType := range cloudprovider.ALL_HOST_METRIC_TYPES {
  817. wg.Add(1)
  818. go func(metricType cloudprovider.TMetricType) {
  819. defer func() {
  820. wg.Done()
  821. }()
  822. opts := &cloudprovider.MetricListOptions{
  823. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_HOST,
  824. MetricType: metricType,
  825. StartTime: start,
  826. EndTime: end,
  827. }
  828. data, err := provider.GetMetrics(opts)
  829. if err != nil {
  830. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  831. log.Errorf("get server metric %s for %s(%s) error: %v", metricType, manager.Name, manager.Id, err)
  832. return
  833. }
  834. return
  835. }
  836. for _, value := range data {
  837. vm, ok := res[value.Id]
  838. if !ok {
  839. continue
  840. }
  841. tags := []influxdb.SKeyValue{}
  842. for k, v := range vm.GetMetricTags() {
  843. tags = append(tags, influxdb.SKeyValue{
  844. Key: k,
  845. Value: v,
  846. })
  847. }
  848. pairs := []influxdb.SKeyValue{}
  849. for k, v := range vm.GetMetricPairs() {
  850. pairs = append(pairs, influxdb.SKeyValue{
  851. Key: k,
  852. Value: v,
  853. })
  854. }
  855. total := 0.0
  856. for _, v := range value.Values {
  857. total += v.Value
  858. metric := influxdb.SMetricData{
  859. Name: value.MetricType.Name(),
  860. Timestamp: v.Timestamp,
  861. Tags: []influxdb.SKeyValue{},
  862. Metrics: []influxdb.SKeyValue{
  863. {
  864. Key: value.MetricType.Key(),
  865. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  866. },
  867. },
  868. }
  869. for k, v := range v.Tags {
  870. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  871. Key: k,
  872. Value: v,
  873. })
  874. }
  875. metric.Metrics = append(metric.Metrics, pairs...)
  876. metric.Tags = append(metric.Tags, tags...)
  877. mu.Lock()
  878. metrics = append(metrics, metric)
  879. mu.Unlock()
  880. }
  881. avg := total / float64(len(value.Values))
  882. info := hostIdCache.AtomicGet(vm.Id)
  883. if gotypes.IsNil(info) {
  884. pingInfo := api.SHostPingInput{}
  885. pingInfo.WithData = true
  886. switch metricType {
  887. case cloudprovider.HOST_METRIC_TYPE_CPU_USAGE:
  888. pingInfo.CpuUsagePercent = avg
  889. case cloudprovider.HOST_METRIC_TYPE_MEM_USAGE:
  890. pingInfo.MemoryUsedMb = int(float64(vm.MemSize)*avg) / 100
  891. }
  892. _, err := compute.Hosts.PerformAction(s, vm.Id, "ping", jsonutils.Marshal(pingInfo))
  893. if err != nil {
  894. log.Errorf("perform ping %s(%s) error: %v", vm.Name, vm.Id, err)
  895. return
  896. }
  897. hostIdCache.AtomicSet(vm.Id, vm.Id)
  898. }
  899. }
  900. }(_metricType)
  901. }
  902. wg.Wait()
  903. return self.sendMetrics(ctx, manager, "host", len(res), metrics)
  904. }
  905. func (self *SCollectByMetricTypeDriver) CollectRedisMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.ElasticcacheDetails, start, end time.Time) error {
  906. metrics := []influxdb.SMetricData{}
  907. var wg sync.WaitGroup
  908. var mu sync.Mutex
  909. for _, _metricType := range cloudprovider.ALL_REDIS_METRIC_TYPES {
  910. wg.Add(1)
  911. go func(metricType cloudprovider.TMetricType) {
  912. defer func() {
  913. wg.Done()
  914. }()
  915. opts := &cloudprovider.MetricListOptions{
  916. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_REDIS,
  917. MetricType: metricType,
  918. StartTime: start,
  919. EndTime: end,
  920. }
  921. data, err := provider.GetMetrics(opts)
  922. if err != nil {
  923. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  924. log.Errorf("get metric %s for %s(%s) error: %v", opts.MetricType, manager.Name, manager.Id, err)
  925. return
  926. }
  927. return
  928. }
  929. for _, value := range data {
  930. vm, ok := res[value.Id]
  931. if !ok {
  932. continue
  933. }
  934. tags := []influxdb.SKeyValue{}
  935. for k, v := range vm.GetMetricTags() {
  936. tags = append(tags, influxdb.SKeyValue{
  937. Key: k,
  938. Value: v,
  939. })
  940. }
  941. pairs := []influxdb.SKeyValue{}
  942. for k, v := range vm.GetMetricPairs() {
  943. pairs = append(pairs, influxdb.SKeyValue{
  944. Key: k,
  945. Value: v,
  946. })
  947. }
  948. for _, v := range value.Values {
  949. metric := influxdb.SMetricData{
  950. Name: value.MetricType.Name(),
  951. Timestamp: v.Timestamp,
  952. Tags: []influxdb.SKeyValue{},
  953. Metrics: []influxdb.SKeyValue{
  954. {
  955. Key: value.MetricType.Key(),
  956. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  957. },
  958. },
  959. }
  960. for k, v := range v.Tags {
  961. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  962. Key: k,
  963. Value: v,
  964. })
  965. }
  966. metric.Metrics = append(metric.Metrics, pairs...)
  967. metric.Tags = append(metric.Tags, tags...)
  968. mu.Lock()
  969. metrics = append(metrics, metric)
  970. mu.Unlock()
  971. }
  972. }
  973. }(_metricType)
  974. }
  975. wg.Wait()
  976. return self.sendMetrics(ctx, manager, "redis", len(res), metrics)
  977. }
  978. func (self *SCollectByMetricTypeDriver) CollectBucketMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.BucketDetails, start, end time.Time) error {
  979. metrics := []influxdb.SMetricData{}
  980. var wg sync.WaitGroup
  981. var mu sync.Mutex
  982. for _, _metricType := range cloudprovider.ALL_BUCKET_TYPES {
  983. wg.Add(1)
  984. go func(metricType cloudprovider.TMetricType) {
  985. defer func() {
  986. wg.Done()
  987. }()
  988. opts := &cloudprovider.MetricListOptions{
  989. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_BUCKET,
  990. MetricType: metricType,
  991. StartTime: start,
  992. EndTime: end,
  993. }
  994. data, err := provider.GetMetrics(opts)
  995. if err != nil {
  996. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  997. log.Errorf("get metric %s for %s(%s) error: %v", opts.MetricType, manager.Name, manager.Id, err)
  998. return
  999. }
  1000. return
  1001. }
  1002. for _, value := range data {
  1003. vm, ok := res[value.Id]
  1004. if !ok {
  1005. continue
  1006. }
  1007. tags := []influxdb.SKeyValue{}
  1008. for k, v := range vm.GetMetricTags() {
  1009. tags = append(tags, influxdb.SKeyValue{
  1010. Key: k,
  1011. Value: v,
  1012. })
  1013. }
  1014. pairs := []influxdb.SKeyValue{}
  1015. for k, v := range vm.GetMetricPairs() {
  1016. pairs = append(pairs, influxdb.SKeyValue{
  1017. Key: k,
  1018. Value: v,
  1019. })
  1020. }
  1021. for _, v := range value.Values {
  1022. metric := influxdb.SMetricData{
  1023. Name: value.MetricType.Name(),
  1024. Timestamp: v.Timestamp,
  1025. Tags: []influxdb.SKeyValue{},
  1026. Metrics: []influxdb.SKeyValue{
  1027. {
  1028. Key: value.MetricType.Key(),
  1029. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  1030. },
  1031. },
  1032. }
  1033. for k, v := range v.Tags {
  1034. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  1035. Key: k,
  1036. Value: v,
  1037. })
  1038. }
  1039. metric.Metrics = append(metric.Metrics, pairs...)
  1040. metric.Tags = append(metric.Tags, tags...)
  1041. mu.Lock()
  1042. metrics = append(metrics, metric)
  1043. mu.Unlock()
  1044. }
  1045. }
  1046. }(_metricType)
  1047. }
  1048. wg.Wait()
  1049. return self.sendMetrics(ctx, manager, "bucket", len(res), metrics)
  1050. }
  1051. func (self *SCollectByMetricTypeDriver) CollectK8sMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.KubeClusterDetails, start, end time.Time) error {
  1052. metrics := []influxdb.SMetricData{}
  1053. var wg sync.WaitGroup
  1054. var mu sync.Mutex
  1055. for _, _metricType := range cloudprovider.ALL_K8S_NODE_TYPES {
  1056. wg.Add(1)
  1057. go func(metricType cloudprovider.TMetricType) {
  1058. defer func() {
  1059. wg.Done()
  1060. }()
  1061. opts := &cloudprovider.MetricListOptions{
  1062. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_K8S,
  1063. MetricType: metricType,
  1064. StartTime: start,
  1065. EndTime: end,
  1066. }
  1067. data, err := provider.GetMetrics(opts)
  1068. if err != nil {
  1069. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  1070. log.Errorf("get rds %s(%s) %s error: %v", manager.Name, manager.Id, metricType, err)
  1071. return
  1072. }
  1073. return
  1074. }
  1075. for _, value := range data {
  1076. k8s, ok := res[value.Id]
  1077. if !ok {
  1078. continue
  1079. }
  1080. tags := []influxdb.SKeyValue{}
  1081. for k, v := range k8s.GetMetricTags() {
  1082. tags = append(tags, influxdb.SKeyValue{
  1083. Key: k,
  1084. Value: v,
  1085. })
  1086. }
  1087. for _, v := range value.Values {
  1088. metric := influxdb.SMetricData{
  1089. Name: value.MetricType.Name(),
  1090. Timestamp: v.Timestamp,
  1091. Tags: []influxdb.SKeyValue{},
  1092. Metrics: []influxdb.SKeyValue{
  1093. {
  1094. Key: value.MetricType.Key(),
  1095. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  1096. },
  1097. },
  1098. }
  1099. for k, v := range v.Tags {
  1100. metric.Tags = append([]influxdb.SKeyValue{
  1101. {
  1102. Key: k,
  1103. Value: v,
  1104. },
  1105. }, tags...)
  1106. }
  1107. mu.Lock()
  1108. metrics = append(metrics, metric)
  1109. mu.Unlock()
  1110. }
  1111. }
  1112. }(_metricType)
  1113. }
  1114. wg.Wait()
  1115. return self.sendMetrics(ctx, manager, "k8s", len(res), metrics)
  1116. }
  1117. func (driver *SCollectByMetricTypeDriver) CollectLoadbalancerMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.LoadbalancerDetails, start, end time.Time) error {
  1118. metrics := []influxdb.SMetricData{}
  1119. var wg sync.WaitGroup
  1120. var mu sync.Mutex
  1121. for _, _metricType := range cloudprovider.ALL_LB_METRIC_TYPES {
  1122. wg.Add(1)
  1123. go func(metricType cloudprovider.TMetricType) {
  1124. defer func() {
  1125. wg.Done()
  1126. }()
  1127. opts := &cloudprovider.MetricListOptions{
  1128. ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_LB,
  1129. MetricType: metricType,
  1130. StartTime: start,
  1131. EndTime: end,
  1132. }
  1133. data, err := provider.GetMetrics(opts)
  1134. if err != nil {
  1135. if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  1136. log.Errorf("get slb %s(%s) %s error: %v", manager.Name, manager.Id, metricType, err)
  1137. return
  1138. }
  1139. return
  1140. }
  1141. for _, value := range data {
  1142. slb, ok := res[value.Id]
  1143. if !ok {
  1144. continue
  1145. }
  1146. tags := []influxdb.SKeyValue{}
  1147. for k, v := range slb.GetMetricTags() {
  1148. tags = append(tags, influxdb.SKeyValue{
  1149. Key: k,
  1150. Value: v,
  1151. })
  1152. }
  1153. for _, v := range value.Values {
  1154. metric := influxdb.SMetricData{
  1155. Name: value.MetricType.Name(),
  1156. Timestamp: v.Timestamp,
  1157. Tags: []influxdb.SKeyValue{},
  1158. Metrics: []influxdb.SKeyValue{
  1159. {
  1160. Key: value.MetricType.Key(),
  1161. Value: strconv.FormatFloat(v.Value, 'E', -1, 64),
  1162. },
  1163. },
  1164. }
  1165. for k, v := range v.Tags {
  1166. metric.Tags = append(metric.Tags, influxdb.SKeyValue{
  1167. Key: k,
  1168. Value: v,
  1169. })
  1170. }
  1171. metric.Tags = append(metric.Tags, tags...)
  1172. mu.Lock()
  1173. metrics = append(metrics, metric)
  1174. mu.Unlock()
  1175. }
  1176. }
  1177. }(_metricType)
  1178. }
  1179. wg.Wait()
  1180. return driver.sendMetrics(ctx, manager, "slb", len(res), metrics)
  1181. }