notifications_send_task.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tasks
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "strings"
  20. "sync"
  21. "time"
  22. "yunion.io/x/jsonutils"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. apis "yunion.io/x/onecloud/pkg/apis/notify"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  28. "yunion.io/x/onecloud/pkg/notify/models"
  29. "yunion.io/x/onecloud/pkg/notify/options"
  30. "yunion.io/x/onecloud/pkg/util/logclient"
  31. )
  32. type NotificationSendTask struct {
  33. taskman.STask
  34. }
  35. func init() {
  36. taskman.RegisterTask(NotificationSendTask{})
  37. }
  38. func (self *NotificationSendTask) taskFailed(ctx context.Context, notification *models.SNotification, reason string, all bool) {
  39. log.Errorf("fail to send notification %q", notification.GetId())
  40. if all {
  41. notification.SetStatus(ctx, self.UserCred, apis.NOTIFICATION_STATUS_FAILED, reason)
  42. } else {
  43. notification.SetStatus(ctx, self.UserCred, apis.NOTIFICATION_STATUS_PART_OK, reason)
  44. }
  45. notification.AddOne()
  46. logclient.AddActionLogWithContext(ctx, notification, logclient.ACT_SEND_NOTIFICATION, reason, self.UserCred, false)
  47. self.SetStageFailed(ctx, jsonutils.NewString(reason))
  48. }
  // DomainContact pairs a contact string with a domain id.
  type DomainContact struct {
  	// DomainId is the id of the domain.
  	DomainId string
  	// Contact is the contact value.
  	Contact string
  }
  53. type ReceiverSpec struct {
  54. receiver models.IReceiver
  55. rNotificaion *models.SReceiverNotification
  56. }
  // Package-level state shared by grouped notification sending.
  //
  // The zero values of sync.Map and sync.Mutex are ready to use, so no init
  // function is required to set them up.
  var (
  	// notificationSendMap holds one entry per open aggregation window, keyed
  	// by GroupKey + receiver id + contact type.
  	notificationSendMap sync.Map
  	// notificationGroupLock serializes the check for an open window and the
  	// creation of a new one.
  	notificationGroupLock sync.Mutex
  )
  62. func (self *NotificationSendTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) {
  63. notification := obj.(*models.SNotification)
  64. if notification.Status == apis.NOTIFICATION_STATUS_OK {
  65. self.SetStageComplete(ctx, nil)
  66. return
  67. }
  68. rns, err := notification.ReceiverNotificationsNotOK()
  69. if err != nil {
  70. self.taskFailed(ctx, notification, errors.Wrapf(err, "ReceiverNotificationsNotOK").Error(), true)
  71. return
  72. }
  73. event, err := models.EventManager.GetEvent(notification.EventId)
  74. if err != nil {
  75. if err != sql.ErrNoRows {
  76. self.taskFailed(ctx, notification, errors.Wrapf(err, "GetEvent").Error(), true)
  77. return
  78. }
  79. }
  80. notification.SetStatus(ctx, self.UserCred, apis.NOTIFICATION_STATUS_SENDING, "")
  81. // build contactMap
  82. receivers := make([]ReceiverSpec, 0)
  83. receiversEn := make([]ReceiverSpec, 0, len(rns)/2)
  84. receiversCn := make([]ReceiverSpec, 0, len(rns)/2)
  85. failedRecord := make([]string, 0)
  86. sendFail := func(rn *models.SReceiverNotification, reason string) {
  87. rn.AfterSend(ctx, false, reason)
  88. failedRecord = append(failedRecord, fmt.Sprintf("%s: %s", rn.ReceiverID, reason))
  89. }
  90. robotUseTemplate := false
  91. for i := range rns {
  92. receiver, err := rns[i].Receiver()
  93. if err != nil {
  94. sendFail(&rns[i], fmt.Sprintf("fail to fetch Receiver: %s", err.Error()))
  95. continue
  96. }
  97. // check receiver enabled
  98. if !receiver.IsEnabled() {
  99. sendFail(&rns[i], "disabled receiver")
  100. continue
  101. }
  102. // check contact enabled
  103. if notification.ContactType == apis.WEBHOOK {
  104. notification.ContactType = apis.WEBHOOK_ROBOT
  105. }
  106. if receiver.IsRobot() {
  107. robot := receiver.(*models.SRobot)
  108. notification.ContactType = fmt.Sprintf("%s-robot", robot.Type)
  109. robotUseTemplate = robot.UseTemplate.Bool()
  110. }
  111. enabled, err := receiver.IsEnabledContactType(notification.ContactType)
  112. if err != nil {
  113. logclient.AddSimpleActionLog(notification, logclient.ACT_SEND_NOTIFICATION, errors.Wrapf(err, "GetEnabledContactTypes"), self.GetUserCred(), false)
  114. continue
  115. }
  116. driver := models.GetDriver(notification.ContactType)
  117. if driver == nil || !enabled {
  118. sendFail(&rns[i], fmt.Sprintf("disabled contactType %q", notification.ContactType))
  119. continue
  120. }
  121. // check contact verified
  122. verified, err := receiver.IsVerifiedContactType(notification.ContactType)
  123. if err != nil {
  124. sendFail(&rns[i], fmt.Sprintf("IsVerifiedContactType error for receiver: %s", err.Error()))
  125. continue
  126. }
  127. if !verified {
  128. contact, _ := receiver.GetContact(notification.ContactType)
  129. sendFail(&rns[i], fmt.Sprintf("unverified contactType %q for contact %s", notification.ContactType, contact))
  130. continue
  131. }
  132. lang, err := receiver.GetTemplateLang(ctx)
  133. if err != nil {
  134. reason := fmt.Sprintf("fail to GetTemplateLang: %s", err.Error())
  135. sendFail(&rns[i], reason)
  136. continue
  137. }
  138. switch lang {
  139. case "":
  140. receivers = append(receivers, ReceiverSpec{
  141. receiver: receiver,
  142. rNotificaion: &rns[i],
  143. })
  144. case apis.TEMPLATE_LANG_EN:
  145. receiversEn = append(receiversEn, ReceiverSpec{
  146. receiver: receiver,
  147. rNotificaion: &rns[i],
  148. })
  149. case apis.TEMPLATE_LANG_CN:
  150. receiversCn = append(receiversCn, ReceiverSpec{
  151. receiver: receiver,
  152. rNotificaion: &rns[i],
  153. })
  154. }
  155. }
  156. nn, err := notification.Notification(robotUseTemplate)
  157. if err != nil {
  158. self.taskFailed(ctx, notification, errors.Wrapf(err, "Notification").Error(), true)
  159. return
  160. }
  161. for lang, receivers := range map[string][]ReceiverSpec{
  162. "": receivers,
  163. apis.TEMPLATE_LANG_CN: receiversCn,
  164. apis.TEMPLATE_LANG_EN: receiversEn,
  165. } {
  166. if len(receivers) == 0 {
  167. log.Warningf("no receiver to send for %s %s, skip ...", notification.ContactType, lang)
  168. continue
  169. }
  170. // send
  171. topicId := ""
  172. if event != nil {
  173. topicId = event.TopicId
  174. }
  175. p, err := notification.GetTemplate(ctx, topicId, lang, nn)
  176. if err != nil {
  177. logclient.AddSimpleActionLog(notification, logclient.ACT_SEND_NOTIFICATION, errors.Wrapf(err, "FillWithTemplate(%s)", lang), self.GetUserCred(), false)
  178. continue
  179. }
  180. if event != nil {
  181. p.Event = event.Event
  182. }
  183. if notification.ContactType != apis.MOBILE && notification.ContactType != apis.WEBHOOK_ROBOT {
  184. switch lang {
  185. case apis.TEMPLATE_LANG_CN:
  186. p.Message += "\n来自 " + options.Options.ApiServer
  187. tz, _ := time.LoadLocation(options.Options.TimeZone)
  188. p.Message += "\n发生于 " + time.Now().In(tz).Format("2006-01-02 15:04:05")
  189. case apis.TEMPLATE_LANG_EN:
  190. p.Message += "\nfrom " + options.Options.ApiServer
  191. p.Message += "\nat " + time.Now().In(time.UTC).Format("2006-01-02 15:04:05")
  192. }
  193. }
  194. p.DomainId = self.UserCred.GetDomainId()
  195. // set status before send
  196. now := time.Now()
  197. for _, rn := range receivers {
  198. rn.rNotificaion.BeforeSend(ctx, now)
  199. }
  200. // send
  201. fds, err := self.batchSend(ctx, notification, receivers, p)
  202. if err != nil {
  203. for _, r := range receivers {
  204. sendFail(r.rNotificaion, err.Error())
  205. }
  206. continue
  207. }
  208. // check result
  209. failedRnIds := make(map[int64]struct{}, 0)
  210. for _, fd := range fds {
  211. sendFail(fd.rNotificaion, fd.Reason)
  212. failedRnIds[fd.rNotificaion.RowId] = struct{}{}
  213. }
  214. // after send for successful notify
  215. for _, r := range receivers {
  216. if _, ok := failedRnIds[r.rNotificaion.RowId]; ok {
  217. continue
  218. }
  219. r.rNotificaion.AfterSend(ctx, true, "")
  220. }
  221. }
  222. if len(failedRecord) > 0 && len(failedRecord) >= len(rns) {
  223. self.taskFailed(ctx, notification, strings.Join(failedRecord, "; "), true)
  224. return
  225. }
  226. if len(failedRecord) > 0 {
  227. self.taskFailed(ctx, notification, strings.Join(failedRecord, "; "), false)
  228. return
  229. }
  230. notification.SetStatus(ctx, self.UserCred, apis.NOTIFICATION_STATUS_OK, "")
  231. logclient.AddActionLogWithContext(ctx, notification, logclient.ACT_SEND_NOTIFICATION, "", self.UserCred, true)
  232. self.SetStageComplete(ctx, nil)
  233. }
  234. type FailedReceiverSpec struct {
  235. ReceiverSpec
  236. Reason string
  237. }
  238. func (notificationSendTask *NotificationSendTask) batchSend(ctx context.Context, notification *models.SNotification, receivers []ReceiverSpec, params apis.SendParams) (fails []FailedReceiverSpec, err error) {
  239. if notification.ContactType == apis.WEBCONSOLE {
  240. return
  241. }
  242. for i := range receivers {
  243. if receivers[i].receiver.IsRobot() {
  244. robot := receivers[i].receiver.(*models.SRobot)
  245. driver := models.GetDriver(fmt.Sprintf("%s-robot", robot.Type))
  246. params.Receivers.Contact = robot.Address
  247. params.Header = robot.Header
  248. params.Body = robot.Body
  249. params.MsgKey = robot.MsgKey
  250. params.SecretKey = robot.SecretKey
  251. params.GroupTimes = uint(receivers[i].rNotificaion.GroupTimes)
  252. err = driver.Send(ctx, params)
  253. if err != nil {
  254. fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
  255. }
  256. } else if receivers[i].receiver.IsReceiver() {
  257. receiver := receivers[i].receiver.(*models.SReceiver)
  258. params.Receivers.Contact, _ = receiver.GetContact(notification.ContactType)
  259. params.GroupTimes = uint(receivers[i].rNotificaion.GroupTimes)
  260. driver := models.GetDriver(notification.ContactType)
  261. if notification.ContactType == apis.EMAIL {
  262. params.EmailMsg = apis.SEmailMessage{
  263. To: []string{receiver.Email},
  264. Subject: params.Title,
  265. Body: params.Message,
  266. }
  267. }
  268. if notification.ContactType == apis.MOBILE {
  269. mobileArr := strings.Split(receiver.Mobile, " ")
  270. mobile := strings.Join(mobileArr, "")
  271. params.Receivers.Contact = mobile
  272. }
  273. params.ReceiverId = receiver.Id
  274. params.SendTime = time.Now().Truncate(time.Second)
  275. if len(params.GroupKey) > 0 && params.GroupTimes > 0 {
  276. notificationGroupLock.Lock()
  277. if _, ok := notificationSendMap.Load(params.GroupKey + receiver.Id + notification.ContactType); ok {
  278. err = models.NotificationGroupManager.TaskCreate(ctx, notification.ContactType, params)
  279. } else {
  280. err = models.NotificationGroupManager.TaskCreate(ctx, notification.ContactType, params)
  281. if err != nil {
  282. fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
  283. }
  284. err = driver.Send(ctx, params)
  285. createTimeTicker(ctx, driver, params, receiver.Id, notification.ContactType)
  286. }
  287. notificationGroupLock.Unlock()
  288. } else {
  289. err = driver.Send(ctx, params)
  290. }
  291. if err != nil {
  292. fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
  293. }
  294. } else {
  295. receiver := receivers[i].receiver.(*models.SContact)
  296. params.Receivers.Contact, _ = receiver.GetContact(notification.ContactType)
  297. driver := models.GetDriver(notification.ContactType)
  298. err = driver.Send(ctx, params)
  299. if err != nil {
  300. fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
  301. }
  302. }
  303. }
  304. return fails, nil
  305. }
  306. func createTimeTicker(ctx context.Context, driver models.ISenderDriver, params apis.SendParams, receiverId, contactType string) {
  307. // 创建一个计时器,每秒触发一次
  308. // params.GroupTimes = 5
  309. timer := time.NewTicker(time.Duration(params.GroupTimes) * time.Minute)
  310. notificationSendMap.Store(params.GroupKey+receiverId+contactType, apis.SNotificationGroupSearchInput{
  311. GroupKey: params.GroupKey,
  312. ReceiverId: receiverId,
  313. ContactType: contactType,
  314. StartTime: params.SendTime,
  315. EndTime: params.SendTime.Add(time.Duration(params.GroupTimes) * time.Minute),
  316. })
  317. // 启动一个goroutine来处理计时器触发的事件
  318. go func() {
  319. for {
  320. // 等待计时器触发的事件
  321. <-timer.C
  322. // 处理计时器触发的事件
  323. arrValue, ok := notificationSendMap.Load(params.GroupKey + receiverId + contactType)
  324. if !ok {
  325. return
  326. }
  327. input := arrValue.(apis.SNotificationGroupSearchInput)
  328. // 组装聚合后的消息
  329. sendParams, err := models.NotificationGroupManager.TaskSend(ctx, input)
  330. if err != nil {
  331. log.Errorln("TaskSend err:", err)
  332. return
  333. }
  334. driver.Send(ctx, *sendParams)
  335. notificationSendMap.Delete(params.GroupKey + receiverId + contactType)
  336. }
  337. }()
  338. }