vm.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package victoriametrics
  15. import (
  16. "context"
  17. "encoding/json"
  18. "strconv"
  19. "time"
  20. "github.com/influxdata/influxql"
  21. "github.com/influxdata/promql/v2/pkg/labels"
  22. "github.com/zexi/influxql-to-metricsql/converter"
  23. "github.com/zexi/influxql-to-metricsql/converter/translator"
  24. "golang.org/x/sync/errgroup"
  25. "yunion.io/x/log"
  26. "yunion.io/x/pkg/errors"
  27. "yunion.io/x/pkg/util/sets"
  28. "yunion.io/x/onecloud/pkg/apis/monitor"
  29. mod "yunion.io/x/onecloud/pkg/mcclient/modules/monitor"
  30. "yunion.io/x/onecloud/pkg/monitor/tsdb"
  31. "yunion.io/x/onecloud/pkg/monitor/tsdb/driver/influxdb"
  32. )
  33. func init() {
  34. tsdb.RegisterTsdbQueryEndpoint(monitor.DataSourceTypeVictoriaMetrics, NewVMAdapter)
  35. }
  36. type vmAdapter struct {
  37. datasource *tsdb.DataSource
  38. influxdbExecutor *influxdb.InfluxdbExecutor
  39. }
  40. func NewVMAdapter(datasource *tsdb.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  41. ie, _ := influxdb.NewInfluxdbExecutor(nil)
  42. return &vmAdapter{
  43. datasource: datasource,
  44. influxdbExecutor: ie.(*influxdb.InfluxdbExecutor),
  45. }, nil
  46. }
  47. // Query implements tsdb.TsdbQueryEndpoint.
  48. func (vm *vmAdapter) Query(ctx context.Context, ds *tsdb.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
  49. // TODO: use interval inside query
  50. tsdbResp, _, err := vm.query(ctx, ds, query)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return tsdbResp, nil
  55. }
  56. func (vm *vmAdapter) query(ctx context.Context, ds *tsdb.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, *Response, error) {
  57. rawQuery, influxQs, err := vm.influxdbExecutor.GetRawQuery(ds, query)
  58. if err != nil {
  59. return nil, nil, errors.Wrapf(err, "get influxdb raw query: %#v", influxQs)
  60. }
  61. // TODO: use interval inside query
  62. return queryByRaw(ctx, ds, rawQuery, query, influxQs[0].Interval)
  63. }
  64. func queryByRaw(ctx context.Context, ds *tsdb.DataSource, rawQuery string, query *tsdb.TsdbQuery, interval time.Duration) (*tsdb.Response, *Response, error) {
  65. promQL, tr, err := convertInfluxQL(rawQuery)
  66. if err != nil {
  67. return nil, nil, errors.Wrap(err, "convert influxQL to promQL")
  68. }
  69. start := time.Now()
  70. defer func() {
  71. log.Infof("influxQL: %s, promQL: %s, elapsed: %s", rawQuery, promQL, time.Now().Sub(start))
  72. }()
  73. resp, err := queryRange(ctx, ds, tr, promQL, interval)
  74. if err != nil {
  75. return nil, nil, errors.Wrapf(err, "query VM range by: %s", promQL)
  76. }
  77. //log.Infof("get vm %s resp: %s", promQL, jsonutils.Marshal(resp).PrettyString())
  78. tsdbRet, err := convertVMResponse(rawQuery, query, resp)
  79. if err != nil {
  80. return nil, resp, errors.Wrapf(err, "convert to tsdb.Response")
  81. }
  82. return tsdbRet, resp, nil
  83. }
  84. func queryRange(ctx context.Context, ds *tsdb.DataSource, tr *influxql.TimeRange, promQL string, interval time.Duration) (*Response, error) {
  85. cli, err := NewClient(ds.Url)
  86. if err != nil {
  87. return nil, errors.Wrap(err, "New VM client")
  88. }
  89. httpCli, err := ds.GetHttpClient()
  90. if err != nil {
  91. return nil, errors.Wrap(err, "GetHttpClient of data source")
  92. }
  93. vmTr := NewTimeRangeByInfluxTimeRange(tr)
  94. if interval <= 0 || interval < 1*time.Minute {
  95. interval = time.Minute * 5
  96. }
  97. return cli.QueryRange(ctx, httpCli, promQL, interval, vmTr, false)
  98. }
  99. func convertInfluxQL(influxQL string) (string, *influxql.TimeRange, error) {
  100. promQL, timeRange, err := converter.TranslateWithTimeRange(influxQL)
  101. if err != nil {
  102. return "", nil, errors.Wrapf(err, "TranslateWithTimeRange: %s", influxQL)
  103. }
  104. return promQL, timeRange, nil
  105. }
  106. func convertVMResponse(rawQuery string, tsdbQuery *tsdb.TsdbQuery, resp *Response) (*tsdb.Response, error) {
  107. result := &tsdb.Response{
  108. Results: make(map[string]*tsdb.QueryResult),
  109. }
  110. for _, query := range tsdbQuery.Queries {
  111. ret, err := translateResponse(resp, query)
  112. if err != nil {
  113. return nil, errors.Wrap(err, "translate response")
  114. }
  115. ret.Meta = monitor.QueryResultMeta{
  116. RawQuery: rawQuery,
  117. }
  118. result.Results[query.RefId] = ret
  119. }
  120. return result, nil
  121. }
  122. func translateResponse(resp *Response, query *tsdb.Query) (*tsdb.QueryResult, error) {
  123. queryRes := tsdb.NewQueryResult()
  124. isUnionResult := false
  125. results := resp.Data.Result
  126. if len(results) > 0 {
  127. _, isUnionResult = results[0].Metric[translator.UNION_RESULT_NAME]
  128. }
  129. // 添加值不同的 tag key
  130. diffTagKeys := sets.NewString()
  131. if len(results) > 1 {
  132. result0 := results[0]
  133. restResults := results[1:]
  134. for tagKey, tagVal := range result0.Metric {
  135. for _, result := range restResults {
  136. resultTagVal := result.Metric[tagKey]
  137. if tagVal != resultTagVal {
  138. diffTagKeys.Insert(tagKey)
  139. break
  140. }
  141. }
  142. }
  143. } else if len(results) == 1 {
  144. for tagKey := range results[0].Metric {
  145. diffTagKeys.Insert(tagKey)
  146. }
  147. }
  148. if !isUnionResult {
  149. for _, result := range results {
  150. ss := transformSeries(result, query, diffTagKeys)
  151. queryRes.Series = append(queryRes.Series, ss...)
  152. }
  153. } else {
  154. // process union multiple fields response
  155. points, err := newPointsByResults(results)
  156. if err != nil {
  157. return nil, errors.Wrap(err, "process multi fields")
  158. }
  159. ss := transPointsToSeries(points, query)
  160. queryRes.Series = ss
  161. }
  162. return queryRes, nil
  163. }
  164. // Check VictoriaMetrics response at: https://docs.victoriametrics.com/keyConcepts.html#range-query
  165. func transformSeries(vmResult ResponseDataResult, query *tsdb.Query, diffTagKeys sets.String) monitor.TimeSeriesSlice {
  166. var result monitor.TimeSeriesSlice
  167. metric := vmResult.Metric
  168. points := transValuesToTSDBPoints(vmResult.Values)
  169. tags := reviseTags(metric)
  170. aliasName := ""
  171. if len(query.Selects) > 0 {
  172. lastSel := query.Selects[len(query.Selects)-1]
  173. lastSelPart := lastSel[len(lastSel)-1]
  174. if lastSelPart.Type == "alias" && len(lastSelPart.Params) > 0 {
  175. aliasName = lastSelPart.Params[0]
  176. }
  177. }
  178. metricName := metric[labels.MetricName]
  179. if metricName == "" {
  180. metricName = "value"
  181. }
  182. if aliasName != "" {
  183. metricName = aliasName
  184. }
  185. ts := tsdb.NewTimeSeries(metricName, formatRawName(0, metricName, query, tags, diffTagKeys), []string{metricName, "time"}, points, tags)
  186. result = append(result, ts)
  187. return result
  188. }
  189. func formatRawName(idx int, name string, query *tsdb.Query, tags map[string]string, diffTagKeys sets.String) string {
  190. groupByTags := []string{}
  191. if query != nil {
  192. for _, group := range query.GroupBy {
  193. if group.Type == "tag" {
  194. groupByTags = append(groupByTags, group.Params[0])
  195. }
  196. }
  197. }
  198. return tsdb.FormatRawName(idx, name, groupByTags, tags, diffTagKeys)
  199. }
  200. func parseTimepoint(val ResponseDataResultValue) (monitor.TimePoint, error) {
  201. timepoint := make(monitor.TimePoint, 0)
  202. // parse timestamp
  203. timestampNumber, _ := val[0].(json.Number)
  204. timestamp, err := timestampNumber.Float64()
  205. if err != nil {
  206. return monitor.TimePoint{}, errors.Wrapf(err, "parse timestampNumber")
  207. }
  208. // to influxdb timestamp format, millisecond ?
  209. timestamp *= 1000
  210. // parse value
  211. for i := 1; i < len(val); i++ {
  212. valStr := val[i]
  213. pVal := parsePointValue(valStr)
  214. timepoint = append(timepoint, pVal)
  215. }
  216. timepoint = append(timepoint, timestamp)
  217. return timepoint, nil
  218. }
  219. func parsePointValue(value interface{}) interface{} {
  220. number, ok := value.(json.Number)
  221. if !ok {
  222. // try parse string
  223. valStr, ok := value.(string)
  224. if ok {
  225. valF, err := strconv.ParseFloat(valStr, 64)
  226. if err == nil {
  227. return &valF
  228. }
  229. return value
  230. }
  231. return value
  232. }
  233. fvalue, err := number.Float64()
  234. if err == nil {
  235. return &fvalue
  236. }
  237. ivalue, err := number.Int64()
  238. if err == nil {
  239. ret := float64(ivalue)
  240. return &ret
  241. }
  242. return number.String()
  243. }
  244. func (vm *vmAdapter) checkMeasurementField(ctx context.Context, ds *tsdb.DataSource, from, to string, ms *monitor.InfluxMeasurement, field string, tagFilter *monitor.MetricQueryTag) (bool, error) {
  245. q := mod.NewAlertQuery(ms.Database, ms.Measurement).From(from).To(to)
  246. q.Interval("30m")
  247. q.Selects().Select(field).COUNT()
  248. if tagFilter != nil {
  249. q.Where().AddTag(tagFilter)
  250. }
  251. tq := q.ToTsdbQuery()
  252. resp, rawResp, err := vm.query(ctx, ds, tq)
  253. if err != nil {
  254. return false, errors.Wrap(err, "VictoriaMetrics.Query")
  255. }
  256. ss := resp.Results[""].Series
  257. for _, s := range ss {
  258. if len(s.Points) > 0 {
  259. return true, nil
  260. }
  261. }
  262. if rawResp.Stats.SeriesFetched != "" {
  263. fetched, _ := strconv.Atoi(rawResp.Stats.SeriesFetched)
  264. if fetched >= 1 {
  265. return true, nil
  266. }
  267. }
  268. return false, nil
  269. }
  270. func (vm *vmAdapter) FilterMeasurement(ctx context.Context, ds *tsdb.DataSource, from, to string, ms *monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) (*monitor.InfluxMeasurement, error) {
  271. retMs := new(monitor.InfluxMeasurement)
  272. retFieldExists := make([]bool, len(ms.FieldKey))
  273. errgrp := new(errgroup.Group)
  274. for i := range ms.FieldKey {
  275. index := i
  276. errgrp.Go(func() error {
  277. field := ms.FieldKey[index]
  278. exists, err := vm.checkMeasurementField(ctx, ds, from, to, ms, field, tagFilter)
  279. if err != nil {
  280. return errors.Wrapf(err, "check meaurement field %s %s", ms.Measurement, field)
  281. }
  282. if exists {
  283. retFieldExists[index] = true
  284. }
  285. return nil
  286. })
  287. }
  288. if err := errgrp.Wait(); err != nil {
  289. log.Warningf("victoriametrics check measurement field: %s", err)
  290. }
  291. retFields := sets.NewString()
  292. for i := range retFieldExists {
  293. if retFieldExists[i] {
  294. retFields.Insert(ms.FieldKey[i])
  295. }
  296. }
  297. retMs.FieldKey = retFields.List()
  298. if len(retMs.FieldKey) != 0 {
  299. retMs.Measurement = ms.Measurement
  300. retMs.Database = ms.Database
  301. retMs.ResType = ms.ResType
  302. }
  303. return retMs, nil
  304. }
  305. func (vm *vmAdapter) FillSelect(query *monitor.AlertQuery, isAlert bool) *monitor.AlertQuery {
  306. if isAlert {
  307. query = influxdb.FillSelectWithMean(query)
  308. }
  309. return query
  310. }
  311. func (vm *vmAdapter) FillGroupBy(query *monitor.AlertQuery, inputQuery *monitor.MetricQueryInput, tagId string, isAlert bool) *monitor.AlertQuery {
  312. if isAlert {
  313. query = FillGroupByWithWildChar(query, inputQuery, tagId)
  314. }
  315. return query
  316. }
  317. func FillGroupByWithWildChar(query *monitor.AlertQuery, inputQuery *monitor.MetricQueryInput, tagId string) *monitor.AlertQuery {
  318. if len(tagId) == 0 {
  319. tagId = "*"
  320. }
  321. query.Model.GroupBy = append(query.Model.GroupBy,
  322. monitor.MetricQueryPart{
  323. Type: "field",
  324. Params: []string{tagId},
  325. })
  326. return query
  327. }