| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package core
- import (
- "fmt"
- "sort"
- "strings"
- "sync"
- "yunion.io/x/log"
- "yunion.io/x/pkg/tristate"
- "yunion.io/x/onecloud/pkg/compute/models"
- "yunion.io/x/onecloud/pkg/scheduler/api"
- "yunion.io/x/onecloud/pkg/scheduler/core/score"
- )
// Sentinel capacity values used by the counters and the schedule unit.
const (
	// EmptyCapacity marks a capacity that is absent or not computed yet.
	EmptyCapacity int64 = -1
	// MaxCapacity is the largest int64; it seeds minimum searches.
	MaxCapacity int64 = 1<<63 - 1
)
- var (
- EmptyCapacities = make(map[string]Counter)
- EmptySelectPriorityValue = SSelectPriorityValue(0)
- )
- type SharedResourceManager struct {
- resourceMap map[string]Counter
- lock sync.Mutex
- }
- func NewSharedResourceManager() *SharedResourceManager {
- return &SharedResourceManager{
- lock: sync.Mutex{},
- resourceMap: make(map[string]Counter),
- }
- }
- func (m *SharedResourceManager) Add(resourceKey string, capacity Counter) {
- m.lock.Lock()
- defer m.lock.Unlock()
- m.resourceMap[resourceKey] = capacity
- }
- type CounterManager struct {
- Counters map[string]Counter
- lock sync.Mutex
- }
- func NewCounterManager() *CounterManager {
- return &CounterManager{
- Counters: make(map[string]Counter),
- lock: sync.Mutex{},
- }
- }
- func (m *CounterManager) Get(key string) Counter {
- m.lock.Lock()
- defer m.lock.Unlock()
- if counter, ok := m.Counters[key]; ok {
- return counter
- }
- return nil
- }
- func (m *CounterManager) GetOrCreate(key string, creator func() Counter) Counter {
- m.lock.Lock()
- defer m.lock.Unlock()
- if counter, ok := m.Counters[key]; ok {
- return counter
- }
- counter := creator()
- if counter == nil {
- return nil
- }
- m.Counters[key] = counter
- return counter
- }
// Counter is anything that can report a count as an int64.
type Counter interface {
	// GetCount returns the current count.
	GetCount() int64
}
- type MultiCounter interface {
- Counter
- Add(counter Counter)
- }
// NormalCounter is a Counter backed by a plain int64 value.
type NormalCounter struct {
	Value int64
}

// NewNormalCounter returns a counter that reports value.
func NewNormalCounter(value int64) *NormalCounter {
	counter := new(NormalCounter)
	counter.Value = value
	return counter
}

// GetCount returns the stored Value.
func (c *NormalCounter) GetCount() int64 {
	return c.Value
}
- type Counters struct {
- counters []Counter
- lock sync.Mutex
- sum int64
- }
- func NewCounters() *Counters {
- return &Counters{
- sum: EmptyCapacity,
- lock: sync.Mutex{},
- }
- }
- func (c *Counters) Add(cnt Counter) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.counters = append(c.counters, cnt)
- c.sum = EmptyCapacity
- }
- func (c *Counters) GetCount() int64 {
- if c.sum == EmptyCapacity {
- c.sum = c.calculateCount()
- }
- return c.sum
- }
- func (c *Counters) calculateCount() int64 {
- if len(c.counters) == 0 {
- return 0
- }
- c.lock.Lock()
- defer c.lock.Unlock()
- value := int64(0)
- for _, c := range c.counters {
- count := c.GetCount()
- if count != EmptyCapacity {
- value += c.GetCount()
- }
- }
- return value
- }
- type MinCounters struct {
- counters []Counter
- }
- func NewMinCounters() *MinCounters {
- return &MinCounters{}
- }
- func (c *MinCounters) Add(counter Counter) {
- c.counters = append(c.counters, counter)
- }
- func (c *MinCounters) GetCount() int64 {
- if len(c.counters) == 0 {
- return EmptyCapacity
- }
- minCount := c.counters[0].GetCount()
- if len(c.counters) == 1 {
- return minCount
- }
- for _, c0 := range c.counters[1:] {
- count := c0.GetCount()
- if count < minCount {
- minCount = count
- }
- }
- return minCount
- }
- type Capacity struct {
- Values map[string]Counter
- MinValue int64
- }
- type Score struct {
- *score.ScoreBucket
- }
- func newScore() *Score {
- return &Score{
- ScoreBucket: score.NewScoreBuckets(),
- }
- }
- func newZeroScore() Score {
- s := newScore()
- s.SetScore(score.NewZeroScore(), tristate.None)
- return *s
- }
// SchedContextDataItem is the filtered data recorded for one candidate.
type SchedContextDataItem struct {
	// Networks is read by Unit.GetFiltedData as string network id -> int64
	// ip number.
	Networks *sync.Map
	// Data holds every other key/value recorded for the candidate.
	Data map[string]interface{}
}
// LogMessage is a single typed log entry.
type LogMessage struct {
	Type string
	Info string
}

