cronman.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package cronman
  15. import (
  16. "container/heap"
  17. "context"
  18. "fmt"
  19. "runtime/debug"
  20. "sync"
  21. "time"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/appctx"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/pkg/util/version"
  26. "yunion.io/x/onecloud/pkg/appsrv"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/elect"
  29. "yunion.io/x/onecloud/pkg/mcclient"
  30. "yunion.io/x/onecloud/pkg/mcclient/auth"
  31. "yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
  32. "yunion.io/x/onecloud/pkg/util/ctx"
  33. )
  34. var (
  35. DefaultAdminSessionGenerator = auth.AdminCredential
  36. ErrCronJobNameConflict = errors.Error("Cron job Name Conflict")
  37. )
  38. type TCronJobFunction func(ctx context.Context, userCred mcclient.TokenCredential, isStart bool)
  39. type TCronJobFunctionWithStartTime func(ctx context.Context, userCred mcclient.TokenCredential, start time.Time, isStart bool)
  40. var manager *SCronJobManager
  41. type ICronTimer interface {
  42. Next(time.Time) time.Time
  43. }
  44. type Timer1 struct {
  45. dur time.Duration
  46. }
  47. func (t *Timer1) Next(now time.Time) time.Time {
  48. return now.Add(t.dur)
  49. }
  50. type Timer2 struct {
  51. day, hour, min, sec int
  52. }
  53. func (t *Timer2) Next(now time.Time) time.Time {
  54. next := now.Add(time.Hour * time.Duration(t.day) * 24)
  55. nextTime := time.Date(next.Year(), next.Month(), next.Day(), t.hour, t.min, t.sec, 0, next.Location())
  56. if nextTime.Sub(now) > time.Duration(t.day)*time.Hour*24 {
  57. nextTime = nextTime.Add(-time.Duration(t.day) * time.Hour * 24)
  58. }
  59. return nextTime
  60. }
  61. type TimerHour struct {
  62. hour, min, sec int
  63. }
  64. func (t *TimerHour) Next(now time.Time) time.Time {
  65. next := now.Add(time.Hour * time.Duration(t.hour))
  66. nextTime := time.Date(next.Year(), next.Month(), next.Day(), next.Hour(), t.min, t.sec, 0, next.Location())
  67. if nextTime.Sub(now) > time.Duration(t.hour)*time.Hour {
  68. nextTime = nextTime.Add(-time.Duration(t.hour) * time.Hour)
  69. }
  70. return nextTime
  71. }
  72. type SCronJob struct {
  73. Name string
  74. job TCronJobFunction
  75. jobWithStartTime TCronJobFunctionWithStartTime
  76. Timer ICronTimer
  77. Next time.Time
  78. StartRun bool
  79. times []time.Time
  80. }
  81. type CronJobTimerHeap []*SCronJob
  82. func (c CronJobTimerHeap) String() string {
  83. var s string
  84. for i := 0; i < len(c); i++ {
  85. s += c[i].Name + " : " + c[i].Next.String() + "\n"
  86. }
  87. return s
  88. }
  89. func (cjth CronJobTimerHeap) Len() int {
  90. return len(cjth)
  91. }
  92. func (cjth CronJobTimerHeap) Swap(i, j int) {
  93. cjth[i], cjth[j] = cjth[j], cjth[i]
  94. }
  95. func (cjth CronJobTimerHeap) Less(i, j int) bool {
  96. if cjth[i].Next.IsZero() {
  97. return false
  98. }
  99. if cjth[j].Next.IsZero() {
  100. return true
  101. }
  102. return cjth[i].Next.Before(cjth[j].Next)
  103. }
  104. func (cjth *CronJobTimerHeap) Push(x interface{}) {
  105. *cjth = append(*cjth, x.(*SCronJob))
  106. }
  107. func (cjth *CronJobTimerHeap) Pop() interface{} {
  108. old := *cjth
  109. n := old.Len()
  110. x := old[n-1]
  111. *cjth = old[0 : n-1]
  112. return x
  113. }
  114. type SCronJobManager struct {
  115. jobs CronJobTimerHeap
  116. stopFunc context.CancelFunc
  117. add chan struct{}
  118. running bool
  119. workers *appsrv.SWorkerManager
  120. dataLock *sync.Mutex
  121. timezone *time.Location
  122. }
  123. func InitCronJobManager(isDbWorker bool, workerCount int, timezone string) *SCronJobManager {
  124. if manager == nil {
  125. tz, err := time.LoadLocation(timezone)
  126. if err != nil {
  127. log.Errorf("InitCronJobManager failed")
  128. tz = time.UTC
  129. }
  130. manager = &SCronJobManager{
  131. jobs: make([]*SCronJob, 0),
  132. workers: appsrv.NewWorkerManager("CronJobWorkers", workerCount, 1024, isDbWorker),
  133. dataLock: new(sync.Mutex),
  134. add: make(chan struct{}),
  135. timezone: tz,
  136. }
  137. }
  138. return manager
  139. }
  140. func GetCronJobManager() *SCronJobManager {
  141. return manager
  142. }
  143. func (self *SCronJobManager) IsNameUnique(name string) bool {
  144. for i := 0; i < len(self.jobs); i++ {
  145. if self.jobs[i].Name == name {
  146. return false
  147. }
  148. }
  149. return true
  150. }
  151. func (self *SCronJobManager) String() string {
  152. return self.jobs.String()
  153. }
  154. func (self *SCronJobManager) AddJobAtIntervals(name string, interval time.Duration, jobFunc TCronJobFunction) error {
  155. return self.AddJobAtIntervalsWithStartRun(name, interval, jobFunc, false)
  156. }
  157. func (self *SCronJobManager) AddJobAtIntervalsWithStarTime(name string, interval time.Duration, jobFunc TCronJobFunctionWithStartTime) error {
  158. return self.AddJobAtIntervalsWithStarTimeStartRun(name, interval, jobFunc, false)
  159. }
  160. func (self *SCronJobManager) AddJobAtIntervalsWithStarTimeStartRun(name string, interval time.Duration, jobFunc TCronJobFunctionWithStartTime, startRun bool) error {
  161. if interval <= 0 {
  162. return errors.Error("AddJobAtIntervals: interval must > 0")
  163. }
  164. self.dataLock.Lock()
  165. defer self.dataLock.Unlock()
  166. if !self.IsNameUnique(name) {
  167. return ErrCronJobNameConflict
  168. }
  169. t := Timer1{
  170. dur: interval,
  171. }
  172. job := SCronJob{
  173. Name: name,
  174. jobWithStartTime: jobFunc,
  175. Timer: &t,
  176. StartRun: startRun,
  177. }
  178. if !self.running {
  179. self.jobs = append(self.jobs, &job)
  180. } else {
  181. self.addJob(&job)
  182. }
  183. return nil
  184. }
  185. func (self *SCronJobManager) AddJobAtIntervalsWithStartRun(name string, interval time.Duration, jobFunc TCronJobFunction, startRun bool) error {
  186. if interval <= 0 {
  187. return errors.Error("AddJobAtIntervals: interval must > 0")
  188. }
  189. self.dataLock.Lock()
  190. defer self.dataLock.Unlock()
  191. if !self.IsNameUnique(name) {
  192. return ErrCronJobNameConflict
  193. }
  194. t := Timer1{
  195. dur: interval,
  196. }
  197. job := SCronJob{
  198. Name: name,
  199. job: jobFunc,
  200. Timer: &t,
  201. StartRun: startRun,
  202. }
  203. if !self.running {
  204. self.jobs = append(self.jobs, &job)
  205. } else {
  206. self.addJob(&job)
  207. }
  208. return nil
  209. }
  210. func (self *SCronJobManager) AddJobEveryFewDays(name string, day, hour, min, sec int, jobFunc TCronJobFunction, startRun bool) error {
  211. switch {
  212. case day <= 0:
  213. return errors.Error("AddJobEveryFewDays: day must > 0")
  214. case hour < 0:
  215. return errors.Error("AddJobEveryFewDays: hour must > 0")
  216. case min < 0:
  217. return errors.Error("AddJobEveryFewDays: min must > 0")
  218. case sec < 0:
  219. return errors.Error("AddJobEveryFewDays: sec must > 0")
  220. }
  221. self.dataLock.Lock()
  222. defer self.dataLock.Unlock()
  223. if !self.IsNameUnique(name) {
  224. return ErrCronJobNameConflict
  225. }
  226. t := Timer2{
  227. day: day,
  228. hour: hour,
  229. min: min,
  230. sec: sec,
  231. }
  232. job := SCronJob{
  233. Name: name,
  234. job: jobFunc,
  235. Timer: &t,
  236. StartRun: startRun,
  237. }
  238. if !self.running {
  239. self.jobs = append(self.jobs, &job)
  240. } else {
  241. self.addJob(&job)
  242. }
  243. return nil
  244. }
  245. func (self *SCronJobManager) AddJobEveryFewHour(name string, hour, min, sec int, jobFunc TCronJobFunction, startRun bool) error {
  246. switch {
  247. case hour <= 0:
  248. return errors.Error("AddJobEveryFewHour: hour must > 0")
  249. case min < 0:
  250. return errors.Error("AddJobEveryFewHour: min must > 0")
  251. case sec < 0:
  252. return errors.Error("AddJobEveryFewHour: sec must > 0")
  253. }
  254. self.dataLock.Lock()
  255. defer self.dataLock.Unlock()
  256. if !self.IsNameUnique(name) {
  257. return ErrCronJobNameConflict
  258. }
  259. t := TimerHour{
  260. hour: hour,
  261. min: min,
  262. sec: sec,
  263. }
  264. job := SCronJob{
  265. Name: name,
  266. job: jobFunc,
  267. Timer: &t,
  268. StartRun: startRun,
  269. }
  270. if !self.running {
  271. self.jobs = append(self.jobs, &job)
  272. } else {
  273. self.addJob(&job)
  274. }
  275. return nil
  276. }
  277. func (self *SCronJobManager) addJob(newJob *SCronJob) {
  278. now := time.Now().In(self.timezone)
  279. newJob.Next = newJob.Timer.Next(now)
  280. if newJob.StartRun {
  281. newJob.runJob(true, now)
  282. }
  283. heap.Push(&self.jobs, newJob)
  284. go func() { self.add <- struct{}{} }()
  285. }
  286. func (self *SCronJobManager) Remove(name string) error {
  287. self.dataLock.Lock()
  288. defer self.dataLock.Unlock()
  289. var jobIndex = -1
  290. for i := 0; i < len(self.jobs); i++ {
  291. if self.jobs[i].Name == name {
  292. jobIndex = i
  293. break
  294. }
  295. }
  296. if jobIndex == -1 {
  297. return errors.Errorf("job %s not found", name)
  298. }
  299. heap.Remove(&self.jobs, jobIndex)
  300. return nil
  301. }
  302. func (self *SCronJobManager) next(now time.Time) {
  303. for _, job := range self.jobs {
  304. job.Next = job.Timer.Next(now)
  305. }
  306. }
  307. func (self *SCronJobManager) Start2(ctx context.Context, electObj *elect.Elect) {
  308. ctx, self.stopFunc = context.WithCancel(ctx)
  309. if electObj == nil {
  310. self.start(ctx)
  311. return
  312. }
  313. electObj.SubscribeWithAction(ctx, func() { self.start(ctx) }, self.Stop)
  314. }
  315. func (self *SCronJobManager) Start() {
  316. ctx := ctx.CtxWithTime()
  317. ctx, self.stopFunc = context.WithCancel(ctx)
  318. self.start(ctx)
  319. }
  320. func (self *SCronJobManager) start(ctx context.Context) {
  321. if self.running {
  322. return
  323. }
  324. self.dataLock.Lock()
  325. defer self.dataLock.Unlock()
  326. self.running = true
  327. self.init()
  328. go self.run(ctx)
  329. }
  330. func (self *SCronJobManager) Stop() {
  331. self.stopFunc()
  332. }
  333. func (self *SCronJobManager) init() {
  334. now := time.Now().In(self.timezone)
  335. self.next(now)
  336. heap.Init(&self.jobs)
  337. for i := 0; i < len(self.jobs); i += 1 {
  338. if self.jobs[i].StartRun {
  339. self.jobs[i].StartRun = false
  340. self.jobs[i].runJob(true, now)
  341. }
  342. }
  343. }
  344. func (self *SCronJobManager) run(ctx context.Context) {
  345. var timer *time.Timer
  346. var now = time.Now().In(self.timezone)
  347. for {
  348. self.dataLock.Lock()
  349. if len(self.jobs) == 0 || self.jobs[0].Next.IsZero() {
  350. timer = time.NewTimer(100000 * time.Hour)
  351. } else {
  352. timer = time.NewTimer(self.jobs[0].Next.Sub(now))
  353. }
  354. self.dataLock.Unlock()
  355. select {
  356. case now = <-timer.C:
  357. self.runJobs(now)
  358. case <-self.add:
  359. continue
  360. case <-ctx.Done():
  361. timer.Stop()
  362. self.running = false
  363. return
  364. }
  365. }
  366. }
  367. func (self *SCronJobManager) runJobs(now time.Time) {
  368. self.dataLock.Lock()
  369. defer self.dataLock.Unlock()
  370. for i := 0; i < len(self.jobs); i++ {
  371. if !(self.jobs[i].Next.After(now) || self.jobs[i].Next.IsZero()) {
  372. self.jobs[i].runJob(false, now)
  373. self.jobs[i].Next = self.jobs[i].Timer.Next(now)
  374. heap.Fix(&self.jobs, i)
  375. }
  376. }
  377. }
  378. func (job *SCronJob) Run() {
  379. startTime := time.Now()
  380. if len(job.times) > 0 {
  381. startTime = job.times[0]
  382. job.times = job.times[1:]
  383. }
  384. job.runJobInWorker(job.StartRun, startTime)
  385. }
  386. func (job *SCronJob) Dump() string {
  387. return ""
  388. }
  389. func (job *SCronJob) runJob(isStart bool, now time.Time) {
  390. job.StartRun = isStart
  391. job.times = append(job.times, now)
  392. manager.workers.Run(job, nil, nil)
  393. }
  394. func (job *SCronJob) runJobInWorker(isStart bool, startTime time.Time) {
  395. defer func() {
  396. if r := recover(); r != nil {
  397. log.Errorf("CronJob task %s run error: %s", job.Name, r)
  398. debug.PrintStack()
  399. yunionconf.BugReport.SendBugReport(context.Background(), version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", r))
  400. }
  401. }()
  402. log.Debugf("Cron job: %s started, startTime: %s", job.Name, startTime.Format(time.RFC3339))
  403. ctx := context.Background()
  404. ctx = context.WithValue(ctx, appctx.APP_CONTEXT_KEY_APPNAME, fmt.Sprintf("%s/cron-service", consts.GetServiceName()))
  405. ctx = context.WithValue(ctx, appctx.APP_CONTEXT_KEY_TASKNAME, fmt.Sprintf("%s-%d", job.Name, time.Now().Unix()))
  406. userCred := DefaultAdminSessionGenerator()
  407. if job.job != nil {
  408. job.job(ctx, userCred, isStart)
  409. } else if job.jobWithStartTime != nil {
  410. job.jobWithStartTime(ctx, userCred, startTime, isStart)
  411. }
  412. }