task_queue.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package manager
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"yunion.io/x/log"
	"yunion.io/x/pkg/errors"

	schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
	computemodels "yunion.io/x/onecloud/pkg/compute/models"
	"yunion.io/x/onecloud/pkg/scheduler/api"
	"yunion.io/x/onecloud/pkg/scheduler/core"
	schedmodels "yunion.io/x/onecloud/pkg/scheduler/models"
)
// TaskExecutor lifecycle states.
const (
	// TaskExecutorStatusWaiting: queued, Execute not yet started.
	TaskExecutorStatusWaiting string = "waiting"
	// TaskExecutorStatusRunning: Execute is in progress.
	TaskExecutorStatusRunning string = "running"
	// TaskExecutorStatusFailed: Execute finished with an error.
	TaskExecutorStatusFailed string = "failed"
	// TaskExecutorStatusKilled: cancelled via Kill before it started running.
	TaskExecutorStatusKilled string = "killed"
	// TaskExecutorStatusSuccess: Execute finished without error.
	TaskExecutorStatusSuccess string = "success"
)
// TaskExecuteCallback is invoked after a TaskExecutor finishes Execute,
// on both success and failure.
type TaskExecuteCallback func(task *TaskExecutor)

// TaskExecutor runs one scheduling request on a queue worker and records
// its outcome.
type TaskExecutor struct {
	Tag       string        // copied from the request's SchedData().Tag
	Status    string        // one of the TaskExecutorStatus* constants
	Time      time.Time     // creation time; basis for Consuming
	Consuming time.Duration // wall-clock duration, set by the completion callback

	scheduler   Scheduler
	callback    TaskExecuteCallback
	unit        *core.Unit
	resultItems *core.ScheduleResult
	resultError error
	logs        []string    // filled by Task.readLog from unit.LogManager
	capacityMap interface{} // snapshot of unit.CapacityMap taken by Task.readLog
	completed   bool
}
  51. func NewTaskExecutor(scheduler Scheduler, taskExecuteCallback TaskExecuteCallback) *TaskExecutor {
  52. return &TaskExecutor{
  53. Tag: scheduler.SchedData().Tag,
  54. Status: TaskExecutorStatusWaiting,
  55. Time: time.Now(),
  56. scheduler: scheduler,
  57. callback: taskExecuteCallback,
  58. completed: false,
  59. }
  60. }
  61. func (te *TaskExecutor) Execute(ctx context.Context) {
  62. te.Status = TaskExecutorStatusRunning
  63. te.resultItems, te.resultError = te.execute(ctx)
  64. te.completed = true
  65. if te.resultError != nil {
  66. te.Status = TaskExecutorStatusFailed
  67. } else {
  68. te.Status = TaskExecutorStatusSuccess
  69. }
  70. if te.callback != nil {
  71. te.callback(te)
  72. }
  73. }
// execute performs one scheduling pass: build a generic scheduler, load
// candidate resources from the DB, run Schedule, and — for non-suggestion
// requests — record pending usage on the chosen hosts.
func (te *TaskExecutor) execute(ctx context.Context) (*core.ScheduleResult, error) {
	scheduler := te.scheduler
	genericScheduler, err := core.NewGenericScheduler(scheduler.(core.Scheduler))
	if err != nil {
		return nil, err
	}
	// Get current resources from DB.
	candidates, err := scheduler.Candidates()
	if err != nil {
		return nil, err
	}
	// Keep the unit on the executor so Task.readLog can collect logs and
	// capacity data after completion.
	te.unit = scheduler.Unit()
	schedInfo := te.unit.SchedInfo
	// generate result helper
	helper := GenerateResultHelper(schedInfo)
	result, err := genericScheduler.Schedule(ctx, te.unit, candidates, helper)
	if err != nil {
		return nil, errors.Wrap(err, "genericScheduler.Schedule")
	}
	// Suggestion (forecast/test) requests are read-only: skip pending usage.
	if schedInfo.IsSuggestion {
		return result, nil
	}
	driver := te.unit.GetHypervisorDriver()
	// set sched pending usage
	if err := setSchedPendingUsage(driver, schedInfo, result.Result); err != nil {
		return nil, errors.Wrap(err, "setSchedPendingUsage")
	}
	return result, nil
}
  104. func GenerateResultHelper(schedInfo *api.SchedInfo) core.IResultHelper {
  105. if !schedInfo.IsSuggestion {
  106. return core.SResultHelperFunc(core.ResultHelp)
  107. }
  108. if schedInfo.ShowSuggestionDetails && schedInfo.SuggestionAll {
  109. return core.SResultHelperFunc(core.ResultHelpForForcast)
  110. }
  111. return core.SResultHelperFunc(core.ResultHelpForTest)
  112. }
  113. func setSchedPendingUsage(driver computemodels.IGuestDriver, req *api.SchedInfo, resp *schedapi.ScheduleOutput) error {
  114. if req.IsSuggestion || IsDriverSkipScheduleDirtyMark(driver) {
  115. return nil
  116. }
  117. for i, item := range resp.Candidates {
  118. if item.Error != "" {
  119. // schedule failed skip add pending usage
  120. continue
  121. }
  122. var guestId string
  123. if len(req.GuestIds) > i {
  124. guestId = req.GuestIds[i]
  125. }
  126. schedmodels.HostPendingUsageManager.AddPendingUsage(guestId, req, item)
  127. }
  128. return nil
  129. }
  130. func IsDriverSkipScheduleDirtyMark(driver computemodels.IGuestDriver) bool {
  131. return driver == nil || !(driver.DoScheduleCPUFilter() && driver.DoScheduleMemoryFilter() && driver.DoScheduleStorageFilter())
  132. }
