| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package cronman
- import (
- "container/heap"
- "context"
- "fmt"
- "runtime/debug"
- "sync"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/appctx"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/version"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/cloudcommon/consts"
- "yunion.io/x/onecloud/pkg/cloudcommon/elect"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
- "yunion.io/x/onecloud/pkg/util/ctx"
- )
- var (
- DefaultAdminSessionGenerator = auth.AdminCredential
- ErrCronJobNameConflict = errors.Error("Cron job Name Conflict")
- )
- type TCronJobFunction func(ctx context.Context, userCred mcclient.TokenCredential, isStart bool)
- type TCronJobFunctionWithStartTime func(ctx context.Context, userCred mcclient.TokenCredential, start time.Time, isStart bool)
- var manager *SCronJobManager
// ICronTimer computes the next time a job should fire, given the current time.
type ICronTimer interface {
	// Next returns the fire time that follows now.
	Next(now time.Time) time.Time
}
// Timer1 is a fixed-interval timer: every fire time is dur after the
// reference time.
type Timer1 struct {
	dur time.Duration
}

// Next returns now shifted forward by the configured interval.
func (timer *Timer1) Next(now time.Time) time.Time {
	fireAt := now.Add(timer.dur)
	return fireAt
}
// Timer2 fires at a fixed wall-clock time (hour:min:sec) with a period of
// `day` days.
type Timer2 struct {
	day  int
	hour int
	min  int
	sec  int
}

// Next returns the configured hour:min:sec on the calendar day that lies one
// period after now. If that instant is more than one period away, it is pulled
// back by one period, so the result is never further than one period from now.
func (t *Timer2) Next(now time.Time) time.Time {
	period := time.Duration(t.day) * 24 * time.Hour
	ahead := now.Add(period)
	year, month, day := ahead.Date()
	candidate := time.Date(year, month, day, t.hour, t.min, t.sec, 0, ahead.Location())
	if candidate.Sub(now) <= period {
		return candidate
	}
	return candidate.Add(-period)
}
// TimerHour fires at a fixed minute and second with a period of `hour` hours.
type TimerHour struct {
	hour int
	min  int
	sec  int
}

