workers.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package appsrv
  15. import (
  16. "container/list"
  17. "context"
  18. "fmt"
  19. "net/http"
  20. "runtime/debug"
  21. "sync"
  22. "time"
  23. "yunion.io/x/jsonutils"
  24. "yunion.io/x/log"
  25. "yunion.io/x/pkg/errors"
  26. )
// Worker lifecycle states stored in SWorker.state.
const (
	// WORKER_STATE_ACTIVE marks a worker that is eligible to pop and run tasks.
	WORKER_STATE_ACTIVE = 0
	// WORKER_STATE_DETACH marks a worker removed from the active pool via Detach.
	WORKER_STATE_DETACH = 1
)
// isDebug gates verbose scheduling logs in scheduleWithLock.
var isDebug = false

// enableDebug turns on debug logging for worker scheduling.
// NOTE(review): there is no corresponding disable; the flag is one-way.
func enableDebug() {
	isDebug = true
}
  35. var (
  36. workerManagers []*SWorkerManager
  37. workerManagerLock *sync.Mutex
  38. )
  39. func init() {
  40. workerManagers = make([]*SWorkerManager, 0)
  41. workerManagerLock = &sync.Mutex{}
  42. }
// SWorker is a single task-executing goroutine owned by an SWorkerManager.
type SWorker struct {
	// id is a monotonically increasing identifier assigned by the manager.
	id uint64
	// state is WORKER_STATE_ACTIVE or WORKER_STATE_DETACH; guarded by
	// manager.workerLock.
	state int
	// container is this worker's element in the manager's active/detached
	// list, used for O(1) removal.
	container *list.Element
	// task is the task currently being executed (nil before the first pop).
	task *sWorkerTask
	// manager is the owning worker manager.
	manager *SWorkerManager
}
  50. func newWorker(id uint64, manager *SWorkerManager) *SWorker {
  51. return &SWorker{
  52. id: id,
  53. state: WORKER_STATE_ACTIVE,
  54. container: nil,
  55. manager: manager,
  56. }
  57. }
  58. func (worker *SWorker) isDetached() bool {
  59. worker.manager.workerLock.Lock()
  60. defer worker.manager.workerLock.Unlock()
  61. return worker.state == WORKER_STATE_DETACH
  62. }
// run is the worker goroutine body: it pops tasks from the manager's queue
// and executes them until the queue is empty or the worker is detached, then
// unregisters itself from the manager.
func (worker *SWorker) run() {
	defer worker.manager.removeWorker(worker)
	for {
		if worker.isDetached() {
			break
		}
		req := worker.manager.queue.Pop()
		if req != nil {
			task := req.(*sWorkerTask)
			if worker.manager.cancelPrevIdent {
				// cancel previous identical tasks: if a task with the same
				// Dump() is still waiting in the queue, drop this one and let
				// the later copy run instead.
				findIdent := false
				worker.manager.queue.Range(func(iReq interface{}) bool {
					iTask := iReq.(*sWorkerTask)
					if iTask.task.Dump() == task.task.Dump() {
						// found identical task; returning false presumably
						// stops the Range iteration — verify against Ring.Range
						findIdent = true
						return false
					}
					return true
				})
				if findIdent {
					// NOTE(review): the dropped task's worker channel and
					// onError callback are never notified.
					continue
				}
			}
			// hand this worker to the caller if it asked for it
			if task.worker != nil {
				task.worker <- worker
			}
			task.start = time.Now()
			worker.task = task
			execCallback(task)
		} else {
			// queue drained; exit and let removeWorker clean up
			break
		}
	}
}
  99. func (worker *SWorker) Detach(reason string) {
  100. worker.manager.workerLock.Lock()
  101. defer worker.manager.workerLock.Unlock()
  102. worker.state = WORKER_STATE_DETACH
  103. worker.manager.activeWorker.removeWithLock(worker)
  104. worker.manager.detachedWorker.addWithLock(worker)
  105. log.Warningf("detach worker %s(%s) due to reason %s after %s", worker, worker.task.task.Dump(), reason, time.Now().Sub(worker.task.start))
  106. worker.manager.scheduleWithLock()
  107. }
  108. func (worker *SWorker) StateStr() string {
  109. if worker.state == WORKER_STATE_ACTIVE {
  110. return "active"
  111. } else {
  112. return "detach"
  113. }
  114. }
  115. func (worker *SWorker) String() string {
  116. workerInfo := ""
  117. if worker.task != nil {
  118. workerInfo = worker.task.task.Dump()
  119. }
  120. return fmt.Sprintf("#%d(%p, %s) %s", worker.id, worker, worker.StateStr(), workerInfo)
  121. }
  122. type SWorkerList struct {
  123. list *list.List
  124. }
  125. func newWorkerList() *SWorkerList {
  126. return &SWorkerList{
  127. list: list.New(),
  128. }
  129. }
  130. func (wl *SWorkerList) addWithLock(worker *SWorker) {
  131. ele := wl.list.PushBack(worker)
  132. worker.container = ele
  133. }
  134. func (wl *SWorkerList) removeWithLock(worker *SWorker) {
  135. wl.list.Remove(worker.container)
  136. worker.container = nil
  137. }