// cleanup drops the executor's references to the scheduler, unit and
// callback so they can be garbage-collected after completion.
func (te *TaskExecutor) cleanup() {
	te.unit = nil
	te.scheduler = nil
	te.callback = nil
}

// Kill cancels an executor that has not started running yet; executors in
// any other state are left untouched.
// NOTE(review): Status is also read/written by the queue worker goroutine
// (Start/Execute) without synchronization — calling Kill concurrently looks
// like a data race; confirm callers serialize access.
func (te *TaskExecutor) Kill() {
	if te.Status == TaskExecutorStatusWaiting {
		te.Status = TaskExecutorStatusKilled
	}
}
// GetResult returns the schedule result and error produced by Execute.
func (te *TaskExecutor) GetResult() (*core.ScheduleResult, error) {
	return te.resultItems, te.resultError
}

// GetLogs returns the scheduling log lines collected by Task.readLog.
func (te *TaskExecutor) GetLogs() []string {
	return te.logs
}

// GetCapacityMap returns the capacity snapshot collected by Task.readLog.
func (te *TaskExecutor) GetCapacityMap() interface{} {
	return te.capacityMap
}
// TaskExecutorQueue serializes the executors of one scheduler type
// (hypervisor) through a single worker goroutine.
type TaskExecutorQueue struct {
	schedType string
	queue     chan *TaskExecutor // buffered channel drained by Start's worker
	running   bool
}

// AddTaskExecutor wraps the scheduler in a new executor and enqueues it.
// Blocks if the queue buffer (capacity 5000) is full.
func (teq *TaskExecutorQueue) AddTaskExecutor(scheduler Scheduler,
	callback TaskExecuteCallback) *TaskExecutor {
	taskExecutor := NewTaskExecutor(scheduler, callback)
	teq.queue <- taskExecutor
	return taskExecutor
}
  163. func NewTaskExecutorQueue(schedType string, stopCh <-chan struct{}) *TaskExecutorQueue {
  164. teq := &TaskExecutorQueue{
  165. schedType: schedType,
  166. running: false,
  167. }
  168. teq.Start(stopCh)
  169. return teq
  170. }
