worker.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package worker
  2. import (
  3. "sync"
  4. "yunion.io/x/jsonutils"
  5. "yunion.io/x/log"
  6. "yunion.io/x/onecloud/pkg/appsrv"
  7. "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
  8. "yunion.io/x/onecloud/pkg/llm/options"
  9. )
  10. var (
  11. localTaskWorkerManLock *sync.Mutex
  12. backupTaskWorkerMan *appsrv.SWorkerManager
  13. startTaskWorkerMan *appsrv.SWorkerManager
  14. importTaskWorkerMan *appsrv.SWorkerManager
  15. )
  16. func init() {
  17. localTaskWorkerManLock = &sync.Mutex{}
  18. }
  19. func BackupTaskRun(task taskman.ITask, proc func() (jsonutils.JSONObject, error)) {
  20. taskman.LocalTaskRunWithWorkers(task, proc, getBackupTaskWorkerMan())
  21. }
  22. func StartTaskRun(task taskman.ITask, proc func() (jsonutils.JSONObject, error)) {
  23. taskman.LocalTaskRunWithWorkers(task, proc, getStartTaskWorkerMan())
  24. }
  25. func ImportTaskRun(task taskman.ITask, proc func() (jsonutils.JSONObject, error)) {
  26. taskman.LocalTaskRunWithWorkers(task, proc, getImportTaskWorkerMan())
  27. }
  28. func getBackupTaskWorkerMan() *appsrv.SWorkerManager {
  29. localTaskWorkerManLock.Lock()
  30. defer localTaskWorkerManLock.Unlock()
  31. if backupTaskWorkerMan != nil {
  32. return backupTaskWorkerMan
  33. }
  34. log.Infof("BackupTaskWorkerCount %d", options.Options.BackupTaskWorkerCount)
  35. backupTaskWorkerMan = appsrv.NewWorkerManager("BackupTaskWorkerManager", options.Options.BackupTaskWorkerCount, 1024, false)
  36. return backupTaskWorkerMan
  37. }
  38. func getStartTaskWorkerMan() *appsrv.SWorkerManager {
  39. localTaskWorkerManLock.Lock()
  40. defer localTaskWorkerManLock.Unlock()
  41. if startTaskWorkerMan != nil {
  42. return startTaskWorkerMan
  43. }
  44. log.Infof("StartTaskWorkerCount %d", options.Options.StartTaskWorkerCount)
  45. startTaskWorkerMan = appsrv.NewWorkerManager("StartTaskWorkerManager", options.Options.StartTaskWorkerCount, 1024, false)
  46. return startTaskWorkerMan
  47. }
  48. func getImportTaskWorkerMan() *appsrv.SWorkerManager {
  49. localTaskWorkerManLock.Lock()
  50. defer localTaskWorkerManLock.Unlock()
  51. if importTaskWorkerMan != nil {
  52. return importTaskWorkerMan
  53. }
  54. log.Infof("ImportTaskWorkerCount %d", options.Options.ImportTaskWorkerCount)
  55. importTaskWorkerMan = appsrv.NewWorkerManager("ImportTaskWorkerManager", options.Options.ImportTaskWorkerCount, 1024, false)
  56. return importTaskWorkerMan
  57. }