| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package core
- import (
- "context"
- "fmt"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/util/errors"
- gp "yunion.io/x/pkg/util/goroutine_pool"
- utiltrace "yunion.io/x/pkg/util/trace"
- "yunion.io/x/pkg/util/workqueue"
- o "yunion.io/x/onecloud/pkg/scheduler/options"
- )
// NoResourceAvailableMsg is the leading text of the message produced by
// FitError.Error when no candidate passes the predicates.
const NoResourceAvailableMsg = "No resource are avaliable that match all of the following predicates:"
- // goroutine pool is to solve the problem of go expansion in
- // the stack, check goroutine pool status every minute.
- var (
- pool = gp.New(60 * time.Second)
- )
- type FailedPredicateMap map[string][]PredicateFailureReason
- type FitError struct {
- Unit *Unit
- FailedCandidateMap map[string]*FailedCandidates
- }
- // Error returns detailed information of why the guest failed to fit on each host
- func (fe *FitError) Error() string {
- ss := []string{}
- for stage, fcs := range fe.FailedCandidateMap {
- ss = append(ss, fmt.Sprintf("%v(-%v)", stage, len(fcs.Candidates)))
- }
- reasonMsg := fmt.Sprintf("%s filter by %v, session_id=%q", NoResourceAvailableMsg,
- strings.Join(ss, ", "), fe.Unit.SessionID())
- return reasonMsg
- }
// NoResourceError carries the session id and a free-form description for a
// scheduling request that had no resources to choose from.
type NoResourceError struct {
	info      string
	sessionID string
}

