notifier.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package alerting
  15. import (
  16. "database/sql"
  17. "time"
  18. "yunion.io/x/jsonutils"
  19. "yunion.io/x/log"
  20. "yunion.io/x/pkg/errors"
  21. "yunion.io/x/onecloud/pkg/apis"
  22. "yunion.io/x/onecloud/pkg/apis/monitor"
  23. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  24. "yunion.io/x/onecloud/pkg/mcclient"
  25. "yunion.io/x/onecloud/pkg/monitor/models"
  26. "yunion.io/x/onecloud/pkg/monitor/notifydrivers"
  27. )
  28. type notificationService struct {
  29. }
  30. func newNotificationService() *notificationService {
  31. return &notificationService{}
  32. }
  33. func (n *notificationService) SendIfNeeded(evalCtx *EvalContext) error {
  34. notifierStates, shouldNotify, err := n.getNeededNotifiers(evalCtx.Rule.Notifications, evalCtx)
  35. if err != nil {
  36. return errors.Wrap(err, "failed to get alert notifiers")
  37. }
  38. n.syncResources(evalCtx, shouldNotify)
  39. if len(notifierStates) == 0 {
  40. return nil
  41. }
  42. return n.sendNotifications(evalCtx, notifierStates)
  43. }
// notifierState pairs a constructed notification driver with the alert/notification
// association row it is sending for. The row is obtained from
// AlertNotificationManager.Get or created by AttachToAlert in getNeededNotifiers.
type notifierState struct {
	// notifier is the driver built by InitNotifier for one notification config.
	notifier Notifier
	// state is the per-alert notification record; sendNotification marks it
	// pending and sendAndMarkAsComplete updates its send time and completes it
	// (both skipped for test runs).
	state *models.SAlertnotification
}

