manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package manager
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "time"
  19. "k8s.io/client-go/kubernetes"
  20. "yunion.io/x/log"
  21. "yunion.io/x/onecloud/pkg/scheduler/api"
  22. "yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
  23. candidatecache "yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
  24. "yunion.io/x/onecloud/pkg/scheduler/core"
  25. "yunion.io/x/onecloud/pkg/scheduler/data_manager"
  26. "yunion.io/x/onecloud/pkg/scheduler/data_manager/common"
  27. schedmodels "yunion.io/x/onecloud/pkg/scheduler/models"
  28. o "yunion.io/x/onecloud/pkg/scheduler/options"
  29. "yunion.io/x/onecloud/pkg/util/k8s"
  30. )
// defaultIgnorePool is not referenced in this part of the file.
// NOTE(review): presumably used elsewhere in package manager — confirm.
const defaultIgnorePool = true

// schedManager is the package-global scheduler manager. It stays nil until
// InitAndStart assigns it; IsReady reports whether that has happened.
var schedManager *SchedulerManager

// SchedulerManager bundles the sub-managers that make up the scheduler
// service. It is constructed by newSchedulerManager, and start launches the
// Run loop of every sub-manager except KubeClusterManager.
type SchedulerManager struct {
	ExpireManager      *ExpireManager
	CompletedManager   *CompletedManager
	HistoryManager     *HistoryManager
	TaskManager        *TaskManager
	DataManager        *data_manager.DataManager
	CandidateManager   *data_manager.CandidateManager
	KubeClusterManager *k8s.SKubeClusterManager // source of the client returned by GetK8sClient
}
  42. func newSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
  43. sm := &SchedulerManager{}
  44. sm.DataManager = data_manager.NewDataManager(stopCh)
  45. sm.CandidateManager = data_manager.NewCandidateManager(sm.DataManager, stopCh)
  46. sm.ExpireManager = NewExpireManager(stopCh)
  47. sm.CompletedManager = NewCompletedManager(stopCh)
  48. sm.HistoryManager = NewHistoryManager(stopCh)
  49. sm.TaskManager = NewTaskManager(stopCh)
  50. sm.KubeClusterManager = k8s.NewKubeClusterManager(o.Options.Region, 30*time.Second)
  51. common.RegisterCacheManager(sm.CandidateManager)
  52. return sm
  53. }
  54. func GetScheduleManager() *SchedulerManager {
  55. return schedManager
  56. }
  57. func GetK8sClient() (*kubernetes.Clientset, error) {
  58. return GetScheduleManager().KubeClusterManager.GetK8sClient()
  59. }
  60. func InitAndStart(stopCh <-chan struct{}) {
  61. if schedManager != nil {
  62. log.Warningf("Global scheduler already init.")
  63. return
  64. }
  65. schedManager = newSchedulerManager(stopCh)
  66. go schedManager.start()
  67. log.Infof("InitAndStart ok")
  68. }
  69. func (sm *SchedulerManager) start() {
  70. startFuncs := []func(){
  71. sm.ExpireManager.Run,
  72. sm.CompletedManager.Run,
  73. sm.HistoryManager.Run,
  74. sm.TaskManager.Run,
  75. sm.DataManager.Run,
  76. sm.CandidateManager.Run,
  77. //sm.KubeClusterManager.Start,
  78. }
  79. for _, f := range startFuncs {
  80. go f()
  81. }
  82. }
  83. func (sm *SchedulerManager) schedule(info *api.SchedInfo) (*core.ScheduleResult, error) {
  84. // force sync clean expire cache before do schedule
  85. // sm.ExpireManager.Trigger()
  86. log.V(10).Infof("SchedulerManager do schedule, input: %#v", info)
  87. task, err := sm.TaskManager.AddTask(sm, info)
  88. if err != nil {
  89. return nil, err
  90. }
  91. sm.HistoryManager.NewHistoryItem(task)
  92. results, err := task.Wait()
  93. if err != nil {
  94. return nil, err
  95. }
  96. log.V(10).Infof("SchedulerManager finish schedule, selected candidates: %#v", results)
  97. return results, nil
  98. }
  99. // NewSessionID returns the current timestamp of a string type with precision of
  100. // milliseconds. And it should be consistent with the format of Region.
  101. // just like: 1509699887616
  102. func NewSessionID() string {
  103. return fmt.Sprintf("%v", time.Now().UnixNano()/1000000)
  104. }
  105. // Schedule process the request data that is scheduled for dispatch and complements
  106. // the session information.
  107. func Schedule(info *api.SchedInfo) (*core.ScheduleResult, error) {
  108. if len(info.SessionId) == 0 {
  109. info.SessionId = NewSessionID()
  110. }
  111. return schedManager.schedule(info)
  112. }
  113. func IsReady() bool {
  114. return schedManager != nil
  115. }
  116. func GetCandidateManager() *data_manager.CandidateManager {
  117. return schedManager.CandidateManager
  118. }
  119. func Expire(expireArgs *api.ExpireArgs, trigger bool) (*api.ExpireResult, error) {
  120. schedManager.ExpireManager.Add(expireArgs)
  121. if trigger {
  122. schedManager.ExpireManager.Trigger()
  123. }
  124. return &api.ExpireResult{}, nil
  125. }
  126. func CompletedNotify(completedNotifyArgs *api.CompletedNotifyArgs) (*api.CompletedNotifyResult, error) {
  127. schedManager.CompletedManager.Add(completedNotifyArgs)
  128. return &api.CompletedNotifyResult{}, nil
  129. }
  130. func getHostCandidatesList(args *api.CandidateListArgs) (*api.CandidateListResult, error) {
  131. r := new(api.CandidateListResult)
  132. r.Limit = args.Limit
  133. r.Offset = args.Offset
  134. cs, err := GetCandidateManager().GetCandidates(data_manager.CandidateGetArgs{
  135. ResType: "host",
  136. ZoneID: args.Zone,
  137. RegionID: args.Region,
  138. })
  139. if err != nil {
  140. return nil, fmt.Errorf("Get host candidates err: %v", err)
  141. }
  142. return GetCandidateHostList(cs, args, r)
  143. }
  144. func getBaremetalCandidatesList(args *api.CandidateListArgs) (*api.CandidateListResult, error) {
  145. r := new(api.CandidateListResult)
  146. r.Limit = args.Limit
  147. r.Offset = args.Offset
  148. cs, err := GetCandidateManager().GetCandidates(data_manager.CandidateGetArgs{
  149. ResType: "baremetal",
  150. ZoneID: args.Zone,
  151. RegionID: args.Region,
  152. })
  153. if err != nil {
  154. return nil, fmt.Errorf("Get baremetal candidates err: %v", err)
  155. }
  156. return GetCandidateBaremetalList(cs, args, r)
  157. }
  158. func mergeAllCandidateList(host, baremetal *api.CandidateListResult, args *api.CandidateListArgs) (*api.CandidateListResult, error) {
  159. res := make([]api.CandidateListResultItem, 0)
  160. if host.Total == args.Limit {
  161. return host, nil
  162. } else {
  163. // must < args.Limit
  164. res = append(res, host.Data...)
  165. }
  166. for _, bm := range baremetal.Data {
  167. if int64(len(res)) >= args.Limit {
  168. break
  169. }
  170. res = append(res, bm)
  171. }
  172. r := new(api.CandidateListResult)
  173. r.Limit = args.Limit
  174. r.Offset = args.Offset
  175. r.Total = int64(len(res))
  176. r.Data = res
  177. return r, nil
  178. }
  179. func GetCandidateList(args *api.CandidateListArgs) (*api.CandidateListResult, error) {
  180. var (
  181. hostRes, bmRes *api.CandidateListResult
  182. err error
  183. )
  184. switch args.Type {
  185. case "all":
  186. hostRes, err = getHostCandidatesList(args)
  187. if err != nil {
  188. return nil, err
  189. }
  190. bmRes, err = getBaremetalCandidatesList(args)
  191. if err != nil {
  192. return nil, err
  193. }
  194. return mergeAllCandidateList(hostRes, bmRes, args)
  195. case "host":
  196. return getHostCandidatesList(args)
  197. case "baremetal":
  198. return getBaremetalCandidatesList(args)
  199. default:
  200. return nil, fmt.Errorf("Unsupport candidate type %q", args.Type)
  201. }
  202. }
  203. func GetCandidateHostList(
  204. candidates []core.Candidater,
  205. args *api.CandidateListArgs,
  206. r *api.CandidateListResult,
  207. ) (*api.CandidateListResult, error) {
  208. r.Total = int64(len(candidates))
  209. for _, cc := range candidates {
  210. if int64(len(r.Data)) >= args.Limit {
  211. break
  212. }
  213. c := cc.(*candidate.HostDesc)
  214. mem := api.NewResultResourceInt64(
  215. c.GetFreeMemSize(false),
  216. c.GetReservedMemSize(),
  217. c.GetTotalMemSize(false))
  218. cpu := api.NewResultResourceInt64(
  219. c.GetFreeCPUCount(false),
  220. c.GetReservedCPUCount(),
  221. c.GetTotalCPUCount(false))
  222. storage := api.NewResultResourceInt64(
  223. c.GetFreeLocalStorageSize(false),
  224. c.GetReservedStorageSize(),
  225. c.GetTotalLocalStorageSize(false))
  226. item := api.CandidateListResultItem{
  227. ID: c.IndexKey(),
  228. Name: c.Name,
  229. Mem: *mem,
  230. Cpu: *cpu,
  231. Storage: *storage,
  232. Status: c.Status,
  233. HostStatus: c.HostStatus,
  234. HostType: c.GetHostType(),
  235. EnableStatus: c.GetEnableStatus(),
  236. }
  237. pendingUsage, _ := schedmodels.HostPendingUsageManager.GetPendingUsage(c.GetId())
  238. if pendingUsage != nil {
  239. item.PendingUsage = pendingUsage.ToMap()
  240. }
  241. r.Data = append(r.Data, item)
  242. }
  243. return r, nil
  244. }
  245. func GetCandidateBaremetalList(
  246. candidates []core.Candidater,
  247. args *api.CandidateListArgs,
  248. r *api.CandidateListResult,
  249. ) (*api.CandidateListResult, error) {
  250. r.Total = int64(len(candidates))
  251. for _, cc := range candidates {
  252. if int64(len(r.Data)) >= args.Limit {
  253. break
  254. }
  255. c := cc.(*candidate.BaremetalDesc)
  256. mem := api.NewResultResourceInt64(
  257. c.FreeMemSize(),
  258. 0,
  259. int64(c.MemSize))
  260. cpu := api.NewResultResourceInt64(
  261. c.FreeCPUCount(),
  262. 0,
  263. int64(c.CpuCount))
  264. storage := api.NewResultResourceInt64(
  265. c.FreeStorageSize(),
  266. 0,
  267. c.StorageSize)
  268. item := api.CandidateListResultItem{
  269. ID: c.IndexKey(),
  270. Name: c.Name,
  271. Mem: *mem,
  272. Cpu: *cpu,
  273. Storage: *storage,
  274. Status: c.Status,
  275. HostStatus: c.HostStatus,
  276. HostType: c.GetHostType(),
  277. EnableStatus: c.GetEnableStatus(),
  278. }
  279. r.Data = append(r.Data, item)
  280. }
  281. return r, nil
  282. }
  283. func GetCandidateDetail(args *api.CandidateDetailArgs) (*api.CandidateDetailResult, error) {
  284. r := new(api.CandidateDetailResult)
  285. candidate, err := GetCandidateManager().GetCandidate(args.ID, args.Type)
  286. if err != nil {
  287. return nil, err
  288. }
  289. r.Candidate = candidate
  290. return r, nil
  291. }
  292. func Cleanup(cleanupArgs *api.CleanupArgs) (*api.CleanupResult, error) {
  293. r := new(api.CleanupResult)
  294. cm := GetCandidateManager()
  295. if cleanupArgs.ResType != "" {
  296. cm.ReloadAll(cleanupArgs.ResType)
  297. } else {
  298. cm.ReloadAll("host")
  299. cm.ReloadAll("baremetal")
  300. }
  301. return r, nil
  302. }
  303. func GetHistoryList(args *api.HistoryArgs) (*api.HistoryResult, error) {
  304. offset, limit, all, isSuggestion := args.Offset, args.Limit, args.All, args.IsSuggestion
  305. if limit == int64(0) {
  306. limit = int64(50)
  307. }
  308. historyItems, total := schedManager.HistoryManager.GetHistoryList(offset, limit, all, isSuggestion)
  309. items := []*api.HistoryItem{}
  310. for idx := range historyItems {
  311. hi := historyItems[idx]
  312. items = append(items, hi.ToAPI())
  313. }
  314. return &api.HistoryResult{
  315. Items: items,
  316. Offset: offset,
  317. Limit: int64(len(items)),
  318. Total: total,
  319. }, nil
  320. }
  321. func GetHistoryDetail(historyDetailArgs *api.HistoryDetailArgs) (*api.HistoryDetailResult, error) {
  322. historyItem := schedManager.HistoryManager.GetHistory(historyDetailArgs.ID)
  323. if historyItem == nil {
  324. return nil, fmt.Errorf("History '%v' not found", historyDetailArgs.ID)
  325. }
  326. task := historyItem.Task
  327. schedInfo := task.SchedInfo
  328. historyTasks := []api.HistoryTask{}
  329. data := schedInfo
  330. taskExecutor := task.GetTaskExecutor(data.Tag)
  331. historyTask := api.HistoryTask{
  332. Type: data.Hypervisor,
  333. Data: data,
  334. }
  335. if taskExecutor != nil {
  336. historyTask.Status = taskExecutor.Status
  337. historyTask.Time = taskExecutor.Time.Local().Format("2006-01-02 15:04:05")
  338. historyTask.Consuming = fmt.Sprintf("%s", taskExecutor.Consuming)
  339. resultItems, err := taskExecutor.GetResult()
  340. historyTask.Result = resultItems
  341. if err != nil {
  342. historyTask.Error = fmt.Sprintf("%v", err)
  343. }
  344. if historyDetailArgs.Log {
  345. historyTask.Logs = taskExecutor.GetLogs()
  346. historyTask.CapacityMap = taskExecutor.GetCapacityMap()
  347. }
  348. }
  349. historyTasks = append(historyTasks, historyTask)
  350. var inputStr, outputStr, errStr string
  351. result, err := task.GetResult()
  352. if err != nil {
  353. errStr = fmt.Sprintf("%v", err)
  354. } else {
  355. if bytes, err0 := json.MarshalIndent(result, "", " "); err0 == nil {
  356. outputStr = string(bytes)
  357. }
  358. }
  359. if historyDetailArgs.Raw {
  360. inputStr = schedInfo.Raw
  361. }
  362. historyDetail := &api.HistoryDetail{
  363. Time: historyItem.Time.Local().Format("2006-01-02 15:04:05"),
  364. Consuming: fmt.Sprintf("%s", task.Consuming),
  365. SessionID: task.GetSessionID(),
  366. Tasks: historyTasks,
  367. Input: inputStr,
  368. Output: outputStr,
  369. Error: errStr,
  370. }
  371. return &api.HistoryDetailResult{
  372. Detail: historyDetail,
  373. }, nil
  374. }
  375. func GetCandidateHostsDesc() ([]core.Candidater, error) {
  376. return GetCandidateManager().GetCandidates(data_manager.CandidateGetArgs{ResType: "host"})
  377. }
  378. func GetK8sCandidateHosts(nodesName ...string) ([]*candidatecache.HostDesc, error) {
  379. hosts, err := GetCandidateHostsDesc()
  380. if err != nil {
  381. return nil, err
  382. }
  383. findHost := func(nodeName string) *candidatecache.HostDesc {
  384. for _, host := range hosts {
  385. if host.Getter().Name() == nodeName {
  386. return host.(*candidatecache.HostDesc)
  387. }
  388. }
  389. return nil
  390. }
  391. ret := make([]*candidatecache.HostDesc, 0)
  392. for _, nodeName := range nodesName {
  393. if host := findHost(nodeName); host != nil {
  394. ret = append(ret, host)
  395. }
  396. }
  397. return ret, nil
  398. }