| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package models
- import (
- "context"
- "fmt"
- "net/http"
- "sort"
- "strconv"
- "strings"
- "time"
- "github.com/influxdata/promql/v2/pkg/labels"
- "github.com/zexi/influxql-to-metricsql/converter/translator"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/pkg/util/rbacscope"
- "yunion.io/x/onecloud/pkg/apis/monitor"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- mod "yunion.io/x/onecloud/pkg/mcclient/modules/monitor"
- "yunion.io/x/onecloud/pkg/monitor/datasource"
- merrors "yunion.io/x/onecloud/pkg/monitor/errors"
- mq "yunion.io/x/onecloud/pkg/monitor/metricquery"
- "yunion.io/x/onecloud/pkg/monitor/options"
- "yunion.io/x/onecloud/pkg/monitor/tsdb/driver/victoriametrics"
- "yunion.io/x/onecloud/pkg/monitor/validators"
- )
- const (
- TELEGRAF_DATABASE = "telegraf"
- )
- var (
- UnifiedMonitorManager *SUnifiedMonitorManager
- )
- func init() {
- UnifiedMonitorManager = &SUnifiedMonitorManager{
- SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
- &SUnifiedMonitorManager{},
- "",
- "unifiedmonitor",
- "unifiedmonitors",
- ),
- }
- UnifiedMonitorManager.SetVirtualObject(UnifiedMonitorManager)
- }
- type SUnifiedMonitorManager struct {
- db.SVirtualResourceBaseManager
- }
- type SUnifiedMonitorModel struct {
- }
- func (self *SUnifiedMonitorManager) GetPropertyDatabases(ctx context.Context, userCred mcclient.TokenCredential,
- query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- return DataSourceManager.GetDatabases()
- }
- func (self *SUnifiedMonitorManager) GetPropertyMeasurements(ctx context.Context, userCred mcclient.TokenCredential,
- query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- filter, err := getTagFilterByRequestQuery(ctx, userCred, query)
- if err != nil {
- return nil, errors.Wrap(err, "getTagFilterByRequestQuery")
- }
- return DataSourceManager.GetMeasurementsWithDescriptionInfos(query, filter)
- }
- func getTagFilterByRequestQuery(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (*monitor.MetricQueryTag, error) {
- scope, _ := query.GetString("scope")
- return filterByScope(ctx, userCred, scope, query)
- }
- func filterByScope(ctx context.Context, userCred mcclient.TokenCredential, scope string, data jsonutils.JSONObject) (*monitor.MetricQueryTag, error) {
- domainId := jsonutils.GetAnyString(data, db.DomainFetchKeys)
- projectId := jsonutils.GetAnyString(data, db.ProjectFetchKeys)
- if projectId != "" {
- project, err := db.DefaultProjectFetcher(ctx, projectId, domainId)
- if err != nil {
- return nil, errors.Wrap(err, "db.DefaultProjectFetcher")
- }
- projectId = project.GetProjectId()
- domainId = project.GetProjectDomainId()
- }
- if domainId != "" {
- domain, err := db.DefaultDomainFetcher(ctx, domainId)
- if err != nil {
- return nil, errors.Wrap(err, "db.DefaultDomainFetcher")
- }
- domainId = domain.GetProjectDomainId()
- domain.GetProjectId()
- }
- switch scope {
- case "system":
- return nil, nil
- case "domain":
- if domainId == "" {
- domainId = userCred.GetProjectDomainId()
- }
- return getProjectIdsFilterByDomain(domainId)
- default:
- if projectId == "" {
- projectId = userCred.GetProjectId()
- }
- return getProjectIdFilterByProject(projectId)
- }
- }
- func getTenantIdStr(role string, userCred mcclient.TokenCredential) (*monitor.MetricQueryTag, error) {
- if role == "admin" {
- return nil, nil
- }
- if role == "domainadmin" {
- domainId := userCred.GetDomainId()
- return getProjectIdsFilterByDomain(domainId)
- }
- if role == "member" {
- tenantId := userCred.GetProjectId()
- return getProjectIdFilterByProject(tenantId)
- }
- return nil, errors.Wrapf(errors.ErrNotFound, "not supported role %q", role)
- }
- func getProjectIdsFilterByDomain(domainId string) (*monitor.MetricQueryTag, error) {
- //s := auth.GetAdminSession(context.Background(), "", "")
- //params := jsonutils.Marshal(map[string]string{"domain_id": domainId})
- //tenants, err := modules.Projects.List(s, params)
- //if err != nil {
- // return "", errors.Wrap(err, "Projects.List")
- //}
- //var buffer bytes.Buffer
- //buffer.WriteString("( ")
- //for index, tenant := range tenants.Data {
- // tenantId, _ := tenant.GetString("id")
- // if index != len(tenants.Data)-1 {
- // buffer.WriteString(fmt.Sprintf(" %s =~ /%s/ %s ", "tenant_id", tenantId, "OR"))
- // } else {
- // buffer.WriteString(fmt.Sprintf(" %s =~ /%s/ ", "tenant_id", tenantId))
- // }
- //}
- //buffer.WriteString(" )")
- //return buffer.String(), nil
- return &monitor.MetricQueryTag{
- Key: "domain_id",
- Operator: "=~",
- Value: fmt.Sprintf("/%s/", domainId),
- }, nil
- //return fmt.Sprintf(`%s =~ /%s/`, "domain_id", domainId), nil
- }
- func getProjectIdFilterByProject(projectId string) (*monitor.MetricQueryTag, error) {
- //return fmt.Sprintf(`%s =~ /%s/`, "tenant_id", projectId), nil
- return &monitor.MetricQueryTag{
- Key: "tenant_id",
- Operator: "=~",
- Value: fmt.Sprintf("/%s/", projectId),
- }, nil
- }
- func (self *SUnifiedMonitorManager) GetPropertyMetricMeasurement(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- metricFunc := monitor.MetricFunc{
- FieldOptType: monitor.UNIFIED_MONITOR_FIELD_OPT_TYPE,
- FieldOptValue: monitor.UNIFIED_MONITOR_FIELD_OPT_VALUE,
- GroupOptType: monitor.UNIFIED_MONITOR_GROUPBY_OPT_TYPE,
- GroupOptValue: monitor.UNIFIED_MONITOR_GROUPBY_OPT_VALUE,
- }
- filter, err := getTagFilterByRequestQuery(ctx, userCred, query)
- if err != nil {
- return nil, errors.Wrapf(err, "getTagFilterByRequestQuery %s", query.String())
- }
- rtn, err := DataSourceManager.GetMetricMeasurement(userCred, query, filter)
- if err != nil {
- return nil, errors.Wrapf(err, "GetMetricMeasurement by query %s, filter %s", query.String(), filter)
- }
- rtn.(*jsonutils.JSONDict).Add(jsonutils.Marshal(&metricFunc), "func")
- return rtn, nil
- }
- func (self *SUnifiedMonitorManager) SetHandlerProcessTimeout(info *appsrv.SHandlerInfo, r *http.Request) time.Duration {
- return 5 * time.Minute
- }
- // +onecloud:swagger-gen-route-method=POST
- // +onecloud:swagger-gen-route-path=/unifiedmonitors/query
- // +onecloud:swagger-gen-route-tag=unifiedmonitor
- // +onecloud:swagger-gen-param-body-index=0
- // +onecloud:swagger-gen-resp-index=0
- // +onecloud:swagger-gen-resp-body-key=unifiedmonitor
- // 查询监控数据接口
- func PerformQuery(input monitor.MetricQueryInput) *monitor.MetricsQueryResult {
- return nil
- }
- func (self *SUnifiedMonitorManager) PerformQuery(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (*monitor.MetricsQueryResult, error) {
- tmp := jsonutils.DeepCopy(data)
- self.handleDataPreSignature(ctx, tmp)
- if !options.Options.DisableQuerySignatureCheck {
- if err := ValidateQuerySignature(tmp); err != nil {
- return nil, errors.Wrap(err, "ValidateQuerySignature")
- }
- }
- inputQuery := new(monitor.MetricQueryInput)
- err := data.Unmarshal(inputQuery)
- if err != nil {
- return nil, err
- }
- if len(inputQuery.MetricQuery) == 0 {
- return nil, merrors.NewArgIsEmptyErr("metric_query")
- }
- for _, q := range inputQuery.MetricQuery {
- scope, _ := data.GetString("scope")
- ownId, _ := self.FetchOwnerId(ctx, data)
- if ownId == nil {
- ownId = userCred
- }
- setDefaultValue(q, inputQuery, scope, ownId, false)
- if err := self.ValidateInputQuery(q, inputQuery); err != nil {
- return nil, errors.Wrapf(err, "ValidateInputQuery")
- }
- }
- var groupByTag = make([]string, 0)
- for _, query := range inputQuery.MetricQuery {
- for _, group := range query.Model.GroupBy {
- if group.Type == "tag" {
- groupByTag = append(groupByTag, group.Params[0])
- }
- }
- }
- return self.performQuery(ctx, userCred, inputQuery)
- }
- func (self *SUnifiedMonitorManager) performQuery(ctx context.Context, userCred mcclient.TokenCredential, inputQuery *monitor.MetricQueryInput) (*monitor.MetricsQueryResult, error) {
- rtn, err := doQuery(userCred, *inputQuery)
- if err != nil {
- return nil, errors.Wrapf(err, "doQuery with input %s", jsonutils.Marshal(inputQuery))
- }
- if len(inputQuery.Soffset) != 0 && len(inputQuery.Slimit) != 0 {
- // seriesTotal := self.fillSearchSeriesTotalQuery(userCred, *inputQuery.MetricQuery[0])
- // do offset and limit
- total := rtn.SeriesTotal
- offset, err := strconv.Atoi(inputQuery.Soffset)
- if err != nil {
- return nil, httperrors.NewInputParameterError("soffset %q is not integer", inputQuery.Soffset)
- }
- limit, err := strconv.Atoi(inputQuery.Slimit)
- if err != nil {
- return nil, httperrors.NewInputParameterError("slimit %q is not integer", inputQuery.Slimit)
- }
- start := offset
- end := start + limit
- if end > int(total) {
- end = int(total)
- }
- ss := rtn.Series
- if start >= end {
- rtn.Series = nil
- } else {
- rtn.Series = ss[start:end]
- }
- }
- fillSerieTags(&rtn.Series)
- return rtn, nil
- }
- func (self *SUnifiedMonitorManager) fillSearchSeriesTotalQuery(userCred mcclient.TokenCredential, fork monitor.AlertQuery) int64 {
- newGroupByPart := make([]monitor.MetricQueryPart, 0)
- newGroupByPart = append(newGroupByPart, fork.Model.GroupBy[0])
- fork.Model.GroupBy = newGroupByPart
- forkInputQury := new(monitor.MetricQueryInput)
- forkInputQury.MetricQuery = []*monitor.AlertQuery{&fork}
- rtn, err := doQuery(userCred, *forkInputQury)
- if err != nil {
- log.Errorf("exec forkInputQury err:%v", err)
- return 0
- }
- return int64(len(rtn.Series))
- }
- func (self *SUnifiedMonitorManager) handleDataPreSignature(ctx context.Context, data jsonutils.JSONObject) {
- scope, _ := data.GetString("scope")
- isIdentityName, _ := data.Bool("identity_name")
- switch scope {
- case "system":
- case "domain":
- domain, err := data.GetString("project_domain")
- if err == nil {
- domainObj, _ := db.DefaultDomainFetcher(ctx, domain)
- if isIdentityName {
- domain = domainObj.Name
- }
- data.(*jsonutils.JSONDict).Remove("project_domain")
- data.(*jsonutils.JSONDict).Set("domain_id", jsonutils.NewString(domain))
- }
- default:
- project, err := data.GetString("project")
- if err == nil {
- domain, _ := data.GetString("project_domain")
- tenant, _ := db.DefaultProjectFetcher(ctx, project, domain)
- if isIdentityName {
- project = tenant.Name
- }
- data.(*jsonutils.JSONDict).Remove("project")
- data.(*jsonutils.JSONDict).Set("project_id", jsonutils.NewString(project))
- }
- }
- }
- func doQuery(userCred mcclient.TokenCredential, query monitor.MetricQueryInput) (*monitor.MetricsQueryResult, error) {
- conds := make([]*monitor.AlertCondition, 0)
- for _, q := range query.MetricQuery {
- if q.To == "" {
- q.To = query.To
- }
- if q.From == "" {
- q.From = query.From
- }
- if q.Model.Interval == "" {
- q.Model.Interval = query.Interval
- }
- condition := monitor.AlertCondition{
- Type: monitor.ConditionTypeMetricQuery,
- Query: *q,
- }
- if q.ResultReducer != nil {
- condition.Reducer = *q.ResultReducer
- condition.ReducerOrder = q.ResultReducerOrder
- }
- conds = append(conds, &condition)
- }
- factory := mq.GetQueryFactories()[monitor.ConditionTypeMetricQuery]
- metricQ, err := factory(conds)
- if err != nil {
- return nil, errors.Wrap(err, "factory")
- }
- metrics, err := metricQ.ExecuteQuery(userCred, query.Scope, query.SkipCheckSeries)
- if err != nil {
- return nil, errors.Wrap(err, "ExecuteQuery")
- }
- // drop metas contains raw_query
- if !query.ShowMeta {
- metrics.Metas = nil
- }
- metrics.SeriesTotal = int64(len(metrics.Series))
- return metrics, nil
- }
- func (self *SUnifiedMonitorManager) ValidateInputQuery(query *monitor.AlertQuery, input *monitor.MetricQueryInput) error {
- if input.From == "" {
- input.From = "1h"
- }
- if input.To == "" {
- input.To = "now"
- }
- if input.Interval == "" {
- input.Interval = "5m"
- if input.To == "now" {
- if input.From == "10m" {
- input.Interval = "1m"
- }
- }
- }
- if query.From == "" {
- query.From = input.From
- }
- if query.Model.Interval == "" {
- query.Model.Interval = input.Interval
- }
- if query.To == "" {
- query.To = input.To
- }
- if _, err := time.ParseDuration(query.Model.Interval); err != nil {
- return httperrors.NewInputParameterError("Invalid interval format: %s", query.Model.Interval)
- }
- return validators.ValidateSelectOfMetricQuery(*query)
- }
- func setDefaultValue(
- query *monitor.AlertQuery,
- inputQuery *monitor.MetricQueryInput,
- scope string, ownerId mcclient.IIdentityProvider,
- isAlert bool) {
- if query.From == "" {
- query.From = inputQuery.From
- }
- if query.To == "" {
- query.To = inputQuery.To
- }
- if query.Model.Interval == "" {
- query.Model.Interval = inputQuery.Interval
- }
- metricMeasurement, _ := MetricMeasurementManager.GetCache().Get(query.Model.Measurement)
- checkQueryGroupBy(query, inputQuery, isAlert)
- if len(inputQuery.Interval) != 0 {
- query.Model.GroupBy = append(query.Model.GroupBy,
- monitor.MetricQueryPart{
- Type: "time",
- Params: []string{"$interval"},
- },
- monitor.MetricQueryPart{
- Type: "fill",
- Params: []string{"none"},
- })
- }
- // HACK: not set slimit and soffset, getting all series then do offset and limit
- // if len(inputQuery.Slimit) != 0 && len(inputQuery.Soffset) != 0 {
- // query.Model.GroupBy = append(query.Model.GroupBy,
- // monitor.MetricQueryPart{Type: "slimit", Params: []string{inputQuery.Slimit}},
- // monitor.MetricQueryPart{Type: "soffset", Params: []string{inputQuery.Soffset}},
- // )
- // }
- if query.Model.Database == "" {
- database := ""
- if metricMeasurement == nil {
- log.Warningf("Not found measurement %s from metrics measurement cache", query.Model.Measurement)
- } else {
- database = metricMeasurement.Database
- }
- if database == "" {
- // hack: query from default telegraf database if no metric measurement matched
- database = TELEGRAF_DATABASE
- }
- query.Model.Database = database
- }
- drv, _ := DataSourceManager.GetTSDBDriver()
- query = drv.FillSelect(query, isAlert)
- var projectId, domainId string
- switch rbacscope.TRbacScope(scope) {
- case rbacscope.ScopeProject:
- projectId = ownerId.GetProjectId()
- containId := false
- for _, tagFilter := range query.Model.Tags {
- if tagFilter.Key == "tenant_id" {
- containId = true
- break
- }
- }
- if !containId {
- query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
- Key: "tenant_id",
- Operator: "=",
- Value: projectId,
- Condition: "and",
- })
- }
- case rbacscope.ScopeDomain:
- domainId = ownerId.GetProjectDomainId()
- containId := false
- for _, tagFilter := range query.Model.Tags {
- if tagFilter.Key == "domain_id" {
- containId = true
- break
- }
- }
- if !containId {
- query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
- Key: "domain_id",
- Operator: "=",
- Value: domainId,
- Condition: "and",
- })
- }
- }
- if metricMeasurement != nil && metricMeasurement.ResType == hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE {
- query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
- Key: hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
- Operator: "=",
- Value: hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE,
- Condition: "and",
- })
- }
- }
- func checkQueryGroupBy(query *monitor.AlertQuery, inputQuery *monitor.MetricQueryInput, isAlert bool) {
- if len(query.Model.GroupBy) != 0 {
- return
- }
- metricMeasurement, _ := MetricMeasurementManager.GetCache().Get(query.Model.Measurement)
- if inputQuery.Unit || metricMeasurement == nil && query.Model.Database == monitor.METRIC_DATABASE_METER {
- return
- }
- tagId := ""
- if metricMeasurement != nil {
- tagId = monitor.GetMeasurementTagIdKeyByResType(metricMeasurement.ResType)
- }
- drv, _ := DataSourceManager.GetTSDBDriver()
- query = drv.FillGroupBy(query, inputQuery, tagId, isAlert)
- }
- func fillSerieTags(series *monitor.TimeSeriesSlice) {
- for i, serie := range *series {
- for _, tag := range []string{"brand", "platform", "hypervisor"} {
- if val, ok := serie.Tags[tag]; ok {
- serie.Tags["brand"] = val
- break
- }
- }
- for _, tag := range []string{
- "source", "status", hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE,
- hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
- "is_vm", "os_type", "domain_name", "region",
- labels.MetricName, translator.UNION_RESULT_NAME,
- } {
- if _, ok := serie.Tags[tag]; ok {
- delete(serie.Tags, tag)
- }
- }
- if val, ok := serie.Tags[VICTORIA_METRICS_DB_TAG_KEY]; ok {
- if val == VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF {
- delete(serie.Tags, VICTORIA_METRICS_DB_TAG_KEY)
- }
- }
- (*series)[i] = serie
- }
- }
- func (self *SUnifiedMonitorManager) GetPropertySimpleQuery(ctx context.Context, userCred mcclient.TokenCredential, input *monitor.SimpleQueryInput) (jsonutils.JSONObject, error) {
- if len(input.Database) == 0 {
- input.Database = "telegraf"
- }
- if len(input.MetricName) == 0 {
- return nil, httperrors.NewMissingParameterError("metric_name")
- }
- metric := strings.Split(input.MetricName, ".")
- if len(metric) != 2 {
- return nil, httperrors.NewInputParameterError("invalid metric_name %s", input.MetricName)
- }
- measurement, field := metric[0], metric[1]
- data := mod.NewMetricQueryInputWithDB(input.Database, measurement).SkipCheckSeries(true)
- data.Selects().Select(field)
- where := data.Where()
- if len(input.Id) > 0 {
- where.Equal("id", input.Id)
- }
- if input.EndTime.IsZero() {
- input.EndTime = time.Now()
- }
- if input.StartTime.IsZero() {
- input.StartTime = input.EndTime.Add(time.Hour * -1)
- }
- if input.EndTime.Sub(input.StartTime).Hours() > 1 {
- return nil, httperrors.NewInputParameterError("The query interval is greater than one hour")
- }
- for k, v := range input.Tags {
- where.Equal(k, v)
- }
- if len(input.Interval) == 0 {
- input.Interval = "5m"
- }
- _, err := time.ParseDuration(input.Interval)
- if err != nil {
- return nil, httperrors.NewInputParameterError("invalid interval %s", input.Interval)
- }
- data.From(input.StartTime).To(input.EndTime).Interval(input.Interval)
- queryData := data.ToQueryData()
- dbRtn, err := self.performQuery(ctx, userCred, queryData)
- if err != nil {
- return nil, errors.Wrapf(err, "performQuery with data: %s", jsonutils.Marshal(queryData))
- }
- ret := []monitor.SimpleQueryOutput{}
- for _, s := range dbRtn.Series {
- id, ok := s.Tags["id"]
- if !ok {
- log.Warningf("Not found id from series: %s", jsonutils.Marshal(s))
- continue
- }
- for _, point := range s.Points {
- if len(point) != 2 {
- log.Warningf("invalid series: %s", jsonutils.Marshal(s))
- break
- }
- timestamp := point[len(point)-1]
- valPtr, ok := point[0].(*float64)
- if !ok || valPtr == nil {
- log.Warningf("invalid series point: %#v", point)
- break
- }
- ret = append(ret, monitor.SimpleQueryOutput{
- Id: id,
- Time: time.UnixMilli(int64(timestamp.(float64))),
- Value: *valPtr,
- })
- }
- }
- return jsonutils.Marshal(map[string]interface{}{"values": ret}), nil
- }
- func (self *SUnifiedMonitorManager) GetPropertyCdfQuery(ctx context.Context, userCred mcclient.TokenCredential, input *monitor.CdfQueryInput) (*monitor.CdfQueryOutput, error) {
- if len(input.Database) == 0 {
- input.Database = "telegraf"
- }
- if len(input.MetricName) == 0 {
- return nil, httperrors.NewMissingParameterError("metric_name")
- }
- input.MetricName = strings.Replace(input.MetricName, ".", "_", 1)
- ds, err := datasource.GetDefaultSource(input.Database)
- if err != nil {
- return nil, errors.Wrapf(err, "GetDefaultSource")
- }
- client, err := victoriametrics.NewClient(ds.Url)
- if err != nil {
- return nil, errors.Wrapf(err, "NewClient")
- }
- if len(input.Period) == 0 {
- input.Period = "1h"
- }
- sql := fmt.Sprintf("sum(histogram_over_time(avg_over_time(%s[%s]))) by (vmrange)", input.MetricName, input.Period)
- if len(input.Tags) > 0 {
- tags := []string{}
- for k, v := range input.Tags {
- tags = append(tags, fmt.Sprintf(`%s="%s"`, k, v))
- }
- sql = fmt.Sprintf("sum(histogram_over_time(avg_over_time(%s{%s}[%s]))) by (vmrange)", input.MetricName, strings.Join(tags, ","), input.Period)
- }
- resp, err := client.RawQuery(ctx, httputils.GetAdaptiveTimeoutClient(), sql, true)
- if err != nil {
- return nil, err
- }
- ret, total := monitor.CdfQueryDataSet{}, 0
- for _, r := range resp.Data.Result {
- if len(r.Metric) == 0 {
- continue
- }
- vmrange, _ := r.Metric["vmrange"]
- if len(vmrange) == 0 {
- continue
- }
- if len(r.Value) != 2 {
- continue
- }
- v, _ := strconv.Atoi(r.Value[1].(string))
- ret = append(ret, monitor.CdfQueryData{
- Vmrange: vmrange,
- Value: v,
- })
- total += v
- }
- sort.Sort(ret)
- for i := range ret {
- ret[i].Total = total
- if i == 0 {
- ret[i].ValueAsc = ret[i].Value
- } else {
- ret[i].ValueAsc = ret[i-1].ValueAsc + ret[i].Value
- }
- }
- for i := len(ret) - 1; i >= 0; i-- {
- if i == len(ret)-1 {
- ret[i].ValueDesc = ret[i].Value
- } else {
- ret[i].ValueDesc = ret[i+1].ValueDesc + ret[i].Value
- }
- }
- result := &monitor.CdfQueryOutput{
- Data: monitor.CdfQueryDataSet{},
- }
- for i := range ret {
- v1 := ret[i]
- v1.Metric = v1.Start()
- v2 := v1.Copy()
- v2.Metric = v2.End()
- result.Data = append(result.Data, v1, v2)
- }
- return result, nil
- }
- func (self *SUnifiedMonitorManager) PerformResourceMetrics(
- ctx context.Context,
- userCred mcclient.TokenCredential,
- query jsonutils.JSONObject,
- data jsonutils.JSONObject,
- ) (jsonutils.JSONObject, error) {
- input := new(monitor.ResourceMetricsQueryInput)
- if err := data.Unmarshal(input); err != nil {
- return nil, httperrors.NewInputParameterError("unmarshal input: %v", err)
- }
- drv := GetResourceMetricDriver(input.ResType)
- if drv == nil {
- return nil, httperrors.NewInputParameterError("unsupported res_type %q", input.ResType)
- }
- if len(input.ResIds) == 0 {
- return nil, httperrors.NewMissingParameterError("res_ids")
- }
- if input.EndTime.IsZero() {
- input.EndTime = time.Now()
- }
- if input.StartTime.IsZero() {
- input.StartTime = input.EndTime.Add(-1 * time.Hour)
- }
- if len(input.Interval) == 0 {
- input.Interval = "5m"
- }
- tagKey := drv.GetTagKey()
- specs := drv.GetMetricSpecs()
- result := make(map[string]*monitor.ResourceMetricValues, len(input.ResIds))
- for _, id := range input.ResIds {
- result[id] = &monitor.ResourceMetricValues{}
- }
- for _, spec := range specs {
- qInput := mod.NewMetricQueryInputWithDB(TELEGRAF_DATABASE, spec.Measurement).SkipCheckSeries(true)
- sel := qInput.Selects()
- for _, f := range spec.Fields {
- sel.Select(f).MEAN()
- }
- where := qInput.Where()
- where.IN(tagKey, input.ResIds)
- qInput.GroupBy().TAG(tagKey)
- qInput.From(input.StartTime).To(input.EndTime).Interval(input.Interval)
- queryData := qInput.ToQueryData()
- dbRtn, err := self.performQuery(ctx, userCred, queryData)
- if err != nil {
- log.Warningf("query %s metrics error: %v", spec.Measurement, err)
- continue
- }
- for _, series := range dbRtn.Series {
- resId := series.Tags[tagKey]
- if resId == "" {
- continue
- }
- rv, ok := result[resId]
- if !ok {
- continue
- }
- // 取最后一个有效数据点
- for pi := len(series.Points) - 1; pi >= 0; pi-- {
- point := series.Points[pi]
- if len(point) < 2 {
- continue
- }
- allValid := true
- for fi := 0; fi < len(spec.Fields) && fi < len(point)-1; fi++ {
- if point[fi] == nil {
- allValid = false
- break
- }
- if fval, ok := point[fi].(*float64); !ok || fval == nil {
- allValid = false
- break
- }
- }
- if !allValid {
- continue
- }
- for fi, outputKey := range spec.OutputKeys {
- if fi < len(point)-1 {
- val := 0.0
- if fval, ok := point[fi].(*float64); ok && fval != nil {
- val = *fval
- } else if fval, ok := point[fi].(float64); ok {
- val = fval
- }
- SetResourceMetricValue(rv, outputKey, val)
- }
- }
- break
- }
- }
- }
- // 查询告警状态
- monResources, err := MonitorResourceManager.GetMonitorResources(monitor.MonitorResourceListInput{
- ResId: input.ResIds,
- })
- if err != nil {
- log.Warningf("GetMonitorResources error: %v", err)
- } else {
- for _, mr := range monResources {
- if rv, ok := result[mr.ResId]; ok {
- rv.AlertState = mr.AlertState
- }
- }
- }
- output := monitor.ResourceMetricsQueryOutput{
- ResourceMetrics: make(map[string]monitor.ResourceMetricValues, len(result)),
- }
- for id, rv := range result {
- output.ResourceMetrics[id] = *rv
- }
- return jsonutils.Marshal(output), nil
- }
|