From 30cccc71014b85818ba1f04748d052feb85b4080 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sun, 11 Jun 2023 15:25:14 +0100 Subject: [PATCH] cache: fix backends shutting down when in use when used via the rc Before this fix, if a long running task (eg a copy) was started by the rc then the backend could expire before the copy had finished. The typical symptom was with the dropbox backend giving "batcher is shutting down" errors. This patch fixes the problem by pinning the backend until the job has finished. See: https://forum.rclone.org/t/uploads-start-repeatedly-failing-after-a-while-using-rc-sync-copy-vs-rclone-copy-for-dropbox/38873/ --- fs/cache/cache.go | 25 ++++++++++++++++++++++++- fs/rc/jobs/job.go | 7 +++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/fs/cache/cache.go b/fs/cache/cache.go index 4e2c4638e..c5eab3e39 100644 --- a/fs/cache/cache.go +++ b/fs/cache/cache.go @@ -120,6 +120,14 @@ func Unpin(f fs.Fs) { c.Unpin(fs.ConfigString(f)) } +// To avoid circular dependencies these are filled in by fs/rc/jobs/job.go +var ( + // JobGetJobID for internal use only + JobGetJobID func(context.Context) (int64, bool) + // JobOnFinish for internal use only + JobOnFinish func(int64, func()) (func(), error) +) + // Get gets an fs.Fs named fsString either from the cache or creates it afresh func Get(ctx context.Context, fsString string) (f fs.Fs, err error) { // If we are making a long lived backend which lives longer @@ -129,7 +137,22 @@ func Get(ctx context.Context, fsString string) (f fs.Fs, err error) { newCtx := context.Background() newCtx = fs.CopyConfig(newCtx, ctx) newCtx = filter.CopyConfig(newCtx, ctx) - return GetFn(newCtx, fsString, fs.NewFs) + f, err = GetFn(newCtx, fsString, fs.NewFs) + if f == nil || (err != nil && err != fs.ErrorIsFile) { + return f, err + } + // If this is part of an rc job then pin the backend until it finishes + if JobOnFinish != nil && JobGetJobID != nil { + if jobID, ok := JobGetJobID(ctx); ok { + // fs.Debugf(f, "Pin for job %d", jobID) + Pin(f) + _, _ = JobOnFinish(jobID, func() { + // fs.Debugf(f, "Unpin for job %d", jobID) + Unpin(f) + }) + } + } + return f, err } // GetArr gets []fs.Fs from []fsStrings either from the cache or creates it afresh diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 1501e365b..1cdfd91e0 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -12,10 +12,17 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/filter" "github.com/rclone/rclone/fs/rc" ) +// Fill in these to avoid circular dependencies +func init() { + cache.JobOnFinish = OnFinish + cache.JobGetJobID = GetJobID +} + // Job describes an asynchronous task started via the rc package type Job struct { mu sync.Mutex