// size returns the number of workers currently in the list.
func (wl *SWorkerList) size() int {
	return wl.list.Len()
}
// SWorkerManager owns a bounded task queue and a pool of worker goroutines
// that drain it. Workers are spawned on demand up to workerCount.
type SWorkerManager struct {
	// name identifies the manager in logs and stats.
	name string
	// queue holds pending *sWorkerTask entries; capacity workerCount*backlog.
	queue *Ring
	// workerCount caps the number of concurrently active workers.
	workerCount int
	// backlog is the per-worker queue depth multiplier.
	backlog int
	// activeWorker / detachedWorker partition live workers by state;
	// both guarded by workerLock.
	activeWorker   *SWorkerList
	detachedWorker *SWorkerList
	// workerLock guards worker lists, worker state and workerId.
	workerLock *sync.Mutex
	// workerId is the monotonically increasing id for newly spawned workers.
	workerId uint64
	// dbWorker marks managers whose workers hold DB connections
	// (see GetDBConnectionCount).
	dbWorker bool
	// ignoreOverflow suppresses the "queue full" warning when Push fails.
	ignoreOverflow bool
	// cancelPrevIdent enables dropping a task when an identical one
	// (same Dump()) is still queued behind it.
	cancelPrevIdent bool
	// queueInitHook runs when the first worker is about to be scheduled
	// while no workers exist; queueEmptyHook runs when the last worker exits.
	queueInitHook  func() error
	queueEmptyHook func() error
}
// NewWorkerManager creates a worker manager that logs a warning when its
// queue overflows. It is shorthand for NewWorkerManagerIgnoreOverflow with
// ignoreOverflow=false.
func NewWorkerManager(name string, workerCount int, backlog int, dbWorker bool) *SWorkerManager {
	return NewWorkerManagerIgnoreOverflow(name, workerCount, backlog, dbWorker, false)
}
  159. func NewWorkerManagerIgnoreOverflow(name string, workerCount int, backlog int, dbWorker bool, ignoreOverflow bool) *SWorkerManager {
  160. if workerCount <= 0 {
  161. workerCount = 1
  162. }
  163. if backlog <= 0 {
  164. backlog = 128
  165. }
  166. manager := SWorkerManager{name: name,
  167. queue: NewRing(workerCount * backlog),
  168. workerCount: workerCount,
  169. backlog: backlog,
  170. activeWorker: newWorkerList(),
  171. detachedWorker: newWorkerList(),
  172. workerLock: &sync.Mutex{},
  173. workerId: 0,
  174. dbWorker: dbWorker,
  175. ignoreOverflow: ignoreOverflow,
  176. cancelPrevIdent: false,
  177. }
  178. workerManagerLock.Lock()
  179. defer workerManagerLock.Unlock()
  180. workerManagers = append(workerManagers, &manager)
  181. return &manager
  182. }
// IWorkerTask is the unit of work submitted to a worker manager.
type IWorkerTask interface {
	// Run executes the task.
	Run()
	// Dump returns a string description; also used as the identity key for
	// cancelPrevIdent deduplication.
	Dump() string
}

// sWorkerTask wraps a submitted task with its delivery and error plumbing.
type sWorkerTask struct {
	// task is the user-supplied work item.
	task IWorkerTask
	// worker, when non-nil, receives the executing worker just before Run.
	worker chan *SWorker
	// onError is invoked with the recovered value if Run panics.
	onError func(error)
	// start is the time execution began (set by the worker).
	start time.Time
}
// SetQueueInitHook registers a callback that runs before the first worker is
// spawned while the pool is empty; a returned error aborts scheduling.
func (wm *SWorkerManager) SetQueueInitHook(f func() error) {
	wm.queueInitHook = f
}

// SetQueueEmptyHook registers a callback that runs when the last worker
// exits and the pool becomes empty.
func (wm *SWorkerManager) SetQueueEmptyHook(f func() error) {
	wm.queueEmptyHook = f
}

// EnableCancelPreviousIdenticalTask makes workers drop a popped task when an
// identical task (same Dump()) is still waiting in the queue.
func (wm *SWorkerManager) EnableCancelPreviousIdenticalTask() {
	wm.cancelPrevIdent = true
}
  202. func (wm *SWorkerManager) UpdateWorkerCount(workerCount int) error {
  203. wm.workerLock.Lock()
  204. defer wm.workerLock.Unlock()
  205. if wm.queue.Size() > 0 {
  206. return errors.Errorf("worker queue is not empty")
  207. }
  208. wm.queue = NewRing(workerCount * wm.backlog)
  209. wm.workerCount = workerCount
  210. return nil
  211. }
