service.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "io/ioutil"
  18. "os"
  19. "path/filepath"
  20. "time"
  21. "yunion.io/x/cloudmux/pkg/multicloud/esxi"
  22. "yunion.io/x/jsonutils"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/pkg/util/httputils"
  26. _ "yunion.io/x/sqlchemy/backends"
  27. "yunion.io/x/onecloud/pkg/apis"
  28. api "yunion.io/x/onecloud/pkg/apis/compute"
  29. "yunion.io/x/onecloud/pkg/apis/identity"
  30. "yunion.io/x/onecloud/pkg/appsrv"
  31. "yunion.io/x/onecloud/pkg/cloudcommon"
  32. common_app "yunion.io/x/onecloud/pkg/cloudcommon/app"
  33. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  34. "yunion.io/x/onecloud/pkg/cloudcommon/cronman"
  35. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  36. "yunion.io/x/onecloud/pkg/cloudcommon/db/cachesync"
  37. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  38. "yunion.io/x/onecloud/pkg/cloudcommon/elect"
  39. "yunion.io/x/onecloud/pkg/cloudcommon/etcd"
  40. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  41. common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
  42. _ "yunion.io/x/onecloud/pkg/compute/container_drivers/device"
  43. _ "yunion.io/x/onecloud/pkg/compute/container_drivers/lifecycle"
  44. _ "yunion.io/x/onecloud/pkg/compute/container_drivers/volume_mount"
  45. _ "yunion.io/x/onecloud/pkg/compute/guestdrivers"
  46. _ "yunion.io/x/onecloud/pkg/compute/hostdrivers"
  47. "yunion.io/x/onecloud/pkg/compute/models"
  48. "yunion.io/x/onecloud/pkg/compute/options"
  49. "yunion.io/x/onecloud/pkg/compute/policy"
  50. _ "yunion.io/x/onecloud/pkg/compute/regiondrivers"
  51. _ "yunion.io/x/onecloud/pkg/compute/storagedrivers"
  52. _ "yunion.io/x/onecloud/pkg/compute/tasks"
  53. cloudaccount_tasks "yunion.io/x/onecloud/pkg/compute/tasks/cloudaccount"
  54. "yunion.io/x/onecloud/pkg/controller/autoscaling"
  55. "yunion.io/x/onecloud/pkg/httperrors"
  56. "yunion.io/x/onecloud/pkg/mcclient/auth"
  57. )
  58. func StartService() {
  59. StartServiceWithJobs(nil)
  60. }
  61. func StartServiceWithJobs(jobs func(cron *cronman.SCronJobManager)) {
  62. StartServiceWithJobsAndApp(jobs, nil)
  63. }
  // StartServiceWithJobsAndApp boots the region (compute) service and finally
  // hands control to common_app.ServeForever.
  //
  // Start-up sequence: parse options (config file "region.conf" plus os.Args),
  // initialise policy, auth and the option manager, resolve this service's
  // internal URL for the task manager, initialise ESXi settings and, best
  // effort, etcd settings, build the HTTP application, initialise the database
  // and register handlers. Unless the node is a slave, master-only background
  // tasks are started via startMasterTasks and cancelled when this function
  // returns.
  //
  // jobs, when non-nil, is forwarded to startMasterTasks (master nodes only).
  // appCallback, when non-nil, receives the application after InitHandlers and
  // before db.EnsureAppSyncDB is called.
  func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCallback func(app *appsrv.Application)) {
  	cfg := &options.Options
  	commonOpts := &cfg.CommonOptions
  	baseOpts := &cfg.BaseOptions
  	dbOpts := &cfg.DBOptions

  	common_options.ParseOptions(cfg, os.Args, "region.conf", api.SERVICE_TYPE)
  	policy.Init()

  	// A non-zero PortV2 overrides the regular listening port.
  	if cfg.PortV2 > 0 {
  		log.Infof("Port V2 %d is specified, use v2 port", cfg.PortV2)
  		commonOpts.Port = cfg.PortV2
  	}

  	common_app.InitAuth(commonOpts, func() {
  		log.Infof("Auth complete!!")
  	})
  	common_options.StartOptionManager(cfg, cfg.ConfigSyncPeriodSeconds, api.SERVICE_TYPE, api.SERVICE_VERSION, options.OnOptionsChange)

  	// The task manager is given the internal endpoint of this region service.
  	serviceURL, err := auth.GetServiceURL(apis.SERVICE_TYPE_REGION, cfg.Region, "", identity.EndpointInterfaceInternal, httputils.POST)
  	if err != nil {
  		log.Fatalf("unable to get service url: %v", err)
  	}
  	log.Infof("serviceUrl: %s", serviceURL)
  	taskman.SetServiceUrl(serviceURL)

  	// NOTE: taskman.UpdateWorkerCount(opts.TaskWorkerCount) is currently disabled.

  	if err := esxi.InitEsxiConfig(cfg.EsxiOptions); err != nil {
  		log.Fatalf("unable to init esxi configs: %v", err)
  	}

  	// Etcd settings are always attempted; a failure is only logged and
  	// start-up continues.
  	if err := initEtcdLockOpts(cfg); err != nil {
  		log.Errorf("try to init etcd options error: %v", err)
  	}

  	// Unhandled request errors are reported to the notify service using an
  	// admin session token.
  	reportException := func(method, path string, body jsonutils.JSONObject, err error) {
  		ctx := context.Background()
  		session := auth.GetAdminSession(ctx, commonOpts.Region)
  		notifyclient.EventNotifyServiceAbnormal(ctx, session.GetToken(), consts.GetServiceType(), method, path, body, err)
  	}
  	app := common_app.InitApp(baseOpts, true).OnException(reportException)

  	cloudcommon.InitDB(dbOpts)
  	InitHandlers(app, cfg.IsSlaveNode)
  	if appCallback != nil {
  		appCallback(app)
  	}
  	db.EnsureAppSyncDB(app, dbOpts, models.InitDB)
  	defer cloudcommon.CloseDB()

  	if !cfg.IsSlaveNode {
  		stopMasterTasks := startMasterTasks(cfg, dbOpts, jobs)
  		defer stopMasterTasks()
  	}

  	common_app.ServeForever(app, baseOpts)
  }
  // startMasterTasks launches the background work performed only by the master
  // (non-slave) region node and returns a function that cancels it.
  //
  // Synchronously it sets the influxdb retention policy, sizes the cloud sync
  // worker pools, starts leader election when lockman_method is etcd and, when
  // EnableHostHealthCheck is set, initialises the default etcd client and the
  // host health checker. Task-manager data, the tenant cache sync and the cron
  // manager are then initialised in a separate goroutine; jobs, when non-nil,
  // may add extra cron jobs before the scheduler is started.
  //
  // All settings are read from opts (rather than the package-global
  // options.Options) so the function only depends on its arguments.
  //
  // The returned CancelFunc cancels the context shared by the election loop,
  // the host health checker and the cron scheduler.
  func startMasterTasks(opts *options.ComputeOptions, dbOpts *common_options.DBOptions, jobs func(cron *cronman.SCronJobManager)) context.CancelFunc {
  	setInfluxdbRetentionPolicy()
  	models.InitSyncWorkers(opts.CloudSyncWorkerCount)
  	cloudaccount_tasks.InitCloudproviderSyncWorkers(opts.CloudProviderSyncWorkerCount)

  	ctx, cancelFunc := context.WithCancel(context.Background())

  	// electObj stays nil unless lockman_method is etcd; it is passed to
  	// cron.Start2 in either case.
  	var electObj *elect.Elect
  	if opts.LockmanMethod == common_options.LockMethodEtcd {
  		etcdCfg, err := elect.NewEtcdConfigFromDBOptions(dbOpts)
  		if err != nil {
  			log.Fatalf("etcd config for elect: %v", err)
  		}
  		electObj, err = elect.NewElect(etcdCfg, "@master-role")
  		if err != nil {
  			log.Fatalf("new elect instance: %v", err)
  		}
  		go electObj.Start(ctx)
  	}

  	if opts.EnableHostHealthCheck {
  		if err := initDefaultEtcdClient(dbOpts); err != nil {
  			log.Fatalf("init etcd client failed %s", err)
  		}
  		// Use the shared ctx (previously context.Background()) so the returned
  		// cancel function also reaches the health checker.
  		if err := models.InitHostHealthChecker(etcd.Default(), opts.HostHealthTimeout).
  			StartHostsHealthCheck(ctx); err != nil {
  			log.Fatalf("failed start host health checker %s", err)
  		}
  	}

  	go func() {
  		if err := taskman.TaskManager.InitializeData(); err != nil {
  			log.Fatalf("TaskManager.InitializeData fail %s", err)
  		}
  		cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds)

  		cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount, opts.TimeZone)
  		registerMasterCronJobs(cron, opts)
  		if jobs != nil {
  			jobs(cron)
  		}
  		// init auto scaling controller
  		autoscaling.ASController.Init(opts.SASControllerOptions, cron)
  		go cron.Start2(ctx, electObj)
  	}()
  	return cancelFunc
  }

  // registerMasterCronJobs adds the region's built-in periodic jobs to cron, in
  // a fixed order, using the intervals and switches configured in opts.
  func registerMasterCronJobs(cron *cronman.SCronJobManager, opts *options.ComputeOptions) {
  	pendingDeleteInterval := time.Duration(opts.PendingDeleteCheckSeconds) * time.Second
  	expireCheckInterval := time.Duration(opts.PrepaidExpireCheckSeconds) * time.Second
  	quotaInterval := time.Duration(opts.CalculateQuotaUsageIntervalSeconds) * time.Second

  	// Pending-delete cleanup.
  	cron.AddJobAtIntervals("CleanPendingDeleteServers", pendingDeleteInterval, models.GuestManager.CleanPendingDeleteServers)
  	cron.AddJobAtIntervals("CleanPendingDeleteDisks", pendingDeleteInterval, models.DiskManager.CleanPendingDeleteDisks)

  	// Prepaid / postpaid expiry and renewal.
  	if opts.PrepaidExpireCheck {
  		cron.AddJobAtIntervals("CleanExpiredPrepaidServers", expireCheckInterval, models.GuestManager.DeleteExpiredPrepaidServers)
  	}
  	if opts.PrepaidAutoRenew {
  		cron.AddJobAtIntervals("AutoRenewPrepaidServers", time.Duration(opts.PrepaidAutoRenewHours)*time.Hour, models.GuestManager.AutoRenewPrepaidServer)
  	}
  	cron.AddJobAtIntervals("CleanExpiredPostpaidElasticCaches", expireCheckInterval, models.ElasticcacheManager.DeleteExpiredPostpaids)
  	cron.AddJobAtIntervals("CleanExpiredPostpaidDBInstances", expireCheckInterval, models.DBInstanceManager.DeleteExpiredPostpaids)
  	cron.AddJobAtIntervals("CleanExpiredPostpaidServers", expireCheckInterval, models.GuestManager.DeleteExpiredPostpaidServers)
  	cron.AddJobAtIntervals("CleanExpiredPostpaidNatGateways", expireCheckInterval, models.NatGatewayManager.DeleteExpiredPostpaids)
  	cron.AddJobAtIntervals("CleanExpiredPostpaidNas", expireCheckInterval, models.FileSystemManager.DeleteExpiredPostpaids)

  	// Host status.
  	cron.AddJobAtIntervals("StartHostPingDetectionTask", time.Duration(opts.HostOfflineDetectionInterval)*time.Second, models.HostManager.PingDetectionTask)
  	cron.AddJobAtIntervals("RefreshCloudproviderHostStatus", time.Duration(opts.ManagedHostSyncStatusIntervalSeconds)*time.Second, models.RefreshCloudproviderHostStatus)

  	// Quota usage recalculation (also run once at start).
  	cron.AddJobAtIntervalsWithStartRun("CalculateQuotaUsages", quotaInterval, models.QuotaManager.CalculateQuotaUsages, true)
  	cron.AddJobAtIntervalsWithStartRun("CalculateRegionQuotaUsages", quotaInterval, models.RegionQuotaManager.CalculateQuotaUsages, true)
  	cron.AddJobAtIntervalsWithStartRun("CalculateZoneQuotaUsages", quotaInterval, models.ZoneQuotaManager.CalculateQuotaUsages, true)
  	cron.AddJobAtIntervalsWithStartRun("CalculateProjectQuotaUsages", quotaInterval, models.ProjectQuotaManager.CalculateQuotaUsages, true)
  	cron.AddJobAtIntervalsWithStartRun("CalculateDomainQuotaUsages", quotaInterval, models.DomainQuotaManager.CalculateQuotaUsages, true)
  	cron.AddJobAtIntervalsWithStartRun("CalculateInfrasQuotaUsages", quotaInterval, models.InfrasQuotaManager.CalculateQuotaUsages, true)

  	cron.AddJobAtIntervalsWithStartRun("AutoSyncCloudaccountStatusTask", time.Duration(opts.CloudAutoSyncIntervalSeconds)*time.Second, models.CloudaccountManager.AutoSyncCloudaccountStatusTask, true)
  	cron.AddJobAtIntervalsWithStartRun("SyncCapacityUsedForEsxiStorage", time.Duration(opts.SyncStorageCapacityUsedIntervalMinutes)*time.Minute, models.StorageManager.SyncCapacityUsedForEsxiStorage, true)

  	// Hourly housekeeping.
  	cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)
  	cron.AddJobEveryFewHour("AutoDiskSnapshot", 1, 5, 0, models.DiskManager.AutoDiskSnapshot, false)
  	cron.AddJobEveryFewHour("AutoServerSnapshot", 1, 10, 0, models.InstanceSnapshotManager.AutoServerSnapshot, false)
  	cron.AddJobEveryFewHour("SnapshotsCleanup", 1, 35, 0, models.SnapshotManager.CleanupSnapshots, false)
  	cron.AddJobEveryFewHour("InstanceSnapshotsCleanup", 1, 35, 0, models.InstanceSnapshotManager.CleanupInstanceSnapshots, false)
  	cron.AddJobEveryFewHour("AutoCleanImageCache", 1, 5, 0, models.CachedimageManager.AutoCleanImageCaches, false)

  	// SKU synchronisation.
  	cron.AddJobAtIntervalsWithStartRun("SyncSkus", time.Duration(opts.ServerSkuSyncIntervalMinutes)*time.Minute, models.SyncServerSkus, true)
  	cron.AddJobEveryFewDays("SyncDBInstanceSkus", opts.SyncSkusDay, opts.SyncSkusHour, 0, 0, models.SyncDBInstanceSkus, true)
  	cron.AddJobEveryFewDays("SyncNatSkus", opts.SyncSkusDay, opts.SyncSkusHour, 0, 0, models.SyncNatSkus, true)
  	cron.AddJobEveryFewDays("SyncNasSkus", opts.SyncSkusDay, opts.SyncSkusHour, 0, 0, models.SyncNasSkus, true)
  	cron.AddJobEveryFewDays("SyncElasticCacheSkus", opts.SyncSkusDay, opts.SyncSkusHour, 0, 0, models.SyncElasticCacheSkus, true)

  	cron.AddJobEveryFewDays("SnapshotDataCleaning", 1, 0, 0, 0, models.SnapshotManager.DataCleaning, true)
  	cron.AddJobAtIntervalsWithStartRun("SyncCloudImages", time.Duration(opts.CloudImagesSyncIntervalHours)*time.Hour, models.SyncPublicCloudImages, true)
  	cron.AddJobEveryFewHour("InspectAllTemplate", 1, 0, 0, models.GuestTemplateManager.InspectAllTemplate, true)
  	cron.AddJobEveryFewHour("CheckBillingResourceExpireAt", opts.ExpiredReleaseNotifyHour, 0, 0, models.CheckBillingResourceExpireAt, false)
  	cron.AddJobEveryFewDays("CleanRecycleDiskFiles", 1, 3, 0, 0, models.StoragesCleanRecycleDiskfiles, false)
  	cron.AddJobAtIntervalsWithStartRun("TaskCleanupJob", time.Duration(opts.TaskArchiveIntervalMinutes)*time.Minute, taskman.TaskManager.TaskCleanupJob, true)
  }
  202. func initDefaultEtcdClient(opts *common_options.DBOptions) error {
  203. if etcd.Default() != nil {
  204. return nil
  205. }
  206. tlsConfig, err := opts.GetEtcdTLSConfig()
  207. if err != nil {
  208. return err
  209. }
  210. onKeepaliveFailure := func() {
  211. cli := etcd.Default()
  212. if opts.LockmanMethod == common_options.LockMethodEtcd {
  213. log.Fatalf("etcd keepalive failed and exit when lockman_method is %s", common_options.LockMethodEtcd)
  214. }
  215. if err := cli.RestartSession(); err != nil {
  216. log.Errorf("restart default session error: %v", err)
  217. return
  218. }
  219. }
  220. err = etcd.InitDefaultEtcdClient(&etcd.SEtcdOptions{
  221. EtcdEndpoint: opts.EtcdEndpoints,
  222. EtcdUsername: opts.EtcdUsername,
  223. EtcdPassword: opts.EtcdPassword,
  224. EtcdEnabldSsl: opts.EtcdUseTLS,
  225. TLSConfig: tlsConfig,
  226. }, onKeepaliveFailure)
  227. if err != nil {
  228. return errors.Wrap(err, "init default etcd client")
  229. }
  230. return nil
  231. }
  // initEtcdLockOpts points opts at the etcd endpoint returned by
  // common_app.FetchEtcdServiceInfo, if there is one.
  //
  // When the lookup fails with httperrors.ErrNotFound, or returns no endpoint,
  // opts is left untouched and nil is returned. When the endpoint has a
  // non-empty CertId, its certificate, private key and CA certificate are
  // written (mode 0600) into a new temporary directory and TLS is enabled in
  // opts.
  //
  // opts is modified only after every step has succeeded.
  // StartServiceWithJobsAndApp only logs the returned error and keeps going, so
  // a half-applied configuration must not escape: for example, a new endpoint
  // without its TLS material. On a write failure the temporary directory is
  // removed so no key material is left behind.
  func initEtcdLockOpts(opts *options.ComputeOptions) error {
  	etcdEndpoint, err := common_app.FetchEtcdServiceInfo()
  	if err != nil {
  		if errors.Cause(err) == httperrors.ErrNotFound {
  			return nil
  		}
  		return errors.Wrap(err, "fetch etcd service info")
  	}
  	if etcdEndpoint == nil {
  		return nil
  	}

  	endpoints := []string{etcdEndpoint.Url}
  	if len(etcdEndpoint.CertId) == 0 {
  		opts.EtcdEndpoints = endpoints
  		return nil
  	}

  	dir, err := ioutil.TempDir("", "etcd-cluster-tls")
  	if err != nil {
  		return errors.Wrap(err, "create dir etcd cluster tls")
  	}
  	// fail removes the partially populated directory and wraps err. The
  	// removal is best effort: the write error is what the caller needs.
  	fail := func(err error, msg string) error {
  		_ = os.RemoveAll(dir)
  		return errors.Wrap(err, msg)
  	}

  	certPath, err := writeFile(dir, "etcd.crt", []byte(etcdEndpoint.Certificate))
  	if err != nil {
  		return fail(err, "write file certificate")
  	}
  	keyPath, err := writeFile(dir, "etcd.key", []byte(etcdEndpoint.PrivateKey))
  	if err != nil {
  		return fail(err, "write file private key")
  	}
  	caPath, err := writeFile(dir, "etcd-ca.crt", []byte(etcdEndpoint.CaCertificate))
  	if err != nil {
  		return fail(err, "write file cacert")
  	}

  	// Commit everything at once.
  	opts.EtcdEndpoints = endpoints
  	opts.EtcdCert = certPath
  	opts.EtcdKey = keyPath
  	opts.EtcdCacert = caPath
  	opts.EtcdUseTLS = true
  	return nil
  }
  // writeFile writes data to the file named file inside dir with 0600
  // permissions and returns its path. The path is returned even when the
  // write fails, together with the write error.
  func writeFile(dir, file string, data []byte) (string, error) {
  	target := filepath.Join(dir, file)
  	writeErr := ioutil.WriteFile(target, data, 0600)
  	return target, writeErr
  }