// Manage background jobs that the rc is running package rc import ( "context" "sync" "sync/atomic" "time" "github.com/ncw/rclone/fs" "github.com/pkg/errors" ) // Job describes a asynchronous task started via the rc package type Job struct { mu sync.Mutex ID int64 `json:"id"` StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime"` Error string `json:"error"` Finished bool `json:"finished"` Success bool `json:"success"` Duration float64 `json:"duration"` Output Params `json:"output"` Stop func() `json:"-"` } // Jobs describes a collection of running tasks type Jobs struct { mu sync.RWMutex jobs map[int64]*Job expireInterval time.Duration expireRunning bool } var ( running = newJobs() jobID = int64(0) ) // newJobs makes a new Jobs structure func newJobs() *Jobs { return &Jobs{ jobs: map[int64]*Job{}, expireInterval: fs.Config.RcJobExpireInterval, } } // kickExpire makes sure Expire is running func (jobs *Jobs) kickExpire() { jobs.mu.Lock() defer jobs.mu.Unlock() if !jobs.expireRunning { time.AfterFunc(jobs.expireInterval, jobs.Expire) jobs.expireRunning = true } } // Expire expires any jobs that haven't been collected func (jobs *Jobs) Expire() { jobs.mu.Lock() defer jobs.mu.Unlock() now := time.Now() for ID, job := range jobs.jobs { job.mu.Lock() if job.Finished && now.Sub(job.EndTime) > fs.Config.RcJobExpireDuration { delete(jobs.jobs, ID) } job.mu.Unlock() } if len(jobs.jobs) != 0 { time.AfterFunc(jobs.expireInterval, jobs.Expire) jobs.expireRunning = true } else { jobs.expireRunning = false } } // IDs returns the IDs of the running jobs func (jobs *Jobs) IDs() (IDs []int64) { jobs.mu.RLock() defer jobs.mu.RUnlock() IDs = []int64{} for ID := range jobs.jobs { IDs = append(IDs, ID) } return IDs } // Get a job with a given ID or nil if it doesn't exist func (jobs *Jobs) Get(ID int64) *Job { jobs.mu.RLock() defer jobs.mu.RUnlock() return jobs.jobs[ID] } // mark the job as finished func (job *Job) finish(out Params, err error) { job.mu.Lock() job.EndTime = time.Now() if out == nil { out = make(Params) } job.Output = out job.Duration = job.EndTime.Sub(job.StartTime).Seconds() if err != nil { job.Error = err.Error() job.Success = false } else { job.Error = "" job.Success = true } job.Finished = true job.mu.Unlock() running.kickExpire() // make sure this job gets expired } // run the job until completion writing the return status func (job *Job) run(ctx context.Context, fn Func, in Params) { defer func() { if r := recover(); r != nil { job.finish(nil, errors.Errorf("panic received: %v", r)) } }() job.finish(fn(ctx, in)) } // NewJob start a new Job off func (jobs *Jobs) NewJob(fn Func, in Params) *Job { ctx, cancel := context.WithCancel(context.Background()) stop := func() { cancel() // Wait for cancel to propagate before returning. <-ctx.Done() } job := &Job{ ID: atomic.AddInt64(&jobID, 1), StartTime: time.Now(), Stop: stop, } go job.run(ctx, fn, in) jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() return job } // StartJob starts a new job and returns a Param suitable for output func StartJob(fn Func, in Params) (Params, error) { job := running.NewJob(fn, in) out := make(Params) out["jobid"] = job.ID return out, nil } func init() { Add(Call{ Path: "job/status", Fn: rcJobStatus, Title: "Reads the status of the job ID", Help: `Parameters - jobid - id of the job (integer) Results - finished - boolean - duration - time in seconds that the job ran for - endTime - time the job finished (eg "2018-10-26T18:50:20.528746884+01:00") - error - error from the job or empty string for no error - finished - boolean whether the job has finished or not - id - as passed in above - startTime - time the job started (eg "2018-10-26T18:50:20.528336039+01:00") - success - boolean - true for success false otherwise - output - output of the job as would have been returned if called synchronously - progress - output of the progress related to the underlying job `, }) } // Returns the status of a job func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { jobID, err := in.GetInt64("jobid") if err != nil { return nil, err } job := running.Get(jobID) if job == nil { return nil, errors.New("job not found") } job.mu.Lock() defer job.mu.Unlock() out = make(Params) err = Reshape(&out, job) if err != nil { return nil, errors.Wrap(err, "reshape failed in job status") } return out, nil } func init() { Add(Call{ Path: "job/list", Fn: rcJobList, Title: "Lists the IDs of the running jobs", Help: `Parameters - None Results - jobids - array of integer job ids `, }) } // Returns list of job ids. func rcJobList(ctx context.Context, in Params) (out Params, err error) { out = make(Params) out["jobids"] = running.IDs() return out, nil } func init() { Add(Call{ Path: "job/stop", Fn: rcJobStop, Title: "Stop the running job", Help: `Parameters - jobid - id of the job (integer) `, }) } // Stops the running job. func rcJobStop(ctx context.Context, in Params) (out Params, err error) { jobID, err := in.GetInt64("jobid") if err != nil { return nil, err } job := running.Get(jobID) if job == nil { return nil, errors.New("job not found") } job.mu.Lock() defer job.mu.Unlock() out = make(Params) job.Stop() return out, nil }