subcription.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subscriptionmodel
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "strings"
  20. "sync"
  21. "time"
  22. "yunion.io/x/jsonutils"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/onecloud/pkg/apis/monitor"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  28. "yunion.io/x/onecloud/pkg/mcclient"
  29. "yunion.io/x/onecloud/pkg/mcclient/auth"
  30. "yunion.io/x/onecloud/pkg/mcclient/modules/notify"
  31. "yunion.io/x/onecloud/pkg/monitor/alerting"
  32. cond "yunion.io/x/onecloud/pkg/monitor/alerting/conditions"
  33. "yunion.io/x/onecloud/pkg/monitor/alerting/notifiers"
  34. "yunion.io/x/onecloud/pkg/monitor/alerting/notifiers/templates"
  35. sub "yunion.io/x/onecloud/pkg/monitor/influxdbsubscribe"
  36. "yunion.io/x/onecloud/pkg/monitor/models"
  37. "yunion.io/x/onecloud/pkg/monitor/registry"
  38. )
  39. var (
  40. SubscriptionManager *SSubscriptionManager
  41. )
  42. func init() {
  43. SubscriptionManager = &SSubscriptionManager{
  44. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  45. &SSubscriptionManager{},
  46. "",
  47. "subscription",
  48. "subscriptions",
  49. ),
  50. systemAlerts: new(sync.Map),
  51. }
  52. SubscriptionManager.SetVirtualObject(SubscriptionManager)
  53. }
  54. type SSubscriptionManager struct {
  55. db.SVirtualResourceBaseManager
  56. systemAlerts *sync.Map
  57. }
  58. func (self *SSubscriptionManager) getThisFunctionUrl() string {
  59. return fmt.Sprintf("https://%s:%d/%s", monitor.MonitorComponentType, monitor.MonitorComponentPort, monitor.SubscribAPI)
  60. }
  61. func (self *SSubscriptionManager) AddSubscription() {
  62. sub := models.InfluxdbSubscription{
  63. SubName: monitor.MonitorSubName,
  64. DataBase: monitor.MonitorSubDataBase,
  65. Rc: monitor.MonitorDefaultRC,
  66. Url: self.getThisFunctionUrl(),
  67. }
  68. err := models.DataSourceManager.DropSubscription(sub)
  69. if err != nil {
  70. log.Errorln("DropSubscription err:", err)
  71. return
  72. }
  73. log.Infof("drop success")
  74. /*err = models.DataSourceManager.AddSubscription(sub)
  75. if err != nil {
  76. log.Errorln("add subscription err:", err)
  77. return
  78. }
  79. log.Infof("add success")
  80. if err := self.LoadSystemAlerts(); err != nil {
  81. log.Errorf("load system alerts error: %v", err)
  82. return
  83. }*/
  84. }
  85. func (self *SSubscriptionManager) LoadSystemAlerts() error {
  86. alerts, err := models.CommonAlertManager.GetSystemAlerts()
  87. if err != nil {
  88. return errors.Wrap(err, "load system alerts")
  89. }
  90. for _, alert := range alerts {
  91. self.SetAlert(&alert)
  92. }
  93. return nil
  94. }
  95. func (self *SSubscriptionManager) SetAlert(alert *models.SCommonAlert) {
  96. self.systemAlerts.Store(alert.GetId(), alert)
  97. }
  98. func (self *SSubscriptionManager) DeleteAlert(alert *models.SCommonAlert) {
  99. self.systemAlerts.Delete(alert.GetId())
  100. }
  101. func (self *SSubscriptionManager) GetSystemAlerts() []*models.SCommonAlert {
  102. ret := make([]*models.SCommonAlert, 0)
  103. self.systemAlerts.Range(func(key, val interface{}) bool {
  104. ret = append(ret, val.(*models.SCommonAlert))
  105. return true
  106. })
  107. return nil
  108. }
  109. func (self *SSubscriptionManager) PerformWrite(ctx context.Context, userCred mcclient.TokenCredential,
  110. query jsonutils.JSONObject, data []sub.Point) {
  111. sysAlerts := self.GetSystemAlerts()
  112. for _, sysalert := range sysAlerts {
  113. details := monitor.CommonAlertDetails{}
  114. details, err := sysalert.GetMoreDetails(ctx, details)
  115. if err != nil {
  116. log.Errorln("sysalert getMoreDetails err", err)
  117. continue
  118. }
  119. for _, metricDetails := range details.CommonAlertMetricDetails {
  120. evalMatch, match, err := self.Eval(*metricDetails, *sysalert, data)
  121. if err != nil {
  122. log.Errorln("SSubscriptionManager Eval error:", err)
  123. continue
  124. }
  125. if evalMatch {
  126. evalCtx := alerting.EvalContext{
  127. Firing: true,
  128. IsTestRun: false,
  129. IsDebug: false,
  130. EvalMatches: []*monitor.EvalMatch{match},
  131. Logs: nil,
  132. Error: nil,
  133. ConditionEvals: "",
  134. StartTime: time.Now(),
  135. EndTime: time.Now(),
  136. Rule: nil,
  137. NoDataFound: false,
  138. PrevAlertState: sysalert.GetState(),
  139. Ctx: context.Background(),
  140. UserCred: auth.AdminCredential(),
  141. }
  142. err := self.evalNotifyOfAlert(*sysalert, *metricDetails, evalCtx)
  143. if err != nil {
  144. log.Errorln(err)
  145. }
  146. }
  147. }
  148. }
  149. }
  150. func getPointsMeasurement(points []sub.Point) string {
  151. measurements := make(map[string]int)
  152. strBuff := new(strings.Builder)
  153. for _, point := range points {
  154. if val, ok := measurements[point.Name()]; ok {
  155. measurements[point.Name()] = val + 1
  156. }
  157. measurements[point.Name()] = 1
  158. }
  159. for key, count := range measurements {
  160. strBuff.WriteString(fmt.Sprintf("measurement:%s,count:%d", key, count))
  161. strBuff.WriteString("\n")
  162. }
  163. return strBuff.String()
  164. }
  165. func (self *SSubscriptionManager) isContainNotications(alert models.SCommonAlert) bool {
  166. alertNotis, err := alert.GetNotifications()
  167. if err != nil {
  168. log.Errorln(err)
  169. return false
  170. }
  171. if len(alertNotis) == 0 {
  172. return false
  173. }
  174. for _, an := range alertNotis {
  175. noti, err := an.GetNotification()
  176. if err != nil {
  177. return false
  178. }
  179. if !noti.IsDefault {
  180. return true
  181. }
  182. }
  183. return false
  184. }
  185. func (self *SSubscriptionManager) Eval(details monitor.CommonAlertMetricDetails, alert models.SCommonAlert, points []sub.Point) (bool,
  186. *monitor.EvalMatch, error) {
  187. serie := self.getPointsByAlertDetail(details, alert, points)
  188. reduceCondition := monitor.Condition{
  189. Type: details.Reduce,
  190. }
  191. if len(details.FieldOpt) != 0 {
  192. reduceCondition.Operators = []string{details.FieldOpt}
  193. }
  194. reducer, err := cond.NewAlertReducer(&reduceCondition)
  195. if err != nil {
  196. return false, nil, err
  197. }
  198. reduceValue, _ := reducer.Reduce(serie)
  199. evalCond := monitor.Condition{
  200. Type: getQueryEvalType(details.Comparator),
  201. Params: []float64{details.Threshold},
  202. }
  203. evaluator, err := cond.NewAlertEvaluator(&evalCond)
  204. if err != nil {
  205. return false, nil, err
  206. }
  207. if reduceValue != nil {
  208. log.Printf("name:%s,reduceValue:%f", serie.Name, *reduceValue)
  209. }
  210. if evaluator.Eval(reduceValue) {
  211. match := monitor.EvalMatch{
  212. Condition: "",
  213. Value: reduceValue,
  214. Metric: serie.Name,
  215. Tags: serie.Tags,
  216. }
  217. return true, &match, nil
  218. }
  219. return false, nil, nil
  220. }
  // getQueryEvalType maps a comparator symbol to an evaluator type name:
// ">" and ">=" map to "gt", "<" and "<=" map to "lt". Any other input
// yields "". The inclusive forms share the strict type.
func getQueryEvalType(evalType string) string {
	switch evalType {
	case ">=", ">":
		return "gt"
	case "<=", "<":
		return "lt"
	default:
		return ""
	}
}
  231. func (self *SSubscriptionManager) getPointsByAlertDetail(details monitor.CommonAlertMetricDetails, alert models.SCommonAlert,
  232. points []sub.Point) *monitor.TimeSeries {
  233. metricPoints := make([]sub.Point, 0)
  234. serie := monitor.TimeSeries{
  235. RawName: "",
  236. Name: "",
  237. Points: make(monitor.TimeSeriesPoints, 0),
  238. Tags: nil,
  239. }
  240. if len(points) == 0 {
  241. return &serie
  242. }
  243. setting, _ := alert.GetSettings()
  244. model := setting.Conditions[0].Query.Model
  245. serie.Name = fmt.Sprintf("%s.%s", details.Measurement, details.Field)
  246. for _, point := range points {
  247. if point.Name() == model.Measurement {
  248. tagBool := true
  249. for _, tag := range model.Tags {
  250. if point.Tags().Map()[tag.Key] == tag.Value {
  251. tagBool = true
  252. } else {
  253. tagBool = false
  254. }
  255. if strings.ToUpper(tag.Condition) == "AND" && !tagBool {
  256. break
  257. }
  258. }
  259. if !tagBool {
  260. continue
  261. }
  262. if details.Groupby != "" && point.Tags().Map()[details.Groupby] == "" {
  263. continue
  264. }
  265. metricPoints = append(metricPoints, point)
  266. }
  267. }
  268. if len(metricPoints) == 0 {
  269. return &serie
  270. }
  271. serie.Tags = metricPoints[0].Tags().Map()
  272. for _, metricPoint := range metricPoints {
  273. if len(model.Selects) > 1 {
  274. fieldMap := metricPoint.Fields()
  275. point := make(monitor.TimePoint, 0)
  276. for _, sel := range model.Selects {
  277. point = append(point, parseValue(fieldMap[sel[0].Params[0]]))
  278. }
  279. point = append(point, float64(metricPoint.UnixNano()))
  280. serie.Points = append(serie.Points, point)
  281. continue
  282. }
  283. fieldPoint := metricPoint.FieldIterator()
  284. for fieldPoint.Next() {
  285. if string(fieldPoint.FieldKey()) == details.Field && isValid(fieldPoint) {
  286. val := fieldPoint.FloatValue()
  287. timePoint := monitor.NewTimePoint(&val, float64(metricPoint.UnixNano()))
  288. serie.Points = append(serie.Points, timePoint)
  289. }
  290. }
  291. }
  292. return &serie
  293. }
  // parseValue converts a raw field value to a *float64.
