| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package appsrv
- import (
- "container/list"
- "context"
- "fmt"
- "net/http"
- "runtime/debug"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- )
const (
	// WORKER_STATE_ACTIVE marks a worker that is eligible to pick up and run tasks.
	WORKER_STATE_ACTIVE = 0
	// WORKER_STATE_DETACH marks a worker that has been detached from the active
	// pool (see SWorker.Detach) and will exit after its current task.
	WORKER_STATE_DETACH = 1
)
// isDebug gates verbose scheduling logs in scheduleWithLock.
var isDebug = false

// enableDebug turns on verbose worker-scheduling debug logging.
func enableDebug() {
	isDebug = true
}
var (
	// workerManagers is the global registry of all managers created through
	// NewWorkerManagerIgnoreOverflow; used by WorkerStatsHandler and
	// GetDBConnectionCount.
	workerManagers []*SWorkerManager
	// workerManagerLock guards appends to workerManagers.
	workerManagerLock *sync.Mutex
)

func init() {
	workerManagers = make([]*SWorkerManager, 0)
	workerManagerLock = &sync.Mutex{}
}
// SWorker is a single worker goroutine owned by a SWorkerManager.
// Its fields (other than id/manager) are guarded by manager.workerLock.
type SWorker struct {
	id        uint64
	state     int           // WORKER_STATE_ACTIVE or WORKER_STATE_DETACH
	container *list.Element // back-reference into the manager's worker list
	task      *sWorkerTask  // task currently being executed; nil when idle
	manager   *SWorkerManager
}
- func newWorker(id uint64, manager *SWorkerManager) *SWorker {
- return &SWorker{
- id: id,
- state: WORKER_STATE_ACTIVE,
- container: nil,
- manager: manager,
- }
- }
- func (worker *SWorker) isDetached() bool {
- worker.manager.workerLock.Lock()
- defer worker.manager.workerLock.Unlock()
- return worker.state == WORKER_STATE_DETACH
- }
- func (worker *SWorker) run() {
- defer worker.manager.removeWorker(worker)
- for {
- if worker.isDetached() {
- break
- }
- req := worker.manager.queue.Pop()
- if req != nil {
- task := req.(*sWorkerTask)
- if worker.manager.cancelPrevIdent {
- // cancel previous identical tasks
- findIdent := false
- worker.manager.queue.Range(func(iReq interface{}) bool {
- iTask := iReq.(*sWorkerTask)
- if iTask.task.Dump() == task.task.Dump() {
- // found idential task
- findIdent = true
- return false
- }
- return true
- })
- if findIdent {
- continue
- }
- }
- if task.worker != nil {
- task.worker <- worker
- }
- task.start = time.Now()
- worker.task = task
- execCallback(task)
- } else {
- break
- }
- }
- }
- func (worker *SWorker) Detach(reason string) {
- worker.manager.workerLock.Lock()
- defer worker.manager.workerLock.Unlock()
- worker.state = WORKER_STATE_DETACH
- worker.manager.activeWorker.removeWithLock(worker)
- worker.manager.detachedWorker.addWithLock(worker)
- log.Warningf("detach worker %s(%s) due to reason %s after %s", worker, worker.task.task.Dump(), reason, time.Now().Sub(worker.task.start))
- worker.manager.scheduleWithLock()
- }
- func (worker *SWorker) StateStr() string {
- if worker.state == WORKER_STATE_ACTIVE {
- return "active"
- } else {
- return "detach"
- }
- }
- func (worker *SWorker) String() string {
- workerInfo := ""
- if worker.task != nil {
- workerInfo = worker.task.task.Dump()
- }
- return fmt.Sprintf("#%d(%p, %s) %s", worker.id, worker, worker.StateStr(), workerInfo)
- }
// SWorkerList is a thin wrapper over container/list holding *SWorker values.
// All mutating methods assume the owning manager's workerLock is held
// (hence the WithLock suffixes).
type SWorkerList struct {
	list *list.List
}
- func newWorkerList() *SWorkerList {
- return &SWorkerList{
- list: list.New(),
- }
- }
- func (wl *SWorkerList) addWithLock(worker *SWorker) {
- ele := wl.list.PushBack(worker)
- worker.container = ele
- }
- func (wl *SWorkerList) removeWithLock(worker *SWorker) {
- wl.list.Remove(worker.container)
- worker.container = nil
- }
// size returns the number of workers currently in the list.
func (wl *SWorkerList) size() int {
	return wl.list.Len()
}
// SWorkerManager owns a bounded task queue and a pool of worker goroutines
// that drain it. Workers are started lazily on demand (see scheduleWithLock)
// up to workerCount concurrent active workers.
type SWorkerManager struct {
	name            string
	queue           *Ring // pending tasks; capacity workerCount*backlog
	workerCount     int   // max concurrent active workers
	backlog         int   // per-worker queue depth multiplier
	activeWorker    *SWorkerList
	detachedWorker  *SWorkerList
	workerLock      *sync.Mutex // guards worker lists, states and workerId
	workerId        uint64      // monotonically increasing worker id
	dbWorker        bool        // counts toward GetDBConnectionCount when true
	ignoreOverflow  bool        // suppress the "queue full" warning when true
	cancelPrevIdent bool        // skip a task when an identical one is still queued
	queueInitHook   func() error // called when the first worker spins up
	queueEmptyHook  func() error // called when the last worker exits
}
- func NewWorkerManager(name string, workerCount int, backlog int, dbWorker bool) *SWorkerManager {
- return NewWorkerManagerIgnoreOverflow(name, workerCount, backlog, dbWorker, false)
- }
- func NewWorkerManagerIgnoreOverflow(name string, workerCount int, backlog int, dbWorker bool, ignoreOverflow bool) *SWorkerManager {
- if workerCount <= 0 {
- workerCount = 1
- }
- if backlog <= 0 {
- backlog = 128
- }
- manager := SWorkerManager{name: name,
- queue: NewRing(workerCount * backlog),
- workerCount: workerCount,
- backlog: backlog,
- activeWorker: newWorkerList(),
- detachedWorker: newWorkerList(),
- workerLock: &sync.Mutex{},
- workerId: 0,
- dbWorker: dbWorker,
- ignoreOverflow: ignoreOverflow,
- cancelPrevIdent: false,
- }
- workerManagerLock.Lock()
- defer workerManagerLock.Unlock()
- workerManagers = append(workerManagers, &manager)
- return &manager
- }
// IWorkerTask is the unit of work submitted to a SWorkerManager.
type IWorkerTask interface {
	// Run executes the task body.
	Run()
	// Dump returns a textual description of the task; also used as the
	// identity key for cancelPrevIdent de-duplication.
	Dump() string
}

