From 1f5a29209ef3e5ecb67ed7a128cde8e2c4b9d7bb Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sun, 11 Jun 2023 15:23:39 +0100 Subject: [PATCH] rc: add Job to ctx so it can be used elsewhere See: https://forum.rclone.org/t/uploads-start-repeatedly-failing-after-a-while-using-rc-sync-copy-vs-rclone-copy-for-dropbox/38873/ --- fs/rc/jobs/job.go | 41 ++++++++++++++++++++++++++++++++++++----- fs/rc/jobs/job_test.go | 9 +++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 613093e25..1501e365b 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -83,6 +83,17 @@ func (job *Job) removeListener(fn *func()) { } } +// OnFinish adds listener to job that will be triggered when job is finished. +// It returns a function to cancel listening. +func (job *Job) OnFinish(fn func()) func() { + if job.Finished { + fn() + } else { + job.addListener(&fn) + } + return func() { job.removeListener(&fn) } +} + // run the job until completion writing the return status func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) { defer func() { @@ -237,6 +248,11 @@ func getFilter(ctx context.Context, in rc.Params) (context.Context, error) { return ctx, nil } +type jobKeyType struct{} + +// Key for adding jobs to ctx +var jobKey = jobKeyType{} + // NewJob creates a Job and executes it, possibly in the background if _async is set func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) { id := atomic.AddInt64(&jobID, 1) @@ -274,9 +290,14 @@ func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Jo StartTime: time.Now(), Stop: stop, } + jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() + + // Add the job to the context + ctx = context.WithValue(ctx, jobKey, job) + if isAsync { go job.run(ctx, fn, in) out = make(rc.Params) @@ -303,12 +324,22 @@ func OnFinish(jobID int64, fn func()) (func(), error) { if job == nil { return func() {}, errors.New("job not found") } - if job.Finished { - fn() - } else { - job.addListener(&fn) + return job.OnFinish(fn), nil +} + +// GetJob gets the Job from the context if possible +func GetJob(ctx context.Context) (job *Job, ok bool) { + job, ok = ctx.Value(jobKey).(*Job) + return job, ok +} + +// GetJobID gets the Job from the context if possible +func GetJobID(ctx context.Context) (jobID int64, ok bool) { + job, ok := GetJob(ctx) + if !ok { + return -1, ok } - return func() { job.removeListener(&fn) }, nil + return job.ID, true } func init() { diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 7552465de..2fa6ff4f3 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -44,13 +44,22 @@ func TestJobsExpire(t *testing.T) { jobs := newJobs() jobs.opt.JobExpireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) + var gotJobID int64 + var gotJob *Job job, out, err := jobs.NewJob(ctx, func(ctx context.Context, in rc.Params) (rc.Params, error) { defer close(wait) + var ok bool + gotJobID, ok = GetJobID(ctx) + assert.True(t, ok) + gotJob, ok = GetJob(ctx) + assert.True(t, ok) return in, nil }, rc.Params{"_async": true}) require.NoError(t, err) assert.Equal(t, 1, len(out)) <-wait + assert.Equal(t, job.ID, gotJobID, "check can get JobID from ctx") + assert.Equal(t, job, gotJob, "check can get Job from ctx") assert.Equal(t, 1, len(jobs.jobs)) jobs.Expire() assert.Equal(t, 1, len(jobs.jobs))