controller.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package autoscaling
  15. import (
  16. "bytes"
  17. "context"
  18. "database/sql"
  19. "fmt"
  20. "math/rand"
  21. "strings"
  22. "sync"
  23. "time"
  24. "yunion.io/x/jsonutils"
  25. "yunion.io/x/log"
  26. "yunion.io/x/pkg/errors"
  27. "yunion.io/x/pkg/util/httputils"
  28. "yunion.io/x/pkg/util/sets"
  29. "yunion.io/x/sqlchemy"
  30. "yunion.io/x/onecloud/pkg/apis/compute"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/cronman"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  33. "yunion.io/x/onecloud/pkg/compute/models"
  34. "yunion.io/x/onecloud/pkg/compute/options"
  35. "yunion.io/x/onecloud/pkg/mcclient"
  36. "yunion.io/x/onecloud/pkg/mcclient/auth"
  37. modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  38. "yunion.io/x/onecloud/pkg/mcclient/modules/scheduler"
  39. "yunion.io/x/onecloud/pkg/util/logclient"
  40. "yunion.io/x/onecloud/pkg/util/nopanic"
  41. )
  42. type SASController struct {
  43. options options.SASControllerOptions
  44. scalingQueue chan struct{}
  45. timerQueue chan struct{}
  46. scalingGroupSet *SLockedSet
  47. scalingSql *sqlchemy.SQuery
  48. // record the consecutive failures of scaling group's scale
  49. failRecord map[string]int
  50. }
  51. type SScalingInfo struct {
  52. ScalingGroup *models.SScalingGroup
  53. Total int
  54. }
  55. type SLockedSet struct {
  56. set sets.String
  57. lock sync.Mutex
  58. }
  59. func (set *SLockedSet) Has(s string) bool {
  60. return set.set.Has(s)
  61. }
  62. func (set *SLockedSet) CheckAndInsert(s string) bool {
  63. set.lock.Lock()
  64. defer set.lock.Unlock()
  65. if set.set.Has(s) {
  66. return false
  67. }
  68. set.set.Insert(s)
  69. return true
  70. }
  71. func (set *SLockedSet) Delete(s string) {
  72. set.lock.Lock()
  73. defer set.lock.Unlock()
  74. set.set.Delete(s)
  75. }
  76. var ASController = new(SASController)
  77. func (asc *SASController) Init(options options.SASControllerOptions, cronm *cronman.SCronJobManager) {
  78. asc.options = options
  79. asc.timerQueue = make(chan struct{}, 20)
  80. asc.scalingQueue = make(chan struct{}, options.ConcurrentUpper)
  81. asc.scalingGroupSet = &SLockedSet{set: sets.NewString()}
  82. asc.failRecord = make(map[string]int)
  83. // init scalingSql
  84. sggQ := models.ScalingGroupGuestManager.Query("scaling_group_id").GroupBy("scaling_group_id")
  85. sggQ = sggQ.AppendField(sqlchemy.COUNT("total", sggQ.Field("guest_id")))
  86. sggSubQ := sggQ.SubQuery()
  87. sgQ := models.ScalingGroupManager.Query("id", "desire_instance_number").IsTrue("enabled")
  88. sgQ = sgQ.LeftJoin(sggSubQ, sqlchemy.AND(sqlchemy.Equals(sggSubQ.Field("scaling_group_id"),
  89. sgQ.Field("id")), sqlchemy.NotEquals(sggSubQ.Field("total"), sgQ.Field("desire_instance_number"))))
  90. sgQ.AppendField(sggSubQ.Field("total"))
  91. asc.scalingSql = sgQ
  92. cronm.AddJobAtIntervalsWithStartRun("CheckTimer", time.Duration(options.TimerInterval)*time.Second, asc.Timer, true)
  93. cronm.AddJobAtIntervalsWithStartRun("CheckScale", time.Duration(options.CheckScaleInterval)*time.Second, asc.CheckScale, true)
  94. cronm.AddJobAtIntervalsWithStartRun("CheckInstanceHealth", time.Duration(options.CheckHealthInterval)*time.Minute, asc.CheckInstanceHealth, true)
  95. // check all scaling activity
  96. nopanic.Run(func() {
  97. log.Infof("check and update scaling activities...")
  98. sas := make([]models.SScalingActivity, 0, 10)
  99. q := models.ScalingActivityManager.Query().Equals("status", compute.SA_STATUS_EXEC)
  100. err := db.FetchModelObjects(models.ScalingActivityManager, q, &sas)
  101. if err != nil {
  102. log.Errorf("unable to check and update scaling activities")
  103. return
  104. }
  105. for i := range sas {
  106. sas[i].SetFailed("", "As the service restarts, the status becomes unknown")
  107. }
  108. log.Infof("check and update scalngactivities complete")
  109. })
  110. }
  111. func (asc *SASController) PreScale(group *models.SScalingGroup, userCred mcclient.TokenCredential) bool {
  112. maxFailures := 3
  113. disableReason := fmt.Sprintf("The number of consecutive failures of creating a machine exceeds %d times", maxFailures)
  114. times := asc.failRecord[group.GetId()]
  115. if times >= maxFailures {
  116. _, err := db.Update(group, func() error {
  117. group.SetEnabled(false)
  118. return nil
  119. })
  120. if err != nil {
  121. return false
  122. }
  123. logclient.AddSimpleActionLog(group, logclient.ACT_DISABLE, disableReason, userCred, true)
  124. return false
  125. }
  126. return true
  127. }
  128. func (asc *SASController) Finish(groupId string, success bool) {
  129. asc.scalingGroupSet.Delete(groupId)
  130. if success {
  131. asc.failRecord[groupId] = 0
  132. return
  133. }
  134. asc.failRecord[groupId]++
  135. }
  // SScalingGroupShort is the compact view of a scaling group that
