scaling_trigger.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "database/sql"
  18. "fmt"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/utils"
  24. api "yunion.io/x/onecloud/pkg/apis/compute"
  25. monapi "yunion.io/x/onecloud/pkg/apis/monitor"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/httperrors"
  28. "yunion.io/x/onecloud/pkg/mcclient"
  29. "yunion.io/x/onecloud/pkg/mcclient/auth"
  30. "yunion.io/x/onecloud/pkg/mcclient/modules/monitor"
  31. )
  32. type IScalingTriggerDesc interface {
  33. TriggerDescription() string
  34. }
  35. type IScalingTrigger interface {
  36. IScalingTriggerDesc
  37. // ValidateCreateData check and verify the input when creating SScalingPolicy
  38. ValidateCreateData(input api.ScalingPolicyCreateInput) (api.ScalingPolicyCreateInput, error)
  39. // Register
  40. Register(ctx context.Context, userCred mcclient.TokenCredential) error
  41. UnRegister(ctx context.Context, userCred mcclient.TokenCredential) error
  42. TriggerId() string
  43. IsTrigger() bool
  44. }
  45. type SScalingManual struct {
  46. SScalingPolicyBase
  47. }
  48. func (sm SScalingManual) TriggerDescription() string {
  49. name := sm.ScalingPolicyId
  50. sp, _ := sm.ScalingPolicy()
  51. if sp != nil {
  52. name = sp.Name
  53. }
  54. return fmt.Sprintf(`A user request to execute scaling policy "%s"`, name)
  55. }
  56. type SScalingPolicyBase struct {
  57. ScalingPolicyId string `width:"36" charset:"ascii"`
  58. }
  59. func (spb *SScalingPolicyBase) ScalingGroup() (*SScalingGroup, error) {
  60. q := ScalingPolicyManager.Query().In("id", ScalingPolicyManager.Query("scaling_group_id").Equals("id",
  61. spb.ScalingPolicyId).SubQuery())
  62. var sg SScalingGroup
  63. err := q.First(&sg)
  64. return &sg, err
  65. }
  66. func (spb *SScalingPolicyBase) ScalingPolicy() (*SScalingPolicy, error) {
  67. model, err := ScalingPolicyManager.FetchById(spb.ScalingPolicyId)
  68. if err != nil {
  69. return nil, errors.Wrap(err, "ScalingPolicyManager.FetchById")
  70. }
  71. return model.(*SScalingPolicy), nil
  72. }
  73. type SScalingTimerManager struct {
  74. db.SStandaloneResourceBaseManager
  75. }
  76. type SScalingTimer struct {
  77. db.SStandaloneResourceBase
  78. SScalingPolicyBase
  79. STimer
  80. }
  81. type SScalingAlarmManager struct {
  82. db.SStandaloneResourceBaseManager
  83. }
  84. // 1st, 2nd, 3rd
  85. type SScalingAlarm struct {
  86. db.SStandaloneResourceBase
  87. SScalingPolicyBase
  88. // ID of alarm config in alarm service
  89. AlarmId string `width:"128" charset:"ascii"`
  90. // Trigger when the cumulative count is reached
  91. Cumulate int
  92. Cycle int
  93. Indicator string `width:"32" charset:"ascii"`
  94. // Wrapper instruct how to calculate collective data based on individual data
  95. Wrapper string `width:"16" charset:"ascii"`
  96. Operator string `width:"2" charset:"ascii"`
  97. Value float64
  98. // Real-time cumulate number
  99. RealCumulate int `default:"0"`
  100. // Last trigger time
  101. LastTriggerTime time.Time
  102. }
  103. var (
  104. ScalingTimerManager *SScalingTimerManager
  105. ScalingAlarmManager *SScalingAlarmManager
  106. )
  107. func init() {
  108. ScalingTimerManager = &SScalingTimerManager{
  109. SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
  110. SScalingTimer{},
  111. "scalingtimers_tbl",
  112. "scalingtimer",
  113. "scalingtimers",
  114. ),
  115. }
  116. ScalingTimerManager.SetVirtualObject(ScalingTimerManager)
  117. ScalingAlarmManager = &SScalingAlarmManager{
  118. SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
  119. SScalingAlarm{},
  120. "scalingalarms_tbl",
  121. "scalingalarm",
  122. "scalingalarms",
  123. ),
  124. }
  125. ScalingAlarmManager.SetVirtualObject(ScalingAlarmManager)
  126. }
  127. func (sa *SScalingAlarm) AlarmDetails() api.ScalingAlarmDetails {
  128. return api.ScalingAlarmDetails{
  129. Cumulate: sa.Cumulate,
  130. Cycle: sa.Cycle,
  131. Indicator: sa.Indicator,
  132. Wrapper: sa.Wrapper,
  133. Operator: sa.Operator,
  134. Value: sa.Value,
  135. }
  136. }
  137. func (st *SScalingTimer) ValidateCreateData(input api.ScalingPolicyCreateInput) (api.ScalingPolicyCreateInput, error) {
  138. var err error
  139. if input.TriggerType == api.TRIGGER_TIMING {
  140. input.Timer, err = checkTimerCreateInput(input.Timer)
  141. } else {
  142. input.CycleTimer, err = checkCycleTimerCreateInput(input.CycleTimer)
  143. }
  144. if err != nil {
  145. return input, httperrors.NewInputParameterError("%v", err)
  146. }
  147. return input, nil
  148. }
  149. func (st *SScalingTimer) Register(ctx context.Context, userCred mcclient.TokenCredential) error {
  150. // insert
  151. st.Update(time.Time{})
  152. err := ScalingTimerManager.TableSpec().Insert(ctx, st)
  153. if err != nil {
  154. return errors.Wrap(err, "STableSpec.Insert")
  155. }
  156. return nil
  157. }
  158. func (st *SScalingTimer) UnRegister(ctx context.Context, userCred mcclient.TokenCredential) error {
  159. err := st.Delete(ctx, userCred)
  160. if err != nil {
  161. return errors.Wrap(err, "SScalingTimer.Delete")
  162. }
  163. return nil
  164. }
  165. func (st *SScalingTimer) TriggerId() string {
  166. return st.GetId()
  167. }
  168. var cstSh, _ = time.LoadLocation("Asia/Shanghai")
  169. func (st *SScalingTimer) TriggerDescription() string {
  170. detail := st.descEnglish()
  171. name := st.ScalingPolicyId
  172. sp, _ := st.ScalingPolicy()
  173. if sp != nil {
  174. name = sp.Name
  175. }
  176. return fmt.Sprintf(`Schedule task(%s) execute scaling policy "%s"`, detail, name)
  177. }
  178. func (st *SScalingTimer) IsTrigger() bool {
  179. return true
  180. }
  181. func (sa *SScalingAlarm) ValidateCreateData(input api.ScalingPolicyCreateInput) (api.ScalingPolicyCreateInput, error) {
  182. if len(input.Alarm.Operator) == 0 {
  183. input.Alarm.Operator = api.OPERATOR_GT
  184. }
  185. if input.Alarm.Cycle == 0 {
  186. input.Alarm.Cycle = 300
  187. }
  188. if !utils.IsInStringArray(input.Alarm.Operator, []string{api.OPERATOR_GT, api.OPERATOR_LT}) {
  189. return input, httperrors.NewInputParameterError("unkown operator in alarm %s", input.Alarm.Operator)
  190. }
  191. if !utils.IsInStringArray(input.Alarm.Indicator, []string{api.INDICATOR_CPU, api.INDICATOR_DISK_READ,
  192. api.INDICATOR_DISK_WRITE, api.INDICATOR_FLOW_INTO, api.INDICATOR_FLOW_OUT}) {
  193. return input, httperrors.NewInputParameterError("unkown indicator in alarm %s", input.Alarm.Indicator)
  194. }
  195. if !utils.IsInStringArray(input.Alarm.Wrapper, []string{api.WRAPPER_MIN, api.WRAPPER_MAX, api.WRAPPER_AVER}) {
  196. return input, httperrors.NewInputParameterError("unkown wrapper in alarm %s", input.Alarm.Wrapper)
  197. }
  198. if input.Alarm.Cycle < 300 {
  199. return input, httperrors.NewInputParameterError("the min value of cycle in alarm is 300")
  200. }
  201. return input, nil
  202. }
  203. func (spm *SScalingPolicyManager) NotificationID(session *mcclient.ClientSession) (string, error) {
  204. var notificationID = ""
  205. params := jsonutils.NewDict()
  206. params.Set("type", jsonutils.NewString(monapi.AlertNotificationTypeAutoScaling))
  207. result, err := monitor.Notifications.List(session, params)
  208. if err != nil && errors.Cause(err) != sql.ErrNoRows {
  209. return "", errors.Wrap(err, "Notifications.List")
  210. }
  211. if result.Total != 0 {
  212. notificationID, _ = result.Data[0].GetString("id")
  213. return notificationID, nil
  214. }
  215. // To create new one
  216. conTrue, conFalse := true, false
  217. ncinput := monapi.NotificationCreateInput{
  218. Name: fmt.Sprintf("as-%s-%s", session.GetDomainName(), session.GetProjectName()),
  219. Type: monapi.AlertNotificationTypeAutoScaling,
  220. IsDefault: false,
  221. SendReminder: &conFalse,
  222. DisableResolveMessage: &conTrue,
  223. Settings: jsonutils.NewDict(),
  224. }
  225. ret, err := monitor.Notifications.Create(session, jsonutils.Marshal(ncinput))
  226. if err != nil {
  227. return "", errors.Wrap(err, "Notification.Create")
  228. }
  229. notificationID, _ = ret.GetString("id")
  230. return notificationID, nil
  231. }
  232. func (sa *SScalingAlarm) Register(ctx context.Context, userCred mcclient.TokenCredential) error {
  233. sp, err := sa.ScalingPolicy()
  234. if err != nil {
  235. return err
  236. }
  237. session := auth.GetSession(ctx, userCred, "")
  238. notificationID, err := ScalingPolicyManager.NotificationID(session)
  239. if err != nil {
  240. return errors.Wrap(err, "ScalingPolicyManager.NotificationID")
  241. }
  242. // create Alert
  243. config, err := sa.generateAlertConfig(sp)
  244. if err != nil {
  245. return errors.Wrap(err, "ScalingAlarm.generateAlertConfig")
  246. }
  247. alert, err := monitor.CommonAlerts.DoCreate(session, config, &monapi.CommonAlertCreateBaseInput{
  248. AlertType: monapi.CommonAlertServiceAlertType,
  249. })
  250. if err != nil {
  251. return errors.Wrap(err, "create Alert failed")
  252. }
  253. alarmId, _ := alert.GetString("id")
  254. // detach
  255. params := jsonutils.NewDict()
  256. params.Set("scaling_policy_id", jsonutils.NewString(sa.ScalingPolicyId))
  257. detachParams := jsonutils.NewDict()
  258. detachParams.Set("params", params)
  259. _, err = monitor.Alertnotification.Attach(session, alarmId, notificationID, detachParams)
  260. if err != nil {
  261. monitor.Alerts.Delete(session, alarmId, jsonutils.NewDict())
  262. return errors.Wrap(err, "attach alert with notification")
  263. }
  264. sa.AlarmId = alarmId
  265. // insert
  266. err = ScalingAlarmManager.TableSpec().Insert(ctx, sa)
  267. if err != nil {
  268. return errors.Wrap(err, "STableSpec.Insert")
  269. }
  270. return nil
  271. }
  272. type sTableField struct {
  273. Table string
  274. Field string
  275. }
  276. var indicatorMap = map[string]sTableField{
  277. api.INDICATOR_CPU: {"vm_cpu", "usage_active"},
  278. api.INDICATOR_DISK_WRITE: {"vm_diskio", "write_bps"},
  279. api.INDICATOR_DISK_READ: {"vm_diskio", "read_bps"},
  280. api.INDICATOR_FLOW_INTO: {"vm_netio", "bps_recv"},
  281. api.INDICATOR_FLOW_OUT: {"vm_netio", "bps_sent"},
  282. }
  283. var alertConfigUsedBy = "scaling_group"
  284. func (sa *SScalingAlarm) generateAlertConfig(sp *SScalingPolicy) (*monitor.AlertConfig, error) {
  285. config, err := monitor.NewAlertConfig(fmt.Sprintf("sp-%s", sp.Id), fmt.Sprintf("%ds", sa.Cycle), true)
  286. if err != nil {
  287. return nil, err
  288. }
  289. config.UsedBy = alertConfigUsedBy
  290. cond := config.Condition("telegraf", indicatorMap[sa.Indicator].Table).Avg()
  291. log.Debugf("alarm: %#v", sa)
  292. switch sa.Operator {
  293. case api.OPERATOR_LT:
  294. cond = cond.LT(sa.Value)
  295. case api.OPERATOR_GT:
  296. cond = cond.GT(sa.Value)
  297. }
  298. q := cond.Query().From("1h")
  299. sel := q.Selects().Select(indicatorMap[sa.Indicator].Field)
  300. switch sa.Wrapper {
  301. case api.WRAPPER_AVER:
  302. sel = sel.MEAN()
  303. case api.WRAPPER_MAX:
  304. sel = sel.MAX()
  305. case api.WRAPPER_MIN:
  306. sel = sel.MIN()
  307. }
  308. q.Where().Equal("vm_scaling_group_id", sp.ScalingGroupId)
  309. q.GroupBy().TAG("*").FILL_NULL()
  310. return config, nil
  311. }
  312. func (sa *SScalingAlarm) UnRegister(ctx context.Context, userCred mcclient.TokenCredential) error {
  313. session := auth.GetSession(ctx, userCred, "")
  314. _, err := monitor.Alerts.Delete(session, sa.AlarmId, jsonutils.NewDict())
  315. if err != nil && errors.Cause(err) != httperrors.ErrResourceNotFound {
  316. return errors.Wrap(err, "Alerts.Delete")
  317. }
  318. err = sa.Delete(ctx, userCred)
  319. if err != nil {
  320. return errors.Wrap(err, "SSCalingAlarm.Delete")
  321. }
  322. return nil
  323. }
  324. func (sa *SScalingAlarm) TriggerId() string {
  325. return sa.GetId()
  326. }
  327. func (sa *SScalingAlarm) TriggerDescription() string {
  328. name := sa.ScalingPolicyId
  329. sp, _ := sa.ScalingPolicy()
  330. if sp != nil {
  331. name = sp.Name
  332. }
  333. return fmt.Sprintf(
  334. `Alarm task(the %s %s of the instance is %s than %f%s) execute scaling policy "%s"`,
  335. descs[sa.Wrapper], descs[sa.Indicator], descs[sa.Operator],
  336. sa.Value, units[sa.Indicator], name,
  337. )
  338. }
  339. func (sa *SScalingAlarm) IsTrigger() (is bool) {
  340. realCumulate := sa.RealCumulate
  341. lastTriggerTime := sa.LastTriggerTime
  342. now := time.Now()
  343. if lastTriggerTime.Add(time.Duration(sa.Cycle) * 2 * time.Second).Before(now) {
  344. realCumulate = 1
  345. } else {
  346. realCumulate += 1
  347. }
  348. lastTriggerTime = now
  349. if realCumulate == sa.Cumulate {
  350. is = true
  351. realCumulate = 0
  352. }
  353. _, err := db.Update(sa, func() error {
  354. sa.RealCumulate = realCumulate
  355. sa.LastTriggerTime = lastTriggerTime
  356. return nil
  357. })
  358. if err != nil {
  359. log.Errorf("db.Update in ScalingAlarm.IsTrigger failed: %s", err.Error())
  360. }
  361. return
  362. }
  363. var descs = map[string]string{
  364. api.INDICATOR_CPU: "CPU utilization",
  365. api.INDICATOR_MEM: "memory utilization",
  366. api.INDICATOR_DISK_READ: "disk read rate",
  367. api.INDICATOR_DISK_WRITE: "disk write rate",
  368. api.INDICATOR_FLOW_INTO: "network inflow rate",
  369. api.INDICATOR_FLOW_OUT: "network outflow rate",
  370. api.WRAPPER_MAX: "maximum",
  371. api.WRAPPER_MIN: "minimum",
  372. api.WRAPPER_AVER: "average",
  373. api.OPERATOR_GT: "greater",
  374. api.OPERATOR_LT: "less",
  375. }
  376. var units = map[string]string{
  377. api.INDICATOR_CPU: "%",
  378. api.INDICATOR_MEM: "%",
  379. api.INDICATOR_DISK_READ: "kB/s",
  380. api.INDICATOR_DISK_WRITE: "kB/s",
  381. api.INDICATOR_FLOW_INTO: "KB/s",
  382. api.INDICATOR_FLOW_OUT: "KB/s",
  383. }