scaling_policy.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/utils"
  24. "yunion.io/x/sqlchemy"
  25. "yunion.io/x/onecloud/pkg/apis"
  26. api "yunion.io/x/onecloud/pkg/apis/compute"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  29. "yunion.io/x/onecloud/pkg/httperrors"
  30. "yunion.io/x/onecloud/pkg/mcclient"
  31. "yunion.io/x/onecloud/pkg/util/logclient"
  32. "yunion.io/x/onecloud/pkg/util/stringutils2"
  33. )
// SScalingPolicyManager is the model manager for scaling policies. It embeds
// the virtual-resource, scaling-group-resource and enabled-resource base
// managers; the list/validate hooks below delegate to them.
//
// +onecloud:swagger-gen-model-singular=scalingpolicy
// +onecloud:swagger-gen-model-plural=scalingpolicies
type SScalingPolicyManager struct {
	db.SVirtualResourceBaseManager
	SScalingGroupResourceBaseManager
	db.SEnabledResourceBaseManager
}
// SScalingPolicy is a rule attached to a scaling group that describes what
// triggers a scaling activity and how the group size is changed by it.
type SScalingPolicy struct {
	db.SVirtualResourceBase
	SScalingGroupResourceBase
	db.SEnabledResourceBase

	// TriggerType selects the kind of trigger; ValidateCreateData accepts
	// api.TRIGGER_TIMING, api.TRIGGER_CYCLE and api.TRIGGER_ALARM.
	TriggerType string `width:"16" charset:"ascii" default:"timing" create:"required" list:"user"`
	// TriggerId is the ID of the timer or alarm record backing this policy.
	// It is filled in by PostCreate once the trigger has been registered.
	TriggerId string `width:"128" charset:"ascii"`

	// Action applied by a scaling activity; ValidateCreateData accepts
	// api.ACTION_ADD, api.ACTION_REMOVE and api.ACTION_SET.
	Action string `width:"8" charset:"ascii" default:"set" create:"required" list:"user"`
	// Number is the amount used by Action, interpreted according to Unit.
	Number int `nullable:"false" default:"1" create:"required" list:"user"`
	// Unit of Number: api.UNIT_ONE or api.UNIT_PERCENT. With UNIT_PERCENT,
	// Exec treats Number as a percentage of the current size.
	Unit string `width:"4" charset:"ascii" create:"required" list:"user"`

	// CoolingTime is the cooling period handed to SScalingGroup.Scale;
	// scaling activities triggered by alarms are rejected during this period.
	// NOTE(review): presumably expressed in seconds (default 300) - confirm.
	CoolingTime int `nullable:"false" default:"300" create:"required" list:"user"`
}
// ScalingPolicyManager is the package-level manager instance for scaling
// policies; it is created in init().
var ScalingPolicyManager *SScalingPolicyManager
  56. func init() {
  57. ScalingPolicyManager = &SScalingPolicyManager{
  58. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  59. SScalingPolicy{},
  60. "scalingpolicies_tbl",
  61. "scalingpolicy",
  62. "scalingpolicies",
  63. ),
  64. }
  65. ScalingPolicyManager.SetVirtualObject(ScalingPolicyManager)
  66. }
  67. func (spm *SScalingPolicyManager) ValidateListConditions(ctx context.Context, userCred mcclient.TokenCredential,
  68. query *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  69. var err error
  70. query, err = spm.SVirtualResourceBaseManager.ValidateListConditions(ctx, userCred, query)
  71. if err != nil {
  72. return nil, err
  73. }
  74. if !query.Contains("scaling_group") {
  75. return nil, httperrors.NewInputParameterError("every scaling policy belong to a scaling group")
  76. }
  77. return query, nil
  78. }
  79. func (spm *SScalingPolicyManager) ListItemFilter(ctx context.Context, q *sqlchemy.SQuery,
  80. userCred mcclient.TokenCredential, input api.ScalingPolicyListInput) (*sqlchemy.SQuery, error) {
  81. var err error
  82. q, err = spm.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, input.VirtualResourceListInput)
  83. if err != nil {
  84. return q, err
  85. }
  86. q, err = spm.SScalingGroupResourceBaseManager.ListItemFilter(ctx, q, userCred, input.ScalingGroupFilterListInput)
  87. if err != nil {
  88. return q, err
  89. }
  90. q, err = spm.SEnabledResourceBaseManager.ListItemFilter(ctx, q, userCred, input.EnabledResourceBaseListInput)
  91. if err != nil {
  92. return q, err
  93. }
  94. if len(input.TriggerType) != 0 {
  95. q = q.Equals("trigger_type", input.TriggerType)
  96. }
  97. return q, nil
  98. }
  99. func (spm *SScalingPolicyManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  100. q, err := spm.SVirtualResourceBaseManager.QueryDistinctExtraField(q, field)
  101. if err == nil {
  102. return q, nil
  103. }
  104. return spm.SScalingGroupResourceBaseManager.QueryDistinctExtraField(q, field)
  105. }
  106. func (sgm *SScalingPolicy) GetUniqValues() jsonutils.JSONObject {
  107. return jsonutils.Marshal(map[string]string{"scaling_group_id": sgm.ScalingGroupId})
  108. }
  109. func (spm *SScalingPolicyManager) FetchUniqValues(ctx context.Context, data jsonutils.JSONObject) jsonutils.JSONObject {
  110. return spm.SScalingGroupResourceBaseManager.FetchUniqValues(ctx, data)
  111. }
  112. func (spm *SScalingPolicyManager) FilterByUniqValues(q *sqlchemy.SQuery, values jsonutils.JSONObject) *sqlchemy.SQuery {
  113. return spm.SScalingGroupResourceBaseManager.FilterByUniqValues(q, values)
  114. }
  115. func (spm *SScalingPolicyManager) OrderByExtraFields(ctx context.Context, q *sqlchemy.SQuery,
  116. userCred mcclient.TokenCredential, query api.ScalingPolicyListInput) (*sqlchemy.SQuery, error) {
  117. return spm.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VirtualResourceListInput)
  118. }
  119. func (spm *SScalingPolicyManager) FetchCustomizeColumns(
  120. ctx context.Context,
  121. userCred mcclient.TokenCredential,
  122. query jsonutils.JSONObject,
  123. objs []interface{},
  124. fields stringutils2.SSortedStrings,
  125. isList bool,
  126. ) []api.ScalingPolicyDetails {
  127. rows := make([]api.ScalingPolicyDetails, len(objs))
  128. statusRows := spm.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  129. sgRows := spm.SScalingGroupResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  130. var err error
  131. for i := range rows {
  132. rows[i], err = objs[i].(*SScalingPolicy).getMoreDetails(ctx, userCred, query, isList)
  133. if err != nil {
  134. log.Errorf("SScalingPolicy.getMoreDetails error: %s", err)
  135. }
  136. rows[i].VirtualResourceDetails = statusRows[i]
  137. rows[i].ScalingGroupResourceInfo = sgRows[i]
  138. }
  139. return rows
  140. }
  141. func (sp *SScalingPolicy) getMoreDetails(ctx context.Context, userCred mcclient.TokenCredential,
  142. query jsonutils.JSONObject, isList bool) (api.ScalingPolicyDetails, error) {
  143. var out api.ScalingPolicyDetails
  144. switch sp.TriggerType {
  145. case api.TRIGGER_ALARM:
  146. model, err := ScalingAlarmManager.FetchById(sp.TriggerId)
  147. if errors.Cause(err) == sql.ErrNoRows {
  148. return out, nil
  149. }
  150. if err != nil {
  151. return out, errors.Wrap(err, "ScalingAlarmManager.FetchById")
  152. }
  153. out.Alarm = model.(*SScalingAlarm).AlarmDetails()
  154. case api.TRIGGER_TIMING:
  155. model, err := ScalingTimerManager.FetchById(sp.TriggerId)
  156. if errors.Cause(err) == sql.ErrNoRows {
  157. return out, nil
  158. }
  159. if err != nil {
  160. return out, errors.Wrap(err, "ScalingTimerManager.FetchById")
  161. }
  162. out.Timer = model.(*SScalingTimer).TimerDetails()
  163. case api.TRIGGER_CYCLE:
  164. model, err := ScalingTimerManager.FetchById(sp.TriggerId)
  165. if errors.Cause(err) == sql.ErrNoRows {
  166. return out, nil
  167. }
  168. if err != nil {
  169. return out, errors.Wrap(err, "ScalingTimerManager.FetchById")
  170. }
  171. out.CycleTimer = model.(*SScalingTimer).CycleTimerDetails()
  172. }
  173. return out, nil
  174. }
  175. func (spm *SScalingPolicyManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential,
  176. ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.ScalingPolicyCreateInput) (
  177. api.ScalingPolicyCreateInput, error) {
  178. log.Debugf("insert validateCreateData")
  179. var err error
  180. input.VirtualResourceCreateInput, err = spm.SVirtualResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query,
  181. input.VirtualResourceCreateInput)
  182. if err != nil {
  183. return input, err
  184. }
  185. // check scaling group
  186. idOrName := input.ScalingGroup
  187. if len(input.ScalingGroupId) != 0 {
  188. idOrName = input.ScalingGroupId
  189. }
  190. model, err := ScalingGroupManager.FetchByIdOrName(ctx, userCred, idOrName)
  191. if errors.Cause(err) == sql.ErrNoRows {
  192. return input, httperrors.NewInputParameterError("no such scaling group %s", idOrName)
  193. }
  194. if err != nil {
  195. return input, errors.Wrap(err, "ScalingGroupManager.FetchByIdOrName")
  196. }
  197. input.ScalingGroupId = model.GetId()
  198. if !utils.IsInStringArray(input.TriggerType, []string{api.TRIGGER_TIMING, api.TRIGGER_CYCLE, api.TRIGGER_ALARM}) {
  199. return input, httperrors.NewInputParameterError("unkown trigger type %s", input.TriggerType)
  200. }
  201. if !utils.IsInStringArray(input.Action, []string{api.ACTION_ADD, api.ACTION_REMOVE, api.ACTION_SET}) {
  202. return input, httperrors.NewInputParameterError("unkown scaling policy action %s", input.Action)
  203. }
  204. if !utils.IsInStringArray(input.Unit, []string{api.UNIT_ONE, api.UNIT_PERCENT}) {
  205. return input, httperrors.NewInputParameterError("unkown scaling policy unit %s", input.Unit)
  206. }
  207. trigger, err := ScalingPolicyManager.Trigger(&input)
  208. if err != nil {
  209. return input, errors.Wrap(err, "ScalingPolicyManager.Trigger")
  210. }
  211. input, err = trigger.ValidateCreateData(input)
  212. if err != nil {
  213. return input, httperrors.NewInputParameterError("%v", err)
  214. }
  215. return input, err
  216. }
  217. func (sp *SScalingPolicy) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential,
  218. ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  219. // sp.Project must be same with sp.ScalingGroup
  220. sg, err := sp.ScalingGroup()
  221. if err != nil {
  222. return err
  223. }
  224. ownerId = sg.GetOwnerId()
  225. return sp.SVirtualResourceBase.CustomizeCreate(ctx, userCred, ownerId, query, data)
  226. }
  227. func (sp *SScalingPolicy) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
  228. // do nothing
  229. sp.SetStatus(ctx, userCred, api.SP_STATUS_DELETING, "")
  230. return nil
  231. }
  232. func (sp *SScalingPolicy) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
  233. trigger, err := sp.Trigger(nil)
  234. if err != nil {
  235. return errors.Wrap(err, "SScalingPolicy.Trigger")
  236. }
  237. if trigger != nil {
  238. err = trigger.UnRegister(ctx, userCred)
  239. if err != nil {
  240. return errors.Wrap(err, "IScalingTrigger.UnRegister")
  241. }
  242. }
  243. return sp.SResourceBase.Delete(ctx, userCred)
  244. }
  245. func (sp *SScalingPolicy) PostDelete(ctx context.Context, userCred mcclient.TokenCredential) {
  246. go func() {
  247. fail := func(sg *SScalingGroup, reason string) {
  248. if sg != nil {
  249. logclient.AddActionLogWithContext(ctx, sg, logclient.ACT_DELETE_SCALING_POLICY, reason, userCred, false)
  250. }
  251. sp.SetStatus(ctx, userCred, api.SP_STATUS_DELETE_FAILED, reason)
  252. }
  253. sg, err := sp.ScalingGroup()
  254. if err != nil {
  255. fail(nil, err.Error())
  256. return
  257. }
  258. err = sp.RealDelete(ctx, userCred)
  259. if err != nil {
  260. fail(sg, err.Error())
  261. return
  262. }
  263. logclient.AddActionLogWithContext(ctx, sg, logclient.ACT_DELETE_SCALING_POLICY, "", userCred, true)
  264. }()
  265. }
  266. func (spm *SScalingPolicyManager) Trigger(input *api.ScalingPolicyCreateInput) (IScalingTrigger, error) {
  267. tem := SScalingPolicy{TriggerType: input.TriggerType}
  268. return tem.Trigger(input)
  269. }
  270. func (sp *SScalingPolicy) Trigger(input *api.ScalingPolicyCreateInput) (IScalingTrigger, error) {
  271. log.Debugf("inset Trigger")
  272. if input != nil {
  273. switch sp.TriggerType {
  274. case api.TRIGGER_TIMING:
  275. return &SScalingTimer{
  276. SScalingPolicyBase: SScalingPolicyBase{sp.GetId()},
  277. STimer: STimer{
  278. Type: api.TIMER_TYPE_ONCE,
  279. StartTime: input.Timer.ExecTime,
  280. EndTime: input.Timer.ExecTime,
  281. NextTime: input.Timer.ExecTime,
  282. },
  283. }, nil
  284. case api.TRIGGER_CYCLE:
  285. trigger := &SScalingTimer{
  286. SScalingPolicyBase: SScalingPolicyBase{sp.GetId()},
  287. STimer: STimer{
  288. Type: input.CycleTimer.CycleType,
  289. Minute: input.CycleTimer.Minute,
  290. Hour: input.CycleTimer.Hour,
  291. StartTime: input.CycleTimer.StartTime,
  292. EndTime: input.CycleTimer.EndTime,
  293. NextTime: time.Time{},
  294. },
  295. }
  296. log.Debugf("setweekdays")
  297. trigger.SetWeekDays(input.CycleTimer.WeekDays)
  298. log.Debugf("setmonthdays")
  299. trigger.SetMonthDays(input.CycleTimer.MonthDays)
  300. trigger.Update(time.Now())
  301. return trigger, nil
  302. case api.TRIGGER_ALARM:
  303. return &SScalingAlarm{
  304. SScalingPolicyBase: SScalingPolicyBase{sp.GetId()},
  305. Cumulate: input.Alarm.Cumulate,
  306. Cycle: input.Alarm.Cycle,
  307. Indicator: input.Alarm.Indicator,
  308. Wrapper: input.Alarm.Wrapper,
  309. Operator: input.Alarm.Operator,
  310. Value: input.Alarm.Value,
  311. RealCumulate: 0,
  312. LastTriggerTime: time.Now(),
  313. }, nil
  314. default:
  315. return nil, fmt.Errorf("unkown trigger type %s", sp.TriggerType)
  316. }
  317. }
  318. if len(sp.TriggerId) == 0 {
  319. return nil, nil
  320. }
  321. switch sp.TriggerType {
  322. case api.TRIGGER_TIMING, api.TRIGGER_CYCLE:
  323. model, err := ScalingTimerManager.FetchById(sp.TriggerId)
  324. if err != nil {
  325. return nil, errors.Wrap(err, "SScalingTimerManager.FetchById")
  326. }
  327. return model.(*SScalingTimer), nil
  328. case api.TRIGGER_ALARM:
  329. model, err := ScalingAlarmManager.FetchById(sp.TriggerId)
  330. if err != nil {
  331. return nil, errors.Wrap(err, "SScalingAlarmManager.FetchById")
  332. }
  333. return model.(*SScalingAlarm), nil
  334. default:
  335. return nil, fmt.Errorf("unkown trigger type %s", sp.TriggerType)
  336. }
  337. }
  338. func (sp *SScalingPolicy) PerformTrigger(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  339. if sp.Status != api.SP_STATUS_READY {
  340. return nil, httperrors.NewForbiddenError("Can't trigger scaling policy without status 'ready'")
  341. }
  342. var (
  343. triggerDesc IScalingTriggerDesc
  344. err error
  345. )
  346. if sp.Enabled.IsFalse() {
  347. return nil, nil
  348. }
  349. sg, err := sp.ScalingGroup()
  350. if err != nil {
  351. return nil, errors.Wrap(err, "ScalingPolicy.ScalingGroup")
  352. }
  353. if sg.Enabled.IsFalse() {
  354. return nil, nil
  355. }
  356. manual, _ := data.Bool("manual")
  357. if manual {
  358. triggerDesc = SScalingManual{SScalingPolicyBase{sp.Id}}
  359. } else {
  360. trigger, err := sp.Trigger(nil)
  361. if err != nil {
  362. return nil, errors.Wrap(err, "fetch trigger failed")
  363. }
  364. if data.Contains("alarm_id") {
  365. alarmId, _ := data.GetString("alarm_id")
  366. if alarmId != trigger.(*SScalingAlarm).AlarmId {
  367. return nil, httperrors.NewInputParameterError("mismatched alarm id")
  368. }
  369. }
  370. if !trigger.IsTrigger() {
  371. return nil, nil
  372. }
  373. triggerDesc = trigger
  374. }
  375. err = sg.Scale(ctx, triggerDesc, sp, sp.CoolingTime)
  376. if err != nil {
  377. return nil, errors.Wrap(err, "ScalingPolicy.Scale")
  378. }
  379. sp.EventNotify(ctx, userCred)
  380. return nil, err
  381. }
  382. func (sp *SScalingPolicy) EventNotify(ctx context.Context, userCred mcclient.TokenCredential) {
  383. notifyclient.EventNotify(ctx, userCred, notifyclient.SEventNotifyParam{
  384. Obj: sp,
  385. Action: notifyclient.ActionExecute,
  386. })
  387. }
  388. func (sp *SScalingPolicy) ScalingGroup() (*SScalingGroup, error) {
  389. model, err := ScalingGroupManager.FetchById(sp.ScalingGroupId)
  390. if err != nil {
  391. return nil, errors.Wrap(err, "ScalingGroupManager.FetchById")
  392. }
  393. return model.(*SScalingGroup), nil
  394. }
