From 3ecbd603ab5efdaab8a516c7d7280f3a4bb9388e Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 10 Aug 2019 17:12:22 +0100 Subject: [PATCH] rc: move job expire flags to rc to fix initalization problem See: https://forum.rclone.org/t/rc-rc-job-expire-interval-bug/11188 rclone was ignoring the --rc-job-expire-duration and --rc-job-interval flags. This turned out to be an initialization order problem and was fixed by moving those flags out of global config into rc config. --- fs/config.go | 4 ---- fs/config/configflags/configflags.go | 2 -- fs/rc/jobs/job.go | 23 ++++++++++++++--------- fs/rc/jobs/job_test.go | 8 ++++---- fs/rc/rc.go | 10 +++++++--- fs/rc/rcflags/rcflags.go | 2 ++ fs/rc/rcserver/rcserver.go | 1 + 7 files changed, 28 insertions(+), 22 deletions(-) diff --git a/fs/config.go b/fs/config.go index c94da532d..b08dcc2c9 100644 --- a/fs/config.go +++ b/fs/config.go @@ -100,8 +100,6 @@ type ConfigInfo struct { ClientKey string // Client Side Key MultiThreadCutoff SizeSuffix MultiThreadStreams int - RcJobExpireDuration time.Duration - RcJobExpireInterval time.Duration } // NewConfig creates a new config with everything set to the default @@ -136,8 +134,6 @@ func NewConfig() *ConfigInfo { // c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - " c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024) c.MultiThreadStreams = 4 - c.RcJobExpireDuration = 60 * time.Second - c.RcJobExpireInterval = 10 * time.Second return c } diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index b81be4a67..e1670a892 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -103,8 +103,6 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.") flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.") - flags.DurationVarP(flagSet, &fs.Config.RcJobExpireDuration, "rc-job-expire-duration", "", fs.Config.RcJobExpireDuration, "expire finished async jobs older than this value") - flags.DurationVarP(flagSet, &fs.Config.RcJobExpireInterval, "rc-job-expire-interval", "", fs.Config.RcJobExpireInterval, "interval to check for expired async jobs") flags.BoolVarP(flagSet, &fs.Config.UseJSONLog, "use-json-log", "", fs.Config.UseJSONLog, "Use json log format.") } diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 887b7d805..b2c18da5b 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -35,10 +35,10 @@ type Job struct { // Jobs describes a collection of running tasks type Jobs struct { - mu sync.RWMutex - jobs map[int64]*Job - expireInterval time.Duration - expireRunning bool + mu sync.RWMutex + jobs map[int64]*Job + opt *rc.Options + expireRunning bool } var ( @@ -49,17 +49,22 @@ var ( // newJobs makes a new Jobs structure func newJobs() *Jobs { return &Jobs{ - jobs: map[int64]*Job{}, - expireInterval: fs.Config.RcJobExpireInterval, + jobs: map[int64]*Job{}, + opt: &rc.DefaultOpt, } } +// SetOpt sets the options when they are known +func SetOpt(opt *rc.Options) { + running.opt = opt +} + // kickExpire makes sure Expire is running func (jobs *Jobs) kickExpire() { jobs.mu.Lock() defer jobs.mu.Unlock() if !jobs.expireRunning { - time.AfterFunc(jobs.expireInterval, jobs.Expire) + time.AfterFunc(jobs.opt.JobExpireInterval, jobs.Expire) jobs.expireRunning = true } } @@ -71,13 +76,13 @@ func (jobs *Jobs) Expire() { now := time.Now() for ID, job := range jobs.jobs { job.mu.Lock() - if job.Finished && now.Sub(job.EndTime) > fs.Config.RcJobExpireDuration { + if job.Finished && now.Sub(job.EndTime) > jobs.opt.JobExpireDuration { delete(jobs.jobs, ID) } job.mu.Unlock() } if len(jobs.jobs) != 0 { - time.AfterFunc(jobs.expireInterval, jobs.Expire) + time.AfterFunc(jobs.opt.JobExpireInterval, jobs.Expire) jobs.expireRunning = true } else { jobs.expireRunning = false diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 99b779432..3125c6cf4 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -7,8 +7,8 @@ import ( "time" "github.com/pkg/errors" - "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/fs/rc/rcflags" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -20,7 +20,7 @@ func TestNewJobs(t *testing.T) { func TestJobsKickExpire(t *testing.T) { jobs := newJobs() - jobs.expireInterval = time.Millisecond + jobs.opt.JobExpireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) jobs.kickExpire() jobs.mu.Lock() @@ -35,7 +35,7 @@ func TestJobsKickExpire(t *testing.T) { func TestJobsExpire(t *testing.T) { wait := make(chan struct{}) jobs := newJobs() - jobs.expireInterval = time.Millisecond + jobs.opt.JobExpireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) { defer close(wait) @@ -47,7 +47,7 @@ func TestJobsExpire(t *testing.T) { assert.Equal(t, 1, len(jobs.jobs)) jobs.mu.Lock() job.mu.Lock() - job.EndTime = time.Now().Add(-fs.Config.RcJobExpireDuration - 60*time.Second) + job.EndTime = time.Now().Add(-rcflags.Opt.JobExpireDuration - 60*time.Second) assert.Equal(t, true, jobs.expireRunning) job.mu.Unlock() jobs.mu.Unlock() diff --git a/fs/rc/rc.go b/fs/rc/rc.go index 5159aea2b..a6164d9d7 100644 --- a/fs/rc/rc.go +++ b/fs/rc/rc.go @@ -11,6 +11,7 @@ import ( "encoding/json" "io" _ "net/http/pprof" // install the pprof http handlers + "time" "github.com/rclone/rclone/cmd/serve/httplib" ) @@ -26,13 +27,16 @@ type Options struct { WebGUIUpdate bool // set to download new update WebGUIFetchURL string // set the default url for fetching webgui AccessControlAllowOrigin string // set the access control for CORS configuration - + JobExpireDuration time.Duration + JobExpireInterval time.Duration } // DefaultOpt is the default values used for Options var DefaultOpt = Options{ - HTTPOptions: httplib.DefaultOpt, - Enabled: false, + HTTPOptions: httplib.DefaultOpt, + Enabled: false, + JobExpireDuration: 60 * time.Second, + JobExpireInterval: 10 * time.Second, } func init() { diff --git a/fs/rc/rcflags/rcflags.go b/fs/rc/rcflags/rcflags.go index 1a996e04d..0bf75c983 100644 --- a/fs/rc/rcflags/rcflags.go +++ b/fs/rc/rcflags/rcflags.go @@ -24,5 +24,7 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.BoolVarP(flagSet, &Opt.WebGUIUpdate, "rc-web-gui-update", "", false, "Update / Force update to latest version of web gui") flags.StringVarP(flagSet, &Opt.WebGUIFetchURL, "rc-web-fetch-url", "", "https://api.github.com/repos/rclone/rclone-webui-react/releases/latest", "URL to fetch the releases for webgui.") flags.StringVarP(flagSet, &Opt.AccessControlAllowOrigin, "rc-allow-origin", "", "", "Set the allowed origin for CORS.") + flags.DurationVarP(flagSet, &Opt.JobExpireDuration, "rc-job-expire-duration", "", Opt.JobExpireDuration, "expire finished async jobs older than this value") + flags.DurationVarP(flagSet, &Opt.JobExpireInterval, "rc-job-expire-interval", "", Opt.JobExpireInterval, "interval to check for expired async jobs") httpflags.AddFlagsPrefix(flagSet, "rc-", &Opt.HTTPOptions) } diff --git a/fs/rc/rcserver/rcserver.go b/fs/rc/rcserver/rcserver.go index 646eaa649..d1e635f73 100644 --- a/fs/rc/rcserver/rcserver.go +++ b/fs/rc/rcserver/rcserver.go @@ -30,6 +30,7 @@ import ( // // If the server wasn't configured the *Server returned may be nil func Start(opt *rc.Options) (*Server, error) { + jobs.SetOpt(opt) // set the defaults for jobs if opt.Enabled { // Serve on the DefaultServeMux so can have global registrations appear s := newServer(opt, http.DefaultServeMux)