| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package victoriametrics
- import (
- "fmt"
- "reflect"
- "sort"
- "strings"
- "github.com/influxdata/promql/v2/pkg/labels"
- "github.com/zexi/influxql-to-metricsql/converter/translator"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/sets"
- "yunion.io/x/onecloud/pkg/apis/monitor"
- "yunion.io/x/onecloud/pkg/monitor/tsdb"
- )
- func newMapId(input map[string]string, ignoreKeys ...string) string {
- keys := make([]string, 0)
- ignoreKS := sets.NewString(ignoreKeys...)
- for key := range input {
- if ignoreKS.Has(key) {
- continue
- }
- keys = append(keys, key)
- }
- sort.Strings(keys)
- pairs := make([]string, len(keys))
- for i, key := range keys {
- pair := fmt.Sprintf("%s->%s", key, input[key])
- pairs[i] = pair
- }
- return strings.Join(pairs, ",")
- }
- type points struct {
- id string
- columns []string
- values []ResponseDataResultValue
- tags map[string]string
- }
- func (p *points) add(op *points) error {
- if len(op.columns) != 1 {
- return errors.Errorf("input points' columns are %#v, which length isn't equal 1", op.columns)
- }
- p.columns = append(p.columns, op.columns[0])
- // merge values
- for i, val := range p.values {
- oVal := op.values[i]
- valTime := val[0]
- oValTime := oVal[0]
- if valTime != oValTime {
- return errors.Errorf("value time %v != other value time %v", valTime, oValTime)
- }
- if len(oVal) != 2 {
- return errors.Errorf("input values' are %#v, which length isn't equal 2", oVal)
- }
- val = append(val, oVal[1])
- p.values[i] = val
- }
- return nil
- }
- func (p *points) isEqual(op *points) bool {
- if p.id != op.id {
- return false
- }
- return reflect.DeepEqual(p.columns, op.columns) && reflect.DeepEqual(p.tags, op.tags) && reflect.DeepEqual(p.values, op.values)
- }
- func newPointsByResult(result ResponseDataResult, sameTimes sets.String) (*points, error) {
- tags := result.Metric
- column, ok := tags[translator.UNION_RESULT_NAME]
- if !ok {
- return nil, errors.Errorf("result tags %#v don't contain key %s", tags, translator.UNION_RESULT_NAME)
- }
- for _, ignoreKey := range []string{
- translator.UNION_RESULT_NAME,
- labels.MetricName,
- } {
- delete(tags, ignoreKey)
- }
- values := result.Values
- id := newMapId(tags)
- filterValues := []ResponseDataResultValue{}
- for _, val := range values {
- valTime := fmt.Sprintf("%s", val[0])
- if sameTimes.Has(valTime) {
- tmpVal := val
- filterValues = append(filterValues, tmpVal)
- }
- }
- return &points{
- id: id,
- columns: []string{column},
- values: filterValues,
- tags: tags,
- }, nil
- }
- func newPointsByResults(results []ResponseDataResult) ([]*points, error) {
- uniq := make(map[string]*points, 0)
- ret := make([]*points, 0)
- var sameTimes sets.String = nil
- for _, result := range results {
- resultTime := sets.NewString()
- for _, v := range result.Values {
- cTime := fmt.Sprintf("%v", v[0])
- resultTime.Insert(cTime)
- }
- if sameTimes == nil {
- sameTimes = resultTime
- } else {
- sameTimes = sameTimes.Intersection(resultTime)
- }
- }
- for _, result := range results {
- p, err := newPointsByResult(result, sameTimes)
- if err != nil {
- return nil, errors.Wrapf(err, "new points by result: %#v", result)
- }
- if ep, ok := uniq[p.id]; ok {
- if err := ep.add(p); err != nil {
- return nil, errors.Wrapf(err, "add point %#v", p)
- }
- } else {
- uniq[p.id] = p
- ret = append(ret, p)
- }
- }
- return ret, nil
- }
- func transPointsToSeries(points []*points, query *tsdb.Query) monitor.TimeSeriesSlice {
- var result monitor.TimeSeriesSlice
- for _, point := range points {
- result = append(result, transPointToSeries(point, query)...)
- }
- return result
- }
- func transValuesToTSDBPoints(vals []ResponseDataResultValue) monitor.TimeSeriesPoints {
- var points monitor.TimeSeriesPoints
- for _, val := range vals {
- point, err := parseTimepoint(val)
- if err != nil {
- log.Errorf("parseTimepoint: %#v", val)
- } else {
- points = append(points, point)
- }
- }
- return points
- }
- func reviseTags(tags map[string]string) map[string]string {
- ret := make(map[string]string)
- for key, val := range tags {
- val_ := strings.ReplaceAll(val, "+", " ")
- ret[key] = val_
- }
- return ret
- }
- func transPointToSeries(p *points, query *tsdb.Query) monitor.TimeSeriesSlice {
- var result monitor.TimeSeriesSlice
- points := transValuesToTSDBPoints(p.values)
- tags := reviseTags(p.tags)
- metricName := strings.Join(p.columns, ",")
- ts := tsdb.NewTimeSeries(metricName, formatRawName(0, metricName, query, tags, nil), append(p.columns, "time"), points, tags)
- result = append(result, ts)
- return result
- }
|