// ScalingGroupsNeedScale produces: the group's ID and desired instance number
// together with the number of guests currently counted for the group.
type SScalingGroupShort struct {
	// ID is the scaling group ID.
	ID string
	// DesireInstanceNumber is the number of instances the group should have.
	DesireInstanceNumber int `default:"0"`
	// Total is the number of guests counted in the group.
	Total int `default:"0"`
}
  143. func (asc *SASController) CheckScale(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
  144. sgShorts, err := asc.ScalingGroupsNeedScale()
  145. if err != nil {
  146. log.Errorf("asc.ScalingGroupNeedScale: %s", err.Error())
  147. return
  148. }
  149. for _, short := range sgShorts {
  150. if short.DesireInstanceNumber == short.Total {
  151. continue
  152. }
  153. insert := asc.scalingGroupSet.CheckAndInsert(short.ID)
  154. if !insert {
  155. log.Infof("A scaling activity of ScalingGroup %s is in progress, so current scaling activity was rejected.", short.ID)
  156. continue
  157. }
  158. asc.scalingQueue <- struct{}{}
  159. go asc.Scale(ctx, userCred, short)
  160. }
  161. // log.Debugf("This cronJob about CheckScale finished")
  162. }
  163. func (asc *SASController) Scale(ctx context.Context, userCred mcclient.TokenCredential, short SScalingGroupShort) {
  164. log.Debugf("scale for ScalingGroup '%s', desire: %d, total: %d", short.ID, short.DesireInstanceNumber, short.Total)
  165. var (
  166. err error
  167. success = true
  168. )
  169. setFail := func(sa *models.SScalingActivity, reason string) {
  170. success = false
  171. err = sa.SetFailed("", reason)
  172. }
  173. defer func() {
  174. if err != nil {
  175. log.Errorf("Scaling for ScalingGroup '%s': %s", short.ID, err.Error())
  176. }
  177. asc.Finish(short.ID, success)
  178. <-asc.scalingQueue
  179. log.Debugf("Scale for ScalingGroup '%s' finished", short.ID)
  180. }()
  181. log.Debugf("fetch the latest data")
  182. // fetch the latest data
  183. model, err := models.ScalingGroupManager.FetchById(short.ID)
  184. if err != nil {
  185. if err == sql.ErrNoRows {
  186. err = nil
  187. }
  188. return
  189. }
  190. sg := model.(*models.SScalingGroup)
  191. if !asc.PreScale(sg, userCred) {
  192. success = true
  193. return
  194. }
  195. total, err := sg.GuestNumber()
  196. if err != nil {
  197. return
  198. }
  199. log.Debugf("total: %d, desire: %d", total, sg.DesireInstanceNumber)
  200. // don't scale
  201. if sg.DesireInstanceNumber-total == 0 {
  202. return
  203. }
  204. log.Debugf("insert sa")
  205. scalingActivity, err := models.ScalingActivityManager.CreateScalingActivity(
  206. ctx,
  207. sg.Id,
  208. fmt.Sprintf(`The Desire Instance Number was changed, so change the Total Instance Number from "%d" to "%d"`,
  209. total, sg.DesireInstanceNumber,
  210. ),
  211. compute.SA_STATUS_EXEC,
  212. )
  213. if err != nil {
  214. return
  215. }
  216. // userCred是管理员,ownerId是拥有者
  217. ownerId := sg.GetOwnerId()
  218. num := sg.DesireInstanceNumber - total
  219. switch {
  220. case num > 0:
  221. // check guest template
  222. gt := sg.GetGuestTemplate()
  223. if gt == nil {
  224. setFail(scalingActivity, fmt.Sprintf("fetch GuestTemplate of ScalingGroup '%s' error", sg.Id))
  225. return
  226. }
  227. nets, err := sg.NetworkIds()
  228. if err != nil {
  229. setFail(scalingActivity, fmt.Sprintf("fetch Networks of ScalingGroup '%s' error", sg.Id))
  230. return
  231. }
  232. valid, msg := gt.Validate(context.TODO(), auth.AdminCredential(), gt.GetOwnerId(),
  233. models.SGuestTemplateValidate{
  234. Hypervisor: sg.Hypervisor,
  235. CloudregionId: sg.CloudregionId,
  236. VpcId: sg.VpcId,
  237. NetworkIds: nets,
  238. },
  239. )
  240. if !valid {
  241. err = scalingActivity.SetReject("", msg)
  242. return
  243. }
  244. if len(nets) == 0 {
  245. setFail(scalingActivity, fmt.Sprintf("empty networks '%s' error", sg.Id))
  246. return
  247. }
  248. succeedInstances, err := asc.CreateInstances(ctx, userCred, ownerId, sg, gt, nets[0], num)
  249. switch len(succeedInstances) {
  250. case 0:
  251. setFail(scalingActivity, fmt.Sprintf("All instances create failed: %s", err.Error()))
  252. case num:
  253. var action bytes.Buffer
  254. action.WriteString("Instances ")
  255. for _, instance := range succeedInstances {
  256. action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
  257. }
  258. action.Truncate(action.Len() - 2)
  259. action.WriteString(" are created")
  260. err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_SUCCEED, "", sg.DesireInstanceNumber)
  261. default:
  262. var action bytes.Buffer
  263. action.WriteString("Instances ")
  264. for _, instance := range succeedInstances {
  265. action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
  266. }
  267. action.Truncate(action.Len() - 2)
  268. action.WriteString(" are created")
  269. err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_PART_SUCCEED, fmt.Sprintf("Some instances create failed: %s", err.Error()), total+len(succeedInstances))
  270. }
  271. return
  272. case num < 0:
  273. num = -num
  274. succeedInstances, err := asc.DetachInstances(ctx, userCred, ownerId, sg, num)
  275. switch len(succeedInstances) {
  276. case 0:
  277. setFail(scalingActivity, fmt.Sprintf("All instance remove failed: %s", err.Error()))
  278. case num:
  279. var action bytes.Buffer
  280. action.WriteString("Instances ")
  281. for _, instance := range succeedInstances {
  282. action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
  283. }
  284. action.Truncate(action.Len() - 2)
  285. action.WriteString(" are deleted")
  286. err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_SUCCEED, "", sg.DesireInstanceNumber)
  287. default:
  288. var action bytes.Buffer
  289. action.WriteString("Instances ")
  290. for _, instance := range succeedInstances {
  291. action.WriteString(fmt.Sprintf("'%s', ", instance.Name))
  292. }
  293. action.Truncate(action.Len() - 2)
  294. action.WriteString(" are deleted")
  295. err = scalingActivity.SetResult(action.String(), compute.SA_STATUS_PART_SUCCEED, fmt.Sprintf("Some instance removed failed: %s", err.Error()), sg.DesireInstanceNumber)
  296. }
  297. return
  298. }
  299. }
  300. func (asc *SASController) DetachInstances(ctx context.Context, userCred mcclient.TokenCredential,
  301. ownerId mcclient.IIdentityProvider, sg *models.SScalingGroup, num int) ([]SInstance, error) {
  302. instances, err := asc.findSuitableInstance(sg, num)
  303. if err != nil {
  304. return nil, errors.Wrap(err, "find suitable instances failed")
  305. }
  306. removeParams := jsonutils.NewDict()
  307. removeParams.Set("scaling_group", jsonutils.NewString(sg.Id))
  308. removeParams.Set("delete_server", jsonutils.JSONTrue)
  309. removeParams.Set("auto", jsonutils.JSONTrue)
  310. session := auth.GetSession(ctx, userCred, "")
  311. failedList := make([]string, 0)
  312. waitList := make([]string, 0, len(instances))
  313. instanceMap := make(map[string]SInstance, len(instances))
  314. // request to detach instances with scaling group
  315. for i := range instances {
  316. instanceMap[instances[i].Id] = SInstance{instances[i].Id, instances[i].Name}
  317. _, err := modules.Servers.PerformAction(session, instances[i].GetId(), "detach-scaling-group", removeParams)
  318. if err != nil {
  319. failedList = append(failedList, fmt.Sprintf("remove instance '%s' failed: %s", instances[i].GetId(), err.Error()))
  320. continue
  321. }
  322. waitList = append(waitList, instances[i].GetId())
  323. }
  324. // wait for all requests finished
  325. succeedList := sets.NewString(waitList...)
  326. ticker := time.NewTicker(3 * time.Second)
  327. timer := time.NewTimer(5 * time.Minute)
  328. Loop:
  329. for {
  330. select {
  331. default:
  332. sggs, err := sg.ScalingGroupGuests(waitList)
  333. if err != nil {
  334. log.Errorf("ScalingGroup.ScalingGroupGuests error: %s", err.Error())
  335. <-ticker.C
  336. continue Loop
  337. }
  338. waitList = make([]string, 0, 1)
  339. for i := range sggs {
  340. switch sggs[i].GuestStatus {
  341. case compute.SG_GUEST_STATUS_REMOVE_FAILED:
  342. succeedList.Delete(sggs[i].GetId())
  343. failedList = append(failedList, fmt.Sprintf("remove instance '%s' failed", sggs[i].GetId()))
  344. case compute.SG_GUEST_STATUS_READY, compute.SG_GUEST_STATUS_REMOVING, compute.SG_GUEST_STATUS_PENDING_REMOVE:
  345. waitList = append(waitList, sggs[i].GetId())
  346. default:
  347. log.Errorf("unkown guest status for ScalingGroupGuest '%s'", sggs[i].GetId())
  348. }
  349. }
  350. if len(waitList) == 0 {
  351. break Loop
  352. }
  353. <-ticker.C
  354. case <-timer.C:
  355. log.Errorf("come check jobs for removing servers timeout")
  356. for _, id := range waitList {
  357. failedList = append(failedList, fmt.Sprintf("remove instance '%s' timeout", id))
  358. succeedList.Delete(id)
  359. }
  360. }
  361. }
  362. ticker.Stop()
  363. timer.Stop()
  364. log.Debugf("finish all check jobs when removing servers")
  365. err = nil
  366. if len(failedList) != 0 {
  367. err = fmt.Errorf("%s", strings.Join(failedList, "; "))
  368. }
  369. instanceRet := make([]SInstance, 0, succeedList.Len())
  370. for _, id := range succeedList.UnsortedList() {
  371. instanceRet = append(instanceRet, instanceMap[id])
  372. }
  373. return instanceRet, err
  374. }
  375. func (asc *SASController) findSuitableInstance(sg *models.SScalingGroup, num int) ([]models.SGuest, error) {
  376. ggSubQ := models.ScalingGroupGuestManager.Query("guest_id").Equals("scaling_group_id", sg.Id).SubQuery()
  377. guestQ := models.GuestManager.Query().In("id", ggSubQ)
  378. switch sg.ShrinkPrinciple {
  379. case compute.SHRINK_EARLIEST_CREATION_FIRST:
  380. guestQ = guestQ.Asc("created_at").Limit(num)
  381. case compute.SHRINK_LATEST_CREATION_FIRST:
  382. guestQ = guestQ.Desc("created_at").Limit(num)
  383. }
  384. guests := make([]models.SGuest, 0, num)
  385. err := db.FetchModelObjects(models.GuestManager, guestQ, &guests)
  386. if err != nil {
  387. return nil, err
  388. }
  389. return guests, nil
  390. }
  391. func (asc *SASController) createInstances(session *mcclient.ClientSession, params jsonutils.JSONObject, count int,
  392. failedList []string, succeedList []SInstance) ([]string, []SInstance) {
  393. // forcast first
  394. dict := params.(*jsonutils.JSONDict)
  395. dict.Set("count", jsonutils.NewInt(int64(count)))
  396. _, err := scheduler.SchedManager.DoForecast(session, dict)
  397. if err != nil {
  398. clientErr := err.(*httputils.JSONClientError)
  399. failedList = append(failedList, clientErr.Details)
  400. }
  401. dict.Remove("domain_id")
  402. dict.Remove("count")
  403. if count == 1 {
  404. ret, err := modules.Servers.Create(session, params)
  405. if err != nil {
  406. clientErr := err.(*httputils.JSONClientError)
  407. failedList = append(failedList, clientErr.Details)
  408. return failedList, succeedList
  409. }
  410. id, _ := ret.GetString("id")
  411. name, _ := ret.GetString("name")
  412. succeedList = append(succeedList, SInstance{id, name})
  413. return failedList, succeedList
  414. }
  415. rets := modules.Servers.BatchCreate(session, params, count)
  416. for _, ret := range rets {
  417. if ret.Status >= 400 {
  418. failedList = append(failedList, ret.Data.String())
  419. } else {
  420. id, _ := ret.Data.GetString("id")
  421. name, _ := ret.Data.GetString("name")
  422. succeedList = append(succeedList, SInstance{id, name})
  423. }
  424. }
  425. return failedList, succeedList
  426. }
  // SInstance identifies a server instance by ID and name. Code in this file
