diff --git a/fs/rc/job.go b/fs/rc/job.go index 42ec47dbe..98f0c7ce5 100644 --- a/fs/rc/job.go +++ b/fs/rc/job.go @@ -15,15 +15,15 @@ import ( // Job describes a asynchronous task started via the rc package type Job struct { mu sync.Mutex - ID int64 `json:"id"` - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` - Error string `json:"error"` - Finished bool `json:"finished"` - Success bool `json:"success"` - Duration float64 `json:"duration"` - Output Params `json:"output"` - Context context.Context `json:"-"` + ID int64 `json:"id"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + Error string `json:"error"` + Finished bool `json:"finished"` + Success bool `json:"success"` + Duration float64 `json:"duration"` + Output Params `json:"output"` + Stop func() `json:"-"` } // Jobs describes a collection of running tasks @@ -117,23 +117,29 @@ func (job *Job) finish(out Params, err error) { } // run the job until completion writing the return status -func (job *Job) run(fn Func, in Params) { +func (job *Job) run(ctx context.Context, fn Func, in Params) { defer func() { if r := recover(); r != nil { job.finish(nil, errors.Errorf("panic received: %v", r)) } }() - job.finish(fn(job.Context, in)) + job.finish(fn(ctx, in)) } // NewJob start a new Job off func (jobs *Jobs) NewJob(fn Func, in Params) *Job { + ctx, cancel := context.WithCancel(context.Background()) + stop := func() { + cancel() + // Wait for cancel to propagate before returning. + <-ctx.Done() + } job := &Job{ ID: atomic.AddInt64(&jobID, 1), StartTime: time.Now(), - Context: context.Background(), + Stop: stop, } - go job.run(fn, in) + go job.run(ctx, fn, in) jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() @@ -205,9 +211,37 @@ Results }) } -// Returns the status of a job +// Returns list of job ids. func rcJobList(ctx context.Context, in Params) (out Params, err error) { out = make(Params) out["jobids"] = running.IDs() return out, nil } + +func init() { + Add(Call{ + Path: "job/stop", + Fn: rcJobStop, + Title: "Stop the running job", + Help: `Parameters +- jobid - id of the job (integer) +`, + }) +} + +// Stops the running job. +func rcJobStop(ctx context.Context, in Params) (out Params, err error) { + jobID, err := in.GetInt64("jobid") + if err != nil { + return nil, err + } + job := running.Get(jobID) + if job == nil { + return nil, errors.New("job not found") + } + job.mu.Lock() + defer job.mu.Unlock() + out = make(Params) + job.Stop() + return out, nil +} diff --git a/fs/rc/job_test.go b/fs/rc/job_test.go index 75eb6e240..8c4c77838 100644 --- a/fs/rc/job_test.go +++ b/fs/rc/job_test.go @@ -82,11 +82,17 @@ func TestJobsGet(t *testing.T) { } var longFn = func(ctx context.Context, in Params) (Params, error) { - // TODO get execution time from context? time.Sleep(1 * time.Hour) return nil, nil } +var ctxFn = func(ctx context.Context, in Params) (Params, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + } +} + const ( sleepTime = 100 * time.Millisecond floatSleepTime = float64(sleepTime) / 1E9 / 2 @@ -184,7 +190,7 @@ func TestJobsNewJob(t *testing.T) { job := jobs.NewJob(noopFn, Params{}) assert.Equal(t, int64(1), job.ID) assert.Equal(t, job, jobs.Get(1)) - + assert.NotEmpty(t, job.Stop) } func TestStartJob(t *testing.T) { @@ -234,3 +240,39 @@ func TestRcJobList(t *testing.T) { require.NotNil(t, out) assert.Equal(t, Params{"jobids": []int64{1}}, out) } + +func TestRcJobStop(t *testing.T) { + jobID = 0 + _, err := StartJob(ctxFn, Params{}) + assert.NoError(t, err) + + call := Calls.Get("job/stop") + assert.NotNil(t, call) + in := Params{"jobid": 1} + out, err := call.Fn(context.Background(), in) + require.NoError(t, err) + require.Empty(t, out) + + in = Params{"jobid": 123123123} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "job not found") + + in = Params{"jobidx": 123123123} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "Didn't find key") + + time.Sleep(10 * time.Millisecond) + + call = Calls.Get("job/status") + assert.NotNil(t, call) + in = Params{"jobid": 1} + out, err = call.Fn(context.Background(), in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, float64(1), out["id"]) + assert.Equal(t, "context canceled", out["error"]) + assert.Equal(t, true, out["finished"]) + assert.Equal(t, false, out["success"]) +}