subscriber.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "sort"
  20. "strings"
  21. "golang.org/x/sync/errgroup"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "yunion.io/x/jsonutils"
  24. "yunion.io/x/log"
  25. "yunion.io/x/pkg/errors"
  26. "yunion.io/x/pkg/tristate"
  27. "yunion.io/x/pkg/util/rbacscope"
  28. "yunion.io/x/pkg/utils"
  29. "yunion.io/x/sqlchemy"
  30. api "yunion.io/x/onecloud/pkg/apis/notify"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  32. "yunion.io/x/onecloud/pkg/httperrors"
  33. "yunion.io/x/onecloud/pkg/mcclient"
  34. "yunion.io/x/onecloud/pkg/mcclient/auth"
  35. "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
  36. modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
  37. "yunion.io/x/onecloud/pkg/notify/options"
  38. "yunion.io/x/onecloud/pkg/util/logclient"
  39. "yunion.io/x/onecloud/pkg/util/stringutils2"
  40. )
  41. var SubscriberManager *SSubscriberManager
  42. func init() {
  43. SubscriberManager = &SSubscriberManager{
  44. SStandaloneAnonResourceBaseManager: db.NewStandaloneAnonResourceBaseManager(
  45. SSubscriber{},
  46. "subscriber_tbl",
  47. "subscriber",
  48. "subscribers",
  49. ),
  50. }
  51. SubscriberManager.SetVirtualObject(SubscriberManager)
  52. }
  53. type SSubscriberManager struct {
  54. db.SStandaloneAnonResourceBaseManager
  55. db.SEnabledResourceBaseManager
  56. }
  57. // 消息订阅接收人
  58. type SSubscriber struct {
  59. db.SStandaloneAnonResourceBase
  60. db.SEnabledResourceBase
  61. TopicId string `width:"128" charset:"ascii" nullable:"false" index:"true" get:"user" list:"user" create:"required"`
  62. Type string `width:"16" charset:"ascii" nullable:"false" index:"true" get:"user" list:"user" create:"required"`
  63. Identification string `width:"128" charset:"ascii" nullable:"false" index:"true"`
  64. RoleScope string `width:"8" charset:"ascii" nullable:"false" get:"user" list:"user" create:"optional"`
  65. ResourceScope string `width:"8" charset:"ascii" nullable:"false" get:"user" list:"user" create:"required"`
  66. ResourceAttributionId string `width:"128" charset:"ascii" nullable:"false" get:"user" list:"user" create:"optional"`
  67. ResourceAttributionName string `width:"128" charset:"utf8" list:"user" create:"optional"`
  68. Scope string `width:"128" charset:"ascii" nullable:"false" create:"required"`
  69. DomainId string `width:"128" charset:"ascii" nullable:"false" create:"optional"`
  70. // minutes
  71. GroupTimes uint32 `nullable:"true" list:"user" update:"user"`
  72. }
  73. func (sm *SSubscriberManager) validateReceivers(ctx context.Context, receivers []string) ([]string, error) {
  74. rs, err := ReceiverManager.FetchByIdOrNames(ctx, receivers...)
  75. if err != nil {
  76. return nil, errors.Wrap(err, "unable to fetch Receivers")
  77. }
  78. reSet := sets.NewString(receivers...)
  79. reIds := make([]string, len(rs))
  80. for i := range rs {
  81. reSet.Delete(rs[i].GetId())
  82. reSet.Delete(rs[i].GetName())
  83. reIds[i] = rs[i].GetId()
  84. }
  85. if reSet.Len() > 0 {
  86. return nil, httperrors.NewInputParameterError("receivers %q not found", strings.Join(reSet.UnsortedList(), ", "))
  87. }
  88. return reIds, nil
  89. }
  90. func (self *SSubscriber) GetEnabledReceivers() ([]SReceiver, error) {
  91. q := ReceiverManager.Query().IsTrue("enabled")
  92. sq := SubscriberReceiverManager.Query().SubQuery()
  93. q = q.Join(sq, sqlchemy.Equals(q.Field("id"), sq.Field("receiver_id"))).Filter(sqlchemy.Equals(sq.Field("subscriber_id"), self.Id))
  94. ret := []SReceiver{}
  95. return ret, db.FetchModelObjects(ReceiverManager, q, &ret)
  96. }
  97. func (self *SSubscriber) GetRobot() (*SRobot, error) {
  98. robot, err := RobotManager.FetchById(self.Identification)
  99. if err != nil {
  100. return nil, errors.Wrapf(err, "RobotManager.FetchById(%s)", self.Identification)
  101. }
  102. return robot.(*SRobot), nil
  103. }
  104. func (sm *SSubscriberManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.SubscriberCreateInput) (api.SubscriberCreateInput, error) {
  105. var err error
  106. // permission check
  107. sSystem, sDomain := string(rbacscope.ScopeSystem), string(rbacscope.ScopeDomain)
  108. switch input.Scope {
  109. case sSystem:
  110. allow := db.IsAdminAllowCreate(userCred, sm)
  111. if allow.Result.IsDeny() {
  112. return input, httperrors.NewForbiddenError("The scope %s and the role of the operator do not match", input.Scope)
  113. }
  114. case sDomain:
  115. allow := db.IsDomainAllowCreate(userCred, sm)
  116. if allow.Result.IsDeny() {
  117. return input, httperrors.NewForbiddenError("The scope %s and the role of the operator do not match", input.Scope)
  118. }
  119. default:
  120. return input, httperrors.NewInputParameterError("unknown scope %s", input.Scope)
  121. }
  122. input.StandaloneAnonResourceCreateInput, err = sm.SStandaloneAnonResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input.StandaloneAnonResourceCreateInput)
  123. if err != nil {
  124. return input, errors.Wrap(err, "SVirtualResourceBaseManager.ValidateCreateData")
  125. }
  126. // check topic
  127. t, err := TopicManager.FetchById(input.TopicID)
  128. if err != nil {
  129. return input, errors.Wrapf(err, "unable to fetch topic %s", input.TopicID)
  130. }
  131. // check resource scope
  132. if !utils.IsInStringArray(input.ResourceScope, []string{api.SUBSCRIBER_SCOPE_SYSTEM, api.SUBSCRIBER_SCOPE_DOMAIN, api.SUBSCRIBER_SCOPE_PROJECT}) {
  133. return input, httperrors.NewInputParameterError("unknown resource_scope %q", input.ResourceScope)
  134. }
  135. // resource Attribution Id
  136. var domainId string
  137. switch input.ResourceScope {
  138. case api.SUBSCRIBER_SCOPE_SYSTEM:
  139. input.ResourceAttributionId = ""
  140. input.DomainId = ""
  141. case api.SUBSCRIBER_SCOPE_PROJECT:
  142. tenant, err := db.TenantCacheManager.FetchTenantById(ctx, input.ResourceAttributionId)
  143. if err != nil {
  144. return input, errors.Wrapf(err, "unable to fetch project %s", input.ResourceAttributionId)
  145. }
  146. domainId = tenant.DomainId
  147. input.DomainId = domainId
  148. input.ResourceAttributionId = tenant.GetId()
  149. input.ResourceAttributionName = tenant.GetName()
  150. case api.SUBSCRIBER_SCOPE_DOMAIN:
  151. tenant, err := db.TenantCacheManager.FetchDomainByIdOrName(ctx, input.ResourceAttributionId)
  152. if err != nil {
  153. return input, errors.Wrapf(err, "unable to fetch domain %s", input.ResourceAttributionId)
  154. }
  155. domainId = tenant.Id
  156. input.DomainId = domainId
  157. input.ResourceAttributionId = tenant.Id
  158. input.ResourceAttributionName = tenant.Name
  159. }
  160. if input.Scope == sDomain && domainId != userCred.GetProjectDomainId() {
  161. return input, httperrors.NewForbiddenError("domain %s admin can't create subscriber for domain %s", userCred.GetProjectDomainId(), domainId)
  162. }
  163. var checkQuery *sqlchemy.SQuery
  164. input.TopicID = t.GetId()
  165. switch input.Type {
  166. case api.SUBSCRIBER_TYPE_RECEIVER:
  167. reIds, err := sm.validateReceivers(ctx, input.Receivers)
  168. if err != nil {
  169. return input, err
  170. }
  171. input.Receivers = reIds
  172. case api.SUBSCRIBER_TYPE_ROLE:
  173. if input.RoleScope == "" {
  174. input.RoleScope = input.ResourceScope
  175. }
  176. roleCache, err := db.RoleCacheManager.FetchRoleByIdOrName(ctx, input.Role)
  177. if err != nil {
  178. return input, errors.Wrapf(err, "unable find role %q", input.Role)
  179. }
  180. input.Role = roleCache.GetId()
  181. checkQuery = sm.Query().Equals("topic_id", input.TopicID).Equals("type", api.SUBSCRIBER_TYPE_ROLE).Equals("resource_scope", input.ResourceScope).Equals("identification", input.Role).Equals("role_scope", input.RoleScope)
  182. case api.SUBSCRIBER_TYPE_ROBOT:
  183. robot, err := RobotManager.FetchByIdOrName(ctx, userCred, input.Robot)
  184. if errors.Cause(err) == sql.ErrNoRows {
  185. return input, httperrors.NewInputParameterError("robot %q not found", input.Robot)
  186. }
  187. if err != nil {
  188. return input, errors.Wrapf(err, "unable to fetch robot %q", input.Robot)
  189. }
  190. input.Robot = robot.GetId()
  191. checkQuery = sm.Query().Equals("type", api.SUBSCRIBER_TYPE_ROLE).Equals("topic_id", input.TopicID).Equals("resource_scope", input.ResourceScope).Equals("identification", input.Robot)
  192. default:
  193. return input, httperrors.NewInputParameterError("unkown type %q", input.Type)
  194. }
  195. // check type+resourceScope+identification
  196. if checkQuery != nil {
  197. count, err := checkQuery.CountWithError()
  198. if err != nil {
  199. return input, errors.Wrap(err, "unable to count")
  200. }
  201. if count > 0 {
  202. return input, httperrors.NewForbiddenError("repeated with existing subscribers")
  203. }
  204. }
  205. if input.GroupTimes != nil {
  206. if *input.GroupTimes < 0 {
  207. return input, httperrors.NewInputParameterError("invalidate group_times %d", input.GroupTimes)
  208. }
  209. }
  210. return input, nil
  211. }
  212. func (s *SSubscriber) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
  213. s.SStandaloneAnonResourceBase.PostCreate(ctx, userCred, ownerId, query, data)
  214. var input api.SubscriberCreateInput
  215. _ = data.Unmarshal(&input)
  216. if s.Type == api.SUBSCRIBER_TYPE_RECEIVER {
  217. err := s.SetReceivers(ctx, input.Receivers)
  218. if err != nil {
  219. logclient.AddActionLogWithContext(ctx, s, logclient.ACT_CREATE, err.Error(), userCred, false)
  220. _, err := db.Update(s, func() error {
  221. s.SetEnabled(false)
  222. return nil
  223. })
  224. if err != nil {
  225. log.Errorf("unable to enable subscriber: %v", err)
  226. }
  227. }
  228. }
  229. logclient.AddActionLogWithContext(ctx, s, logclient.ACT_CREATE, "", userCred, true)
  230. return
  231. }
  232. func (s *SSubscriber) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  233. err := s.SStandaloneAnonResourceBase.CustomizeCreate(ctx, userCred, ownerId, query, data)
  234. if err != nil {
  235. return errors.Wrap(err, "SVirtualResourceBase.CustomizeCreate")
  236. }
  237. var input api.SubscriberCreateInput
  238. _ = data.Unmarshal(&input)
  239. switch input.Type {
  240. case api.SUBSCRIBER_TYPE_RECEIVER:
  241. case api.SUBSCRIBER_TYPE_ROBOT:
  242. s.Identification = input.Robot
  243. case api.SUBSCRIBER_TYPE_ROLE:
  244. s.Identification = input.Role
  245. }
  246. s.Enabled = tristate.True
  247. return nil
  248. }
  249. func (s *SSubscriber) PerformChange(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.SubscriberChangeInput) (jsonutils.JSONObject, error) {
  250. if s.Scope == string(rbacscope.ScopeSystem) {
  251. if !db.IsAdminAllowUpdate(ctx, userCred, s) {
  252. return nil, httperrors.NewForbiddenError("")
  253. }
  254. } else {
  255. if !db.IsDomainAllowUpdate(ctx, userCred, s) {
  256. return nil, httperrors.NewForbiddenError("")
  257. }
  258. if s.DomainId != userCred.GetProjectDomainId() {
  259. return nil, httperrors.NewForbiddenError("")
  260. }
  261. }
  262. switch s.Type {
  263. case api.SUBSCRIBER_TYPE_RECEIVER:
  264. err := s.SetReceivers(ctx, input.Receivers)
  265. if err != nil {
  266. log.Errorf("unable to set receivers %s", input.Receivers)
  267. }
  268. case api.SUBSCRIBER_TYPE_ROBOT:
  269. _, err := db.Update(s, func() error {
  270. s.Identification = input.Robot
  271. return nil
  272. })
  273. if err != nil {
  274. return nil, errors.Wrap(err, "unable to update subscriber")
  275. }
  276. case api.SUBSCRIBER_TYPE_ROLE:
  277. _, err := db.Update(s, func() error {
  278. s.Identification = input.Role
  279. if input.RoleScope != "" {
  280. s.RoleScope = input.RoleScope
  281. }
  282. return nil
  283. })
  284. if err != nil {
  285. return nil, errors.Wrap(err, "unable to update subscriber")
  286. }
  287. }
  288. if input.GroupTimes != nil {
  289. _, err := db.Update(s, func() error {
  290. s.GroupTimes = *input.GroupTimes
  291. return nil
  292. })
  293. if err != nil {
  294. return nil, errors.Wrap(err, "unable to update subscriber group_times")
  295. }
  296. }
  297. return nil, nil
  298. }
  299. func (sm *SSubscriberManager) ListItemFilter(ctx context.Context, q *sqlchemy.SQuery, userCred mcclient.TokenCredential, input api.SubscriberListInput) (*sqlchemy.SQuery, error) {
  300. var err error
  301. q, err = sm.SStandaloneAnonResourceBaseManager.ListItemFilter(ctx, q, userCred, input.StandaloneAnonResourceListInput)
  302. if err != nil {
  303. return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
  304. }
  305. q, err = sm.SEnabledResourceBaseManager.ListItemFilter(ctx, q, userCred, input.EnabledResourceBaseListInput)
  306. if err != nil {
  307. return nil, errors.Wrap(err, "SEnabledResourceBaseManager.ListItemFilter")
  308. }
  309. sSystem, sDomain := string(rbacscope.ScopeSystem), string(rbacscope.ScopeDomain)
  310. if input.Scope == "" {
  311. input.Scope = sSystem
  312. }
  313. switch input.Scope {
  314. case sSystem:
  315. allow := db.IsAdminAllowList(userCred, sm)
  316. if allow.Result.IsDeny() {
  317. return nil, httperrors.NewForbiddenError("")
  318. }
  319. case sDomain:
  320. allow := db.IsDomainAllowList(userCred, sm)
  321. if allow.Result.IsDeny() {
  322. return nil, httperrors.NewForbiddenError("")
  323. }
  324. q = q.Equals("domain_id", userCred.GetProjectDomainId())
  325. default:
  326. return nil, httperrors.NewInputParameterError("unkown scope %s", input.Scope)
  327. }
  328. if input.TopicID != "" {
  329. q = q.Equals("topic_id", input.TopicID)
  330. }
  331. if input.Type != "" {
  332. q = q.Equals("type", input.Type)
  333. }
  334. if input.ResourceScope != "" {
  335. q = q.Equals("resource_scope", input.ResourceScope)
  336. }
  337. return q, nil
  338. }
  339. func (sm *SSubscriberManager) FetchCustomizeColumns(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, objs []interface{}, fields stringutils2.SSortedStrings, isList bool) []api.SubscriberDetails {
  340. var err error
  341. vRows := sm.SStandaloneAnonResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  342. rows := make([]api.SubscriberDetails, len(objs))
  343. for i := range rows {
  344. rows[i].StandaloneAnonResourceDetails = vRows[i]
  345. s := objs[i].(*SSubscriber)
  346. switch s.Type {
  347. case api.SUBSCRIBER_TYPE_RECEIVER:
  348. rows[i].Receivers, err = s.receiverIdentifications()
  349. if err != nil {
  350. log.Errorf("unable to get receiverIdentifications for subscriber %q: %v", s.Id, err)
  351. }
  352. case api.SUBSCRIBER_TYPE_ROBOT:
  353. rows[i].Robot, err = s.robotIdentification()
  354. if err != nil {
  355. log.Errorf("unable get robotIdentification for subscriber %q: %v", s.Id, err)
  356. }
  357. case api.SUBSCRIBER_TYPE_ROLE:
  358. rows[i].Role, err = s.roleIdentification(ctx)
  359. if err != nil {
  360. log.Errorf("unable to get roleIdentification for subscriber %q: %v", s.Id, err)
  361. }
  362. }
  363. }
  364. return rows
  365. }
  366. func (s *SSubscriber) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  367. err := s.SStandaloneAnonResourceBase.CustomizeDelete(ctx, userCred, query, data)
  368. if err != nil {
  369. return err
  370. }
  371. if s.Scope == string(rbacscope.ScopeSystem) {
  372. if !db.IsAdminAllowDelete(ctx, userCred, s) {
  373. return httperrors.NewForbiddenError("")
  374. }
  375. } else {
  376. if !db.IsDomainAllowDelete(ctx, userCred, s) {
  377. return httperrors.NewForbiddenError("")
  378. }
  379. if s.DomainId != userCred.GetProjectDomainId() {
  380. return httperrors.NewForbiddenError("")
  381. }
  382. }
  383. return nil
  384. }
  385. func (s *SSubscriber) receiverIdentifications() ([]api.Identification, error) {
  386. srSubq := SubscriberReceiverManager.Query().Equals("subscriber_id", s.Id).SubQuery()
  387. rq := ReceiverManager.Query("id", "name")
  388. rq = rq.Join(srSubq, sqlchemy.Equals(srSubq.Field("receiver_id"), rq.Field("id")))
  389. var ret []api.Identification
  390. err := rq.All(&ret)
  391. if err != nil {
  392. return nil, err
  393. }
  394. return ret, nil
  395. }
  396. func (s *SSubscriber) robotIdentification() (api.Identification, error) {
  397. var ret api.Identification
  398. q := RobotManager.Query("id", "name").Equals("id", s.Identification)
  399. err := q.First(&ret)
  400. if err != nil {
  401. return ret, err
  402. }
  403. return ret, nil
  404. }
  405. func (s *SSubscriber) roleIdentification(ctx context.Context) (api.Identification, error) {
  406. var ret api.Identification
  407. roleCache, err := db.RoleCacheManager.FetchRoleById(ctx, s.Identification)
  408. if err != nil {
  409. return ret, errors.Wrapf(err, "unable to find role %q", s.Identification)
  410. }
  411. ret.ID = s.Identification
  412. ret.Name = roleCache.Name
  413. return ret, nil
  414. }
  415. func (srm *SSubscriberManager) robot(tid, projectDomainId, projectId string) (map[string]uint32, error) {
  416. srs, err := srm.findSuitableOnes(tid, projectDomainId, projectId, api.SUBSCRIBER_TYPE_ROBOT)
  417. if err != nil {
  418. return nil, err
  419. }
  420. robotIds := make(map[string]uint32)
  421. for i := range srs {
  422. // robotIds[i] = srs[i].Identification
  423. robotIds[srs[i].Identification] = srs[i].GroupTimes
  424. }
  425. return robotIds, nil
  426. }
  427. func (srm *SSubscriberManager) findSuitableOnes(tid, projectDomainId, projectId string, types ...string) ([]SSubscriber, error) {
  428. q := srm.Query().Equals("topic_id", tid).IsTrue("enabled")
  429. q = q.Filter(sqlchemy.OR(
  430. sqlchemy.AND(
  431. sqlchemy.Equals(q.Field("resource_scope"), api.SUBSCRIBER_SCOPE_PROJECT),
  432. sqlchemy.Equals(q.Field("resource_attribution_id"), projectId),
  433. ),
  434. sqlchemy.AND(
  435. sqlchemy.Equals(q.Field("resource_scope"), api.SUBSCRIBER_SCOPE_DOMAIN),
  436. sqlchemy.Equals(q.Field("resource_attribution_id"), projectDomainId),
  437. ),
  438. sqlchemy.Equals(q.Field("resource_scope"), api.SUBSCRIBER_SCOPE_SYSTEM),
  439. ))
  440. switch len(types) {
  441. case 0:
  442. case 1:
  443. q = q.Equals("type", types[0])
  444. default:
  445. q = q.In("type", types)
  446. }
  447. srs := make([]SSubscriber, 0, 1)
  448. err := db.FetchModelObjects(srm, q, &srs)
  449. if err != nil {
  450. return nil, err
  451. }
  452. return srs, nil
  453. }
  454. // TODO: Use cache to increase speed
  455. func (srm *SSubscriberManager) getReceiversSent(ctx context.Context, tid string, projectDomainId string, projectId string) (map[string]uint32, error) {
  456. srs, err := srm.findSuitableOnes(tid, projectDomainId, projectId, api.SUBSCRIBER_TYPE_RECEIVER, api.SUBSCRIBER_TYPE_ROLE)
  457. if err != nil {
  458. return nil, err
  459. }
  460. // 接受人-聚合时间
  461. receivers := make(map[string]uint32)
  462. // 角色-接受人
  463. roleMap := make(map[string][]string, 3)
  464. // 接受角色-接受人-聚合时间
  465. receivermap := make(map[string]map[string]uint32, 3)
  466. // 聚合时间
  467. roleGroupTimes := 0
  468. for _, sr := range srs {
  469. if sr.Type == api.SUBSCRIBER_TYPE_RECEIVER {
  470. rIds, err := sr.getReceivers()
  471. if err != nil {
  472. return nil, errors.Wrap(err, "unable to get receivers")
  473. }
  474. for _, receiveId := range rIds {
  475. // receivers = append(receivers, api.SReceiverWithGroupTimes{ReceiverId: receiveId, GroupTimes: sr.GroupTimes})
  476. receivers[receiveId] = sr.GroupTimes
  477. }
  478. } else if sr.Type == api.SUBSCRIBER_TYPE_ROLE {
  479. roleGroupTimes = int(sr.GroupTimes)
  480. roleMap[sr.RoleScope] = append(roleMap[sr.RoleScope], sr.Identification)
  481. receivermap[sr.RoleScope] = map[string]uint32{}
  482. }
  483. }
  484. errgo, _ := errgroup.WithContext(ctx)
  485. for _scope, _roles := range roleMap {
  486. scope, roles := _scope, _roles
  487. errgo.Go(func() error {
  488. query := jsonutils.NewDict()
  489. query.Set("roles", jsonutils.NewStringArray(roles))
  490. query.Set("effective", jsonutils.JSONTrue)
  491. switch scope {
  492. case api.SUBSCRIBER_SCOPE_SYSTEM:
  493. case api.SUBSCRIBER_SCOPE_DOMAIN:
  494. if len(projectDomainId) == 0 {
  495. return fmt.Errorf("need projectDomainId")
  496. }
  497. query.Set("project_domain_id", jsonutils.NewString(projectDomainId))
  498. case api.SUBSCRIBER_SCOPE_PROJECT:
  499. if len(projectId) == 0 {
  500. return fmt.Errorf("need projectId")
  501. }
  502. query.Add(jsonutils.NewString(projectId), "scope", "project", "id")
  503. }
  504. s := auth.GetAdminSession(ctx, "")
  505. listRet, err := modules.RoleAssignments.List(s, query)
  506. if err != nil {
  507. return errors.Wrap(err, "unable to list RoleAssignments")
  508. }
  509. for i := range listRet.Data {
  510. ras := listRet.Data[i]
  511. user, err := ras.Get("user")
  512. if err == nil {
  513. id, err := user.GetString("id")
  514. if err != nil {
  515. return errors.Wrap(err, "unable to get user.id from result of RoleAssignments.List")
  516. }
  517. if _, ok := receivermap[scope][id]; !ok {
  518. receivermap[scope][id] = uint32(roleGroupTimes)
  519. }
  520. }
  521. }
  522. return nil
  523. })
  524. }
  525. err = errgo.Wait()
  526. if err != nil {
  527. return nil, err
  528. }
  529. for _, res := range receivermap {
  530. for receive, time := range res {
  531. if t, ok := receivers[receive]; !ok || t == 0 {
  532. receivers[receive] = time
  533. }
  534. }
  535. }
  536. // de-duplication
  537. return receivers, nil
  538. }
  539. func (sr *SSubscriber) getReceivers() ([]string, error) {
  540. srrs, err := SubscriberReceiverManager.getBySubscriberId(sr.Id)
  541. if err != nil {
  542. return nil, err
  543. }
  544. rIds := make([]string, len(srrs))
  545. for i := range srrs {
  546. rIds[i] = srrs[i].ReceiverId
  547. }
  548. return rIds, nil
  549. }
  550. func (sr *SSubscriber) SetReceivers(ctx context.Context, receiverIds []string) error {
  551. srrs, err := SubscriberReceiverManager.getBySubscriberId(sr.Id)
  552. if err != nil {
  553. return errors.Wrapf(err, "unable to get SRReceiver by Subscriber %s", sr.Id)
  554. }
  555. dbReceivers := make([]string, len(srrs))
  556. for i := range srrs {
  557. dbReceivers[i] = srrs[i].ReceiverId
  558. }
  559. var addReceivers, rmReceivers []string
  560. sort.Strings(dbReceivers)
  561. sort.Strings(receiverIds)
  562. for i, j := 0, 0; i < len(dbReceivers) || j < len(receiverIds); {
  563. switch {
  564. case i == len(dbReceivers):
  565. addReceivers = append(addReceivers, receiverIds[j])
  566. j++
  567. case j == len(receiverIds):
  568. rmReceivers = append(rmReceivers, dbReceivers[i])
  569. i++
  570. case dbReceivers[i] > receiverIds[j]:
  571. addReceivers = append(addReceivers, receiverIds[j])
  572. j++
  573. case dbReceivers[i] < receiverIds[j]:
  574. rmReceivers = append(rmReceivers, dbReceivers[i])
  575. i++
  576. case dbReceivers[i] == receiverIds[j]:
  577. i++
  578. j++
  579. }
  580. }
  581. // add
  582. for _, rId := range addReceivers {
  583. _, err := SubscriberReceiverManager.create(ctx, sr.Id, rId)
  584. if err != nil {
  585. return errors.Wrapf(err, "unable to connect subscription receiver %q with receiver %q", sr.Id, rId)
  586. }
  587. }
  588. for _, rId := range rmReceivers {
  589. err := SubscriberReceiverManager.delete(sr.Id, rId)
  590. if err != nil {
  591. return errors.Wrapf(err, "unable to disconnect subscription receiver %q with receiver %q", sr.Id, rId)
  592. }
  593. }
  594. return nil
  595. }
  596. func (s *SSubscriber) PerformSetReceiver(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.SubscriberSetReceiverInput) (jsonutils.JSONObject, error) {
  597. reIds, err := SubscriberManager.validateReceivers(ctx, input.Receivers)
  598. if err != nil {
  599. return nil, err
  600. }
  601. return nil, s.SetReceivers(ctx, reIds)
  602. }
  603. func (s *SSubscriber) PerformEnable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  604. err := db.EnabledPerformEnable(s, ctx, userCred, true)
  605. if err != nil {
  606. return nil, errors.Wrap(err, "EnabledPerformEnable")
  607. }
  608. return nil, nil
  609. }
  610. func (s *SSubscriber) PerformDisable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  611. err := db.EnabledPerformEnable(s, ctx, userCred, false)
  612. if err != nil {
  613. return nil, errors.Wrap(err, "EnabledPerformEnable")
  614. }
  615. return nil, nil
  616. }
  617. func (sm *SSubscriberManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  618. q, err := sm.SStandaloneAnonResourceBaseManager.QueryDistinctExtraField(q, field)
  619. if err == nil {
  620. return q, nil
  621. }
  622. switch field {
  623. case "resource_scope":
  624. return sm.Query("resource_scope").Distinct(), nil
  625. case "type":
  626. return sm.Query("type").Distinct(), nil
  627. }
  628. return q, nil
  629. }
  630. var defaultNotifyTopics = []string{
  631. DefaultServerPanicked,
  632. DefaultServiceAbnormal,
  633. DefaultNetOutOfSync,
  634. DefaultMysqlOutOfSync,
  635. DefaultActionLogExceedCount,
  636. }
  637. func (sm *SSubscriberManager) InitializeData() error {
  638. ctx := context.Background()
  639. session := auth.GetAdminSession(ctx, options.Options.Region)
  640. // 获取系统管理员角色id
  641. params := map[string]interface{}{
  642. "project_domain": "default",
  643. }
  644. role, err := identity.RolesV3.Get(session, "admin", jsonutils.Marshal(params))
  645. if err != nil {
  646. return errors.Wrap(err, "identity.RolesV3.List")
  647. }
  648. roleId, _ := role.GetString("id")
  649. q := TopicManager.Query()
  650. q = q.Filter(sqlchemy.OR(sqlchemy.In(q.Field("name"), defaultNotifyTopics)))
  651. topics := []STopic{}
  652. err = db.FetchModelObjects(TopicManager, q, &topics)
  653. if err != nil {
  654. return errors.Wrap(err, "FetchModelObjects topic")
  655. }
  656. for _, topic := range topics {
  657. q := sm.Query()
  658. q = q.Equals("topic_id", topic.Id)
  659. q = q.Equals("type", api.SUBSCRIBER_TYPE_ROLE)
  660. q = q.Equals("identification", roleId)
  661. count, err := q.CountWithError()
  662. if err != nil {
  663. return errors.Wrap(err, "CountWithError")
  664. }
  665. if count != 0 {
  666. continue
  667. }
  668. subscriber := SSubscriber{}
  669. subscriber.Type = api.SUBSCRIBER_TYPE_ROLE
  670. subscriber.Identification = roleId
  671. subscriber.TopicId = topic.Id
  672. subscriber.Scope = api.SUBSCRIBER_SCOPE_SYSTEM
  673. subscriber.ResourceScope = api.SUBSCRIBER_SCOPE_SYSTEM
  674. subscriber.Enabled = tristate.True
  675. sm.TableSpec().Insert(ctx, &subscriber)
  676. }
  677. return nil
  678. }
  679. // 根据接受人ID获取订阅
  680. func getSubscriberByReceiverId(receiverId string, showDisabled bool) ([]SSubscriber, error) {
  681. results := []SSubscriber{}
  682. tempRes := []SSubscriber{}
  683. // q1 根据接受人ID查找(优先)
  684. q1 := SubscriberManager.Query()
  685. q1 = q1.Equals("type", api.SUBSCRIBER_TYPE_RECEIVER)
  686. srq := SubscriberReceiverManager.Query().Equals("receiver_id", receiverId)
  687. srsq := srq.SubQuery()
  688. if !showDisabled {
  689. q1 = q1.Equals("enabled", true)
  690. }
  691. q1.Join(srsq, sqlchemy.Equals(q1.Field("id"), srsq.Field("subscriber_id")))
  692. err := db.FetchModelObjects(SubscriberManager, q1, &tempRes)
  693. if err != nil {
  694. return nil, errors.Wrap(err, "fetch receiver")
  695. }
  696. results = append(results, tempRes...)
  697. roleArr := []string{}
  698. // 获取当前接受人所有角色
  699. s := auth.GetAdminSession(context.Background(), options.Options.Region)
  700. query := jsonutils.NewDict()
  701. query.Add(jsonutils.NewString("system"), "scope")
  702. query.Add(jsonutils.NewString("user"), "resource")
  703. query.Add(jsonutils.NewBool(true), "details")
  704. query.Add(jsonutils.NewString("project"), "group_by")
  705. query.Add(jsonutils.NewBool(true), "effective")
  706. resp, err := identity.RoleAssignments.GetProjectRole(s, receiverId, query)
  707. if err != nil {
  708. return nil, errors.Wrap(err, "UserCacheManager.FetchUserByIdOrName")
  709. }
  710. dataArr, _ := resp.GetArray("data")
  711. for _, data := range dataArr {
  712. groupArr, _ := data.GetArray("groups")
  713. for _, group := range groupArr {
  714. rolesArr, _ := group.GetArray("roles")
  715. for _, role := range rolesArr {
  716. roleId, _ := role.GetString("id")
  717. roleArr = append(roleArr, roleId)
  718. }
  719. }
  720. }
  721. // q2 根据角色查找
  722. q2 := SubscriberManager.Query()
  723. q2 = q2.Equals("type", api.SUBSCRIBER_TYPE_ROLE)
  724. if !showDisabled {
  725. q2 = q2.Equals("enabled", true)
  726. }
  727. q2 = q2.In("identification", roleArr)
  728. tempRes = []SSubscriber{}
  729. err = db.FetchModelObjects(SubscriberManager, q2, &tempRes)
  730. if err != nil {
  731. return nil, errors.Wrap(err, "fetch role")
  732. }
  733. results = append(results, tempRes...)
  734. return results, nil
  735. }