// LogMessages is an ordered list of log entries.
type LogMessages []*LogMessage

// String renders every entry as "Type: Info" and joins them with commas.
func (ms LogMessages) String() string {
	parts := make([]string, len(ms))
	for i, msg := range ms {
		parts[i] = fmt.Sprintf("%s: %s", msg.Type, msg.Info)
	}
	return strings.Join(parts, ",")
}
- type SchedLog struct {
- Candidate string
- Action string
- Messages LogMessages
- IsFailed bool
- }
- func NewSchedLog(candidate, action string, messages LogMessages, isFailed bool) SchedLog {
- return SchedLog{candidate, action, messages, isFailed}
- }
- func (log *SchedLog) String() string {
- prefix := "Success"
- if log.IsFailed {
- prefix = "Failed"
- }
- return fmt.Sprintf("%s: %v [%v] %v", prefix, log.Candidate, log.Action, log.Messages.String())
- }
- type SchedLogList []SchedLog
- func (logList SchedLogList) Get(index string) *SchedLog {
- for _, l := range logList {
- if l.Candidate == index {
- return &l
- }
- }
- return nil
- }
- func (logList SchedLogList) Len() int {
- return len(logList)
- }
- func (logList SchedLogList) Less(i, j int) bool {
- r := strings.Compare(logList[i].Candidate, logList[j].Candidate)
- if r != 0 {
- return r < 0
- }
- r = strings.Compare(logList[i].Messages.String(), logList[j].Messages.String())
- if r != 0 {
- return r < 0
- }
- return strings.Compare(logList[i].Action, logList[j].Action) < 0
- }
- func (logList SchedLogList) Swap(i, j int) {
- logList[i], logList[j] = logList[j], logList[i]
- }
- type SchedLogManager struct {
- Logs SchedLogList
- lock sync.Mutex
- sorted bool
- }
- func NewSchedLogManager() *SchedLogManager {
- return &SchedLogManager{
- lock: sync.Mutex{},
- Logs: SchedLogList{},
- }
- }
- /*func (m *SchedLogManager) Append(candidate, action, message string, isFailed bool) {
- m.lock.Lock()
- defer m.lock.Unlock()
- m.Logs = append(m.Logs, NewSchedLog(candidate, action, message, isFailed))
- }*/
- func (m *SchedLogManager) Appends(logs []SchedLog) {
- m.lock.Lock()
- defer m.lock.Unlock()
- m.Logs = append(m.Logs, logs...)
- }
- func (m *SchedLogManager) FailedLogs() SchedLogList {
- var logs SchedLogList
- for _, l := range m.Logs {
- if l.IsFailed {
- logs = append(logs, l)
- }
- }
- return logs
- }
- func (m *SchedLogManager) Read() []string {
- rets := []string{}
- m.lock.Lock()
- defer m.lock.Unlock()
- if len(m.Logs) == 0 {
- return rets
- }
- if !m.sorted {
- sort.Sort(m.Logs)
- m.sorted = true
- }
- joinLogs := func(startIndex, endIndex int) string {
- if endIndex == startIndex+1 {
- return m.Logs[startIndex].String()
- }
- log := m.Logs[startIndex]
- actions := []string{}
- var isFailed bool
- for ; startIndex < endIndex; startIndex++ {
- actions = append(actions, m.Logs[startIndex].Action)
- if m.Logs[startIndex].IsFailed {
- isFailed = true
- }
- }
- newLog := NewSchedLog(log.Candidate, strings.Join(actions, ","), log.Messages, isFailed)
- return newLog.String()
- }
- startIndex := -1
- for index, len := 0, len(m.Logs); index < len; index++ {
- if startIndex < 0 {
- startIndex = index
- } else {
- log0, log := m.Logs[startIndex], m.Logs[index]
- if log0.Candidate != log.Candidate || log0.Messages.String() != log.Messages.String() {
- rets = append(rets, joinLogs(startIndex, index))
- startIndex = index
- }
- }
- }
- rets = append(rets, joinLogs(startIndex, len(m.Logs)))
- return rets
- }
- // Unit wraps sched input info and other log and record manager
- type Unit struct {
- SchedInfo *api.SchedInfo
- CapacityMap map[string]*Capacity
- ScoreMap map[string]Score
- DataMap map[string]*SchedContextDataItem
- SharedResourceManager *SharedResourceManager
- CounterManager *CounterManager
- capacityLock sync.Mutex
- scoreLock sync.Mutex
- FailedCandidateMap map[string]*FailedCandidates
- failedCandidateMapLock sync.Mutex
- //ScoreMap map[string]Score
- //LogManager *LogManager
- //ReservedPool *data_manager.ReservedPool
- SchedulerManager interface{}
- selectPlugins []SelectPlugin
- LogManager *SchedLogManager
- AllocatedResources map[string]*AllocatedResource
- SelectPriorityMap map[string]SSelectPriority
- SelectPriorityUpdaterMap map[string]SSelectPriorityUpdater
- SelectPriorityLock sync.Mutex
- }
- func NewScheduleUnit(info *api.SchedInfo, schedManager interface{}) *Unit {
- cmap := make(map[string]*Capacity) // candidate_id, Capacity
- smap := make(map[string]Score) // candidate_id, Score
- spmap := make(map[string]SSelectPriority)
- spumap := make(map[string]SSelectPriorityUpdater)
- unit := &Unit{
- SchedInfo: info,
- FailedCandidateMap: make(map[string]*FailedCandidates),
- failedCandidateMapLock: sync.Mutex{},
- CapacityMap: cmap,
- ScoreMap: smap,
- capacityLock: sync.Mutex{},
- scoreLock: sync.Mutex{},
- DataMap: make(map[string]*SchedContextDataItem),
- SharedResourceManager: NewSharedResourceManager(),
- CounterManager: NewCounterManager(),
- LogManager: NewSchedLogManager(),
- SchedulerManager: schedManager,
- AllocatedResources: make(map[string]*AllocatedResource),
- SelectPriorityMap: spmap,
- SelectPriorityUpdaterMap: spumap,
- }
- return unit
- }
- func (u *Unit) Info() string {
- return u.SchedInfo.JSON(u.SchedInfo).String()
- }
- func (u *Unit) SessionID() string {
- return u.SchedInfo.SessionId
- }
- func (u *Unit) SchedData() *api.SchedInfo {
- return u.SchedInfo
- }
- func (u *Unit) GetHypervisor() string {
- driver, _ := models.GetHostDriver(u.SchedInfo.Hypervisor, u.SchedInfo.Provider)
- if driver != nil {
- return driver.GetHypervisor()
- }
- return u.SchedData().Hypervisor
- }
- func (u *Unit) GetHypervisorDriver() models.IGuestDriver {
- hypervisor := u.GetHypervisor()
- driver, _ := models.GetDriver(hypervisor, u.SchedInfo.Provider)
- return driver
- }
- func (u *Unit) AppendFailedCandidates(fcs []FailedCandidate) {
- if len(fcs) == 0 {
- return
- }
- u.failedCandidateMapLock.Lock()
- defer u.failedCandidateMapLock.Unlock()
- for _, fc := range fcs {
- fcs, ok := u.FailedCandidateMap[fc.Stage]
- if !ok {
- fcs = &FailedCandidates{}
- u.FailedCandidateMap[fc.Stage] = fcs
- }
- fcs.Candidates = append(fcs.Candidates, fc)
- }
- }
- func (u *Unit) AppendSelectPlugin(p SelectPlugin) {
- u.selectPlugins = append(u.selectPlugins, p)
- }
- func (u *Unit) AllSelectPlugins() []SelectPlugin {
- return u.selectPlugins
- }
- func (u *Unit) GetCapacity(id string) int64 {
- var (
- capacityObj *Capacity
- ok bool
- )
- u.capacityLock.Lock()
- defer u.capacityLock.Unlock()
- if capacityObj, ok = u.CapacityMap[id]; !ok {
- return 0
- }
- if capacityObj.MinValue == EmptyCapacity {
- capacity := MaxCapacity
- for _, counter := range capacityObj.Values {
- count := counter.GetCount()
- if capacity > count {
- capacity = count
- }
- }
- capacityObj.MinValue = capacity
- }
- return capacityObj.MinValue
- }
- func (u *Unit) GetCapacityOfName(id string, name string) int64 {
- u.capacityLock.Lock()
- defer u.capacityLock.Unlock()
- if capacityObj, ok := u.CapacityMap[id]; ok {
- if counter, ok0 := capacityObj.Values[name]; ok0 {
- return counter.GetCount()
- }
- }
- return EmptyCapacity
- }
- func (u *Unit) GetCapacities(id string) map[string]Counter {
- if capacityObj, ok := u.CapacityMap[id]; ok {
- return capacityObj.Values
- }
- return EmptyCapacities
- }
- func (u *Unit) SetCapacity(id string, name string, capacity Counter) error {
- u.capacityLock.Lock()
- defer u.capacityLock.Unlock()
- // Capacity must >= -1
- if !validateCapacityInput(capacity) {
- err := fmt.Errorf("capacity counter %#v invalid %d", capacity, capacity.GetCount())
- log.Errorf("SetCapacity error: %v", err)
- return err
- }
- log.Debugf("%q setCapacity id: %s, capacity: %d", name, id, capacity.GetCount())
- var (
- capacityObj *Capacity
- ok bool
- )
- if capacityObj, ok = u.CapacityMap[id]; !ok {
- capacityObj = &Capacity{Values: make(map[string]Counter), MinValue: EmptyCapacity}
- u.CapacityMap[id] = capacityObj
- }
- capacityObj.Values[name] = capacity
- capacityObj.MinValue = EmptyCapacity
- return nil
- }
- func (u *Unit) GetSelectPriority(id string) SSelectPriorityValue {
- if sp, ok := u.SelectPriorityMap[id]; ok {
- return sp.Value()
- }
- return EmptySelectPriorityValue
- }
- func (u *Unit) SetSelectPriorityWithLock(id string, name string, spv SSelectPriorityValue) {
- u.SelectPriorityLock.Lock()
- defer u.SelectPriorityLock.Unlock()
- sp, ok := u.SelectPriorityMap[id]
- if !ok {
- sp = NewSSelctPriority()
- u.SelectPriorityMap[id] = sp
- }
- sp[name] = spv
- }
- func (u *Unit) UpdateSelectPriority() {
- for hostID, sp := range u.SelectPriorityMap {
- for name, spv := range sp {
- sp[name] = u.SelectPriorityUpdaterMap[name](u, spv, hostID)
- }
- }
- }
- func (u *Unit) GetMaxSelectPriority() (max SSelectPriorityValue) {
- max = EmptySelectPriorityValue
- for _, sp := range u.SelectPriorityMap {
- val := sp.Value()
- if max.Less(val) {
- max = val
- }
- }
- return
- }
- func (u *Unit) RegisterSelectPriorityUpdater(name string, f SSelectPriorityUpdater) {
- u.SelectPriorityLock.Lock()
- defer u.SelectPriorityLock.Unlock()
- u.SelectPriorityUpdaterMap[name] = f
- }
- func validateCapacityInput(c Counter) bool {
- if c != nil && c.GetCount() >= -1 {
- return true
- }
- return false
- }
- type ScoreValue struct {
- value score.TScore
- }
- func (u *Unit) setScore(id string, val score.SScore, prefer tristate.TriState) {
- u.scoreLock.Lock()
- defer u.scoreLock.Unlock()
- var (
- scoreObj Score
- ok bool
- )
- if scoreObj, ok = u.ScoreMap[id]; !ok {
- scoreObj = *newScore()
- u.ScoreMap[id] = scoreObj
- }
- scoreObj.ScoreBucket.SetScore(val, prefer)
- log.V(10).Infof("SetScore: %q -> %s, prefer: %s", id, val.String(), prefer)
- }
- func (u *Unit) SetScore(id string, val score.SScore) {
- u.setScore(id, val, tristate.None)
- }
- func (u *Unit) SetPreferScore(id string, val score.SScore) {
- u.setScore(id, val, tristate.True)
- }
- func (u *Unit) SetAvoidScore(id string, val score.SScore) {
- u.setScore(id, val, tristate.False)
- }
- func (u *Unit) GetScore(id string) Score {
- var (
- scoreObj Score
- ok bool
- )
- if scoreObj, ok = u.ScoreMap[id]; !ok {
- return *newScore()
- }
- return scoreObj
- }
- func (u *Unit) GetScoreDetails(id string) string {
- if score, ok := u.ScoreMap[id]; ok {
- return score.String()
- }
- return "EmptyScore"
- }
- func (u *Unit) SetFiltedData(id string, name string, data interface{}) error {
- u.scoreLock.Lock()
- defer u.scoreLock.Unlock()
- dataItem, ok := u.DataMap[id]
- if !ok {
- dataItem = &SchedContextDataItem{
- Data: make(map[string]interface{}),
- }
- u.DataMap[id] = dataItem
- }
- if name == "network" {
- dataItem.Networks = data.(*sync.Map)
- } else {
- if m, ok := data.(map[string]interface{}); ok {
- for key, value := range m {
- dataItem.Data[key] = value
- }
- }
- }
- return nil
- }
- func (u *Unit) GetFiltedData(id string, count int64) map[string]interface{} {
- schedContextData := make(map[string]interface{})
- if data, ok := u.DataMap[id]; ok {
- // deal networks
- networks := make(map[string]int64)
- if data.Networks != nil {
- data.Networks.Range(func(networkID, ipNumber interface{}) bool {
- networkIDString := networkID.(string)
- ipNumberInt64 := ipNumber.(int64)
- if count > ipNumberInt64 {
- networks[networkIDString] = ipNumberInt64
- count = count - ipNumberInt64
- return true
- } else if count <= ipNumberInt64 {
- networks[networkIDString] = count
- count = 0
- return false
- }
- return false
- })
- }
- schedContextData["networks"] = networks
- // others
- for key, value := range data.Data {
- schedContextData[key] = value
- }
- return schedContextData
- }
- return nil
- }
- func (u *Unit) GetAllocatedResource(candidateId string) *AllocatedResource {
- ret, ok := u.AllocatedResources[candidateId]
- if !ok {
- ret = NewAllocatedResource()
- u.AllocatedResources[candidateId] = ret
- }
- return ret
- }
- type SSelectPriority map[string]SSelectPriorityValue
- func (s SSelectPriority) Value() (val SSelectPriorityValue) {
- val = EmptySelectPriorityValue
- for _, v := range s {
- if v > val {
- val = v
- }
- }
- return
- }
- func NewSSelctPriority() SSelectPriority {
- return make(map[string]SSelectPriorityValue)
- }
- // SSelectPriorityUpdater will call to update the specified host after each round of selection
- type SSelectPriorityUpdater func(u *Unit, origin SSelectPriorityValue, hostID string) SSelectPriorityValue
- type SSelectPriorityValue int
- func (s SSelectPriorityValue) Less(sp SSelectPriorityValue) bool {
- return s < sp
- }
- func (s SSelectPriorityValue) Sub(sp SSelectPriorityValue) (ret SSelectPriorityValue) {
- ret = s - sp
- if ret.Less(EmptySelectPriorityValue) {
- ret = EmptySelectPriorityValue
- }
- return
- }
- func (s SSelectPriorityValue) SubOne() SSelectPriorityValue {
- return s.Sub(SSelectPriorityValue(1))
- }
- func (s SSelectPriorityValue) IsEmpty() bool {
- return s == EmptySelectPriorityValue
- }
|