timer.go 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package autoscaling
  15. import (
  16. "context"
  17. "time"
  18. "yunion.io/x/jsonutils"
  19. "yunion.io/x/log"
  20. "yunion.io/x/onecloud/pkg/apis/compute"
  21. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  22. "yunion.io/x/onecloud/pkg/compute/models"
  23. "yunion.io/x/onecloud/pkg/mcclient"
  24. "yunion.io/x/onecloud/pkg/mcclient/auth"
  25. modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  26. )
  27. type STimeScope struct {
  28. Start time.Time
  29. End time.Time
  30. Median time.Time
  31. }
  32. func (asc *SASController) timeScope(median time.Time, interval time.Duration) STimeScope {
  33. ri := interval / 2
  34. return STimeScope{
  35. Start: median.Add(-ri),
  36. End: median.Add(ri),
  37. Median: median,
  38. }
  39. }
  40. func (asc *SASController) Timer(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  41. // 60 is for fault tolerance
  42. interval := asc.options.TimerInterval + 30
  43. timeScope := asc.timeScope(time.Now(), time.Duration(interval)*time.Second)
  44. spSubQ := models.ScalingPolicyManager.Query("id").Equals("status", compute.SP_STATUS_READY).SubQuery()
  45. q := models.ScalingTimerManager.Query().
  46. LT("next_time", timeScope.End).
  47. IsFalse("is_expired").In("scaling_policy_id", spSubQ)
  48. scalingTimers := make([]models.SScalingTimer, 0, 5)
  49. err := db.FetchModelObjects(models.ScalingTimerManager, q, &scalingTimers)
  50. if err != nil {
  51. log.Errorf("db.FetchModelObjects error: %s", err.Error())
  52. return
  53. }
  54. log.Debugf("total %d need to exec, %v", len(scalingTimers), scalingTimers)
  55. log.Debugf("timeScope: start: %s, end: %s", timeScope.Start, timeScope.End)
  56. session := auth.GetSession(ctx, userCred, "")
  57. triggerParams := jsonutils.NewDict()
  58. for i := range scalingTimers {
  59. scalingTimer := scalingTimers[i]
  60. asc.timerQueue <- struct{}{}
  61. go func(ctx context.Context) {
  62. defer func() {
  63. <-asc.timerQueue
  64. }()
  65. if scalingTimer.NextTime.Before(timeScope.Start) {
  66. // For unknown reasons, the scalingTimer did not execute at the specified time
  67. scalingTimer.Update(timeScope.Start)
  68. // scalingTimer should not exec for now.
  69. if scalingTimer.NextTime.After(timeScope.End) || scalingTimer.IsExpired {
  70. err = models.ScalingTimerManager.TableSpec().InsertOrUpdate(ctx, &scalingTimer)
  71. if err != nil {
  72. log.Errorf("update ScalingTimer whose ScalingPolicyId is %s error: %s",
  73. scalingTimer.ScalingPolicyId, err.Error())
  74. }
  75. return
  76. }
  77. }
  78. _, err = modules.ScalingPolicy.PerformAction(session, scalingTimer.ScalingPolicyId, "trigger",
  79. triggerParams)
  80. if err != nil {
  81. log.Errorf("unable to request to trigger ScalingPolicy '%s'", scalingTimer.ScalingPolicyId)
  82. }
  83. scalingTimer.Update(timeScope.End)
  84. err = models.ScalingTimerManager.TableSpec().InsertOrUpdate(ctx, &scalingTimer)
  85. if err != nil {
  86. log.Errorf("update ScalingTimer whose ScalingPolicyId is %s error: %s",
  87. scalingTimer.ScalingPolicyId, err.Error())
  88. }
  89. }(ctx)
  90. }
  91. }