rc: factor out duplicate code in job creation

This commit is contained in:
Nick Craig-Wood 2020-12-11 19:10:54 +00:00
parent ae3963e4b4
commit a0fc10e41a

View file

@ -184,15 +184,15 @@ func getGroup(in rc.Params) string {
return group
}
// NewAsyncJob start a new asynchronous Job off
func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job {
// NewJob creates a Job ready to be executed
func (jobs *Jobs) NewJob(ctx context.Context, in rc.Params) (*Job, context.Context) {
id := atomic.AddInt64(&jobID, 1)
group := getGroup(in)
if group == "" {
group = fmt.Sprintf("job/%d", id)
}
ctx := accounting.WithStatsGroup(context.Background(), group)
ctx = accounting.WithStatsGroup(ctx, group)
ctx, cancel := context.WithCancel(ctx)
stop := func() {
cancel()
@ -208,36 +208,16 @@ func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job {
jobs.mu.Lock()
jobs.jobs[job.ID] = job
jobs.mu.Unlock()
go job.run(ctx, fn, in)
return job
}
// NewSyncJob start a new synchronous Job off
func (jobs *Jobs) NewSyncJob(ctx context.Context, in rc.Params) (*Job, context.Context) {
id := atomic.AddInt64(&jobID, 1)
group := getGroup(in)
if group == "" {
group = fmt.Sprintf("job/%d", id)
}
ctxG := accounting.WithStatsGroup(ctx, fmt.Sprintf("job/%d", id))
ctx, cancel := context.WithCancel(ctxG)
stop := func() {
cancel()
// Wait for cancel to propagate before returning.
<-ctx.Done()
}
job := &Job{
ID: id,
Group: group,
StartTime: time.Now(),
Stop: stop,
}
jobs.mu.Lock()
jobs.jobs[job.ID] = job
jobs.mu.Unlock()
return job, ctx
}
// NewAsyncJob start a new asynchronous Job off
func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job {
job, ctx := jobs.NewJob(context.Background(), in)
go job.run(ctx, fn, in)
return job
}
// StartAsyncJob starts a new job asynchronously and returns a Param suitable
// for output.
func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) {
@ -250,7 +230,7 @@ func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) {
// ExecuteJob executes new job synchronously and returns a Param suitable for
// output.
func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) {
job, ctx := running.NewSyncJob(ctx, in)
job, ctx := running.NewJob(ctx, in)
job.run(ctx, fn, in)
return job.Output, job.ID, job.realErr
}