| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package manager
- import (
- "sync"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/util/sets"
- u "yunion.io/x/pkg/utils"
- "yunion.io/x/onecloud/pkg/scheduler/api"
- o "yunion.io/x/onecloud/pkg/scheduler/options"
- )
- type ExpireManager struct {
- expireChannel chan *api.ExpireArgs
- stopCh <-chan struct{}
- mergeLock *sync.Mutex
- }
- func NewExpireManager(stopCh <-chan struct{}) *ExpireManager {
- return &ExpireManager{
- expireChannel: make(chan *api.ExpireArgs, o.Options.ExpireQueueMaxLength),
- stopCh: stopCh,
- mergeLock: new(sync.Mutex),
- }
- }
- func (e *ExpireManager) Add(expireArgs *api.ExpireArgs) {
- e.expireChannel <- expireArgs
- }
- func (e *ExpireManager) Trigger() {
- e.batchMergeExpire()
- }
- type expireHost struct {
- Id string
- SessionId string
- }
- func newExpireHost(id string, sid string) *expireHost {
- return &expireHost{
- Id: id,
- SessionId: sid,
- }
- }
- func (e *ExpireManager) Run() {
- t := time.Tick(u.ToDuration(o.Options.ExpireQueueConsumptionPeriod))
- // Watching the expires.
- for {
- select {
- case <-t:
- e.batchMergeExpire()
- case <-e.stopCh:
- // update all the expire before return
- e.batchMergeExpire()
- close(e.expireChannel)
- e.expireChannel = nil
- log.Errorln("expire manager EXIT!")
- return
- default:
- // if expire number is bigger then 80 then update
- if len(e.expireChannel) >= o.Options.ExpireQueueDealLength {
- e.batchMergeExpire()
- } else {
- time.Sleep(1 * time.Second)
- }
- }
- }
- }
- func (e *ExpireManager) batchMergeExpire() {
- e.mergeLock.Lock()
- defer e.mergeLock.Unlock()
- expireRequestNumber := len(e.expireChannel)
- // If the expireRequestNumber then return right now.
- if expireRequestNumber <= 0 {
- return
- }
- dirtyHostSets := sets.NewString()
- dirtyBaremetalSets := sets.NewString()
- dirtyHosts := make([]*expireHost, 0)
- dirtyBaremetals := make([]*expireHost, 0)
- // Merge all same host.
- for i := 0; i < expireRequestNumber; i++ {
- expireArgs := <-e.expireChannel
- log.V(4).Infof("Get expireArgs from channel: %#v", expireArgs)
- dirtyHostSets.Insert(expireArgs.DirtyHosts...)
- for _, host := range expireArgs.DirtyHosts {
- dirtyHosts = append(dirtyHosts, newExpireHost(host, expireArgs.SessionId))
- }
- dirtyBaremetalSets.Insert(expireArgs.DirtyBaremetals...)
- for _, baremetal := range expireArgs.DirtyBaremetals {
- dirtyBaremetals = append(dirtyBaremetals, newExpireHost(baremetal, expireArgs.SessionId))
- }
- }
- log.V(4).Infof("batchMergeExpire dirtyHosts: %v, dirtyBaremetals: %v", dirtyHosts, dirtyBaremetals)
- wg := &sync.WaitGroup{}
- wg.Add(2)
- go func() {
- defer wg.Done()
- //dirtyHosts = notInSession(dirtyHosts, "host")
- if len(dirtyHosts) > 0 {
- log.V(10).Debugf("CleanDirty Hosts: %v\n", dirtyHosts)
- if _, err := schedManager.CandidateManager.Reload("host", dirtyHostSets.List()); err != nil {
- log.Errorf("Clean dirty hosts %v: %v", dirtyHosts, err)
- }
- schedManager.HistoryManager.CancelCandidatesPendingUsage(dirtyHosts)
- }
- }()
- go func() {
- defer wg.Done()
- //dirtyBaremetals = notInSession(dirtyBaremetals, "baremetal")
- if len(dirtyBaremetals) > 0 {
- log.V(10).Debugf("CleanDirty Baremetals: %v\n", dirtyBaremetals)
- if _, err := schedManager.CandidateManager.Reload("baremetal", dirtyBaremetalSets.List()); err != nil {
- log.Errorf("Clean dirty baremetals %v: %v", dirtyBaremetals, err)
- }
- schedManager.HistoryManager.CancelCandidatesPendingUsage(dirtyBaremetals)
- }
- }()
- if ok := e.waitTimeOut(wg, u.ToDuration(o.Options.ExpireQueueConsumptionTimeout)); !ok {
- log.Errorln("time out reload data.")
- }
- }
- func (e *ExpireManager) waitTimeOut(wg *sync.WaitGroup, timeout time.Duration) bool {
- ch := make(chan struct{})
- go func() {
- wg.Wait()
- close(ch)
- }()
- select {
- case <-ch:
- return true
- case <-time.After(timeout):
- return false
- }
- }
|