| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package models
- import (
- "bytes"
- "context"
- "fmt"
- "math"
- "regexp"
- "sort"
- "strings"
- "time"
- "github.com/influxdata/promql/v2/pkg/labels"
- "github.com/zexi/influxql-to-metricsql/converter/translator"
- "golang.org/x/sync/errgroup"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/tristate"
- "yunion.io/x/pkg/util/sets"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/onecloud/pkg/apis/monitor"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/monitor/datasource"
- merrors "yunion.io/x/onecloud/pkg/monitor/errors"
- "yunion.io/x/onecloud/pkg/monitor/tsdb"
- "yunion.io/x/onecloud/pkg/monitor/validators"
- "yunion.io/x/onecloud/pkg/util/influxdb"
- "yunion.io/x/onecloud/pkg/util/stringutils2"
- )
- const (
- VICTORIA_METRICS_DB_TAG_KEY = "db"
- VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF = "telegraf"
- )
- var (
- DataSourceManager *SDataSourceManager
- compile = regexp.MustCompile(`\w{8}(-\w{4}){3}-\w{12}`)
- )
- func init() {
- DataSourceManager = &SDataSourceManager{
- SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
- SDataSource{},
- "datasources_tbl",
- "datasource",
- "datasources",
- ),
- }
- DataSourceManager.SetVirtualObject(DataSourceManager)
- }
- // +onecloud:swagger-gen-model-singular=datasource
- // +onecloud:swagger-gen-model-plural=datasources
- type SDataSourceManager struct {
- db.SStandaloneResourceBaseManager
- }
- type SDataSource struct {
- db.SStandaloneResourceBase
- Type string `nullable:"false" list:"user"`
- Url string `nullable:"false" list:"user"`
- User string `width:"64" charset:"utf8" nullable:"true"`
- Password string `width:"64" charset:"utf8" nullable:"true"`
- Database string `width:"64" charset:"utf8" nullable:"true"`
- IsDefault tristate.TriState `default:"false" create:"optional"`
- /*
- TimeInterval string
- BasicAuth bool
- BasicAuthUser string
- BasicAuthPassword string
- */
- }
- func (m *SDataSourceManager) GetSource(id string) (*SDataSource, error) {
- ret, err := m.FetchById(id)
- if err != nil {
- return nil, err
- }
- return ret.(*SDataSource), nil
- }
- func (m *SDataSourceManager) GetDatabases() (jsonutils.JSONObject, error) {
- ret := jsonutils.NewDict()
- dataSource, err := datasource.GetDefaultSource("")
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
- }
- db := influxdb.NewInfluxdb(dataSource.Url)
- //db.SetDatabase("telegraf")
- databases, err := db.GetDatabases()
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "GetDatabases")
- }
- ret.Add(jsonutils.NewStringArray(databases), "databases")
- return ret, nil
- }
- func (m *SDataSourceManager) GetMeasurements(query jsonutils.JSONObject,
- measurementFilter, tagFilter string) (jsonutils.JSONObject,
- error) {
- ret := jsonutils.NewDict()
- measurements, err := m.getMeasurementQueryInfluxdb(query, measurementFilter, tagFilter)
- if err != nil {
- return jsonutils.JSONNull, err
- }
- ret.Add(jsonutils.Marshal(&measurements), "measurements")
- return ret, nil
- }
- func (m *SDataSourceManager) getMeasurementQueryInfluxdb(query jsonutils.JSONObject,
- measurementFilter, tagFilter string) (rtnMeasurements []monitor.InfluxMeasurement, err error) {
- database, _ := query.GetString("database")
- if database == "" {
- return rtnMeasurements, merrors.NewArgIsEmptyErr("database")
- }
- dataSource, err := datasource.GetDefaultSource("")
- if err != nil {
- return rtnMeasurements, errors.Wrap(err, "s.GetDefaultSource")
- }
- db := influxdb.NewInfluxdb(dataSource.Url)
- db.SetDatabase(database)
- var buffer bytes.Buffer
- buffer.WriteString(" SHOW MEASUREMENTS ON ")
- buffer.WriteString(database)
- if len(measurementFilter) != 0 {
- buffer.WriteString(" WITH ")
- buffer.WriteString(measurementFilter)
- }
- if len(tagFilter) != 0 {
- buffer.WriteString(" WHERE ")
- buffer.WriteString(tagFilter)
- }
- dbRtn, err := db.Query(buffer.String())
- if err != nil {
- return rtnMeasurements, errors.Wrap(err, "SHOW MEASUREMENTS")
- }
- if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
- res := dbRtn[0][0]
- measurements := make([]monitor.InfluxMeasurement, len(res.Values))
- for i := range res.Values {
- tmpDict := jsonutils.NewDict()
- tmpDict.Add(res.Values[i][0], "measurement")
- err = tmpDict.Unmarshal(&measurements[i])
- if err != nil {
- return rtnMeasurements, errors.Wrap(err, "measurement unmarshal error")
- }
- }
- rtnMeasurements = append(rtnMeasurements, measurements...)
- }
- return
- }
- func (m *SDataSourceManager) GetMeasurementsWithDescriptionInfos(query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
- ret := jsonutils.NewDict()
- rtnMeasurements := make([]monitor.InfluxMeasurement, 0)
- measurements, err := MetricMeasurementManager.getMeasurementsFromDB()
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "getMeasurementsFromDB")
- }
- /*filterMeasurements, err := m.filterMeasurementsByTime(measurements, query, tagFilter)
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "filterMeasurementsByTime error")
- }*/
- filterMeasurements := m.getMetricDescriptions(measurements)
- if len(filterMeasurements) != 0 {
- rtnMeasurements = append(rtnMeasurements, filterMeasurements...)
- }
- ret.Add(jsonutils.Marshal(&rtnMeasurements), "measurements")
- resTypeMap := make(map[string][]monitor.InfluxMeasurement, 0)
- resTypes := make([]string, 0)
- for _, measurement := range rtnMeasurements {
- if typeMeasurements, ok := resTypeMap[measurement.ResType]; ok {
- resTypeMap[measurement.ResType] = append(typeMeasurements, measurement)
- continue
- }
- resTypes = append(resTypes, measurement.ResType)
- resTypeMap[measurement.ResType] = []monitor.InfluxMeasurement{measurement}
- }
- sort.Slice(resTypes, func(i, j int) bool {
- r1 := resTypes[i]
- r2 := resTypes[j]
- return monitor.ResTypeScoreMap[r1] < monitor.ResTypeScoreMap[r2]
- })
- for _, measures := range resTypeMap {
- sort.Slice(measures, func(i, j int) bool {
- return measures[i].Score < measures[j].Score
- })
- }
- ret.Add(jsonutils.Marshal(&resTypes), "res_types")
- ret.Add(jsonutils.Marshal(&resTypeMap), "res_type_measurements")
- return ret, nil
- }
- func (m *SDataSourceManager) GetMeasurementsWithOutTimeFilter(query jsonutils.JSONObject,
- measurementFilter, tagFilter string) (jsonutils.JSONObject,
- error) {
- ret := jsonutils.NewDict()
- database, _ := query.GetString("database")
- if database == "" {
- return jsonutils.JSONNull, httperrors.NewInputParameterError("not support database")
- }
- dataSource, err := datasource.GetDefaultSource("")
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
- }
- db := influxdb.NewInfluxdb(dataSource.Url)
- db.SetDatabase(database)
- var buffer bytes.Buffer
- buffer.WriteString(" SHOW MEASUREMENTS ON ")
- buffer.WriteString(database)
- if len(measurementFilter) != 0 {
- buffer.WriteString(" WITH ")
- buffer.WriteString(measurementFilter)
- }
- if len(tagFilter) != 0 {
- buffer.WriteString(" WHERE ")
- buffer.WriteString(tagFilter)
- }
- dbRtn, err := db.Query(buffer.String())
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "SHOW MEASUREMENTS")
- }
- if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
- res := dbRtn[0][0]
- measurements := make([]monitor.InfluxMeasurement, len(res.Values))
- for i := range res.Values {
- tmpDict := jsonutils.NewDict()
- tmpDict.Add(res.Values[i][0], "measurement")
- err := tmpDict.Unmarshal(&measurements[i])
- if err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "measurement unmarshal error")
- }
- }
- ret.Add(jsonutils.Marshal(&measurements), "measurements")
- }
- return ret, nil
- }
- func (m *SDataSourceManager) getMetricDescriptions(influxdbMeasurements []monitor.InfluxMeasurement) (
- descMeasurements []monitor.InfluxMeasurement) {
- userCred := auth.AdminCredential()
- listInput := new(monitor.MetricListInput)
- for _, measurement := range influxdbMeasurements {
- listInput.Measurement.Names = append(listInput.Measurement.Names, measurement.Measurement)
- }
- query, err := MetricMeasurementManager.ListItemFilter(context.Background(), MetricMeasurementManager.Query(), userCred,
- *listInput)
- if err != nil {
- log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
- }
- descriMeasurements, err := MetricMeasurementManager.getMeasurement(query)
- if len(descriMeasurements) != 0 {
- measurementsIns := make([]interface{}, len(descriMeasurements))
- for i, _ := range descriMeasurements {
- measurementsIns[i] = &descriMeasurements[i]
- }
- details := MetricMeasurementManager.FetchCustomizeColumns(context.Background(), userCred, jsonutils.NewDict(), measurementsIns,
- stringutils2.NewSortedStrings([]string{}), true)
- if err != nil {
- log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
- }
- for i, measureDes := range descriMeasurements {
- for j, _ := range influxdbMeasurements {
- if measureDes.Name == influxdbMeasurements[j].Measurement {
- if len(measureDes.DisplayName) != 0 {
- influxdbMeasurements[j].MeasurementDisplayName = measureDes.DisplayName
- }
- if len(measureDes.ResType) != 0 {
- influxdbMeasurements[j].ResType = measureDes.ResType
- }
- if measureDes.Score != 0 {
- influxdbMeasurements[j].Score = measureDes.Score
- }
- fieldDesMap := make(map[string]monitor.MetricFieldDetail, 0)
- fields := make([]string, 0)
- fieldKeys := stringutils2.NewSortedStrings(influxdbMeasurements[j].FieldKey)
- for fieldIndex, fieldDes := range details[i].MetricFields {
- if len(fieldDes.DisplayName) != 0 {
- fieldDesMap[fieldDes.Name] = details[i].MetricFields[fieldIndex]
- }
- if fieldKeys.Contains(fieldDes.Name) {
- fields = append(fields, fieldDes.Name)
- }
- }
- influxdbMeasurements[j].FieldDescriptions = fieldDesMap
- influxdbMeasurements[j].Database = measureDes.Database
- influxdbMeasurements[j].FieldKey = fields
- descMeasurements = append(descMeasurements, influxdbMeasurements[j])
- }
- }
- }
- }
- return
- }
- func (m *SDataSourceManager) filterMeasurementsByTime(
- measurements []monitor.InfluxMeasurement, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
- timeF, err := m.getFromAndToFromParam(query)
- if err != nil {
- return nil, err
- }
- filterMeasurements, err := m.getFilterMeasurementsParallel(timeF.From, timeF.To, measurements, tagFilter)
- if err != nil {
- return nil, err
- }
- return filterMeasurements, nil
- }
- type timeFilter struct {
- From string
- To string
- }
- func (m *SDataSourceManager) getFromAndToFromParam(query jsonutils.JSONObject) (timeFilter, error) {
- timeF := timeFilter{}
- from, _ := query.GetString("from")
- if len(from) == 0 {
- from = "6h"
- }
- to, _ := query.GetString("to")
- if len(to) == 0 {
- to = "now"
- }
- timeFilter := monitor.AlertQuery{
- From: from,
- To: to,
- }
- err := validators.ValidateFromAndToValue(timeFilter)
- if err != nil {
- return timeF, err
- }
- timeF.From = from
- timeF.To = to
- return timeF, nil
- }
- func (m *SDataSourceManager) getFilterMeasurementsParallel(from, to string,
- measurements []monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
- filterMeasurements := make([]monitor.InfluxMeasurement, len(measurements))
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
- defer cancel()
- measurementQueryGroup, _ := errgroup.WithContext(ctx)
- for i := range measurements {
- index := i
- tmp := measurements[index]
- measurementQueryGroup.Go(func() error {
- errCh := make(chan error)
- go func() {
- ret, err := m.getFilterMeasurement(from, to, tmp, tagFilter)
- if err != nil {
- errCh <- errors.Wrapf(err, "getFilterMeasurement %d", index)
- return
- }
- filterMeasurements[index] = *ret
- errCh <- nil
- }()
- for {
- select {
- case <-ctx.Done():
- return errors.Wrap(ctx.Err(), "filter measurement from TSDB")
- case err := <-errCh:
- if err != nil {
- return err
- }
- return nil
- }
- }
- })
- }
- if err := measurementQueryGroup.Wait(); err != nil {
- return nil, errors.Wrap(err, "measuremetnQueryGroup.Wait()")
- }
- ret := make([]monitor.InfluxMeasurement, 0)
- for _, fm := range filterMeasurements {
- if len(fm.Measurement) != 0 {
- tmp := fm
- ret = append(ret, tmp)
- }
- }
- return ret, nil
- }
- func (m *SDataSourceManager) GetTSDBDriver() (tsdb.TsdbQueryEndpoint, error) {
- ep, err := datasource.GetDefaultQueryEndpoint()
- if err != nil {
- return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
- }
- return ep, nil
- }
- func (m *SDataSourceManager) getFilterMeasurement(from, to string, measurement monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) (*monitor.InfluxMeasurement, error) {
- dds, _ := datasource.GetDefaultSource("")
- ep, err := m.GetTSDBDriver()
- if err != nil {
- return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
- }
- retMs, err := ep.FilterMeasurement(context.Background(), dds, from, to, &measurement, tagFilter)
- if err != nil {
- return nil, errors.Wrap(err, "Get endpoint filtered measurement")
- }
- return retMs, nil
- }
- func renderTimeFilter(from, to string) string {
- if strings.Contains(from, "now-") {
- from = "now() - " + strings.Replace(from, "now-", "", 1)
- } else {
- from = "now() - " + from
- }
- tmp := ""
- if to != "now" && to != "" {
- tmp = " and time < now() - " + strings.Replace(to, "now-", "", 1)
- }
- return fmt.Sprintf("time > %s%s", from, tmp)
- }
- func (m *SDataSourceManager) GetMetricMeasurement(userCred mcclient.TokenCredential, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
- database, _ := query.GetString("database")
- if database == "" {
- return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("database")
- }
- measurement, _ := query.GetString("measurement")
- if measurement == "" {
- return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("measurement")
- }
- field, _ := query.GetString("field")
- if field == "" {
- return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("field")
- }
- from, _ := query.GetString("from")
- if len(from) == 0 {
- return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("from")
- }
- timeF, err := m.getFromAndToFromParam(query)
- if err != nil {
- return nil, errors.Wrap(err, "getFromAndToFromParam")
- }
- //skipCheckSeries := jsonutils.QueryBoolean(query, "skip_check_series", false)
- output := new(monitor.InfluxMeasurement)
- output.Measurement = measurement
- output.Database = database
- output.TagValue = make(map[string][]string, 0)
- output.FieldKey = []string{field}
- // 只查询过去 30m 的指标
- if timeF.To == "now" {
- timeF.From = "30m"
- }
- if err := getTagValues(userCred, output, timeF, tagFilter, true); err != nil {
- return jsonutils.JSONNull, errors.Wrap(err, "getTagValues error")
- }
- m.filterRtnTags(output)
- return jsonutils.Marshal(output), nil
- }
- func (m *SDataSourceManager) filterRtnTags(output *monitor.InfluxMeasurement) {
- for _, tag := range []string{hostconsts.TELEGRAF_TAG_KEY_BRAND, hostconsts.TELEGRAF_TAG_KEY_PLATFORM,
- hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR} {
- if val, ok := output.TagValue[tag]; ok {
- output.TagValue[hostconsts.TELEGRAF_TAG_KEY_BRAND] = val
- break
- }
- }
- for _, tag := range []string{
- "source", hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE, hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
- "is_vm", "os_type", hostconsts.TELEGRAF_TAG_KEY_PLATFORM, hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR,
- "domain_name", "region", "ips", "vip", "vip_eip", "eip", "eip_mode",
- labels.MetricName, translator.UNION_RESULT_NAME,
- } {
- if _, ok := output.TagValue[tag]; ok {
- delete(output.TagValue, tag)
- }
- }
- // hide VictoriaMetrics telegraf db tag
- if val, ok := output.TagValue[VICTORIA_METRICS_DB_TAG_KEY]; ok {
- if len(val) == 1 && val[0] == VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF {
- delete(output.TagValue, VICTORIA_METRICS_DB_TAG_KEY)
- }
- }
- repTag := make([]string, 0)
- for tag, _ := range output.TagValue {
- repTag = append(repTag, tag)
- }
- output.TagKey = repTag
- }
- func (m *SDataSourceManager) filterTagValue(measurement monitor.InfluxMeasurement, timeF timeFilter,
- db *influxdb.SInfluxdb, tagValChan *influxdbTagValueChan, tagFilter string) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
- defer cancel()
- tagValGroup2, _ := errgroup.WithContext(ctx)
- tagValChan2 := influxdbTagValueChan{
- rtnChan: make(chan map[string][]string, len(measurement.TagKey)),
- count: len(measurement.TagKey),
- }
- for i, _ := range measurement.TagKey {
- tmpkey := measurement.TagKey[i]
- tagValGroup2.Go(func() error {
- return m.getFilterMeasurementTagValue(&tagValChan2, timeF.From, timeF.To, measurement.FieldKey[0],
- tmpkey, measurement, db, tagFilter)
- })
- }
- tagValGroup2.Go(func() error {
- valMaps := make(map[string][]string, 0)
- for i := 0; i < tagValChan2.count; i++ {
- select {
- case valMap := <-tagValChan2.rtnChan:
- for key, val := range valMap {
- if _, ok := valMaps[key]; ok {
- valMaps[key] = append(valMaps[key], val...)
- continue
- }
- valMaps[key] = val
- }
- case <-ctx.Done():
- return fmt.Errorf("filter getFilterMeasurementTagValue time out")
- }
- }
- tagValChan.rtnChan <- valMaps
- close(tagValChan2.rtnChan)
- return nil
- })
- return tagValGroup2.Wait()
- }
- func tagValUnion(measurement *monitor.InfluxMeasurement, rtn map[string][]string) {
- keys := make([]string, 0)
- for _, tag := range measurement.TagKey {
- if rtnTagVal, ok := rtn[tag]; ok {
- keys = append(keys, tag)
- if _, ok := measurement.TagValue[tag]; !ok {
- measurement.TagValue[tag] = rtnTagVal
- continue
- }
- measurement.TagValue[tag] = union(measurement.TagValue[tag], rtnTagVal)
- }
- }
- measurement.TagKey = keys
- }
- func union(slice1, slice2 []string) []string {
- m := make(map[string]int)
- for _, v := range slice1 {
- m[v]++
- }
- for _, v := range slice2 {
- times, _ := m[v]
- if times == 0 {
- slice1 = append(slice1, v)
- }
- }
- return slice1
- }
- type InfluxdbSubscription struct {
- SubName string
- DataBase string
- //retention policy
- Rc string
- Url string
- }
- func (m *SDataSourceManager) AddSubscription(subscription InfluxdbSubscription) error {
- query := fmt.Sprintf("CREATE SUBSCRIPTION %s ON %s.%s DESTINATIONS ALL %s",
- jsonutils.NewString(subscription.SubName).String(),
- jsonutils.NewString(subscription.DataBase).String(),
- jsonutils.NewString(subscription.Rc).String(),
- strings.ReplaceAll(jsonutils.NewString(subscription.Url).String(), "\"", "'"),
- )
- dataSource, err := datasource.GetDefaultSource("")
- if err != nil {
- return errors.Wrap(err, "s.GetDefaultSource")
- }
- db := influxdb.NewInfluxdbWithDebug(dataSource.Url, true)
- db.SetDatabase(subscription.DataBase)
- rtn, err := db.GetQuery(query)
- if err != nil {
- return err
- }
- for _, result := range rtn {
- for _, obj := range result {
- objJson := jsonutils.Marshal(&obj)
- log.Errorln(objJson.String())
- }
- }
- return nil
- }
- func (m *SDataSourceManager) DropSubscription(subscription InfluxdbSubscription) error {
- query := fmt.Sprintf("DROP SUBSCRIPTION %s ON %s.%s", jsonutils.NewString(subscription.SubName).String(),
- jsonutils.NewString(subscription.DataBase).String(),
- jsonutils.NewString(subscription.Rc).String(),
- )
- dataSource, err := datasource.GetDefaultSource("")
- if err != nil {
- return errors.Wrap(err, "s.GetDefaultSource")
- }
- db := influxdb.NewInfluxdb(dataSource.Url)
- db.SetDatabase(subscription.DataBase)
- rtn, err := db.Query(query)
- if err != nil {
- return err
- }
- for _, result := range rtn {
- for _, obj := range result {
- objJson := jsonutils.Marshal(&obj)
- log.Errorln(objJson.String())
- }
- }
- return nil
- }
- /*func getAttributesOnMeasurement(database, tp string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
- query := fmt.Sprintf("SHOW %s KEYS ON %s FROM %s", tp, database, output.Measurement)
- dbRtn, err := db.Query(query)
- if err != nil {
- return errors.Wrapf(err, "SHOW MEASUREMENTS: %s", query)
- }
- if len(dbRtn) == 0 || len(dbRtn[0]) == 0 {
- return nil
- }
- res := dbRtn[0][0]
- tmpDict := jsonutils.NewDict()
- tmpArr := jsonutils.NewArray()
- for i := range res.Values {
- v, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
- if filterTagKey(v) {
- continue
- }
- tmpArr.Add(res.Values[i][0])
- }
- tmpDict.Add(tmpArr, res.Columns[0])
- err = tmpDict.Unmarshal(output)
- if err != nil {
- return errors.Wrap(err, "measurement unmarshal error")
- }
- return nil
- }*/
- func getTagValues(userCred mcclient.TokenCredential, output *monitor.InfluxMeasurement, timeF timeFilter, tagFilter *monitor.MetricQueryTag, skipCheckSeries bool) error {
- mq := monitor.MetricQuery{
- Database: output.Database,
- Measurement: output.Measurement,
- Selects: []monitor.MetricQuerySelect{
- {
- {
- Type: "field",
- Params: []string{output.FieldKey[0]},
- },
- {
- Type: "last",
- },
- },
- },
- GroupBy: []monitor.MetricQueryPart{
- {
- Type: "field",
- Params: []string{"*"},
- },
- },
- }
- if tagFilter != nil {
- mq.Tags = []monitor.MetricQueryTag{
- {
- Key: tagFilter.Key,
- Operator: tagFilter.Operator,
- Value: tagFilter.Value,
- },
- }
- }
- aq := &monitor.AlertQuery{
- Model: mq,
- From: timeF.From,
- To: timeF.To,
- }
- q := monitor.MetricQueryInput{
- From: timeF.From,
- To: timeF.To,
- Interval: "3m",
- MetricQuery: []*monitor.AlertQuery{
- aq,
- },
- SkipCheckSeries: skipCheckSeries,
- }
- ret, err := doQuery(userCred, q)
- if err != nil {
- return errors.Wrapf(err, "getTagValues query error %s", jsonutils.Marshal(q))
- }
- // 2. group tag and values
- tagValMap := make(map[string][]string)
- tagKeys := make([]string, 0)
- if len(ret.Series) == 0 {
- return nil
- }
- for _, s := range ret.Series {
- tagMap := s.Tags
- for key, valStr := range tagMap {
- valStr = renderTagVal(valStr)
- if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
- continue
- }
- if filterTagKey(key) {
- continue
- }
- if valArr, ok := tagValMap[key]; ok {
- if !utils.IsInStringArray(valStr, valArr) {
- tagValMap[key] = append(valArr, valStr)
- }
- continue
- }
- tagValMap[key] = []string{valStr}
- tagKeys = append(tagKeys, key)
- }
- }
- output.TagValue = tagValMap
- sort.Strings(tagKeys)
- output.TagKey = tagKeys
- return nil
- }
- func getTagValue(database string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
- if len(output.TagKey) == 0 {
- return nil
- }
- tagKeyStr := jsonutils.NewStringArray(output.TagKey).String()
- tagKeyStr = tagKeyStr[1 : len(tagKeyStr)-1]
- dbRtn, err := db.Query(fmt.Sprintf("SHOW TAG VALUES ON %s FROM %s WITH KEY IN (%s)", database, output.Measurement, tagKeyStr))
- if err != nil {
- return err
- }
- res := dbRtn[0][0]
- tagValue := make(map[string][]string, 0)
- keys := strings.Join(output.TagKey, ",")
- for i := range res.Values {
- val, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
- if !strings.Contains(keys, val) {
- continue
- }
- if _, ok := tagValue[val]; !ok {
- tagValue[val] = make([]string, 0)
- }
- tag, _ := res.Values[i][1].(*jsonutils.JSONString).GetString()
- if filterTagValue(tag) {
- delete(tagValue, val)
- continue
- }
- tagValue[val] = append(tagValue[val], tag)
- }
- output.TagValue = tagValue
- //TagKey == TagValue.keys
- tagK := make([]string, 0)
- for tag, _ := range output.TagValue {
- tagK = append(tagK, tag)
- }
- output.TagKey = tagK
- return nil
- }
- type influxdbTagValueChan struct {
- rtnChan chan map[string][]string
- count int
- }
- func (m *SDataSourceManager) getFilterMeasurementTagValue(tagValueChan *influxdbTagValueChan, from string,
- to string, field string, tagKey string,
- measurement monitor.InfluxMeasurement, db *influxdb.SInfluxdb, tagFilter string) error {
- var buffer bytes.Buffer
- buffer.WriteString(fmt.Sprintf(`SELECT last("%s") FROM "%s" WHERE %s `, field, measurement.Measurement,
- renderTimeFilter(from, to)))
- if len(tagFilter) != 0 {
- buffer.WriteString(fmt.Sprintf(` AND %s `, tagFilter))
- }
- buffer.WriteString(fmt.Sprintf(` GROUP BY %q`, tagKey))
- log.Errorln(buffer.String())
- rtn, err := db.Query(buffer.String())
- if err != nil {
- return errors.Wrap(err, "getFilterMeasurementTagValue query error")
- }
- tagValMap := make(map[string][]string)
- if len(rtn) != 0 && len(rtn[0]) != 0 {
- for rtnIndex, _ := range rtn {
- for serieIndex, _ := range rtn[rtnIndex] {
- tagMap, _ := rtn[rtnIndex][serieIndex].Tags.GetMap()
- for key, valObj := range tagMap {
- valStr, _ := valObj.GetString()
- valStr = renderTagVal(valStr)
- if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
- continue
- }
- if !utils.IsInStringArray(key, measurement.TagKey) {
- //measurement.TagKey = append(measurement.TagKey, key)
- continue
- }
- if valArr, ok := tagValMap[key]; ok {
- if !utils.IsInStringArray(valStr, valArr) {
- tagValMap[key] = append(valArr, valStr)
- }
- continue
- }
- tagValMap[key] = []string{valStr}
- }
- }
- }
- measurement.TagValue = tagValMap
- }
- tagValueChan.rtnChan <- tagValMap
- return nil
- }
- func renderTagVal(val string) string {
- return strings.ReplaceAll(val, "+", " ")
- }
- func floatEquals(a, b float64) bool {
- eps := 0.000000001
- if math.Abs(a-b) < eps {
- return true
- }
- return false
- }
- var filterKey = []string{"perf_instance", "res_type", "status", "cloudregion", "os_type", "is_vm"}
- func filterTagKey(key string) bool {
- whiteListIdKeys := sets.NewString("dev_id", "die_id")
- if whiteListIdKeys.Has(key) {
- return false
- }
- if strings.Contains(key, "_id") {
- return true
- }
- if key == "perf_instance" {
- return true
- }
- return false
- }
- func filterTagValue(val string) bool {
- if compile.MatchString(val) {
- return true
- }
- return false
- }
|