| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package autoscaling
- import (
- "bytes"
- "context"
- "database/sql"
- "fmt"
- "math/rand"
- "strings"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/pkg/util/sets"
- "yunion.io/x/sqlchemy"
- "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/cloudcommon/cronman"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- "yunion.io/x/onecloud/pkg/compute/models"
- "yunion.io/x/onecloud/pkg/compute/options"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/mcclient/modules/scheduler"
- "yunion.io/x/onecloud/pkg/util/logclient"
- "yunion.io/x/onecloud/pkg/util/nopanic"
- )
- type SASController struct {
- options options.SASControllerOptions
- scalingQueue chan struct{}
- timerQueue chan struct{}
- scalingGroupSet *SLockedSet
- scalingSql *sqlchemy.SQuery
- // record the consecutive failures of scaling group's scale
- failRecord map[string]int
- }
- type SScalingInfo struct {
- ScalingGroup *models.SScalingGroup
- Total int
- }
- type SLockedSet struct {
- set sets.String
- lock sync.Mutex
- }
- func (set *SLockedSet) Has(s string) bool {
- return set.set.Has(s)
- }
- func (set *SLockedSet) CheckAndInsert(s string) bool {
- set.lock.Lock()
- defer set.lock.Unlock()
- if set.set.Has(s) {
- return false
- }
- set.set.Insert(s)
- return true
- }
- func (set *SLockedSet) Delete(s string) {
- set.lock.Lock()
- defer set.lock.Unlock()
- set.set.Delete(s)
- }
- var ASController = new(SASController)
- func (asc *SASController) Init(options options.SASControllerOptions, cronm *cronman.SCronJobManager) {
- asc.options = options
- asc.timerQueue = make(chan struct{}, 20)
- asc.scalingQueue = make(chan struct{}, options.ConcurrentUpper)
- asc.scalingGroupSet = &SLockedSet{set: sets.NewString()}
- asc.failRecord = make(map[string]int)
- // init scalingSql
- sggQ := models.ScalingGroupGuestManager.Query("scaling_group_id").GroupBy("scaling_group_id")
- sggQ = sggQ.AppendField(sqlchemy.COUNT("total", sggQ.Field("guest_id")))
- sggSubQ := sggQ.SubQuery()
- sgQ := models.ScalingGroupManager.Query("id", "desire_instance_number").IsTrue("enabled")
- sgQ = sgQ.LeftJoin(sggSubQ, sqlchemy.AND(sqlchemy.Equals(sggSubQ.Field("scaling_group_id"),
- sgQ.Field("id")), sqlchemy.NotEquals(sggSubQ.Field("total"), sgQ.Field("desire_instance_number"))))
- sgQ.AppendField(sggSubQ.Field("total"))
- asc.scalingSql = sgQ
- cronm.AddJobAtIntervalsWithStartRun("CheckTimer", time.Duration(options.TimerInterval)*time.Second, asc.Timer, true)
- cronm.AddJobAtIntervalsWithStartRun("CheckScale", time.Duration(options.CheckScaleInterval)*time.Second, asc.CheckScale, true)
- cronm.AddJobAtIntervalsWithStartRun("CheckInstanceHealth", time.Duration(options.CheckHealthInterval)*time.Minute, asc.CheckInstanceHealth, true)
- // check all scaling activity
- nopanic.Run(func() {
- log.Infof("check and update scaling activities...")
- sas := make([]models.SScalingActivity, 0, 10)
- q := models.ScalingActivityManager.Query().Equals("status", compute.SA_STATUS_EXEC)
- err := db.FetchModelObjects(models.ScalingActivityManager, q, &sas)
- if err != nil {
- log.Errorf("unable to check and update scaling activities")
- return
- }
- for i := range sas {
- sas[i].SetFailed("", "As the service restarts, the status becomes unknown")
- }
- log.Infof("check and update scalngactivities complete")
- })
- }
- func (asc *SASController) PreScale(group *models.SScalingGroup, userCred mcclient.TokenCredential) bool {
- maxFailures := 3
- disableReason := fmt.Sprintf("The number of consecutive failures of creating a machine exceeds %d times", maxFailures)
- times := asc.failRecord[group.GetId()]
- if times >= maxFailures {
- _, err := db.Update(group, func() error {
- group.SetEnabled(false)
- return nil
- })
- if err != nil {
- return false
- }
- logclient.AddSimpleActionLog(group, logclient.ACT_DISABLE, disableReason, userCred, true)
- return false
- }
- return true
- }
- func (asc *SASController) Finish(groupId string, success bool) {
- asc.scalingGroupSet.Delete(groupId)
- if success {
- asc.failRecord[groupId] = 0
- return
- }
- asc.failRecord[groupId]++
- }
// SScalingGroupShort is the short form of a ScalingGroup produced by the
// scaling query: the group's ID and DesireInstanceNumber, together with Total,
// the number of guests currently in the group.
type SScalingGroupShort struct {
	// ID is the scaling group ID.
	ID string
	// DesireInstanceNumber is the number of guests the group should have.
	DesireInstanceNumber int `default:"0"`
	// Total is the number of guests the group has now.
	Total int `default:"0"`
}
- func (asc *SASController) CheckScale(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
- sgShorts, err := asc.ScalingGroupsNeedScale()
- if err != nil {
- log.Errorf("asc.ScalingGroupNeedScale: %s", err.Error())
- return
- }
- for _, short := range sgShorts {
- if short.DesireInstanceNumber == short.Total {
- continue
- }
- insert := asc.scalingGroupSet.CheckAndInsert(short.ID)
- if !insert {
- log.Infof("A scaling activity of ScalingGroup %s is in progress, so current scaling activity was rejected.", short.ID)
- continue
- }
- asc.scalingQueue <- struct{}{}
- go asc.Scale(ctx, userCred, short)
- }
- // log.Debugf("This cronJob about CheckScale finished")
- }
- func (asc *SASController) Scale(ctx context.Context, userCred mcclient.TokenCredential, short SScalingGroupShort) {
- log.Debugf("scale for ScalingGroup '%s', desire: %d, total: %d", short.ID, short.DesireInstanceNumber, short.Total)
- var (
- err error
- success = true
- )
- setFail := func(sa *models.SScalingActivity, reason string) {
- success = false
- err = sa.SetFailed("", reason)
- }
- defer func() {
- if err != nil {
- log.Errorf("Scaling for ScalingGroup '%s': %s", short.ID, err.Error())
- }
- asc.Finish(short.ID, success)
- <-asc.scalingQueue
- log.Debugf("Scale for ScalingGroup '%s' finished", short.ID)
- }()
- log.Debugf("fetch the latest data")
- // fetch the latest data
- model, err := models.ScalingGroupManager.FetchById(short.ID)
- if err != nil {
- if err == sql.ErrNoRows {
- err = nil
- }
- return
- }
- sg := model.(*models.SScalingGroup)
- if !asc.PreScale(sg, userCred) {
- success = true
- return
- }
- total, err := sg.GuestNumber()
- if err != nil {
- return
- }
- log.Debugf("total: %d, desire: %d", total, sg.DesireInstanceNumber)
- // don't scale
- if sg.DesireInstanceNumber-total == 0 {
- return
- }
- log.Debugf("insert sa")
- scalingActivity, err := models.ScalingActivityManager.CreateScalingActivity(
- ctx,
- sg.Id,
- fmt.Sprintf(`The Desire Instance Number was changed, so change the Total Instance Number from "%d" to "%d"`,
- total, sg.DesireInstanceNumber,
- ),
- compute.SA_STATUS_EXEC,
- )
- if err != nil {
- return
- }
- // userCred是管理员,ownerId是拥有者
- ownerId := sg.GetOwnerId()
- num := sg.DesireInstanceNumber - total
- switch {
- case num > 0:
- // check guest template
- gt := sg.GetGuestTemplate()
- if gt == nil {
- setFail(scalingActivity, fmt.Sprintf("fetch GuestTemplate of ScalingGroup '%s' error", sg.Id))
- return
- }
- nets, err := sg.NetworkIds()
- if err != nil {
- setFail(scalingActivity, fmt.Sprintf("fetch Networks of ScalingGroup '%s' error", sg.Id))
- return
- }
- valid, msg := gt.Validate(context.TODO(), auth.AdminCredential(), gt.GetOwnerId(),
- models.SGuestTemplateValidate{
- Hypervisor: sg.Hypervisor,
- CloudregionId: sg.CloudregionId,
- VpcId: sg.VpcId,
- NetworkIds: nets,
- },
- )
- if !valid {
- err = scalingActivity.SetReject("", msg)
- return
- }
- if len(nets) == 0 {
- setFail(scalingActivity, fmt.Sprintf("empty networks '%s' error", sg.Id))
- return
- }
- succeedInstances, err := asc.CreateInstances(ctx, userCred, ownerId, sg, gt, nets[0], num)
- switch len(succeedInstances) {
- case 0:
- setFail(scalingActivity, fmt.Sprintf("All instances create failed: %s", err.Error()))
- case num:
- var action bytes.Buffer
- action.WriteString("Instances ")
- for _, instance := range succeedInstances {
- action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
- }
- action.Truncate(action.Len() - 2)
- action.WriteString(" are created")
- err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_SUCCEED, "", sg.DesireInstanceNumber)
- default:
- var action bytes.Buffer
- action.WriteString("Instances ")
- for _, instance := range succeedInstances {
- action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
- }
- action.Truncate(action.Len() - 2)
- action.WriteString(" are created")
- err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_PART_SUCCEED, fmt.Sprintf("Some instances create failed: %s", err.Error()), total+len(succeedInstances))
- }
- return
- case num < 0:
- num = -num
- succeedInstances, err := asc.DetachInstances(ctx, userCred, ownerId, sg, num)
- switch len(succeedInstances) {
- case 0:
- setFail(scalingActivity, fmt.Sprintf("All instance remove failed: %s", err.Error()))
- case num:
- var action bytes.Buffer
- action.WriteString("Instances ")
- for _, instance := range succeedInstances {
- action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
- }
- action.Truncate(action.Len() - 2)
- action.WriteString(" are deleted")
- err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_SUCCEED, "", sg.DesireInstanceNumber)
- default:
- var action bytes.Buffer
- action.WriteString("Instances ")
- for _, instance := range succeedInstances {
- action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
- }
- action.Truncate(action.Len() - 2)
- action.WriteString(" are deleted")
- err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_PART_SUCCEED, fmt.Sprintf("Some instance removed failed: %s", err.Error()), sg.DesireInstanceNumber)
- }
- return
- }
- }
- func (asc *SASController) DetachInstances(ctx context.Context, userCred mcclient.TokenCredential,
- ownerId mcclient.IIdentityProvider, sg *models.SScalingGroup, num int) ([]SInstance, error) {
- instances, err := asc.findSuitableInstance(sg, num)
- if err != nil {
- return nil, errors.Wrap(err, "find suitable instances failed")
- }
- removeParams := jsonutils.NewDict()
- removeParams.Set("scaling_group", jsonutils.NewString(sg.Id))
- removeParams.Set("delete_server", jsonutils.JSONTrue)
- removeParams.Set("auto", jsonutils.JSONTrue)
- session := auth.GetSession(ctx, userCred, "")
- failedList := make([]string, 0)
- waitList := make([]string, 0, len(instances))
- instanceMap := make(map[string]SInstance, len(instances))
- // request to detach instances with scaling group
- for i := range instances {
- instanceMap[instances[i].Id] = SInstance{instances[i].Id, instances[i].Name}
- _, err := modules.Servers.PerformAction(session, instances[i].GetId(), "detach-scaling-group", removeParams)
- if err != nil {
- failedList = append(failedList, fmt.Sprintf("remove instance '%s' failed: %s", instances[i].GetId(), err.Error()))
- continue
- }
- waitList = append(waitList, instances[i].GetId())
- }
- // wait for all requests finished
- succeedList := sets.NewString(waitList...)
- ticker := time.NewTicker(3 * time.Second)
- timer := time.NewTimer(5 * time.Minute)
- Loop:
- for {
- select {
- default:
- sggs, err := sg.ScalingGroupGuests(waitList)
- if err != nil {
- log.Errorf("ScalingGroup.ScalingGroupGuests error: %s", err.Error())
- <-ticker.C
- continue Loop
- }
- waitList = make([]string, 0, 1)
- for i := range sggs {
- switch sggs[i].GuestStatus {
- case compute.SG_GUEST_STATUS_REMOVE_FAILED:
- succeedList.Delete(sggs[i].GetId())
- failedList = append(failedList, fmt.Sprintf("remove instance '%s' failed", sggs[i].GetId()))
- case compute.SG_GUEST_STATUS_READY, compute.SG_GUEST_STATUS_REMOVING, compute.SG_GUEST_STATUS_PENDING_REMOVE:
- waitList = append(waitList, sggs[i].GetId())
- default:
- log.Errorf("unkown guest status for ScalingGroupGuest '%s'", sggs[i].GetId())
- }
- }
- if len(waitList) == 0 {
- break Loop
- }
- <-ticker.C
- case <-timer.C:
- log.Errorf("come check jobs for removing servers timeout")
- for _, id := range waitList {
- failedList = append(failedList, fmt.Sprintf("remove instance '%s' timeout", id))
- succeedList.Delete(id)
- }
- }
- }
- ticker.Stop()
- timer.Stop()
- log.Debugf("finish all check jobs when removing servers")
- err = nil
- if len(failedList) != 0 {
- err = fmt.Errorf("%s", strings.Join(failedList, "; "))
- }
- instanceRet := make([]SInstance, 0, succeedList.Len())
- for _, id := range succeedList.UnsortedList() {
- instanceRet = append(instanceRet, instanceMap[id])
- }
- return instanceRet, err
- }
- func (asc *SASController) findSuitableInstance(sg *models.SScalingGroup, num int) ([]models.SGuest, error) {
- ggSubQ := models.ScalingGroupGuestManager.Query("guest_id").Equals("scaling_group_id", sg.Id).SubQuery()
- guestQ := models.GuestManager.Query().In("id", ggSubQ)
- switch sg.ShrinkPrinciple {
- case compute.SHRINK_EARLIEST_CREATION_FIRST:
- guestQ = guestQ.Asc("created_at").Limit(num)
- case compute.SHRINK_LATEST_CREATION_FIRST:
- guestQ = guestQ.Desc("created_at").Limit(num)
- }
- guests := make([]models.SGuest, 0, num)
- err := db.FetchModelObjects(models.GuestManager, guestQ, &guests)
- if err != nil {
- return nil, err
- }
- return guests, nil
- }
- func (asc *SASController) createInstances(session *mcclient.ClientSession, params jsonutils.JSONObject, count int,
- failedList []string, succeedList []SInstance) ([]string, []SInstance) {
- // forcast first
- dict := params.(*jsonutils.JSONDict)
- dict.Set("count", jsonutils.NewInt(int64(count)))
- _, err := scheduler.SchedManager.DoForecast(session, dict)
- if err != nil {
- clientErr := err.(*httputils.JSONClientError)
- failedList = append(failedList, clientErr.Details)
- }
- dict.Remove("domain_id")
- dict.Remove("count")
- if count == 1 {
- ret, err := modules.Servers.Create(session, params)
- if err != nil {
- clientErr := err.(*httputils.JSONClientError)
- failedList = append(failedList, clientErr.Details)
- return failedList, succeedList
- }
- id, _ := ret.GetString("id")
- name, _ := ret.GetString("name")
- succeedList = append(succeedList, SInstance{id, name})
- return failedList, succeedList
- }
- rets := modules.Servers.BatchCreate(session, params, count)
- for _, ret := range rets {
- if ret.Status >= 400 {
- failedList = append(failedList, ret.Data.String())
- } else {
- id, _ := ret.Data.GetString("id")
- name, _ := ret.Data.GetString("name")
- succeedList = append(succeedList, SInstance{id, name})
- }
- }
- return failedList, succeedList
- }
// SInstance identifies a server instance by its ID and its name.
type SInstance struct {
	// ID is the ID of the instance.
	ID string
	// Name is the name of the instance.
	Name string
}
- func (asc *SASController) CreateInstances(
- ctx context.Context,
- userCred mcclient.TokenCredential,
- ownerId mcclient.IIdentityProvider,
- sg *models.SScalingGroup,
- gt *models.SGuestTemplate,
- defaultNet string,
- num int,
- ) ([]SInstance, error) {
- // build the create request data
- content := gt.Content.(*jsonutils.JSONDict)
- // no network, add a default
- if len(gt.VpcId) == 0 {
- net := jsonutils.NewDict()
- net.Set("network", jsonutils.NewString(defaultNet))
- content.Set("nets", jsonutils.NewArray(net))
- }
- content.Set("auto_start", jsonutils.JSONTrue)
- // set description
- content.Set("description", jsonutils.NewString(fmt.Sprintf("Belong to scaling group '%s'", sg.Name)))
- // For compatibility
- content.Remove("__count__")
- // set onwer project and id
- content.Set("project_id", jsonutils.NewString(ownerId.GetProjectId()))
- content.Set("user_id", jsonutils.NewString(ownerId.GetUserId()))
- countPR, requests := asc.countPRAndRequests(num)
- log.Debugf("countPR: %d, requests: %d", countPR, requests)
- rand.Seed(time.Now().UnixNano())
- session := auth.GetSession(ctx, userCred, "")
- failedList := make([]string, 0)
- succeedList := make([]SInstance, 0, num/2)
- // fisrt stage: create request
- for rn := 0; rn < requests; rn++ {
- generateName := fmt.Sprintf("sg-%s-%s", sg.Name, asc.randStringRunes(5))
- content.Set("generate_name", jsonutils.NewString(generateName))
- failedList, succeedList = asc.createInstances(session, content, countPR, failedList, succeedList)
- }
- if remain := num - requests*countPR; remain > 0 {
- generateName := fmt.Sprintf("sg-%s-%s", sg.Name, asc.randStringRunes(5))
- content.Set("generate_name", jsonutils.NewString(generateName))
- failedList, succeedList = asc.createInstances(session, content, remain, failedList, succeedList)
- }
- // second stage: joining scaling group
- for _, instance := range succeedList {
- err := models.ScalingGroupGuestManager.Attach(ctx, sg.Id, instance.ID, false)
- if err != nil {
- log.Errorf("Attach ScalingGroup '%s' with Guest '%s' failed", sg.Id, instance.ID)
- }
- }
- // third stage: wait for create complete
- retChan := make(chan SCreateRet, requests)
- guestIds := make([]string, len(succeedList))
- instanceMap := make(map[string]SInstance, len(succeedList))
- for i, instane := range succeedList {
- guestIds[i] = instane.ID
- instanceMap[instane.ID] = instane
- }
- // check all server's status
- var waitLimit, waitinterval time.Duration
- if sg.Hypervisor == compute.HYPERVISOR_KVM {
- waitLimit = 5 * time.Minute
- waitinterval = 3 * time.Second
- } else {
- waitLimit = 10 * time.Minute
- waitinterval = 10 * time.Second
- }
- go asc.checkAllServer(session, guestIds, retChan, waitLimit, waitinterval)
- // fourth stage: bind lb and db
- failRecord := &SFailRecord{
- recordList: failedList,
- }
- succeedInstances := make([]string, 0, len(succeedList))
- workerLimit := make(chan struct{}, requests)
- for {
- ret, ok := <-retChan
- if !ok {
- break
- }
- workerLimit <- struct{}{}
- // bind ld and db
- go func() {
- succeed := asc.actionAfterCreate(ctx, userCred, session, sg, ret, failRecord)
- log.Debugf("action after create instance '%s', succeed: %t", ret.Id, succeed)
- if succeed {
- succeedInstances = append(succeedInstances, ret.Id)
- }
- <-workerLimit
- }()
- }
- log.Debugf("wait fo all worker finish")
- // wait for all worker finish
- log.Debugf("workerlimit cap: %d", cap(workerLimit))
- for i := 0; i < cap(workerLimit); i++ {
- log.Debugf("no.%d insert worker limit", i)
- workerLimit <- struct{}{}
- }
- instances := make([]SInstance, 0, len(succeedInstances))
- for _, id := range succeedInstances {
- instances = append(instances, instanceMap[id])
- }
- return instances, fmt.Errorf("%s", failRecord.String())
- }
// SCreateRet is the outcome of waiting for one server creation, as reported
// by checkAllServer.
type SCreateRet struct {
	// Id is the ID of the server.
	Id string
	// Status is the last server status observed, or "timeout" when the wait
	// limit expired before the server reached a final status.
	Status string
}
- func (asc *SASController) checkAllServer(session *mcclient.ClientSession, guestIds []string, retChan chan SCreateRet,
- waitLimit, waitInterval time.Duration) {
- guestIDSet := sets.NewString(guestIds...)
- timer := time.NewTimer(waitLimit)
- ticker := time.NewTicker(waitInterval)
- defer func() {
- close(retChan)
- ticker.Stop()
- timer.Stop()
- log.Debugf("finish all check jobs when creating servers")
- }()
- log.Debugf("guestIds: %s", guestIds)
- for {
- select {
- default:
- for _, id := range guestIDSet.UnsortedList() {
- ret, e := modules.Servers.GetSpecific(session, id, "status", nil)
- if e != nil {
- log.Errorf("Servers.GetSpecific failed: %s", e)
- <-ticker.C
- continue
- }
- log.Debugf("ret from GetSpecific: %s", ret.String())
- status, _ := ret.GetString("status")
- if status == compute.VM_RUNNING || strings.HasSuffix(status, "fail") || strings.HasSuffix(status, "failed") {
- guestIDSet.Delete(id)
- retChan <- SCreateRet{
- Id: id,
- Status: status,
- }
- }
- }
- if guestIDSet.Len() == 0 {
- return
- }
- <-ticker.C
- case <-timer.C:
- log.Errorf("some check jobs for server timeout")
- for _, id := range guestIDSet.UnsortedList() {
- retChan <- SCreateRet{
- Id: id,
- Status: "timeout",
- }
- }
- return
- }
- }
- }
// SFailRecord collects failure reasons reported by concurrent workers.
type SFailRecord struct {
	lock       sync.Mutex
	recordList []string
}

