From fc6bd0dd772c1a4ab12f3bfda158d506db24e1ec Mon Sep 17 00:00:00 2001
From: Thibault Coupin
Date: Fri, 11 Dec 2020 22:11:46 +0100
Subject: [PATCH] webdav: add chunked uploading option for nextcloud

---
 backend/webdav/webdav.go | 157 ++++++++++++++++++++++++++++++++++-----
 1 file changed, 139 insertions(+), 18 deletions(-)

diff --git a/backend/webdav/webdav.go b/backend/webdav/webdav.go
index ce2fb3ae1..c560ba21d 100644
--- a/backend/webdav/webdav.go
+++ b/backend/webdav/webdav.go
@@ -10,7 +10,9 @@ package webdav
 import (
 	"bytes"
 	"context"
+	"crypto/md5"
 	"crypto/tls"
+	"encoding/hex"
 	"encoding/xml"
 	"fmt"
 	"io"
@@ -34,6 +36,7 @@ import (
 	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/fs/fshttp"
 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/atexit"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/rest"
@@ -113,6 +116,14 @@ func init() {
 			Name:     config.ConfigEncoding,
 			Help:     configEncodingHelp,
 			Advanced: true,
+		}, {
+			Name: "chunk_size",
+			Help: `Chunk size to use for uploading (Nextcloud only)
+
+Set to 0 to disable chunked uploading.
+`,
+			Advanced: true,
+			Default:  fs.SizeSuffix(0), // off by default
 		}},
 	})
 }
@@ -126,6 +137,7 @@ type Options struct {
 	BearerToken        string               `config:"bearer_token"`
 	BearerTokenCommand string               `config:"bearer_token_command"`
 	Enc                encoder.MultiEncoder `config:"encoding"`
+	ChunkSize          fs.SizeSuffix        `config:"chunk_size"`
 }
 
 // Fs represents a remote webdav
@@ -146,6 +158,7 @@ type Fs struct {
 	hasMD5             bool          // set if can use owncloud style checksums for MD5
 	hasSHA1            bool          // set if can use owncloud style checksums for SHA1
 	ntlmAuthMu         sync.Mutex    // mutex to serialize NTLM auth roundtrips
+	canChunk           bool          // set if nextcloud and chunk_size is set
 }
 
 // Object describes a webdav object
@@ -513,6 +526,9 @@ func (f *Fs) setQuirks(ctx context.Context, vendor string) error {
 		f.precision = time.Second
 		f.useOCMtime = true
 		f.hasSHA1 = true
+		if f.opt.ChunkSize != 0 {
+			f.canChunk = true
+		}
 	case "sharepoint":
 		// To mount sharepoint, two Cookies are required
 		// They have to be set instead of BasicAuth
@@ -1262,36 +1278,63 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	if err != nil {
 		return errors.Wrap(err, "Update mkParentDir failed")
 	}
-	size := src.Size()
-	var resp *http.Response
-	opts := rest.Opts{
-		Method:        "PUT",
-		Path:          o.filePath(),
-		Body:          in,
-		NoResponse:    true,
-		ContentLength: &size, // FIXME this isn't necessary with owncloud - See https://github.com/nextcloud/nextcloud-snap/issues/365
-		ContentType:   fs.MimeType(ctx, src),
-		Options:       options,
+	if o.fs.canChunk {
+		err = o.updateChunked(ctx, in, src, options...)
+		if err != nil {
+			return err
+		}
+	} else {
+		size := src.Size()
+		contentType := fs.MimeType(ctx, src)
+		filePath := o.filePath()
+		extraHeaders := o.extraHeaders(ctx, src)
+		err = o.updateSimple(ctx, in, filePath, size, contentType, extraHeaders, options...)
+		if err != nil {
+			return err
+		}
 	}
+
+	// read metadata from remote
+	o.hasMetaData = false
+	return o.readMetaData(ctx)
+}
+
+func (o *Object) extraHeaders(ctx context.Context, src fs.ObjectInfo) map[string]string {
+	extraHeaders := map[string]string{}
 	if o.fs.useOCMtime || o.fs.hasMD5 || o.fs.hasSHA1 {
-		opts.ExtraHeaders = map[string]string{}
 		if o.fs.useOCMtime {
-			opts.ExtraHeaders["X-OC-Mtime"] = fmt.Sprintf("%d", src.ModTime(ctx).Unix())
+			extraHeaders["X-OC-Mtime"] = fmt.Sprintf("%d", src.ModTime(ctx).Unix())
 		}
 		// Set one upload checksum
 		// Owncloud uses one checksum only to check the upload and stores its own SHA1 and MD5
 		// Nextcloud stores the checksum you supply (SHA1 or MD5) but only stores one
 		if o.fs.hasSHA1 {
 			if sha1, _ := src.Hash(ctx, hash.SHA1); sha1 != "" {
-				opts.ExtraHeaders["OC-Checksum"] = "SHA1:" + sha1
+				extraHeaders["OC-Checksum"] = "SHA1:" + sha1
 			}
 		}
-		if o.fs.hasMD5 && opts.ExtraHeaders["OC-Checksum"] == "" {
+		if o.fs.hasMD5 && extraHeaders["OC-Checksum"] == "" {
 			if md5, _ := src.Hash(ctx, hash.MD5); md5 != "" {
-				opts.ExtraHeaders["OC-Checksum"] = "MD5:" + md5
+				extraHeaders["OC-Checksum"] = "MD5:" + md5
 			}
 		}
 	}
+	return extraHeaders
+}
+
+// Standard update
+func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, options ...fs.OpenOption) (err error) {
+	var resp *http.Response
+	opts := rest.Opts{
+		Method:        "PUT",
+		Path:          filePath,
+		Body:          in,
+		NoResponse:    true,
+		ContentLength: &size, // FIXME this isn't necessary with owncloud - See https://github.com/nextcloud/nextcloud-snap/issues/365
+		ContentType:   contentType,
+		Options:       options,
+		ExtraHeaders:  extraHeaders,
+	}
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		resp, err = o.fs.srv.Call(ctx, &opts)
 		return o.fs.shouldRetry(ctx, resp, err)
@@ -1307,9 +1350,87 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		_ = o.Remove(ctx)
 		return err
 	}
-	// read metadata from remote
-	o.hasMetaData = false
-	return o.readMetaData(ctx)
+	return nil
+
+}
+
+// Chunked update for Nextcloud (see
+// https://docs.nextcloud.com/server/20/developer_manual/client_apis/WebDAV/chunking.html)
+func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
+	hasher := md5.New()
+	_, err = hasher.Write([]byte(o.filePath()))
+	if err != nil {
+		return errors.Wrap(err, "Chunked upload couldn't hash URL")
+	}
+	uploadDir := "chunked-upload-" + hex.EncodeToString(hasher.Sum(nil))
+	fs.Debugf(src, "Starting multipart upload to temp dir %q", uploadDir)
+
+	saveRoot := o.fs.root
+	saveRemote := o.remote
+	o.fs.root = "/"
+	o.fs.srv.SetRoot(strings.Replace(o.fs.endpointURL, "files", "uploads", 1))
+
+	err = o.fs.mkdir(ctx, uploadDir)
+	if err != nil {
+		return errors.Wrap(err, "Making upload directory failed")
+	}
+	defer atexit.OnError(&err, func() {
+		// Try to abort the upload, but ignore the error.
+		fs.Debugf(src, "Cancelling chunked upload")
+		_ = o.fs.pacer.Call(func() (bool, error) {
+			err := o.fs.Purge(ctx, uploadDir)
+			return o.fs.shouldRetry(ctx, nil, err)
+		})
+	})()
+
+	size := src.Size()
+	var uploadedSize int64 = 0
+	for uploadedSize < size {
+		o.fs.root = uploadDir
+		// Upload chunk
+		contentLength := int64(o.fs.opt.ChunkSize)
+		if size-uploadedSize < contentLength {
+			contentLength = size - uploadedSize
+		}
+		o.remote = fmt.Sprintf("%015d", uploadedSize)
+		chunkPath := o.filePath()
+		extraHeaders := map[string]string{}
+		err = o.updateSimple(ctx, io.LimitReader(in, int64(o.fs.opt.ChunkSize)), chunkPath, contentLength, "", extraHeaders, options...)
+		if err != nil {
+			return errors.Wrap(err, "Uploading chunk failed")
+		}
+		uploadedSize += contentLength
+	}
+
+	// Finish
+	o.fs.root = saveRoot
+	o.remote = saveRemote
+	var resp *http.Response
+	opts := rest.Opts{
+		Method:     "MOVE",
+		Path:       rest.URLPathEscape(path.Join(uploadDir, ".file")),
+		NoResponse: true,
+		Options:    options,
+	}
+	destinationURL, err := rest.URLJoin(o.fs.endpoint, o.filePath())
+	if err != nil {
+		return errors.Wrap(err, "Finalize chunked upload couldn't join URL")
+	}
+	opts.ExtraHeaders = o.extraHeaders(ctx, src)
+	opts.ExtraHeaders["Destination"] = destinationURL.String()
+	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err = o.fs.srv.Call(ctx, &opts)
+		return o.fs.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return errors.Wrap(err, "Finalize chunked upload failed")
+	}
+
+	o.remote = saveRemote
+	o.fs.root = saveRoot
+	o.fs.srv.SetRoot(o.fs.endpointURL)
+
+	return nil
 }
 
 // Remove an object
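
With this patch applied, chunked uploading stays opt-in: the backend only chunks when the remote's vendor is "nextcloud" and chunk_size is non-zero (the default of 0 keeps the previous single-PUT behaviour). For illustration only, a remote using the new option could look roughly like this in rclone.conf (the remote name, URL, user and chunk size below are examples, not taken from the patch):

[mynextcloud]
type = webdav
url = https://nc.example.com/remote.php/dav/files/alice
vendor = nextcloud
user = alice
chunk_size = 100M

Note that updateChunked derives the uploads endpoint by replacing "files" with "uploads" in the endpoint URL, so the configured url is expected to be of the remote.php/dav/files/USER form shown above.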
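The flow added in updateChunked follows Nextcloud's chunked upload API: MKCOL a temporary collection under remote.php/dav/uploads/USER, PUT the chunks into it under zero-padded offset names, then MOVE the collection's virtual ".file" member onto the final path so the server assembles the chunks. A minimal standalone sketch of that wire protocol, independent of rclone's internals (the server URL, credentials, file names and upload-dir suffix are hypothetical; the patch additionally sends X-OC-Mtime/OC-Checksum headers on the MOVE, omitted here):

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
)

const (
	davRoot   = "https://nc.example.com/remote.php/dav" // hypothetical server
	user      = "alice"                                 // hypothetical credentials
	pass      = "secret"
	chunkSize = 10 * 1024 * 1024 // 10 MiB per chunk
)

// do sends one authenticated WebDAV request and fails on non-2xx replies.
func do(method, url string, body io.Reader, hdr map[string]string) error {
	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return err
	}
	req.SetBasicAuth(user, pass)
	for k, v := range hdr {
		req.Header.Set(k, v)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("%s %s: %s", method, url, resp.Status)
	}
	return nil
}

func main() {
	in, err := os.Open("big.bin") // hypothetical local file
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()

	// 1. MKCOL a temporary upload collection under the uploads endpoint
	//    (the patch derives the suffix from an MD5 of the remote path).
	uploadDir := davRoot + "/uploads/" + user + "/chunked-upload-deadbeef"
	if err := do("MKCOL", uploadDir, nil, nil); err != nil {
		log.Fatal(err)
	}

	// 2. PUT each chunk, named by its zero-padded byte offset.
	buf := make([]byte, chunkSize)
	var offset int64
	for {
		n, err := io.ReadFull(in, buf)
		if n > 0 {
			chunkURL := fmt.Sprintf("%s/%015d", uploadDir, offset)
			if err := do("PUT", chunkURL, bytes.NewReader(buf[:n]), nil); err != nil {
				log.Fatal(err)
			}
			offset += int64(n)
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
	}

	// 3. MOVE the virtual ".file" onto the destination; the server
	//    stitches the chunks together server side.
	dest := davRoot + "/files/" + user + "/big.bin"
	if err := do("MOVE", uploadDir+"/.file", nil, map[string]string{
		"Destination": dest,
	}); err != nil {
		log.Fatal(err)
	}
}

Zero-padding keeps the lexical order of the chunk names equal to their numeric order, which is why the patch names each chunk fmt.Sprintf("%015d", uploadedSize).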