result_parser.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package victoriametrics
  15. import (
  16. "fmt"
  17. "reflect"
  18. "sort"
  19. "strings"
  20. "github.com/influxdata/promql/v2/pkg/labels"
  21. "github.com/zexi/influxql-to-metricsql/converter/translator"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/util/sets"
  25. "yunion.io/x/onecloud/pkg/apis/monitor"
  26. "yunion.io/x/onecloud/pkg/monitor/tsdb"
  27. )
  28. func newMapId(input map[string]string, ignoreKeys ...string) string {
  29. keys := make([]string, 0)
  30. ignoreKS := sets.NewString(ignoreKeys...)
  31. for key := range input {
  32. if ignoreKS.Has(key) {
  33. continue
  34. }
  35. keys = append(keys, key)
  36. }
  37. sort.Strings(keys)
  38. pairs := make([]string, len(keys))
  39. for i, key := range keys {
  40. pair := fmt.Sprintf("%s->%s", key, input[key])
  41. pairs[i] = pair
  42. }
  43. return strings.Join(pairs, ",")
  44. }
  45. type points struct {
  46. id string
  47. columns []string
  48. values []ResponseDataResultValue
  49. tags map[string]string
  50. }
  51. func (p *points) add(op *points) error {
  52. if len(op.columns) != 1 {
  53. return errors.Errorf("input points' columns are %#v, which length isn't equal 1", op.columns)
  54. }
  55. p.columns = append(p.columns, op.columns[0])
  56. // merge values
  57. for i, val := range p.values {
  58. oVal := op.values[i]
  59. valTime := val[0]
  60. oValTime := oVal[0]
  61. if valTime != oValTime {
  62. return errors.Errorf("value time %v != other value time %v", valTime, oValTime)
  63. }
  64. if len(oVal) != 2 {
  65. return errors.Errorf("input values' are %#v, which length isn't equal 2", oVal)
  66. }
  67. val = append(val, oVal[1])
  68. p.values[i] = val
  69. }
  70. return nil
  71. }
  72. func (p *points) isEqual(op *points) bool {
  73. if p.id != op.id {
  74. return false
  75. }
  76. return reflect.DeepEqual(p.columns, op.columns) && reflect.DeepEqual(p.tags, op.tags) && reflect.DeepEqual(p.values, op.values)
  77. }
  78. func newPointsByResult(result ResponseDataResult, sameTimes sets.String) (*points, error) {
  79. tags := result.Metric
  80. column, ok := tags[translator.UNION_RESULT_NAME]
  81. if !ok {
  82. return nil, errors.Errorf("result tags %#v don't contain key %s", tags, translator.UNION_RESULT_NAME)
  83. }
  84. for _, ignoreKey := range []string{
  85. translator.UNION_RESULT_NAME,
  86. labels.MetricName,
  87. } {
  88. delete(tags, ignoreKey)
  89. }
  90. values := result.Values
  91. id := newMapId(tags)
  92. filterValues := []ResponseDataResultValue{}
  93. for _, val := range values {
  94. valTime := fmt.Sprintf("%s", val[0])
  95. if sameTimes.Has(valTime) {
  96. tmpVal := val
  97. filterValues = append(filterValues, tmpVal)
  98. }
  99. }
  100. return &points{
  101. id: id,
  102. columns: []string{column},
  103. values: filterValues,
  104. tags: tags,
  105. }, nil
  106. }
  107. func newPointsByResults(results []ResponseDataResult) ([]*points, error) {
  108. uniq := make(map[string]*points, 0)
  109. ret := make([]*points, 0)
  110. var sameTimes sets.String = nil
  111. for _, result := range results {
  112. resultTime := sets.NewString()
  113. for _, v := range result.Values {
  114. cTime := fmt.Sprintf("%v", v[0])
  115. resultTime.Insert(cTime)
  116. }
  117. if sameTimes == nil {
  118. sameTimes = resultTime
  119. } else {
  120. sameTimes = sameTimes.Intersection(resultTime)
  121. }
  122. }
  123. for _, result := range results {
  124. p, err := newPointsByResult(result, sameTimes)
  125. if err != nil {
  126. return nil, errors.Wrapf(err, "new points by result: %#v", result)
  127. }
  128. if ep, ok := uniq[p.id]; ok {
  129. if err := ep.add(p); err != nil {
  130. return nil, errors.Wrapf(err, "add point %#v", p)
  131. }
  132. } else {
  133. uniq[p.id] = p
  134. ret = append(ret, p)
  135. }
  136. }
  137. return ret, nil
  138. }
  139. func transPointsToSeries(points []*points, query *tsdb.Query) monitor.TimeSeriesSlice {
  140. var result monitor.TimeSeriesSlice
  141. for _, point := range points {
  142. result = append(result, transPointToSeries(point, query)...)
  143. }
  144. return result
  145. }
  146. func transValuesToTSDBPoints(vals []ResponseDataResultValue) monitor.TimeSeriesPoints {
  147. var points monitor.TimeSeriesPoints
  148. for _, val := range vals {
  149. point, err := parseTimepoint(val)
  150. if err != nil {
  151. log.Errorf("parseTimepoint: %#v", val)
  152. } else {
  153. points = append(points, point)
  154. }
  155. }
  156. return points
  157. }
  158. func reviseTags(tags map[string]string) map[string]string {
  159. ret := make(map[string]string)
  160. for key, val := range tags {
  161. val_ := strings.ReplaceAll(val, "+", " ")
  162. ret[key] = val_
  163. }
  164. return ret
  165. }
  166. func transPointToSeries(p *points, query *tsdb.Query) monitor.TimeSeriesSlice {
  167. var result monitor.TimeSeriesSlice
  168. points := transValuesToTSDBPoints(p.values)
  169. tags := reviseTags(p.tags)
  170. metricName := strings.Join(p.columns, ",")
  171. ts := tsdb.NewTimeSeries(metricName, formatRawName(0, metricName, query, tags, nil), append(p.columns, "time"), points, tags)
  172. result = append(result, ts)
  173. return result
  174. }