// Append adds record to the list of failure reasons.
func (fr *SFailRecord) Append(record string) {
	fr.lock.Lock()
	defer fr.lock.Unlock()
	fr.recordList = append(fr.recordList, record)
}

// String returns all recorded failure reasons joined with "; ". It is empty
// when nothing was recorded.
//
// The lock is held while reading so that String can be called while other
// goroutines are still appending.
func (fr *SFailRecord) String() string {
	fr.lock.Lock()
	defer fr.lock.Unlock()
	return strings.Join(fr.recordList, "; ")
}
- func (asc *SASController) actionAfterCreate(
- ctx context.Context,
- userCred mcclient.TokenCredential,
- session *mcclient.ClientSession,
- sg *models.SScalingGroup,
- ret SCreateRet,
- failRecord *SFailRecord,
- ) (succeed bool) {
- log.Debugf("start to action After create")
- deleteParams := jsonutils.NewDict()
- deleteParams.Set("override_pending_delete", jsonutils.JSONTrue)
- updateParams := jsonutils.NewDict()
- updateParams.Set("disable_delete", jsonutils.JSONFalse)
- rollback := func(failedReason string) {
- failRecord.Append(failedReason)
- // get scalingguest
- sggs, err := models.ScalingGroupGuestManager.Fetch(sg.GetId(), ret.Id)
- if err != nil || len(sggs) == 0 {
- log.Errorf("ScalingGroupGuestManager.Fetch failed: %s", err.Error())
- return
- }
- // cancel delete project
- _, e := modules.Servers.Update(session, ret.Id, updateParams)
- if err != nil {
- sggs[0].SetGuestStatus(compute.SG_GUEST_STATUS_READY)
- log.Errorf("cancel delete project of instance '%s' failed: %s", ret.Id, e.Error())
- return
- }
- // delete corresponding instance
- _, e = modules.Servers.Delete(session, ret.Id, deleteParams)
- if e != nil {
- // delete failed
- sggs[0].SetGuestStatus(compute.SG_GUEST_STATUS_READY)
- log.Errorf("delete instance '%s' failed: %s", ret.Id, e.Error())
- return
- }
- sggs[0].Detach(ctx, userCred)
- return
- }
- if ret.Status != compute.VM_RUNNING {
- if ret.Status == "timeout" {
- rollback(fmt.Sprintf("the creation process for instance '%s' has timed out", ret.Id))
- } else {
- // fetch the reason
- var reason string
- params := jsonutils.NewDict()
- params.Add(jsonutils.NewString(ret.Id), "obj_id")
- params.Add(jsonutils.NewStringArray([]string{db.ACT_ALLOCATE_FAIL}), "action")
- events, err := modules.Logs.List(session, params)
- if err != nil {
- log.Errorf("Logs.List failed: %s", err.Error())
- reason = fmt.Sprintf("instance '%s' which status is '%s' create failed", ret.Id, ret.Status)
- } else {
- switch events.Total {
- case 1:
- reason, _ = events.Data[0].GetString("notes")
- case 0:
- log.Errorf("These is no opslog about action '%s' for instance '%s", db.ACT_ALLOCATE_FAIL, ret.Id)
- reason = fmt.Sprintf("instance '%s' which status is '%s' create failed", ret.Id, ret.Status)
- default:
- log.Debugf("These are more than one optlogs about action '%s' for instance '%s'", db.ACT_ALLOCATE_FAIL, ret.Id)
- reason, _ = events.Data[0].GetString("notes")
- }
- }
- rollback(reason)
- }
- return
- }
- // bind lb
- if len(sg.BackendGroupId) != 0 {
- params := jsonutils.NewDict()
- params.Set("backend", jsonutils.NewString(ret.Id))
- params.Set("backend_type", jsonutils.NewString("guest"))
- params.Set("port", jsonutils.NewInt(int64(sg.LoadbalancerBackendPort)))
- params.Set("weight", jsonutils.NewInt(int64(sg.LoadbalancerBackendWeight)))
- params.Set("backend_group", jsonutils.NewString(sg.BackendGroupId))
- _, err := modules.LoadbalancerBackends.Create(session, params)
- if err != nil {
- rollback(fmt.Sprintf("bind instance '%s' to loadbalancer backend gropu '%s' failed: %s", ret.Id, sg.BackendGroupId, err.Error()))
- }
- }
- // todo bind bd
- // fifth stage: join scaling group finished
- sggs, err := models.ScalingGroupGuestManager.Fetch(sg.GetId(), ret.Id)
- if err != nil || sggs == nil || len(sggs) == 0 {
- log.Errorf("ScalingGroupGuestManager.Fetch failed; ScalingGroup '%s', Guest '%s'", sg.Id, ret.Id)
- return
- }
- sggs[0].SetGuestStatus(compute.SG_GUEST_STATUS_READY)
- return true
- }
- func (asc *SASController) randStringRunes(n int) string {
- var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz1234567890")
- b := make([]rune, n)
- for i := range b {
- b[i] = letterRunes[rand.Intn(len(letterRunes))]
- }
- return string(b)
- }
- func (asc *SASController) countPRAndRequests(num int) (int, int) {
- key, tmp := 5, 1
- countPR, requests := 1, num
- if requests >= key {
- countPR := tmp * key
- requests = num / countPR
- tmp += 1
- }
- return countPR, requests
- }
- // ScalingGroupNeedScale will fetch all ScalingGroup need to scale
- func (asc *SASController) ScalingGroupsNeedScale() ([]SScalingGroupShort, error) {
- rows, err := asc.scalingSql.Rows()
- if err != nil {
- return nil, errors.Wrap(err, "execute scaling sql error")
- }
- defer rows.Close()
- sgShorts := make([]SScalingGroupShort, 0, 10)
- for rows.Next() {
- sgPro := SScalingGroupShort{}
- err := asc.scalingSql.Row2Struct(rows, &sgPro)
- if err != nil {
- return nil, errors.Wrap(err, "sqlchemy.SQuery.Row2Struct error")
- }
- sgShorts = append(sgShorts, sgPro)
- }
- return sgShorts, nil
- }
|