// Error renders the session id followed by the info text, both quoted.
func (e *NoResourceError) Error() string {
	const layout = "No resource avaliable to schedule, session_id: %q, info: %q"
	return fmt.Sprintf(layout, e.sessionID, e.info)
}
- type Scheduler interface {
- BeforePredicate() error
- Predicates() (map[string]FitPredicate, error)
- PriorityConfigs() ([]PriorityConfig, error)
- // mark already selected candidates dirty that
- // can't be use again until cleanup them
- //DirtySelectedCandidates([]*SelectedCandidate)
- }
- type GenericScheduler struct {
- Scheduler
- predicates map[string]FitPredicate
- priorities []PriorityConfig
- }
- func NewGenericScheduler(s Scheduler) (*GenericScheduler, error) {
- g := &GenericScheduler{}
- predicates, err := s.Predicates()
- if err != nil {
- return nil, err
- }
- priorities, err := s.PriorityConfigs()
- if err != nil {
- return nil, err
- }
- g.Scheduler = s
- g.predicates = predicates
- g.priorities = priorities
- return g, nil
- }
- func (g *GenericScheduler) Schedule(ctx context.Context, unit *Unit, candidates []Candidater, helper IResultHelper) (*ScheduleResult, error) {
- startTime := time.Now()
- defer func() {
- log.V(4).Infof("Schedule cost time: %v", time.Since(startTime))
- }()
- // get schedule context and information
- schedInfo := unit.SchedInfo
- isSuggestion := schedInfo.IsSuggestion
- // new trace follow all steps
- trace := utiltrace.New(fmt.Sprintf("SessionID: %s, schedule info: %s",
- schedInfo.SessionId, unit.Info()))
- defer trace.LogIfLong(1 * time.Second)
- if len(candidates) == 0 {
- return nil, &NoResourceError{
- sessionID: schedInfo.SessionId,
- info: unit.Info(),
- }
- }
- // setup something before run predicates, but now there is no actions
- err := g.BeforePredicate()
- if err != nil {
- return nil, err
- }
- trace.Step("Computing predicates")
- // load all predicates and find the candidate can statisfy schedule condition
- filteredCandidates, err := findCandidatesThatFit(ctx, unit, candidates, g.predicates)
- if err != nil {
- return nil, err
- }
- // if there is no candidate and not from scheduler/test api will return
- if len(filteredCandidates) == 0 && !isSuggestion {
- return nil, &FitError{
- Unit: unit,
- FailedCandidateMap: unit.FailedCandidateMap,
- }
- }
- var selectedCandidates []*SelectedCandidate
- if len(filteredCandidates) > 0 {
- trace.Step("Prioritizing")
- // prioritizing candidates
- // load all priorities and calculate the candidate's score
- priorityList, err := PrioritizeCandidates(unit, filteredCandidates, g.priorities)
- if err != nil {
- return nil, err
- }
- trace.Step("Selecting hosts")
- // select target candidate hosts
- selectedCandidates, err = SelectHosts(unit, priorityList)
- if err != nil {
- return nil, err
- }
- } else {
- selectedCandidates = []*SelectedCandidate{}
- }
- if unit.SchedInfo.IsSuggestion {
- filteredCandidates = candidates
- }
- resultItems, err := generateScheduleResult(unit, selectedCandidates, filteredCandidates)
- if err != nil {
- return nil, err
- }
- itemList := &SchedResultItemList{Unit: unit, Data: resultItems}
- return helper.ResultHelp(itemList, unit.SchedInfo), nil
- }
- func doSelect(u *Unit, candidate Candidater, count int64) {
- plugins := u.AllSelectPlugins()
- analysor := newPredicateAnalysor("do select for " + candidate.IndexKey())
- defer analysor.ShowResult()
- for _, plugin := range plugins {
- an := fmt.Sprintf("selected plugin: %s for %s", plugin.Name(), candidate.IndexKey())
- analysor.Start(an)
- plugin.OnSelectEnd(u, candidate, count)
- analysor.End(an, time.Now())
- }
- }
- func newSchedResultByCtx(u *Unit, count int64, c Candidater, useSelect bool) *SchedResultItem {
- showDetails := u.SchedInfo.ShowSuggestionDetails
- id := c.IndexKey()
- if useSelect {
- doSelect(u, c, count)
- }
- r := &SchedResultItem{
- ID: id,
- Count: count,
- Capacity: u.GetCapacity(id),
- Name: c.Getter().Name(),
- Score: u.GetScore(id),
- Data: u.GetFiltedData(id, count),
- Candidater: c,
- AllocatedResource: u.GetAllocatedResource(id),
- SchedData: u.SchedData(),
- }
- if showDetails {
- r.CapacityDetails = GetCapacities(u, id)
- r.ScoreDetails = u.GetScoreDetails(id)
- }
- return r
- }
- func generateScheduleResult(u *Unit, scs []*SelectedCandidate, fcs []Candidater) (SchedResultItems, error) {
- results := make(SchedResultItems, 0)
- itemMap := make(map[string]int)
- for _, it := range scs {
- cid := it.Candidate.IndexKey()
- r := newSchedResultByCtx(u, it.Count, it.Candidate, false)
- results = append(results, r)
- itemMap[cid] = 1
- }
- suggestionLimit := u.SchedInfo.SuggestionLimit
- for _, c := range fcs {
- if suggestionLimit <= int64(len(results)) {
- break
- }
- id := c.IndexKey()
- if _, ok := itemMap[id]; !ok && u.GetCapacity(id) > 0 {
- itemMap[id] = 1
- r := newSchedResultByCtx(u, 0, c, true)
- results = append(results, r)
- }
- }
- suggestionAll := u.SchedInfo.SuggestionAll
- if suggestionAll || len(u.SchedData().PreferCandidates) > 0 {
- for _, c := range fcs {
- if suggestionLimit <= int64(len(results)) {
- break
- }
- id := c.IndexKey()
- if _, ok := itemMap[id]; !ok {
- itemMap[id] = 0
- r := newSchedResultByCtx(u, 0, c, true)
- results = append(results, r)
- }
- }
- }
- return results, nil
- }
// StorageUsed accumulates an int64 amount per storage id.
type StorageUsed struct {
	used map[string]int64
}

// NewStorageUsed returns an empty, ready-to-use StorageUsed.
func NewStorageUsed() *StorageUsed {
	return &StorageUsed{
		used: make(map[string]int64),
	}
}

// Get returns the amount recorded for storageId, or 0 when nothing has been
// recorded. Reading a missing key (or a nil map) yields the zero value, so no
// explicit presence check is needed.
func (s *StorageUsed) Get(storageId string) int64 {
	return s.used[storageId]
}

// Add increases the amount recorded for storageId by used. The map is created
// on demand so that a zero-value StorageUsed can be used directly.
func (s *StorageUsed) Add(storageId string, used int64) {
	if s.used == nil {
		s.used = make(map[string]int64)
	}
	s.used[storageId] += used
}
// sortStorage pairs a storage id with the size it is ordered by.
type sortStorage struct {
	Id string
	// FeeSize is the sort key (presumably the free size; confirm with callers).
	FeeSize int64
}

// sortStorages implements sort.Interface, ordering by FeeSize descending.
type sortStorages []sortStorage

