// Copyright 2019 Yunion // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package providerdriver import ( "context" "strconv" "sync" "time" "yunion.io/x/cloudmux/pkg/cloudprovider" "yunion.io/x/log" "yunion.io/x/pkg/errors" api "yunion.io/x/onecloud/pkg/apis/compute" "yunion.io/x/onecloud/pkg/cloudmon/options" "yunion.io/x/onecloud/pkg/util/influxdb" ) type HcsOpCollect struct { SCollectByResourceIdDriver } func (self *HcsOpCollect) GetProvider() string { return api.CLOUD_PROVIDER_HCSOP } func init() { Register(&HcsOpCollect{}) } func (self *HcsOpCollect) IsSupportMetrics() bool { return true } func (self *HcsOpCollect) CollectStorageMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.StorageDetails, start, end time.Time) error { metrics := []influxdb.SMetricData{} for _, storage := range res { metric := influxdb.SMetricData{ Name: string(cloudprovider.METRIC_RESOURCE_TYPE_STORAGE), Timestamp: time.Now(), Tags: []influxdb.SKeyValue{}, Metrics: []influxdb.SKeyValue{}, } for k, v := range storage.GetMetricTags() { metric.Tags = append(metric.Tags, influxdb.SKeyValue{ Key: k, Value: v, }) } for k, v := range storage.GetMetricPairs() { metric.Metrics = append(metric.Metrics, influxdb.SKeyValue{ Key: k, Value: v, }) } metrics = append(metrics, metric) } return self.sendMetrics(ctx, manager, "storage", len(res), metrics) } func (self *HcsOpCollect) CollectWireMetrics(ctx context.Context, manager api.CloudproviderDetails, provider cloudprovider.ICloudProvider, res map[string]api.WireDetails, start, end time.Time) error { ch := make(chan struct{}, options.Options.CloudResourceCollectMetricsBatchCount) defer close(ch) metrics := []influxdb.SMetricData{} var wg sync.WaitGroup var mu sync.Mutex for i := range res { ch <- struct{}{} wg.Add(1) go func(wire api.WireDetails) { defer func() { wg.Done() <-ch }() opts := &cloudprovider.MetricListOptions{ ResourceType: cloudprovider.METRIC_RESOURCE_TYPE_WIRE, StartTime: start, EndTime: end, } opts.ResourceId = wire.ExternalId tags := []influxdb.SKeyValue{} for k, v := range wire.GetMetricTags() { tags = append(tags, influxdb.SKeyValue{ Key: k, Value: v, }) } data, err := provider.GetMetrics(opts) if err != nil { if errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { log.Errorf("get rds %s(%s) error: %v", wire.Name, wire.Id, err) return } return } for _, values := range data { for _, value := range values.Values { metric := influxdb.SMetricData{ Name: values.MetricType.Name(), Timestamp: value.Timestamp, Tags: tags, Metrics: []influxdb.SKeyValue{ { Key: values.MetricType.Key(), Value: strconv.FormatFloat(value.Value, 'E', -1, 64), }, }, } for k, v := range value.Tags { metric.Tags = append(metric.Tags, influxdb.SKeyValue{ Key: k, Value: v, }) } mu.Lock() metrics = append(metrics, metric) mu.Unlock() } } }(res[i]) } wg.Wait() return self.sendMetrics(ctx, manager, "wire", len(res), metrics) }