| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647 |
- package translator
- import (
- "fmt"
- "log"
- "strconv"
- "strings"
- "time"
- "github.com/influxdata/influxql"
- "github.com/influxdata/promql/v2"
- "github.com/influxdata/promql/v2/pkg/labels"
- "github.com/pkg/errors"
- "github.com/prometheus/common/model"
- )
const (
	// UNION_RESULT_NAME is the label key that unionFieldsExpr sets (through
	// label_set) on every per-field expression before they are combined
	// with union.
	UNION_RESULT_NAME = "__union_result__"

	// Names of the InfluxQL calls that receive dedicated handling while
	// the aggregate expression is generated.
	CALL_TOP        = "top"
	CALL_BOTTOM     = "bottom"
	CALL_PERCENTILE = "percentile"
	CALL_SUM        = "sum"
	CALL_MIN        = "min"
	CALL_MAX        = "max"
)
- var MUL_ARGS_AGGREGATOR MulArgsAggregator = []string{CALL_TOP, CALL_PERCENTILE, CALL_BOTTOM}
// MulArgsAggregator is a list of aggregator names.
type MulArgsAggregator []string

// Has reports whether aggr is one of the names contained in m.
func (m MulArgsAggregator) Has(aggr string) bool {
	for i := 0; i < len(m); i++ {
		if m[i] == aggr {
			return true
		}
	}
	return false
}
- type promQL struct {
- groupByWildcard bool
- timeRange *influxql.TimeRange
- fieldIsWildcard bool
- measurement string
- labelsVisitor *labelsVisitor
- }
- func NewPromQL() Translator {
- return &promQL{
- labelsVisitor: newLabelsVisitor(),
- }
- }
- func (m *promQL) Translate(s influxql.Statement) (string, error) {
- selectS, ok := s.(*influxql.SelectStatement)
- if !ok {
- return "", errors.Errorf("Only SelectStatement is supported, input %t", s)
- }
- return m.translate(selectS)
- }
- func (m *promQL) GetTimeRange() *influxql.TimeRange {
- return m.timeRange
- }
- type fieldResult struct {
- metricName string
- aggrOps []*AggrOperator
- expr promql.Expr
- }
- func newFieldResult(metricName string, ops []*AggrOperator, expr promql.Expr) *fieldResult {
- return &fieldResult{
- metricName: metricName,
- aggrOps: ops,
- expr: expr,
- }
- }
- func (m *promQL) translateField(s *influxql.SelectStatement, field *influxql.Field) (*fieldResult, error) {
- metricName, err := getMetricName(s.Sources, field)
- if err != nil {
- if errors.Cause(err) == ErrVariableIsWildcard {
- m.measurement = metricName
- m.fieldIsWildcard = true
- } else {
- return nil, errors.Wrap(err, "getMetricName")
- }
- }
- aggrOps, err := getAggrOperators(field)
- if err != nil {
- return nil, errors.Wrap(err, "get field aggregate operator")
- }
- cond, timeRange, err := getTimeRange(s.Condition)
- if err != nil {
- return nil, errors.Wrap(err, "getTimeRange")
- }
- m.timeRange = timeRange
- matchers, err := m.getLabels(m.labelsVisitor, cond)
- if err != nil {
- return nil, errors.Wrap(err, "get matchers")
- }
- if !m.fieldIsWildcard {
- nameMatcher, _ := labels.NewMatcher(labels.MatchEqual, labels.MetricName, metricName)
- matchers = append(matchers, nameMatcher)
- }
- lookbehindWin, groups, err := m.getGroups(s.Dimensions)
- if err != nil {
- return nil, errors.Wrap(err, "get groups")
- }
- //interval, err := s.GroupByInterval()
- //if err != nil {
- // return "", errors.Wrap(err, "GroupByInterval")
- //}
- //fmt.Printf("==get interval: %#v\n", interval)
- expr, err := m.generateExpr(metricName, matchers, lookbehindWin, aggrOps, groups)
- if err != nil {
- return nil, errors.Wrap(err, "generate expression")
- }
- return newFieldResult(metricName, aggrOps, expr), nil
- }
- func (m *promQL) translate(s *influxql.SelectStatement) (string, error) {
- exprs := make([]*fieldResult, 0)
- var resultExpr promql.Expr
- for _, field := range s.Fields {
- expr, err := m.translateField(s, field)
- if err != nil {
- return "", errors.Wrapf(err, "translate field %s", field)
- }
- exprs = append(exprs, expr)
- }
- if len(exprs) == 1 {
- resultExpr = exprs[0].expr
- } else {
- // union field expr
- resultExpr = unionFieldsExpr(exprs)
- }
- return m.formatExpr(resultExpr), nil
- }
- func unionFieldsExpr(exprs []*fieldResult) promql.Expr {
- result := make([]promql.Expr, len(exprs))
- // 1. wrap each expr with label_set: https://docs.victoriametrics.com/MetricsQL.html#label_set
- setKey := UNION_RESULT_NAME
- for i := range exprs {
- expr := exprs[i]
- setValue := expr.metricName
- if len(expr.aggrOps) > 0 {
- opsNames := make([]string, len(expr.aggrOps))
- for i := range expr.aggrOps {
- opsNames[i] = expr.aggrOps[i].Name
- }
- setValue = fmt.Sprintf("%s_%s", strings.Join(opsNames, "_"), expr.metricName)
- }
- result[i] = &promql.Call{
- Func: &promql.Function{
- Name: "label_set",
- ArgTypes: []promql.ValueType{promql.ValueTypeVector, promql.ValueTypeString, promql.ValueTypeString},
- Variadic: 1,
- ReturnType: promql.ValueTypeVector,
- },
- Args: promql.Expressions{
- expr.expr,
- &promql.StringLiteral{setKey},
- &promql.StringLiteral{setValue},
- },
- }
- }
- // 2. use union: https://docs.victoriametrics.com/MetricsQL.html#union
- return &promql.Call{
- Func: &promql.Function{
- Name: "union",
- Variadic: 1,
- ReturnType: promql.ValueTypeVector,
- },
- Args: result,
- }
- }
- func getTimeRange(cond influxql.Expr) (influxql.Expr, *influxql.TimeRange, error) {
- // parse time range
- //mustParseTime := func(value string) time.Time {
- // ts, err := time.Parse(time.RFC3339, value)
- // if err != nil {
- // panic(fmt.Errorf("unable to parse time: %s", err))
- // }
- // return ts
- //}
- //now := mustParseTime("2000-01-01T00:00:00Z")
- valuer := influxql.NowValuer{
- Now: time.Now(),
- Location: time.UTC,
- }
- cond, timeRange, err := influxql.ConditionExpr(cond, &valuer)
- if err != nil {
- return nil, nil, errors.Wrapf(err, "parse time range from %q", cond)
- }
- if timeRange.IsZero() {
- return cond, nil, nil
- }
- // process maxTime
- if !timeRange.MaxTime().IsZero() {
- year, month, day := timeRange.Max.Date()
- // FIX: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
- if year == 1 && month == 1 && day == 1 {
- timeRange.Max = time.Now()
- }
- }
- return cond, &timeRange, nil
- }
- func (m promQL) generateExpr(
- metricName string,
- ls []*labels.Matcher,
- lookbehindWindow string,
- aggrOps []*AggrOperator,
- groups []string) (promql.Expr, error) {
- //fmt.Printf("=====name: %s, labels: %#v, lookbehindWindow: %q, aggrOps: %#v, groups: %#v\n", metricName, ls, lookbehindWindow, aggrOps, groups)
- for _, l := range ls {
- fmt.Printf("label: %s\n", l.String())
- }
- if m.fieldIsWildcard {
- measurementM, _ := labels.NewMatcher(labels.MatchRegexp, labels.MetricName, fmt.Sprintf("^%s_.*", m.measurement))
- ls = append(ls, measurementM)
- }
- var result promql.Expr
- if len(aggrOps) != 0 {
- if lookbehindWindow == "" {
- lookbehindWindow = "1m"
- }
- dur, err := model.ParseDuration(lookbehindWindow)
- if err != nil {
- return nil, errors.Wrapf(err, "ParseDuration: %q", lookbehindWindow)
- }
- ms := &promql.MatrixSelector{
- LabelMatchers: ls,
- Range: time.Duration(dur),
- }
- if !m.fieldIsWildcard {
- ms.Name = metricName
- }
- result = ms
- } else {
- vs := &promql.VectorSelector{
- LabelMatchers: ls,
- }
- if !m.fieldIsWildcard {
- vs.Name = metricName
- }
- result = vs
- }
- if len(groups) != 0 && len(aggrOps) == 0 {
- return nil, errors.Errorf("Can't use group by when aggregate operator is empty")
- }
- result = getAggrExpr(aggrOps, result)
- //fmt.Printf("=====m.GroupByWildcard: %v, %#v, aggrOps: %#v\n", m.groupByWildcard, result, aggrOps)
- shouldSkipAggr := func(opName string) bool {
- switch opName {
- case CALL_PERCENTILE, CALL_TOP, CALL_BOTTOM, "last":
- return true
- }
- return false
- }
- if len(aggrOps) != 0 {
- op := promql.ItemAvg
- opName := aggrOps[0].Name
- if !shouldSkipAggr(opName) {
- switch opName {
- case CALL_SUM:
- op = promql.ItemSum
- case CALL_MAX:
- op = promql.ItemMax
- case CALL_MIN:
- op = promql.ItemMin
- }
- expr := &promql.AggregateExpr{
- Op: op,
- Expr: result,
- }
- if len(groups) != 0 {
- expr.Grouping = groups
- }
- if !m.groupByWildcard {
- result = expr
- }
- }
- }
- return result, nil
- }
- func (m promQL) formatExpr(expr promql.Expr) string {
- initialExpr := expr.String()
- //fmt.Printf("---replaceLabels: %#v\n", m.labelsVisitor.replaceLabels)
- //fmt.Printf("--src: %s\n", initialExpr)
- if len(m.labelsVisitor.replaceLabels) > 0 && len(m.labelsVisitor.labels) > 1 {
- for src, replace := range m.labelsVisitor.replaceLabels {
- initialExpr = strings.ReplaceAll(initialExpr, src, replace)
- }
- }
- return initialExpr
- }
- func newAggrExpr(name string, argType promql.ValueType, returnType promql.ValueType, restExpr promql.Expr) promql.Expr {
- return newAggrExprWithArgs(name, []promql.ValueType{argType}, returnType, promql.Expressions{restExpr})
- }
- func newAggrExprWithArgs(name string, args []promql.ValueType, returnType promql.ValueType, restExprs promql.Expressions) promql.Expr {
- return &promql.Call{
- Func: &promql.Function{
- Name: name,
- ArgTypes: args,
- Variadic: 0,
- ReturnType: returnType,
- },
- Args: restExprs,
- }
- }
- func getAggrExpr(ops []*AggrOperator, expr promql.Expr) promql.Expr {
- if len(ops) == 0 {
- return expr
- }
- aggrOp := ops[0]
- restOps := ops[1:]
- restExpr := getAggrExpr(restOps, expr)
- switch aggrOp.Name {
- case "abs":
- // https://prometheus.io/docs/prometheus/latest/querying/functions/#abs
- expr = newAggrExpr("abs", promql.ValueTypeVector, promql.ValueTypeVector, restExpr)
- case "mean":
- // https://docs.victoriametrics.com/MetricsQL.html#avg_over_time
- expr = newAggrExpr("avg_over_time", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "last":
- // https://docs.victoriametrics.com/MetricsQL.html#last_over_time
- expr = newAggrExpr("last_over_time", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "stddev":
- // https://prometheus.io/docs/prometheus/latest/querying/functions/#aggregation_over_time
- expr = newAggrExpr("stddev_over_time", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "count":
- // use count, not use 'count_over_time' https://docs.victoriametrics.com/MetricsQL.html#count_over_time
- expr = newAggrExpr("count", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "median":
- // https://docs.victoriametrics.com/MetricsQL.html#median_over_time
- expr = newAggrExpr("median_over_time", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- /*case "max":
- // https://docs.victoriametrics.com/MetricsQL.html#max_over_time
- expr = newAggrExpr("max", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "min":
- // https://docs.victoriametrics.com/MetricsQL.html#min_over_time
- expr = newAggrExpr("min", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)*/
- case "mode":
- // https://docs.victoriametrics.com/MetricsQL.html#mode_over_time
- expr = newAggrExpr("mode_over_time", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "integral":
- // https://docs.victoriametrics.com/MetricsQL.html#integrate
- expr = newAggrExpr("integrate", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case "distinct":
- expr = newAggrExpr("distinct", promql.ValueTypeMatrix, promql.ValueTypeVector, restExpr)
- case CALL_TOP:
- expr = newAggrExprWithArgs("topk_avg",
- []promql.ValueType{
- promql.ValueTypeString,
- promql.ValueTypeMatrix,
- }, promql.ValueTypeVector,
- promql.Expressions{
- aggrOp.Args[0],
- restExpr})
- case CALL_BOTTOM:
- expr = newAggrExprWithArgs("bottomk_avg",
- []promql.ValueType{
- promql.ValueTypeString,
- promql.ValueTypeMatrix,
- }, promql.ValueTypeVector,
- promql.Expressions{
- aggrOp.Args[0],
- restExpr})
- case CALL_PERCENTILE:
- expr = newAggrExprWithArgs("quantile_over_time",
- []promql.ValueType{
- promql.ValueTypeString,
- promql.ValueTypeMatrix,
- }, promql.ValueTypeVector,
- promql.Expressions{
- aggrOp.Args[0],
- restExpr})
- }
- return expr
- }
- type AggrOperator struct {
- Name string
- Args promql.Expressions
- }
- func newAggrOperatorByName(name string) *AggrOperator {
- return &AggrOperator{
- Name: name,
- }
- }
- func getAggrOperator(op *influxql.Call) ([]*AggrOperator, error) {
- if len(op.Args) != 1 && !MUL_ARGS_AGGREGATOR.Has(op.Name) {
- return nil, errors.Errorf("not supported aggregator: %s with args: %#v", op.String(), op.Args)
- }
- aggOp := newAggrOperatorByName(op.Name)
- if op.Name == CALL_TOP || op.Name == CALL_BOTTOM || op.Name == CALL_PERCENTILE {
- numStr := op.Args[len(op.Args)-1].String()
- num, err := strconv.Atoi(numStr)
- if err != nil {
- return nil, errors.Wrapf(err, "parse top/bottom aggregator: %s", op)
- }
- numFloat := float64(num)
- if op.Name == CALL_PERCENTILE {
- if numFloat > 100 {
- return nil, errors.Errorf("percentile %f is large than 100", numFloat)
- }
- numFloat = numFloat / 100
- }
- aggOp.Args = promql.Expressions{
- &promql.NumberLiteral{Val: numFloat},
- }
- }
- ret := []*AggrOperator{aggOp}
- args, ok := op.Args[0].(*influxql.Call)
- if !ok {
- return ret, nil
- }
- rest, err := getAggrOperator(args)
- if err != nil {
- return nil, errors.Wrapf(err, "get rest aggregate operator: %s", args.String())
- }
- ret = append(ret, rest...)
- return ret, nil
- }
- func getAggrOperators(field *influxql.Field) ([]*AggrOperator, error) {
- aggrOp, ok := field.Expr.(*influxql.Call)
- if !ok {
- return nil, nil
- }
- return getAggrOperator(aggrOp)
- }
- func getMetricName(sources influxql.Sources, field *influxql.Field) (string, error) {
- if len(sources) != 1 {
- return "", errors.Errorf("sources %#v length doesn't equal 1", sources)
- }
- src := sources[0]
- measurement, ok := src.(*influxql.Measurement)
- if !ok {
- return "", errors.Errorf("source %#v is not measurement type", src)
- }
- var (
- fieldName string
- err error
- )
- switch expr := field.Expr.(type) {
- case *influxql.VarRef:
- fieldName = expr.Val
- case *influxql.Call:
- fieldName, err = getCallVariable(expr)
- default:
- return "", errors.Errorf("field.Expr %#v is not supported", expr)
- }
- if err != nil {
- return measurement.Name, err
- }
- return fmt.Sprintf("%s_%s", measurement.Name, fieldName), nil
- }
- var (
- ErrVariableIsWildcard = errors.New("variable field is wildcard")
- )
- func getCallVariable(c *influxql.Call) (string, error) {
- if len(c.Args) != 1 && !MUL_ARGS_AGGREGATOR.Has(c.Name) {
- return "", errors.Errorf("length of call %q args %#v != 1", c.Name, c.Args)
- }
- switch args := c.Args[0].(type) {
- case *influxql.VarRef:
- return args.Val, nil
- case *influxql.Wildcard:
- return "", ErrVariableIsWildcard
- case *influxql.Call:
- return getCallVariable(args)
- default:
- return "", errors.Errorf("unsupported args %#v", args)
- }
- return c.Args[0].String(), nil
- }
- type labelsVisitor struct {
- err error
- labels []*labels.Matcher
- curKey string
- curOp influxql.Token
- curVal string
- replaceLabels map[string]string
- }
- func newLabelsVisitor() *labelsVisitor {
- return &labelsVisitor{
- err: nil,
- labels: make([]*labels.Matcher, 0),
- replaceLabels: map[string]string{},
- }
- }
- func (l *labelsVisitor) Error() error {
- return l.err
- }
- func (l *labelsVisitor) Labels() []*labels.Matcher {
- return l.labels
- }
- func (l *labelsVisitor) commitLabel() error {
- if l.err != nil {
- return l.err
- }
- var (
- label *labels.Matcher
- err error
- )
- var promOP labels.MatchType
- switch l.curOp {
- case influxql.EQ:
- promOP = labels.MatchEqual
- case influxql.NEQ:
- promOP = labels.MatchNotEqual
- case influxql.EQREGEX:
- promOP = labels.MatchRegexp
- case influxql.NEQREGEX:
- promOP = labels.MatchNotRegexp
- default:
- return errors.Errorf("Not suport influxdb operator: %s", l.curOp)
- }
- label, err = labels.NewMatcher(promOP, l.curKey, l.curVal)
- if err != nil {
- return errors.Wrapf(err, "not supported operator: %q", l.curOp)
- }
- l.labels = append(l.labels, label)
- return nil
- }
- func (l *labelsVisitor) Visit(node influxql.Node) influxql.Visitor {
- //fmt.Printf("-- visit: %s, %#v\n", node, node)
- if l.err != nil {
- log.Printf("error happend: %v, visting skipped", l.err)
- return l
- }
- switch expr := node.(type) {
- case *influxql.BinaryExpr:
- if expr.Op != influxql.OR && expr.Op != influxql.AND {
- l.curOp = expr.Op
- }
- l.Visit(expr.LHS)
- if expr.Op == influxql.OR {
- curLabel := l.labels[len(l.labels)-1]
- key := fmt.Sprintf("%s,", curLabel.String())
- l.replaceLabels[key] = fmt.Sprintf("%s or ", curLabel.String())
- }
- l.Visit(expr.RHS)
- return nil
- case *influxql.VarRef:
- l.curKey = expr.Val
- case *influxql.StringLiteral:
- l.curVal = expr.Val
- if err := l.commitLabel(); err != nil {
- l.err = err
- }
- case *influxql.RegexLiteral:
- l.curVal = expr.Val.String()
- if err := l.commitLabel(); err != nil {
- l.err = err
- }
- }
- return l
- }
- func (m promQL) getLabels(v *labelsVisitor, cond influxql.Expr) ([]*labels.Matcher, error) {
- if cond == nil {
- return nil, nil
- }
- influxql.Walk(v, cond)
- return v.Labels(), v.Error()
- }
- func (m *promQL) getGroups(groups influxql.Dimensions) (string, []string, error) {
- result := []string{}
- var (
- lookbehindWindow string
- )
- for _, group := range groups {
- tmpWin, grp, err := m.getGroup(group)
- if err != nil {
- return "", result, errors.Wrapf(err, "getGroup %q", group)
- }
- if tmpWin != "" {
- lookbehindWindow = tmpWin
- }
- if grp != "" {
- result = append(result, grp)
- }
- }
- return lookbehindWindow, result, nil
- }
- func (m *promQL) getGroup(group *influxql.Dimension) (string, string, error) {
- //fmt.Printf("---try group: %#v\n", group)
- grp := group.Expr
- lookbehindWindow := ""
- switch expr := grp.(type) {
- case *influxql.Call:
- if expr.Name == "time" {
- lookbehindWindow = expr.Args[0].String()
- }
- return lookbehindWindow, "", nil
- case *influxql.VarRef:
- return "", expr.Val, nil
- case *influxql.Wildcard:
- m.groupByWildcard = true
- return "", "", nil
- }
- return "", "", errors.Errorf("not support %q", group.String())
- }
|