// Len returns the number of entries.
func (s sortStorages) Len() int { return len(s) }

// Swap exchanges the entries at positions i and j.
func (s sortStorages) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}

// Less reports whether entry i has a strictly larger FeeSize than entry j, so
// sorting places the largest size first.
func (s sortStorages) Less(i, j int) bool {
	return s[i].FeeSize > s[j].FeeSize
}

// getIds returns the ids in their current order; the result is never nil.
func (s sortStorages) getIds() []string {
	ids := make([]string, len(s))
	for i := range s {
		ids[i] = s[i].Id
	}
	return ids
}
- func GetCapacities(u *Unit, id string) (res map[string]int64) {
- res = make(map[string]int64)
- capacities := u.GetCapacities(id)
- if len(capacities) > 0 {
- for name, capacity := range capacities {
- res[name] = capacity.GetCount()
- }
- }
- return
- }
- type SelectedCandidate struct {
- Count int64
- Candidate Candidater
- }
- func (s SelectedCandidate) Index() (string, error) {
- return s.Candidate.IndexKey(), nil
- }
- func (s SelectedCandidate) GetCount() uint64 {
- return uint64(s.Count)
- }
- // SelectHosts takes a prioritized list of candidates and then picks
- // a group of hosts
- func SelectHosts(unit *Unit, priorityList HostPriorityList) ([]*SelectedCandidate, error) {
- if len(priorityList) == 0 {
- return nil, fmt.Errorf("SelectHosts get empty priorityList.")
- }
- selectedMap := make(map[string]*SelectedCandidate)
- noSelectedMap := make(map[string]Candidater)
- for _, item := range priorityList {
- noSelectedMap[item.Host] = item.Candidate
- }
- schedData := unit.SchedData()
- count := schedData.Count
- isSuggestion := unit.SchedInfo.IsSuggestion
- bestEffort := unit.SchedInfo.BestEffort
- selectedCandidates := []*SelectedCandidate{}
- sort.Sort(sort.Reverse(priorityList))
- completed:
- for len(priorityList) > 0 {
- log.V(10).Debugf("PriorityList: %#v", priorityList)
- priorityList0 := HostPriorityList{}
- for _, it := range priorityList {
- if count <= 0 {
- break completed
- }
- hostID := it.Host
- var (
- selectedItem *SelectedCandidate
- ok bool
- )
- if selectedItem, ok = selectedMap[hostID]; !ok {
- selectedItem = &SelectedCandidate{
- Count: 0,
- Candidate: it.Candidate,
- }
- selectedMap[hostID] = selectedItem
- delete(noSelectedMap, hostID)
- }
- selectedItem.Count++
- count--
- // if capacity of the host large than selected count, this host can be added to priorityList.
- if unit.GetCapacity(hostID) > selectedItem.Count {
- priorityList0 = append(priorityList0, it)
- }
- }
- // sort by score
- priorityList = priorityList0
- //sort.Sort(sort.Reverse(priorityList))
- }
- analysor := newPredicateAnalysor("select Execute")
- defer analysor.ShowResult()
- for _, sc := range selectedMap {
- doSelect(unit, sc.Candidate, sc.Count)
- selectedCandidates = append(selectedCandidates, sc)
- }
- // hack: not selected host should also execute OnSelectEnd step to inject result of network and storage candidates
- /*for _, nsc := range noSelectedMap {
- for _, plugin := range plugins {
- an := fmt.Sprintf("not selected plugin: %s for %s", plugin.Name(), nsc.IndexKey())
- analysor.Start(an)
- plugin.OnSelectEnd(unit, nsc, 0)
- analysor.End(an, time.Now())
- }
- }*/
- if !isSuggestion && !bestEffort {
- if count > 0 {
- return nil, fmt.Errorf("No enough resource, request/capacity: %d/%d", schedData.Count, schedData.Count-count)
- }
- }
- return selectedCandidates, nil
- }
- func findCandidatesThatFit(ctx context.Context, unit *Unit, candidates []Candidater, predicates map[string]FitPredicate) ([]Candidater, error) {
- var filtered []Candidater
- newPredicates, err := preExecPredicate(ctx, unit, candidates, predicates)
- if err != nil {
- return nil, err
- }
- // sort predicates by their name
- predicateNames := make([]string, 0, len(newPredicates))
- for name := range newPredicates {
- predicateNames = append(predicateNames, name)
- }
- sort.Strings(predicateNames)
- predicateArray := make([]FitPredicate, 0, len(predicateNames))
- for _, name := range predicateNames {
- predicateArray = append(predicateArray, newPredicates[name])
- }
- // do predicate filter
- if len(predicateArray) == 0 {
- filtered = candidates
- } else {
- // Create predicate list with enough space to avoid growing it
- // and allow assigning.
- filtered = make([]Candidater, len(candidates))
- errsChannel := make(chan error, len(candidates))
- var filteredLen int32
- checkUnit := func(i int) {
- fits, fcs, err := unitFitsOnCandidate(ctx, unit, candidates[i], predicateArray)
- if err != nil {
- errsChannel <- err
- return
- }
- if fits {
- filtered[atomic.AddInt32(&filteredLen, 1)-1] = candidates[i]
- } else {
- unit.AppendFailedCandidates(fcs)
- }
- }
- workerSize := o.Options.PredicateParallelizeSize
- if workerSize == 0 {
- workerSize = 1
- }
- workqueue.Parallelize(workerSize, len(candidates), checkUnit)
- filtered = filtered[:filteredLen]
- if len(errsChannel) > 0 {
- errs := make([]error, 0)
- length := len(errsChannel)
- for ; length > 0; length-- {
- errs = append(errs, <-errsChannel)
- }
- return []Candidater{}, errors.NewAggregate(errs)
- }
- }
- return filtered, nil
- }
- func preExecPredicate(ctx context.Context, unit *Unit, candidates []Candidater, predicates map[string]FitPredicate) (map[string]FitPredicate, error) {
- newPredicateFuncs := map[string]FitPredicate{}
- // analysor := newPredicateAnalysor("preExecPredicate")
- // defer analysor.ShowResult()
- for name, predicate := range predicates {
- // analysor.Start(name)
- // generate new FitPredicates because of race condition?
- newPredicate := predicate.Clone()
- ok, err := newPredicate.PreExecute(ctx, unit, candidates)
- // analysor.End(name, time.Now())
- if err != nil {
- return nil, err
- }
- if ok {
- newPredicateFuncs[name] = newPredicate
- }
- }
- return newPredicateFuncs, nil
- }
- type WaitGroupWrapper struct {
- sync.WaitGroup
- }
- func (w *WaitGroupWrapper) Wrap(cb func()) {
- w.Add(1)
- pool.Go(func() {
- cb()
- w.Done()
- })
- }
- func unitFitsOnCandidate(
- ctx context.Context,
- unit *Unit,
- candidate Candidater,
- predicates []FitPredicate,
- ) (bool, []FailedCandidate, error) {
- var (
- fit bool
- reasons []PredicateFailureReason
- err error
- fcs []FailedCandidate
- logs []SchedLog
- )
- isFit := true
- defer func() {
- if len(logs) > 0 {
- unit.LogManager.Appends(logs)
- }
- }()
- toLog := func(fit bool, reasons []PredicateFailureReason,
- err error, stage string) SchedLog {
- var (
- //sFit string
- messages = make([]*LogMessage, 0)
- )
- /*if fit {
- sFit = "Success."
- } else {
- sFit = "Failed:"
- }*/
- if err != nil {
- messages = append(messages, &LogMessage{Type: "error", Info: fmt.Sprintf("%v", err)})
- } else {
- if len(reasons) != 0 {
- for _, reason := range reasons {
- messages = append(messages, &LogMessage{Type: reason.GetType(), Info: reason.GetReason()})
- }
- }
- }
- candidateLogIndex := fmt.Sprintf("%v:%s", candidate.Getter().Name(), candidate.IndexKey())
- return NewSchedLog(candidateLogIndex, stage, messages, !fit)
- }
- analysor := newPredicateAnalysor("predicate Execute")
- defer analysor.ShowResult()
- for _, predicate := range predicates {
- n := fmt.Sprintf("%s for %s", predicate.Name(), candidate.Getter().Name())
- analysor.Start(n)
- fit, reasons, err = predicate.Execute(ctx, unit, candidate)
- analysor.End(n, time.Now())
- logs = append(logs, toLog(fit, reasons, err, predicate.Name()))
- if err != nil {
- return false, nil, err
- }
- if !fit {
- fcs = append(fcs, FailedCandidate{
- Stage: predicate.Name(),
- Candidate: candidate,
- Reasons: reasons,
- })
- isFit = false
- // When AlwaysCheckAllPredicates is set to true, scheduler checks all
- // the configured predicates even after one or more of them fails.
- // When the flag is set to false, scheduler skips checking the rest
- // of the predicates after it finds one predicate that failed.
- if !o.Options.AlwaysCheckAllPredicates {
- break
- }
- }
- }
- return isFit, fcs, nil
- }
- // PrioritizeCandidates by running the individual priority functions in parallel.
- // Each priority function is expected to set a score of 0-10
- // 0 is the lowest priority score (least preffered node) and 10 is the highest
- // / Each priority function can also have its own weight
- // The resource scores returned by priority function are multiplied by the weights to get weighted scores
- // All scores are finally combined (added) to get the total weighted scores of all resources
- func PrioritizeCandidates(
- unit *Unit,
- candidates []Candidater,
- priorities []PriorityConfig,
- ) (HostPriorityList, error) {
- // If no priority configs are provided, then the EqualPriority function is applied
- // This is required to generate the priority list in the required format
- if len(priorities) == 0 {
- result := make(HostPriorityList, 0, len(candidates))
- for _, candidate := range candidates {
- hostPriority, err := EqualPriority(unit, candidate)
- if err != nil {
- return nil, err
- }
- result = append(result, hostPriority)
- }
- return result, nil
- }
- wg := sync.WaitGroup{}
- results := make([]HostPriorityList, len(priorities))
- newPriorities, err := preExecPriorities(priorities, unit, candidates)
- if err != nil {
- return nil, err
- }
- // Max : 3 * len(newPriorities)
- errsChannel := make(chan error, 3*len(newPriorities))
- for i := range newPriorities {
- results[i] = make(HostPriorityList, len(candidates))
- }
- // map reduce take priorities
- processCandidate := func(index int) {
- var err error
- candidate := candidates[index]
- for i := range newPriorities {
- results[i][index], err = newPriorities[i].Map(unit, candidate)
- if err != nil {
- errsChannel <- err
- return
- }
- }
- }
- workqueue.Parallelize(o.Options.PriorityParallelizeSize, len(candidates), processCandidate)
- for i, p := range newPriorities {
- wg.Add(1)
- go func(index int, priority PriorityConfig) {
- defer wg.Done()
- if err := priority.Reduce(unit, candidates, results[index]); err != nil {
- errsChannel <- err
- }
- }(i, p)
- }
- // Wait for all computations to be finished.
- wg.Wait()
- if len(errsChannel) != 0 {
- errs := make([]error, 0)
- length := len(errsChannel)
- for ; length > 0; length-- {
- errs = append(errs, <-errsChannel)
- }
- return HostPriorityList{}, errors.NewAggregate(errs)
- }
- // Summarize all scores
- result := make(HostPriorityList, 0, len(candidates))
- // TODO: Consider parallelizing it
- // Do plugin priorities step
- for _, candidate := range candidates {
- for _, plugin := range unit.AllSelectPlugins() {
- plugin.OnPriorityEnd(unit, candidate)
- }
- }
- for i, candidate := range candidates {
- result = append(result, HostPriority{Host: candidates[i].IndexKey(), Score: *newScore(), Candidate: candidates[i]})
- //for j := range newPriorities {
- //result[i].Score += results[j][i].Score * newPriorities[j].Weight
- //}
- result[i].Score = unit.GetScore(candidate.IndexKey())
- }
- if log.V(10) {
- for i := range result {
- log.Infof("Host %s => Score %s", result[i].Host, result[i].Score.String())
- }
- }
- return result, nil
- }
- func preExecPriorities(priorities []PriorityConfig, unit *Unit, candidates []Candidater) ([]PriorityConfig, error) {
- newPriorities := []PriorityConfig{}
- for _, p := range priorities {
- ok, _, err := p.Pre(unit, candidates)
- if err != nil {
- return nil, err
- }
- if ok {
- newPriorities = append(newPriorities, p)
- }
- }
- return newPriorities, nil
- }
- // EqualPriority is a prioritizer function that gives an equal weight of one to all candidates
- func EqualPriority(_ *Unit, candidate Candidater) (HostPriority, error) {
- indexKey := candidate.IndexKey()
- if indexKey == "" {
- return HostPriority{}, fmt.Errorf("Candidate indexKey is empty")
- }
- return HostPriority{
- Host: indexKey,
- Score: newZeroScore(),
- Candidate: candidate,
- }, nil
- }
|