worker.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package worker
  15. import (
  16. "context"
  17. "runtime"
  18. "runtime/debug"
  19. "yunion.io/x/log"
  20. "yunion.io/x/pkg/errors"
  21. "yunion.io/x/onecloud/pkg/apihelper"
  22. "yunion.io/x/onecloud/pkg/appsrv"
  23. "yunion.io/x/onecloud/pkg/monitor/models"
  24. "yunion.io/x/onecloud/pkg/monitor/options"
  25. "yunion.io/x/onecloud/pkg/vpcagent/worker"
  26. )
  27. type Worker struct {
  28. opts *options.AlerterOptions
  29. apih *apihelper.APIHelper
  30. }
  31. func NewWorker(opts *options.AlerterOptions) (worker.IWorker, error) {
  32. man := models.MonitorResourceManager
  33. modelSets := man.GetModelSets()
  34. apiOpts := &apihelper.Options{
  35. CommonOptions: opts.CommonOptions,
  36. SyncIntervalSeconds: opts.APISyncIntervalSeconds,
  37. RunDelayMilliseconds: opts.APIRunDelayMilliseconds,
  38. ListBatchSize: opts.APIListBatchSize,
  39. IncludeDetails: true,
  40. IncludeOtherCloudEnv: true,
  41. }
  42. apih, err := apihelper.NewAPIHelper(apiOpts, modelSets)
  43. if err != nil {
  44. return nil, errors.Wrap(err, "NewAPIHelper")
  45. }
  46. man.SetAPIHelper(apih)
  47. w := &Worker{
  48. opts: opts,
  49. apih: apih,
  50. }
  51. return w, nil
  52. }
  53. func (w *Worker) Start(ctx context.Context, app *appsrv.Application, prefix string) {
  54. defer func() {
  55. log.Infoln("monitor resource: worker bye")
  56. }()
  57. log.Infoln("start to get api Resource")
  58. go w.apih.Start(ctx, nil, "")
  59. var mss *models.MonitorResModelSets
  60. for {
  61. select {
  62. case imss := <-w.apih.ModelSets():
  63. log.Infof("monitorRes: got new data from api helper")
  64. mss = imss.(*models.MonitorResModelSets)
  65. if err := w.run(ctx, mss); err != nil {
  66. log.Errorf("monitorResWork err: %v", err)
  67. }
  68. case <-ctx.Done():
  69. return
  70. }
  71. }
  72. }
  73. func (w *Worker) run(ctx context.Context, mss *models.MonitorResModelSets) (err error) {
  74. defer func() {
  75. if panicVal := recover(); panicVal != nil {
  76. if panicErr, ok := panicVal.(runtime.Error); ok {
  77. err = errors.Wrap(panicErr, string(debug.Stack()))
  78. } else if panicErr, ok := panicVal.(error); ok {
  79. err = panicErr
  80. } else {
  81. panic(panicVal)
  82. }
  83. }
  84. }()
  85. err = models.MonitorResourceManager.SyncResources(ctx, mss)
  86. if err != nil {
  87. return err
  88. }
  89. return nil
  90. }