| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package manager
import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"k8s.io/client-go/kubernetes"

	"yunion.io/x/log"
	"yunion.io/x/onecloud/pkg/scheduler/api"
	"yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
	candidatecache "yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
	"yunion.io/x/onecloud/pkg/scheduler/core"
	"yunion.io/x/onecloud/pkg/scheduler/data_manager"
	"yunion.io/x/onecloud/pkg/scheduler/data_manager/common"
	schedmodels "yunion.io/x/onecloud/pkg/scheduler/models"
	o "yunion.io/x/onecloud/pkg/scheduler/options"
	"yunion.io/x/onecloud/pkg/util/k8s"
)
// defaultIgnorePool is not referenced anywhere in this file.
// NOTE(review): presumably consumed elsewhere in the package — verify before removing.
const defaultIgnorePool = true

// schedManager is the process-wide scheduler singleton, assigned exactly once
// by InitAndStart and read by the package-level helper functions below.
var schedManager *SchedulerManager
// SchedulerManager bundles the long-running sub-managers that together
// implement scheduling: task execution, expiry/completion bookkeeping,
// schedule history, candidate/data caches and the kubernetes cluster client.
// Construct with newSchedulerManager and launch the background loops with start().
type SchedulerManager struct {
	ExpireManager      *ExpireManager                 // cleans up expired schedule state
	CompletedManager   *CompletedManager              // processes completion notifications
	HistoryManager     *HistoryManager                // records schedule history items
	TaskManager        *TaskManager                   // executes schedule tasks
	DataManager        *data_manager.DataManager      // backing data cache
	CandidateManager   *data_manager.CandidateManager // host/baremetal candidate cache
	KubeClusterManager *k8s.SKubeClusterManager       // kubernetes cluster client manager
}
- func newSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
- sm := &SchedulerManager{}
- sm.DataManager = data_manager.NewDataManager(stopCh)
- sm.CandidateManager = data_manager.NewCandidateManager(sm.DataManager, stopCh)
- sm.ExpireManager = NewExpireManager(stopCh)
- sm.CompletedManager = NewCompletedManager(stopCh)
- sm.HistoryManager = NewHistoryManager(stopCh)
- sm.TaskManager = NewTaskManager(stopCh)
- sm.KubeClusterManager = k8s.NewKubeClusterManager(o.Options.Region, 30*time.Second)
- common.RegisterCacheManager(sm.CandidateManager)
- return sm
- }
// GetScheduleManager returns the global scheduler singleton, or nil if
// InitAndStart has not run yet.
func GetScheduleManager() *SchedulerManager {
	return schedManager
}
- func GetK8sClient() (*kubernetes.Clientset, error) {
- return GetScheduleManager().KubeClusterManager.GetK8sClient()
- }
- func InitAndStart(stopCh <-chan struct{}) {
- if schedManager != nil {
- log.Warningf("Global scheduler already init.")
- return
- }
- schedManager = newSchedulerManager(stopCh)
- go schedManager.start()
- log.Infof("InitAndStart ok")
- }
- func (sm *SchedulerManager) start() {
- startFuncs := []func(){
- sm.ExpireManager.Run,
- sm.CompletedManager.Run,
- sm.HistoryManager.Run,
- sm.TaskManager.Run,
- sm.DataManager.Run,
- sm.CandidateManager.Run,
- //sm.KubeClusterManager.Start,
- }
- for _, f := range startFuncs {
- go f()
- }
- }
- func (sm *SchedulerManager) schedule(info *api.SchedInfo) (*core.ScheduleResult, error) {
- // force sync clean expire cache before do schedule
- // sm.ExpireManager.Trigger()
- log.V(10).Infof("SchedulerManager do schedule, input: %#v", info)
- task, err := sm.TaskManager.AddTask(sm, info)
- if err != nil {
- return nil, err
- }
- sm.HistoryManager.NewHistoryItem(task)
- results, err := task.Wait()
- if err != nil {
- return nil, err
- }
- log.V(10).Infof("SchedulerManager finish schedule, selected candidates: %#v", results)
- return results, nil
- }
- // NewSessionID returns the current timestamp of a string type with precision of
- // milliseconds. And it should be consistent with the format of Region.
- // just like: 1509699887616
- func NewSessionID() string {
- return fmt.Sprintf("%v", time.Now().UnixNano()/1000000)
- }
- // Schedule process the request data that is scheduled for dispatch and complements
- // the session information.
- func Schedule(info *api.SchedInfo) (*core.ScheduleResult, error) {
- if len(info.SessionId) == 0 {
- info.SessionId = NewSessionID()
- }
- return schedManager.schedule(info)
- }
// IsReady reports whether the global scheduler has been initialized by
// InitAndStart.
func IsReady() bool {
	return schedManager != nil
}
// GetCandidateManager returns the global candidate cache manager.
// It panics (nil dereference) if called before InitAndStart.
func GetCandidateManager() *data_manager.CandidateManager {
	return schedManager.CandidateManager
}
- func Expire(expireArgs *api.ExpireArgs, trigger bool) (*api.ExpireResult, error) {
- schedManager.ExpireManager.Add(expireArgs)
- if trigger {
- schedManager.ExpireManager.Trigger()
- }
- return &api.ExpireResult{}, nil
- }
- func CompletedNotify(completedNotifyArgs *api.CompletedNotifyArgs) (*api.CompletedNotifyResult, error) {
- schedManager.CompletedManager.Add(completedNotifyArgs)
- return &api.CompletedNotifyResult{}, nil
- }
- func getHostCandidatesList(args *api.CandidateListArgs) (*api.CandidateListResult, error) {
- r := new(api.CandidateListResult)
- r.Limit = args.Limit
- r.Offset = args.Offset
- cs, err := GetCandidateManager().GetCandidates(data_manager.CandidateGetArgs{
- ResType: "host",
- ZoneID: args.Zone,
- RegionID: args.Region,
- })
- if err != nil {
- return nil, fmt.Errorf("Get host candidates err: %v", err)
- }
- return GetCandidateHostList(cs, args, r)
- }
- func getBaremetalCandidatesList(args *api.CandidateListArgs) (*api.CandidateListResult, error) {
- r := new(api.CandidateListResult)
- r.Limit = args.Limit
- r.Offset = args.Offset
- cs, err := GetCandidateManager().GetCandidates(data_manager.CandidateGetArgs{
- ResType: "baremetal",
- ZoneID: args.Zone,
- RegionID: args.Region,
- })
- if err != nil {
- return nil, fmt.Errorf("Get baremetal candidates err: %v", err)
- }
- return GetCandidateBaremetalList(cs, args, r)
- }
- func mergeAllCandidateList(host, baremetal *api.CandidateListResult, args *api.CandidateListArgs) (*api.CandidateListResult, error) {
- res := make([]api.CandidateListResultItem, 0)
- if host.Total == args.Limit {
- return host, nil
- } else {
- // must < args.Limit
- res = append(res, host.Data...)
- }
- for _, bm := range baremetal.Data {
- if int64(len(res)) >= args.Limit {
- break
- }
- res = append(res, bm)
- }
- r := new(api.CandidateListResult)
- r.Limit = args.Limit
- r.Offset = args.Offset
- r.Total = int64(len(res))
- r.Data = res
- return r, nil
- }
- func GetCandidateList(args *api.CandidateListArgs) (*api.CandidateListResult, error) {
- var (
- hostRes, bmRes *api.CandidateListResult
- err error
- )
- switch args.Type {
- case "all":
- hostRes, err = getHostCandidatesList(args)
- if err != nil {
- return nil, err
- }
- bmRes, err = getBaremetalCandidatesList(args)
- if err != nil {
- return nil, err
- }
- return mergeAllCandidateList(hostRes, bmRes, args)
- case "host":
- return getHostCandidatesList(args)
- case "baremetal":
- return getBaremetalCandidatesList(args)
- default:
- return nil, fmt.Errorf("Unsupport candidate type %q", args.Type)
- }
- }
- func GetCandidateHostList(
- candidates []core.Candidater,
- args *api.CandidateListArgs,
- r *api.CandidateListResult,
- ) (*api.CandidateListResult, error) {
- r.Total = int64(len(candidates))
- for _, cc := range candidates {
- if int64(len(r.Data)) >= args.Limit {
- break
- }
- c := cc.(*candidate.HostDesc)
- mem := api.NewResultResourceInt64(
- c.GetFreeMemSize(false),
- c.GetReservedMemSize(),
- c.GetTotalMemSize(false))
- cpu := api.NewResultResourceInt64(
- c.GetFreeCPUCount(false),
- c.GetReservedCPUCount(),
- c.GetTotalCPUCount(false))
- storage := api.NewResultResourceInt64(
- c.GetFreeLocalStorageSize(false),
- c.GetReservedStorageSize(),
- c.GetTotalLocalStorageSize(false))
- item := api.CandidateListResultItem{
- ID: c.IndexKey(),
- Name: c.Name,
- Mem: *mem,
- Cpu: *cpu,
- Storage: *storage,
- Status: c.Status,
- HostStatus: c.HostStatus,
- HostType: c.GetHostType(),
- EnableStatus: c.GetEnableStatus(),
- }
- pendingUsage, _ := schedmodels.HostPendingUsageManager.GetPendingUsage(c.GetId())
- if pendingUsage != nil {
- item.PendingUsage = pendingUsage.ToMap()
- }
- r.Data = append(r.Data, item)
- }
- return r, nil
- }
- func GetCandidateBaremetalList(
- candidates []core.Candidater,
- args *api.CandidateListArgs,
- r *api.CandidateListResult,
- ) (*api.CandidateListResult, error) {
- r.Total = int64(len(candidates))
- for _, cc := range candidates {
- if int64(len(r.Data)) >= args.Limit {
- break
- }
- c := cc.(*candidate.BaremetalDesc)
- mem := api.NewResultResourceInt64(
- c.FreeMemSize(),
- 0,
- int64(c.MemSize))
- cpu := api.NewResultResourceInt64(
- c.FreeCPUCount(),
- 0,
- int64(c.CpuCount))
- storage := api.NewResultResourceInt64(
- c.FreeStorageSize(),
- 0,
- c.StorageSize)
- item := api.CandidateListResultItem{
- ID: c.IndexKey(),
- Name: c.Name,
- Mem: *mem,
- Cpu: *cpu,
- Storage: *storage,
- Status: c.Status,
- HostStatus: c.HostStatus,
- HostType: c.GetHostType(),
- EnableStatus: c.GetEnableStatus(),
- }
- r.Data = append(r.Data, item)
- }
- return r, nil
- }
- func GetCandidateDetail(args *api.CandidateDetailArgs) (*api.CandidateDetailResult, error) {
- r := new(api.CandidateDetailResult)
- candidate, err := GetCandidateManager().GetCandidate(args.ID, args.Type)
- if err != nil {
- return nil, err
- }
- r.Candidate = candidate
- return r, nil
- }
- func Cleanup(cleanupArgs *api.CleanupArgs) (*api.CleanupResult, error) {
- r := new(api.CleanupResult)
- cm := GetCandidateManager()
- if cleanupArgs.ResType != "" {
- cm.ReloadAll(cleanupArgs.ResType)
- } else {
- cm.ReloadAll("host")
- cm.ReloadAll("baremetal")
- }
- return r, nil
- }
- func GetHistoryList(args *api.HistoryArgs) (*api.HistoryResult, error) {
- offset, limit, all, isSuggestion := args.Offset, args.Limit, args.All, args.IsSuggestion
- if limit == int64(0) {
- limit = int64(50)
- }
- historyItems, total := schedManager.HistoryManager.GetHistoryList(offset, limit, all, isSuggestion)
- items := []*api.HistoryItem{}
- for idx := range historyItems {
- hi := historyItems[idx]
- items = append(items, hi.ToAPI())
- }
- return &api.HistoryResult{
- Items: items,
- Offset: offset,
- Limit: int64(len(items)),
- Total: total,
- }, nil
- }
// GetHistoryDetail renders one scheduling history entry in detail: the task's
// execution record, optional raw input and executor logs, and the marshalled
// schedule result.
func GetHistoryDetail(historyDetailArgs *api.HistoryDetailArgs) (*api.HistoryDetailResult, error) {
	historyItem := schedManager.HistoryManager.GetHistory(historyDetailArgs.ID)
	if historyItem == nil {
		return nil, fmt.Errorf("History '%v' not found", historyDetailArgs.ID)
	}
	task := historyItem.Task
	schedInfo := task.SchedInfo
	historyTasks := []api.HistoryTask{}
	data := schedInfo
	// Look up the executor that handled this request by its tag.
	taskExecutor := task.GetTaskExecutor(data.Tag)
	historyTask := api.HistoryTask{
		Type: data.Hypervisor,
		Data: data,
	}
	// A nil executor leaves the status/time/result fields at their zero values.
	if taskExecutor != nil {
		historyTask.Status = taskExecutor.Status
		historyTask.Time = taskExecutor.Time.Local().Format("2006-01-02 15:04:05")
		historyTask.Consuming = fmt.Sprintf("%s", taskExecutor.Consuming)
		// Result and error are reported together: a failed executor may still
		// carry partial result items.
		resultItems, err := taskExecutor.GetResult()
		historyTask.Result = resultItems
		if err != nil {
			historyTask.Error = fmt.Sprintf("%v", err)
		}
		if historyDetailArgs.Log {
			// Logs and capacity map are only fetched on explicit request.
			historyTask.Logs = taskExecutor.GetLogs()
			historyTask.CapacityMap = taskExecutor.GetCapacityMap()
		}
	}
	historyTasks = append(historyTasks, historyTask)
	var inputStr, outputStr, errStr string
	result, err := task.GetResult()
	if err != nil {
		errStr = fmt.Sprintf("%v", err)
	} else {
		// Marshal failure is deliberately swallowed: output simply stays empty.
		if bytes, err0 := json.MarshalIndent(result, "", " "); err0 == nil {
			outputStr = string(bytes)
		}
	}
	if historyDetailArgs.Raw {
		inputStr = schedInfo.Raw
	}
	historyDetail := &api.HistoryDetail{
		Time:      historyItem.Time.Local().Format("2006-01-02 15:04:05"),
		Consuming: fmt.Sprintf("%s", task.Consuming),
		SessionID: task.GetSessionID(),
		Tasks:     historyTasks,
		Input:     inputStr,
		Output:    outputStr,
		Error:     errStr,
	}
	return &api.HistoryDetailResult{
		Detail: historyDetail,
	}, nil
}
- func GetCandidateHostsDesc() ([]core.Candidater, error) {
- return GetCandidateManager().GetCandidates(data_manager.CandidateGetArgs{ResType: "host"})
- }
- func GetK8sCandidateHosts(nodesName ...string) ([]*candidatecache.HostDesc, error) {
- hosts, err := GetCandidateHostsDesc()
- if err != nil {
- return nil, err
- }
- findHost := func(nodeName string) *candidatecache.HostDesc {
- for _, host := range hosts {
- if host.Getter().Name() == nodeName {
- return host.(*candidatecache.HostDesc)
- }
- }
- return nil
- }
- ret := make([]*candidatecache.HostDesc, 0)
- for _, nodeName := range nodesName {
- if host := findHost(nodeName); host != nil {
- ret = append(ret, host)
- }
- }
- return ret, nil
- }
|