datasource.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "math"
  20. "regexp"
  21. "sort"
  22. "strings"
  23. "time"
  24. "github.com/influxdata/promql/v2/pkg/labels"
  25. "github.com/zexi/influxql-to-metricsql/converter/translator"
  26. "golang.org/x/sync/errgroup"
  27. "yunion.io/x/jsonutils"
  28. "yunion.io/x/log"
  29. "yunion.io/x/pkg/errors"
  30. "yunion.io/x/pkg/tristate"
  31. "yunion.io/x/pkg/util/sets"
  32. "yunion.io/x/pkg/utils"
  33. "yunion.io/x/onecloud/pkg/apis/monitor"
  34. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  35. "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
  36. "yunion.io/x/onecloud/pkg/httperrors"
  37. "yunion.io/x/onecloud/pkg/mcclient"
  38. "yunion.io/x/onecloud/pkg/mcclient/auth"
  39. "yunion.io/x/onecloud/pkg/monitor/datasource"
  40. merrors "yunion.io/x/onecloud/pkg/monitor/errors"
  41. "yunion.io/x/onecloud/pkg/monitor/tsdb"
  42. "yunion.io/x/onecloud/pkg/monitor/validators"
  43. "yunion.io/x/onecloud/pkg/util/influxdb"
  44. "yunion.io/x/onecloud/pkg/util/stringutils2"
  45. )
  46. const (
  47. VICTORIA_METRICS_DB_TAG_KEY = "db"
  48. VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF = "telegraf"
  49. )
  50. var (
  51. DataSourceManager *SDataSourceManager
  52. compile = regexp.MustCompile(`\w{8}(-\w{4}){3}-\w{12}`)
  53. )
  54. func init() {
  55. DataSourceManager = &SDataSourceManager{
  56. SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
  57. SDataSource{},
  58. "datasources_tbl",
  59. "datasource",
  60. "datasources",
  61. ),
  62. }
  63. DataSourceManager.SetVirtualObject(DataSourceManager)
  64. }
  65. // +onecloud:swagger-gen-model-singular=datasource
  66. // +onecloud:swagger-gen-model-plural=datasources
  67. type SDataSourceManager struct {
  68. db.SStandaloneResourceBaseManager
  69. }
  70. type SDataSource struct {
  71. db.SStandaloneResourceBase
  72. Type string `nullable:"false" list:"user"`
  73. Url string `nullable:"false" list:"user"`
  74. User string `width:"64" charset:"utf8" nullable:"true"`
  75. Password string `width:"64" charset:"utf8" nullable:"true"`
  76. Database string `width:"64" charset:"utf8" nullable:"true"`
  77. IsDefault tristate.TriState `default:"false" create:"optional"`
  78. /*
  79. TimeInterval string
  80. BasicAuth bool
  81. BasicAuthUser string
  82. BasicAuthPassword string
  83. */
  84. }
  85. func (m *SDataSourceManager) GetSource(id string) (*SDataSource, error) {
  86. ret, err := m.FetchById(id)
  87. if err != nil {
  88. return nil, err
  89. }
  90. return ret.(*SDataSource), nil
  91. }
  92. func (m *SDataSourceManager) GetDatabases() (jsonutils.JSONObject, error) {
  93. ret := jsonutils.NewDict()
  94. dataSource, err := datasource.GetDefaultSource("")
  95. if err != nil {
  96. return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
  97. }
  98. db := influxdb.NewInfluxdb(dataSource.Url)
  99. //db.SetDatabase("telegraf")
  100. databases, err := db.GetDatabases()
  101. if err != nil {
  102. return jsonutils.JSONNull, errors.Wrap(err, "GetDatabases")
  103. }
  104. ret.Add(jsonutils.NewStringArray(databases), "databases")
  105. return ret, nil
  106. }
  107. func (m *SDataSourceManager) GetMeasurements(query jsonutils.JSONObject,
  108. measurementFilter, tagFilter string) (jsonutils.JSONObject,
  109. error) {
  110. ret := jsonutils.NewDict()
  111. measurements, err := m.getMeasurementQueryInfluxdb(query, measurementFilter, tagFilter)
  112. if err != nil {
  113. return jsonutils.JSONNull, err
  114. }
  115. ret.Add(jsonutils.Marshal(&measurements), "measurements")
  116. return ret, nil
  117. }
  118. func (m *SDataSourceManager) getMeasurementQueryInfluxdb(query jsonutils.JSONObject,
  119. measurementFilter, tagFilter string) (rtnMeasurements []monitor.InfluxMeasurement, err error) {
  120. database, _ := query.GetString("database")
  121. if database == "" {
  122. return rtnMeasurements, merrors.NewArgIsEmptyErr("database")
  123. }
  124. dataSource, err := datasource.GetDefaultSource("")
  125. if err != nil {
  126. return rtnMeasurements, errors.Wrap(err, "s.GetDefaultSource")
  127. }
  128. db := influxdb.NewInfluxdb(dataSource.Url)
  129. db.SetDatabase(database)
  130. var buffer bytes.Buffer
  131. buffer.WriteString(" SHOW MEASUREMENTS ON ")
  132. buffer.WriteString(database)
  133. if len(measurementFilter) != 0 {
  134. buffer.WriteString(" WITH ")
  135. buffer.WriteString(measurementFilter)
  136. }
  137. if len(tagFilter) != 0 {
  138. buffer.WriteString(" WHERE ")
  139. buffer.WriteString(tagFilter)
  140. }
  141. dbRtn, err := db.Query(buffer.String())
  142. if err != nil {
  143. return rtnMeasurements, errors.Wrap(err, "SHOW MEASUREMENTS")
  144. }
  145. if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
  146. res := dbRtn[0][0]
  147. measurements := make([]monitor.InfluxMeasurement, len(res.Values))
  148. for i := range res.Values {
  149. tmpDict := jsonutils.NewDict()
  150. tmpDict.Add(res.Values[i][0], "measurement")
  151. err = tmpDict.Unmarshal(&measurements[i])
  152. if err != nil {
  153. return rtnMeasurements, errors.Wrap(err, "measurement unmarshal error")
  154. }
  155. }
  156. rtnMeasurements = append(rtnMeasurements, measurements...)
  157. }
  158. return
  159. }
  160. func (m *SDataSourceManager) GetMeasurementsWithDescriptionInfos(query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
  161. ret := jsonutils.NewDict()
  162. rtnMeasurements := make([]monitor.InfluxMeasurement, 0)
  163. measurements, err := MetricMeasurementManager.getMeasurementsFromDB()
  164. if err != nil {
  165. return jsonutils.JSONNull, errors.Wrap(err, "getMeasurementsFromDB")
  166. }
  167. /*filterMeasurements, err := m.filterMeasurementsByTime(measurements, query, tagFilter)
  168. if err != nil {
  169. return jsonutils.JSONNull, errors.Wrap(err, "filterMeasurementsByTime error")
  170. }*/
  171. filterMeasurements := m.getMetricDescriptions(measurements)
  172. if len(filterMeasurements) != 0 {
  173. rtnMeasurements = append(rtnMeasurements, filterMeasurements...)
  174. }
  175. ret.Add(jsonutils.Marshal(&rtnMeasurements), "measurements")
  176. resTypeMap := make(map[string][]monitor.InfluxMeasurement, 0)
  177. resTypes := make([]string, 0)
  178. for _, measurement := range rtnMeasurements {
  179. if typeMeasurements, ok := resTypeMap[measurement.ResType]; ok {
  180. resTypeMap[measurement.ResType] = append(typeMeasurements, measurement)
  181. continue
  182. }
  183. resTypes = append(resTypes, measurement.ResType)
  184. resTypeMap[measurement.ResType] = []monitor.InfluxMeasurement{measurement}
  185. }
  186. sort.Slice(resTypes, func(i, j int) bool {
  187. r1 := resTypes[i]
  188. r2 := resTypes[j]
  189. return monitor.ResTypeScoreMap[r1] < monitor.ResTypeScoreMap[r2]
  190. })
  191. for _, measures := range resTypeMap {
  192. sort.Slice(measures, func(i, j int) bool {
  193. return measures[i].Score < measures[j].Score
  194. })
  195. }
  196. ret.Add(jsonutils.Marshal(&resTypes), "res_types")
  197. ret.Add(jsonutils.Marshal(&resTypeMap), "res_type_measurements")
  198. return ret, nil
  199. }
  200. func (m *SDataSourceManager) GetMeasurementsWithOutTimeFilter(query jsonutils.JSONObject,
  201. measurementFilter, tagFilter string) (jsonutils.JSONObject,
  202. error) {
  203. ret := jsonutils.NewDict()
  204. database, _ := query.GetString("database")
  205. if database == "" {
  206. return jsonutils.JSONNull, httperrors.NewInputParameterError("not support database")
  207. }
  208. dataSource, err := datasource.GetDefaultSource("")
  209. if err != nil {
  210. return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
  211. }
  212. db := influxdb.NewInfluxdb(dataSource.Url)
  213. db.SetDatabase(database)
  214. var buffer bytes.Buffer
  215. buffer.WriteString(" SHOW MEASUREMENTS ON ")
  216. buffer.WriteString(database)
  217. if len(measurementFilter) != 0 {
  218. buffer.WriteString(" WITH ")
  219. buffer.WriteString(measurementFilter)
  220. }
  221. if len(tagFilter) != 0 {
  222. buffer.WriteString(" WHERE ")
  223. buffer.WriteString(tagFilter)
  224. }
  225. dbRtn, err := db.Query(buffer.String())
  226. if err != nil {
  227. return jsonutils.JSONNull, errors.Wrap(err, "SHOW MEASUREMENTS")
  228. }
  229. if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
  230. res := dbRtn[0][0]
  231. measurements := make([]monitor.InfluxMeasurement, len(res.Values))
  232. for i := range res.Values {
  233. tmpDict := jsonutils.NewDict()
  234. tmpDict.Add(res.Values[i][0], "measurement")
  235. err := tmpDict.Unmarshal(&measurements[i])
  236. if err != nil {
  237. return jsonutils.JSONNull, errors.Wrap(err, "measurement unmarshal error")
  238. }
  239. }
  240. ret.Add(jsonutils.Marshal(&measurements), "measurements")
  241. }
  242. return ret, nil
  243. }
  244. func (m *SDataSourceManager) getMetricDescriptions(influxdbMeasurements []monitor.InfluxMeasurement) (
  245. descMeasurements []monitor.InfluxMeasurement) {
  246. userCred := auth.AdminCredential()
  247. listInput := new(monitor.MetricListInput)
  248. for _, measurement := range influxdbMeasurements {
  249. listInput.Measurement.Names = append(listInput.Measurement.Names, measurement.Measurement)
  250. }
  251. query, err := MetricMeasurementManager.ListItemFilter(context.Background(), MetricMeasurementManager.Query(), userCred,
  252. *listInput)
  253. if err != nil {
  254. log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
  255. }
  256. descriMeasurements, err := MetricMeasurementManager.getMeasurement(query)
  257. if len(descriMeasurements) != 0 {
  258. measurementsIns := make([]interface{}, len(descriMeasurements))
  259. for i, _ := range descriMeasurements {
  260. measurementsIns[i] = &descriMeasurements[i]
  261. }
  262. details := MetricMeasurementManager.FetchCustomizeColumns(context.Background(), userCred, jsonutils.NewDict(), measurementsIns,
  263. stringutils2.NewSortedStrings([]string{}), true)
  264. if err != nil {
  265. log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
  266. }
  267. for i, measureDes := range descriMeasurements {
  268. for j, _ := range influxdbMeasurements {
  269. if measureDes.Name == influxdbMeasurements[j].Measurement {
  270. if len(measureDes.DisplayName) != 0 {
  271. influxdbMeasurements[j].MeasurementDisplayName = measureDes.DisplayName
  272. }
  273. if len(measureDes.ResType) != 0 {
  274. influxdbMeasurements[j].ResType = measureDes.ResType
  275. }
  276. if measureDes.Score != 0 {
  277. influxdbMeasurements[j].Score = measureDes.Score
  278. }
  279. fieldDesMap := make(map[string]monitor.MetricFieldDetail, 0)
  280. fields := make([]string, 0)
  281. fieldKeys := stringutils2.NewSortedStrings(influxdbMeasurements[j].FieldKey)
  282. for fieldIndex, fieldDes := range details[i].MetricFields {
  283. if len(fieldDes.DisplayName) != 0 {
  284. fieldDesMap[fieldDes.Name] = details[i].MetricFields[fieldIndex]
  285. }
  286. if fieldKeys.Contains(fieldDes.Name) {
  287. fields = append(fields, fieldDes.Name)
  288. }
  289. }
  290. influxdbMeasurements[j].FieldDescriptions = fieldDesMap
  291. influxdbMeasurements[j].Database = measureDes.Database
  292. influxdbMeasurements[j].FieldKey = fields
  293. descMeasurements = append(descMeasurements, influxdbMeasurements[j])
  294. }
  295. }
  296. }
  297. }
  298. return
  299. }
  300. func (m *SDataSourceManager) filterMeasurementsByTime(
  301. measurements []monitor.InfluxMeasurement, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
  302. timeF, err := m.getFromAndToFromParam(query)
  303. if err != nil {
  304. return nil, err
  305. }
  306. filterMeasurements, err := m.getFilterMeasurementsParallel(timeF.From, timeF.To, measurements, tagFilter)
  307. if err != nil {
  308. return nil, err
  309. }
  310. return filterMeasurements, nil
  311. }
  312. type timeFilter struct {
  313. From string
  314. To string
  315. }
  316. func (m *SDataSourceManager) getFromAndToFromParam(query jsonutils.JSONObject) (timeFilter, error) {
  317. timeF := timeFilter{}
  318. from, _ := query.GetString("from")
  319. if len(from) == 0 {
  320. from = "6h"
  321. }
  322. to, _ := query.GetString("to")
  323. if len(to) == 0 {
  324. to = "now"
  325. }
  326. timeFilter := monitor.AlertQuery{
  327. From: from,
  328. To: to,
  329. }
  330. err := validators.ValidateFromAndToValue(timeFilter)
  331. if err != nil {
  332. return timeF, err
  333. }
  334. timeF.From = from
  335. timeF.To = to
  336. return timeF, nil
  337. }
  338. func (m *SDataSourceManager) getFilterMeasurementsParallel(from, to string,
  339. measurements []monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
  340. filterMeasurements := make([]monitor.InfluxMeasurement, len(measurements))
  341. ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
  342. defer cancel()
  343. measurementQueryGroup, _ := errgroup.WithContext(ctx)
  344. for i := range measurements {
  345. index := i
  346. tmp := measurements[index]
  347. measurementQueryGroup.Go(func() error {
  348. errCh := make(chan error)
  349. go func() {
  350. ret, err := m.getFilterMeasurement(from, to, tmp, tagFilter)
  351. if err != nil {
  352. errCh <- errors.Wrapf(err, "getFilterMeasurement %d", index)
  353. return
  354. }
  355. filterMeasurements[index] = *ret
  356. errCh <- nil
  357. }()
  358. for {
  359. select {
  360. case <-ctx.Done():
  361. return errors.Wrap(ctx.Err(), "filter measurement from TSDB")
  362. case err := <-errCh:
  363. if err != nil {
  364. return err
  365. }
  366. return nil
  367. }
  368. }
  369. })
  370. }
  371. if err := measurementQueryGroup.Wait(); err != nil {
  372. return nil, errors.Wrap(err, "measuremetnQueryGroup.Wait()")
  373. }
  374. ret := make([]monitor.InfluxMeasurement, 0)
  375. for _, fm := range filterMeasurements {
  376. if len(fm.Measurement) != 0 {
  377. tmp := fm
  378. ret = append(ret, tmp)
  379. }
  380. }
  381. return ret, nil
  382. }
  383. func (m *SDataSourceManager) GetTSDBDriver() (tsdb.TsdbQueryEndpoint, error) {
  384. ep, err := datasource.GetDefaultQueryEndpoint()
  385. if err != nil {
  386. return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
  387. }
  388. return ep, nil
  389. }
  390. func (m *SDataSourceManager) getFilterMeasurement(from, to string, measurement monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) (*monitor.InfluxMeasurement, error) {
  391. dds, _ := datasource.GetDefaultSource("")
  392. ep, err := m.GetTSDBDriver()
  393. if err != nil {
  394. return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
  395. }
  396. retMs, err := ep.FilterMeasurement(context.Background(), dds, from, to, &measurement, tagFilter)
  397. if err != nil {
  398. return nil, errors.Wrap(err, "Get endpoint filtered measurement")
  399. }
  400. return retMs, nil
  401. }
  402. func renderTimeFilter(from, to string) string {
  403. if strings.Contains(from, "now-") {
  404. from = "now() - " + strings.Replace(from, "now-", "", 1)
  405. } else {
  406. from = "now() - " + from
  407. }
  408. tmp := ""
  409. if to != "now" && to != "" {
  410. tmp = " and time < now() - " + strings.Replace(to, "now-", "", 1)
  411. }
  412. return fmt.Sprintf("time > %s%s", from, tmp)
  413. }
  414. func (m *SDataSourceManager) GetMetricMeasurement(userCred mcclient.TokenCredential, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
  415. database, _ := query.GetString("database")
  416. if database == "" {
  417. return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("database")
  418. }
  419. measurement, _ := query.GetString("measurement")
  420. if measurement == "" {
  421. return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("measurement")
  422. }
  423. field, _ := query.GetString("field")
  424. if field == "" {
  425. return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("field")
  426. }
  427. from, _ := query.GetString("from")
  428. if len(from) == 0 {
  429. return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("from")
  430. }
  431. timeF, err := m.getFromAndToFromParam(query)
  432. if err != nil {
  433. return nil, errors.Wrap(err, "getFromAndToFromParam")
  434. }
  435. //skipCheckSeries := jsonutils.QueryBoolean(query, "skip_check_series", false)
  436. output := new(monitor.InfluxMeasurement)
  437. output.Measurement = measurement
  438. output.Database = database
  439. output.TagValue = make(map[string][]string, 0)
  440. output.FieldKey = []string{field}
  441. // 只查询过去 30m 的指标
  442. if timeF.To == "now" {
  443. timeF.From = "30m"
  444. }
  445. if err := getTagValues(userCred, output, timeF, tagFilter, true); err != nil {
  446. return jsonutils.JSONNull, errors.Wrap(err, "getTagValues error")
  447. }
  448. m.filterRtnTags(output)
  449. return jsonutils.Marshal(output), nil
  450. }
  451. func (m *SDataSourceManager) filterRtnTags(output *monitor.InfluxMeasurement) {
  452. for _, tag := range []string{hostconsts.TELEGRAF_TAG_KEY_BRAND, hostconsts.TELEGRAF_TAG_KEY_PLATFORM,
  453. hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR} {
  454. if val, ok := output.TagValue[tag]; ok {
  455. output.TagValue[hostconsts.TELEGRAF_TAG_KEY_BRAND] = val
  456. break
  457. }
  458. }
  459. for _, tag := range []string{
  460. "source", hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE, hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
  461. "is_vm", "os_type", hostconsts.TELEGRAF_TAG_KEY_PLATFORM, hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR,
  462. "domain_name", "region", "ips", "vip", "vip_eip", "eip", "eip_mode",
  463. labels.MetricName, translator.UNION_RESULT_NAME,
  464. } {
  465. if _, ok := output.TagValue[tag]; ok {
  466. delete(output.TagValue, tag)
  467. }
  468. }
  469. // hide VictoriaMetrics telegraf db tag
  470. if val, ok := output.TagValue[VICTORIA_METRICS_DB_TAG_KEY]; ok {
  471. if len(val) == 1 && val[0] == VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF {
  472. delete(output.TagValue, VICTORIA_METRICS_DB_TAG_KEY)
  473. }
  474. }
  475. repTag := make([]string, 0)
  476. for tag, _ := range output.TagValue {
  477. repTag = append(repTag, tag)
  478. }
  479. output.TagKey = repTag
  480. }
  481. func (m *SDataSourceManager) filterTagValue(measurement monitor.InfluxMeasurement, timeF timeFilter,
  482. db *influxdb.SInfluxdb, tagValChan *influxdbTagValueChan, tagFilter string) error {
  483. ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
  484. defer cancel()
  485. tagValGroup2, _ := errgroup.WithContext(ctx)
  486. tagValChan2 := influxdbTagValueChan{
  487. rtnChan: make(chan map[string][]string, len(measurement.TagKey)),
  488. count: len(measurement.TagKey),
  489. }
  490. for i, _ := range measurement.TagKey {
  491. tmpkey := measurement.TagKey[i]
  492. tagValGroup2.Go(func() error {
  493. return m.getFilterMeasurementTagValue(&tagValChan2, timeF.From, timeF.To, measurement.FieldKey[0],
  494. tmpkey, measurement, db, tagFilter)
  495. })
  496. }
  497. tagValGroup2.Go(func() error {
  498. valMaps := make(map[string][]string, 0)
  499. for i := 0; i < tagValChan2.count; i++ {
  500. select {
  501. case valMap := <-tagValChan2.rtnChan:
  502. for key, val := range valMap {
  503. if _, ok := valMaps[key]; ok {
  504. valMaps[key] = append(valMaps[key], val...)
  505. continue
  506. }
  507. valMaps[key] = val
  508. }
  509. case <-ctx.Done():
  510. return fmt.Errorf("filter getFilterMeasurementTagValue time out")
  511. }
  512. }
  513. tagValChan.rtnChan <- valMaps
  514. close(tagValChan2.rtnChan)
  515. return nil
  516. })
  517. return tagValGroup2.Wait()
  518. }
  519. func tagValUnion(measurement *monitor.InfluxMeasurement, rtn map[string][]string) {
  520. keys := make([]string, 0)
  521. for _, tag := range measurement.TagKey {
  522. if rtnTagVal, ok := rtn[tag]; ok {
  523. keys = append(keys, tag)
  524. if _, ok := measurement.TagValue[tag]; !ok {
  525. measurement.TagValue[tag] = rtnTagVal
  526. continue
  527. }
  528. measurement.TagValue[tag] = union(measurement.TagValue[tag], rtnTagVal)
  529. }
  530. }
  531. measurement.TagKey = keys
  532. }
  533. func union(slice1, slice2 []string) []string {
  534. m := make(map[string]int)
  535. for _, v := range slice1 {
  536. m[v]++
  537. }
  538. for _, v := range slice2 {
  539. times, _ := m[v]
  540. if times == 0 {
  541. slice1 = append(slice1, v)
  542. }
  543. }
  544. return slice1
  545. }
  546. type InfluxdbSubscription struct {
  547. SubName string
  548. DataBase string
  549. //retention policy
  550. Rc string
  551. Url string
  552. }
  553. func (m *SDataSourceManager) AddSubscription(subscription InfluxdbSubscription) error {
  554. query := fmt.Sprintf("CREATE SUBSCRIPTION %s ON %s.%s DESTINATIONS ALL %s",
  555. jsonutils.NewString(subscription.SubName).String(),
  556. jsonutils.NewString(subscription.DataBase).String(),
  557. jsonutils.NewString(subscription.Rc).String(),
  558. strings.ReplaceAll(jsonutils.NewString(subscription.Url).String(), "\"", "'"),
  559. )
  560. dataSource, err := datasource.GetDefaultSource("")
  561. if err != nil {
  562. return errors.Wrap(err, "s.GetDefaultSource")
  563. }
  564. db := influxdb.NewInfluxdbWithDebug(dataSource.Url, true)
  565. db.SetDatabase(subscription.DataBase)
  566. rtn, err := db.GetQuery(query)
  567. if err != nil {
  568. return err
  569. }
  570. for _, result := range rtn {
  571. for _, obj := range result {
  572. objJson := jsonutils.Marshal(&obj)
  573. log.Errorln(objJson.String())
  574. }
  575. }
  576. return nil
  577. }
  578. func (m *SDataSourceManager) DropSubscription(subscription InfluxdbSubscription) error {
  579. query := fmt.Sprintf("DROP SUBSCRIPTION %s ON %s.%s", jsonutils.NewString(subscription.SubName).String(),
  580. jsonutils.NewString(subscription.DataBase).String(),
  581. jsonutils.NewString(subscription.Rc).String(),
  582. )
  583. dataSource, err := datasource.GetDefaultSource("")
  584. if err != nil {
  585. return errors.Wrap(err, "s.GetDefaultSource")
  586. }
  587. db := influxdb.NewInfluxdb(dataSource.Url)
  588. db.SetDatabase(subscription.DataBase)
  589. rtn, err := db.Query(query)
  590. if err != nil {
  591. return err
  592. }
  593. for _, result := range rtn {
  594. for _, obj := range result {
  595. objJson := jsonutils.Marshal(&obj)
  596. log.Errorln(objJson.String())
  597. }
  598. }
  599. return nil
  600. }
  601. /*func getAttributesOnMeasurement(database, tp string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
  602. query := fmt.Sprintf("SHOW %s KEYS ON %s FROM %s", tp, database, output.Measurement)
  603. dbRtn, err := db.Query(query)
  604. if err != nil {
  605. return errors.Wrapf(err, "SHOW MEASUREMENTS: %s", query)
  606. }
  607. if len(dbRtn) == 0 || len(dbRtn[0]) == 0 {
  608. return nil
  609. }
  610. res := dbRtn[0][0]
  611. tmpDict := jsonutils.NewDict()
  612. tmpArr := jsonutils.NewArray()
  613. for i := range res.Values {
  614. v, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
  615. if filterTagKey(v) {
  616. continue
  617. }
  618. tmpArr.Add(res.Values[i][0])
  619. }
  620. tmpDict.Add(tmpArr, res.Columns[0])
  621. err = tmpDict.Unmarshal(output)
  622. if err != nil {
  623. return errors.Wrap(err, "measurement unmarshal error")
  624. }
  625. return nil
  626. }*/
  627. func getTagValues(userCred mcclient.TokenCredential, output *monitor.InfluxMeasurement, timeF timeFilter, tagFilter *monitor.MetricQueryTag, skipCheckSeries bool) error {
  628. mq := monitor.MetricQuery{
  629. Database: output.Database,
  630. Measurement: output.Measurement,
  631. Selects: []monitor.MetricQuerySelect{
  632. {
  633. {
  634. Type: "field",
  635. Params: []string{output.FieldKey[0]},
  636. },
  637. {
  638. Type: "last",
  639. },
  640. },
  641. },
  642. GroupBy: []monitor.MetricQueryPart{
  643. {
  644. Type: "field",
  645. Params: []string{"*"},
  646. },
  647. },
  648. }
  649. if tagFilter != nil {
  650. mq.Tags = []monitor.MetricQueryTag{
  651. {
  652. Key: tagFilter.Key,
  653. Operator: tagFilter.Operator,
  654. Value: tagFilter.Value,
  655. },
  656. }
  657. }
  658. aq := &monitor.AlertQuery{
  659. Model: mq,
  660. From: timeF.From,
  661. To: timeF.To,
  662. }
  663. q := monitor.MetricQueryInput{
  664. From: timeF.From,
  665. To: timeF.To,
  666. Interval: "3m",
  667. MetricQuery: []*monitor.AlertQuery{
  668. aq,
  669. },
  670. SkipCheckSeries: skipCheckSeries,
  671. }
  672. ret, err := doQuery(userCred, q)
  673. if err != nil {
  674. return errors.Wrapf(err, "getTagValues query error %s", jsonutils.Marshal(q))
  675. }
  676. // 2. group tag and values
  677. tagValMap := make(map[string][]string)
  678. tagKeys := make([]string, 0)
  679. if len(ret.Series) == 0 {
  680. return nil
  681. }
  682. for _, s := range ret.Series {
  683. tagMap := s.Tags
  684. for key, valStr := range tagMap {
  685. valStr = renderTagVal(valStr)
  686. if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
  687. continue
  688. }
  689. if filterTagKey(key) {
  690. continue
  691. }
  692. if valArr, ok := tagValMap[key]; ok {
  693. if !utils.IsInStringArray(valStr, valArr) {
  694. tagValMap[key] = append(valArr, valStr)
  695. }
  696. continue
  697. }
  698. tagValMap[key] = []string{valStr}
  699. tagKeys = append(tagKeys, key)
  700. }
  701. }
  702. output.TagValue = tagValMap
  703. sort.Strings(tagKeys)
  704. output.TagKey = tagKeys
  705. return nil
  706. }
  707. func getTagValue(database string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
  708. if len(output.TagKey) == 0 {
  709. return nil
  710. }
  711. tagKeyStr := jsonutils.NewStringArray(output.TagKey).String()
  712. tagKeyStr = tagKeyStr[1 : len(tagKeyStr)-1]
  713. dbRtn, err := db.Query(fmt.Sprintf("SHOW TAG VALUES ON %s FROM %s WITH KEY IN (%s)", database, output.Measurement, tagKeyStr))
  714. if err != nil {
  715. return err
  716. }
  717. res := dbRtn[0][0]
  718. tagValue := make(map[string][]string, 0)
  719. keys := strings.Join(output.TagKey, ",")
  720. for i := range res.Values {
  721. val, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
  722. if !strings.Contains(keys, val) {
  723. continue
  724. }
  725. if _, ok := tagValue[val]; !ok {
  726. tagValue[val] = make([]string, 0)
  727. }
  728. tag, _ := res.Values[i][1].(*jsonutils.JSONString).GetString()
  729. if filterTagValue(tag) {
  730. delete(tagValue, val)
  731. continue
  732. }
  733. tagValue[val] = append(tagValue[val], tag)
  734. }
  735. output.TagValue = tagValue
  736. //TagKey == TagValue.keys
  737. tagK := make([]string, 0)
  738. for tag, _ := range output.TagValue {
  739. tagK = append(tagK, tag)
  740. }
  741. output.TagKey = tagK
  742. return nil
  743. }
  744. type influxdbTagValueChan struct {
  745. rtnChan chan map[string][]string
  746. count int
  747. }
  748. func (m *SDataSourceManager) getFilterMeasurementTagValue(tagValueChan *influxdbTagValueChan, from string,
  749. to string, field string, tagKey string,
  750. measurement monitor.InfluxMeasurement, db *influxdb.SInfluxdb, tagFilter string) error {
  751. var buffer bytes.Buffer
  752. buffer.WriteString(fmt.Sprintf(`SELECT last("%s") FROM "%s" WHERE %s `, field, measurement.Measurement,
  753. renderTimeFilter(from, to)))
  754. if len(tagFilter) != 0 {
  755. buffer.WriteString(fmt.Sprintf(` AND %s `, tagFilter))
  756. }
  757. buffer.WriteString(fmt.Sprintf(` GROUP BY %q`, tagKey))
  758. log.Errorln(buffer.String())
  759. rtn, err := db.Query(buffer.String())
  760. if err != nil {
  761. return errors.Wrap(err, "getFilterMeasurementTagValue query error")
  762. }
  763. tagValMap := make(map[string][]string)
  764. if len(rtn) != 0 && len(rtn[0]) != 0 {
  765. for rtnIndex, _ := range rtn {
  766. for serieIndex, _ := range rtn[rtnIndex] {
  767. tagMap, _ := rtn[rtnIndex][serieIndex].Tags.GetMap()
  768. for key, valObj := range tagMap {
  769. valStr, _ := valObj.GetString()
  770. valStr = renderTagVal(valStr)
  771. if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
  772. continue
  773. }
  774. if !utils.IsInStringArray(key, measurement.TagKey) {
  775. //measurement.TagKey = append(measurement.TagKey, key)
  776. continue
  777. }
  778. if valArr, ok := tagValMap[key]; ok {
  779. if !utils.IsInStringArray(valStr, valArr) {
  780. tagValMap[key] = append(valArr, valStr)
  781. }
  782. continue
  783. }
  784. tagValMap[key] = []string{valStr}
  785. }
  786. }
  787. }
  788. measurement.TagValue = tagValMap
  789. }
  790. tagValueChan.rtnChan <- tagValMap
  791. return nil
  792. }
  793. func renderTagVal(val string) string {
  794. return strings.ReplaceAll(val, "+", " ")
  795. }
  796. func floatEquals(a, b float64) bool {
  797. eps := 0.000000001
  798. if math.Abs(a-b) < eps {
  799. return true
  800. }
  801. return false
  802. }
  803. var filterKey = []string{"perf_instance", "res_type", "status", "cloudregion", "os_type", "is_vm"}
  804. func filterTagKey(key string) bool {
  805. whiteListIdKeys := sets.NewString("dev_id", "die_id")
  806. if whiteListIdKeys.Has(key) {
  807. return false
  808. }
  809. if strings.Contains(key, "_id") {
  810. return true
  811. }
  812. if key == "perf_instance" {
  813. return true
  814. }
  815. return false
  816. }
  817. func filterTagValue(val string) bool {
  818. if compile.MatchString(val) {
  819. return true
  820. }
  821. return false
  822. }