cloudproviders.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "time"
  18. "yunion.io/x/cloudmux/pkg/cloudprovider"
  19. "yunion.io/x/jsonutils"
  20. "yunion.io/x/log"
  21. "yunion.io/x/pkg/errors"
  22. "yunion.io/x/pkg/util/compare"
  23. "yunion.io/x/pkg/util/timeutils"
  24. "yunion.io/x/sqlchemy"
  25. api "yunion.io/x/onecloud/pkg/apis/compute"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  28. "yunion.io/x/onecloud/pkg/cloudevent/options"
  29. "yunion.io/x/onecloud/pkg/httperrors"
  30. "yunion.io/x/onecloud/pkg/mcclient"
  31. "yunion.io/x/onecloud/pkg/mcclient/auth"
  32. modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  33. "yunion.io/x/onecloud/pkg/s3gateway/session"
  34. )
  35. type SCloudproviderManager struct {
  36. db.SEnabledStatusStandaloneResourceBaseManager
  37. db.SProjectizedResourceBaseManager
  38. }
  39. var CloudproviderManager *SCloudproviderManager
  40. func init() {
  41. CloudproviderManager = &SCloudproviderManager{
  42. SEnabledStatusStandaloneResourceBaseManager: db.NewEnabledStatusStandaloneResourceBaseManager(
  43. SCloudprovider{},
  44. "cloudproviders_tbl",
  45. "cloudprovider",
  46. "cloudproviders",
  47. ),
  48. }
  49. CloudproviderManager.SetVirtualObject(CloudproviderManager)
  50. }
  51. type SCloudprovider struct {
  52. db.SEnabledStatusStandaloneResourceBase
  53. db.SProjectizedResourceBase
  54. SyncStatus string
  55. LastSync time.Time
  56. LastSyncEndAt time.Time
  57. LastSyncTimeAt time.Time
  58. Provider string `width:"64" charset:"ascii" list:"domain"`
  59. Brand string `width:"64" charset:"ascii" list:"domain"`
  60. }
  61. func (manager *SCloudproviderManager) GetRegionCloudproviders(ctx context.Context, userCred mcclient.TokenCredential) ([]SCloudprovider, error) {
  62. s := session.GetSession(ctx, userCred)
  63. data := []jsonutils.JSONObject{}
  64. offset := int64(0)
  65. params := jsonutils.NewDict()
  66. params.Set("scope", jsonutils.NewString("system"))
  67. params.Set("limit", jsonutils.NewInt(1024))
  68. for {
  69. params.Set("offset", jsonutils.NewInt(offset))
  70. result, err := modules.Cloudproviders.List(s, params)
  71. if err != nil {
  72. return nil, errors.Wrap(err, "modules.Cloudproviders.List")
  73. }
  74. data = append(data, result.Data...)
  75. if len(data) >= result.Total {
  76. break
  77. }
  78. offset += 1024
  79. }
  80. providers := []SCloudprovider{}
  81. err := jsonutils.Update(&providers, data)
  82. if err != nil {
  83. return nil, errors.Wrap(err, "jsonutils.Update")
  84. }
  85. return providers, nil
  86. }
  87. func (manager *SCloudproviderManager) GetLocalCloudproviders() ([]SCloudprovider, error) {
  88. dbProviders := []SCloudprovider{}
  89. q := manager.Query()
  90. err := db.FetchModelObjects(manager, q, &dbProviders)
  91. if err != nil {
  92. return nil, err
  93. }
  94. return dbProviders, nil
  95. }
  96. func (manager *SCloudproviderManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  97. var err error
  98. q, err = manager.SEnabledStatusStandaloneResourceBaseManager.QueryDistinctExtraField(q, field)
  99. if err == nil {
  100. return q, nil
  101. }
  102. return q, httperrors.ErrNotFound
  103. }
  104. func (manager *SCloudproviderManager) syncCloudproviders(ctx context.Context, userCred mcclient.TokenCredential) compare.SyncResult {
  105. result := compare.SyncResult{}
  106. providers, err := manager.GetRegionCloudproviders(ctx, userCred)
  107. if err != nil {
  108. result.Error(errors.Wrap(err, "GetRegionCloudproviders"))
  109. return result
  110. }
  111. dbProviders, err := manager.GetLocalCloudproviders()
  112. if err != nil {
  113. result.Error(errors.Wrap(err, "GetLocalCloudproviders"))
  114. return result
  115. }
  116. removed := make([]SCloudprovider, 0)
  117. commondb := make([]SCloudprovider, 0)
  118. commonext := make([]SCloudprovider, 0)
  119. added := make([]SCloudprovider, 0)
  120. err = compare.CompareSets(dbProviders, providers, &removed, &commondb, &commonext, &added)
  121. if err != nil {
  122. result.Error(errors.Wrap(err, "CompareSets"))
  123. return result
  124. }
  125. for i := 0; i < len(removed); i++ {
  126. err = removed[i].Delete(ctx, userCred)
  127. if err != nil {
  128. result.DeleteError(err)
  129. continue
  130. }
  131. result.Delete()
  132. }
  133. for i := 0; i < len(commondb); i++ {
  134. err = commondb[i].syncWithRegionProvider(ctx, userCred, commonext[i])
  135. if err != nil {
  136. result.UpdateError(err)
  137. continue
  138. }
  139. result.Update()
  140. }
  141. for i := 0; i < len(added); i++ {
  142. err = manager.newFromRegionProvider(ctx, userCred, added[i])
  143. if err != nil {
  144. result.AddError(err)
  145. continue
  146. }
  147. result.Add()
  148. }
  149. return result
  150. }
  151. func (manager *SCloudproviderManager) SyncCloudproviders(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  152. result := manager.syncCloudproviders(ctx, userCred)
  153. info := result.Result()
  154. log.Infof("sync cloudproviders result: %s", info)
  155. }
  156. func (provider *SCloudprovider) syncWithRegionProvider(ctx context.Context, userCred mcclient.TokenCredential, cloudprovider SCloudprovider) error {
  157. _, err := db.Update(provider, func() error {
  158. provider.Status = cloudprovider.Status
  159. provider.Enabled = cloudprovider.Enabled
  160. provider.Brand = cloudprovider.Brand
  161. provider.ProjectId = cloudprovider.ProjectId
  162. provider.DomainId = cloudprovider.DomainId
  163. return nil
  164. })
  165. return err
  166. }
  167. func (self *SCloudprovider) MarkSyncing(userCred mcclient.TokenCredential) error {
  168. _, err := db.Update(self, func() error {
  169. self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_SYNCING
  170. self.LastSync = timeutils.UtcNow()
  171. self.LastSyncEndAt = time.Time{}
  172. return nil
  173. })
  174. if err != nil {
  175. return errors.Wrap(err, "db.Update")
  176. }
  177. return nil
  178. }
  179. func (self *SCloudprovider) MarkEndSync(userCred mcclient.TokenCredential) error {
  180. _, err := db.Update(self, func() error {
  181. self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_IDLE
  182. self.LastSyncEndAt = timeutils.UtcNow()
  183. return nil
  184. })
  185. if err != nil {
  186. return errors.Wrap(err, "db.Update")
  187. }
  188. return nil
  189. }
  190. func (self *SCloudprovider) SetLastSyncTimeAt(userCred mcclient.TokenCredential, last time.Time) error {
  191. _, err := db.Update(self, func() error {
  192. self.LastSyncTimeAt = last
  193. return nil
  194. })
  195. if err != nil {
  196. return errors.Wrap(err, "db.Update")
  197. }
  198. return nil
  199. }
  200. func (manager *SCloudproviderManager) newFromRegionProvider(ctx context.Context, userCred mcclient.TokenCredential, cloudprovider SCloudprovider) error {
  201. cloudprovider.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_IDLE
  202. return manager.TableSpec().Insert(ctx, &cloudprovider)
  203. }
  204. func (manager *SCloudproviderManager) syncCloudeventTask(ctx context.Context, userCred mcclient.TokenCredential) error {
  205. cloudproviders := []SCloudprovider{}
  206. q := manager.Query().IsTrue("enabled").Equals("status", api.CLOUD_PROVIDER_CONNECTED).Equals("sync_status", api.CLOUD_PROVIDER_SYNC_STATUS_IDLE)
  207. err := db.FetchModelObjects(manager, q, &cloudproviders)
  208. if err != nil {
  209. return errors.Wrap(err, "db.FetchModelObjects")
  210. }
  211. for i := range cloudproviders {
  212. err = cloudproviders[i].StartCloudeventSyncTask(ctx, userCred)
  213. if err != nil {
  214. log.Errorf("Failed start cloudevent sync task for cloudprovider %s (%s) error: %v", cloudproviders[i].Name, cloudproviders[i].Id, err)
  215. }
  216. }
  217. return nil
  218. }
  219. func (manager *SCloudproviderManager) SyncCloudeventTask(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  220. if options.Options.DisableSyncCloudEvent {
  221. return
  222. }
  223. err := manager.syncCloudeventTask(ctx, userCred)
  224. if err != nil {
  225. log.Errorf("syncCloudeventTask error: %v", err)
  226. }
  227. }
  228. func (provider *SCloudprovider) StartCloudeventSyncTask(ctx context.Context, userCred mcclient.TokenCredential) error {
  229. params := jsonutils.NewDict()
  230. task, err := taskman.TaskManager.NewTask(ctx, "CloudeventSyncTask", provider, userCred, params, "", "", nil)
  231. if err != nil {
  232. return errors.Wrap(err, "NewTask")
  233. }
  234. provider.MarkSyncing(userCred)
  235. task.ScheduleRun(nil)
  236. return nil
  237. }
  238. func (self *SCloudprovider) GetNextTimeRange() (time.Time, time.Time, error) {
  239. start, end := time.Time{}, time.Time{}
  240. factory, err := self.GetProviderFactory()
  241. if err != nil {
  242. return start, end, errors.Wrap(err, "self.GetProviderFactory")
  243. }
  244. if !self.LastSyncTimeAt.IsZero() {
  245. start = self.LastSyncTimeAt
  246. } else {
  247. start = time.Now().AddDate(0, 0, -1*factory.GetMaxCloudEventKeepDays())
  248. }
  249. // 避免cloudevent过长时间未运行,再次运行时记录的最后一条时间距离现在间隔太长
  250. if start.Before(time.Now().AddDate(0, 0, factory.GetMaxCloudEventKeepDays()*-1)) {
  251. start = time.Now().AddDate(0, 0, factory.GetMaxCloudEventKeepDays()*-1)
  252. }
  253. if options.Options.OneSyncForHours > factory.GetMaxCloudEventSyncDays()*24 {
  254. end = start.Add(time.Duration(factory.GetMaxCloudEventSyncDays()*24) * time.Hour)
  255. } else {
  256. end = start.Add(time.Duration(options.Options.OneSyncForHours) * time.Hour)
  257. }
  258. start = start.Add(time.Second)
  259. if end.After(time.Now()) {
  260. end = time.Now()
  261. }
  262. return start, end, nil
  263. }
  264. func (provider *SCloudprovider) GetProviderFactory() (cloudprovider.ICloudProviderFactory, error) {
  265. return cloudprovider.GetProviderFactory(provider.Provider)
  266. }
  267. func (provider SCloudprovider) GetGlobalId() string {
  268. return provider.Id
  269. }
  270. func (provider SCloudprovider) GetExternalId() string {
  271. return provider.Id
  272. }
  273. func (self *SCloudprovider) GetProvider() (cloudprovider.ICloudProvider, error) {
  274. ctx := context.Background()
  275. s := auth.GetAdminSession(ctx, options.Options.Region)
  276. return modules.Cloudproviders.GetProvider(ctx, s, self.Id)
  277. }
  278. func (manager *SCloudproviderManager) InitializeData() error {
  279. providers := []SCloudprovider{}
  280. q := manager.Query().NotEquals("sync_status", api.CLOUD_PROVIDER_SYNC_STATUS_IDLE)
  281. err := db.FetchModelObjects(manager, q, &providers)
  282. if err != nil {
  283. return err
  284. }
  285. for i := range providers {
  286. _, err = db.Update(&providers[i], func() error {
  287. providers[i].SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_IDLE
  288. return nil
  289. })
  290. if err != nil {
  291. return err
  292. }
  293. }
  294. return nil
  295. }