// Next returns the configured min:sec within the clock hour that lies one
// period after now. If that instant is more than one period away, it is pulled
// back by one period.
func (t *TimerHour) Next(now time.Time) time.Time {
	period := time.Duration(t.hour) * time.Hour
	ahead := now.Add(period)
	year, month, day := ahead.Date()
	candidate := time.Date(year, month, day, ahead.Hour(), t.min, t.sec, 0, ahead.Location())
	if candidate.Sub(now) <= period {
		return candidate
	}
	return candidate.Add(-period)
}
// SCronJob is one scheduled job tracked by SCronJobManager.
type SCronJob struct {
	// Name identifies the job; the AddJob* methods reject duplicates.
	Name string
	// job is invoked by runJobInWorker when non-nil; otherwise
	// jobWithStartTime is used.
	job              TCronJobFunction
	jobWithStartTime TCronJobFunctionWithStartTime
	// Timer computes the following fire time after each run.
	Timer ICronTimer
	// Next is the upcoming fire time; a zero value sorts last in the heap.
	Next time.Time
	// StartRun requests a run when the job is started or added. runJob also
	// overwrites it with the isStart flag of the most recently queued run,
	// and Run reads it back when the worker executes the job.
	StartRun bool
	// times queues the start time of each pending run (appended by runJob,
	// consumed in order by Run).
	times []time.Time
}
- type CronJobTimerHeap []*SCronJob
- func (c CronJobTimerHeap) String() string {
- var s string
- for i := 0; i < len(c); i++ {
- s += c[i].Name + " : " + c[i].Next.String() + "\n"
- }
- return s
- }
- func (cjth CronJobTimerHeap) Len() int {
- return len(cjth)
- }
- func (cjth CronJobTimerHeap) Swap(i, j int) {
- cjth[i], cjth[j] = cjth[j], cjth[i]
- }
- func (cjth CronJobTimerHeap) Less(i, j int) bool {
- if cjth[i].Next.IsZero() {
- return false
- }
- if cjth[j].Next.IsZero() {
- return true
- }
- return cjth[i].Next.Before(cjth[j].Next)
- }
- func (cjth *CronJobTimerHeap) Push(x interface{}) {
- *cjth = append(*cjth, x.(*SCronJob))
- }
- func (cjth *CronJobTimerHeap) Pop() interface{} {
- old := *cjth
- n := old.Len()
- x := old[n-1]
- *cjth = old[0 : n-1]
- return x
- }
// SCronJobManager schedules SCronJob entries and dispatches due jobs to a
// worker pool.
type SCronJobManager struct {
	// jobs holds the registered jobs; init() turns it into a heap ordered by
	// Next once the manager is started.
	jobs CronJobTimerHeap
	// stopFunc cancels the context the run loop is bound to; set by
	// Start/Start2 and called by Stop.
	stopFunc context.CancelFunc
	// add is signalled by addJob so the run loop re-arms its timer.
	add chan struct{}
	// running is set by start() and cleared by run() when its context ends.
	running bool
	// workers executes the jobs (see SCronJob.runJob).
	workers *appsrv.SWorkerManager
	// dataLock guards jobs.
	dataLock *sync.Mutex
	// timezone is the location in which schedule times are computed.
	timezone *time.Location
}
- func InitCronJobManager(isDbWorker bool, workerCount int, timezone string) *SCronJobManager {
- if manager == nil {
- tz, err := time.LoadLocation(timezone)
- if err != nil {
- log.Errorf("InitCronJobManager failed")
- tz = time.UTC
- }
- manager = &SCronJobManager{
- jobs: make([]*SCronJob, 0),
- workers: appsrv.NewWorkerManager("CronJobWorkers", workerCount, 1024, isDbWorker),
- dataLock: new(sync.Mutex),
- add: make(chan struct{}),
- timezone: tz,
- }
- }
- return manager
- }
- func GetCronJobManager() *SCronJobManager {
- return manager
- }
- func (self *SCronJobManager) IsNameUnique(name string) bool {
- for i := 0; i < len(self.jobs); i++ {
- if self.jobs[i].Name == name {
- return false
- }
- }
- return true
- }
- func (self *SCronJobManager) String() string {
- return self.jobs.String()
- }
- func (self *SCronJobManager) AddJobAtIntervals(name string, interval time.Duration, jobFunc TCronJobFunction) error {
- return self.AddJobAtIntervalsWithStartRun(name, interval, jobFunc, false)
- }
- func (self *SCronJobManager) AddJobAtIntervalsWithStarTime(name string, interval time.Duration, jobFunc TCronJobFunctionWithStartTime) error {
- return self.AddJobAtIntervalsWithStarTimeStartRun(name, interval, jobFunc, false)
- }
- func (self *SCronJobManager) AddJobAtIntervalsWithStarTimeStartRun(name string, interval time.Duration, jobFunc TCronJobFunctionWithStartTime, startRun bool) error {
- if interval <= 0 {
- return errors.Error("AddJobAtIntervals: interval must > 0")
- }
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- if !self.IsNameUnique(name) {
- return ErrCronJobNameConflict
- }
- t := Timer1{
- dur: interval,
- }
- job := SCronJob{
- Name: name,
- jobWithStartTime: jobFunc,
- Timer: &t,
- StartRun: startRun,
- }
- if !self.running {
- self.jobs = append(self.jobs, &job)
- } else {
- self.addJob(&job)
- }
- return nil
- }
- func (self *SCronJobManager) AddJobAtIntervalsWithStartRun(name string, interval time.Duration, jobFunc TCronJobFunction, startRun bool) error {
- if interval <= 0 {
- return errors.Error("AddJobAtIntervals: interval must > 0")
- }
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- if !self.IsNameUnique(name) {
- return ErrCronJobNameConflict
- }
- t := Timer1{
- dur: interval,
- }
- job := SCronJob{
- Name: name,
- job: jobFunc,
- Timer: &t,
- StartRun: startRun,
- }
- if !self.running {
- self.jobs = append(self.jobs, &job)
- } else {
- self.addJob(&job)
- }
- return nil
- }
- func (self *SCronJobManager) AddJobEveryFewDays(name string, day, hour, min, sec int, jobFunc TCronJobFunction, startRun bool) error {
- switch {
- case day <= 0:
- return errors.Error("AddJobEveryFewDays: day must > 0")
- case hour < 0:
- return errors.Error("AddJobEveryFewDays: hour must > 0")
- case min < 0:
- return errors.Error("AddJobEveryFewDays: min must > 0")
- case sec < 0:
- return errors.Error("AddJobEveryFewDays: sec must > 0")
- }
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- if !self.IsNameUnique(name) {
- return ErrCronJobNameConflict
- }
- t := Timer2{
- day: day,
- hour: hour,
- min: min,
- sec: sec,
- }
- job := SCronJob{
- Name: name,
- job: jobFunc,
- Timer: &t,
- StartRun: startRun,
- }
- if !self.running {
- self.jobs = append(self.jobs, &job)
- } else {
- self.addJob(&job)
- }
- return nil
- }
- func (self *SCronJobManager) AddJobEveryFewHour(name string, hour, min, sec int, jobFunc TCronJobFunction, startRun bool) error {
- switch {
- case hour <= 0:
- return errors.Error("AddJobEveryFewHour: hour must > 0")
- case min < 0:
- return errors.Error("AddJobEveryFewHour: min must > 0")
- case sec < 0:
- return errors.Error("AddJobEveryFewHour: sec must > 0")
- }
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- if !self.IsNameUnique(name) {
- return ErrCronJobNameConflict
- }
- t := TimerHour{
- hour: hour,
- min: min,
- sec: sec,
- }
- job := SCronJob{
- Name: name,
- job: jobFunc,
- Timer: &t,
- StartRun: startRun,
- }
- if !self.running {
- self.jobs = append(self.jobs, &job)
- } else {
- self.addJob(&job)
- }
- return nil
- }
- func (self *SCronJobManager) addJob(newJob *SCronJob) {
- now := time.Now().In(self.timezone)
- newJob.Next = newJob.Timer.Next(now)
- if newJob.StartRun {
- newJob.runJob(true, now)
- }
- heap.Push(&self.jobs, newJob)
- go func() { self.add <- struct{}{} }()
- }
- func (self *SCronJobManager) Remove(name string) error {
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- var jobIndex = -1
- for i := 0; i < len(self.jobs); i++ {
- if self.jobs[i].Name == name {
- jobIndex = i
- break
- }
- }
- if jobIndex == -1 {
- return errors.Errorf("job %s not found", name)
- }
- heap.Remove(&self.jobs, jobIndex)
- return nil
- }
- func (self *SCronJobManager) next(now time.Time) {
- for _, job := range self.jobs {
- job.Next = job.Timer.Next(now)
- }
- }
- func (self *SCronJobManager) Start2(ctx context.Context, electObj *elect.Elect) {
- ctx, self.stopFunc = context.WithCancel(ctx)
- if electObj == nil {
- self.start(ctx)
- return
- }
- electObj.SubscribeWithAction(ctx, func() { self.start(ctx) }, self.Stop)
- }
- func (self *SCronJobManager) Start() {
- ctx := ctx.CtxWithTime()
- ctx, self.stopFunc = context.WithCancel(ctx)
- self.start(ctx)
- }
- func (self *SCronJobManager) start(ctx context.Context) {
- if self.running {
- return
- }
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- self.running = true
- self.init()
- go self.run(ctx)
- }
- func (self *SCronJobManager) Stop() {
- self.stopFunc()
- }
- func (self *SCronJobManager) init() {
- now := time.Now().In(self.timezone)
- self.next(now)
- heap.Init(&self.jobs)
- for i := 0; i < len(self.jobs); i += 1 {
- if self.jobs[i].StartRun {
- self.jobs[i].StartRun = false
- self.jobs[i].runJob(true, now)
- }
- }
- }
- func (self *SCronJobManager) run(ctx context.Context) {
- var timer *time.Timer
- var now = time.Now().In(self.timezone)
- for {
- self.dataLock.Lock()
- if len(self.jobs) == 0 || self.jobs[0].Next.IsZero() {
- timer = time.NewTimer(100000 * time.Hour)
- } else {
- timer = time.NewTimer(self.jobs[0].Next.Sub(now))
- }
- self.dataLock.Unlock()
- select {
- case now = <-timer.C:
- self.runJobs(now)
- case <-self.add:
- continue
- case <-ctx.Done():
- timer.Stop()
- self.running = false
- return
- }
- }
- }
- func (self *SCronJobManager) runJobs(now time.Time) {
- self.dataLock.Lock()
- defer self.dataLock.Unlock()
- for i := 0; i < len(self.jobs); i++ {
- if !(self.jobs[i].Next.After(now) || self.jobs[i].Next.IsZero()) {
- self.jobs[i].runJob(false, now)
- self.jobs[i].Next = self.jobs[i].Timer.Next(now)
- heap.Fix(&self.jobs, i)
- }
- }
- }
- func (job *SCronJob) Run() {
- startTime := time.Now()
- if len(job.times) > 0 {
- startTime = job.times[0]
- job.times = job.times[1:]
- }
- job.runJobInWorker(job.StartRun, startTime)
- }
- func (job *SCronJob) Dump() string {
- return ""
- }
- func (job *SCronJob) runJob(isStart bool, now time.Time) {
- job.StartRun = isStart
- job.times = append(job.times, now)
- manager.workers.Run(job, nil, nil)
- }
- func (job *SCronJob) runJobInWorker(isStart bool, startTime time.Time) {
- defer func() {
- if r := recover(); r != nil {
- log.Errorf("CronJob task %s run error: %s", job.Name, r)
- debug.PrintStack()
- yunionconf.BugReport.SendBugReport(context.Background(), version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", r))
- }
- }()
- log.Debugf("Cron job: %s started, startTime: %s", job.Name, startTime.Format(time.RFC3339))
- ctx := context.Background()
- ctx = context.WithValue(ctx, appctx.APP_CONTEXT_KEY_APPNAME, fmt.Sprintf("%s/cron-service", consts.GetServiceName()))
- ctx = context.WithValue(ctx, appctx.APP_CONTEXT_KEY_TASKNAME, fmt.Sprintf("%s-%d", job.Name, time.Now().Unix()))
- userCred := DefaultAdminSessionGenerator()
- if job.job != nil {
- job.job(ctx, userCred, isStart)
- } else if job.jobWithStartTime != nil {
- job.jobWithStartTime(ctx, userCred, startTime, isStart)
- }
- }
|