//
// Conversion rules:
//   - json.Number values are parsed with Float64.
//   - Native Go numeric values (float32/float64 and the signed/unsigned
//     integer kinds) are converted directly.
//   - Anything else (nil, strings, bools, unparsable numbers) yields nil, so
//     callers can treat the value as missing.
func parseValue(value interface{}) *float64 {
	var f float64
	switch v := value.(type) {
	case json.Number:
		parsed, err := v.Float64()
		if err != nil {
			// No integer fallback: any string Int64 accepts (base-10, fits
			// int64) is also accepted by Float64 without error.
			return nil
		}
		f = parsed
	case float64:
		f = v
	case float32:
		f = float64(v)
	case int:
		f = float64(v)
	case int8:
		f = float64(v)
	case int16:
		f = float64(v)
	case int32:
		f = float64(v)
	case int64:
		f = float64(v)
	case uint:
		f = float64(v)
	case uint8:
		f = float64(v)
	case uint16:
		f = float64(v)
	case uint32:
		f = float64(v)
	case uint64:
		f = float64(v)
	default:
		return nil
	}
	return &f
}
  310. func (self *SSubscriptionManager) evalNotifyOfAlert(alert models.SCommonAlert,
  311. metricDetails monitor.CommonAlertMetricDetails, evalContext alerting.EvalContext) error {
  312. rule, _ := alerting.NewRuleFromDBAlert(&alert.SAlert)
  313. rule.State = monitor.AlertStateAlerting
  314. evalContext.Rule = rule
  315. var err error
  316. if self.isContainNotications(alert) {
  317. switch metricDetails.Reduce {
  318. case "avg", "sum", "count", "median":
  319. self.updateAlertJob(alert)
  320. default:
  321. // alerting
  322. err = self.notifyByAlertNotis(alert, evalContext)
  323. if err != nil {
  324. log.Errorln("notifyByAlertNotis err:", err)
  325. }
  326. }
  327. } else {
  328. err = self.notifyBySysConfig(evalContext)
  329. }
  330. return err
  331. }
  332. func (self *SSubscriptionManager) updateAlertJob(alert models.SCommonAlert) {
  333. //upate alert value to dispatched immediately
  334. alert.Frequency = 1
  335. rule, err := alerting.NewRuleFromDBAlert(&alert.SAlert)
  336. if err != nil {
  337. log.Errorln("SSubscriptionManager updateAlertJob error:", err)
  338. return
  339. }
  340. services := registry.GetServices()
  341. for _, svc := range services {
  342. if svc.Name == "AlertEngine" {
  343. service := svc.Instance.(*alerting.AlertEngine)
  344. service.Scheduler.Update([]*alerting.Rule{rule})
  345. }
  346. }
  347. }
  348. func (self *SSubscriptionManager) notifyByAlertNotis(alert models.SCommonAlert, evalContext alerting.EvalContext) error {
  349. return self.doNotify(evalContext.Rule.Notifications, &evalContext)
  350. }
  351. func (n *SSubscriptionManager) doNotify(nIds []string, evalCtx *alerting.EvalContext) error {
  352. notis, err := models.NotificationManager.GetNotificationsWithDefault(nIds)
  353. if err != nil {
  354. return err
  355. }
  356. for _, obj := range notis {
  357. not, err := alerting.InitNotifier(alerting.NotificationConfig{
  358. Id: obj.GetId(),
  359. Name: obj.GetName(),
  360. Type: obj.Type,
  361. Frequency: time.Duration(obj.Frequency),
  362. SendReminder: obj.SendReminder,
  363. DisableResolveMessage: obj.DisableResolveMessage,
  364. Settings: obj.Settings,
  365. })
  366. if err != nil {
  367. log.Errorf("Could not create notifier %s, error: %v", obj.GetId(), err)
  368. continue
  369. }
  370. state, err := models.AlertNotificationManager.Get(evalCtx.Rule.Id, obj.GetId())
  371. if err != nil {
  372. log.Errorf(" notification %s to alert %s error: %v", obj.GetName(), evalCtx.Rule.Id, err)
  373. continue
  374. }
  375. err = not.Notify(evalCtx, state.Params)
  376. if err != nil {
  377. log.Errorln("not Notify err:", err)
  378. }
  379. }
  380. return nil
  381. }
  382. func (self *SSubscriptionManager) notifyBySysConfig(evalContext alerting.EvalContext) error {
  383. config := notifiers.GetNotifyTemplateConfig(&evalContext, false, evalContext.EvalMatches)
  384. contentConfig := templates.NewTemplateConfig(config)
  385. content, err := contentConfig.GenerateMarkdown()
  386. if err != nil {
  387. return err
  388. }
  389. log.Printf("统一报警[alertId:%s,alertName:%s]发生告警", evalContext.Rule.Id, evalContext.Rule.Name)
  390. notifyclient.SystemNotify(notify.TNotifyPriority(config.Priority), config.Title, jsonutils.NewString(content))
  391. return nil
  392. }
  393. func isValid(iterator sub.FieldIterator) bool {
  394. switch iterator.Type() {
  395. case sub.Float:
  396. return true
  397. default:
  398. return false
  399. }
  400. }