| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package resources
- import (
- "context"
- "fmt"
- "math"
- "strings"
- "sync"
- "time"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/appctx"
- "yunion.io/x/pkg/errors"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/cloudcommon/tsdb"
- "yunion.io/x/onecloud/pkg/cloudmon/options"
- "yunion.io/x/onecloud/pkg/cloudmon/providerdriver"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/modulebase"
- "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
- "yunion.io/x/onecloud/pkg/util/influxdb"
- "yunion.io/x/onecloud/pkg/util/logclient"
- )
// sBaseInfo is the subset of a listed resource's fields that this file
// decodes in order to index the resource and to advance the sync cursors.
type sBaseInfo struct {
	// Id is used as the cache key when ExternalId is empty.
	Id string
	// ExternalId is the preferred cache key.
	ExternalId string
	// ManagerId, when non-empty, selects the per-provider sub-map.
	ManagerId string

	// Timestamps decoded from the resource; CreatedAt, ImportedAt and
	// DeletedAt are compared against the cursors kept in SBaseResources.
	CreatedAt, ImportedAt, DeletedAt, UpdatedAt time.Time

	Metadata map[string]string
}
- type SBaseResources struct {
- manager modulebase.Manager
- importedAt time.Time
- createdAt time.Time
- deletedAt time.Time
- updatedAt time.Time
- resourceLock sync.Mutex
- Resources map[string]jsonutils.JSONObject
- providerLock sync.Mutex
- ProviderResources map[string]map[string]jsonutils.JSONObject
- }
- func (self *SBaseResources) getResources(ctx context.Context, managerId string) map[string]jsonutils.JSONObject {
- ret := map[string]jsonutils.JSONObject{}
- if len(managerId) == 0 {
- return self.Resources
- }
- res, ok := self.ProviderResources[managerId]
- if ok {
- return res
- }
- return ret
- }
- func (self *SBaseResources) init(ctx context.Context) error {
- s := auth.GetAdminSession(ctx, options.Options.Region)
- query := map[string]interface{}{
- "limit": 20,
- "scope": "system",
- "details": true,
- "order_by.0": "created_at",
- "order_by.1": "imported_at",
- "order": "asc",
- "pending_delete": "all",
- }
- if self.manager.GetKeyword() == compute.Hosts.GetKeyword() { // private and vmware
- query["cloud_env"] = "private_or_onpremise"
- }
- if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
- query["filter.0"] = "external_id.isnotempty()"
- }
- offset := 0
- for {
- query["offset"] = offset
- resp, err := self.manager.List(s, jsonutils.Marshal(query))
- if err != nil {
- return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
- }
- offset += len(resp.Data)
- for i := range resp.Data {
- baseInfo := sBaseInfo{}
- resp.Data[i].Unmarshal(&baseInfo)
- if len(baseInfo.ExternalId) == 0 && (self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() &&
- self.manager.GetKeyword() != compute.Cloudaccounts.GetKeyword() &&
- self.manager.GetKeyword() != identity.Projects.GetKeyword()) {
- continue
- }
- key := baseInfo.ExternalId
- if len(key) == 0 {
- key = baseInfo.Id
- }
- self.resourceLock.Lock()
- self.Resources[key] = resp.Data[i]
- self.resourceLock.Unlock()
- if len(baseInfo.ManagerId) > 0 {
- if _, ok := self.ProviderResources[baseInfo.ManagerId]; !ok {
- self.ProviderResources[baseInfo.ManagerId] = map[string]jsonutils.JSONObject{}
- }
- self.providerLock.Lock()
- self.ProviderResources[baseInfo.ManagerId][key] = resp.Data[i]
- self.providerLock.Unlock()
- }
- if self.importedAt.IsZero() || self.importedAt.Before(baseInfo.ImportedAt) {
- self.importedAt = baseInfo.ImportedAt
- }
- if self.createdAt.IsZero() || self.createdAt.Before(baseInfo.CreatedAt) {
- self.createdAt = baseInfo.CreatedAt
- }
- }
- if offset >= resp.Total {
- break
- }
- }
- self.deletedAt = time.Now()
- self.updatedAt = time.Now()
- self.importedAt = time.Now()
- log.Infof("init %d %s importedAt: %s createdAt: %s", len(self.Resources), self.manager.GetKeyword(), self.importedAt, self.createdAt)
- return nil
- }
- func (self *SBaseResources) increment(ctx context.Context) error {
- s := auth.GetAdminSession(ctx, options.Options.Region)
- timeFilter := fmt.Sprintf("imported_at.gt('%s')", self.importedAt.Format(time.RFC3339))
- query := map[string]interface{}{
- "limit": 20,
- "scope": "system",
- "details": true,
- "order_by.0": "created_at",
- "order_by.1": "imported_at",
- "order": "asc",
- "filter.0": timeFilter,
- }
- if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
- query["filter.1"] = "external_id.isnotempty()"
- }
- if self.manager.GetKeyword() == compute.Hosts.GetKeyword() {
- query["cloud_env"] = "private_or_onpremise"
- }
- ret := []jsonutils.JSONObject{}
- for {
- query["offset"] = len(ret)
- resp, err := self.manager.List(s, jsonutils.Marshal(query))
- if err != nil {
- return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
- }
- ret = append(ret, resp.Data...)
- if len(ret) >= resp.Total {
- break
- }
- }
- for i := range ret {
- baseInfo := sBaseInfo{}
- ret[i].Unmarshal(&baseInfo)
- if len(baseInfo.ExternalId) == 0 && (self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() &&
- self.manager.GetKeyword() != identity.Projects.GetKeyword() &&
- self.manager.GetKeyword() != compute.Cloudaccounts.GetKeyword()) {
- continue
- }
- key := baseInfo.ExternalId
- if len(key) == 0 {
- key = baseInfo.Id
- }
- self.resourceLock.Lock()
- self.Resources[key] = ret[i]
- self.resourceLock.Unlock()
- if len(baseInfo.ManagerId) > 0 {
- if _, ok := self.ProviderResources[baseInfo.ManagerId]; !ok {
- self.ProviderResources[baseInfo.ManagerId] = map[string]jsonutils.JSONObject{}
- }
- self.providerLock.Lock()
- self.ProviderResources[baseInfo.ManagerId][key] = ret[i]
- self.providerLock.Unlock()
- }
- if self.importedAt.IsZero() || self.importedAt.Before(baseInfo.ImportedAt) {
- self.importedAt = baseInfo.ImportedAt
- }
- if self.createdAt.IsZero() || self.createdAt.Before(baseInfo.CreatedAt) {
- self.createdAt = baseInfo.CreatedAt
- }
- }
- log.Infof("increment %d %s", len(ret), self.manager.GetKeyword())
- return nil
- }
- func (self *SBaseResources) decrement(ctx context.Context) error {
- s := auth.GetAdminSession(ctx, options.Options.Region)
- timeFilter := fmt.Sprintf("deleted_at.gt('%s')", self.deletedAt.Format(time.RFC3339))
- query := map[string]interface{}{
- "limit": 20,
- "scope": "system",
- "details": true,
- "order_by.0": "deleted_at",
- "order": "asc",
- "delete": "all",
- "@deleted": "true",
- "filter.0": timeFilter,
- }
- if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
- query["filter.1"] = "external_id.isnotempty()"
- }
- if self.manager.GetKeyword() == compute.Hosts.GetKeyword() {
- query["cloud_env"] = "private_or_onpremise"
- }
- ret := []jsonutils.JSONObject{}
- for {
- query["offset"] = len(ret)
- resp, err := self.manager.List(s, jsonutils.Marshal(query))
- if err != nil {
- return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
- }
- ret = append(ret, resp.Data...)
- if len(ret) >= resp.Total {
- break
- }
- }
- for i := range ret {
- baseInfo := sBaseInfo{}
- ret[i].Unmarshal(&baseInfo)
- if len(baseInfo.ExternalId) == 0 && self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() &&
- self.manager.GetKeyword() != compute.Cloudaccounts.GetKeyword() &&
- self.manager.GetKeyword() != identity.Projects.GetKeyword() {
- continue
- }
- key := baseInfo.ExternalId
- if len(key) == 0 {
- key = baseInfo.Id
- }
- delete(self.Resources, key)
- if len(baseInfo.ManagerId) > 0 {
- providerInfo, ok := self.ProviderResources[baseInfo.ManagerId]
- if ok {
- delete(providerInfo, key)
- self.providerLock.Lock()
- self.ProviderResources[baseInfo.ManagerId] = providerInfo
- self.providerLock.Unlock()
- }
- }
- if self.deletedAt.Before(baseInfo.DeletedAt) {
- self.deletedAt = baseInfo.DeletedAt
- }
- }
- log.Infof("decrement %d %s", len(ret), self.manager.GetKeyword())
- return nil
- }
- func (self *SBaseResources) update(ctx context.Context) error {
- s := auth.GetAdminSession(ctx, options.Options.Region)
- timeFilter := fmt.Sprintf("updated_at.gt('%s')", self.updatedAt.Format(time.RFC3339))
- query := map[string]interface{}{
- "limit": 20,
- "scope": "system",
- "details": true,
- "order_by.0": "updated_at",
- "order": "asc",
- "pending_delete": "all",
- "filter.0": timeFilter,
- }
- if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
- query["filter.1"] = "external_id.isnotempty()"
- }
- if self.manager.GetKeyword() == compute.Hosts.GetKeyword() {
- query["cloud_env"] = "private_or_onpremise"
- }
- ret := []jsonutils.JSONObject{}
- for {
- query["offset"] = len(ret)
- resp, err := self.manager.List(s, jsonutils.Marshal(query))
- if err != nil {
- return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
- }
- ret = append(ret, resp.Data...)
- if len(ret) >= resp.Total {
- break
- }
- }
- for i := range ret {
- baseInfo := sBaseInfo{}
- ret[i].Unmarshal(&baseInfo)
- key := baseInfo.ExternalId
- if len(key) == 0 {
- key = baseInfo.Id
- }
- self.resourceLock.Lock()
- self.Resources[key] = ret[i]
- self.resourceLock.Unlock()
- if len(baseInfo.ManagerId) > 0 {
- _, ok := self.ProviderResources[baseInfo.ManagerId]
- if ok {
- self.providerLock.Lock()
- self.ProviderResources[baseInfo.ManagerId][key] = ret[i]
- self.providerLock.Unlock()
- }
- }
- }
- self.updatedAt = time.Now()
- log.Infof("update %d %s", len(ret), self.manager.GetKeyword())
- return nil
- }
- func NewBaseResources(manager modulebase.Manager) *SBaseResources {
- return &SBaseResources{
- manager: manager,
- Resources: map[string]jsonutils.JSONObject{},
- ProviderResources: map[string]map[string]jsonutils.JSONObject{},
- }
- }
- type TResource interface {
- init(ctx context.Context) error
- increment(ctx context.Context) error
- decrement(ctx context.Context) error
- update(ctx context.Context) error
- getResources(ctx context.Context, managerId string) map[string]jsonutils.JSONObject
- }
- type SResources struct {
- init bool
- Cloudaccounts TResource
- Cloudproviders TResource
- DBInstances TResource
- Servers TResource
- Hosts TResource
- Redis TResource
- Loadbalancers TResource
- Buckets TResource
- KubeClusters TResource
- Storages TResource
- ModelartsPool TResource
- Wires TResource
- Projects TResource
- ElasticIps TResource
- }
- func (self *SResources) IsInit() bool {
- return self.init
- }
- func NewResources() *SResources {
- return &SResources{
- Cloudaccounts: NewBaseResources(&compute.Cloudaccounts),
- Cloudproviders: NewBaseResources(&compute.Cloudproviders),
- DBInstances: NewBaseResources(&compute.DBInstance),
- Servers: NewBaseResources(&compute.Servers),
- Hosts: NewBaseResources(&compute.Hosts),
- Storages: NewBaseResources(&compute.Storages),
- Redis: NewBaseResources(&compute.ElasticCache),
- Loadbalancers: NewBaseResources(&compute.Loadbalancers),
- Buckets: NewBaseResources(&compute.Buckets),
- KubeClusters: NewBaseResources(&compute.KubeClusters),
- ModelartsPool: NewBaseResources(&compute.ModelartsPools),
- Wires: NewBaseResources(&compute.Wires),
- Projects: NewBaseResources(&identity.Projects),
- ElasticIps: NewBaseResources(&compute.Elasticips),
- }
- }
- func (self *SResources) Init(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
- if isStart {
- err := func() error {
- errs := []error{}
- err := self.Cloudaccounts.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudaccount.init"))
- }
- err = self.Projects.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Projects.init"))
- }
- err = self.Cloudproviders.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudproviders.init"))
- }
- err = self.DBInstances.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "DBInstances.init"))
- }
- err = self.Servers.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Servers.init"))
- }
- err = self.Hosts.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Hosts.init"))
- }
- err = self.Storages.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Storages.init"))
- }
- err = self.Redis.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Redis.init"))
- }
- err = self.Loadbalancers.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Loadbalancers.init"))
- }
- err = self.Buckets.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Buckets.init"))
- }
- err = self.KubeClusters.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "KubeClusters.init"))
- }
- err = self.ModelartsPool.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ModelartsPool.init"))
- }
- err = self.ElasticIps.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ElasticIps.init"))
- }
- err = self.Wires.init(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Wires.init"))
- }
- return errors.NewAggregate(errs)
- }()
- if err != nil {
- log.Errorf("Resource init error: %v", err)
- }
- self.init = true
- }
- }
- var incrementSync = false
- func (self *SResources) IncrementSync(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
- if isStart || incrementSync {
- return
- }
- incrementSync = true
- defer func() {
- incrementSync = false
- }()
- err := func() error {
- errs := []error{}
- err := self.Cloudaccounts.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudaccounts.increment"))
- }
- err = self.Projects.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Projects.increment"))
- }
- err = self.Cloudproviders.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudproviders.increment"))
- }
- err = self.DBInstances.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "DBInstances.increment"))
- }
- err = self.Servers.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Servers.increment"))
- }
- err = self.Hosts.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Hosts.increment"))
- }
- err = self.Storages.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Storages.increment"))
- }
- err = self.Redis.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Redis.increment"))
- }
- err = self.Loadbalancers.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Loadbalancers.increment"))
- }
- err = self.Buckets.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Buckets.increment"))
- }
- err = self.KubeClusters.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "KubeClusters.increment"))
- }
- err = self.ModelartsPool.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ModelartsPool.increment"))
- }
- err = self.ElasticIps.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Elasticips.increment"))
- }
- err = self.Wires.increment(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Wires.increment"))
- }
- return errors.NewAggregate(errs)
- }()
- if err != nil {
- log.Errorf("Increment error: %v", err)
- }
- }
- var decrementSync = false
- func (self *SResources) DecrementSync(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
- if isStart || decrementSync {
- return
- }
- decrementSync = true
- defer func() {
- decrementSync = false
- }()
- err := func() error {
- errs := []error{}
- err := self.Cloudaccounts.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudaccounts.decrement"))
- }
- err = self.Cloudproviders.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudproviders.decrement"))
- }
- err = self.DBInstances.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "DBInstances.decrement"))
- }
- err = self.Servers.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Servers.decrement"))
- }
- err = self.Hosts.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Hosts.decrement"))
- }
- err = self.Storages.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Storages.decrement"))
- }
- err = self.Redis.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Redis.decrement"))
- }
- err = self.Loadbalancers.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Loadbalancers.decrement"))
- }
- err = self.Buckets.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Buckets.decrement"))
- }
- err = self.KubeClusters.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "KubeClusters.decrement"))
- }
- err = self.ModelartsPool.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ModelartsPool.decrement"))
- }
- err = self.Wires.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ModelartsPool.decrement"))
- }
- err = self.ElasticIps.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ElasticIps.decrement"))
- }
- err = self.Projects.decrement(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Projects.decrement"))
- }
- return errors.NewAggregate(errs)
- }()
- if err != nil {
- log.Errorf("Increment error: %v", err)
- }
- }
- var updateSync = false
- func (self *SResources) UpdateSync(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
- if isStart || updateSync {
- return
- }
- updateSync = true
- defer func() {
- updateSync = false
- }()
- err := func() error {
- errs := []error{}
- err := self.Cloudaccounts.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Cloudacconts.update"))
- }
- err = self.Projects.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Projects.update"))
- }
- err = self.DBInstances.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "DBInstances.update"))
- }
- err = self.Servers.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Servers.update"))
- }
- err = self.Hosts.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Hosts.update"))
- }
- err = self.Storages.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Storages.update"))
- }
- err = self.Redis.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Redis.update"))
- }
- err = self.Loadbalancers.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "Loadbalancers.update"))
- }
- err = self.ModelartsPool.update(ctx)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "ModelartsPool.update"))
- }
- return errors.NewAggregate(errs)
- }()
- if err != nil {
- log.Errorf("Update error: %v", err)
- }
- }
- type sMetricProvider struct {
- api.CloudproviderDetails
- }
- func (p sMetricProvider) GetId() string {
- return p.Id
- }
- func (p sMetricProvider) GetName() string {
- return p.Name
- }
- func (p sMetricProvider) Keyword() string {
- return "cloudprovider"
- }
- func (res *SResources) CollectMetrics(ctx context.Context, userCred mcclient.TokenCredential, taskStartTime time.Time, isStart bool) {
- if isStart {
- return
- }
- ch := make(chan struct{}, options.Options.CloudAccountCollectMetricsBatchCount)
- defer close(ch)
- s := auth.GetAdminSession(ctx, options.Options.Region)
- resources := res.Cloudproviders.getResources(ctx, "")
- cloudproviders := map[string]api.CloudproviderDetails{}
- jsonutils.Update(&cloudproviders, resources)
- az, _ := time.LoadLocation(options.Options.TimeZone)
- _endTime := taskStartTime.In(az)
- _startTime := _endTime.Add(-1 * time.Minute * time.Duration(options.Options.CollectMetricInterval))
- var wg sync.WaitGroup
- for i := range cloudproviders {
- ch <- struct{}{}
- wg.Add(1)
- goctx := context.WithValue(ctx, appctx.APP_CONTEXT_KEY_START_TIME, time.Now().UTC())
- go func(ctx context.Context, manager api.CloudproviderDetails) {
- succ := true
- msgs := make([]string, 0)
- defer func() {
- if len(msgs) > 0 {
- logclient.AddActionLogWithContext(ctx, &sMetricProvider{manager}, logclient.ACT_COLLECT_METRICS, strings.Join(msgs, ";"), userCred, succ)
- }
- wg.Done()
- <-ch
- }()
- if strings.Contains(strings.ToLower(options.Options.SkipMetricPullProviders), strings.ToLower(manager.Provider)) {
- logmsg := fmt.Sprintf("skip %s metric pull with options: %s", manager.Provider, options.Options.SkipMetricPullProviders)
- log.Infoln(logmsg)
- return
- }
- driver, err := providerdriver.GetDriver(manager.Provider)
- if err != nil {
- logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- return
- }
- if !driver.IsSupportMetrics() {
- logmsg := fmt.Sprintf("%s not support metrics, skip", driver.GetProvider())
- log.Infoln(logmsg)
- return
- }
- provider, err := compute.Cloudproviders.GetProvider(ctx, s, manager.Id)
- if err != nil {
- logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- return
- }
- duration := driver.GetDelayDuration()
- endTime := _endTime.Add(-1 * duration)
- startTime := _startTime.Add(-1 * duration).Add(time.Second * -59)
- resources = res.DBInstances.getResources(ctx, manager.Id)
- dbinstances := map[string]api.DBInstanceDetails{}
- err = jsonutils.Update(&dbinstances, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarshal rds resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(dbinstances) > 0 {
- err = driver.CollectDBInstanceMetrics(ctx, manager, provider, dbinstances, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectDBInstanceMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Servers.getResources(ctx, manager.Id)
- servers := map[string]api.ServerDetails{}
- err = jsonutils.Update(&servers, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha server resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(servers) > 0 {
- err = driver.CollectServerMetrics(ctx, manager, provider, servers, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectServerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Hosts.getResources(ctx, manager.Id)
- hosts := map[string]api.HostDetails{}
- err = jsonutils.Update(&hosts, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha host resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(hosts) > 0 {
- err = driver.CollectHostMetrics(ctx, manager, provider, hosts, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectHostMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Storages.getResources(ctx, manager.Id)
- storages := map[string]api.StorageDetails{}
- err = jsonutils.Update(&storages, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha storage resources error: %v", err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(storages) > 0 {
- err = driver.CollectStorageMetrics(ctx, manager, provider, storages, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectStorageMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Redis.getResources(ctx, manager.Id)
- caches := map[string]api.ElasticcacheDetails{}
- err = jsonutils.Update(&caches, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha redis resources error: %v", err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(caches) > 0 {
- err = driver.CollectRedisMetrics(ctx, manager, provider, caches, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectRedisMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Loadbalancers.getResources(ctx, manager.Id)
- lbs := map[string]api.LoadbalancerDetails{}
- err = jsonutils.Update(&lbs, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha lb resources error: %v", err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(lbs) > 0 {
- err = driver.CollectLoadbalancerMetrics(ctx, manager, provider, lbs, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectLoadbalancerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Buckets.getResources(ctx, manager.Id)
- buckets := map[string]api.BucketDetails{}
- err = jsonutils.Update(&buckets, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha bucket resources error: %v", err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(buckets) > 0 {
- err = driver.CollectBucketMetrics(ctx, manager, provider, buckets, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectBucketMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorf("%s", logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.KubeClusters.getResources(ctx, manager.Id)
- clusters := map[string]api.KubeClusterDetails{}
- err = jsonutils.Update(&clusters, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha k8s resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(clusters) > 0 {
- err = driver.CollectK8sMetrics(ctx, manager, provider, clusters, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectK8sMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.ModelartsPool.getResources(ctx, manager.Id)
- pools := map[string]api.ModelartsPoolDetails{}
- err = jsonutils.Update(&pools, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha modelarts resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(pools) > 0 {
- err = driver.CollectModelartsPoolMetrics(ctx, manager, provider, pools, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectModelartsPoolMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.Wires.getResources(ctx, manager.Id)
- wires := map[string]api.WireDetails{}
- err = jsonutils.Update(&wires, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha wires resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(wires) > 0 {
- err = driver.CollectWireMetrics(ctx, manager, provider, wires, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectWireMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- resources = res.ElasticIps.getResources(ctx, manager.Id)
- eips := map[string]api.ElasticipDetails{}
- err = jsonutils.Update(&eips, resources)
- if err != nil {
- logmsg := fmt.Sprintf("unmarsha eips resources error: %v", err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- if len(eips) > 0 {
- err = driver.CollectEipMetrics(ctx, manager, provider, eips, startTime, endTime)
- if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
- logmsg := fmt.Sprintf("CollectEipMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
- log.Errorln(logmsg)
- msgs = append(msgs, logmsg)
- succ = false
- }
- }
- }(goctx, cloudproviders[i])
- }
- wg.Wait()
- resources = res.Cloudaccounts.getResources(ctx, "")
- accounts := map[string]api.CloudaccountDetail{}
- jsonutils.Update(&accounts, resources)
- metrics := []influxdb.SMetricData{}
- for _, account := range accounts {
- driver, err := providerdriver.GetDriver(account.Provider)
- if err != nil {
- log.Errorf("failed get account %s(%s) driver %v", account.Name, account.Provider, err)
- continue
- }
- if math.Abs(account.Balance) < 0.000001 {
- continue
- }
- metric, err := driver.CollectAccountMetrics(ctx, account)
- if err != nil {
- continue
- }
- metrics = append(metrics, metric)
- }
- log.Debugf("send %d account metrics to meter_db", len(metrics))
- urls, err := tsdb.GetDefaultServiceSourceURLs(s, options.Options.SessionEndpointType)
- if err != nil {
- log.Errorf("Get influxdb %s service url: %v", options.Options.SessionEndpointType, err)
- return
- }
- if err := influxdb.SendMetrics(urls, "meter_db", metrics, false); err != nil {
- log.Errorf("SendMetrics to meter_db: %v", err)
- return
- }
- }
|