| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package models
- import (
- "context"
- "database/sql"
- "fmt"
- "sort"
- "strings"
- "golang.org/x/sync/errgroup"
- "k8s.io/apimachinery/pkg/util/sets"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/tristate"
- "yunion.io/x/pkg/util/rbacscope"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/sqlchemy"
- api "yunion.io/x/onecloud/pkg/apis/notify"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
- "yunion.io/x/onecloud/pkg/notify/options"
- "yunion.io/x/onecloud/pkg/util/logclient"
- "yunion.io/x/onecloud/pkg/util/stringutils2"
- )
- var SubscriberManager *SSubscriberManager
- func init() {
- SubscriberManager = &SSubscriberManager{
- SStandaloneAnonResourceBaseManager: db.NewStandaloneAnonResourceBaseManager(
- SSubscriber{},
- "subscriber_tbl",
- "subscriber",
- "subscribers",
- ),
- }
- SubscriberManager.SetVirtualObject(SubscriberManager)
- }
- type SSubscriberManager struct {
- db.SStandaloneAnonResourceBaseManager
- db.SEnabledResourceBaseManager
- }
- // 消息订阅接收人
- type SSubscriber struct {
- db.SStandaloneAnonResourceBase
- db.SEnabledResourceBase
- TopicId string `width:"128" charset:"ascii" nullable:"false" index:"true" get:"user" list:"user" create:"required"`
- Type string `width:"16" charset:"ascii" nullable:"false" index:"true" get:"user" list:"user" create:"required"`
- Identification string `width:"128" charset:"ascii" nullable:"false" index:"true"`
- RoleScope string `width:"8" charset:"ascii" nullable:"false" get:"user" list:"user" create:"optional"`
- ResourceScope string `width:"8" charset:"ascii" nullable:"false" get:"user" list:"user" create:"required"`
- ResourceAttributionId string `width:"128" charset:"ascii" nullable:"false" get:"user" list:"user" create:"optional"`
- ResourceAttributionName string `width:"128" charset:"utf8" list:"user" create:"optional"`
- Scope string `width:"128" charset:"ascii" nullable:"false" create:"required"`
- DomainId string `width:"128" charset:"ascii" nullable:"false" create:"optional"`
- // minutes
- GroupTimes uint32 `nullable:"true" list:"user" update:"user"`
- }
- func (sm *SSubscriberManager) validateReceivers(ctx context.Context, receivers []string) ([]string, error) {
- rs, err := ReceiverManager.FetchByIdOrNames(ctx, receivers...)
- if err != nil {
- return nil, errors.Wrap(err, "unable to fetch Receivers")
- }
- reSet := sets.NewString(receivers...)
- reIds := make([]string, len(rs))
- for i := range rs {
- reSet.Delete(rs[i].GetId())
- reSet.Delete(rs[i].GetName())
- reIds[i] = rs[i].GetId()
- }
- if reSet.Len() > 0 {
- return nil, httperrors.NewInputParameterError("receivers %q not found", strings.Join(reSet.UnsortedList(), ", "))
- }
- return reIds, nil
- }
- func (self *SSubscriber) GetEnabledReceivers() ([]SReceiver, error) {
- q := ReceiverManager.Query().IsTrue("enabled")
- sq := SubscriberReceiverManager.Query().SubQuery()
- q = q.Join(sq, sqlchemy.Equals(q.Field("id"), sq.Field("receiver_id"))).Filter(sqlchemy.Equals(sq.Field("subscriber_id"), self.Id))
- ret := []SReceiver{}
- return ret, db.FetchModelObjects(ReceiverManager, q, &ret)
- }
- func (self *SSubscriber) GetRobot() (*SRobot, error) {
- robot, err := RobotManager.FetchById(self.Identification)
- if err != nil {
- return nil, errors.Wrapf(err, "RobotManager.FetchById(%s)", self.Identification)
- }
- return robot.(*SRobot), nil
- }
- func (sm *SSubscriberManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.SubscriberCreateInput) (api.SubscriberCreateInput, error) {
- var err error
- // permission check
- sSystem, sDomain := string(rbacscope.ScopeSystem), string(rbacscope.ScopeDomain)
- switch input.Scope {
- case sSystem:
- allow := db.IsAdminAllowCreate(userCred, sm)
- if allow.Result.IsDeny() {
- return input, httperrors.NewForbiddenError("The scope %s and the role of the operator do not match", input.Scope)
- }
- case sDomain:
- allow := db.IsDomainAllowCreate(userCred, sm)
- if allow.Result.IsDeny() {
- return input, httperrors.NewForbiddenError("The scope %s and the role of the operator do not match", input.Scope)
- }
- default:
- return input, httperrors.NewInputParameterError("unknown scope %s", input.Scope)
- }
- input.StandaloneAnonResourceCreateInput, err = sm.SStandaloneAnonResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input.StandaloneAnonResourceCreateInput)
- if err != nil {
- return input, errors.Wrap(err, "SVirtualResourceBaseManager.ValidateCreateData")
- }
- // check topic
- t, err := TopicManager.FetchById(input.TopicID)
- if err != nil {
- return input, errors.Wrapf(err, "unable to fetch topic %s", input.TopicID)
- }
- // check resource scope
- if !utils.IsInStringArray(input.ResourceScope, []string{api.SUBSCRIBER_SCOPE_SYSTEM, api.SUBSCRIBER_SCOPE_DOMAIN, api.SUBSCRIBER_SCOPE_PROJECT}) {
- return input, httperrors.NewInputParameterError("unknown resource_scope %q", input.ResourceScope)
- }
- // resource Attribution Id
- var domainId string
- switch input.ResourceScope {
- case api.SUBSCRIBER_SCOPE_SYSTEM:
- input.ResourceAttributionId = ""
- input.DomainId = ""
- case api.SUBSCRIBER_SCOPE_PROJECT:
- tenant, err := db.TenantCacheManager.FetchTenantById(ctx, input.ResourceAttributionId)
- if err != nil {
- return input, errors.Wrapf(err, "unable to fetch project %s", input.ResourceAttributionId)
- }
- domainId = tenant.DomainId
- input.DomainId = domainId
- input.ResourceAttributionId = tenant.GetId()
- input.ResourceAttributionName = tenant.GetName()
- case api.SUBSCRIBER_SCOPE_DOMAIN:
- tenant, err := db.TenantCacheManager.FetchDomainByIdOrName(ctx, input.ResourceAttributionId)
- if err != nil {
- return input, errors.Wrapf(err, "unable to fetch domain %s", input.ResourceAttributionId)
- }
- domainId = tenant.Id
- input.DomainId = domainId
- input.ResourceAttributionId = tenant.Id
- input.ResourceAttributionName = tenant.Name
- }
- if input.Scope == sDomain && domainId != userCred.GetProjectDomainId() {
- return input, httperrors.NewForbiddenError("domain %s admin can't create subscriber for domain %s", userCred.GetProjectDomainId(), domainId)
- }
- var checkQuery *sqlchemy.SQuery
- input.TopicID = t.GetId()
- switch input.Type {
- case api.SUBSCRIBER_TYPE_RECEIVER:
- reIds, err := sm.validateReceivers(ctx, input.Receivers)
- if err != nil {
- return input, err
- }
- input.Receivers = reIds
- case api.SUBSCRIBER_TYPE_ROLE:
- if input.RoleScope == "" {
- input.RoleScope = input.ResourceScope
- }
- roleCache, err := db.RoleCacheManager.FetchRoleByIdOrName(ctx, input.Role)
- if err != nil {
- return input, errors.Wrapf(err, "unable find role %q", input.Role)
- }
- input.Role = roleCache.GetId()
- checkQuery = sm.Query().Equals("topic_id", input.TopicID).Equals("type", api.SUBSCRIBER_TYPE_ROLE).Equals("resource_scope", input.ResourceScope).Equals("identification", input.Role).Equals("role_scope", input.RoleScope)
- case api.SUBSCRIBER_TYPE_ROBOT:
- robot, err := RobotManager.FetchByIdOrName(ctx, userCred, input.Robot)
- if errors.Cause(err) == sql.ErrNoRows {
- return input, httperrors.NewInputParameterError("robot %q not found", input.Robot)
- }
- if err != nil {
- return input, errors.Wrapf(err, "unable to fetch robot %q", input.Robot)
- }
- input.Robot = robot.GetId()
- checkQuery = sm.Query().Equals("type", api.SUBSCRIBER_TYPE_ROLE).Equals("topic_id", input.TopicID).Equals("resource_scope", input.ResourceScope).Equals("identification", input.Robot)
- default:
- return input, httperrors.NewInputParameterError("unkown type %q", input.Type)
- }
- // check type+resourceScope+identification
- if checkQuery != nil {
- count, err := checkQuery.CountWithError()
- if err != nil {
- return input, errors.Wrap(err, "unable to count")
- }
- if count > 0 {
- return input, httperrors.NewForbiddenError("repeated with existing subscribers")
- }
- }
- if input.GroupTimes != nil {
- if *input.GroupTimes < 0 {
- return input, httperrors.NewInputParameterError("invalidate group_times %d", input.GroupTimes)
- }
- }
- return input, nil
- }
- func (s *SSubscriber) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
- s.SStandaloneAnonResourceBase.PostCreate(ctx, userCred, ownerId, query, data)
- var input api.SubscriberCreateInput
- _ = data.Unmarshal(&input)
- if s.Type == api.SUBSCRIBER_TYPE_RECEIVER {
- err := s.SetReceivers(ctx, input.Receivers)
- if err != nil {
- logclient.AddActionLogWithContext(ctx, s, logclient.ACT_CREATE, err.Error(), userCred, false)
- _, err := db.Update(s, func() error {
- s.SetEnabled(false)
- return nil
- })
- if err != nil {
- log.Errorf("unable to enable subscriber: %v", err)
- }
- }
- }
- logclient.AddActionLogWithContext(ctx, s, logclient.ACT_CREATE, "", userCred, true)
- return
- }
- func (s *SSubscriber) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
- err := s.SStandaloneAnonResourceBase.CustomizeCreate(ctx, userCred, ownerId, query, data)
- if err != nil {
- return errors.Wrap(err, "SVirtualResourceBase.CustomizeCreate")
- }
- var input api.SubscriberCreateInput
- _ = data.Unmarshal(&input)
- switch input.Type {
- case api.SUBSCRIBER_TYPE_RECEIVER:
- case api.SUBSCRIBER_TYPE_ROBOT:
- s.Identification = input.Robot
- case api.SUBSCRIBER_TYPE_ROLE:
- s.Identification = input.Role
- }
- s.Enabled = tristate.True
- return nil
- }
- func (s *SSubscriber) PerformChange(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.SubscriberChangeInput) (jsonutils.JSONObject, error) {
- if s.Scope == string(rbacscope.ScopeSystem) {
- if !db.IsAdminAllowUpdate(ctx, userCred, s) {
- return nil, httperrors.NewForbiddenError("")
- }
- } else {
- if !db.IsDomainAllowUpdate(ctx, userCred, s) {
- return nil, httperrors.NewForbiddenError("")
- }
- if s.DomainId != userCred.GetProjectDomainId() {
- return nil, httperrors.NewForbiddenError("")
- }
- }
- switch s.Type {
- case api.SUBSCRIBER_TYPE_RECEIVER:
- err := s.SetReceivers(ctx, input.Receivers)
- if err != nil {
- log.Errorf("unable to set receivers %s", input.Receivers)
- }
- case api.SUBSCRIBER_TYPE_ROBOT:
- _, err := db.Update(s, func() error {
- s.Identification = input.Robot
- return nil
- })
- if err != nil {
- return nil, errors.Wrap(err, "unable to update subscriber")
- }
- case api.SUBSCRIBER_TYPE_ROLE:
- _, err := db.Update(s, func() error {
- s.Identification = input.Role
- if input.RoleScope != "" {
- s.RoleScope = input.RoleScope
- }
- return nil
- })
- if err != nil {
- return nil, errors.Wrap(err, "unable to update subscriber")
- }
- }
- if input.GroupTimes != nil {
- _, err := db.Update(s, func() error {
- s.GroupTimes = *input.GroupTimes
- return nil
- })
- if err != nil {
- return nil, errors.Wrap(err, "unable to update subscriber group_times")
- }
- }
- return nil, nil
- }
- func (sm *SSubscriberManager) ListItemFilter(ctx context.Context, q *sqlchemy.SQuery, userCred mcclient.TokenCredential, input api.SubscriberListInput) (*sqlchemy.SQuery, error) {
- var err error
- q, err = sm.SStandaloneAnonResourceBaseManager.ListItemFilter(ctx, q, userCred, input.StandaloneAnonResourceListInput)
- if err != nil {
- return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
- }
- q, err = sm.SEnabledResourceBaseManager.ListItemFilter(ctx, q, userCred, input.EnabledResourceBaseListInput)
- if err != nil {
- return nil, errors.Wrap(err, "SEnabledResourceBaseManager.ListItemFilter")
- }
- sSystem, sDomain := string(rbacscope.ScopeSystem), string(rbacscope.ScopeDomain)
- if input.Scope == "" {
- input.Scope = sSystem
- }
- switch input.Scope {
- case sSystem:
- allow := db.IsAdminAllowList(userCred, sm)
- if allow.Result.IsDeny() {
- return nil, httperrors.NewForbiddenError("")
- }
- case sDomain:
- allow := db.IsDomainAllowList(userCred, sm)
- if allow.Result.IsDeny() {
- return nil, httperrors.NewForbiddenError("")
- }
- q = q.Equals("domain_id", userCred.GetProjectDomainId())
- default:
- return nil, httperrors.NewInputParameterError("unkown scope %s", input.Scope)
- }
- if input.TopicID != "" {
- q = q.Equals("topic_id", input.TopicID)
- }
- if input.Type != "" {
- q = q.Equals("type", input.Type)
- }
- if input.ResourceScope != "" {
- q = q.Equals("resource_scope", input.ResourceScope)
- }
- return q, nil
- }
- func (sm *SSubscriberManager) FetchCustomizeColumns(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, objs []interface{}, fields stringutils2.SSortedStrings, isList bool) []api.SubscriberDetails {
- var err error
- vRows := sm.SStandaloneAnonResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
- rows := make([]api.SubscriberDetails, len(objs))
- for i := range rows {
- rows[i].StandaloneAnonResourceDetails = vRows[i]
- s := objs[i].(*SSubscriber)
- switch s.Type {
- case api.SUBSCRIBER_TYPE_RECEIVER:
- rows[i].Receivers, err = s.receiverIdentifications()
- if err != nil {
- log.Errorf("unable to get receiverIdentifications for subscriber %q: %v", s.Id, err)
- }
- case api.SUBSCRIBER_TYPE_ROBOT:
- rows[i].Robot, err = s.robotIdentification()
- if err != nil {
- log.Errorf("unable get robotIdentification for subscriber %q: %v", s.Id, err)
- }
- case api.SUBSCRIBER_TYPE_ROLE:
- rows[i].Role, err = s.roleIdentification(ctx)
- if err != nil {
- log.Errorf("unable to get roleIdentification for subscriber %q: %v", s.Id, err)
- }
- }
- }
- return rows
- }
- func (s *SSubscriber) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
- err := s.SStandaloneAnonResourceBase.CustomizeDelete(ctx, userCred, query, data)
- if err != nil {
- return err
- }
- if s.Scope == string(rbacscope.ScopeSystem) {
- if !db.IsAdminAllowDelete(ctx, userCred, s) {
- return httperrors.NewForbiddenError("")
- }
- } else {
- if !db.IsDomainAllowDelete(ctx, userCred, s) {
- return httperrors.NewForbiddenError("")
- }
- if s.DomainId != userCred.GetProjectDomainId() {
- return httperrors.NewForbiddenError("")
- }
- }
- return nil
- }
- func (s *SSubscriber) receiverIdentifications() ([]api.Identification, error) {
- srSubq := SubscriberReceiverManager.Query().Equals("subscriber_id", s.Id).SubQuery()
- rq := ReceiverManager.Query("id", "name")
- rq = rq.Join(srSubq, sqlchemy.Equals(srSubq.Field("receiver_id"), rq.Field("id")))
- var ret []api.Identification
- err := rq.All(&ret)
- if err != nil {
- return nil, err
- }
- return ret, nil
- }
- func (s *SSubscriber) robotIdentification() (api.Identification, error) {
- var ret api.Identification
- q := RobotManager.Query("id", "name").Equals("id", s.Identification)
- err := q.First(&ret)
- if err != nil {
- return ret, err
- }
- return ret, nil
- }
- func (s *SSubscriber) roleIdentification(ctx context.Context) (api.Identification, error) {
- var ret api.Identification
- roleCache, err := db.RoleCacheManager.FetchRoleById(ctx, s.Identification)
- if err != nil {
- return ret, errors.Wrapf(err, "unable to find role %q", s.Identification)
- }
- ret.ID = s.Identification
- ret.Name = roleCache.Name
- return ret, nil
- }
- func (srm *SSubscriberManager) robot(tid, projectDomainId, projectId string) (map[string]uint32, error) {
- srs, err := srm.findSuitableOnes(tid, projectDomainId, projectId, api.SUBSCRIBER_TYPE_ROBOT)
- if err != nil {
- return nil, err
- }
- robotIds := make(map[string]uint32)
- for i := range srs {
- // robotIds[i] = srs[i].Identification
- robotIds[srs[i].Identification] = srs[i].GroupTimes
- }
- return robotIds, nil
- }
- func (srm *SSubscriberManager) findSuitableOnes(tid, projectDomainId, projectId string, types ...string) ([]SSubscriber, error) {
- q := srm.Query().Equals("topic_id", tid).IsTrue("enabled")
- q = q.Filter(sqlchemy.OR(
- sqlchemy.AND(
- sqlchemy.Equals(q.Field("resource_scope"), api.SUBSCRIBER_SCOPE_PROJECT),
- sqlchemy.Equals(q.Field("resource_attribution_id"), projectId),
- ),
- sqlchemy.AND(
- sqlchemy.Equals(q.Field("resource_scope"), api.SUBSCRIBER_SCOPE_DOMAIN),
- sqlchemy.Equals(q.Field("resource_attribution_id"), projectDomainId),
- ),
- sqlchemy.Equals(q.Field("resource_scope"), api.SUBSCRIBER_SCOPE_SYSTEM),
- ))
- switch len(types) {
- case 0:
- case 1:
- q = q.Equals("type", types[0])
- default:
- q = q.In("type", types)
- }
- srs := make([]SSubscriber, 0, 1)
- err := db.FetchModelObjects(srm, q, &srs)
- if err != nil {
- return nil, err
- }
- return srs, nil
- }
- // TODO: Use cache to increase speed
- func (srm *SSubscriberManager) getReceiversSent(ctx context.Context, tid string, projectDomainId string, projectId string) (map[string]uint32, error) {
- srs, err := srm.findSuitableOnes(tid, projectDomainId, projectId, api.SUBSCRIBER_TYPE_RECEIVER, api.SUBSCRIBER_TYPE_ROLE)
- if err != nil {
- return nil, err
- }
- // 接受人-聚合时间
- receivers := make(map[string]uint32)
- // 角色-接受人
- roleMap := make(map[string][]string, 3)
- // 接受角色-接受人-聚合时间
- receivermap := make(map[string]map[string]uint32, 3)
- // 聚合时间
- roleGroupTimes := 0
- for _, sr := range srs {
- if sr.Type == api.SUBSCRIBER_TYPE_RECEIVER {
- rIds, err := sr.getReceivers()
- if err != nil {
- return nil, errors.Wrap(err, "unable to get receivers")
- }
- for _, receiveId := range rIds {
- // receivers = append(receivers, api.SReceiverWithGroupTimes{ReceiverId: receiveId, GroupTimes: sr.GroupTimes})
- receivers[receiveId] = sr.GroupTimes
- }
- } else if sr.Type == api.SUBSCRIBER_TYPE_ROLE {
- roleGroupTimes = int(sr.GroupTimes)
- roleMap[sr.RoleScope] = append(roleMap[sr.RoleScope], sr.Identification)
- receivermap[sr.RoleScope] = map[string]uint32{}
- }
- }
- errgo, _ := errgroup.WithContext(ctx)
- for _scope, _roles := range roleMap {
- scope, roles := _scope, _roles
- errgo.Go(func() error {
- query := jsonutils.NewDict()
- query.Set("roles", jsonutils.NewStringArray(roles))
- query.Set("effective", jsonutils.JSONTrue)
- switch scope {
- case api.SUBSCRIBER_SCOPE_SYSTEM:
- case api.SUBSCRIBER_SCOPE_DOMAIN:
- if len(projectDomainId) == 0 {
- return fmt.Errorf("need projectDomainId")
- }
- query.Set("project_domain_id", jsonutils.NewString(projectDomainId))
- case api.SUBSCRIBER_SCOPE_PROJECT:
- if len(projectId) == 0 {
- return fmt.Errorf("need projectId")
- }
- query.Add(jsonutils.NewString(projectId), "scope", "project", "id")
- }
- s := auth.GetAdminSession(ctx, "")
- listRet, err := modules.RoleAssignments.List(s, query)
- if err != nil {
- return errors.Wrap(err, "unable to list RoleAssignments")
- }
- for i := range listRet.Data {
- ras := listRet.Data[i]
- user, err := ras.Get("user")
- if err == nil {
- id, err := user.GetString("id")
- if err != nil {
- return errors.Wrap(err, "unable to get user.id from result of RoleAssignments.List")
- }
- if _, ok := receivermap[scope][id]; !ok {
- receivermap[scope][id] = uint32(roleGroupTimes)
- }
- }
- }
- return nil
- })
- }
- err = errgo.Wait()
- if err != nil {
- return nil, err
- }
- for _, res := range receivermap {
- for receive, time := range res {
- if t, ok := receivers[receive]; !ok || t == 0 {
- receivers[receive] = time
- }
- }
- }
- // de-duplication
- return receivers, nil
- }
- func (sr *SSubscriber) getReceivers() ([]string, error) {
- srrs, err := SubscriberReceiverManager.getBySubscriberId(sr.Id)
- if err != nil {
- return nil, err
- }
- rIds := make([]string, len(srrs))
- for i := range srrs {
- rIds[i] = srrs[i].ReceiverId
- }
- return rIds, nil
- }
- func (sr *SSubscriber) SetReceivers(ctx context.Context, receiverIds []string) error {
- srrs, err := SubscriberReceiverManager.getBySubscriberId(sr.Id)
- if err != nil {
- return errors.Wrapf(err, "unable to get SRReceiver by Subscriber %s", sr.Id)
- }
- dbReceivers := make([]string, len(srrs))
- for i := range srrs {
- dbReceivers[i] = srrs[i].ReceiverId
- }
- var addReceivers, rmReceivers []string
- sort.Strings(dbReceivers)
- sort.Strings(receiverIds)
- for i, j := 0, 0; i < len(dbReceivers) || j < len(receiverIds); {
- switch {
- case i == len(dbReceivers):
- addReceivers = append(addReceivers, receiverIds[j])
- j++
- case j == len(receiverIds):
- rmReceivers = append(rmReceivers, dbReceivers[i])
- i++
- case dbReceivers[i] > receiverIds[j]:
- addReceivers = append(addReceivers, receiverIds[j])
- j++
- case dbReceivers[i] < receiverIds[j]:
- rmReceivers = append(rmReceivers, dbReceivers[i])
- i++
- case dbReceivers[i] == receiverIds[j]:
- i++
- j++
- }
- }
- // add
- for _, rId := range addReceivers {
- _, err := SubscriberReceiverManager.create(ctx, sr.Id, rId)
- if err != nil {
- return errors.Wrapf(err, "unable to connect subscription receiver %q with receiver %q", sr.Id, rId)
- }
- }
- for _, rId := range rmReceivers {
- err := SubscriberReceiverManager.delete(sr.Id, rId)
- if err != nil {
- return errors.Wrapf(err, "unable to disconnect subscription receiver %q with receiver %q", sr.Id, rId)
- }
- }
- return nil
- }
- func (s *SSubscriber) PerformSetReceiver(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.SubscriberSetReceiverInput) (jsonutils.JSONObject, error) {
- reIds, err := SubscriberManager.validateReceivers(ctx, input.Receivers)
- if err != nil {
- return nil, err
- }
- return nil, s.SetReceivers(ctx, reIds)
- }
- func (s *SSubscriber) PerformEnable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- err := db.EnabledPerformEnable(s, ctx, userCred, true)
- if err != nil {
- return nil, errors.Wrap(err, "EnabledPerformEnable")
- }
- return nil, nil
- }
- func (s *SSubscriber) PerformDisable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- err := db.EnabledPerformEnable(s, ctx, userCred, false)
- if err != nil {
- return nil, errors.Wrap(err, "EnabledPerformEnable")
- }
- return nil, nil
- }
- func (sm *SSubscriberManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
- q, err := sm.SStandaloneAnonResourceBaseManager.QueryDistinctExtraField(q, field)
- if err == nil {
- return q, nil
- }
- switch field {
- case "resource_scope":
- return sm.Query("resource_scope").Distinct(), nil
- case "type":
- return sm.Query("type").Distinct(), nil
- }
- return q, nil
- }
- var defaultNotifyTopics = []string{
- DefaultServerPanicked,
- DefaultServiceAbnormal,
- DefaultNetOutOfSync,
- DefaultMysqlOutOfSync,
- DefaultActionLogExceedCount,
- }
- func (sm *SSubscriberManager) InitializeData() error {
- ctx := context.Background()
- session := auth.GetAdminSession(ctx, options.Options.Region)
- // 获取系统管理员角色id
- params := map[string]interface{}{
- "project_domain": "default",
- }
- role, err := identity.RolesV3.Get(session, "admin", jsonutils.Marshal(params))
- if err != nil {
- return errors.Wrap(err, "identity.RolesV3.List")
- }
- roleId, _ := role.GetString("id")
- q := TopicManager.Query()
- q = q.Filter(sqlchemy.OR(sqlchemy.In(q.Field("name"), defaultNotifyTopics)))
- topics := []STopic{}
- err = db.FetchModelObjects(TopicManager, q, &topics)
- if err != nil {
- return errors.Wrap(err, "FetchModelObjects topic")
- }
- for _, topic := range topics {
- q := sm.Query()
- q = q.Equals("topic_id", topic.Id)
- q = q.Equals("type", api.SUBSCRIBER_TYPE_ROLE)
- q = q.Equals("identification", roleId)
- count, err := q.CountWithError()
- if err != nil {
- return errors.Wrap(err, "CountWithError")
- }
- if count != 0 {
- continue
- }
- subscriber := SSubscriber{}
- subscriber.Type = api.SUBSCRIBER_TYPE_ROLE
- subscriber.Identification = roleId
- subscriber.TopicId = topic.Id
- subscriber.Scope = api.SUBSCRIBER_SCOPE_SYSTEM
- subscriber.ResourceScope = api.SUBSCRIBER_SCOPE_SYSTEM
- subscriber.Enabled = tristate.True
- sm.TableSpec().Insert(ctx, &subscriber)
- }
- return nil
- }
- // 根据接受人ID获取订阅
- func getSubscriberByReceiverId(receiverId string, showDisabled bool) ([]SSubscriber, error) {
- results := []SSubscriber{}
- tempRes := []SSubscriber{}
- // q1 根据接受人ID查找(优先)
- q1 := SubscriberManager.Query()
- q1 = q1.Equals("type", api.SUBSCRIBER_TYPE_RECEIVER)
- srq := SubscriberReceiverManager.Query().Equals("receiver_id", receiverId)
- srsq := srq.SubQuery()
- if !showDisabled {
- q1 = q1.Equals("enabled", true)
- }
- q1.Join(srsq, sqlchemy.Equals(q1.Field("id"), srsq.Field("subscriber_id")))
- err := db.FetchModelObjects(SubscriberManager, q1, &tempRes)
- if err != nil {
- return nil, errors.Wrap(err, "fetch receiver")
- }
- results = append(results, tempRes...)
- roleArr := []string{}
- // 获取当前接受人所有角色
- s := auth.GetAdminSession(context.Background(), options.Options.Region)
- query := jsonutils.NewDict()
- query.Add(jsonutils.NewString("system"), "scope")
- query.Add(jsonutils.NewString("user"), "resource")
- query.Add(jsonutils.NewBool(true), "details")
- query.Add(jsonutils.NewString("project"), "group_by")
- query.Add(jsonutils.NewBool(true), "effective")
- resp, err := identity.RoleAssignments.GetProjectRole(s, receiverId, query)
- if err != nil {
- return nil, errors.Wrap(err, "UserCacheManager.FetchUserByIdOrName")
- }
- dataArr, _ := resp.GetArray("data")
- for _, data := range dataArr {
- groupArr, _ := data.GetArray("groups")
- for _, group := range groupArr {
- rolesArr, _ := group.GetArray("roles")
- for _, role := range rolesArr {
- roleId, _ := role.GetString("id")
- roleArr = append(roleArr, roleId)
- }
- }
- }
- // q2 根据角色查找
- q2 := SubscriberManager.Query()
- q2 = q2.Equals("type", api.SUBSCRIBER_TYPE_ROLE)
- if !showDisabled {
- q2 = q2.Equals("enabled", true)
- }
- q2 = q2.In("identification", roleArr)
- tempRes = []SSubscriber{}
- err = db.FetchModelObjects(SubscriberManager, q2, &tempRes)
- if err != nil {
- return nil, errors.Wrap(err, "fetch role")
- }
- results = append(results, tempRes...)
- return results, nil
- }
|