// builds it with positional literals, so the field order must stay ID, Name.
type SInstance struct {
	// ID is the instance ID.
	ID string
	// Name is the instance name.
	Name string
}
  431. func (asc *SASController) CreateInstances(
  432. ctx context.Context,
  433. userCred mcclient.TokenCredential,
  434. ownerId mcclient.IIdentityProvider,
  435. sg *models.SScalingGroup,
  436. gt *models.SGuestTemplate,
  437. defaultNet string,
  438. num int,
  439. ) ([]SInstance, error) {
  440. // build the create request data
  441. content := gt.Content.(*jsonutils.JSONDict)
  442. // no network, add a default
  443. if len(gt.VpcId) == 0 {
  444. net := jsonutils.NewDict()
  445. net.Set("network", jsonutils.NewString(defaultNet))
  446. content.Set("nets", jsonutils.NewArray(net))
  447. }
  448. content.Set("auto_start", jsonutils.JSONTrue)
  449. // set description
  450. content.Set("description", jsonutils.NewString(fmt.Sprintf("Belong to scaling group '%s'", sg.Name)))
  451. // For compatibility
  452. content.Remove("__count__")
  453. // set onwer project and id
  454. content.Set("project_id", jsonutils.NewString(ownerId.GetProjectId()))
  455. content.Set("user_id", jsonutils.NewString(ownerId.GetUserId()))
  456. countPR, requests := asc.countPRAndRequests(num)
  457. log.Debugf("countPR: %d, requests: %d", countPR, requests)
  458. rand.Seed(time.Now().UnixNano())
  459. session := auth.GetSession(ctx, userCred, "")
  460. failedList := make([]string, 0)
  461. succeedList := make([]SInstance, 0, num/2)
  462. // fisrt stage: create request
  463. for rn := 0; rn < requests; rn++ {
  464. generateName := fmt.Sprintf("sg-%s-%s", sg.Name, asc.randStringRunes(5))
  465. content.Set("generate_name", jsonutils.NewString(generateName))
  466. failedList, succeedList = asc.createInstances(session, content, countPR, failedList, succeedList)
  467. }
  468. if remain := num - requests*countPR; remain > 0 {
  469. generateName := fmt.Sprintf("sg-%s-%s", sg.Name, asc.randStringRunes(5))
  470. content.Set("generate_name", jsonutils.NewString(generateName))
  471. failedList, succeedList = asc.createInstances(session, content, remain, failedList, succeedList)
  472. }
  473. // second stage: joining scaling group
  474. for _, instance := range succeedList {
  475. err := models.ScalingGroupGuestManager.Attach(ctx, sg.Id, instance.ID, false)
  476. if err != nil {
  477. log.Errorf("Attach ScalingGroup '%s' with Guest '%s' failed", sg.Id, instance.ID)
  478. }
  479. }
  480. // third stage: wait for create complete
  481. retChan := make(chan SCreateRet, requests)
  482. guestIds := make([]string, len(succeedList))
  483. instanceMap := make(map[string]SInstance, len(succeedList))
  484. for i, instane := range succeedList {
  485. guestIds[i] = instane.ID
  486. instanceMap[instane.ID] = instane
  487. }
  488. // check all server's status
  489. var waitLimit, waitinterval time.Duration
  490. if sg.Hypervisor == compute.HYPERVISOR_KVM {
  491. waitLimit = 5 * time.Minute
  492. waitinterval = 3 * time.Second
  493. } else {
  494. waitLimit = 10 * time.Minute
  495. waitinterval = 10 * time.Second
  496. }
  497. go asc.checkAllServer(session, guestIds, retChan, waitLimit, waitinterval)
  498. // fourth stage: bind lb and db
  499. failRecord := &SFailRecord{
  500. recordList: failedList,
  501. }
  502. succeedInstances := make([]string, 0, len(succeedList))
  503. workerLimit := make(chan struct{}, requests)
  504. for {
  505. ret, ok := <-retChan
  506. if !ok {
  507. break
  508. }
  509. workerLimit <- struct{}{}
  510. // bind ld and db
  511. go func() {
  512. succeed := asc.actionAfterCreate(ctx, userCred, session, sg, ret, failRecord)
  513. log.Debugf("action after create instance '%s', succeed: %t", ret.Id, succeed)
  514. if succeed {
  515. succeedInstances = append(succeedInstances, ret.Id)
  516. }
  517. <-workerLimit
  518. }()
  519. }
  520. log.Debugf("wait fo all worker finish")
  521. // wait for all worker finish
  522. log.Debugf("workerlimit cap: %d", cap(workerLimit))
  523. for i := 0; i < cap(workerLimit); i++ {
  524. log.Debugf("no.%d insert worker limit", i)
  525. workerLimit <- struct{}{}
  526. }
  527. instances := make([]SInstance, 0, len(succeedInstances))
  528. for _, id := range succeedInstances {
  529. instances = append(instances, instanceMap[id])
  530. }
  531. return instances, fmt.Errorf("%s", failRecord.String())
  532. }
  // SCreateRet is the final state that checkAllServer reports for one server.
