b2: cancel in progress multipart uploads and copies on rclone exit #4300

This commit is contained in:
Nick Craig-Wood 2020-06-25 14:14:44 +01:00
parent 5f75444ef6
commit 47b17dc1bb

View file

@ -20,6 +20,7 @@ import (
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -330,6 +331,7 @@ func (up *largeUpload) finish(ctx context.Context) error {
// cancel aborts the large upload // cancel aborts the large upload
func (up *largeUpload) cancel(ctx context.Context) error { func (up *largeUpload) cancel(ctx context.Context) error {
fs.Debugf(up.o, "Cancelling large file %s", up.what)
opts := rest.Opts{ opts := rest.Opts{
Method: "POST", Method: "POST",
Path: "/b2_cancel_large_file", Path: "/b2_cancel_large_file",
@ -342,28 +344,19 @@ func (up *largeUpload) cancel(ctx context.Context) error {
resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response) resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
return up.f.shouldRetry(ctx, resp, err) return up.f.shouldRetry(ctx, resp, err)
}) })
if err != nil {
fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, err)
}
return err return err
} }
// If the error pointer is not nil then cancel the transfer
func (up *largeUpload) cancelOnError(ctx context.Context, err *error) {
if *err == nil {
return
}
fs.Debugf(up.o, "Cancelling large file %s due to error: %v", up.what, *err)
cancelErr := up.cancel(ctx)
if cancelErr != nil {
fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, cancelErr)
}
}
// Stream uploads the chunks from the input, starting with a required initial // Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input // chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF. // reaches EOF.
// //
// Note that initialUploadBlock must be returned to f.putBuf() // Note that initialUploadBlock must be returned to f.putBuf()
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) { func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) {
defer up.cancelOnError(ctx, &err) defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id) fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
var ( var (
g, gCtx = errgroup.WithContext(ctx) g, gCtx = errgroup.WithContext(ctx)
@ -434,7 +427,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (e
// Upload uploads the chunks from the input // Upload uploads the chunks from the input
func (up *largeUpload) Upload(ctx context.Context) (err error) { func (up *largeUpload) Upload(ctx context.Context) (err error) {
defer up.cancelOnError(ctx, &err) defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id) fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
var ( var (
g, gCtx = errgroup.WithContext(ctx) g, gCtx = errgroup.WithContext(ctx)