From 114631de51d1c35a9994a7fb72d85e19bf352c88 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 23 Feb 2021 20:23:21 +0000 Subject: [PATCH] webdav: chunked uploading - WIP DO NOT MERGE --- backend/webdav/webdav.go | 78 +++++++++++--------- backend/webdav/webdav_test.go | 25 +++++-- fstest/testserver/init.d/TestWebdavNextcloud | 5 +- 3 files changed, 66 insertions(+), 42 deletions(-) diff --git a/backend/webdav/webdav.go b/backend/webdav/webdav.go index 90d0828e3..942cc44fd 100644 --- a/backend/webdav/webdav.go +++ b/backend/webdav/webdav.go @@ -20,6 +20,7 @@ import ( "net/url" "os/exec" "path" + "regexp" "strconv" "strings" "sync" @@ -148,6 +149,7 @@ type Fs struct { features *fs.Features // optional features endpoint *url.URL // URL of the host endpointURL string // endpoint as a string + uploadURL string // upload URL for nextcloud chunked srv *rest.Client // the connection to the one drive server pacer *fs.Pacer // pacer for API calls precision time.Duration // mod time precision @@ -470,6 +472,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return f, nil } +// set the chunk size for testing +func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + old, f.opt.ChunkSize = f.opt.ChunkSize, cs + return +} + // sets the BearerToken up func (f *Fs) setBearerToken(token string) { f.opt.BearerToken = token @@ -513,6 +521,8 @@ func (f *Fs) fetchAndSetBearerToken() error { return nil } +var matchNextcloudURL = regexp.MustCompile(`^.*/dav/files/[^/]+/?$`) + // setQuirks adjusts the Fs for the vendor passed in func (f *Fs) setQuirks(ctx context.Context, vendor string) error { switch vendor { @@ -526,9 +536,12 @@ func (f *Fs) setQuirks(ctx context.Context, vendor string) error { f.precision = time.Second f.useOCMtime = true f.hasSHA1 = true - if f.opt.ChunkSize != 0 { - f.canChunk = true + f.canChunk = true + if f.opt.ChunkSize != 0 && !matchNextcloudURL.MatchString(f.endpointURL) { + return errors.New("chunked upload with nextcloud must use /dav/files/USER endpoint not /webdav") } + f.uploadURL = strings.Replace(f.endpointURL, "/dav/files/", "/dav/uploads/", 1) + fs.Logf(nil, f.uploadURL) case "sharepoint": // To mount sharepoint, two Cookies are required // They have to be set instead of BasicAuth @@ -1278,17 +1291,17 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return errors.Wrap(err, "update mkParentDir failed") } - if o.fs.canChunk { + size := src.Size() + if o.fs.canChunk && o.fs.opt.ChunkSize > 0 && size > int64(o.fs.opt.ChunkSize) { err = o.updateChunked(ctx, in, src, options...) if err != nil { return err } } else { - size := src.Size() contentType := fs.MimeType(ctx, src) filePath := o.filePath() extraHeaders := o.extraHeaders(ctx, src) - err = o.updateSimple(ctx, in, filePath, size, contentType, extraHeaders, options...) + err = o.updateSimple(ctx, in, filePath, size, contentType, extraHeaders, o.fs.endpointURL, options...) if err != nil { return err } @@ -1323,7 +1336,7 @@ func (o *Object) extraHeaders(ctx context.Context, src fs.ObjectInfo) map[string } // Standard update -func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, options ...fs.OpenOption) (err error) { +func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, rootURL string, options ...fs.OpenOption) (err error) { var resp *http.Response opts := rest.Opts{ Method: "PUT", @@ -1334,6 +1347,7 @@ func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string ContentType: contentType, Options: options, ExtraHeaders: extraHeaders, + RootURL: rootURL, } err = o.fs.pacer.CallNoRetry(func() (bool, error) { resp, err = o.fs.srv.Call(ctx, &opts) @@ -1362,40 +1376,44 @@ func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectI if err != nil { return errors.Wrap(err, "chunked upload couldn't hash URL") } - uploadDir := "chunked-upload-" + hex.EncodeToString(hasher.Sum(nil)) + uploadDir := "rclone-chunked-upload-" + hex.EncodeToString(hasher.Sum(nil)) fs.Debugf(src, "Starting multipart upload to temp dir %q", uploadDir) - saveRoot := o.fs.root - saveRemote := o.remote - o.fs.root = "/" - o.fs.srv.SetRoot(strings.Replace(o.fs.endpointURL, "files", "uploads", 1)) - - err = o.fs.mkdir(ctx, uploadDir) + opts := rest.Opts{ + Method: "MKCOL", + Path: uploadDir + "/", + NoResponse: true, + RootURL: o.fs.uploadURL, + } + err = o.fs.pacer.Call(func() (bool, error) { + resp, err := o.fs.srv.Call(ctx, &opts) + return o.fs.shouldRetry(ctx, resp, err) + }) if err != nil { return errors.Wrap(err, "making upload directory failed") } defer atexit.OnError(&err, func() { // Try to abort the upload, but ignore the error. fs.Debugf(src, "Cancelling chunked upload") - _ = o.fs.pacer.Call(func() (bool, error) { - err := o.fs.Purge(ctx, uploadDir) - return o.fs.shouldRetry(ctx, nil, err) - }) + _ = o.fs.Purge(ctx, uploadDir) })() - size := src.Size() - var uploadedSize int64 = 0 + var ( + size = src.Size() + uploadedSize = int64(0) + partObj = &Object{ + fs: o.fs, + } + ) for uploadedSize < size { - o.fs.root = uploadDir // Upload chunk - contentLength := int64(o.fs.opt.ChunkSize) + contentLength := int64(partObj.fs.opt.ChunkSize) if size-uploadedSize < contentLength { contentLength = size - uploadedSize } - o.remote = fmt.Sprintf("%015d", uploadedSize) - chunkPath := o.filePath() + partObj.remote = fmt.Sprintf("%s/%015d-%015d", uploadDir, uploadedSize, uploadedSize+contentLength) extraHeaders := map[string]string{} - err = o.updateSimple(ctx, io.LimitReader(in, int64(o.fs.opt.ChunkSize)), chunkPath, contentLength, "", extraHeaders, options...) + err = partObj.updateSimple(ctx, io.LimitReader(in, int64(partObj.fs.opt.ChunkSize)), partObj.remote, contentLength, "", extraHeaders, o.fs.uploadURL, options...) if err != nil { return errors.Wrap(err, "uploading chunk failed") } @@ -1403,14 +1421,13 @@ func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectI } // Finish - o.fs.root = saveRoot - o.remote = saveRemote var resp *http.Response - opts := rest.Opts{ + opts = rest.Opts{ Method: "MOVE", - Path: rest.URLPathEscape(path.Join(uploadDir, ".file")), + Path: o.fs.filePath(path.Join(uploadDir, ".file")), NoResponse: true, Options: options, + RootURL: o.fs.uploadURL, } destinationURL, err := rest.URLJoin(o.fs.endpoint, o.filePath()) if err != nil { @@ -1425,11 +1442,6 @@ func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectI if err != nil { return errors.Wrap(err, "finalize chunked upload failed") } - - o.remote = saveRemote - o.fs.root = saveRoot - o.fs.srv.SetRoot(o.fs.endpointURL) - return nil } diff --git a/backend/webdav/webdav_test.go b/backend/webdav/webdav_test.go index e23176afe..0c93a1962 100644 --- a/backend/webdav/webdav_test.go +++ b/backend/webdav/webdav_test.go @@ -1,10 +1,10 @@ // Test Webdav filesystem interface -package webdav_test +package webdav import ( "testing" - "github.com/rclone/rclone/backend/webdav" + "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" ) @@ -13,7 +13,10 @@ import ( func TestIntegration(t *testing.T) { fstests.Run(t, &fstests.Opt{ RemoteName: "TestWebdavNextcloud:", - NilObject: (*webdav.Object)(nil), + NilObject: (*Object)(nil), + ChunkedUpload: fstests.ChunkedUploadConfig{ + MinChunkSize: 1 * fs.MebiByte, + }, }) } @@ -24,7 +27,10 @@ func TestIntegration2(t *testing.T) { } fstests.Run(t, &fstests.Opt{ RemoteName: "TestWebdavOwncloud:", - NilObject: (*webdav.Object)(nil), + NilObject: (*Object)(nil), + ChunkedUpload: fstests.ChunkedUploadConfig{ + Skip: true, + }, }) } @@ -35,7 +41,10 @@ func TestIntegration3(t *testing.T) { } fstests.Run(t, &fstests.Opt{ RemoteName: "TestWebdavRclone:", - NilObject: (*webdav.Object)(nil), + NilObject: (*Object)(nil), + ChunkedUpload: fstests.ChunkedUploadConfig{ + Skip: true, + }, }) } @@ -46,6 +55,10 @@ func TestIntegration4(t *testing.T) { } fstests.Run(t, &fstests.Opt{ RemoteName: "TestWebdavNTLM:", - NilObject: (*webdav.Object)(nil), + NilObject: (*Object)(nil), }) } + +func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadChunkSize(cs) +} diff --git a/fstest/testserver/init.d/TestWebdavNextcloud b/fstest/testserver/init.d/TestWebdavNextcloud index 8f61a0610..9cf391e38 100755 --- a/fstest/testserver/init.d/TestWebdavNextcloud +++ b/fstest/testserver/init.d/TestWebdavNextcloud @@ -17,11 +17,10 @@ start() { nextcloud:latest echo type=webdav - echo url=http://$(docker_ip)/remote.php/webdav/ + echo url=http://$(docker_ip)/remote.php/dav/files/$USER/ echo user=$USER echo pass=$(rclone obscure $PASS) - # the tests don't pass if we use the nextcloud features - # echo vendor=nextcloud + echo vendor=nextcloud echo _connect=$(docker_ip):80 }