type SCreateRet struct {
	// Id is the server ID.
	Id string
	// Status is the last status read for the server, or "timeout" when the
	// wait limit expired before the server reached a final status.
	Status string
}
  537. func (asc *SASController) checkAllServer(session *mcclient.ClientSession, guestIds []string, retChan chan SCreateRet,
  538. waitLimit, waitInterval time.Duration) {
  539. guestIDSet := sets.NewString(guestIds...)
  540. timer := time.NewTimer(waitLimit)
  541. ticker := time.NewTicker(waitInterval)
  542. defer func() {
  543. close(retChan)
  544. ticker.Stop()
  545. timer.Stop()
  546. log.Debugf("finish all check jobs when creating servers")
  547. }()
  548. log.Debugf("guestIds: %s", guestIds)
  549. for {
  550. select {
  551. default:
  552. for _, id := range guestIDSet.UnsortedList() {
  553. ret, e := modules.Servers.GetSpecific(session, id, "status", nil)
  554. if e != nil {
  555. log.Errorf("Servers.GetSpecific failed: %s", e)
  556. <-ticker.C
  557. continue
  558. }
  559. log.Debugf("ret from GetSpecific: %s", ret.String())
  560. status, _ := ret.GetString("status")
  561. if status == compute.VM_RUNNING || strings.HasSuffix(status, "fail") || strings.HasSuffix(status, "failed") {
  562. guestIDSet.Delete(id)
  563. retChan <- SCreateRet{
  564. Id: id,
  565. Status: status,
  566. }
  567. }
  568. }
  569. if guestIDSet.Len() == 0 {
  570. return
  571. }
  572. <-ticker.C
  573. case <-timer.C:
  574. log.Errorf("some check jobs for server timeout")
  575. for _, id := range guestIDSet.UnsortedList() {
  576. retChan <- SCreateRet{
  577. Id: id,
  578. Status: "timeout",
  579. }
  580. }
  581. return
  582. }
  583. }
  584. }
  // SFailRecord collects failure messages from several goroutines. All access
