| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package victoriametrics
- import (
- "context"
- "encoding/json"
- "strconv"
- "time"
- "github.com/influxdata/influxql"
- "github.com/influxdata/promql/v2/pkg/labels"
- "github.com/zexi/influxql-to-metricsql/converter"
- "github.com/zexi/influxql-to-metricsql/converter/translator"
- "golang.org/x/sync/errgroup"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/sets"
- "yunion.io/x/onecloud/pkg/apis/monitor"
- mod "yunion.io/x/onecloud/pkg/mcclient/modules/monitor"
- "yunion.io/x/onecloud/pkg/monitor/tsdb"
- "yunion.io/x/onecloud/pkg/monitor/tsdb/driver/influxdb"
- )
- func init() {
- tsdb.RegisterTsdbQueryEndpoint(monitor.DataSourceTypeVictoriaMetrics, NewVMAdapter)
- }
- type vmAdapter struct {
- datasource *tsdb.DataSource
- influxdbExecutor *influxdb.InfluxdbExecutor
- }
- func NewVMAdapter(datasource *tsdb.DataSource) (tsdb.TsdbQueryEndpoint, error) {
- ie, _ := influxdb.NewInfluxdbExecutor(nil)
- return &vmAdapter{
- datasource: datasource,
- influxdbExecutor: ie.(*influxdb.InfluxdbExecutor),
- }, nil
- }
- // Query implements tsdb.TsdbQueryEndpoint.
- func (vm *vmAdapter) Query(ctx context.Context, ds *tsdb.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
- // TODO: use interval inside query
- tsdbResp, _, err := vm.query(ctx, ds, query)
- if err != nil {
- return nil, err
- }
- return tsdbResp, nil
- }
- func (vm *vmAdapter) query(ctx context.Context, ds *tsdb.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, *Response, error) {
- rawQuery, influxQs, err := vm.influxdbExecutor.GetRawQuery(ds, query)
- if err != nil {
- return nil, nil, errors.Wrapf(err, "get influxdb raw query: %#v", influxQs)
- }
- // TODO: use interval inside query
- return queryByRaw(ctx, ds, rawQuery, query, influxQs[0].Interval)
- }
- func queryByRaw(ctx context.Context, ds *tsdb.DataSource, rawQuery string, query *tsdb.TsdbQuery, interval time.Duration) (*tsdb.Response, *Response, error) {
- promQL, tr, err := convertInfluxQL(rawQuery)
- if err != nil {
- return nil, nil, errors.Wrap(err, "convert influxQL to promQL")
- }
- start := time.Now()
- defer func() {
- log.Infof("influxQL: %s, promQL: %s, elapsed: %s", rawQuery, promQL, time.Now().Sub(start))
- }()
- resp, err := queryRange(ctx, ds, tr, promQL, interval)
- if err != nil {
- return nil, nil, errors.Wrapf(err, "query VM range by: %s", promQL)
- }
- //log.Infof("get vm %s resp: %s", promQL, jsonutils.Marshal(resp).PrettyString())
- tsdbRet, err := convertVMResponse(rawQuery, query, resp)
- if err != nil {
- return nil, resp, errors.Wrapf(err, "convert to tsdb.Response")
- }
- return tsdbRet, resp, nil
- }
- func queryRange(ctx context.Context, ds *tsdb.DataSource, tr *influxql.TimeRange, promQL string, interval time.Duration) (*Response, error) {
- cli, err := NewClient(ds.Url)
- if err != nil {
- return nil, errors.Wrap(err, "New VM client")
- }
- httpCli, err := ds.GetHttpClient()
- if err != nil {
- return nil, errors.Wrap(err, "GetHttpClient of data source")
- }
- vmTr := NewTimeRangeByInfluxTimeRange(tr)
- if interval <= 0 || interval < 1*time.Minute {
- interval = time.Minute * 5
- }
- return cli.QueryRange(ctx, httpCli, promQL, interval, vmTr, false)
- }
- func convertInfluxQL(influxQL string) (string, *influxql.TimeRange, error) {
- promQL, timeRange, err := converter.TranslateWithTimeRange(influxQL)
- if err != nil {
- return "", nil, errors.Wrapf(err, "TranslateWithTimeRange: %s", influxQL)
- }
- return promQL, timeRange, nil
- }
- func convertVMResponse(rawQuery string, tsdbQuery *tsdb.TsdbQuery, resp *Response) (*tsdb.Response, error) {
- result := &tsdb.Response{
- Results: make(map[string]*tsdb.QueryResult),
- }
- for _, query := range tsdbQuery.Queries {
- ret, err := translateResponse(resp, query)
- if err != nil {
- return nil, errors.Wrap(err, "translate response")
- }
- ret.Meta = monitor.QueryResultMeta{
- RawQuery: rawQuery,
- }
- result.Results[query.RefId] = ret
- }
- return result, nil
- }
- func translateResponse(resp *Response, query *tsdb.Query) (*tsdb.QueryResult, error) {
- queryRes := tsdb.NewQueryResult()
- isUnionResult := false
- results := resp.Data.Result
- if len(results) > 0 {
- _, isUnionResult = results[0].Metric[translator.UNION_RESULT_NAME]
- }
- // 添加值不同的 tag key
- diffTagKeys := sets.NewString()
- if len(results) > 1 {
- result0 := results[0]
- restResults := results[1:]
- for tagKey, tagVal := range result0.Metric {
- for _, result := range restResults {
- resultTagVal := result.Metric[tagKey]
- if tagVal != resultTagVal {
- diffTagKeys.Insert(tagKey)
- break
- }
- }
- }
- } else if len(results) == 1 {
- for tagKey := range results[0].Metric {
- diffTagKeys.Insert(tagKey)
- }
- }
- if !isUnionResult {
- for _, result := range results {
- ss := transformSeries(result, query, diffTagKeys)
- queryRes.Series = append(queryRes.Series, ss...)
- }
- } else {
- // process union multiple fields response
- points, err := newPointsByResults(results)
- if err != nil {
- return nil, errors.Wrap(err, "process multi fields")
- }
- ss := transPointsToSeries(points, query)
- queryRes.Series = ss
- }
- return queryRes, nil
- }
- // Check VictoriaMetrics response at: https://docs.victoriametrics.com/keyConcepts.html#range-query
- func transformSeries(vmResult ResponseDataResult, query *tsdb.Query, diffTagKeys sets.String) monitor.TimeSeriesSlice {
- var result monitor.TimeSeriesSlice
- metric := vmResult.Metric
- points := transValuesToTSDBPoints(vmResult.Values)
- tags := reviseTags(metric)
- aliasName := ""
- if len(query.Selects) > 0 {
- lastSel := query.Selects[len(query.Selects)-1]
- lastSelPart := lastSel[len(lastSel)-1]
- if lastSelPart.Type == "alias" && len(lastSelPart.Params) > 0 {
- aliasName = lastSelPart.Params[0]
- }
- }
- metricName := metric[labels.MetricName]
- if metricName == "" {
- metricName = "value"
- }
- if aliasName != "" {
- metricName = aliasName
- }
- ts := tsdb.NewTimeSeries(metricName, formatRawName(0, metricName, query, tags, diffTagKeys), []string{metricName, "time"}, points, tags)
- result = append(result, ts)
- return result
- }
- func formatRawName(idx int, name string, query *tsdb.Query, tags map[string]string, diffTagKeys sets.String) string {
- groupByTags := []string{}
- if query != nil {
- for _, group := range query.GroupBy {
- if group.Type == "tag" {
- groupByTags = append(groupByTags, group.Params[0])
- }
- }
- }
- return tsdb.FormatRawName(idx, name, groupByTags, tags, diffTagKeys)
- }
- func parseTimepoint(val ResponseDataResultValue) (monitor.TimePoint, error) {
- timepoint := make(monitor.TimePoint, 0)
- // parse timestamp
- timestampNumber, _ := val[0].(json.Number)
- timestamp, err := timestampNumber.Float64()
- if err != nil {
- return monitor.TimePoint{}, errors.Wrapf(err, "parse timestampNumber")
- }
- // to influxdb timestamp format, millisecond ?
- timestamp *= 1000
- // parse value
- for i := 1; i < len(val); i++ {
- valStr := val[i]
- pVal := parsePointValue(valStr)
- timepoint = append(timepoint, pVal)
- }
- timepoint = append(timepoint, timestamp)
- return timepoint, nil
- }
- func parsePointValue(value interface{}) interface{} {
- number, ok := value.(json.Number)
- if !ok {
- // try parse string
- valStr, ok := value.(string)
- if ok {
- valF, err := strconv.ParseFloat(valStr, 64)
- if err == nil {
- return &valF
- }
- return value
- }
- return value
- }
- fvalue, err := number.Float64()
- if err == nil {
- return &fvalue
- }
- ivalue, err := number.Int64()
- if err == nil {
- ret := float64(ivalue)
- return &ret
- }
- return number.String()
- }
- func (vm *vmAdapter) checkMeasurementField(ctx context.Context, ds *tsdb.DataSource, from, to string, ms *monitor.InfluxMeasurement, field string, tagFilter *monitor.MetricQueryTag) (bool, error) {
- q := mod.NewAlertQuery(ms.Database, ms.Measurement).From(from).To(to)
- q.Interval("30m")
- q.Selects().Select(field).COUNT()
- if tagFilter != nil {
- q.Where().AddTag(tagFilter)
- }
- tq := q.ToTsdbQuery()
- resp, rawResp, err := vm.query(ctx, ds, tq)
- if err != nil {
- return false, errors.Wrap(err, "VictoriaMetrics.Query")
- }
- ss := resp.Results[""].Series
- for _, s := range ss {
- if len(s.Points) > 0 {
- return true, nil
- }
- }
- if rawResp.Stats.SeriesFetched != "" {
- fetched, _ := strconv.Atoi(rawResp.Stats.SeriesFetched)
- if fetched >= 1 {
- return true, nil
- }
- }
- return false, nil
- }
- func (vm *vmAdapter) FilterMeasurement(ctx context.Context, ds *tsdb.DataSource, from, to string, ms *monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) (*monitor.InfluxMeasurement, error) {
- retMs := new(monitor.InfluxMeasurement)
- retFieldExists := make([]bool, len(ms.FieldKey))
- errgrp := new(errgroup.Group)
- for i := range ms.FieldKey {
- index := i
- errgrp.Go(func() error {
- field := ms.FieldKey[index]
- exists, err := vm.checkMeasurementField(ctx, ds, from, to, ms, field, tagFilter)
- if err != nil {
- return errors.Wrapf(err, "check meaurement field %s %s", ms.Measurement, field)
- }
- if exists {
- retFieldExists[index] = true
- }
- return nil
- })
- }
- if err := errgrp.Wait(); err != nil {
- log.Warningf("victoriametrics check measurement field: %s", err)
- }
- retFields := sets.NewString()
- for i := range retFieldExists {
- if retFieldExists[i] {
- retFields.Insert(ms.FieldKey[i])
- }
- }
- retMs.FieldKey = retFields.List()
- if len(retMs.FieldKey) != 0 {
- retMs.Measurement = ms.Measurement
- retMs.Database = ms.Database
- retMs.ResType = ms.ResType
- }
- return retMs, nil
- }
- func (vm *vmAdapter) FillSelect(query *monitor.AlertQuery, isAlert bool) *monitor.AlertQuery {
- if isAlert {
- query = influxdb.FillSelectWithMean(query)
- }
- return query
- }
- func (vm *vmAdapter) FillGroupBy(query *monitor.AlertQuery, inputQuery *monitor.MetricQueryInput, tagId string, isAlert bool) *monitor.AlertQuery {
- if isAlert {
- query = FillGroupByWithWildChar(query, inputQuery, tagId)
- }
- return query
- }
- func FillGroupByWithWildChar(query *monitor.AlertQuery, inputQuery *monitor.MetricQueryInput, tagId string) *monitor.AlertQuery {
- if len(tagId) == 0 {
- tagId = "*"
- }
- query.Model.GroupBy = append(query.Model.GroupBy,
- monitor.MetricQueryPart{
- Type: "field",
- Params: []string{tagId},
- })
- return query
- }
|