rc: move job expire flags to rc to fix initalization problem

See: https://forum.rclone.org/t/rc-rc-job-expire-interval-bug/11188

rclone was ignoring the --rc-job-expire-duration and --rc-job-interval
flags.  This turned out to be an initialization order problem and was
fixed by moving those flags out of global config into rc config.
This commit is contained in:
Nick Craig-Wood 2019-08-10 17:12:22 +01:00
parent 0693deea1c
commit 3ecbd603ab
7 changed files with 28 additions and 22 deletions

View file

@ -100,8 +100,6 @@ type ConfigInfo struct {
ClientKey string // Client Side Key ClientKey string // Client Side Key
MultiThreadCutoff SizeSuffix MultiThreadCutoff SizeSuffix
MultiThreadStreams int MultiThreadStreams int
RcJobExpireDuration time.Duration
RcJobExpireInterval time.Duration
} }
// NewConfig creates a new config with everything set to the default // NewConfig creates a new config with everything set to the default
@ -136,8 +134,6 @@ func NewConfig() *ConfigInfo {
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - " // c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024) c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)
c.MultiThreadStreams = 4 c.MultiThreadStreams = 4
c.RcJobExpireDuration = 60 * time.Second
c.RcJobExpireInterval = 10 * time.Second
return c return c
} }

View file

@ -103,8 +103,6 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth")
flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.") flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.")
flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.") flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.")
flags.DurationVarP(flagSet, &fs.Config.RcJobExpireDuration, "rc-job-expire-duration", "", fs.Config.RcJobExpireDuration, "expire finished async jobs older than this value")
flags.DurationVarP(flagSet, &fs.Config.RcJobExpireInterval, "rc-job-expire-interval", "", fs.Config.RcJobExpireInterval, "interval to check for expired async jobs")
flags.BoolVarP(flagSet, &fs.Config.UseJSONLog, "use-json-log", "", fs.Config.UseJSONLog, "Use json log format.") flags.BoolVarP(flagSet, &fs.Config.UseJSONLog, "use-json-log", "", fs.Config.UseJSONLog, "Use json log format.")
} }

View file

