jobs: add ability to stop group
Adds new rc call to stop all running jobs in a group. Fixes #5561
This commit is contained in:
parent
8c02fe7b89
commit
4a4379b312
2 changed files with 73 additions and 0 deletions
|
@ -406,3 +406,34 @@ func rcJobStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||||
job.Stop()
|
job.Stop()
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
rc.Add(rc.Call{
|
||||||
|
Path: "job/stopgroup",
|
||||||
|
Fn: rcGroupStop,
|
||||||
|
Title: "Stop all running jobs in a group",
|
||||||
|
Help: `Parameters:
|
||||||
|
|
||||||
|
- group - name of the group (string).
|
||||||
|
`,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stops all running jobs in a group
|
||||||
|
func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||||
|
group, err := in.GetString("group")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
running.mu.RLock()
|
||||||
|
defer running.mu.RUnlock()
|
||||||
|
for _, job := range running.jobs {
|
||||||
|
if job.Group == group {
|
||||||
|
job.mu.Lock()
|
||||||
|
job.Stop()
|
||||||
|
job.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out = make(rc.Params)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
|
@ -452,6 +452,48 @@ func TestRcSyncJobStop(t *testing.T) {
|
||||||
assert.Equal(t, false, out["success"])
|
assert.Equal(t, false, out["success"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRcJobStopGroup(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
jobID = 0
|
||||||
|
_, _, err := NewJob(ctx, ctxFn, rc.Params{
|
||||||
|
"_async": true,
|
||||||
|
"_group": "myparty",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, _, err = NewJob(ctx, ctxFn, rc.Params{
|
||||||
|
"_async": true,
|
||||||
|
"_group": "myparty",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
call := rc.Calls.Get("job/stopgroup")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := rc.Params{"group": "myparty"}
|
||||||
|
out, err := call.Fn(context.Background(), in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, out)
|
||||||
|
|
||||||
|
in = rc.Params{}
|
||||||
|
_, err = call.Fn(context.Background(), in)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "Didn't find key")
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
call = rc.Calls.Get("job/status")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
for i := 1; i <= 2; i++ {
|
||||||
|
in = rc.Params{"jobid": i}
|
||||||
|
out, err = call.Fn(context.Background(), in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, "myparty", out["group"])
|
||||||
|
assert.Equal(t, "context canceled", out["error"])
|
||||||
|
assert.Equal(t, true, out["finished"])
|
||||||
|
assert.Equal(t, false, out["success"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestOnFinish(t *testing.T) {
|
func TestOnFinish(t *testing.T) {
|
||||||
jobID = 0
|
jobID = 0
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
Loading…
Reference in a new issue