// String implements fmt.Stringer, returning the manager's name.
func (wm *SWorkerManager) String() string {
	return wm.name
}
  215. func (wm *SWorkerManager) Run(task IWorkerTask, worker chan *SWorker, onErr func(error)) bool {
  216. ret := wm.queue.Push(&sWorkerTask{task: task, worker: worker, onError: onErr})
  217. if ret {
  218. wm.schedule()
  219. } else if !wm.ignoreOverflow {
  220. log.Warningf("[%s] queue full, task dropped", wm)
  221. }
  222. return ret
  223. }
  224. func (wm *SWorkerManager) removeWorker(worker *SWorker) {
  225. wm.workerLock.Lock()
  226. defer wm.workerLock.Unlock()
  227. if worker.state == WORKER_STATE_ACTIVE {
  228. wm.activeWorker.removeWithLock(worker)
  229. } else {
  230. wm.detachedWorker.removeWithLock(worker)
  231. }
  232. if wm.activeWorker.size()+wm.detachedWorker.size() == 0 && wm.queueEmptyHook != nil {
  233. err := wm.queueEmptyHook()
  234. if err != nil {
  235. log.Errorf("queueEmptyHook fail %s", err)
  236. }
  237. }
  238. }
  239. func execCallback(task *sWorkerTask) {
  240. defer func() {
  241. if r := recover(); r != nil {
  242. log.Errorf("WorkerManager exec callback error: %s", r)
  243. if task.onError != nil {
  244. task.onError(fmt.Errorf("%s", r))
  245. }
  246. debug.PrintStack()
  247. }
  248. }()
  249. task.task.Run()
  250. }
// schedule acquires the worker lock and runs one scheduling pass
// (see scheduleWithLock).
func (wm *SWorkerManager) schedule() {
	wm.workerLock.Lock()
	defer wm.workerLock.Unlock()
	wm.scheduleWithLock()
}
  256. func (wm *SWorkerManager) scheduleWithLock() {
  257. if wm.activeWorker.size()+wm.detachedWorker.size() == 0 && wm.queueInitHook != nil {
  258. err := wm.queueInitHook()
  259. if err != nil {
  260. log.Errorf("queueInitHook fail %s", err)
  261. return
  262. }
  263. }
  264. queueSize := wm.queue.Size()
  265. if wm.activeWorker.size() < wm.workerCount && queueSize > 0 {
  266. wm.workerId += 1
  267. worker := newWorker(wm.workerId, wm)
  268. wm.activeWorker.addWithLock(worker)
  269. if isDebug {
  270. log.Debugf("no enough worker, add new worker %s", worker)
  271. }
  272. go worker.run()
  273. } else if queueSize > 50 {
  274. w := wm.activeWorker.list.Front()
  275. if w != nil {
  276. worker := w.Value.(*SWorker)
  277. log.Warningf("work [%s]%s stucking for a while", worker.task.start, worker.task.task.Dump())
  278. }
  279. } else if queueSize > 10 {
  280. log.Warningf("[%s] BUSY activeWork %d detachedWork %d max %d queue: %d", wm, wm.ActiveWorkerCount(), wm.DetachedWorkerCount(), wm.workerCount, wm.queue.Size())
  281. }
  282. }
// ActiveWorkerCount returns the number of workers currently in the active
// list. NOTE(review): reads the list without workerLock; counts may be stale.
func (wm *SWorkerManager) ActiveWorkerCount() int {
	return wm.activeWorker.size()
}

// DetachedWorkerCount returns the number of detached workers that have not
// yet exited. Same locking caveat as ActiveWorkerCount.
func (wm *SWorkerManager) DetachedWorkerCount() int {
	return wm.detachedWorker.size()
}
  289. type SWorkerManagerStates struct {
  290. Name string
  291. Backlog int
  292. QueueCnt int
  293. MaxWorkerCnt int
  294. ActiveWorkerCnt int
  295. DetachWorkerCnt int
  296. DbWorker bool
  297. AllowOverflow bool
  298. }
  299. func (s SWorkerManagerStates) IsBusy() bool {
  300. if s.QueueCnt == 0 && s.ActiveWorkerCnt == 0 && s.DetachWorkerCnt == 0 {
  301. return false
  302. }
  303. return true
  304. }
  305. func (wm *SWorkerManager) getState() SWorkerManagerStates {
  306. state := SWorkerManagerStates{}
  307. state.Name = wm.name
  308. state.Backlog = wm.backlog
  309. state.QueueCnt = wm.queue.Size()
  310. state.MaxWorkerCnt = wm.workerCount
  311. state.ActiveWorkerCnt = wm.activeWorker.size()
  312. state.DetachWorkerCnt = wm.detachedWorker.size()
  313. state.DbWorker = wm.dbWorker
  314. state.AllowOverflow = wm.ignoreOverflow
  315. return state
  316. }
  317. func WorkerStatsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  318. stats := make([]SWorkerManagerStates, 0)
  319. for i := 0; i < len(workerManagers); i += 1 {
  320. stats = append(stats, workerManagers[i].getState())
  321. }
  322. result := jsonutils.NewDict()
  323. result.Add(jsonutils.Marshal(&stats), "workers")
  324. fmt.Fprintf(w, "%s", result.String())
  325. }
  326. func GetDBConnectionCount() int {
  327. conn := 0
  328. for i := 0; i < len(workerManagers); i += 1 {
  329. if workerManagers[i].dbWorker {
  330. conn += workerManagers[i].workerCount
  331. }
  332. }
  333. return conn
  334. }