@ -35,10 +35,10 @@ type Job struct {
// Jobs describes a collection of running tasks // Jobs describes a collection of running tasks
type Jobs struct { type Jobs struct {
mu sync.RWMutex mu sync.RWMutex
jobs map[int64]*Job jobs map[int64]*Job
expireInterval time.Duration opt *rc.Options
expireRunning bool expireRunning bool
} }
var ( var (
@ -49,17 +49,22 @@ var (
// newJobs makes a new Jobs structure // newJobs makes a new Jobs structure
func newJobs() *Jobs { func newJobs() *Jobs {
return &Jobs{ return &Jobs{
jobs: map[int64]*Job{}, jobs: map[int64]*Job{},
expireInterval: fs.Config.RcJobExpireInterval, opt: &rc.DefaultOpt,
} }
} }
// SetOpt sets the options when they are known
func SetOpt(opt *rc.Options) {
running.opt = opt
}
// kickExpire makes sure Expire is running // kickExpire makes sure Expire is running
func (jobs *Jobs) kickExpire() { func (jobs *Jobs) kickExpire() {
jobs.mu.Lock() jobs.mu.Lock()
defer jobs.mu.Unlock() defer jobs.mu.Unlock()
if !jobs.expireRunning { if !jobs.expireRunning {
time.AfterFunc(jobs.expireInterval, jobs.Expire) time.AfterFunc(jobs.opt.JobExpireInterval, jobs.Expire)
jobs.expireRunning = true jobs.expireRunning = true
} }
} }
@ -71,13 +76,13 @@ func (jobs *Jobs) Expire() {
now := time.Now() now := time.Now()
for ID, job := range jobs.jobs { for ID, job := range jobs.jobs {
job.mu.Lock() job.mu.Lock()
if job.Finished && now.Sub(job.EndTime) > fs.Config.RcJobExpireDuration { if job.Finished && now.Sub(job.EndTime) > jobs.opt.JobExpireDuration {
delete(jobs.jobs, ID) delete(jobs.jobs, ID)
} }
job.mu.Unlock() job.mu.Unlock()
} }
if len(jobs.jobs) != 0 { if len(jobs.jobs) != 0 {
time.AfterFunc(jobs.expireInterval, jobs.Expire) time.AfterFunc(jobs.opt.JobExpireInterval, jobs.Expire)
jobs.expireRunning = true jobs.expireRunning = true
} else { } else {
jobs.expireRunning = false jobs.expireRunning = false

View file

@ -7,8 +7,8 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc" "github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs/rc/rcflags"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -20,7 +20,7 @@ func TestNewJobs(t *testing.T) {
func TestJobsKickExpire(t *testing.T) { func TestJobsKickExpire(t *testing.T) {
jobs := newJobs() jobs := newJobs()
jobs.expireInterval = time.Millisecond jobs.opt.JobExpireInterval = time.Millisecond
assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, false, jobs.expireRunning)
jobs.kickExpire() jobs.kickExpire()
jobs.mu.Lock() jobs.mu.Lock()
@ -35,7 +35,7 @@ func TestJobsKickExpire(t *testing.T) {
func TestJobsExpire(t *testing.T) { func TestJobsExpire(t *testing.T) {
wait := make(chan struct{}) wait := make(chan struct{})
jobs := newJobs() jobs := newJobs()
jobs.expireInterval = time.Millisecond jobs.opt.JobExpireInterval = time.Millisecond
assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, false, jobs.expireRunning)
job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) { job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) {
defer close(wait) defer close(wait)
@ -47,7 +47,7 @@ func TestJobsExpire(t *testing.T) {
assert.Equal(t, 1, len(jobs.jobs)) assert.Equal(t, 1, len(jobs.jobs))
jobs.mu.Lock() jobs.mu.Lock()
job.mu.Lock() job.mu.Lock()
job.EndTime = time.Now().Add(-fs.Config.RcJobExpireDuration - 60*time.Second) job.EndTime = time.Now().Add(-rcflags.Opt.JobExpireDuration - 60*time.Second)
assert.Equal(t, true, jobs.expireRunning) assert.Equal(t, true, jobs.expireRunning)
job.mu.Unlock() job.mu.Unlock()
jobs.mu.Unlock() jobs.mu.Unlock()

View file

@ -11,6 +11,7 @@ import (
"encoding/json" "encoding/json"
"io" "io"
_ "net/http/pprof" // install the pprof http handlers _ "net/http/pprof" // install the pprof http handlers
"time"
"github.com/rclone/rclone/cmd/serve/httplib" "github.com/rclone/rclone/cmd/serve/httplib"
) )
@ -26,13 +27,16 @@ type Options struct {
WebGUIUpdate bool // set to download new update WebGUIUpdate bool // set to download new update
WebGUIFetchURL string // set the default url for fetching webgui WebGUIFetchURL string // set the default url for fetching webgui
AccessControlAllowOrigin string // set the access control for CORS configuration AccessControlAllowOrigin string // set the access control for CORS configuration
JobExpireDuration time.Duration
JobExpireInterval time.Duration
} }
// DefaultOpt is the default values used for Options // DefaultOpt is the default values used for Options
var DefaultOpt = Options{ var DefaultOpt = Options{
HTTPOptions: httplib.DefaultOpt, HTTPOptions: httplib.DefaultOpt,
Enabled: false, Enabled: false,
JobExpireDuration: 60 * time.Second,
JobExpireInterval: 10 * time.Second,
} }
func init() { func init() {

View file

@ -24,5 +24,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.BoolVarP(flagSet, &Opt.WebGUIUpdate, "rc-web-gui-update", "", false, "Update / Force update to latest version of web gui") flags.BoolVarP(flagSet, &Opt.WebGUIUpdate, "rc-web-gui-update", "", false, "Update / Force update to latest version of web gui")
flags.StringVarP(flagSet, &Opt.WebGUIFetchURL, "rc-web-fetch-url", "", "https://api.github.com/repos/rclone/rclone-webui-react/releases/latest", "URL to fetch the releases for webgui.") flags.StringVarP(flagSet, &Opt.WebGUIFetchURL, "rc-web-fetch-url", "", "https://api.github.com/repos/rclone/rclone-webui-react/releases/latest", "URL to fetch the releases for webgui.")
flags.StringVarP(flagSet, &Opt.AccessControlAllowOrigin, "rc-allow-origin", "", "", "Set the allowed origin for CORS.") flags.StringVarP(flagSet, &Opt.AccessControlAllowOrigin, "rc-allow-origin", "", "", "Set the allowed origin for CORS.")
flags.DurationVarP(flagSet, &Opt.JobExpireDuration, "rc-job-expire-duration", "", Opt.JobExpireDuration, "expire finished async jobs older than this value")
flags.DurationVarP(flagSet, &Opt.JobExpireInterval, "rc-job-expire-interval", "", Opt.JobExpireInterval, "interval to check for expired async jobs")
httpflags.AddFlagsPrefix(flagSet, "rc-", &Opt.HTTPOptions) httpflags.AddFlagsPrefix(flagSet, "rc-", &Opt.HTTPOptions)
} }

View file

@ -30,6 +30,7 @@ import (
// //
// If the server wasn't configured the *Server returned may be nil // If the server wasn't configured the *Server returned may be nil
func Start(opt *rc.Options) (*Server, error) { func Start(opt *rc.Options) (*Server, error) {
jobs.SetOpt(opt) // set the defaults for jobs
if opt.Enabled { if opt.Enabled {
// Serve on the DefaultServeMux so can have global registrations appear // Serve on the DefaultServeMux so can have global registrations appear
s := newServer(opt, http.DefaultServeMux) s := newServer(opt, http.DefaultServeMux)