unifiedmonitor.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "net/http"
  19. "sort"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "github.com/influxdata/promql/v2/pkg/labels"
  24. "github.com/zexi/influxql-to-metricsql/converter/translator"
  25. "yunion.io/x/jsonutils"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/util/httputils"
  29. "yunion.io/x/pkg/util/rbacscope"
  30. "yunion.io/x/onecloud/pkg/apis/monitor"
  31. "yunion.io/x/onecloud/pkg/appsrv"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  33. "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
  34. "yunion.io/x/onecloud/pkg/httperrors"
  35. "yunion.io/x/onecloud/pkg/mcclient"
  36. mod "yunion.io/x/onecloud/pkg/mcclient/modules/monitor"
  37. "yunion.io/x/onecloud/pkg/monitor/datasource"
  38. merrors "yunion.io/x/onecloud/pkg/monitor/errors"
  39. mq "yunion.io/x/onecloud/pkg/monitor/metricquery"
  40. "yunion.io/x/onecloud/pkg/monitor/options"
  41. "yunion.io/x/onecloud/pkg/monitor/tsdb/driver/victoriametrics"
  42. "yunion.io/x/onecloud/pkg/monitor/validators"
  43. )
  44. const (
  45. TELEGRAF_DATABASE = "telegraf"
  46. )
  47. var (
  48. UnifiedMonitorManager *SUnifiedMonitorManager
  49. )
  50. func init() {
  51. UnifiedMonitorManager = &SUnifiedMonitorManager{
  52. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  53. &SUnifiedMonitorManager{},
  54. "",
  55. "unifiedmonitor",
  56. "unifiedmonitors",
  57. ),
  58. }
  59. UnifiedMonitorManager.SetVirtualObject(UnifiedMonitorManager)
  60. }
  61. type SUnifiedMonitorManager struct {
  62. db.SVirtualResourceBaseManager
  63. }
  64. type SUnifiedMonitorModel struct {
  65. }
  66. func (self *SUnifiedMonitorManager) GetPropertyDatabases(ctx context.Context, userCred mcclient.TokenCredential,
  67. query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  68. return DataSourceManager.GetDatabases()
  69. }
  70. func (self *SUnifiedMonitorManager) GetPropertyMeasurements(ctx context.Context, userCred mcclient.TokenCredential,
  71. query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  72. filter, err := getTagFilterByRequestQuery(ctx, userCred, query)
  73. if err != nil {
  74. return nil, errors.Wrap(err, "getTagFilterByRequestQuery")
  75. }
  76. return DataSourceManager.GetMeasurementsWithDescriptionInfos(query, filter)
  77. }
  78. func getTagFilterByRequestQuery(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (*monitor.MetricQueryTag, error) {
  79. scope, _ := query.GetString("scope")
  80. return filterByScope(ctx, userCred, scope, query)
  81. }
  82. func filterByScope(ctx context.Context, userCred mcclient.TokenCredential, scope string, data jsonutils.JSONObject) (*monitor.MetricQueryTag, error) {
  83. domainId := jsonutils.GetAnyString(data, db.DomainFetchKeys)
  84. projectId := jsonutils.GetAnyString(data, db.ProjectFetchKeys)
  85. if projectId != "" {
  86. project, err := db.DefaultProjectFetcher(ctx, projectId, domainId)
  87. if err != nil {
  88. return nil, errors.Wrap(err, "db.DefaultProjectFetcher")
  89. }
  90. projectId = project.GetProjectId()
  91. domainId = project.GetProjectDomainId()
  92. }
  93. if domainId != "" {
  94. domain, err := db.DefaultDomainFetcher(ctx, domainId)
  95. if err != nil {
  96. return nil, errors.Wrap(err, "db.DefaultDomainFetcher")
  97. }
  98. domainId = domain.GetProjectDomainId()
  99. domain.GetProjectId()
  100. }
  101. switch scope {
  102. case "system":
  103. return nil, nil
  104. case "domain":
  105. if domainId == "" {
  106. domainId = userCred.GetProjectDomainId()
  107. }
  108. return getProjectIdsFilterByDomain(domainId)
  109. default:
  110. if projectId == "" {
  111. projectId = userCred.GetProjectId()
  112. }
  113. return getProjectIdFilterByProject(projectId)
  114. }
  115. }
  116. func getTenantIdStr(role string, userCred mcclient.TokenCredential) (*monitor.MetricQueryTag, error) {
  117. if role == "admin" {
  118. return nil, nil
  119. }
  120. if role == "domainadmin" {
  121. domainId := userCred.GetDomainId()
  122. return getProjectIdsFilterByDomain(domainId)
  123. }
  124. if role == "member" {
  125. tenantId := userCred.GetProjectId()
  126. return getProjectIdFilterByProject(tenantId)
  127. }
  128. return nil, errors.Wrapf(errors.ErrNotFound, "not supported role %q", role)
  129. }
  130. func getProjectIdsFilterByDomain(domainId string) (*monitor.MetricQueryTag, error) {
  131. //s := auth.GetAdminSession(context.Background(), "", "")
  132. //params := jsonutils.Marshal(map[string]string{"domain_id": domainId})
  133. //tenants, err := modules.Projects.List(s, params)
  134. //if err != nil {
  135. // return "", errors.Wrap(err, "Projects.List")
  136. //}
  137. //var buffer bytes.Buffer
  138. //buffer.WriteString("( ")
  139. //for index, tenant := range tenants.Data {
  140. // tenantId, _ := tenant.GetString("id")
  141. // if index != len(tenants.Data)-1 {
  142. // buffer.WriteString(fmt.Sprintf(" %s =~ /%s/ %s ", "tenant_id", tenantId, "OR"))
  143. // } else {
  144. // buffer.WriteString(fmt.Sprintf(" %s =~ /%s/ ", "tenant_id", tenantId))
  145. // }
  146. //}
  147. //buffer.WriteString(" )")
  148. //return buffer.String(), nil
  149. return &monitor.MetricQueryTag{
  150. Key: "domain_id",
  151. Operator: "=~",
  152. Value: fmt.Sprintf("/%s/", domainId),
  153. }, nil
  154. //return fmt.Sprintf(`%s =~ /%s/`, "domain_id", domainId), nil
  155. }
  156. func getProjectIdFilterByProject(projectId string) (*monitor.MetricQueryTag, error) {
  157. //return fmt.Sprintf(`%s =~ /%s/`, "tenant_id", projectId), nil
  158. return &monitor.MetricQueryTag{
  159. Key: "tenant_id",
  160. Operator: "=~",
  161. Value: fmt.Sprintf("/%s/", projectId),
  162. }, nil
  163. }
  164. func (self *SUnifiedMonitorManager) GetPropertyMetricMeasurement(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  165. metricFunc := monitor.MetricFunc{
  166. FieldOptType: monitor.UNIFIED_MONITOR_FIELD_OPT_TYPE,
  167. FieldOptValue: monitor.UNIFIED_MONITOR_FIELD_OPT_VALUE,
  168. GroupOptType: monitor.UNIFIED_MONITOR_GROUPBY_OPT_TYPE,
  169. GroupOptValue: monitor.UNIFIED_MONITOR_GROUPBY_OPT_VALUE,
  170. }
  171. filter, err := getTagFilterByRequestQuery(ctx, userCred, query)
  172. if err != nil {
  173. return nil, errors.Wrapf(err, "getTagFilterByRequestQuery %s", query.String())
  174. }
  175. rtn, err := DataSourceManager.GetMetricMeasurement(userCred, query, filter)
  176. if err != nil {
  177. return nil, errors.Wrapf(err, "GetMetricMeasurement by query %s, filter %s", query.String(), filter)
  178. }
  179. rtn.(*jsonutils.JSONDict).Add(jsonutils.Marshal(&metricFunc), "func")
  180. return rtn, nil
  181. }
  182. func (self *SUnifiedMonitorManager) SetHandlerProcessTimeout(info *appsrv.SHandlerInfo, r *http.Request) time.Duration {
  183. return 5 * time.Minute
  184. }
  185. // +onecloud:swagger-gen-route-method=POST
  186. // +onecloud:swagger-gen-route-path=/unifiedmonitors/query
  187. // +onecloud:swagger-gen-route-tag=unifiedmonitor
  188. // +onecloud:swagger-gen-param-body-index=0
  189. // +onecloud:swagger-gen-resp-index=0
  190. // +onecloud:swagger-gen-resp-body-key=unifiedmonitor
  191. // 查询监控数据接口
  192. func PerformQuery(input monitor.MetricQueryInput) *monitor.MetricsQueryResult {
  193. return nil
  194. }
  195. func (self *SUnifiedMonitorManager) PerformQuery(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (*monitor.MetricsQueryResult, error) {
  196. tmp := jsonutils.DeepCopy(data)
  197. self.handleDataPreSignature(ctx, tmp)
  198. if !options.Options.DisableQuerySignatureCheck {
  199. if err := ValidateQuerySignature(tmp); err != nil {
  200. return nil, errors.Wrap(err, "ValidateQuerySignature")
  201. }
  202. }
  203. inputQuery := new(monitor.MetricQueryInput)
  204. err := data.Unmarshal(inputQuery)
  205. if err != nil {
  206. return nil, err
  207. }
  208. if len(inputQuery.MetricQuery) == 0 {
  209. return nil, merrors.NewArgIsEmptyErr("metric_query")
  210. }
  211. for _, q := range inputQuery.MetricQuery {
  212. scope, _ := data.GetString("scope")
  213. ownId, _ := self.FetchOwnerId(ctx, data)
  214. if ownId == nil {
  215. ownId = userCred
  216. }
  217. setDefaultValue(q, inputQuery, scope, ownId, false)
  218. if err := self.ValidateInputQuery(q, inputQuery); err != nil {
  219. return nil, errors.Wrapf(err, "ValidateInputQuery")
  220. }
  221. }
  222. var groupByTag = make([]string, 0)
  223. for _, query := range inputQuery.MetricQuery {
  224. for _, group := range query.Model.GroupBy {
  225. if group.Type == "tag" {
  226. groupByTag = append(groupByTag, group.Params[0])
  227. }
  228. }
  229. }
  230. return self.performQuery(ctx, userCred, inputQuery)
  231. }
  232. func (self *SUnifiedMonitorManager) performQuery(ctx context.Context, userCred mcclient.TokenCredential, inputQuery *monitor.MetricQueryInput) (*monitor.MetricsQueryResult, error) {
  233. rtn, err := doQuery(userCred, *inputQuery)
  234. if err != nil {
  235. return nil, errors.Wrapf(err, "doQuery with input %s", jsonutils.Marshal(inputQuery))
  236. }
  237. if len(inputQuery.Soffset) != 0 && len(inputQuery.Slimit) != 0 {
  238. // seriesTotal := self.fillSearchSeriesTotalQuery(userCred, *inputQuery.MetricQuery[0])
  239. // do offset and limit
  240. total := rtn.SeriesTotal
  241. offset, err := strconv.Atoi(inputQuery.Soffset)
  242. if err != nil {
  243. return nil, httperrors.NewInputParameterError("soffset %q is not integer", inputQuery.Soffset)
  244. }
  245. limit, err := strconv.Atoi(inputQuery.Slimit)
  246. if err != nil {
  247. return nil, httperrors.NewInputParameterError("slimit %q is not integer", inputQuery.Slimit)
  248. }
  249. start := offset
  250. end := start + limit
  251. if end > int(total) {
  252. end = int(total)
  253. }
  254. ss := rtn.Series
  255. if start >= end {
  256. rtn.Series = nil
  257. } else {
  258. rtn.Series = ss[start:end]
  259. }
  260. }
  261. fillSerieTags(&rtn.Series)
  262. return rtn, nil
  263. }
  264. func (self *SUnifiedMonitorManager) fillSearchSeriesTotalQuery(userCred mcclient.TokenCredential, fork monitor.AlertQuery) int64 {
  265. newGroupByPart := make([]monitor.MetricQueryPart, 0)
  266. newGroupByPart = append(newGroupByPart, fork.Model.GroupBy[0])
  267. fork.Model.GroupBy = newGroupByPart
  268. forkInputQury := new(monitor.MetricQueryInput)
  269. forkInputQury.MetricQuery = []*monitor.AlertQuery{&fork}
  270. rtn, err := doQuery(userCred, *forkInputQury)
  271. if err != nil {
  272. log.Errorf("exec forkInputQury err:%v", err)
  273. return 0
  274. }
  275. return int64(len(rtn.Series))
  276. }
  277. func (self *SUnifiedMonitorManager) handleDataPreSignature(ctx context.Context, data jsonutils.JSONObject) {
  278. scope, _ := data.GetString("scope")
  279. isIdentityName, _ := data.Bool("identity_name")
  280. switch scope {
  281. case "system":
  282. case "domain":
  283. domain, err := data.GetString("project_domain")
  284. if err == nil {
  285. domainObj, _ := db.DefaultDomainFetcher(ctx, domain)
  286. if isIdentityName {
  287. domain = domainObj.Name
  288. }
  289. data.(*jsonutils.JSONDict).Remove("project_domain")
  290. data.(*jsonutils.JSONDict).Set("domain_id", jsonutils.NewString(domain))
  291. }
  292. default:
  293. project, err := data.GetString("project")
  294. if err == nil {
  295. domain, _ := data.GetString("project_domain")
  296. tenant, _ := db.DefaultProjectFetcher(ctx, project, domain)
  297. if isIdentityName {
  298. project = tenant.Name
  299. }
  300. data.(*jsonutils.JSONDict).Remove("project")
  301. data.(*jsonutils.JSONDict).Set("project_id", jsonutils.NewString(project))
  302. }
  303. }
  304. }
  305. func doQuery(userCred mcclient.TokenCredential, query monitor.MetricQueryInput) (*monitor.MetricsQueryResult, error) {
  306. conds := make([]*monitor.AlertCondition, 0)
  307. for _, q := range query.MetricQuery {
  308. if q.To == "" {
  309. q.To = query.To
  310. }
  311. if q.From == "" {
  312. q.From = query.From
  313. }
  314. if q.Model.Interval == "" {
  315. q.Model.Interval = query.Interval
  316. }
  317. condition := monitor.AlertCondition{
  318. Type: monitor.ConditionTypeMetricQuery,
  319. Query: *q,
  320. }
  321. if q.ResultReducer != nil {
  322. condition.Reducer = *q.ResultReducer
  323. condition.ReducerOrder = q.ResultReducerOrder
  324. }
  325. conds = append(conds, &condition)
  326. }
  327. factory := mq.GetQueryFactories()[monitor.ConditionTypeMetricQuery]
  328. metricQ, err := factory(conds)
  329. if err != nil {
  330. return nil, errors.Wrap(err, "factory")
  331. }
  332. metrics, err := metricQ.ExecuteQuery(userCred, query.Scope, query.SkipCheckSeries)
  333. if err != nil {
  334. return nil, errors.Wrap(err, "ExecuteQuery")
  335. }
  336. // drop metas contains raw_query
  337. if !query.ShowMeta {
  338. metrics.Metas = nil
  339. }
  340. metrics.SeriesTotal = int64(len(metrics.Series))
  341. return metrics, nil
  342. }
  343. func (self *SUnifiedMonitorManager) ValidateInputQuery(query *monitor.AlertQuery, input *monitor.MetricQueryInput) error {
  344. if input.From == "" {
  345. input.From = "1h"
  346. }
  347. if input.To == "" {
  348. input.To = "now"
  349. }
  350. if input.Interval == "" {
  351. input.Interval = "5m"
  352. if input.To == "now" {
  353. if input.From == "10m" {
  354. input.Interval = "1m"
  355. }
  356. }
  357. }
  358. if query.From == "" {
  359. query.From = input.From
  360. }
  361. if query.Model.Interval == "" {
  362. query.Model.Interval = input.Interval
  363. }
  364. if query.To == "" {
  365. query.To = input.To
  366. }
  367. if _, err := time.ParseDuration(query.Model.Interval); err != nil {
  368. return httperrors.NewInputParameterError("Invalid interval format: %s", query.Model.Interval)
  369. }
  370. return validators.ValidateSelectOfMetricQuery(*query)
  371. }
  372. func setDefaultValue(
  373. query *monitor.AlertQuery,
  374. inputQuery *monitor.MetricQueryInput,
  375. scope string, ownerId mcclient.IIdentityProvider,
  376. isAlert bool) {
  377. if query.From == "" {
  378. query.From = inputQuery.From
  379. }
  380. if query.To == "" {
  381. query.To = inputQuery.To
  382. }
  383. if query.Model.Interval == "" {
  384. query.Model.Interval = inputQuery.Interval
  385. }
  386. metricMeasurement, _ := MetricMeasurementManager.GetCache().Get(query.Model.Measurement)
  387. checkQueryGroupBy(query, inputQuery, isAlert)
  388. if len(inputQuery.Interval) != 0 {
  389. query.Model.GroupBy = append(query.Model.GroupBy,
  390. monitor.MetricQueryPart{
  391. Type: "time",
  392. Params: []string{"$interval"},
  393. },
  394. monitor.MetricQueryPart{
  395. Type: "fill",
  396. Params: []string{"none"},
  397. })
  398. }
  399. // HACK: not set slimit and soffset, getting all series then do offset and limit
  400. // if len(inputQuery.Slimit) != 0 && len(inputQuery.Soffset) != 0 {
  401. // query.Model.GroupBy = append(query.Model.GroupBy,
  402. // monitor.MetricQueryPart{Type: "slimit", Params: []string{inputQuery.Slimit}},
  403. // monitor.MetricQueryPart{Type: "soffset", Params: []string{inputQuery.Soffset}},
  404. // )
  405. // }
  406. if query.Model.Database == "" {
  407. database := ""
  408. if metricMeasurement == nil {
  409. log.Warningf("Not found measurement %s from metrics measurement cache", query.Model.Measurement)
  410. } else {
  411. database = metricMeasurement.Database
  412. }
  413. if database == "" {
  414. // hack: query from default telegraf database if no metric measurement matched
  415. database = TELEGRAF_DATABASE
  416. }
  417. query.Model.Database = database
  418. }
  419. drv, _ := DataSourceManager.GetTSDBDriver()
  420. query = drv.FillSelect(query, isAlert)
  421. var projectId, domainId string
  422. switch rbacscope.TRbacScope(scope) {
  423. case rbacscope.ScopeProject:
  424. projectId = ownerId.GetProjectId()
  425. containId := false
  426. for _, tagFilter := range query.Model.Tags {
  427. if tagFilter.Key == "tenant_id" {
  428. containId = true
  429. break
  430. }
  431. }
  432. if !containId {
  433. query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
  434. Key: "tenant_id",
  435. Operator: "=",
  436. Value: projectId,
  437. Condition: "and",
  438. })
  439. }
  440. case rbacscope.ScopeDomain:
  441. domainId = ownerId.GetProjectDomainId()
  442. containId := false
  443. for _, tagFilter := range query.Model.Tags {
  444. if tagFilter.Key == "domain_id" {
  445. containId = true
  446. break
  447. }
  448. }
  449. if !containId {
  450. query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
  451. Key: "domain_id",
  452. Operator: "=",
  453. Value: domainId,
  454. Condition: "and",
  455. })
  456. }
  457. }
  458. if metricMeasurement != nil && metricMeasurement.ResType == hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE {
  459. query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
  460. Key: hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
  461. Operator: "=",
  462. Value: hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE,
  463. Condition: "and",
  464. })
  465. }
  466. }
  467. func checkQueryGroupBy(query *monitor.AlertQuery, inputQuery *monitor.MetricQueryInput, isAlert bool) {
  468. if len(query.Model.GroupBy) != 0 {
  469. return
  470. }
  471. metricMeasurement, _ := MetricMeasurementManager.GetCache().Get(query.Model.Measurement)
  472. if inputQuery.Unit || metricMeasurement == nil && query.Model.Database == monitor.METRIC_DATABASE_METER {
  473. return
  474. }
  475. tagId := ""
  476. if metricMeasurement != nil {
  477. tagId = monitor.GetMeasurementTagIdKeyByResType(metricMeasurement.ResType)
  478. }
  479. drv, _ := DataSourceManager.GetTSDBDriver()
  480. query = drv.FillGroupBy(query, inputQuery, tagId, isAlert)
  481. }
  482. func fillSerieTags(series *monitor.TimeSeriesSlice) {
  483. for i, serie := range *series {
  484. for _, tag := range []string{"brand", "platform", "hypervisor"} {
  485. if val, ok := serie.Tags[tag]; ok {
  486. serie.Tags["brand"] = val
  487. break
  488. }
  489. }
  490. for _, tag := range []string{
  491. "source", "status", hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE,
  492. hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
  493. "is_vm", "os_type", "domain_name", "region",
  494. labels.MetricName, translator.UNION_RESULT_NAME,
  495. } {
  496. if _, ok := serie.Tags[tag]; ok {
  497. delete(serie.Tags, tag)
  498. }
  499. }
  500. if val, ok := serie.Tags[VICTORIA_METRICS_DB_TAG_KEY]; ok {
  501. if val == VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF {
  502. delete(serie.Tags, VICTORIA_METRICS_DB_TAG_KEY)
  503. }
  504. }
  505. (*series)[i] = serie
  506. }
  507. }
  508. func (self *SUnifiedMonitorManager) GetPropertySimpleQuery(ctx context.Context, userCred mcclient.TokenCredential, input *monitor.SimpleQueryInput) (jsonutils.JSONObject, error) {
  509. if len(input.Database) == 0 {
  510. input.Database = "telegraf"
  511. }
  512. if len(input.MetricName) == 0 {
  513. return nil, httperrors.NewMissingParameterError("metric_name")
  514. }
  515. metric := strings.Split(input.MetricName, ".")
  516. if len(metric) != 2 {
  517. return nil, httperrors.NewInputParameterError("invalid metric_name %s", input.MetricName)
  518. }
  519. measurement, field := metric[0], metric[1]
  520. data := mod.NewMetricQueryInputWithDB(input.Database, measurement).SkipCheckSeries(true)
  521. data.Selects().Select(field)
  522. where := data.Where()
  523. if len(input.Id) > 0 {
  524. where.Equal("id", input.Id)
  525. }
  526. if input.EndTime.IsZero() {
  527. input.EndTime = time.Now()
  528. }
  529. if input.StartTime.IsZero() {
  530. input.StartTime = input.EndTime.Add(time.Hour * -1)
  531. }
  532. if input.EndTime.Sub(input.StartTime).Hours() > 1 {
  533. return nil, httperrors.NewInputParameterError("The query interval is greater than one hour")
  534. }
  535. for k, v := range input.Tags {
  536. where.Equal(k, v)
  537. }
  538. if len(input.Interval) == 0 {
  539. input.Interval = "5m"
  540. }
  541. _, err := time.ParseDuration(input.Interval)
  542. if err != nil {
  543. return nil, httperrors.NewInputParameterError("invalid interval %s", input.Interval)
  544. }
  545. data.From(input.StartTime).To(input.EndTime).Interval(input.Interval)
  546. queryData := data.ToQueryData()
  547. dbRtn, err := self.performQuery(ctx, userCred, queryData)
  548. if err != nil {
  549. return nil, errors.Wrapf(err, "performQuery with data: %s", jsonutils.Marshal(queryData))
  550. }
  551. ret := []monitor.SimpleQueryOutput{}
  552. for _, s := range dbRtn.Series {
  553. id, ok := s.Tags["id"]
  554. if !ok {
  555. log.Warningf("Not found id from series: %s", jsonutils.Marshal(s))
  556. continue
  557. }
  558. for _, point := range s.Points {
  559. if len(point) != 2 {
  560. log.Warningf("invalid series: %s", jsonutils.Marshal(s))
  561. break
  562. }
  563. timestamp := point[len(point)-1]
  564. valPtr, ok := point[0].(*float64)
  565. if !ok || valPtr == nil {
  566. log.Warningf("invalid series point: %#v", point)
  567. break
  568. }
  569. ret = append(ret, monitor.SimpleQueryOutput{
  570. Id: id,
  571. Time: time.UnixMilli(int64(timestamp.(float64))),
  572. Value: *valPtr,
  573. })
  574. }
  575. }
  576. return jsonutils.Marshal(map[string]interface{}{"values": ret}), nil
  577. }
  578. func (self *SUnifiedMonitorManager) GetPropertyCdfQuery(ctx context.Context, userCred mcclient.TokenCredential, input *monitor.CdfQueryInput) (*monitor.CdfQueryOutput, error) {
  579. if len(input.Database) == 0 {
  580. input.Database = "telegraf"
  581. }
  582. if len(input.MetricName) == 0 {
  583. return nil, httperrors.NewMissingParameterError("metric_name")
  584. }
  585. input.MetricName = strings.Replace(input.MetricName, ".", "_", 1)
  586. ds, err := datasource.GetDefaultSource(input.Database)
  587. if err != nil {
  588. return nil, errors.Wrapf(err, "GetDefaultSource")
  589. }
  590. client, err := victoriametrics.NewClient(ds.Url)
  591. if err != nil {
  592. return nil, errors.Wrapf(err, "NewClient")
  593. }
  594. if len(input.Period) == 0 {
  595. input.Period = "1h"
  596. }
  597. sql := fmt.Sprintf("sum(histogram_over_time(avg_over_time(%s[%s]))) by (vmrange)", input.MetricName, input.Period)
  598. if len(input.Tags) > 0 {
  599. tags := []string{}
  600. for k, v := range input.Tags {
  601. tags = append(tags, fmt.Sprintf(`%s="%s"`, k, v))
  602. }
  603. sql = fmt.Sprintf("sum(histogram_over_time(avg_over_time(%s{%s}[%s]))) by (vmrange)", input.MetricName, strings.Join(tags, ","), input.Period)
  604. }
  605. resp, err := client.RawQuery(ctx, httputils.GetAdaptiveTimeoutClient(), sql, true)
  606. if err != nil {
  607. return nil, err
  608. }
  609. ret, total := monitor.CdfQueryDataSet{}, 0
  610. for _, r := range resp.Data.Result {
  611. if len(r.Metric) == 0 {
  612. continue
  613. }
  614. vmrange, _ := r.Metric["vmrange"]
  615. if len(vmrange) == 0 {
  616. continue
  617. }
  618. if len(r.Value) != 2 {
  619. continue
  620. }
  621. v, _ := strconv.Atoi(r.Value[1].(string))
  622. ret = append(ret, monitor.CdfQueryData{
  623. Vmrange: vmrange,
  624. Value: v,
  625. })
  626. total += v
  627. }
  628. sort.Sort(ret)
  629. for i := range ret {
  630. ret[i].Total = total
  631. if i == 0 {
  632. ret[i].ValueAsc = ret[i].Value
  633. } else {
  634. ret[i].ValueAsc = ret[i-1].ValueAsc + ret[i].Value
  635. }
  636. }
  637. for i := len(ret) - 1; i >= 0; i-- {
  638. if i == len(ret)-1 {
  639. ret[i].ValueDesc = ret[i].Value
  640. } else {
  641. ret[i].ValueDesc = ret[i+1].ValueDesc + ret[i].Value
  642. }
  643. }
  644. result := &monitor.CdfQueryOutput{
  645. Data: monitor.CdfQueryDataSet{},
  646. }
  647. for i := range ret {
  648. v1 := ret[i]
  649. v1.Metric = v1.Start()
  650. v2 := v1.Copy()
  651. v2.Metric = v2.End()
  652. result.Data = append(result.Data, v1, v2)
  653. }
  654. return result, nil
  655. }
  656. func (self *SUnifiedMonitorManager) PerformResourceMetrics(
  657. ctx context.Context,
  658. userCred mcclient.TokenCredential,
  659. query jsonutils.JSONObject,
  660. data jsonutils.JSONObject,
  661. ) (jsonutils.JSONObject, error) {
  662. input := new(monitor.ResourceMetricsQueryInput)
  663. if err := data.Unmarshal(input); err != nil {
  664. return nil, httperrors.NewInputParameterError("unmarshal input: %v", err)
  665. }
  666. drv := GetResourceMetricDriver(input.ResType)
  667. if drv == nil {
  668. return nil, httperrors.NewInputParameterError("unsupported res_type %q", input.ResType)
  669. }
  670. if len(input.ResIds) == 0 {
  671. return nil, httperrors.NewMissingParameterError("res_ids")
  672. }
  673. if input.EndTime.IsZero() {
  674. input.EndTime = time.Now()
  675. }
  676. if input.StartTime.IsZero() {
  677. input.StartTime = input.EndTime.Add(-1 * time.Hour)
  678. }
  679. if len(input.Interval) == 0 {
  680. input.Interval = "5m"
  681. }
  682. tagKey := drv.GetTagKey()
  683. specs := drv.GetMetricSpecs()
  684. result := make(map[string]*monitor.ResourceMetricValues, len(input.ResIds))
  685. for _, id := range input.ResIds {
  686. result[id] = &monitor.ResourceMetricValues{}
  687. }
  688. for _, spec := range specs {
  689. qInput := mod.NewMetricQueryInputWithDB(TELEGRAF_DATABASE, spec.Measurement).SkipCheckSeries(true)
  690. sel := qInput.Selects()
  691. for _, f := range spec.Fields {
  692. sel.Select(f).MEAN()
  693. }
  694. where := qInput.Where()
  695. where.IN(tagKey, input.ResIds)
  696. qInput.GroupBy().TAG(tagKey)
  697. qInput.From(input.StartTime).To(input.EndTime).Interval(input.Interval)
  698. queryData := qInput.ToQueryData()
  699. dbRtn, err := self.performQuery(ctx, userCred, queryData)
  700. if err != nil {
  701. log.Warningf("query %s metrics error: %v", spec.Measurement, err)
  702. continue
  703. }
  704. for _, series := range dbRtn.Series {
  705. resId := series.Tags[tagKey]
  706. if resId == "" {
  707. continue
  708. }
  709. rv, ok := result[resId]
  710. if !ok {
  711. continue
  712. }
  713. // 取最后一个有效数据点
  714. for pi := len(series.Points) - 1; pi >= 0; pi-- {
  715. point := series.Points[pi]
  716. if len(point) < 2 {
  717. continue
  718. }
  719. allValid := true
  720. for fi := 0; fi < len(spec.Fields) && fi < len(point)-1; fi++ {
  721. if point[fi] == nil {
  722. allValid = false
  723. break
  724. }
  725. if fval, ok := point[fi].(*float64); !ok || fval == nil {
  726. allValid = false
  727. break
  728. }
  729. }
  730. if !allValid {
  731. continue
  732. }
  733. for fi, outputKey := range spec.OutputKeys {
  734. if fi < len(point)-1 {
  735. val := 0.0
  736. if fval, ok := point[fi].(*float64); ok && fval != nil {
  737. val = *fval
  738. } else if fval, ok := point[fi].(float64); ok {
  739. val = fval
  740. }
  741. SetResourceMetricValue(rv, outputKey, val)
  742. }
  743. }
  744. break
  745. }
  746. }
  747. }
  748. // 查询告警状态
  749. monResources, err := MonitorResourceManager.GetMonitorResources(monitor.MonitorResourceListInput{
  750. ResId: input.ResIds,
  751. })
  752. if err != nil {
  753. log.Warningf("GetMonitorResources error: %v", err)
  754. } else {
  755. for _, mr := range monResources {
  756. if rv, ok := result[mr.ResId]; ok {
  757. rv.AlertState = mr.AlertState
  758. }
  759. }
  760. }
  761. output := monitor.ResourceMetricsQueryOutput{
  762. ResourceMetrics: make(map[string]monitor.ResourceMetricValues, len(result)),
  763. }
  764. for id, rv := range result {
  765. output.ResourceMetrics[id] = *rv
  766. }
  767. return jsonutils.Marshal(output), nil
  768. }