From 38e70f1797c371611e514a6b7b6af79fc32e0648 Mon Sep 17 00:00:00 2001 From: Aleksandar Jankovic Date: Thu, 23 Jan 2020 12:44:56 +0100 Subject: [PATCH] rc/jobs: add listener for finished jobs Add jobs.OnFinish method to register listener that will trigger when job is finished. Includes fix for stopping listeners. --- fs/rc/jobs/job.go | 105 ++++++++++++++++++++++++++++------------- fs/rc/jobs/job_test.go | 51 ++++++++++++++++++++ 2 files changed, 123 insertions(+), 33 deletions(-) diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index d27a7788b..5296bfe70 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -29,6 +29,7 @@ type Job struct { Duration float64 `json:"duration"` Output rc.Params `json:"output"` Stop func() `json:"-"` + listeners []*func() // realErr is the Error before printing it as a string, it's used to return // the real error to the upper application layers while still printing the @@ -36,6 +37,62 @@ type Job struct { realErr error } +// mark the job as finished +func (job *Job) finish(out rc.Params, err error) { + job.mu.Lock() + job.EndTime = time.Now() + if out == nil { + out = make(rc.Params) + } + job.Output = out + job.Duration = job.EndTime.Sub(job.StartTime).Seconds() + if err != nil { + job.realErr = err + job.Error = err.Error() + job.Success = false + } else { + job.realErr = nil + job.Error = "" + job.Success = true + } + job.Finished = true + + // Notify listeners that the job is finished + for i := range job.listeners { + go (*job.listeners[i])() + } + + job.mu.Unlock() + running.kickExpire() // make sure this job gets expired +} + +func (job *Job) addListener(fn *func()) { + job.mu.Lock() + defer job.mu.Unlock() + job.listeners = append(job.listeners, fn) +} + +func (job *Job) removeListener(fn *func()) { + job.mu.Lock() + defer job.mu.Unlock() + for i, ln := range job.listeners { + if ln == fn { + job.listeners = append(job.listeners[:i], job.listeners[i+1:]...) + return + } + } +} + +// run the job until completion writing the return status +func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) { + defer func() { + if r := recover(); r != nil { + job.finish(nil, errors.Errorf("panic received: %v \n%s", r, string(debug.Stack()))) + } + }() + job.finish(fn(ctx, in)) +} + // Jobs describes a collection of running tasks type Jobs struct { mu sync.RWMutex @@ -117,39 +174,6 @@ func (jobs *Jobs) Get(ID int64) *Job { return jobs.jobs[ID] } -// mark the job as finished -func (job *Job) finish(out rc.Params, err error) { - job.mu.Lock() - job.EndTime = time.Now() - if out == nil { - out = make(rc.Params) - } - job.Output = out - job.Duration = job.EndTime.Sub(job.StartTime).Seconds() - if err != nil { - job.realErr = err - job.Error = err.Error() - job.Success = false - } else { - job.realErr = nil - job.Error = "" - job.Success = true - } - job.Finished = true - job.mu.Unlock() - running.kickExpire() // make sure this job gets expired -} - -// run the job until completion writing the return status -func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) { - defer func() { - if r := recover(); r != nil { - job.finish(nil, errors.Errorf("panic received: %v \n%s", r, string(debug.Stack()))) - } - }() - job.finish(fn(ctx, in)) -} - func getGroup(in rc.Params) string { // Check to see if the group is set group, err := in.GetString("_group") @@ -231,6 +255,21 @@ func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64 return job.Output, job.ID, job.realErr } +// OnFinish adds listener to jobid that will be triggered when job is finished. +// It returns a function to cancel listening. +func OnFinish(jobID int64, fn func()) (func(), error) { + job := running.Get(jobID) + if job == nil { + return func() {}, errors.New("job not found") + } + if job.Finished { + fn() + } else { + job.addListener(&fn) + } + return func() { job.removeListener(&fn) }, nil +} + func init() { rc.Add(rc.Call{ Path: "job/status", diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 4126c32f9..4bcf40c28 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -102,6 +102,18 @@ var ctxFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { } } +var ctxParmFn = func(paramCtx context.Context, returnError bool) func(ctx context.Context, in rc.Params) (rc.Params, error) { + return func(ctx context.Context, in rc.Params) (rc.Params, error) { + select { + case <-paramCtx.Done(): + if returnError { + return nil, ctx.Err() + } + return rc.Params{}, nil + } + } +} + const ( sleepTime = 100 * time.Millisecond floatSleepTime = float64(sleepTime) / 1e9 / 2 @@ -346,3 +358,42 @@ func TestRcSyncJobStop(t *testing.T) { assert.Equal(t, true, out["finished"]) assert.Equal(t, false, out["success"]) } + +func TestOnFinish(t *testing.T) { + jobID = 0 + done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + _, err := StartAsyncJob(ctxParmFn(ctx, false), rc.Params{}) + assert.NoError(t, err) + + stop, err := OnFinish(jobID, func() { close(done) }) + defer stop() + assert.NoError(t, err) + + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Timeout waiting for OnFinish to fire") + } +} + +func TestOnFinishAlreadyFinished(t *testing.T) { + jobID = 0 + done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, id, err := ExecuteJob(ctx, shortFn, rc.Params{}) + assert.NoError(t, err) + + stop, err := OnFinish(id, func() { close(done) }) + defer stop() + assert.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Timeout waiting for OnFinish to fire") + } +}