bucketprobe.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package misc
  15. import (
  16. "context"
  17. "fmt"
  18. "time"
  19. "yunion.io/x/jsonutils"
  20. "yunion.io/x/log"
  21. "yunion.io/x/pkg/errors"
  22. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  23. "yunion.io/x/onecloud/pkg/cloudmon/options"
  24. "yunion.io/x/onecloud/pkg/mcclient"
  25. "yunion.io/x/onecloud/pkg/mcclient/auth"
  26. computemodules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  27. baseoptions "yunion.io/x/onecloud/pkg/mcclient/options"
  28. "yunion.io/x/onecloud/pkg/util/influxdb"
  29. )
  30. func BucketProbe(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  31. if options.Options.EnableBucketProbeDebug {
  32. log.Debugf("BucketProbe start")
  33. }
  34. if !options.Options.EnableBucketProbe {
  35. if options.Options.EnableBucketProbeDebug {
  36. log.Debugf("BucketProbe is disabled")
  37. }
  38. return
  39. }
  40. sess := auth.GetSession(ctx, userCred, options.Options.Region)
  41. metrics, err := gatherBucketMetrics(ctx, sess)
  42. if err != nil {
  43. log.Errorf("BucketProbe gatherBucketMetrics failed: %s", err)
  44. return
  45. }
  46. err = sendMetrics(sess, metrics, "telegraf")
  47. if err != nil {
  48. log.Errorf("StatusProbe SendMetrics error: %s", err)
  49. }
  50. }
  51. func gatherBucketMetrics(ctx context.Context, sess *mcclient.ClientSession) ([]influxdb.SMetricData, error) {
  52. allMetrics := []influxdb.SMetricData{}
  53. params := baseoptions.BaseListOptions{}
  54. params.Scope = "max"
  55. limit := 1000
  56. params.Limit = &limit
  57. params.Filter = []string{
  58. "enable_perf_mon.equals(1)",
  59. }
  60. boolTrue := true
  61. params.Details = &boolTrue
  62. total := -1
  63. offset := 0
  64. for total < 0 || offset < total {
  65. params.Offset = &offset
  66. results, err := computemodules.Buckets.List(sess, jsonutils.Marshal(params))
  67. if err != nil {
  68. return nil, errors.Wrap(err, "computemodules.Buckets.List")
  69. }
  70. total = results.Total
  71. offset = results.Offset + len(results.Data)
  72. for _, bucket := range results.Data {
  73. bucketDetails := computeapi.BucketDetails{}
  74. err = bucket.Unmarshal(&bucketDetails)
  75. if err != nil {
  76. log.Errorf("BucketProbe failed: %s", err)
  77. continue
  78. }
  79. metrics, err := probeBucketStats(ctx, sess, &bucketDetails)
  80. if err != nil {
  81. log.Errorf("BucketProbe failed: %s", err)
  82. continue
  83. }
  84. allMetrics = append(allMetrics, metrics...)
  85. }
  86. }
  87. return allMetrics, nil
  88. }
  89. func probeBucketStats(ctx context.Context, sess *mcclient.ClientSession, bucketDetails *computeapi.BucketDetails) ([]influxdb.SMetricData, error) {
  90. bucket, err := computemodules.GetIBucket(ctx, sess, bucketDetails)
  91. if err != nil {
  92. return nil, errors.Wrap(err, "getIBucket")
  93. }
  94. resultDelay, err := computemodules.ProbeBucketStats(ctx, bucket, options.Options.BucketProbeTestKey, 0)
  95. if err != nil {
  96. return nil, errors.Wrap(err, "doProbeBucketStats zero")
  97. }
  98. resultRate, err := computemodules.ProbeBucketStats(ctx, bucket, options.Options.BucketProbeTestKey, int64(options.Options.BucketProbeTestSizeMb)*1024*1024)
  99. if err != nil {
  100. return nil, errors.Wrap(err, "doProbeBucketStats with payload")
  101. }
  102. metricTags := []influxdb.SKeyValue{}
  103. for k, v := range bucketDetails.GetMetricTags() {
  104. if len(v) == 0 {
  105. continue
  106. }
  107. metricTags = append(metricTags, influxdb.SKeyValue{
  108. Key: k,
  109. Value: v,
  110. })
  111. }
  112. metrics := []influxdb.SKeyValue{}
  113. for k, v := range bucketDetails.GetMetricTags() {
  114. if len(v) == 0 {
  115. continue
  116. }
  117. metrics = append(metrics, influxdb.SKeyValue{
  118. Key: k,
  119. Value: v,
  120. })
  121. }
  122. metrics = append(metrics,
  123. influxdb.SKeyValue{
  124. Key: "upload_delay_ms",
  125. Value: fmt.Sprintf("%f", resultDelay.UploadDelayMs()),
  126. },
  127. influxdb.SKeyValue{
  128. Key: "download_delay_ms",
  129. Value: fmt.Sprintf("%f", resultDelay.DownloadDelayMs()),
  130. },
  131. influxdb.SKeyValue{
  132. Key: "delete_delay_ms",
  133. Value: fmt.Sprintf("%f", resultDelay.DeleteDelayMs()),
  134. },
  135. influxdb.SKeyValue{
  136. Key: "upload_rate_mbps",
  137. Value: fmt.Sprintf("%f", resultRate.UploadThroughputMbps(options.Options.BucketProbeTestSizeMb)),
  138. },
  139. influxdb.SKeyValue{
  140. Key: "download_rate_mbps",
  141. Value: fmt.Sprintf("%f", resultRate.DownloadThroughputMbps(options.Options.BucketProbeTestSizeMb)),
  142. },
  143. )
  144. if options.Options.EnableBucketProbeDebug {
  145. log.Debugf("BucketProbe for bucket %s metrics: %s", bucketDetails.Name, jsonutils.Marshal(metrics))
  146. }
  147. return []influxdb.SMetricData{
  148. {
  149. Name: "bucket_perf",
  150. Tags: metricTags,
  151. Metrics: metrics,
  152. Timestamp: time.Now(),
  153. },
  154. }, nil
  155. }