drive: add --drive-stop-on-upload-limit flag to stop syncs when upload limit reached
If the --drive-stop-on-upload-limit flag is in effect this checks the error string from Google Drive to see if it is the error you get when you've breached your 750GB a day limit. If so then it turns this error into a Fatal error which should stop the sync. Fixes #3857
This commit is contained in:
parent
ba01d5e8ab
commit
58064bdd2b
2 changed files with 49 additions and 26 deletions
|
@ -437,6 +437,23 @@ be removed.
|
|||
|
||||
See: https://github.com/rclone/rclone/issues/3631
|
||||
|
||||
`,
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "stop_on_upload_limit",
|
||||
Default: false,
|
||||
Help: `Make upload limit errors be fatal
|
||||
|
||||
At the time of writing it is only possible to upload 750GB of data to
|
||||
Google Drive a day (this is an undocumented limit). When this limit is
|
||||
reached Google Drive produces a slightly different error message. When
|
||||
this flag is set it causes these errors to be fatal. These will stop
|
||||
the in-progress sync.
|
||||
|
||||
Note that this detection is relying on error message strings which
|
||||
Google don't document so it may break in the future.
|
||||
|
||||
See: https://github.com/rclone/rclone/issues/3857
|
||||
`,
|
||||
Advanced: true,
|
||||
}},
|
||||
|
@ -488,6 +505,7 @@ type Options struct {
|
|||
PacerBurst int `config:"pacer_burst"`
|
||||
ServerSideAcrossConfigs bool `config:"server_side_across_configs"`
|
||||
DisableHTTP2 bool `config:"disable_http2"`
|
||||
StopOnUploadLimit bool `config:"stop_on_upload_limit"`
|
||||
}
|
||||
|
||||
// Fs represents a remote drive server
|
||||
|
@ -558,7 +576,7 @@ func (f *Fs) Features() *fs.Features {
|
|||
}
|
||||
|
||||
// shouldRetry determines whether a given err rates being retried
|
||||
func shouldRetry(err error) (bool, error) {
|
||||
func (f *Fs) shouldRetry(err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -574,6 +592,10 @@ func shouldRetry(err error) (bool, error) {
|
|||
if len(gerr.Errors) > 0 {
|
||||
reason := gerr.Errors[0].Reason
|
||||
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
|
||||
if f.opt.StopOnUploadLimit && gerr.Errors[0].Message == "User rate limit exceeded." {
|
||||
fs.Errorf(f, "Received upload limit error: %v", err)
|
||||
return false, fserrors.FatalError(err)
|
||||
}
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
|
@ -610,7 +632,7 @@ func (f *Fs) getRootID() (string, error) {
|
|||
Fields("id").
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "couldn't find root directory ID")
|
||||
|
@ -723,7 +745,7 @@ OUTER:
|
|||
var files *drive.FileList
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
files, err = list.Fields(googleapi.Field(fields)).Context(ctx).Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "couldn't list directory")
|
||||
|
@ -860,11 +882,12 @@ func configTeamDrive(ctx context.Context, opt *Options, m configmap.Mapper, name
|
|||
var driveIDs, driveNames []string
|
||||
listTeamDrives := svc.Teamdrives.List().PageSize(100)
|
||||
listFailed := false
|
||||
var defaultFs Fs // default Fs with default Options
|
||||
for {
|
||||
var teamDrives *drive.TeamDriveList
|
||||
err = newPacer(opt).Call(func() (bool, error) {
|
||||
teamDrives, err = listTeamDrives.Context(ctx).Do()
|
||||
return shouldRetry(err)
|
||||
return defaultFs.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("Listing team drives failed: %v\n", err)
|
||||
|
@ -1305,7 +1328,7 @@ func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (newID string,
|
|||
Fields("id").
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -1348,7 +1371,7 @@ func (f *Fs) fetchFormats() {
|
|||
about, err = f.svc.About.Get().
|
||||
Fields("exportFormats,importFormats").
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
fs.Errorf(f, "Failed to get Drive exportFormats and importFormats: %v", err)
|
||||
|
@ -1832,7 +1855,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
|
|||
SupportsAllDrives(true).
|
||||
KeepRevisionForever(f.opt.KeepRevisionForever).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1875,7 +1898,7 @@ func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
|
|||
Fields("").
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "MergeDirs move failed on %q in %v", info.Name, srcDir)
|
||||
|
@ -1921,7 +1944,7 @@ func (f *Fs) rmdir(ctx context.Context, directoryID string, useTrash bool) error
|
|||
SupportsAllDrives(true).
|
||||
Do()
|
||||
}
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2019,7 +2042,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||
SupportsAllDrives(true).
|
||||
KeepRevisionForever(f.opt.KeepRevisionForever).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2068,7 +2091,7 @@ func (f *Fs) Purge(ctx context.Context) error {
|
|||
SupportsAllDrives(true).
|
||||
Do()
|
||||
}
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
f.dirCache.ResetRoot()
|
||||
if err != nil {
|
||||
|
@ -2081,7 +2104,7 @@ func (f *Fs) Purge(ctx context.Context) error {
|
|||
func (f *Fs) CleanUp(ctx context.Context) error {
|
||||
err := f.pacer.Call(func() (bool, error) {
|
||||
err := f.svc.Files.EmptyTrash().Context(ctx).Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
@ -2098,7 +2121,7 @@ func (f *Fs) teamDriveOK(ctx context.Context) (err error) {
|
|||
var td *drive.Drive
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
td, err = f.svc.Drives.Get(f.opt.TeamDriveID).Fields("name,id,capabilities,createdTime,restrictions").Context(ctx).Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get Team/Shared Drive info")
|
||||
|
@ -2121,7 +2144,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|||
var err error
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
about, err = f.svc.About.Get().Fields("storageQuota").Context(ctx).Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get Drive storageQuota")
|
||||
|
@ -2193,7 +2216,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||
Fields(partialFields).
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2229,7 +2252,7 @@ func (f *Fs) PublicLink(ctx context.Context, remote string) (link string, err er
|
|||
Fields("").
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -2329,7 +2352,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
|
|||
Fields("").
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2395,7 +2418,7 @@ func (f *Fs) changeNotifyStartPageToken() (pageToken string, err error) {
|
|||
changes.DriveId(f.opt.TeamDriveID)
|
||||
}
|
||||
startPageToken, err = changes.Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -2424,7 +2447,7 @@ func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.
|
|||
changesCall.Spaces("appDataFolder")
|
||||
}
|
||||
changeList, err = changesCall.Context(ctx).Do()
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -2615,7 +2638,7 @@ func (o *baseObject) SetModTime(ctx context.Context, modTime time.Time) error {
|
|||
Fields(partialFields).
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return o.fs.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2654,7 +2677,7 @@ func (o *baseObject) httpResponse(ctx context.Context, url, method string, optio
|
|||
_ = res.Body.Close() // ignore error
|
||||
}
|
||||
}
|
||||
return shouldRetry(err)
|
||||
return o.fs.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return req, nil, err
|
||||
|
@ -2744,7 +2767,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||
Fields("downloadUrl").
|
||||
SupportsAllDrives(true).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return o.fs.shouldRetry(err)
|
||||
})
|
||||
if err == nil {
|
||||
fs.Debugf(o, "Using v2 download: %v", v2File.DownloadUrl)
|
||||
|
@ -2825,7 +2848,7 @@ func (o *baseObject) update(ctx context.Context, updateInfo *drive.File, uploadM
|
|||
SupportsAllDrives(true).
|
||||
KeepRevisionForever(o.fs.opt.KeepRevisionForever).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
return o.fs.shouldRetry(err)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
@ -2925,7 +2948,7 @@ func (o *baseObject) Remove(ctx context.Context) error {
|
|||
SupportsAllDrives(true).
|
||||
Do()
|
||||
}
|
||||
return shouldRetry(err)
|
||||
return o.fs.shouldRetry(err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ func (f *Fs) Upload(ctx context.Context, in io.Reader, size int64, contentType,
|
|||
defer googleapi.CloseBody(res)
|
||||
err = googleapi.CheckResponse(res)
|
||||
}
|
||||
return shouldRetry(err)
|
||||
return f.shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -211,7 +211,7 @@ func (rx *resumableUpload) Upload(ctx context.Context) (*drive.File, error) {
|
|||
err = rx.f.pacer.Call(func() (bool, error) {
|
||||
fs.Debugf(rx.remote, "Sending chunk %d length %d", start, reqSize)
|
||||
StatusCode, err = rx.transferChunk(ctx, start, chunk, reqSize)
|
||||
again, err := shouldRetry(err)
|
||||
again, err := rx.f.shouldRetry(err)
|
||||
if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK {
|
||||
again = false
|
||||
err = nil
|
||||
|
|
Loading…
Reference in a new issue