service.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "os"
  18. "time"
  19. "golang.org/x/sync/errgroup"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. _ "yunion.io/x/sqlchemy/backends"
  23. "yunion.io/x/onecloud/pkg/cloudcommon"
  24. common_app "yunion.io/x/onecloud/pkg/cloudcommon/app"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/cronman"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  30. common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
  31. "yunion.io/x/onecloud/pkg/mcclient/auth"
  32. _ "yunion.io/x/onecloud/pkg/monitor/alerting"
  33. _ "yunion.io/x/onecloud/pkg/monitor/alerting/conditions"
  34. _ "yunion.io/x/onecloud/pkg/monitor/alerting/notifiers"
  35. _ "yunion.io/x/onecloud/pkg/monitor/alertresourcedrivers"
  36. "yunion.io/x/onecloud/pkg/monitor/controller/balancer"
  37. "yunion.io/x/onecloud/pkg/monitor/models"
  38. _ "yunion.io/x/onecloud/pkg/monitor/notifydrivers"
  39. "yunion.io/x/onecloud/pkg/monitor/options"
  40. "yunion.io/x/onecloud/pkg/monitor/registry"
  41. "yunion.io/x/onecloud/pkg/monitor/subscriptionmodel"
  42. _ "yunion.io/x/onecloud/pkg/monitor/tsdb/driver/influxdb"
  43. _ "yunion.io/x/onecloud/pkg/monitor/tsdb/driver/victoriametrics"
  44. "yunion.io/x/onecloud/pkg/monitor/worker"
  45. )
  46. func StartService() {
  47. opts := &options.Options
  48. common_options.ParseOptions(opts, os.Args, "monitor.conf", "monitor")
  49. commonOpts := &opts.CommonOptions
  50. common_app.InitAuth(commonOpts, func() {
  51. log.Infof("Auth complete")
  52. })
  53. dbOpts := &opts.DBOptions
  54. baseOpts := &opts.BaseOptions
  55. app := common_app.InitApp(baseOpts, true).
  56. OnException(func(method, path string, body jsonutils.JSONObject, err error) {
  57. ctx := context.Background()
  58. session := auth.GetAdminSession(ctx, commonOpts.Region)
  59. notifyclient.EventNotifyServiceAbnormal(ctx, session.GetToken(), consts.GetServiceType(), method, path, body, err)
  60. })
  61. cloudcommon.InitDB(dbOpts)
  62. InitHandlers(app, opts.IsSlaveNode)
  63. db.EnsureAppSyncDB(app, dbOpts, models.InitDB)
  64. defer cloudcommon.CloseDB()
  65. if !opts.IsSlaveNode {
  66. go startServices()
  67. err := taskman.TaskManager.InitializeData()
  68. if err != nil {
  69. log.Fatalf("TaskManager.InitializeData fail %s", err)
  70. }
  71. cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount, opts.TimeZone)
  72. cron.AddJobAtIntervalsWithStartRun("InitAlertResourceAdminRoleUsers", time.Duration(opts.InitAlertResourceAdminRoleUsersIntervalSeconds)*time.Second, models.GetAlertResourceManager().GetAdminRoleUsers, true)
  73. cron.AddJobEveryFewDays("DeleteRecordsOfThirtyDaysAgoRecords", 1, 0, 0, 0,
  74. models.AlertRecordManager.DeleteRecordsOfThirtyDaysAgo, false)
  75. //cron.AddJobAtIntervalsWithStartRun("MonitorResourceSync", time.Duration(opts.MonitorResourceSyncIntervalSeconds)*time.Minute*60, models.MonitorResourceManager.SyncResources, true)
  76. cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)
  77. cron.AddJobAtIntervalsWithStartRun("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalMinutes)*time.Minute, taskman.TaskManager.TaskCleanupJob, true)
  78. cron.Start()
  79. defer cron.Stop()
  80. worker, err := worker.NewWorker(opts)
  81. if err != nil {
  82. log.Fatalf("new worker failed: %v", err)
  83. }
  84. go worker.Start(app.GetContext(), app, "")
  85. }
  86. InitInfluxDBSubscriptionHandlers(app, baseOpts)
  87. // start migration recover routine
  88. go func() {
  89. if err := balancer.RecoverInProcessAlerts(app.GetContext(), auth.GetAdminSession(app.GetContext(), options.Options.Region)); err != nil {
  90. log.Errorf("RecoverInProcessAlerts error: %v", err)
  91. }
  92. }()
  93. }
  94. func startServices() {
  95. services := registry.GetServices()
  96. // Initialize services
  97. for _, svc := range services {
  98. if registry.IsDisabled(svc.Instance) {
  99. continue
  100. }
  101. log.Infof("Initializing %s", svc.Name)
  102. if err := svc.Instance.Init(); err != nil {
  103. log.Fatalf("Service %s init failed: %v", svc.Name, err)
  104. }
  105. }
  106. subscriptionmodel.SubscriptionManager.AddSubscription()
  107. models.CommonAlertManager.SetSubscriptionManager(subscriptionmodel.SubscriptionManager)
  108. childRoutines, ctx := errgroup.WithContext(context.Background())
  109. // Start background services
  110. for _, svc := range services {
  111. service, ok := svc.Instance.(registry.BackgroundService)
  112. if !ok {
  113. continue
  114. }
  115. if registry.IsDisabled(svc.Instance) {
  116. continue
  117. }
  118. // Variable is needed for accessing loop variable in callback
  119. descriptor := svc
  120. childRoutines.Go(func() error {
  121. if err := service.Run(ctx); err != nil {
  122. log.Errorf("Stopped %s: %v", descriptor.Name, err)
  123. return err
  124. }
  125. return nil
  126. })
  127. }
  128. defer func() {
  129. log.Debugf("Waiting on services...")
  130. if waitErr := childRoutines.Wait(); waitErr != nil {
  131. log.Errorf("A service failed: %v", waitErr)
  132. }
  133. }()
  134. }