// Start allocates the queue channel and launches two goroutines: a worker
// that executes queued executors one at a time, and a watcher that shuts
// the worker down when stopCh is closed (by flipping running, pushing a
// nil sentinel to unblock the receive, and cancelling the context).
func (teq *TaskExecutorQueue) Start(stopCh <-chan struct{}) {
	if teq.running {
		return
	}
	teq.running = true
	teq.queue = make(chan *TaskExecutor, 5000)
	ctx, ctxCancel := context.WithCancel(context.Background())
	go func() {
		// NOTE(review): the consumer closes the channel here while
		// AddTaskExecutor may still be sending on it; a send after close
		// panics — confirm producers are quiesced before stopCh fires.
		defer close(teq.queue)
		var taskExecutor *TaskExecutor
		// The nil sentinel pushed on shutdown unblocks the receive; the
		// loop condition then sees running == false and exits before the
		// body would dereference the nil executor.
		// NOTE(review): teq.running is read here and written by the watcher
		// goroutine without synchronization — potential data race.
		for taskExecutor = <-teq.queue; teq.running; taskExecutor = <-teq.queue {
			if taskExecutor.Status == TaskExecutorStatusWaiting {
				taskExecutor.Execute(ctx)
			}
		}
	}()
	go func() {
		<-stopCh
		teq.running = false
		teq.queue <- nil
		ctxCancel()
	}()
}
// TaskExecutorQueueManager maintains one TaskExecutorQueue per scheduler
// type, creating queues lazily under lock.
type TaskExecutorQueueManager struct {
	taskExecutorMap map[string]*TaskExecutorQueue
	lock            sync.Mutex      // guards taskExecutorMap
	stopCh          <-chan struct{} // handed to every queue created by GetQueue
}
  199. func NewTaskExecutorQueueManager(stopCh <-chan struct{}) *TaskExecutorQueueManager {
  200. return &TaskExecutorQueueManager{
  201. taskExecutorMap: make(map[string]*TaskExecutorQueue),
  202. lock: sync.Mutex{},
  203. stopCh: stopCh,
  204. }
  205. }
  206. func (teqm *TaskExecutorQueueManager) GetQueue(schedType string) *TaskExecutorQueue {
  207. teqm.lock.Lock()
  208. defer teqm.lock.Unlock()
  209. var (
  210. taskExecutorQueue *TaskExecutorQueue
  211. ok bool
  212. )
  213. if taskExecutorQueue, ok = teqm.taskExecutorMap[schedType]; !ok {
  214. taskExecutorQueue = NewTaskExecutorQueue(schedType, teqm.stopCh)
  215. teqm.taskExecutorMap[schedType] = taskExecutorQueue
  216. }
  217. return taskExecutorQueue
  218. }
  219. func (teqm *TaskExecutorQueueManager) AddTaskExecutor(
  220. scheduler Scheduler, callback TaskExecuteCallback) *TaskExecutor {
  221. schedData := scheduler.SchedData()
  222. taskQueue := teqm.GetQueue(schedData.Hypervisor)
  223. return taskQueue.AddTaskExecutor(scheduler, callback)
  224. }
// TaskManager is the entry point for submitting scheduling tasks; it owns
// the per-hypervisor queue manager.
type TaskManager struct {
	taskExecutorQueueManager *TaskExecutorQueueManager
	stopCh                   <-chan struct{}
	lock                     sync.Mutex // serializes AddTask's enqueue + append
}
  230. func NewTaskManager(stopCh <-chan struct{}) *TaskManager {
  231. return &TaskManager{
  232. taskExecutorQueueManager: NewTaskExecutorQueueManager(stopCh),
  233. stopCh: stopCh,
  234. lock: sync.Mutex{},
  235. }
  236. }
// Run satisfies the manager run interface; queues are started lazily by
// GetQueue, so there is nothing to do here.
func (tm *TaskManager) Run() {
	// Do nothing
}
  240. // AddTask provides an interface to increase the scheduling task,
  241. // it will be a scheduling request by the host specification type
  242. // split into multiple scheduling tasks, added to the scheduling
  243. // task manager.
  244. func (tm *TaskManager) AddTask(schedulerManager *SchedulerManager, schedInfo *api.SchedInfo) (*Task, error) {
  245. var (
  246. scheduler Scheduler
  247. err error
  248. )
  249. task := NewTask(schedulerManager, schedInfo)
  250. // Split into multiple scheduling tasks by host specification type.
  251. if schedInfo.Hypervisor == api.SchedTypeBaremetal {
  252. scheduler, err = newBaremetalScheduler(schedulerManager, schedInfo)
  253. } else {
  254. scheduler, err = newGuestScheduler(schedulerManager, schedInfo)
  255. }
  256. if err != nil {
  257. return nil, err
  258. }
  259. taskExecutorCallback := func(taskExecutor *TaskExecutor) {
  260. taskExecutor.Consuming = time.Since(taskExecutor.Time)
  261. task.onTaskCompleted(taskExecutor)
  262. }
  263. tm.lock.Lock()
  264. defer tm.lock.Unlock()
  265. taskExecutor := tm.taskExecutorQueueManager.AddTaskExecutor(scheduler, taskExecutorCallback)
  266. task.taskExecutors = append(task.taskExecutors, taskExecutor)
  267. return task, nil
  268. }
