rc/jobs: make job expiry timeouts configurable

This commit is contained in:
Aleksandar Jankovic 2019-06-13 14:04:59 +02:00 committed by Nick Craig-Wood
parent 22368b997c
commit 93207ead9c
5 changed files with 19 additions and 10 deletions

View file

@ -85,6 +85,14 @@ style.
Default Off. Default Off.
### --rc-job-expire-duration=DURATION
Expire finished async jobs older than DURATION (default 60s).
### --rc-job-expire-interval=DURATION
Interval duration to check for expired async jobs (default 10s).
### --rc-no-auth ### --rc-no-auth
By default rclone will require authorisation to have been set up on By default rclone will require authorisation to have been set up on

View file

@ -96,6 +96,8 @@ type ConfigInfo struct {
ClientKey string // Client Side Key ClientKey string // Client Side Key
MultiThreadCutoff SizeSuffix MultiThreadCutoff SizeSuffix
MultiThreadStreams int MultiThreadStreams int
RcJobExpireDuration time.Duration
RcJobExpireInterval time.Duration
} }
// NewConfig creates a new config with everything set to the default // NewConfig creates a new config with everything set to the default
@ -129,6 +131,8 @@ func NewConfig() *ConfigInfo {
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - " // c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024) c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)
c.MultiThreadStreams = 4 c.MultiThreadStreams = 4
c.RcJobExpireDuration = 60 * time.Second
c.RcJobExpireInterval = 10 * time.Second
return c return c
} }

View file

@ -98,6 +98,8 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth")
flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.") flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.")
flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.") flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.")
flags.DurationVarP(flagSet, &fs.Config.RcJobExpireDuration, "rc-job-expire-duration", "", fs.Config.RcJobExpireDuration, "expire finished async jobs older than this value")
flags.DurationVarP(flagSet, &fs.Config.RcJobExpireInterval, "rc-job-expire-interval", "", fs.Config.RcJobExpireInterval, "interval to check for expired async jobs")
} }
// SetFlags converts any flags into config which weren't straight forward // SetFlags converts any flags into config which weren't straight forward

View file

@ -7,16 +7,10 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
const (
// expire the job when it is finished and older than this
expireDuration = 60 * time.Second
// inteval to run the expire cache
expireInterval = 10 * time.Second
)
// Job describes a asynchronous task started via the rc package // Job describes a asynchronous task started via the rc package
type Job struct { type Job struct {
mu sync.Mutex mu sync.Mutex
@ -47,7 +41,7 @@ var (
func newJobs() *Jobs { func newJobs() *Jobs {
return &Jobs{ return &Jobs{
jobs: map[int64]*Job{}, jobs: map[int64]*Job{},
expireInterval: expireInterval, expireInterval: fs.Config.RcJobExpireInterval,
} }
} }
@ -68,7 +62,7 @@ func (jobs *Jobs) Expire() {
now := time.Now() now := time.Now()
for ID, job := range jobs.jobs { for ID, job := range jobs.jobs {
job.mu.Lock() job.mu.Lock()
if job.Finished && now.Sub(job.EndTime) > expireDuration { if job.Finished && now.Sub(job.EndTime) > fs.Config.RcJobExpireDuration {
delete(jobs.jobs, ID) delete(jobs.jobs, ID)
} }
job.mu.Unlock() job.mu.Unlock()

View file

@ -5,6 +5,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -44,7 +45,7 @@ func TestJobsExpire(t *testing.T) {
assert.Equal(t, 1, len(jobs.jobs)) assert.Equal(t, 1, len(jobs.jobs))
jobs.mu.Lock() jobs.mu.Lock()
job.mu.Lock() job.mu.Lock()
job.EndTime = time.Now().Add(-expireDuration - 60*time.Second) job.EndTime = time.Now().Add(-fs.Config.RcJobExpireDuration - 60*time.Second)
assert.Equal(t, true, jobs.expireRunning) assert.Equal(t, true, jobs.expireRunning)
job.mu.Unlock() job.mu.Unlock()
jobs.mu.Unlock() jobs.mu.Unlock()