// IScalingAction describes how a scaling activity changes a group's size.
// SScalingPolicy implements it.
type IScalingAction interface {
	// Exec takes the current size and returns the size after the action.
	Exec(int) int
	// CheckCoolTime reports whether the action is subject to the cooling
	// time; SScalingPolicy returns true only for alarm-triggered policies.
	CheckCoolTime() bool
}
  399. func (sp *SScalingPolicy) Exec(from int) int {
  400. diff := sp.Number
  401. if sp.Unit == api.UNIT_PERCENT {
  402. diff = diff * from / 100
  403. }
  404. switch sp.Action {
  405. case api.ACTION_ADD:
  406. return from + diff
  407. case api.ACTION_REMOVE:
  408. return from - diff
  409. case api.ACTION_SET:
  410. return diff
  411. default:
  412. return from
  413. }
  414. }
  415. func (sp *SScalingPolicy) CheckCoolTime() bool {
  416. if sp.TriggerType == api.TRIGGER_ALARM {
  417. return true
  418. }
  419. return false
  420. }
  421. func (sp *SScalingPolicy) PerformEnable(ctx context.Context, userCred mcclient.TokenCredential,
  422. query jsonutils.JSONObject, input apis.PerformEnableInput) (jsonutils.JSONObject, error) {
  423. err := db.EnabledPerformEnable(sp, ctx, userCred, true)
  424. if err != nil {
  425. return nil, errors.Wrap(err, "EnabledPerformEnable")
  426. }
  427. return nil, nil
  428. }
  429. func (sp *SScalingPolicy) PerformDisable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject,
  430. input apis.PerformDisableInput) (jsonutils.JSONObject, error) {
  431. err := db.EnabledPerformEnable(sp, ctx, userCred, false)
  432. if err != nil {
  433. return nil, errors.Wrap(err, "EnabledPerformEnable")
  434. }
  435. return nil, nil
  436. }
  437. func (sg *SScalingPolicy) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
  438. sg.SStandaloneResourceBase.PostCreate(ctx, userCred, ownerId, query, data)
  439. sg.SetStatus(ctx, userCred, api.SP_STATUS_CREATING, "")
  440. go func() {
  441. sp, err := sg.ScalingGroup()
  442. if err != nil {
  443. log.Errorf("Get ScalingGroup of ScalingPolicy '%s' failed: %s", sg.GetId(), err.Error())
  444. }
  445. createFailed := func(reason string) {
  446. logclient.AddActionLogWithContext(ctx, sp, logclient.ACT_CREATE_SCALING_POLICY, reason, userCred, false)
  447. sg.SetStatus(ctx, userCred, api.SP_STATUS_CREATE_FAILED, reason)
  448. }
  449. input := api.ScalingPolicyCreateInput{}
  450. err = data.Unmarshal(&input)
  451. if err != nil {
  452. createFailed(fmt.Sprintf("data.Unmarshal: %s", err.Error()))
  453. return
  454. }
  455. trigger, err := sg.Trigger(&input)
  456. if err != nil {
  457. createFailed(fmt.Sprintf("ScalingPolicy get trigger: %s", err.Error()))
  458. return
  459. }
  460. err = trigger.Register(ctx, userCred)
  461. if err != nil {
  462. createFailed(fmt.Sprintf("Trigger.Register: %s", err.Error()))
  463. return
  464. }
  465. logclient.AddActionLogWithContext(ctx, sp, logclient.ACT_CREATE_SCALING_POLICY, "", userCred, true)
  466. db.Update(sg, func() error {
  467. sg.TriggerId = trigger.TriggerId()
  468. sg.Status = api.SP_STATUS_READY
  469. sg.SetEnabled(true)
  470. return nil
  471. })
  472. }()
  473. }