| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package models
- import (
- "context"
- "database/sql"
- "fmt"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/utils"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- monapi "yunion.io/x/onecloud/pkg/apis/monitor"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/modules/monitor"
- )
// IScalingTriggerDesc is implemented by anything that can describe, in
// human-readable form, why a scaling policy is being executed.
type IScalingTriggerDesc interface {
	// TriggerDescription returns the human-readable description text.
	TriggerDescription() string
}
// IScalingTrigger is the contract of a scaling-policy trigger. In this file it
// is implemented by SScalingTimer and SScalingAlarm.
type IScalingTrigger interface {
	IScalingTriggerDesc
	// ValidateCreateData check and verify the input when creating SScalingPolicy
	ValidateCreateData(input api.ScalingPolicyCreateInput) (api.ScalingPolicyCreateInput, error)
	// Register sets the trigger up. Both implementations in this file finish
	// by inserting the trigger's row into its own table.
	Register(ctx context.Context, userCred mcclient.TokenCredential) error
	// UnRegister tears the trigger down. Both implementations in this file
	// finish by deleting the trigger record.
	UnRegister(ctx context.Context, userCred mcclient.TokenCredential) error
	// TriggerId returns the ID of the trigger record.
	TriggerId() string
	// IsTrigger reports whether the scaling policy should be executed now.
	IsTrigger() bool
}
// SScalingManual stands for a user-requested execution of a scaling policy.
// It only carries the policy reference and, in this file, only provides a
// trigger description.
type SScalingManual struct {
	SScalingPolicyBase
}
- func (sm SScalingManual) TriggerDescription() string {
- name := sm.ScalingPolicyId
- sp, _ := sm.ScalingPolicy()
- if sp != nil {
- name = sp.Name
- }
- return fmt.Sprintf(`A user request to execute scaling policy "%s"`, name)
- }
// SScalingPolicyBase is embedded by every trigger type in this file and links
// the trigger to its scaling policy.
type SScalingPolicyBase struct {
	// ScalingPolicyId is the ID used to fetch the policy via ScalingPolicyManager.
	ScalingPolicyId string `width:"36" charset:"ascii"`
}
- func (spb *SScalingPolicyBase) ScalingGroup() (*SScalingGroup, error) {
- q := ScalingPolicyManager.Query().In("id", ScalingPolicyManager.Query("scaling_group_id").Equals("id",
- spb.ScalingPolicyId).SubQuery())
- var sg SScalingGroup
- err := q.First(&sg)
- return &sg, err
- }
- func (spb *SScalingPolicyBase) ScalingPolicy() (*SScalingPolicy, error) {
- model, err := ScalingPolicyManager.FetchById(spb.ScalingPolicyId)
- if err != nil {
- return nil, errors.Wrap(err, "ScalingPolicyManager.FetchById")
- }
- return model.(*SScalingPolicy), nil
- }
// SScalingTimerManager is the model manager for SScalingTimer records; it is
// instantiated in init with the table name "scalingtimers_tbl".
type SScalingTimerManager struct {
	db.SStandaloneResourceBaseManager
}
// SScalingTimer is the time-based trigger of a scaling policy. It combines a
// standalone resource record, the policy reference and the STimer fields
// (STimer is declared outside this file).
type SScalingTimer struct {
	db.SStandaloneResourceBase
	SScalingPolicyBase
	STimer
}
// SScalingAlarmManager is the model manager for SScalingAlarm records; it is
// instantiated in init with the table name "scalingalarms_tbl".
type SScalingAlarmManager struct {
	db.SStandaloneResourceBaseManager
}
// SScalingAlarm is the alarm-based trigger of a scaling policy. It stores the
// alarm rule (indicator, wrapper, operator, value, cycle), the ID of the alert
// created in the alarm service, and the running state used by IsTrigger.
type SScalingAlarm struct {
	db.SStandaloneResourceBase
	SScalingPolicyBase
	// ID of alarm config in alarm service
	AlarmId string `width:"128" charset:"ascii"`
	// Trigger when the cumulative count is reached
	Cumulate int
	// Cycle is a number of seconds: it is formatted as "<n>s" when the alert
	// config is generated and multiplied by time.Second in IsTrigger.
	Cycle int
	// Indicator selects the monitored metric (one of the api.INDICATOR_* values).
	Indicator string `width:"32" charset:"ascii"`
	// Wrapper instruct how to calculate collective data based on individual data
	Wrapper string `width:"16" charset:"ascii"`
	// Operator is the comparison applied against Value (api.OPERATOR_GT or
	// api.OPERATOR_LT).
	Operator string `width:"2" charset:"ascii"`
	// Value is the threshold the measured data is compared with.
	Value float64
	// Real-time cumulate number
	RealCumulate int `default:"0"`
	// Last trigger time
	LastTriggerTime time.Time
}
// Package-level model managers for the two trigger tables; both are assigned
// in init.
var (
	ScalingTimerManager *SScalingTimerManager
	ScalingAlarmManager *SScalingAlarmManager
)
- func init() {
- ScalingTimerManager = &SScalingTimerManager{
- SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
- SScalingTimer{},
- "scalingtimers_tbl",
- "scalingtimer",
- "scalingtimers",
- ),
- }
- ScalingTimerManager.SetVirtualObject(ScalingTimerManager)
- ScalingAlarmManager = &SScalingAlarmManager{
- SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
- SScalingAlarm{},
- "scalingalarms_tbl",
- "scalingalarm",
- "scalingalarms",
- ),
- }
- ScalingAlarmManager.SetVirtualObject(ScalingAlarmManager)
- }
- func (sa *SScalingAlarm) AlarmDetails() api.ScalingAlarmDetails {
- return api.ScalingAlarmDetails{
- Cumulate: sa.Cumulate,
- Cycle: sa.Cycle,
- Indicator: sa.Indicator,
- Wrapper: sa.Wrapper,
- Operator: sa.Operator,
- Value: sa.Value,
- }
- }
- func (st *SScalingTimer) ValidateCreateData(input api.ScalingPolicyCreateInput) (api.ScalingPolicyCreateInput, error) {
- var err error
- if input.TriggerType == api.TRIGGER_TIMING {
- input.Timer, err = checkTimerCreateInput(input.Timer)
- } else {
- input.CycleTimer, err = checkCycleTimerCreateInput(input.CycleTimer)
- }
- if err != nil {
- return input, httperrors.NewInputParameterError("%v", err)
- }
- return input, nil
- }
- func (st *SScalingTimer) Register(ctx context.Context, userCred mcclient.TokenCredential) error {
- // insert
- st.Update(time.Time{})
- err := ScalingTimerManager.TableSpec().Insert(ctx, st)
- if err != nil {
- return errors.Wrap(err, "STableSpec.Insert")
- }
- return nil
- }
- func (st *SScalingTimer) UnRegister(ctx context.Context, userCred mcclient.TokenCredential) error {
- err := st.Delete(ctx, userCred)
- if err != nil {
- return errors.Wrap(err, "SScalingTimer.Delete")
- }
- return nil
- }
- func (st *SScalingTimer) TriggerId() string {
- return st.GetId()
- }
// cstSh is the Asia/Shanghai time zone.
//
// time.LoadLocation fails when no time zone database is available to the
// process. A nil *time.Location would then panic in time.Date or Time.In, so
// the lookup falls back to a fixed UTC+8 zone instead of discarding the error.
var cstSh = func() *time.Location {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		return time.FixedZone("CST", 8*60*60)
	}
	return loc
}()
- func (st *SScalingTimer) TriggerDescription() string {
- detail := st.descEnglish()
- name := st.ScalingPolicyId
- sp, _ := st.ScalingPolicy()
- if sp != nil {
- name = sp.Name
- }
- return fmt.Sprintf(`Schedule task(%s) execute scaling policy "%s"`, detail, name)
- }
- func (st *SScalingTimer) IsTrigger() bool {
- return true
- }
- func (sa *SScalingAlarm) ValidateCreateData(input api.ScalingPolicyCreateInput) (api.ScalingPolicyCreateInput, error) {
- if len(input.Alarm.Operator) == 0 {
- input.Alarm.Operator = api.OPERATOR_GT
- }
- if input.Alarm.Cycle == 0 {
- input.Alarm.Cycle = 300
- }
- if !utils.IsInStringArray(input.Alarm.Operator, []string{api.OPERATOR_GT, api.OPERATOR_LT}) {
- return input, httperrors.NewInputParameterError("unkown operator in alarm %s", input.Alarm.Operator)
- }
- if !utils.IsInStringArray(input.Alarm.Indicator, []string{api.INDICATOR_CPU, api.INDICATOR_DISK_READ,
- api.INDICATOR_DISK_WRITE, api.INDICATOR_FLOW_INTO, api.INDICATOR_FLOW_OUT}) {
- return input, httperrors.NewInputParameterError("unkown indicator in alarm %s", input.Alarm.Indicator)
- }
- if !utils.IsInStringArray(input.Alarm.Wrapper, []string{api.WRAPPER_MIN, api.WRAPPER_MAX, api.WRAPPER_AVER}) {
- return input, httperrors.NewInputParameterError("unkown wrapper in alarm %s", input.Alarm.Wrapper)
- }
- if input.Alarm.Cycle < 300 {
- return input, httperrors.NewInputParameterError("the min value of cycle in alarm is 300")
- }
- return input, nil
- }
- func (spm *SScalingPolicyManager) NotificationID(session *mcclient.ClientSession) (string, error) {
- var notificationID = ""
- params := jsonutils.NewDict()
- params.Set("type", jsonutils.NewString(monapi.AlertNotificationTypeAutoScaling))
- result, err := monitor.Notifications.List(session, params)
- if err != nil && errors.Cause(err) != sql.ErrNoRows {
- return "", errors.Wrap(err, "Notifications.List")
- }
- if result.Total != 0 {
- notificationID, _ = result.Data[0].GetString("id")
- return notificationID, nil
- }
- // To create new one
- conTrue, conFalse := true, false
- ncinput := monapi.NotificationCreateInput{
- Name: fmt.Sprintf("as-%s-%s", session.GetDomainName(), session.GetProjectName()),
- Type: monapi.AlertNotificationTypeAutoScaling,
- IsDefault: false,
- SendReminder: &conFalse,
- DisableResolveMessage: &conTrue,
- Settings: jsonutils.NewDict(),
- }
- ret, err := monitor.Notifications.Create(session, jsonutils.Marshal(ncinput))
- if err != nil {
- return "", errors.Wrap(err, "Notification.Create")
- }
- notificationID, _ = ret.GetString("id")
- return notificationID, nil
- }
- func (sa *SScalingAlarm) Register(ctx context.Context, userCred mcclient.TokenCredential) error {
- sp, err := sa.ScalingPolicy()
- if err != nil {
- return err
- }
- session := auth.GetSession(ctx, userCred, "")
- notificationID, err := ScalingPolicyManager.NotificationID(session)
- if err != nil {
- return errors.Wrap(err, "ScalingPolicyManager.NotificationID")
- }
- // create Alert
- config, err := sa.generateAlertConfig(sp)
- if err != nil {
- return errors.Wrap(err, "ScalingAlarm.generateAlertConfig")
- }
- alert, err := monitor.CommonAlerts.DoCreate(session, config, &monapi.CommonAlertCreateBaseInput{
- AlertType: monapi.CommonAlertServiceAlertType,
- })
- if err != nil {
- return errors.Wrap(err, "create Alert failed")
- }
- alarmId, _ := alert.GetString("id")
- // detach
- params := jsonutils.NewDict()
- params.Set("scaling_policy_id", jsonutils.NewString(sa.ScalingPolicyId))
- detachParams := jsonutils.NewDict()
- detachParams.Set("params", params)
- _, err = monitor.Alertnotification.Attach(session, alarmId, notificationID, detachParams)
- if err != nil {
- monitor.Alerts.Delete(session, alarmId, jsonutils.NewDict())
- return errors.Wrap(err, "attach alert with notification")
- }
- sa.AlarmId = alarmId
- // insert
- err = ScalingAlarmManager.TableSpec().Insert(ctx, sa)
- if err != nil {
- return errors.Wrap(err, "STableSpec.Insert")
- }
- return nil
- }
// sTableField names the table and the field that an alarm indicator is
// queried from when the alert config is generated.
type sTableField struct {
	Table string
	Field string
}
// indicatorMap maps each supported alarm indicator to the table and field
// used by generateAlertConfig. Its keys are the same five indicators accepted
// by SScalingAlarm.ValidateCreateData.
var indicatorMap = map[string]sTableField{
	api.INDICATOR_CPU:        {"vm_cpu", "usage_active"},
	api.INDICATOR_DISK_WRITE: {"vm_diskio", "write_bps"},
	api.INDICATOR_DISK_READ:  {"vm_diskio", "read_bps"},
	api.INDICATOR_FLOW_INTO:  {"vm_netio", "bps_recv"},
	api.INDICATOR_FLOW_OUT:   {"vm_netio", "bps_sent"},
}
// alertConfigUsedBy is the value stored in the UsedBy field of every alert
// config produced by generateAlertConfig.
var alertConfigUsedBy = "scaling_group"
- func (sa *SScalingAlarm) generateAlertConfig(sp *SScalingPolicy) (*monitor.AlertConfig, error) {
- config, err := monitor.NewAlertConfig(fmt.Sprintf("sp-%s", sp.Id), fmt.Sprintf("%ds", sa.Cycle), true)
- if err != nil {
- return nil, err
- }
- config.UsedBy = alertConfigUsedBy
- cond := config.Condition("telegraf", indicatorMap[sa.Indicator].Table).Avg()
- log.Debugf("alarm: %#v", sa)
- switch sa.Operator {
- case api.OPERATOR_LT:
- cond = cond.LT(sa.Value)
- case api.OPERATOR_GT:
- cond = cond.GT(sa.Value)
- }
- q := cond.Query().From("1h")
- sel := q.Selects().Select(indicatorMap[sa.Indicator].Field)
- switch sa.Wrapper {
- case api.WRAPPER_AVER:
- sel = sel.MEAN()
- case api.WRAPPER_MAX:
- sel = sel.MAX()
- case api.WRAPPER_MIN:
- sel = sel.MIN()
- }
- q.Where().Equal("vm_scaling_group_id", sp.ScalingGroupId)
- q.GroupBy().TAG("*").FILL_NULL()
- return config, nil
- }
- func (sa *SScalingAlarm) UnRegister(ctx context.Context, userCred mcclient.TokenCredential) error {
- session := auth.GetSession(ctx, userCred, "")
- _, err := monitor.Alerts.Delete(session, sa.AlarmId, jsonutils.NewDict())
- if err != nil && errors.Cause(err) != httperrors.ErrResourceNotFound {
- return errors.Wrap(err, "Alerts.Delete")
- }
- err = sa.Delete(ctx, userCred)
- if err != nil {
- return errors.Wrap(err, "SSCalingAlarm.Delete")
- }
- return nil
- }
- func (sa *SScalingAlarm) TriggerId() string {
- return sa.GetId()
- }
- func (sa *SScalingAlarm) TriggerDescription() string {
- name := sa.ScalingPolicyId
- sp, _ := sa.ScalingPolicy()
- if sp != nil {
- name = sp.Name
- }
- return fmt.Sprintf(
- `Alarm task(the %s %s of the instance is %s than %f%s) execute scaling policy "%s"`,
- descs[sa.Wrapper], descs[sa.Indicator], descs[sa.Operator],
- sa.Value, units[sa.Indicator], name,
- )
- }
- func (sa *SScalingAlarm) IsTrigger() (is bool) {
- realCumulate := sa.RealCumulate
- lastTriggerTime := sa.LastTriggerTime
- now := time.Now()
- if lastTriggerTime.Add(time.Duration(sa.Cycle) * 2 * time.Second).Before(now) {
- realCumulate = 1
- } else {
- realCumulate += 1
- }
- lastTriggerTime = now
- if realCumulate == sa.Cumulate {
- is = true
- realCumulate = 0
- }
- _, err := db.Update(sa, func() error {
- sa.RealCumulate = realCumulate
- sa.LastTriggerTime = lastTriggerTime
- return nil
- })
- if err != nil {
- log.Errorf("db.Update in ScalingAlarm.IsTrigger failed: %s", err.Error())
- }
- return
- }
// descs holds the English wording used by SScalingAlarm.TriggerDescription
// for indicators, wrappers and operators, all keyed in one map.
// api.INDICATOR_MEM has an entry here although it is neither in indicatorMap
// nor accepted by SScalingAlarm.ValidateCreateData.
var descs = map[string]string{
	api.INDICATOR_CPU:        "CPU utilization",
	api.INDICATOR_MEM:        "memory utilization",
	api.INDICATOR_DISK_READ:  "disk read rate",
	api.INDICATOR_DISK_WRITE: "disk write rate",
	api.INDICATOR_FLOW_INTO:  "network inflow rate",
	api.INDICATOR_FLOW_OUT:   "network outflow rate",
	api.WRAPPER_MAX:          "maximum",
	api.WRAPPER_MIN:          "minimum",
	api.WRAPPER_AVER:         "average",
	api.OPERATOR_GT:          "greater",
	api.OPERATOR_LT:          "less",
}
// units holds the unit suffix appended after the threshold value in
// SScalingAlarm.TriggerDescription, keyed by indicator.
// NOTE(review): the disk rates use "kB/s" while the network rates use "KB/s";
// confirm whether the different casing is intentional.
var units = map[string]string{
	api.INDICATOR_CPU:        "%",
	api.INDICATOR_MEM:        "%",
	api.INDICATOR_DISK_READ:  "kB/s",
	api.INDICATOR_DISK_WRITE: "kB/s",
	api.INDICATOR_FLOW_INTO:  "KB/s",
	api.INDICATOR_FLOW_OUT:   "KB/s",
}
|