// Task tracks one scheduling request end to end: its executors, timing,
// and final result. Wait blocks until waitCh is closed by onCompleted or
// onError.
type Task struct {
	Time      time.Time // creation time; basis for Consuming
	SchedInfo *api.SchedInfo
	Consuming time.Duration // total elapsed time, set by onCompleted

	taskExecutors  []*TaskExecutor
	manager        *SchedulerManager
	lock           sync.Mutex    // guards completion bookkeeping in onTaskCompleted
	waitCh         chan struct{} // closed exactly once to release Wait
	completedCount int           // successful executors so far
	resultItems    *core.ScheduleResult
	resultError    error
}
  281. func NewTask(manager *SchedulerManager, schedInfo *api.SchedInfo) *Task {
  282. return &Task{
  283. Time: time.Now(),
  284. SchedInfo: schedInfo,
  285. manager: manager,
  286. taskExecutors: []*TaskExecutor{},
  287. lock: sync.Mutex{},
  288. waitCh: make(chan struct{}),
  289. resultError: nil,
  290. }
  291. }
  292. func (t *Task) GetTaskExecutor(tag string) *TaskExecutor {
  293. for _, executor := range t.taskExecutors {
  294. if executor.Tag == tag {
  295. return executor
  296. }
  297. }
  298. return nil
  299. }
// GetSessionID returns the session ID from the task's scheduling request.
func (t *Task) GetSessionID() string {
	return t.SchedInfo.SessionId
}
  303. func (t *Task) GetStatus() string {
  304. statusMap := make(map[string]int)
  305. for _, executor := range t.taskExecutors {
  306. if count, ok := statusMap[executor.Status]; ok {
  307. statusMap[executor.Status] = count + 1
  308. } else {
  309. statusMap[executor.Status] = 1
  310. }
  311. }
  312. ss := []string{}
  313. for status, count := range statusMap {
  314. ss = append(ss, fmt.Sprintf("%v %v", count, status))
  315. }
  316. return strings.Join(ss, ", ")
  317. }
// onTaskCompleted is the executor completion callback: under the task
// lock it records the result (error path kills remaining executors and
// releases Wait; success path releases Wait once all executors finished),
// then asynchronously harvests logs and drops executor references.
func (t *Task) onTaskCompleted(taskExecutor *TaskExecutor) {
	t.lock.Lock()
	defer t.lock.Unlock()
	log.V(10).Infof("onTaskCompleted executor: %#v", taskExecutor)
	if taskExecutor.resultError != nil {
		t.resultError = taskExecutor.resultError
		// NOTE(review): if more than one executor errors, onError runs twice
		// and closes waitCh twice (panic). AddTask attaches a single executor
		// per task, which avoids this — confirm that invariant holds.
		t.onError()
	} else {
		t.resultItems = taskExecutor.resultItems
		t.completedCount += 1
		if t.completedCount >= len(t.taskExecutors) {
			t.onCompleted()
		}
	}
	// Harvest logs and release references off the queue worker's critical
	// path.
	go func() {
		t.readLog(taskExecutor)
		taskExecutor.cleanup()
	}()
}
  337. func (t *Task) readLog(taskExecutor *TaskExecutor) {
  338. u := taskExecutor.unit
  339. if u != nil {
  340. logs := u.LogManager.Read()
  341. taskExecutor.logs = logs
  342. taskExecutor.capacityMap = u.CapacityMap
  343. }
  344. }
// onError aborts the task: kills any executors still waiting, logs the
// session being removed, and closes waitCh to release Wait. Must be called
// with t.lock held (it is, from onTaskCompleted) and at most once —
// closing waitCh twice would panic.
func (t *Task) onError() {
	for _, taskExecutor := range t.taskExecutors {
		taskExecutor.Kill()
	}
	log.Errorf("Remove Session on error: %v", t.SchedInfo.SessionId)
	close(t.waitCh)
}
// onCompleted records the task's total elapsed time and closes waitCh to
// release Wait. Called at most once, from onTaskCompleted.
func (t *Task) onCompleted() {
	t.Consuming = time.Since(t.Time)
	close(t.waitCh)
}

// Wait blocks until the task finishes (success or error) and returns the
// final result.
func (t *Task) Wait() (*core.ScheduleResult, error) {
	log.V(10).Infof("Task wait...")
	<-t.waitCh
	return t.GetResult()
}

// GetResult returns the schedule result and error recorded by
// onTaskCompleted.
func (t *Task) GetResult() (*core.ScheduleResult, error) {
	return t.resultItems, t.resultError
}