resources.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package resources
  15. import (
  16. "context"
  17. "fmt"
  18. "math"
  19. "strings"
  20. "sync"
  21. "time"
  22. "yunion.io/x/cloudmux/pkg/cloudprovider"
  23. "yunion.io/x/jsonutils"
  24. "yunion.io/x/log"
  25. "yunion.io/x/pkg/appctx"
  26. "yunion.io/x/pkg/errors"
  27. api "yunion.io/x/onecloud/pkg/apis/compute"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/tsdb"
  29. "yunion.io/x/onecloud/pkg/cloudmon/options"
  30. "yunion.io/x/onecloud/pkg/cloudmon/providerdriver"
  31. "yunion.io/x/onecloud/pkg/mcclient"
  32. "yunion.io/x/onecloud/pkg/mcclient/auth"
  33. "yunion.io/x/onecloud/pkg/mcclient/modulebase"
  34. "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  35. "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
  36. "yunion.io/x/onecloud/pkg/util/influxdb"
  37. "yunion.io/x/onecloud/pkg/util/logclient"
  38. )
  39. type sBaseInfo struct {
  40. Id string
  41. ExternalId string
  42. ManagerId string
  43. CreatedAt time.Time
  44. ImportedAt time.Time
  45. DeletedAt time.Time
  46. UpdatedAt time.Time
  47. Metadata map[string]string
  48. }
  49. type SBaseResources struct {
  50. manager modulebase.Manager
  51. importedAt time.Time
  52. createdAt time.Time
  53. deletedAt time.Time
  54. updatedAt time.Time
  55. resourceLock sync.Mutex
  56. Resources map[string]jsonutils.JSONObject
  57. providerLock sync.Mutex
  58. ProviderResources map[string]map[string]jsonutils.JSONObject
  59. }
  60. func (self *SBaseResources) getResources(ctx context.Context, managerId string) map[string]jsonutils.JSONObject {
  61. ret := map[string]jsonutils.JSONObject{}
  62. if len(managerId) == 0 {
  63. return self.Resources
  64. }
  65. res, ok := self.ProviderResources[managerId]
  66. if ok {
  67. return res
  68. }
  69. return ret
  70. }
  71. func (self *SBaseResources) init(ctx context.Context) error {
  72. s := auth.GetAdminSession(ctx, options.Options.Region)
  73. query := map[string]interface{}{
  74. "limit": 20,
  75. "scope": "system",
  76. "details": true,
  77. "order_by.0": "created_at",
  78. "order_by.1": "imported_at",
  79. "order": "asc",
  80. "pending_delete": "all",
  81. }
  82. if self.manager.GetKeyword() == compute.Hosts.GetKeyword() { // private and vmware
  83. query["cloud_env"] = "private_or_onpremise"
  84. }
  85. if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
  86. query["filter.0"] = "external_id.isnotempty()"
  87. }
  88. offset := 0
  89. for {
  90. query["offset"] = offset
  91. resp, err := self.manager.List(s, jsonutils.Marshal(query))
  92. if err != nil {
  93. return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
  94. }
  95. offset += len(resp.Data)
  96. for i := range resp.Data {
  97. baseInfo := sBaseInfo{}
  98. resp.Data[i].Unmarshal(&baseInfo)
  99. if len(baseInfo.ExternalId) == 0 && (self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() &&
  100. self.manager.GetKeyword() != compute.Cloudaccounts.GetKeyword() &&
  101. self.manager.GetKeyword() != identity.Projects.GetKeyword()) {
  102. continue
  103. }
  104. key := baseInfo.ExternalId
  105. if len(key) == 0 {
  106. key = baseInfo.Id
  107. }
  108. self.resourceLock.Lock()
  109. self.Resources[key] = resp.Data[i]
  110. self.resourceLock.Unlock()
  111. if len(baseInfo.ManagerId) > 0 {
  112. if _, ok := self.ProviderResources[baseInfo.ManagerId]; !ok {
  113. self.ProviderResources[baseInfo.ManagerId] = map[string]jsonutils.JSONObject{}
  114. }
  115. self.providerLock.Lock()
  116. self.ProviderResources[baseInfo.ManagerId][key] = resp.Data[i]
  117. self.providerLock.Unlock()
  118. }
  119. if self.importedAt.IsZero() || self.importedAt.Before(baseInfo.ImportedAt) {
  120. self.importedAt = baseInfo.ImportedAt
  121. }
  122. if self.createdAt.IsZero() || self.createdAt.Before(baseInfo.CreatedAt) {
  123. self.createdAt = baseInfo.CreatedAt
  124. }
  125. }
  126. if offset >= resp.Total {
  127. break
  128. }
  129. }
  130. self.deletedAt = time.Now()
  131. self.updatedAt = time.Now()
  132. self.importedAt = time.Now()
  133. log.Infof("init %d %s importedAt: %s createdAt: %s", len(self.Resources), self.manager.GetKeyword(), self.importedAt, self.createdAt)
  134. return nil
  135. }
  136. func (self *SBaseResources) increment(ctx context.Context) error {
  137. s := auth.GetAdminSession(ctx, options.Options.Region)
  138. timeFilter := fmt.Sprintf("imported_at.gt('%s')", self.importedAt.Format(time.RFC3339))
  139. query := map[string]interface{}{
  140. "limit": 20,
  141. "scope": "system",
  142. "details": true,
  143. "order_by.0": "created_at",
  144. "order_by.1": "imported_at",
  145. "order": "asc",
  146. "filter.0": timeFilter,
  147. }
  148. if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
  149. query["filter.1"] = "external_id.isnotempty()"
  150. }
  151. if self.manager.GetKeyword() == compute.Hosts.GetKeyword() {
  152. query["cloud_env"] = "private_or_onpremise"
  153. }
  154. ret := []jsonutils.JSONObject{}
  155. for {
  156. query["offset"] = len(ret)
  157. resp, err := self.manager.List(s, jsonutils.Marshal(query))
  158. if err != nil {
  159. return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
  160. }
  161. ret = append(ret, resp.Data...)
  162. if len(ret) >= resp.Total {
  163. break
  164. }
  165. }
  166. for i := range ret {
  167. baseInfo := sBaseInfo{}
  168. ret[i].Unmarshal(&baseInfo)
  169. if len(baseInfo.ExternalId) == 0 && (self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() &&
  170. self.manager.GetKeyword() != identity.Projects.GetKeyword() &&
  171. self.manager.GetKeyword() != compute.Cloudaccounts.GetKeyword()) {
  172. continue
  173. }
  174. key := baseInfo.ExternalId
  175. if len(key) == 0 {
  176. key = baseInfo.Id
  177. }
  178. self.resourceLock.Lock()
  179. self.Resources[key] = ret[i]
  180. self.resourceLock.Unlock()
  181. if len(baseInfo.ManagerId) > 0 {
  182. if _, ok := self.ProviderResources[baseInfo.ManagerId]; !ok {
  183. self.ProviderResources[baseInfo.ManagerId] = map[string]jsonutils.JSONObject{}
  184. }
  185. self.providerLock.Lock()
  186. self.ProviderResources[baseInfo.ManagerId][key] = ret[i]
  187. self.providerLock.Unlock()
  188. }
  189. if self.importedAt.IsZero() || self.importedAt.Before(baseInfo.ImportedAt) {
  190. self.importedAt = baseInfo.ImportedAt
  191. }
  192. if self.createdAt.IsZero() || self.createdAt.Before(baseInfo.CreatedAt) {
  193. self.createdAt = baseInfo.CreatedAt
  194. }
  195. }
  196. log.Infof("increment %d %s", len(ret), self.manager.GetKeyword())
  197. return nil
  198. }
  199. func (self *SBaseResources) decrement(ctx context.Context) error {
  200. s := auth.GetAdminSession(ctx, options.Options.Region)
  201. timeFilter := fmt.Sprintf("deleted_at.gt('%s')", self.deletedAt.Format(time.RFC3339))
  202. query := map[string]interface{}{
  203. "limit": 20,
  204. "scope": "system",
  205. "details": true,
  206. "order_by.0": "deleted_at",
  207. "order": "asc",
  208. "delete": "all",
  209. "@deleted": "true",
  210. "filter.0": timeFilter,
  211. }
  212. if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
  213. query["filter.1"] = "external_id.isnotempty()"
  214. }
  215. if self.manager.GetKeyword() == compute.Hosts.GetKeyword() {
  216. query["cloud_env"] = "private_or_onpremise"
  217. }
  218. ret := []jsonutils.JSONObject{}
  219. for {
  220. query["offset"] = len(ret)
  221. resp, err := self.manager.List(s, jsonutils.Marshal(query))
  222. if err != nil {
  223. return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
  224. }
  225. ret = append(ret, resp.Data...)
  226. if len(ret) >= resp.Total {
  227. break
  228. }
  229. }
  230. for i := range ret {
  231. baseInfo := sBaseInfo{}
  232. ret[i].Unmarshal(&baseInfo)
  233. if len(baseInfo.ExternalId) == 0 && self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() &&
  234. self.manager.GetKeyword() != compute.Cloudaccounts.GetKeyword() &&
  235. self.manager.GetKeyword() != identity.Projects.GetKeyword() {
  236. continue
  237. }
  238. key := baseInfo.ExternalId
  239. if len(key) == 0 {
  240. key = baseInfo.Id
  241. }
  242. delete(self.Resources, key)
  243. if len(baseInfo.ManagerId) > 0 {
  244. providerInfo, ok := self.ProviderResources[baseInfo.ManagerId]
  245. if ok {
  246. delete(providerInfo, key)
  247. self.providerLock.Lock()
  248. self.ProviderResources[baseInfo.ManagerId] = providerInfo
  249. self.providerLock.Unlock()
  250. }
  251. }
  252. if self.deletedAt.Before(baseInfo.DeletedAt) {
  253. self.deletedAt = baseInfo.DeletedAt
  254. }
  255. }
  256. log.Infof("decrement %d %s", len(ret), self.manager.GetKeyword())
  257. return nil
  258. }
  259. func (self *SBaseResources) update(ctx context.Context) error {
  260. s := auth.GetAdminSession(ctx, options.Options.Region)
  261. timeFilter := fmt.Sprintf("updated_at.gt('%s')", self.updatedAt.Format(time.RFC3339))
  262. query := map[string]interface{}{
  263. "limit": 20,
  264. "scope": "system",
  265. "details": true,
  266. "order_by.0": "updated_at",
  267. "order": "asc",
  268. "pending_delete": "all",
  269. "filter.0": timeFilter,
  270. }
  271. if self.manager.GetKeyword() != compute.Cloudproviders.GetKeyword() {
  272. query["filter.1"] = "external_id.isnotempty()"
  273. }
  274. if self.manager.GetKeyword() == compute.Hosts.GetKeyword() {
  275. query["cloud_env"] = "private_or_onpremise"
  276. }
  277. ret := []jsonutils.JSONObject{}
  278. for {
  279. query["offset"] = len(ret)
  280. resp, err := self.manager.List(s, jsonutils.Marshal(query))
  281. if err != nil {
  282. return errors.Wrapf(err, "%s.List", self.manager.GetKeyword())
  283. }
  284. ret = append(ret, resp.Data...)
  285. if len(ret) >= resp.Total {
  286. break
  287. }
  288. }
  289. for i := range ret {
  290. baseInfo := sBaseInfo{}
  291. ret[i].Unmarshal(&baseInfo)
  292. key := baseInfo.ExternalId
  293. if len(key) == 0 {
  294. key = baseInfo.Id
  295. }
  296. self.resourceLock.Lock()
  297. self.Resources[key] = ret[i]
  298. self.resourceLock.Unlock()
  299. if len(baseInfo.ManagerId) > 0 {
  300. _, ok := self.ProviderResources[baseInfo.ManagerId]
  301. if ok {
  302. self.providerLock.Lock()
  303. self.ProviderResources[baseInfo.ManagerId][key] = ret[i]
  304. self.providerLock.Unlock()
  305. }
  306. }
  307. }
  308. self.updatedAt = time.Now()
  309. log.Infof("update %d %s", len(ret), self.manager.GetKeyword())
  310. return nil
  311. }
  312. func NewBaseResources(manager modulebase.Manager) *SBaseResources {
  313. return &SBaseResources{
  314. manager: manager,
  315. Resources: map[string]jsonutils.JSONObject{},
  316. ProviderResources: map[string]map[string]jsonutils.JSONObject{},
  317. }
  318. }
  319. type TResource interface {
  320. init(ctx context.Context) error
  321. increment(ctx context.Context) error
  322. decrement(ctx context.Context) error
  323. update(ctx context.Context) error
  324. getResources(ctx context.Context, managerId string) map[string]jsonutils.JSONObject
  325. }
  326. type SResources struct {
  327. init bool
  328. Cloudaccounts TResource
  329. Cloudproviders TResource
  330. DBInstances TResource
  331. Servers TResource
  332. Hosts TResource
  333. Redis TResource
  334. Loadbalancers TResource
  335. Buckets TResource
  336. KubeClusters TResource
  337. Storages TResource
  338. ModelartsPool TResource
  339. Wires TResource
  340. Projects TResource
  341. ElasticIps TResource
  342. }
  343. func (self *SResources) IsInit() bool {
  344. return self.init
  345. }
  346. func NewResources() *SResources {
  347. return &SResources{
  348. Cloudaccounts: NewBaseResources(&compute.Cloudaccounts),
  349. Cloudproviders: NewBaseResources(&compute.Cloudproviders),
  350. DBInstances: NewBaseResources(&compute.DBInstance),
  351. Servers: NewBaseResources(&compute.Servers),
  352. Hosts: NewBaseResources(&compute.Hosts),
  353. Storages: NewBaseResources(&compute.Storages),
  354. Redis: NewBaseResources(&compute.ElasticCache),
  355. Loadbalancers: NewBaseResources(&compute.Loadbalancers),
  356. Buckets: NewBaseResources(&compute.Buckets),
  357. KubeClusters: NewBaseResources(&compute.KubeClusters),
  358. ModelartsPool: NewBaseResources(&compute.ModelartsPools),
  359. Wires: NewBaseResources(&compute.Wires),
  360. Projects: NewBaseResources(&identity.Projects),
  361. ElasticIps: NewBaseResources(&compute.Elasticips),
  362. }
  363. }
  364. func (self *SResources) Init(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  365. if isStart {
  366. err := func() error {
  367. errs := []error{}
  368. err := self.Cloudaccounts.init(ctx)
  369. if err != nil {
  370. errs = append(errs, errors.Wrapf(err, "Cloudaccount.init"))
  371. }
  372. err = self.Projects.init(ctx)
  373. if err != nil {
  374. errs = append(errs, errors.Wrapf(err, "Projects.init"))
  375. }
  376. err = self.Cloudproviders.init(ctx)
  377. if err != nil {
  378. errs = append(errs, errors.Wrapf(err, "Cloudproviders.init"))
  379. }
  380. err = self.DBInstances.init(ctx)
  381. if err != nil {
  382. errs = append(errs, errors.Wrapf(err, "DBInstances.init"))
  383. }
  384. err = self.Servers.init(ctx)
  385. if err != nil {
  386. errs = append(errs, errors.Wrapf(err, "Servers.init"))
  387. }
  388. err = self.Hosts.init(ctx)
  389. if err != nil {
  390. errs = append(errs, errors.Wrapf(err, "Hosts.init"))
  391. }
  392. err = self.Storages.init(ctx)
  393. if err != nil {
  394. errs = append(errs, errors.Wrapf(err, "Storages.init"))
  395. }
  396. err = self.Redis.init(ctx)
  397. if err != nil {
  398. errs = append(errs, errors.Wrapf(err, "Redis.init"))
  399. }
  400. err = self.Loadbalancers.init(ctx)
  401. if err != nil {
  402. errs = append(errs, errors.Wrapf(err, "Loadbalancers.init"))
  403. }
  404. err = self.Buckets.init(ctx)
  405. if err != nil {
  406. errs = append(errs, errors.Wrapf(err, "Buckets.init"))
  407. }
  408. err = self.KubeClusters.init(ctx)
  409. if err != nil {
  410. errs = append(errs, errors.Wrapf(err, "KubeClusters.init"))
  411. }
  412. err = self.ModelartsPool.init(ctx)
  413. if err != nil {
  414. errs = append(errs, errors.Wrapf(err, "ModelartsPool.init"))
  415. }
  416. err = self.ElasticIps.init(ctx)
  417. if err != nil {
  418. errs = append(errs, errors.Wrapf(err, "ElasticIps.init"))
  419. }
  420. err = self.Wires.init(ctx)
  421. if err != nil {
  422. errs = append(errs, errors.Wrapf(err, "Wires.init"))
  423. }
  424. return errors.NewAggregate(errs)
  425. }()
  426. if err != nil {
  427. log.Errorf("Resource init error: %v", err)
  428. }
  429. self.init = true
  430. }
  431. }
  432. var incrementSync = false
  433. func (self *SResources) IncrementSync(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  434. if isStart || incrementSync {
  435. return
  436. }
  437. incrementSync = true
  438. defer func() {
  439. incrementSync = false
  440. }()
  441. err := func() error {
  442. errs := []error{}
  443. err := self.Cloudaccounts.increment(ctx)
  444. if err != nil {
  445. errs = append(errs, errors.Wrapf(err, "Cloudaccounts.increment"))
  446. }
  447. err = self.Projects.increment(ctx)
  448. if err != nil {
  449. errs = append(errs, errors.Wrapf(err, "Projects.increment"))
  450. }
  451. err = self.Cloudproviders.increment(ctx)
  452. if err != nil {
  453. errs = append(errs, errors.Wrapf(err, "Cloudproviders.increment"))
  454. }
  455. err = self.DBInstances.increment(ctx)
  456. if err != nil {
  457. errs = append(errs, errors.Wrapf(err, "DBInstances.increment"))
  458. }
  459. err = self.Servers.increment(ctx)
  460. if err != nil {
  461. errs = append(errs, errors.Wrapf(err, "Servers.increment"))
  462. }
  463. err = self.Hosts.increment(ctx)
  464. if err != nil {
  465. errs = append(errs, errors.Wrapf(err, "Hosts.increment"))
  466. }
  467. err = self.Storages.increment(ctx)
  468. if err != nil {
  469. errs = append(errs, errors.Wrapf(err, "Storages.increment"))
  470. }
  471. err = self.Redis.increment(ctx)
  472. if err != nil {
  473. errs = append(errs, errors.Wrapf(err, "Redis.increment"))
  474. }
  475. err = self.Loadbalancers.increment(ctx)
  476. if err != nil {
  477. errs = append(errs, errors.Wrapf(err, "Loadbalancers.increment"))
  478. }
  479. err = self.Buckets.increment(ctx)
  480. if err != nil {
  481. errs = append(errs, errors.Wrapf(err, "Buckets.increment"))
  482. }
  483. err = self.KubeClusters.increment(ctx)
  484. if err != nil {
  485. errs = append(errs, errors.Wrapf(err, "KubeClusters.increment"))
  486. }
  487. err = self.ModelartsPool.increment(ctx)
  488. if err != nil {
  489. errs = append(errs, errors.Wrapf(err, "ModelartsPool.increment"))
  490. }
  491. err = self.ElasticIps.increment(ctx)
  492. if err != nil {
  493. errs = append(errs, errors.Wrapf(err, "Elasticips.increment"))
  494. }
  495. err = self.Wires.increment(ctx)
  496. if err != nil {
  497. errs = append(errs, errors.Wrapf(err, "Wires.increment"))
  498. }
  499. return errors.NewAggregate(errs)
  500. }()
  501. if err != nil {
  502. log.Errorf("Increment error: %v", err)
  503. }
  504. }
  505. var decrementSync = false
  506. func (self *SResources) DecrementSync(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  507. if isStart || decrementSync {
  508. return
  509. }
  510. decrementSync = true
  511. defer func() {
  512. decrementSync = false
  513. }()
  514. err := func() error {
  515. errs := []error{}
  516. err := self.Cloudaccounts.decrement(ctx)
  517. if err != nil {
  518. errs = append(errs, errors.Wrapf(err, "Cloudaccounts.decrement"))
  519. }
  520. err = self.Cloudproviders.decrement(ctx)
  521. if err != nil {
  522. errs = append(errs, errors.Wrapf(err, "Cloudproviders.decrement"))
  523. }
  524. err = self.DBInstances.decrement(ctx)
  525. if err != nil {
  526. errs = append(errs, errors.Wrapf(err, "DBInstances.decrement"))
  527. }
  528. err = self.Servers.decrement(ctx)
  529. if err != nil {
  530. errs = append(errs, errors.Wrapf(err, "Servers.decrement"))
  531. }
  532. err = self.Hosts.decrement(ctx)
  533. if err != nil {
  534. errs = append(errs, errors.Wrapf(err, "Hosts.decrement"))
  535. }
  536. err = self.Storages.decrement(ctx)
  537. if err != nil {
  538. errs = append(errs, errors.Wrapf(err, "Storages.decrement"))
  539. }
  540. err = self.Redis.decrement(ctx)
  541. if err != nil {
  542. errs = append(errs, errors.Wrapf(err, "Redis.decrement"))
  543. }
  544. err = self.Loadbalancers.decrement(ctx)
  545. if err != nil {
  546. errs = append(errs, errors.Wrapf(err, "Loadbalancers.decrement"))
  547. }
  548. err = self.Buckets.decrement(ctx)
  549. if err != nil {
  550. errs = append(errs, errors.Wrapf(err, "Buckets.decrement"))
  551. }
  552. err = self.KubeClusters.decrement(ctx)
  553. if err != nil {
  554. errs = append(errs, errors.Wrapf(err, "KubeClusters.decrement"))
  555. }
  556. err = self.ModelartsPool.decrement(ctx)
  557. if err != nil {
  558. errs = append(errs, errors.Wrapf(err, "ModelartsPool.decrement"))
  559. }
  560. err = self.Wires.decrement(ctx)
  561. if err != nil {
  562. errs = append(errs, errors.Wrapf(err, "ModelartsPool.decrement"))
  563. }
  564. err = self.ElasticIps.decrement(ctx)
  565. if err != nil {
  566. errs = append(errs, errors.Wrapf(err, "ElasticIps.decrement"))
  567. }
  568. err = self.Projects.decrement(ctx)
  569. if err != nil {
  570. errs = append(errs, errors.Wrapf(err, "Projects.decrement"))
  571. }
  572. return errors.NewAggregate(errs)
  573. }()
  574. if err != nil {
  575. log.Errorf("Increment error: %v", err)
  576. }
  577. }
  578. var updateSync = false
  579. func (self *SResources) UpdateSync(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  580. if isStart || updateSync {
  581. return
  582. }
  583. updateSync = true
  584. defer func() {
  585. updateSync = false
  586. }()
  587. err := func() error {
  588. errs := []error{}
  589. err := self.Cloudaccounts.update(ctx)
  590. if err != nil {
  591. errs = append(errs, errors.Wrapf(err, "Cloudacconts.update"))
  592. }
  593. err = self.Projects.update(ctx)
  594. if err != nil {
  595. errs = append(errs, errors.Wrapf(err, "Projects.update"))
  596. }
  597. err = self.DBInstances.update(ctx)
  598. if err != nil {
  599. errs = append(errs, errors.Wrapf(err, "DBInstances.update"))
  600. }
  601. err = self.Servers.update(ctx)
  602. if err != nil {
  603. errs = append(errs, errors.Wrapf(err, "Servers.update"))
  604. }
  605. err = self.Hosts.update(ctx)
  606. if err != nil {
  607. errs = append(errs, errors.Wrapf(err, "Hosts.update"))
  608. }
  609. err = self.Storages.update(ctx)
  610. if err != nil {
  611. errs = append(errs, errors.Wrapf(err, "Storages.update"))
  612. }
  613. err = self.Redis.update(ctx)
  614. if err != nil {
  615. errs = append(errs, errors.Wrapf(err, "Redis.update"))
  616. }
  617. err = self.Loadbalancers.update(ctx)
  618. if err != nil {
  619. errs = append(errs, errors.Wrapf(err, "Loadbalancers.update"))
  620. }
  621. err = self.ModelartsPool.update(ctx)
  622. if err != nil {
  623. errs = append(errs, errors.Wrapf(err, "ModelartsPool.update"))
  624. }
  625. return errors.NewAggregate(errs)
  626. }()
  627. if err != nil {
  628. log.Errorf("Update error: %v", err)
  629. }
  630. }
  631. type sMetricProvider struct {
  632. api.CloudproviderDetails
  633. }
  634. func (p sMetricProvider) GetId() string {
  635. return p.Id
  636. }
  637. func (p sMetricProvider) GetName() string {
  638. return p.Name
  639. }
  640. func (p sMetricProvider) Keyword() string {
  641. return "cloudprovider"
  642. }
  643. func (res *SResources) CollectMetrics(ctx context.Context, userCred mcclient.TokenCredential, taskStartTime time.Time, isStart bool) {
  644. if isStart {
  645. return
  646. }
  647. ch := make(chan struct{}, options.Options.CloudAccountCollectMetricsBatchCount)
  648. defer close(ch)
  649. s := auth.GetAdminSession(ctx, options.Options.Region)
  650. resources := res.Cloudproviders.getResources(ctx, "")
  651. cloudproviders := map[string]api.CloudproviderDetails{}
  652. jsonutils.Update(&cloudproviders, resources)
  653. az, _ := time.LoadLocation(options.Options.TimeZone)
  654. _endTime := taskStartTime.In(az)
  655. _startTime := _endTime.Add(-1 * time.Minute * time.Duration(options.Options.CollectMetricInterval))
  656. var wg sync.WaitGroup
  657. for i := range cloudproviders {
  658. ch <- struct{}{}
  659. wg.Add(1)
  660. goctx := context.WithValue(ctx, appctx.APP_CONTEXT_KEY_START_TIME, time.Now().UTC())
  661. go func(ctx context.Context, manager api.CloudproviderDetails) {
  662. succ := true
  663. msgs := make([]string, 0)
  664. defer func() {
  665. if len(msgs) > 0 {
  666. logclient.AddActionLogWithContext(ctx, &sMetricProvider{manager}, logclient.ACT_COLLECT_METRICS, strings.Join(msgs, ";"), userCred, succ)
  667. }
  668. wg.Done()
  669. <-ch
  670. }()
  671. if strings.Contains(strings.ToLower(options.Options.SkipMetricPullProviders), strings.ToLower(manager.Provider)) {
  672. logmsg := fmt.Sprintf("skip %s metric pull with options: %s", manager.Provider, options.Options.SkipMetricPullProviders)
  673. log.Infoln(logmsg)
  674. return
  675. }
  676. driver, err := providerdriver.GetDriver(manager.Provider)
  677. if err != nil {
  678. logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
  679. log.Errorln(logmsg)
  680. msgs = append(msgs, logmsg)
  681. succ = false
  682. return
  683. }
  684. if !driver.IsSupportMetrics() {
  685. logmsg := fmt.Sprintf("%s not support metrics, skip", driver.GetProvider())
  686. log.Infoln(logmsg)
  687. return
  688. }
  689. provider, err := compute.Cloudproviders.GetProvider(ctx, s, manager.Id)
  690. if err != nil {
  691. logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
  692. log.Errorln(logmsg)
  693. msgs = append(msgs, logmsg)
  694. succ = false
  695. return
  696. }
  697. duration := driver.GetDelayDuration()
  698. endTime := _endTime.Add(-1 * duration)
  699. startTime := _startTime.Add(-1 * duration).Add(time.Second * -59)
  700. resources = res.DBInstances.getResources(ctx, manager.Id)
  701. dbinstances := map[string]api.DBInstanceDetails{}
  702. err = jsonutils.Update(&dbinstances, resources)
  703. if err != nil {
  704. logmsg := fmt.Sprintf("unmarshal rds resources error: %v", err)
  705. log.Errorln(logmsg)
  706. msgs = append(msgs, logmsg)
  707. succ = false
  708. }
  709. if len(dbinstances) > 0 {
  710. err = driver.CollectDBInstanceMetrics(ctx, manager, provider, dbinstances, startTime, endTime)
  711. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  712. logmsg := fmt.Sprintf("CollectDBInstanceMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  713. log.Errorln(logmsg)
  714. msgs = append(msgs, logmsg)
  715. succ = false
  716. }
  717. }
  718. resources = res.Servers.getResources(ctx, manager.Id)
  719. servers := map[string]api.ServerDetails{}
  720. err = jsonutils.Update(&servers, resources)
  721. if err != nil {
  722. logmsg := fmt.Sprintf("unmarsha server resources error: %v", err)
  723. log.Errorln(logmsg)
  724. msgs = append(msgs, logmsg)
  725. succ = false
  726. }
  727. if len(servers) > 0 {
  728. err = driver.CollectServerMetrics(ctx, manager, provider, servers, startTime, endTime)
  729. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  730. logmsg := fmt.Sprintf("CollectServerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  731. log.Errorf("%s", logmsg)
  732. msgs = append(msgs, logmsg)
  733. succ = false
  734. }
  735. }
  736. resources = res.Hosts.getResources(ctx, manager.Id)
  737. hosts := map[string]api.HostDetails{}
  738. err = jsonutils.Update(&hosts, resources)
  739. if err != nil {
  740. logmsg := fmt.Sprintf("unmarsha host resources error: %v", err)
  741. log.Errorln(logmsg)
  742. msgs = append(msgs, logmsg)
  743. succ = false
  744. }
  745. if len(hosts) > 0 {
  746. err = driver.CollectHostMetrics(ctx, manager, provider, hosts, startTime, endTime)
  747. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  748. logmsg := fmt.Sprintf("CollectHostMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  749. log.Errorf("%s", logmsg)
  750. msgs = append(msgs, logmsg)
  751. succ = false
  752. }
  753. }
  754. resources = res.Storages.getResources(ctx, manager.Id)
  755. storages := map[string]api.StorageDetails{}
  756. err = jsonutils.Update(&storages, resources)
  757. if err != nil {
  758. logmsg := fmt.Sprintf("unmarsha storage resources error: %v", err)
  759. log.Errorf("%s", logmsg)
  760. msgs = append(msgs, logmsg)
  761. succ = false
  762. }
  763. if len(storages) > 0 {
  764. err = driver.CollectStorageMetrics(ctx, manager, provider, storages, startTime, endTime)
  765. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  766. logmsg := fmt.Sprintf("CollectStorageMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  767. log.Errorf("%s", logmsg)
  768. msgs = append(msgs, logmsg)
  769. succ = false
  770. }
  771. }
  772. resources = res.Redis.getResources(ctx, manager.Id)
  773. caches := map[string]api.ElasticcacheDetails{}
  774. err = jsonutils.Update(&caches, resources)
  775. if err != nil {
  776. logmsg := fmt.Sprintf("unmarsha redis resources error: %v", err)
  777. log.Errorf("%s", logmsg)
  778. msgs = append(msgs, logmsg)
  779. succ = false
  780. }
  781. if len(caches) > 0 {
  782. err = driver.CollectRedisMetrics(ctx, manager, provider, caches, startTime, endTime)
  783. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  784. logmsg := fmt.Sprintf("CollectRedisMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  785. log.Errorf("%s", logmsg)
  786. msgs = append(msgs, logmsg)
  787. succ = false
  788. }
  789. }
  790. resources = res.Loadbalancers.getResources(ctx, manager.Id)
  791. lbs := map[string]api.LoadbalancerDetails{}
  792. err = jsonutils.Update(&lbs, resources)
  793. if err != nil {
  794. logmsg := fmt.Sprintf("unmarsha lb resources error: %v", err)
  795. log.Errorf("%s", logmsg)
  796. msgs = append(msgs, logmsg)
  797. succ = false
  798. }
  799. if len(lbs) > 0 {
  800. err = driver.CollectLoadbalancerMetrics(ctx, manager, provider, lbs, startTime, endTime)
  801. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  802. logmsg := fmt.Sprintf("CollectLoadbalancerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  803. log.Errorf("%s", logmsg)
  804. msgs = append(msgs, logmsg)
  805. succ = false
  806. }
  807. }
  808. resources = res.Buckets.getResources(ctx, manager.Id)
  809. buckets := map[string]api.BucketDetails{}
  810. err = jsonutils.Update(&buckets, resources)
  811. if err != nil {
  812. logmsg := fmt.Sprintf("unmarsha bucket resources error: %v", err)
  813. log.Errorf("%s", logmsg)
  814. msgs = append(msgs, logmsg)
  815. succ = false
  816. }
  817. if len(buckets) > 0 {
  818. err = driver.CollectBucketMetrics(ctx, manager, provider, buckets, startTime, endTime)
  819. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  820. logmsg := fmt.Sprintf("CollectBucketMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  821. log.Errorf("%s", logmsg)
  822. msgs = append(msgs, logmsg)
  823. succ = false
  824. }
  825. }
  826. resources = res.KubeClusters.getResources(ctx, manager.Id)
  827. clusters := map[string]api.KubeClusterDetails{}
  828. err = jsonutils.Update(&clusters, resources)
  829. if err != nil {
  830. logmsg := fmt.Sprintf("unmarsha k8s resources error: %v", err)
  831. log.Errorln(logmsg)
  832. msgs = append(msgs, logmsg)
  833. succ = false
  834. }
  835. if len(clusters) > 0 {
  836. err = driver.CollectK8sMetrics(ctx, manager, provider, clusters, startTime, endTime)
  837. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  838. logmsg := fmt.Sprintf("CollectK8sMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  839. log.Errorln(logmsg)
  840. msgs = append(msgs, logmsg)
  841. succ = false
  842. }
  843. }
  844. resources = res.ModelartsPool.getResources(ctx, manager.Id)
  845. pools := map[string]api.ModelartsPoolDetails{}
  846. err = jsonutils.Update(&pools, resources)
  847. if err != nil {
  848. logmsg := fmt.Sprintf("unmarsha modelarts resources error: %v", err)
  849. log.Errorln(logmsg)
  850. msgs = append(msgs, logmsg)
  851. succ = false
  852. }
  853. if len(pools) > 0 {
  854. err = driver.CollectModelartsPoolMetrics(ctx, manager, provider, pools, startTime, endTime)
  855. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  856. logmsg := fmt.Sprintf("CollectModelartsPoolMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  857. log.Errorln(logmsg)
  858. msgs = append(msgs, logmsg)
  859. succ = false
  860. }
  861. }
  862. resources = res.Wires.getResources(ctx, manager.Id)
  863. wires := map[string]api.WireDetails{}
  864. err = jsonutils.Update(&wires, resources)
  865. if err != nil {
  866. logmsg := fmt.Sprintf("unmarsha wires resources error: %v", err)
  867. log.Errorln(logmsg)
  868. msgs = append(msgs, logmsg)
  869. succ = false
  870. }
  871. if len(wires) > 0 {
  872. err = driver.CollectWireMetrics(ctx, manager, provider, wires, startTime, endTime)
  873. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  874. logmsg := fmt.Sprintf("CollectWireMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  875. log.Errorln(logmsg)
  876. msgs = append(msgs, logmsg)
  877. succ = false
  878. }
  879. }
  880. resources = res.ElasticIps.getResources(ctx, manager.Id)
  881. eips := map[string]api.ElasticipDetails{}
  882. err = jsonutils.Update(&eips, resources)
  883. if err != nil {
  884. logmsg := fmt.Sprintf("unmarsha eips resources error: %v", err)
  885. log.Errorln(logmsg)
  886. msgs = append(msgs, logmsg)
  887. succ = false
  888. }
  889. if len(eips) > 0 {
  890. err = driver.CollectEipMetrics(ctx, manager, provider, eips, startTime, endTime)
  891. if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
  892. logmsg := fmt.Sprintf("CollectEipMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
  893. log.Errorln(logmsg)
  894. msgs = append(msgs, logmsg)
  895. succ = false
  896. }
  897. }
  898. }(goctx, cloudproviders[i])
  899. }
  900. wg.Wait()
  901. resources = res.Cloudaccounts.getResources(ctx, "")
  902. accounts := map[string]api.CloudaccountDetail{}
  903. jsonutils.Update(&accounts, resources)
  904. metrics := []influxdb.SMetricData{}
  905. for _, account := range accounts {
  906. driver, err := providerdriver.GetDriver(account.Provider)
  907. if err != nil {
  908. log.Errorf("failed get account %s(%s) driver %v", account.Name, account.Provider, err)
  909. continue
  910. }
  911. if math.Abs(account.Balance) < 0.000001 {
  912. continue
  913. }
  914. metric, err := driver.CollectAccountMetrics(ctx, account)
  915. if err != nil {
  916. continue
  917. }
  918. metrics = append(metrics, metric)
  919. }
  920. log.Debugf("send %d account metrics to meter_db", len(metrics))
  921. urls, err := tsdb.GetDefaultServiceSourceURLs(s, options.Options.SessionEndpointType)
  922. if err != nil {
  923. log.Errorf("Get influxdb %s service url: %v", options.Options.SessionEndpointType, err)
  924. return
  925. }
  926. if err := influxdb.SendMetrics(urls, "meter_db", metrics, false); err != nil {
  927. log.Errorf("SendMetrics to meter_db: %v", err)
  928. return
  929. }
  930. }