generic_scheduler.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package core
  15. import (
  16. "context"
  17. "fmt"
  18. "sort"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/util/errors"
  25. gp "yunion.io/x/pkg/util/goroutine_pool"
  26. utiltrace "yunion.io/x/pkg/util/trace"
  27. "yunion.io/x/pkg/util/workqueue"
  28. o "yunion.io/x/onecloud/pkg/scheduler/options"
  29. )
  // NoResourceAvailableMsg is the leading text of the message produced by
  // FitError.Error. The wording is emitted at runtime and is kept verbatim.
  const NoResourceAvailableMsg = "No resource are avaliable that match all of the following predicates:"
  33. // goroutine pool is to solve the problem of go expansion in
  34. // the stack, check goroutine pool status every minute.
  35. var (
  36. pool = gp.New(60 * time.Second)
  37. )
  38. type FailedPredicateMap map[string][]PredicateFailureReason
  39. type FitError struct {
  40. Unit *Unit
  41. FailedCandidateMap map[string]*FailedCandidates
  42. }
  43. // Error returns detailed information of why the guest failed to fit on each host
  44. func (fe *FitError) Error() string {
  45. ss := []string{}
  46. for stage, fcs := range fe.FailedCandidateMap {
  47. ss = append(ss, fmt.Sprintf("%v(-%v)", stage, len(fcs.Candidates)))
  48. }
  49. reasonMsg := fmt.Sprintf("%s filter by %v, session_id=%q", NoResourceAvailableMsg,
  50. strings.Join(ss, ", "), fe.Unit.SessionID())
  51. return reasonMsg
  52. }
  // NoResourceError is the error Schedule returns when it is called with an
  // empty candidate list.
  type NoResourceError struct {
  	// info is a description of the unit being scheduled.
  	info string
  	// sessionID identifies the scheduling session.
  	sessionID string
  }

  // Error renders the session id and the info text, both quoted with %q.
  func (e *NoResourceError) Error() string {
  	const layout = "No resource avaliable to schedule, session_id: %q, info: %q"
  	return fmt.Sprintf(layout, e.sessionID, e.info)
  }
  60. type Scheduler interface {
  61. BeforePredicate() error
  62. Predicates() (map[string]FitPredicate, error)
  63. PriorityConfigs() ([]PriorityConfig, error)
  64. // mark already selected candidates dirty that
  65. // can't be use again until cleanup them
  66. //DirtySelectedCandidates([]*SelectedCandidate)
  67. }
  68. type GenericScheduler struct {
  69. Scheduler
  70. predicates map[string]FitPredicate
  71. priorities []PriorityConfig
  72. }
  73. func NewGenericScheduler(s Scheduler) (*GenericScheduler, error) {
  74. g := &GenericScheduler{}
  75. predicates, err := s.Predicates()
  76. if err != nil {
  77. return nil, err
  78. }
  79. priorities, err := s.PriorityConfigs()
  80. if err != nil {
  81. return nil, err
  82. }
  83. g.Scheduler = s
  84. g.predicates = predicates
  85. g.priorities = priorities
  86. return g, nil
  87. }
  88. func (g *GenericScheduler) Schedule(ctx context.Context, unit *Unit, candidates []Candidater, helper IResultHelper) (*ScheduleResult, error) {
  89. startTime := time.Now()
  90. defer func() {
  91. log.V(4).Infof("Schedule cost time: %v", time.Since(startTime))
  92. }()
  93. // get schedule context and information
  94. schedInfo := unit.SchedInfo
  95. isSuggestion := schedInfo.IsSuggestion
  96. // new trace follow all steps
  97. trace := utiltrace.New(fmt.Sprintf("SessionID: %s, schedule info: %s",
  98. schedInfo.SessionId, unit.Info()))
  99. defer trace.LogIfLong(1 * time.Second)
  100. if len(candidates) == 0 {
  101. return nil, &NoResourceError{
  102. sessionID: schedInfo.SessionId,
  103. info: unit.Info(),
  104. }
  105. }
  106. // setup something before run predicates, but now there is no actions
  107. err := g.BeforePredicate()
  108. if err != nil {
  109. return nil, err
  110. }
  111. trace.Step("Computing predicates")
  112. // load all predicates and find the candidate can statisfy schedule condition
  113. filteredCandidates, err := findCandidatesThatFit(ctx, unit, candidates, g.predicates)
  114. if err != nil {
  115. return nil, err
  116. }
  117. // if there is no candidate and not from scheduler/test api will return
  118. if len(filteredCandidates) == 0 && !isSuggestion {
  119. return nil, &FitError{
  120. Unit: unit,
  121. FailedCandidateMap: unit.FailedCandidateMap,
  122. }
  123. }
  124. var selectedCandidates []*SelectedCandidate
  125. if len(filteredCandidates) > 0 {
  126. trace.Step("Prioritizing")
  127. // prioritizing candidates
  128. // load all priorities and calculate the candidate's score
  129. priorityList, err := PrioritizeCandidates(unit, filteredCandidates, g.priorities)
  130. if err != nil {
  131. return nil, err
  132. }
  133. trace.Step("Selecting hosts")
  134. // select target candidate hosts
  135. selectedCandidates, err = SelectHosts(unit, priorityList)
  136. if err != nil {
  137. return nil, err
  138. }
  139. } else {
  140. selectedCandidates = []*SelectedCandidate{}
  141. }
  142. if unit.SchedInfo.IsSuggestion {
  143. filteredCandidates = candidates
  144. }
  145. resultItems, err := generateScheduleResult(unit, selectedCandidates, filteredCandidates)
  146. if err != nil {
  147. return nil, err
  148. }
  149. itemList := &SchedResultItemList{Unit: unit, Data: resultItems}
  150. return helper.ResultHelp(itemList, unit.SchedInfo), nil
  151. }
  152. func doSelect(u *Unit, candidate Candidater, count int64) {
  153. plugins := u.AllSelectPlugins()
  154. analysor := newPredicateAnalysor("do select for " + candidate.IndexKey())
  155. defer analysor.ShowResult()
  156. for _, plugin := range plugins {
  157. an := fmt.Sprintf("selected plugin: %s for %s", plugin.Name(), candidate.IndexKey())
  158. analysor.Start(an)
  159. plugin.OnSelectEnd(u, candidate, count)
  160. analysor.End(an, time.Now())
  161. }
  162. }
  163. func newSchedResultByCtx(u *Unit, count int64, c Candidater, useSelect bool) *SchedResultItem {
  164. showDetails := u.SchedInfo.ShowSuggestionDetails
  165. id := c.IndexKey()
  166. if useSelect {
  167. doSelect(u, c, count)
  168. }
  169. r := &SchedResultItem{
  170. ID: id,
  171. Count: count,
  172. Capacity: u.GetCapacity(id),
  173. Name: c.Getter().Name(),
  174. Score: u.GetScore(id),
  175. Data: u.GetFiltedData(id, count),
  176. Candidater: c,
  177. AllocatedResource: u.GetAllocatedResource(id),
  178. SchedData: u.SchedData(),
  179. }
  180. if showDetails {
  181. r.CapacityDetails = GetCapacities(u, id)
  182. r.ScoreDetails = u.GetScoreDetails(id)
  183. }
  184. return r
  185. }
  186. func generateScheduleResult(u *Unit, scs []*SelectedCandidate, fcs []Candidater) (SchedResultItems, error) {
  187. results := make(SchedResultItems, 0)
  188. itemMap := make(map[string]int)
  189. for _, it := range scs {
  190. cid := it.Candidate.IndexKey()
  191. r := newSchedResultByCtx(u, it.Count, it.Candidate, false)
  192. results = append(results, r)
  193. itemMap[cid] = 1
  194. }
  195. suggestionLimit := u.SchedInfo.SuggestionLimit
  196. for _, c := range fcs {
  197. if suggestionLimit <= int64(len(results)) {
  198. break
  199. }
  200. id := c.IndexKey()
  201. if _, ok := itemMap[id]; !ok && u.GetCapacity(id) > 0 {
  202. itemMap[id] = 1
  203. r := newSchedResultByCtx(u, 0, c, true)
  204. results = append(results, r)
  205. }
  206. }
  207. suggestionAll := u.SchedInfo.SuggestionAll
  208. if suggestionAll || len(u.SchedData().PreferCandidates) > 0 {
  209. for _, c := range fcs {
  210. if suggestionLimit <= int64(len(results)) {
  211. break
  212. }
  213. id := c.IndexKey()
  214. if _, ok := itemMap[id]; !ok {
  215. itemMap[id] = 0
  216. r := newSchedResultByCtx(u, 0, c, true)
  217. results = append(results, r)
  218. }
  219. }
  220. }
  221. return results, nil
  222. }
  // StorageUsed accumulates a used amount per storage id.
  type StorageUsed struct {
  	used map[string]int64
  }

  // NewStorageUsed returns a StorageUsed with an empty, ready-to-use map.
  func NewStorageUsed() *StorageUsed {
  	return &StorageUsed{used: map[string]int64{}}
  }

  // Get returns the amount recorded for storageId, or 0 when nothing has been
  // recorded for it.
  func (s *StorageUsed) Get(storageId string) int64 {
  	// Reading a missing key (or a nil map) yields the zero value.
  	return s.used[storageId]
  }

  // Add increases the amount recorded for storageId by used, allocating the
  // map first when the receiver was not created through NewStorageUsed.
  func (s *StorageUsed) Add(storageId string, used int64) {
  	if s.used == nil {
  		s.used = map[string]int64{}
  	}
  	s.used[storageId] += used
  }
  // sortStorage pairs a storage id with the size it is ordered by.
  type sortStorage struct {
  	Id      string
  	FeeSize int64
  }

  // sortStorages implements sort.Interface, ordering entries from the largest
  // FeeSize to the smallest.
  type sortStorages []sortStorage

  // Len reports the number of entries.
  func (s sortStorages) Len() int { return len(s) }

  // Swap exchanges the entries at positions i and j.
  func (s sortStorages) Swap(i, j int) {
  	tmp := s[i]
  	s[i] = s[j]
  	s[j] = tmp
  }

  // Less reports whether entry i has a strictly larger FeeSize than entry j,
  // which makes a sort produce descending order.
  func (s sortStorages) Less(i, j int) bool {
  	return s[i].FeeSize > s[j].FeeSize
  }

  // getIds returns the ids of all entries in their current order. The result
  // is never nil.
  func (s sortStorages) getIds() []string {
  	ids := make([]string, 0, len(s))
  	for i := range s {
  		ids = append(ids, s[i].Id)
  	}
  	return ids
  }
  271. func GetCapacities(u *Unit, id string) (res map[string]int64) {
  272. res = make(map[string]int64)
  273. capacities := u.GetCapacities(id)
  274. if len(capacities) > 0 {
  275. for name, capacity := range capacities {
  276. res[name] = capacity.GetCount()
  277. }
  278. }
  279. return
  280. }
  281. type SelectedCandidate struct {
  282. Count int64
  283. Candidate Candidater
  284. }
  285. func (s SelectedCandidate) Index() (string, error) {
  286. return s.Candidate.IndexKey(), nil
  287. }
  288. func (s SelectedCandidate) GetCount() uint64 {
  289. return uint64(s.Count)
  290. }
  291. // SelectHosts takes a prioritized list of candidates and then picks
  292. // a group of hosts
  293. func SelectHosts(unit *Unit, priorityList HostPriorityList) ([]*SelectedCandidate, error) {
  294. if len(priorityList) == 0 {
  295. return nil, fmt.Errorf("SelectHosts get empty priorityList.")
  296. }
  297. selectedMap := make(map[string]*SelectedCandidate)
  298. noSelectedMap := make(map[string]Candidater)
  299. for _, item := range priorityList {
  300. noSelectedMap[item.Host] = item.Candidate
  301. }
  302. schedData := unit.SchedData()
  303. count := schedData.Count
  304. isSuggestion := unit.SchedInfo.IsSuggestion
  305. bestEffort := unit.SchedInfo.BestEffort
  306. selectedCandidates := []*SelectedCandidate{}
  307. sort.Sort(sort.Reverse(priorityList))
  308. completed:
  309. for len(priorityList) > 0 {
  310. log.V(10).Debugf("PriorityList: %#v", priorityList)
  311. priorityList0 := HostPriorityList{}
  312. for _, it := range priorityList {
  313. if count <= 0 {
  314. break completed
  315. }
  316. hostID := it.Host
  317. var (
  318. selectedItem *SelectedCandidate
  319. ok bool
  320. )
  321. if selectedItem, ok = selectedMap[hostID]; !ok {
  322. selectedItem = &SelectedCandidate{
  323. Count: 0,
  324. Candidate: it.Candidate,
  325. }
  326. selectedMap[hostID] = selectedItem
  327. delete(noSelectedMap, hostID)
  328. }
  329. selectedItem.Count++
  330. count--
  331. // if capacity of the host large than selected count, this host can be added to priorityList.
  332. if unit.GetCapacity(hostID) > selectedItem.Count {
  333. priorityList0 = append(priorityList0, it)
  334. }
  335. }
  336. // sort by score
  337. priorityList = priorityList0
  338. //sort.Sort(sort.Reverse(priorityList))
  339. }
  340. analysor := newPredicateAnalysor("select Execute")
  341. defer analysor.ShowResult()
  342. for _, sc := range selectedMap {
  343. doSelect(unit, sc.Candidate, sc.Count)
  344. selectedCandidates = append(selectedCandidates, sc)
  345. }
  346. // hack: not selected host should also execute OnSelectEnd step to inject result of network and storage candidates
  347. /*for _, nsc := range noSelectedMap {
  348. for _, plugin := range plugins {
  349. an := fmt.Sprintf("not selected plugin: %s for %s", plugin.Name(), nsc.IndexKey())
  350. analysor.Start(an)
  351. plugin.OnSelectEnd(unit, nsc, 0)
  352. analysor.End(an, time.Now())
  353. }
  354. }*/
  355. if !isSuggestion && !bestEffort {
  356. if count > 0 {
  357. return nil, fmt.Errorf("No enough resource, request/capacity: %d/%d", schedData.Count, schedData.Count-count)
  358. }
  359. }
  360. return selectedCandidates, nil
  361. }
  362. func findCandidatesThatFit(ctx context.Context, unit *Unit, candidates []Candidater, predicates map[string]FitPredicate) ([]Candidater, error) {
  363. var filtered []Candidater
  364. newPredicates, err := preExecPredicate(ctx, unit, candidates, predicates)
  365. if err != nil {
  366. return nil, err
  367. }
  368. // sort predicates by their name
  369. predicateNames := make([]string, 0, len(newPredicates))
  370. for name := range newPredicates {
  371. predicateNames = append(predicateNames, name)
  372. }
  373. sort.Strings(predicateNames)
  374. predicateArray := make([]FitPredicate, 0, len(predicateNames))
  375. for _, name := range predicateNames {
  376. predicateArray = append(predicateArray, newPredicates[name])
  377. }
  378. // do predicate filter
  379. if len(predicateArray) == 0 {
  380. filtered = candidates
  381. } else {
  382. // Create predicate list with enough space to avoid growing it
  383. // and allow assigning.
  384. filtered = make([]Candidater, len(candidates))
  385. errsChannel := make(chan error, len(candidates))
  386. var filteredLen int32
  387. checkUnit := func(i int) {
  388. fits, fcs, err := unitFitsOnCandidate(ctx, unit, candidates[i], predicateArray)
  389. if err != nil {
  390. errsChannel <- err
  391. return
  392. }
  393. if fits {
  394. filtered[atomic.AddInt32(&filteredLen, 1)-1] = candidates[i]
  395. } else {
  396. unit.AppendFailedCandidates(fcs)
  397. }
  398. }
  399. workerSize := o.Options.PredicateParallelizeSize
  400. if workerSize == 0 {
  401. workerSize = 1
  402. }
  403. workqueue.Parallelize(workerSize, len(candidates), checkUnit)
  404. filtered = filtered[:filteredLen]
  405. if len(errsChannel) > 0 {
  406. errs := make([]error, 0)
  407. length := len(errsChannel)
  408. for ; length > 0; length-- {
  409. errs = append(errs, <-errsChannel)
  410. }
  411. return []Candidater{}, errors.NewAggregate(errs)
  412. }
  413. }
  414. return filtered, nil
  415. }
  416. func preExecPredicate(ctx context.Context, unit *Unit, candidates []Candidater, predicates map[string]FitPredicate) (map[string]FitPredicate, error) {
  417. newPredicateFuncs := map[string]FitPredicate{}
  418. // analysor := newPredicateAnalysor("preExecPredicate")
  419. // defer analysor.ShowResult()
  420. for name, predicate := range predicates {
  421. // analysor.Start(name)
  422. // generate new FitPredicates because of race condition?
  423. newPredicate := predicate.Clone()
  424. ok, err := newPredicate.PreExecute(ctx, unit, candidates)
  425. // analysor.End(name, time.Now())
  426. if err != nil {
  427. return nil, err
  428. }
  429. if ok {
  430. newPredicateFuncs[name] = newPredicate
  431. }
  432. }
  433. return newPredicateFuncs, nil
  434. }
  435. type WaitGroupWrapper struct {
  436. sync.WaitGroup
  437. }
  438. func (w *WaitGroupWrapper) Wrap(cb func()) {
  439. w.Add(1)
  440. pool.Go(func() {
  441. cb()
  442. w.Done()
  443. })
  444. }
  445. func unitFitsOnCandidate(
  446. ctx context.Context,
  447. unit *Unit,
  448. candidate Candidater,
  449. predicates []FitPredicate,
  450. ) (bool, []FailedCandidate, error) {
  451. var (
  452. fit bool
  453. reasons []PredicateFailureReason
  454. err error
  455. fcs []FailedCandidate
  456. logs []SchedLog
  457. )
  458. isFit := true
  459. defer func() {
  460. if len(logs) > 0 {
  461. unit.LogManager.Appends(logs)
  462. }
  463. }()
  464. toLog := func(fit bool, reasons []PredicateFailureReason,
  465. err error, stage string) SchedLog {
  466. var (
  467. //sFit string
  468. messages = make([]*LogMessage, 0)
  469. )
  470. /*if fit {
  471. sFit = "Success."
  472. } else {
  473. sFit = "Failed:"
  474. }*/
  475. if err != nil {
  476. messages = append(messages, &LogMessage{Type: "error", Info: fmt.Sprintf("%v", err)})
  477. } else {
  478. if len(reasons) != 0 {
  479. for _, reason := range reasons {
  480. messages = append(messages, &LogMessage{Type: reason.GetType(), Info: reason.GetReason()})
  481. }
  482. }
  483. }
  484. candidateLogIndex := fmt.Sprintf("%v:%s", candidate.Getter().Name(), candidate.IndexKey())
  485. return NewSchedLog(candidateLogIndex, stage, messages, !fit)
  486. }
  487. analysor := newPredicateAnalysor("predicate Execute")
  488. defer analysor.ShowResult()
  489. for _, predicate := range predicates {
  490. n := fmt.Sprintf("%s for %s", predicate.Name(), candidate.Getter().Name())
  491. analysor.Start(n)
  492. fit, reasons, err = predicate.Execute(ctx, unit, candidate)
  493. analysor.End(n, time.Now())
  494. logs = append(logs, toLog(fit, reasons, err, predicate.Name()))
  495. if err != nil {
  496. return false, nil, err
  497. }
  498. if !fit {
  499. fcs = append(fcs, FailedCandidate{
  500. Stage: predicate.Name(),
  501. Candidate: candidate,
  502. Reasons: reasons,
  503. })
  504. isFit = false
  505. // When AlwaysCheckAllPredicates is set to true, scheduler checks all
  506. // the configured predicates even after one or more of them fails.
  507. // When the flag is set to false, scheduler skips checking the rest
  508. // of the predicates after it finds one predicate that failed.
  509. if !o.Options.AlwaysCheckAllPredicates {
  510. break
  511. }
  512. }
  513. }
  514. return isFit, fcs, nil
  515. }
  516. // PrioritizeCandidates by running the individual priority functions in parallel.
  517. // Each priority function is expected to set a score of 0-10
  518. // 0 is the lowest priority score (least preffered node) and 10 is the highest
  519. // / Each priority function can also have its own weight
  520. // The resource scores returned by priority function are multiplied by the weights to get weighted scores
  521. // All scores are finally combined (added) to get the total weighted scores of all resources
  522. func PrioritizeCandidates(
  523. unit *Unit,
  524. candidates []Candidater,
  525. priorities []PriorityConfig,
  526. ) (HostPriorityList, error) {
  527. // If no priority configs are provided, then the EqualPriority function is applied
  528. // This is required to generate the priority list in the required format
  529. if len(priorities) == 0 {
  530. result := make(HostPriorityList, 0, len(candidates))
  531. for _, candidate := range candidates {
  532. hostPriority, err := EqualPriority(unit, candidate)
  533. if err != nil {
  534. return nil, err
  535. }
  536. result = append(result, hostPriority)
  537. }
  538. return result, nil
  539. }
  540. wg := sync.WaitGroup{}
  541. results := make([]HostPriorityList, len(priorities))
  542. newPriorities, err := preExecPriorities(priorities, unit, candidates)
  543. if err != nil {
  544. return nil, err
  545. }
  546. // Max : 3 * len(newPriorities)
  547. errsChannel := make(chan error, 3*len(newPriorities))
  548. for i := range newPriorities {
  549. results[i] = make(HostPriorityList, len(candidates))
  550. }
  551. // map reduce take priorities
  552. processCandidate := func(index int) {
  553. var err error
  554. candidate := candidates[index]
  555. for i := range newPriorities {
  556. results[i][index], err = newPriorities[i].Map(unit, candidate)
  557. if err != nil {
  558. errsChannel <- err
  559. return
  560. }
  561. }
  562. }
  563. workqueue.Parallelize(o.Options.PriorityParallelizeSize, len(candidates), processCandidate)
  564. for i, p := range newPriorities {
  565. wg.Add(1)
  566. go func(index int, priority PriorityConfig) {
  567. defer wg.Done()
  568. if err := priority.Reduce(unit, candidates, results[index]); err != nil {
  569. errsChannel <- err
  570. }
  571. }(i, p)
  572. }
  573. // Wait for all computations to be finished.
  574. wg.Wait()
  575. if len(errsChannel) != 0 {
  576. errs := make([]error, 0)
  577. length := len(errsChannel)
  578. for ; length > 0; length-- {
  579. errs = append(errs, <-errsChannel)
  580. }
  581. return HostPriorityList{}, errors.NewAggregate(errs)
  582. }
  583. // Summarize all scores
  584. result := make(HostPriorityList, 0, len(candidates))
  585. // TODO: Consider parallelizing it
  586. // Do plugin priorities step
  587. for _, candidate := range candidates {
  588. for _, plugin := range unit.AllSelectPlugins() {
  589. plugin.OnPriorityEnd(unit, candidate)
  590. }
  591. }
  592. for i, candidate := range candidates {
  593. result = append(result, HostPriority{Host: candidates[i].IndexKey(), Score: *newScore(), Candidate: candidates[i]})
  594. //for j := range newPriorities {
  595. //result[i].Score += results[j][i].Score * newPriorities[j].Weight
  596. //}
  597. result[i].Score = unit.GetScore(candidate.IndexKey())
  598. }
  599. if log.V(10) {
  600. for i := range result {
  601. log.Infof("Host %s => Score %s", result[i].Host, result[i].Score.String())
  602. }
  603. }
  604. return result, nil
  605. }
  606. func preExecPriorities(priorities []PriorityConfig, unit *Unit, candidates []Candidater) ([]PriorityConfig, error) {
  607. newPriorities := []PriorityConfig{}
  608. for _, p := range priorities {
  609. ok, _, err := p.Pre(unit, candidates)
  610. if err != nil {
  611. return nil, err
  612. }
  613. if ok {
  614. newPriorities = append(newPriorities, p)
  615. }
  616. }
  617. return newPriorities, nil
  618. }
  619. // EqualPriority is a prioritizer function that gives an equal weight of one to all candidates
  620. func EqualPriority(_ *Unit, candidate Candidater) (HostPriority, error) {
  621. indexKey := candidate.IndexKey()
  622. if indexKey == "" {
  623. return HostPriority{}, fmt.Errorf("Candidate indexKey is empty")
  624. }
  625. return HostPriority{
  626. Host: indexKey,
  627. Score: newZeroScore(),
  628. Candidate: candidate,
  629. }, nil
  630. }