// to the list goes through the mutex.
type SFailRecord struct {
	lock       sync.Mutex
	recordList []string
}

// Append adds record to the list of failures.
func (fr *SFailRecord) Append(record string) {
	fr.lock.Lock()
	defer fr.lock.Unlock()
	fr.recordList = append(fr.recordList, record)
}

// String returns all recorded failures joined by "; ". It is empty when
// nothing was recorded.
func (fr *SFailRecord) String() string {
	// Take the lock like Append does, so a concurrent Append cannot race with
	// the read of the slice.
	fr.lock.Lock()
	defer fr.lock.Unlock()
	return strings.Join(fr.recordList, "; ")
}
  597. func (asc *SASController) actionAfterCreate(
  598. ctx context.Context,
  599. userCred mcclient.TokenCredential,
  600. session *mcclient.ClientSession,
  601. sg *models.SScalingGroup,
  602. ret SCreateRet,
  603. failRecord *SFailRecord,
  604. ) (succeed bool) {
  605. log.Debugf("start to action After create")
  606. deleteParams := jsonutils.NewDict()
  607. deleteParams.Set("override_pending_delete", jsonutils.JSONTrue)
  608. updateParams := jsonutils.NewDict()
  609. updateParams.Set("disable_delete", jsonutils.JSONFalse)
  610. rollback := func(failedReason string) {
  611. failRecord.Append(failedReason)
  612. // get scalingguest
  613. sggs, err := models.ScalingGroupGuestManager.Fetch(sg.GetId(), ret.Id)
  614. if err != nil || len(sggs) == 0 {
  615. log.Errorf("ScalingGroupGuestManager.Fetch failed: %s", err.Error())
  616. return
  617. }
  618. // cancel delete project
  619. _, e := modules.Servers.Update(session, ret.Id, updateParams)
  620. if err != nil {
  621. sggs[0].SetGuestStatus(compute.SG_GUEST_STATUS_READY)
  622. log.Errorf("cancel delete project of instance '%s' failed: %s", ret.Id, e.Error())
  623. return
  624. }
  625. // delete corresponding instance
  626. _, e = modules.Servers.Delete(session, ret.Id, deleteParams)
  627. if e != nil {
  628. // delete failed
  629. sggs[0].SetGuestStatus(compute.SG_GUEST_STATUS_READY)
  630. log.Errorf("delete instance '%s' failed: %s", ret.Id, e.Error())
  631. return
  632. }
  633. sggs[0].Detach(ctx, userCred)
  634. return
  635. }
  636. if ret.Status != compute.VM_RUNNING {
  637. if ret.Status == "timeout" {
  638. rollback(fmt.Sprintf("the creation process for instance '%s' has timed out", ret.Id))
  639. } else {
  640. // fetch the reason
  641. var reason string
  642. params := jsonutils.NewDict()
  643. params.Add(jsonutils.NewString(ret.Id), "obj_id")
  644. params.Add(jsonutils.NewStringArray([]string{db.ACT_ALLOCATE_FAIL}), "action")
  645. events, err := modules.Logs.List(session, params)
  646. if err != nil {
  647. log.Errorf("Logs.List failed: %s", err.Error())
  648. reason = fmt.Sprintf("instance '%s' which status is '%s' create failed", ret.Id, ret.Status)
  649. } else {
  650. switch events.Total {
  651. case 1:
  652. reason, _ = events.Data[0].GetString("notes")
  653. case 0:
  654. log.Errorf("These is no opslog about action '%s' for instance '%s", db.ACT_ALLOCATE_FAIL, ret.Id)
  655. reason = fmt.Sprintf("instance '%s' which status is '%s' create failed", ret.Id, ret.Status)
  656. default:
  657. log.Debugf("These are more than one optlogs about action '%s' for instance '%s'", db.ACT_ALLOCATE_FAIL, ret.Id)
  658. reason, _ = events.Data[0].GetString("notes")
  659. }
  660. }
  661. rollback(reason)
  662. }
  663. return
  664. }
  665. // bind lb
  666. if len(sg.BackendGroupId) != 0 {
  667. params := jsonutils.NewDict()
  668. params.Set("backend", jsonutils.NewString(ret.Id))
  669. params.Set("backend_type", jsonutils.NewString("guest"))
  670. params.Set("port", jsonutils.NewInt(int64(sg.LoadbalancerBackendPort)))
  671. params.Set("weight", jsonutils.NewInt(int64(sg.LoadbalancerBackendWeight)))
  672. params.Set("backend_group", jsonutils.NewString(sg.BackendGroupId))
  673. _, err := modules.LoadbalancerBackends.Create(session, params)
  674. if err != nil {
  675. rollback(fmt.Sprintf("bind instance '%s' to loadbalancer backend gropu '%s' failed: %s", ret.Id, sg.BackendGroupId, err.Error()))
  676. }
  677. }
  678. // todo bind bd
  679. // fifth stage: join scaling group finished
  680. sggs, err := models.ScalingGroupGuestManager.Fetch(sg.GetId(), ret.Id)
  681. if err != nil || sggs == nil || len(sggs) == 0 {
  682. log.Errorf("ScalingGroupGuestManager.Fetch failed; ScalingGroup '%s', Guest '%s'", sg.Id, ret.Id)
  683. return
  684. }
  685. sggs[0].SetGuestStatus(compute.SG_GUEST_STATUS_READY)
  686. return true
  687. }
  688. func (asc *SASController) randStringRunes(n int) string {
  689. var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz1234567890")
  690. b := make([]rune, n)
  691. for i := range b {
  692. b[i] = letterRunes[rand.Intn(len(letterRunes))]
  693. }
  694. return string(b)
  695. }
  696. func (asc *SASController) countPRAndRequests(num int) (int, int) {
  697. key, tmp := 5, 1
  698. countPR, requests := 1, num
  699. if requests >= key {
  700. countPR := tmp * key
  701. requests = num / countPR
  702. tmp += 1
  703. }
  704. return countPR, requests
  705. }
  706. // ScalingGroupNeedScale will fetch all ScalingGroup need to scale
  707. func (asc *SASController) ScalingGroupsNeedScale() ([]SScalingGroupShort, error) {
  708. rows, err := asc.scalingSql.Rows()
  709. if err != nil {
  710. return nil, errors.Wrap(err, "execute scaling sql error")
  711. }
  712. defer rows.Close()
  713. sgShorts := make([]SScalingGroupShort, 0, 10)
  714. for rows.Next() {
  715. sgPro := SScalingGroupShort{}
  716. err := asc.scalingSql.Row2Struct(rows, &sgPro)
  717. if err != nil {
  718. return nil, errors.Wrap(err, "sqlchemy.SQuery.Row2Struct error")
  719. }
  720. sgShorts = append(sgShorts, sgPro)
  721. }
  722. return sgShorts, nil
  723. }