// notifierStateSlice is the list of notifiers that should fire for one evaluation.
type notifierStateSlice []*notifierState
  49. func (n *notificationService) sendNotification(evalCtx *EvalContext, state *notifierState) error {
  50. if !evalCtx.IsTestRun {
  51. if err := state.state.SetToPending(); err != nil {
  52. return errors.Wrap(err, "SetToPending")
  53. }
  54. }
  55. return n.sendAndMarkAsComplete(evalCtx, state)
  56. }
  57. func (n *notificationService) sendAndMarkAsComplete(evalCtx *EvalContext, state *notifierState) error {
  58. notifier := state.notifier
  59. log.Debugf("Sending notification, type %s, id %s", notifier.GetType(), notifier.GetNotifierId())
  60. if err := notifier.Notify(evalCtx, state.state.GetParams()); err != nil {
  61. return errors.Wrapf(err, "notify driver %s(%s)", notifier.GetType(), notifier.GetNotifierId())
  62. }
  63. if evalCtx.IsTestRun {
  64. return nil
  65. }
  66. err := state.state.UpdateSendTime()
  67. if err != nil {
  68. return errors.Wrap(err, "notifierState UpdateSendTime")
  69. }
  70. return state.state.SetToCompleted()
  71. }
  72. func (n *notificationService) sendNotifications(evalCtx *EvalContext, states notifierStateSlice) error {
  73. for _, state := range states {
  74. if err := n.sendNotification(evalCtx, state); err != nil {
  75. log.Errorf("failed to send %s(%s) notification: %v", state.notifier.GetType(), state.notifier.GetNotifierId(), err)
  76. if evalCtx.IsTestRun {
  77. return err
  78. }
  79. }
  80. }
  81. return nil
  82. }
  83. func (n *notificationService) getNeededNotifiers(nIds []string, evalCtx *EvalContext) (notifierStateSlice, bool, error) {
  84. notis, err := models.NotificationManager.GetNotificationsWithDefault(nIds)
  85. if err != nil {
  86. return nil, false, errors.Wrapf(err, "GetNotificationsWithDefault with %v", nIds)
  87. }
  88. var result notifierStateSlice
  89. shouldNotify := false
  90. for _, obj := range notis {
  91. not, err := InitNotifier(NotificationConfig{
  92. Ctx: evalCtx.Ctx,
  93. Id: obj.GetId(),
  94. Name: obj.GetName(),
  95. Type: obj.Type,
  96. Frequency: time.Duration(obj.Frequency),
  97. SendReminder: obj.SendReminder,
  98. DisableResolveMessage: obj.DisableResolveMessage,
  99. Settings: obj.Settings,
  100. })
  101. if err != nil {
  102. log.Errorf("Could not create notifier %s, error: %v", obj.GetId(), err)
  103. continue
  104. }
  105. state, err := models.AlertNotificationManager.Get(evalCtx.Rule.Id, obj.GetId())
  106. if err != nil {
  107. if errors.Cause(err) == sql.ErrNoRows {
  108. state, err = obj.AttachToAlert(evalCtx.Ctx, evalCtx.UserCred, evalCtx.Rule.Id)
  109. if err != nil {
  110. log.Errorf("Attach notification %s to alert %s error: %v", obj.GetName(), evalCtx.Rule.Id, err)
  111. continue
  112. }
  113. } else {
  114. log.Errorf("Get alert state: %v, alertId %s, notifierId: %s", err, evalCtx.Rule.Id, obj.GetId())
  115. continue
  116. }
  117. }
  118. if not.ShouldNotify(evalCtx.Ctx, evalCtx, state) {
  119. shouldNotify = true
  120. result = append(result, &notifierState{
  121. notifier: not,
  122. state: state,
  123. })
  124. }
  125. }
  126. return result, shouldNotify, nil
  127. }
  128. func (n *notificationService) syncResources(evalCtx *EvalContext, shouldNotify bool) {
  129. n.processNeedShieldEvalMatches(evalCtx, evalCtx.EvalMatches)
  130. n.processNeedShieldEvalMatches(evalCtx, evalCtx.AlertOkEvalMatches)
  131. if shouldNotify || evalCtx.Rule.State == monitor.AlertStateAlerting {
  132. go func() {
  133. if err := n.createAlertRecordWhenNotify(evalCtx, shouldNotify); err != nil {
  134. log.Errorf("createAlertRecordWhenNotify error: %v", err)
  135. }
  136. }()
  137. }
  138. if !shouldNotify && evalCtx.shouldUpdateAlertState() && evalCtx.NoDataFound {
  139. go func() {
  140. n.detachAlertResourceWhenNodata(evalCtx)
  141. }()
  142. }
  143. if len(evalCtx.AlertOkEvalMatches) > 0 {
  144. go func() {
  145. if err := n.syncMonitorResourceAlerts(evalCtx); err != nil {
  146. log.Errorf("syncMonitorResourceAlerts error: %v", err)
  147. }
  148. }()
  149. }
  150. }
  151. func (n *notificationService) createAlertRecordWhenNotify(evalCtx *EvalContext, shouldNotify bool) error {
  152. var matches []*monitor.EvalMatch
  153. if evalCtx.Firing {
  154. matches = evalCtx.EvalMatches
  155. } else {
  156. matches = evalCtx.AlertOkEvalMatches
  157. }
  158. recordCreateInput := monitor.AlertRecordCreateInput{
  159. StandaloneResourceCreateInput: apis.StandaloneResourceCreateInput{
  160. GenerateName: evalCtx.Rule.Name,
  161. },
  162. AlertId: evalCtx.Rule.Id,
  163. Level: evalCtx.Rule.Level,
  164. State: string(evalCtx.Rule.State),
  165. SendState: monitor.SEND_STATE_OK,
  166. EvalData: matches,
  167. AlertRule: evalCtx.Rule.RuleDescription,
  168. }
  169. if !shouldNotify {
  170. recordCreateInput.SendState = monitor.SEND_STATE_SILENT
  171. }
  172. recordCreateInput.ResType = recordCreateInput.AlertRule[0].ResType
  173. if len(recordCreateInput.ResType) == 0 {
  174. recordCreateInput.ResType = monitor.METRIC_RES_TYPE_HOST
  175. }
  176. createData := recordCreateInput.JSON(recordCreateInput)
  177. alert, _ := models.CommonAlertManager.GetAlert(evalCtx.Rule.Id)
  178. record, err := db.DoCreate(models.AlertRecordManager, evalCtx.Ctx, evalCtx.UserCred, jsonutils.NewDict(), createData, evalCtx.UserCred)
  179. if err != nil {
  180. return errors.Wrapf(err, "db.DoCreate")
  181. }
  182. alertData := jsonutils.Marshal(alert)
  183. alertData.(*jsonutils.JSONDict).Set("project_id", jsonutils.NewString(alert.GetProjectId()))
  184. db.PerformSetScope(evalCtx.Ctx, record.(*models.SAlertRecord), evalCtx.UserCred, alertData)
  185. dbMatches, _ := record.(*models.SAlertRecord).GetEvalData()
  186. if !evalCtx.Firing {
  187. evalCtx.AlertOkEvalMatches = make([]*monitor.EvalMatch, len(dbMatches))
  188. for i := range dbMatches {
  189. evalCtx.AlertOkEvalMatches[i] = &dbMatches[i]
  190. }
  191. }
  192. record.PostCreate(evalCtx.Ctx, evalCtx.UserCred, evalCtx.UserCred, nil, createData)
  193. return nil
  194. }
  195. func (n *notificationService) processNeedShieldEvalMatches(evalCtx *EvalContext, match []*monitor.EvalMatch) {
  196. input := monitor.AlertRecordShieldListInput{
  197. AlertId: evalCtx.Rule.Id,
  198. }
  199. for i := range match {
  200. input.ResId = monitor.GetMeasurementResourceId(match[i].Tags, input.ResType)
  201. alertRecordShields, err := models.AlertRecordShieldManager.GetRecordShields(input)
  202. if err != nil {
  203. log.Errorf("GetRecordShields by input: %s, err: %v", jsonutils.Marshal(input), err)
  204. continue
  205. }
  206. if len(alertRecordShields) != 0 {
  207. for _, shield := range alertRecordShields {
  208. if shield.EndTime.After(time.Now().UTC()) && shield.StartTime.Before(time.Now().UTC()) {
  209. match[i].Tags[monitor.ALERT_RESOURCE_RECORD_SHIELD_KEY] = monitor.ALERT_RESOURCE_RECORD_SHIELD_VALUE
  210. break
  211. }
  212. }
  213. }
  214. }
  215. }
  216. func (n *notificationService) detachAlertResourceWhenNodata(evalCtx *EvalContext) {
  217. errs := models.CommonAlertManager.DetachAlertResourceByAlertId(evalCtx.Ctx, evalCtx.UserCred, evalCtx.Rule.Id)
  218. if len(errs) != 0 {
  219. log.Errorf("detachAlertResourceWhenNodata err: %v", errors.NewAggregate(errs).Error())
  220. }
  221. }
  222. func (n *notificationService) syncMonitorResourceAlerts(evalCtx *EvalContext) error {
  223. if len(evalCtx.AlertOkEvalMatches) == 0 {
  224. log.Infof("alert_ok_eval_matches is empty, skip syncMonitorResourceAlerts")
  225. return nil
  226. }
  227. // only sync resource not need notify
  228. matches := make([]monitor.EvalMatch, len(evalCtx.AlertOkEvalMatches))
  229. for i := range evalCtx.AlertOkEvalMatches {
  230. matches[i] = *evalCtx.AlertOkEvalMatches[i]
  231. }
  232. alertRule := evalCtx.Rule.RuleDescription
  233. input := &models.UpdateMonitorResourceAlertInput{
  234. AlertId: evalCtx.Rule.Id,
  235. Matches: matches,
  236. ResType: alertRule[0].ResType,
  237. AlertState: string(monitor.AlertStateOK),
  238. SendState: monitor.SEND_STATE_SILENT,
  239. TriggerTime: time.Now(),
  240. AlertRecordId: "",
  241. }
  242. if err := models.MonitorResourceManager.UpdateMonitorResourceAttachJoint(evalCtx.Ctx, evalCtx.UserCred, input); err != nil {
  243. return errors.Wrap(err, "UpdateMonitorResourceAttachJoint")
  244. }
  245. return nil
  246. }
