jobs: add ability to stop async jobs

Depends on #3257
This commit is contained in:
Aleksandar Jankovic 2019-06-17 10:47:29 +02:00 committed by Nick Craig-Wood
parent f78cd1e043
commit 5935cb0a29
2 changed files with 92 additions and 16 deletions

View file

@ -23,7 +23,7 @@ type Job struct {
Success bool `json:"success"`
Duration float64 `json:"duration"`
Output Params `json:"output"`
Context context.Context `json:"-"`
Stop func() `json:"-"`
}
// Jobs describes a collection of running tasks
@ -117,23 +117,29 @@ func (job *Job) finish(out Params, err error) {
}
// run the job until completion writing the return status
func (job *Job) run(fn Func, in Params) {
func (job *Job) run(ctx context.Context, fn Func, in Params) {
defer func() {
if r := recover(); r != nil {
job.finish(nil, errors.Errorf("panic received: %v", r))
}
}()
job.finish(fn(job.Context, in))
job.finish(fn(ctx, in))
}
// NewJob start a new Job off
func (jobs *Jobs) NewJob(fn Func, in Params) *Job {
ctx, cancel := context.WithCancel(context.Background())
stop := func() {
cancel()
// Wait for cancel to propagate before returning.
<-ctx.Done()
}
job := &Job{
ID: atomic.AddInt64(&jobID, 1),
StartTime: time.Now(),
Context: context.Background(),
Stop: stop,
}
go job.run(fn, in)
go job.run(ctx, fn, in)
jobs.mu.Lock()
jobs.jobs[job.ID] = job
jobs.mu.Unlock()
@ -205,9 +211,37 @@ Results
})
}
// Returns the status of a job
// Returns list of job ids.
func rcJobList(ctx context.Context, in Params) (out Params, err error) {
out = make(Params)
out["jobids"] = running.IDs()
return out, nil
}
func init() {
Add(Call{
Path: "job/stop",
Fn: rcJobStop,
Title: "Stop the running job",
Help: `Parameters
- jobid - id of the job (integer)
`,
})
}
// Stops the running job.
func rcJobStop(ctx context.Context, in Params) (out Params, err error) {
jobID, err := in.GetInt64("jobid")
if err != nil {
return nil, err
}
job := running.Get(jobID)
if job == nil {
return nil, errors.New("job not found")
}
job.mu.Lock()
defer job.mu.Unlock()
out = make(Params)
job.Stop()
return out, nil
}

View file

@ -82,11 +82,17 @@ func TestJobsGet(t *testing.T) {
}
var longFn = func(ctx context.Context, in Params) (Params, error) {
// TODO get execution time from context?
time.Sleep(1 * time.Hour)
return nil, nil
}
var ctxFn = func(ctx context.Context, in Params) (Params, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
}
}
const (
sleepTime = 100 * time.Millisecond
floatSleepTime = float64(sleepTime) / 1E9 / 2
@ -184,7 +190,7 @@ func TestJobsNewJob(t *testing.T) {
job := jobs.NewJob(noopFn, Params{})
assert.Equal(t, int64(1), job.ID)
assert.Equal(t, job, jobs.Get(1))
assert.NotEmpty(t, job.Stop)
}
func TestStartJob(t *testing.T) {
@ -234,3 +240,39 @@ func TestRcJobList(t *testing.T) {
require.NotNil(t, out)
assert.Equal(t, Params{"jobids": []int64{1}}, out)
}
func TestRcJobStop(t *testing.T) {
jobID = 0
_, err := StartJob(ctxFn, Params{})
assert.NoError(t, err)
call := Calls.Get("job/stop")
assert.NotNil(t, call)
in := Params{"jobid": 1}
out, err := call.Fn(context.Background(), in)
require.NoError(t, err)
require.Empty(t, out)
in = Params{"jobid": 123123123}
_, err = call.Fn(context.Background(), in)
require.Error(t, err)
assert.Contains(t, err.Error(), "job not found")
in = Params{"jobidx": 123123123}
_, err = call.Fn(context.Background(), in)
require.Error(t, err)
assert.Contains(t, err.Error(), "Didn't find key")
time.Sleep(10 * time.Millisecond)
call = Calls.Get("job/status")
assert.NotNil(t, call)
in = Params{"jobid": 1}
out, err = call.Fn(context.Background(), in)
require.NoError(t, err)
require.NotNil(t, out)
assert.Equal(t, float64(1), out["id"])
assert.Equal(t, "context canceled", out["error"])
assert.Equal(t, true, out["finished"])
assert.Equal(t, false, out["success"])
}