// sWorkerTask wraps a submitted task with its delivery channel, error
// callback and execution start time.
type sWorkerTask struct {
	task    IWorkerTask
	worker  chan *SWorker // if non-nil, receives the executing worker just before Run
	onError func(error)   // invoked with the recovered panic, if any
	start   time.Time     // set when a worker picks up the task
}
// SetQueueInitHook registers f to be called before the first worker of an
// idle manager starts; a returned error aborts scheduling.
func (wm *SWorkerManager) SetQueueInitHook(f func() error) {
	wm.queueInitHook = f
}
// SetQueueEmptyHook registers f to be called when the last worker exits and
// the manager becomes idle; a returned error is only logged.
func (wm *SWorkerManager) SetQueueEmptyHook(f func() error) {
	wm.queueEmptyHook = f
}
// EnableCancelPreviousIdenticalTask makes workers skip a popped task when an
// identical task (same Dump string) is still waiting in the queue.
func (wm *SWorkerManager) EnableCancelPreviousIdenticalTask() {
	wm.cancelPrevIdent = true
}
- func (wm *SWorkerManager) UpdateWorkerCount(workerCount int) error {
- wm.workerLock.Lock()
- defer wm.workerLock.Unlock()
- if wm.queue.Size() > 0 {
- return errors.Errorf("worker queue is not empty")
- }
- wm.queue = NewRing(workerCount * wm.backlog)
- wm.workerCount = workerCount
- return nil
- }
// String returns the manager's name, used as the log prefix.
func (wm *SWorkerManager) String() string {
	return wm.name
}
- func (wm *SWorkerManager) Run(task IWorkerTask, worker chan *SWorker, onErr func(error)) bool {
- ret := wm.queue.Push(&sWorkerTask{task: task, worker: worker, onError: onErr})
- if ret {
- wm.schedule()
- } else if !wm.ignoreOverflow {
- log.Warningf("[%s] queue full, task dropped", wm)
- }
- return ret
- }
- func (wm *SWorkerManager) removeWorker(worker *SWorker) {
- wm.workerLock.Lock()
- defer wm.workerLock.Unlock()
- if worker.state == WORKER_STATE_ACTIVE {
- wm.activeWorker.removeWithLock(worker)
- } else {
- wm.detachedWorker.removeWithLock(worker)
- }
- if wm.activeWorker.size()+wm.detachedWorker.size() == 0 && wm.queueEmptyHook != nil {
- err := wm.queueEmptyHook()
- if err != nil {
- log.Errorf("queueEmptyHook fail %s", err)
- }
- }
- }
- func execCallback(task *sWorkerTask) {
- defer func() {
- if r := recover(); r != nil {
- log.Errorf("WorkerManager exec callback error: %s", r)
- if task.onError != nil {
- task.onError(fmt.Errorf("%s", r))
- }
- debug.PrintStack()
- }
- }()
- task.task.Run()
- }
// schedule is the lock-acquiring wrapper around scheduleWithLock.
func (wm *SWorkerManager) schedule() {
	wm.workerLock.Lock()
	defer wm.workerLock.Unlock()
	wm.scheduleWithLock()
}
- func (wm *SWorkerManager) scheduleWithLock() {
- if wm.activeWorker.size()+wm.detachedWorker.size() == 0 && wm.queueInitHook != nil {
- err := wm.queueInitHook()
- if err != nil {
- log.Errorf("queueInitHook fail %s", err)
- return
- }
- }
- queueSize := wm.queue.Size()
- if wm.activeWorker.size() < wm.workerCount && queueSize > 0 {
- wm.workerId += 1
- worker := newWorker(wm.workerId, wm)
- wm.activeWorker.addWithLock(worker)
- if isDebug {
- log.Debugf("no enough worker, add new worker %s", worker)
- }
- go worker.run()
- } else if queueSize > 50 {
- w := wm.activeWorker.list.Front()
- if w != nil {
- worker := w.Value.(*SWorker)
- log.Warningf("work [%s]%s stucking for a while", worker.task.start, worker.task.task.Dump())
- }
- } else if queueSize > 10 {
- log.Warningf("[%s] BUSY activeWork %d detachedWork %d max %d queue: %d", wm, wm.ActiveWorkerCount(), wm.DetachedWorkerCount(), wm.workerCount, wm.queue.Size())
- }
- }
// ActiveWorkerCount returns the number of active workers.
// NOTE(review): reads the list without workerLock; racy if called
// concurrently with scheduling -- acceptable for stats only.
func (wm *SWorkerManager) ActiveWorkerCount() int {
	return wm.activeWorker.size()
}
// DetachedWorkerCount returns the number of detached workers.
// NOTE(review): reads the list without workerLock; racy if called
// concurrently with scheduling -- acceptable for stats only.
func (wm *SWorkerManager) DetachedWorkerCount() int {
	return wm.detachedWorker.size()
}
// SWorkerManagerStates is a point-in-time snapshot of a manager's state,
// exported via WorkerStatsHandler.
type SWorkerManagerStates struct {
	Name            string
	Backlog         int  // configured backlog multiplier
	QueueCnt        int  // pending tasks in the queue
	MaxWorkerCnt    int  // configured max concurrent workers
	ActiveWorkerCnt int
	DetachWorkerCnt int
	DbWorker        bool // whether this manager holds DB connections
	AllowOverflow   bool // ignoreOverflow flag
}
- func (s SWorkerManagerStates) IsBusy() bool {
- if s.QueueCnt == 0 && s.ActiveWorkerCnt == 0 && s.DetachWorkerCnt == 0 {
- return false
- }
- return true
- }
- func (wm *SWorkerManager) getState() SWorkerManagerStates {
- state := SWorkerManagerStates{}
- state.Name = wm.name
- state.Backlog = wm.backlog
- state.QueueCnt = wm.queue.Size()
- state.MaxWorkerCnt = wm.workerCount
- state.ActiveWorkerCnt = wm.activeWorker.size()
- state.DetachWorkerCnt = wm.detachedWorker.size()
- state.DbWorker = wm.dbWorker
- state.AllowOverflow = wm.ignoreOverflow
- return state
- }
- func WorkerStatsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- stats := make([]SWorkerManagerStates, 0)
- for i := 0; i < len(workerManagers); i += 1 {
- stats = append(stats, workerManagers[i].getState())
- }
- result := jsonutils.NewDict()
- result.Add(jsonutils.Marshal(&stats), "workers")
- fmt.Fprintf(w, "%s", result.String())
- }
- func GetDBConnectionCount() int {
- conn := 0
- for i := 0; i < len(workerManagers); i += 1 {
- if workerManagers[i].dbWorker {
- conn += workerManagers[i].workerCount
- }
- }
- return conn
- }
|