// NotifierPlugin describes a notification driver as seen by the alerting package.
// RegisterNotifier translates it into a notifydrivers.NotifierPlugin.
type NotifierPlugin struct {
	// Type is the driver type key the plugin is registered under.
	Type string
	// Factory builds a Notifier from a notification config.
	Factory NotifierFactory
	// ValidateCreateData validates (and may rewrite) the input used to create a
	// notification of this type; it is passed through to notifydrivers unchanged.
	ValidateCreateData func(cred mcclient.IIdentityProvider, input monitor.NotificationCreateInput) (monitor.NotificationCreateInput, error)
}

// NotificationConfig is notifydrivers.NotificationConfig under a local name; the
// two are converted back and forth with plain type conversions.
type NotificationConfig notifydrivers.NotificationConfig

// NotifierFactory is a signature for creating notifiers
type NotifierFactory func(config NotificationConfig) (Notifier, error)
  255. func RegisterNotifier(plug *NotifierPlugin) {
  256. notifydrivers.RegisterNotifier(&notifydrivers.NotifierPlugin{
  257. Type: plug.Type,
  258. Factory: func(cfg notifydrivers.NotificationConfig) (notifydrivers.Notifier, error) {
  259. ret, err := plug.Factory(NotificationConfig(cfg))
  260. if err != nil {
  261. return nil, err
  262. }
  263. return ret.(notifydrivers.Notifier), nil
  264. },
  265. ValidateCreateData: plug.ValidateCreateData,
  266. })
  267. }
  268. // InitNotifier construct a new notifier
  269. func InitNotifier(config NotificationConfig) (Notifier, error) {
  270. plug, err := notifydrivers.InitNotifier(notifydrivers.NotificationConfig(config))
  271. if err != nil {
  272. return nil, err
  273. }
  274. return plug.(Notifier), nil
  275. }