emailqueue.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/regutils"
  24. "yunion.io/x/sqlchemy"
  25. api "yunion.io/x/onecloud/pkg/apis/notify"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  28. "yunion.io/x/onecloud/pkg/httperrors"
  29. "yunion.io/x/onecloud/pkg/mcclient"
  30. "yunion.io/x/onecloud/pkg/util/stringutils2"
  31. )
  32. const (
  33. maxEmailDestCount = 256
  34. )
  35. type SEmailQueueManager struct {
  36. db.SLogBaseManager
  37. }
  38. type SEmailQueue struct {
  39. db.SLogBase
  40. RecvAt time.Time `nullable:"false" created_at:"true" index:"true" get:"user" list:"user" json:"recv_at"`
  41. Dest string `width:"256" charset:"ascii" nullable:"false" list:"user" create:"admin_required"`
  42. Subject string `width:"256" charset:"utf8" nullable:"false" list:"user" create:"admin_required"`
  43. DestCc string `width:"256" charset:"ascii" nullable:"false" list:"user" create:"admin_optional"`
  44. DestBcc string `width:"256" charset:"ascii" nullable:"false" list:"user" create:"admin_optional"`
  45. SessionId string `width:"256" charset:"utf8" nullable:"false" list:"user" create:"admin_optional"`
  46. Content jsonutils.JSONObject `length:"long" charset:"utf8" nullable:"false" list:"user" create:"admin_required"`
  47. MoreDetails jsonutils.JSONObject `length:"long" charset:"utf8" nullable:"true" list:"user" create:"admin_optional"`
  48. ProjectId string `width:"128" charset:"ascii" list:"user" create:"admin_optional" index:"true"`
  49. Project string `width:"128" charset:"utf8" list:"user" create:"admin_optional"`
  50. ProjectDomainId string `name:"project_domain_id" default:"default" width:"128" charset:"ascii" list:"user" create:"admin_optional"`
  51. ProjectDomain string `name:"project_domain" default:"Default" width:"128" charset:"utf8" list:"user" create:"admin_optional"`
  52. UserId string `width:"128" charset:"ascii" list:"user" create:"admin_required"`
  53. User string `width:"128" charset:"utf8" list:"user" create:"admin_required"`
  54. DomainId string `width:"128" charset:"ascii" list:"user" create:"admin_optional"`
  55. Domain string `width:"128" charset:"utf8" list:"user" create:"admin_optional"`
  56. Roles string `width:"64" charset:"utf8" list:"user" create:"admin_optional"`
  57. }
  58. var EmailQueueManager *SEmailQueueManager
  59. func InitEmailQueue() {
  60. EmailQueueManager = &SEmailQueueManager{
  61. SLogBaseManager: db.NewLogBaseManager(SEmailQueue{}, "emailqueue_tbl", "emailqueue", "emailqueues", "recv_at", consts.OpsLogWithClickhouse),
  62. }
  63. EmailQueueManager.SetVirtualObject(EmailQueueManager)
  64. }
  65. func (e *SEmailQueue) GetRecordTime() time.Time {
  66. return e.RecvAt
  67. }
  68. func (manager *SEmailQueueManager) ValidateCreateData(
  69. ctx context.Context,
  70. userCred mcclient.TokenCredential,
  71. ownerId mcclient.IIdentityProvider,
  72. query jsonutils.JSONObject,
  73. input api.EmailQueueCreateInput,
  74. ) (api.EmailQueueCreateInput, error) {
  75. // check permission
  76. if db.IsAdminAllowCreate(userCred, manager).Result.IsDeny() {
  77. return input, errors.Wrap(httperrors.ErrForbidden, "only admin can send email")
  78. }
  79. // validate data
  80. if len(input.To) == 0 {
  81. return input, errors.Wrap(httperrors.ErrInputParameter, "empty receiver")
  82. }
  83. invalidTos := make([]string, 0)
  84. for _, tos := range [][]string{
  85. input.To,
  86. input.Cc,
  87. input.Bcc,
  88. } {
  89. for _, to := range tos {
  90. if !regutils.MatchEmail(to) {
  91. invalidTos = append(invalidTos, to)
  92. }
  93. }
  94. }
  95. if len(invalidTos) > 0 {
  96. return input, errors.Wrapf(httperrors.ErrInputParameter, "invalid email %s", strings.Join(invalidTos, ","))
  97. }
  98. input.Dest = strings.Join(input.To, ",")
  99. input.DestCc = strings.Join(input.Cc, ",")
  100. input.DestBcc = strings.Join(input.Bcc, ",")
  101. if len(input.Dest) > maxEmailDestCount {
  102. return input, errors.Wrap(httperrors.ErrInputParameter, "too many tos")
  103. }
  104. if len(input.DestCc) > maxEmailDestCount {
  105. return input, errors.Wrap(httperrors.ErrInputParameter, "too many ccs")
  106. }
  107. if len(input.DestBcc) > maxEmailDestCount {
  108. return input, errors.Wrap(httperrors.ErrInputParameter, "too many bccs")
  109. }
  110. msg := api.SEmailMessage{
  111. Body: input.Body,
  112. Attachments: input.Attachments,
  113. }
  114. input.Content = jsonutils.Marshal(msg)
  115. input.Project = userCred.GetProjectName()
  116. input.ProjectId = userCred.GetProjectId()
  117. input.ProjectDomain = userCred.GetProjectDomain()
  118. input.ProjectDomainId = userCred.GetProjectDomainId()
  119. input.User = userCred.GetUserName()
  120. input.UserId = userCred.GetUserId()
  121. input.Domain = userCred.GetDomainName()
  122. input.DomainId = userCred.GetDomainId()
  123. input.Roles = strings.Join(userCred.GetRoles(), ",")
  124. return input, nil
  125. }
  126. func (eq *SEmailQueue) PostCreate(
  127. ctx context.Context,
  128. userCred mcclient.TokenCredential,
  129. ownerId mcclient.IIdentityProvider,
  130. query jsonutils.JSONObject,
  131. data jsonutils.JSONObject,
  132. ) {
  133. eq.SLogBase.PostCreate(ctx, userCred, ownerId, query, data)
  134. eq.setStatus(ctx, api.EmailQueued, nil)
  135. eq.doSendAsync()
  136. }
  137. func (eq *SEmailQueue) doSendAsync() {
  138. Worker.Run(eq, nil, nil)
  139. }
  140. func (eq *SEmailQueue) Dump() string {
  141. return fmt.Sprintf("send email %s", eq.Subject)
  142. }
  143. func (eq *SEmailQueue) Run() {
  144. log.Debugf("send email")
  145. eq.doSend(context.TODO())
  146. }
  147. func (eq *SEmailQueue) doSend(ctx context.Context) {
  148. msg, err := eq.getMessage()
  149. if err != nil {
  150. eq.setStatus(ctx, api.EmailFail, err)
  151. return
  152. }
  153. eq.setStatus(ctx, api.EmailSending, nil)
  154. driver := GetDriver(api.EMAIL)
  155. err = driver.Send(ctx, api.SendParams{
  156. EmailMsg: *msg,
  157. })
  158. if err != nil {
  159. eq.setStatus(ctx, api.EmailFail, err)
  160. return
  161. }
  162. eq.setStatus(ctx, api.EmailSuccess, nil)
  163. }
  164. func (eq *SEmailQueue) getMessage() (*api.SEmailMessage, error) {
  165. msg := api.SEmailMessage{}
  166. err := eq.Content.Unmarshal(&msg)
  167. if err != nil {
  168. return nil, errors.Wrap(err, "Unmarshal")
  169. }
  170. msg.To = strings.Split(eq.Dest, ",")
  171. msg.Cc = strings.Split(eq.DestCc, ",")
  172. msg.Bcc = strings.Split(eq.DestBcc, ",")
  173. msg.Subject = eq.Subject
  174. return &msg, nil
  175. }
  176. func (eq *SEmailQueue) setStatus(ctx context.Context, status string, results error) {
  177. eqs := SEmailQueueStatus{
  178. Id: eq.Id,
  179. Status: status,
  180. }
  181. if results != nil {
  182. eqs.Results = results.Error()
  183. }
  184. if status == api.EmailSuccess || status == api.EmailFail {
  185. eqs.SentAt = time.Now()
  186. }
  187. EmailQueueStatusManager.TableSpec().InsertOrUpdate(ctx, &eqs)
  188. }
  189. // 宿主机/物理机列表
  190. func (manager *SEmailQueueManager) ListItemFilter(
  191. ctx context.Context,
  192. q *sqlchemy.SQuery,
  193. userCred mcclient.TokenCredential,
  194. query api.EmailQueueListInput,
  195. ) (*sqlchemy.SQuery, error) {
  196. var err error
  197. q, err = manager.SLogBaseManager.ListItemFilter(ctx, q, userCred, query.LogBaseListInput)
  198. if err != nil {
  199. return q, errors.Wrap(err, "SLogBaseManager.ListItemFilter")
  200. }
  201. if len(query.To) > 0 {
  202. cond := make([]sqlchemy.ICondition, 0)
  203. for _, to := range query.To {
  204. cond = append(cond, sqlchemy.Contains(q.Field("dest"), to))
  205. }
  206. q = q.Filter(sqlchemy.OR(cond...))
  207. }
  208. if len(query.Subject) > 0 {
  209. q = q.Contains("subject", query.Subject)
  210. }
  211. if len(query.SessionId) > 0 {
  212. q = q.In("session_id", query.SessionId)
  213. }
  214. return q, nil
  215. }
  216. func (eq *SEmailQueue) PerformSend(
  217. ctx context.Context,
  218. userCred mcclient.TokenCredential,
  219. query jsonutils.JSONObject,
  220. input api.EmailQueueSendInput,
  221. ) (jsonutils.JSONObject, error) {
  222. eq.setStatus(ctx, api.EmailQueued, nil)
  223. if input.Sync {
  224. log.Debugf("send email synchronously")
  225. eq.doSend(ctx)
  226. } else {
  227. log.Debugf("send email Asynchronously")
  228. eq.doSendAsync()
  229. }
  230. return nil, nil
  231. }
  232. func (manager *SEmailQueueManager) FetchCustomizeColumns(
  233. ctx context.Context,
  234. userCred mcclient.TokenCredential,
  235. query jsonutils.JSONObject,
  236. objs []interface{},
  237. fields stringutils2.SSortedStrings,
  238. isList bool,
  239. ) []api.EmailQueueDetails {
  240. rows := make([]api.EmailQueueDetails, len(objs))
  241. baseRows := manager.SModelBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  242. emailIds := make([]int64, len(objs))
  243. for i := range rows {
  244. rows[i] = api.EmailQueueDetails{
  245. ModelBaseDetails: baseRows[i],
  246. }
  247. eq := objs[i].(*SEmailQueue)
  248. emailIds[i] = eq.Id
  249. }
  250. rets, err := EmailQueueStatusManager.fetchEmailQueueStatus(emailIds)
  251. if err != nil {
  252. log.Errorf("EmailQueueStatusManager.fetchEmailQueueStatus fail %s", err)
  253. return rows
  254. }
  255. for i := range rows {
  256. eq := objs[i].(*SEmailQueue)
  257. if eqs, ok := rets[eq.Id]; ok {
  258. rows[i].Status = eqs.Status
  259. rows[i].SentAt = eqs.SentAt
  260. rows[i].Results = eqs.Results
  261. }
  262. }
  263. return rows
  264. }