forked from TrueCloudLab/rclone
webdav: chunked uploading - WIP DO NOT MERGE
This commit is contained in: parent 71db19d8d8, commit 114631de51
3 changed files with 66 additions and 42 deletions
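The diff below implements Nextcloud's chunked upload protocol in the webdav backend: a temporary collection is created under the user's /dav/uploads/ tree with MKCOL, each chunk is PUT into it under a name encoding its byte range, and a final MOVE of the server-side ".file" member assembles the chunks at the destination. A minimal sketch of that request sequence, assuming a server at cloud.example.com, user alice, a made-up upload directory, and no error handling:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	const (
		base = "https://cloud.example.com/remote.php/dav" // assumed server
		user = "alice"                                    // assumed user
		pass = "secret"                                   // assumed password
	)
	uploadDir := base + "/uploads/" + user + "/rclone-chunked-upload-0123abcd" // assumed hash suffix
	dest := base + "/files/" + user + "/backup/big.bin"                        // assumed destination
	client := &http.Client{}

	// do sends one request, panicking on error for brevity.
	do := func(method, url string, body io.Reader, hdr map[string]string) {
		req, err := http.NewRequest(method, url, body)
		if err != nil {
			panic(err)
		}
		req.SetBasicAuth(user, pass)
		for k, v := range hdr {
			req.Header.Set(k, v)
		}
		resp, err := client.Do(req)
		if err != nil {
			panic(err)
		}
		resp.Body.Close()
	}

	// 1. MKCOL creates the temporary upload collection.
	do("MKCOL", uploadDir+"/", nil, nil)

	// 2. PUT each chunk under a START-END range name.
	chunk := bytes.Repeat([]byte("x"), 1<<20)
	do("PUT", fmt.Sprintf("%s/%015d-%015d", uploadDir, 0, len(chunk)), bytes.NewReader(chunk), nil)

	// 3. MOVE the server-side ".file" to assemble the chunks at the destination.
	do("MOVE", uploadDir+"/.file", nil, map[string]string{"Destination": dest})
}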
@@ -20,6 +20,7 @@ import (
 	"net/url"
 	"os/exec"
 	"path"
+	"regexp"
 	"strconv"
 	"strings"
 	"sync"
@@ -148,6 +149,7 @@ type Fs struct {
 	features    *fs.Features  // optional features
 	endpoint    *url.URL      // URL of the host
 	endpointURL string        // endpoint as a string
+	uploadURL   string        // upload URL for nextcloud chunked
 	srv         *rest.Client  // the connection to the one drive server
 	pacer       *fs.Pacer     // pacer for API calls
 	precision   time.Duration // mod time precision
@@ -470,6 +472,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
 	return f, nil
 }
 
+// set the chunk size for testing
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+	return
+}
+
 // sets the BearerToken up
 func (f *Fs) setBearerToken(token string) {
 	f.opt.BearerToken = token
@@ -513,6 +521,8 @@ func (f *Fs) fetchAndSetBearerToken() error {
 	return nil
 }
 
+var matchNextcloudURL = regexp.MustCompile(`^.*/dav/files/[^/]+/?$`)
+
 // setQuirks adjusts the Fs for the vendor passed in
 func (f *Fs) setQuirks(ctx context.Context, vendor string) error {
 	switch vendor {
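A quick illustration of what the new matchNextcloudURL pattern accepts: chunked uploading needs the /dav/files/USER endpoint, so the classic /webdav endpoint is rejected (example URLs assumed):

package main

import (
	"fmt"
	"regexp"
)

func main() {
	matchNextcloudURL := regexp.MustCompile(`^.*/dav/files/[^/]+/?$`)
	for _, u := range []string{
		"https://cloud.example.com/remote.php/dav/files/alice/", // matches
		"https://cloud.example.com/remote.php/webdav/",          // does not match
	} {
		fmt.Println(u, matchNextcloudURL.MatchString(u))
	}
}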
@@ -526,9 +536,12 @@ func (f *Fs) setQuirks(ctx context.Context, vendor string) error {
 		f.precision = time.Second
 		f.useOCMtime = true
 		f.hasSHA1 = true
-		if f.opt.ChunkSize != 0 {
-			f.canChunk = true
+		f.canChunk = true
+		if f.opt.ChunkSize != 0 && !matchNextcloudURL.MatchString(f.endpointURL) {
+			return errors.New("chunked upload with nextcloud must use /dav/files/USER endpoint not /webdav")
 		}
+		f.uploadURL = strings.Replace(f.endpointURL, "/dav/files/", "/dav/uploads/", 1)
+		fs.Logf(nil, f.uploadURL)
 	case "sharepoint":
 		// To mount sharepoint, two Cookies are required
 		// They have to be set instead of BasicAuth
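The upload URL is derived from the files endpoint by rewriting a single path segment; a standalone illustration with an assumed endpoint follows. Note that fs.Logf(nil, f.uploadURL) passes a variable as the format string, which looks like leftover WIP debug output (and would trip go vet's printf check):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Assumed endpoint for a user "alice"; only one path segment changes.
	endpointURL := "https://cloud.example.com/remote.php/dav/files/alice/"
	uploadURL := strings.Replace(endpointURL, "/dav/files/", "/dav/uploads/", 1)
	fmt.Println(uploadURL) // https://cloud.example.com/remote.php/dav/uploads/alice/
}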
@@ -1278,17 +1291,17 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
 		return errors.Wrap(err, "update mkParentDir failed")
 	}
 
-	if o.fs.canChunk {
+	size := src.Size()
+	if o.fs.canChunk && o.fs.opt.ChunkSize > 0 && size > int64(o.fs.opt.ChunkSize) {
 		err = o.updateChunked(ctx, in, src, options...)
 		if err != nil {
 			return err
 		}
 	} else {
-		size := src.Size()
 		contentType := fs.MimeType(ctx, src)
 		filePath := o.filePath()
 		extraHeaders := o.extraHeaders(ctx, src)
-		err = o.updateSimple(ctx, in, filePath, size, contentType, extraHeaders, options...)
+		err = o.updateSimple(ctx, in, filePath, size, contentType, extraHeaders, o.fs.endpointURL, options...)
 		if err != nil {
 			return err
 		}
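The rewritten dispatch in Update only takes the chunked path when the source is known to be larger than one chunk; files of unknown size (src.Size() == -1) and small files keep using the plain PUT. A small demonstration of the rule under an assumed 10 MiB chunk size:

package main

import "fmt"

func main() {
	const chunkSize int64 = 10 << 20 // assumed 10 MiB ChunkSize
	canChunk := true                 // as set by setQuirks for nextcloud
	for _, size := range []int64{5 << 20, 10 << 20, 15 << 20, -1} {
		chunked := canChunk && chunkSize > 0 && size > chunkSize
		fmt.Printf("size=%11d -> chunked=%v\n", size, chunked)
	}
}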
@@ -1323,7 +1336,7 @@ func (o *Object) extraHeaders(ctx context.Context, src fs.ObjectInfo) map[string]string {
 }
 
 // Standard update
-func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, options ...fs.OpenOption) (err error) {
+func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, rootURL string, options ...fs.OpenOption) (err error) {
 	var resp *http.Response
 	opts := rest.Opts{
 		Method: "PUT",
@@ -1334,6 +1347,7 @@ func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, options ...fs.OpenOption) (err error) {
 		ContentType:  contentType,
 		Options:      options,
 		ExtraHeaders: extraHeaders,
+		RootURL:      rootURL,
 	}
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		resp, err = o.fs.srv.Call(ctx, &opts)
@@ -1362,40 +1376,44 @@ func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 	if err != nil {
 		return errors.Wrap(err, "chunked upload couldn't hash URL")
 	}
-	uploadDir := "chunked-upload-" + hex.EncodeToString(hasher.Sum(nil))
+	uploadDir := "rclone-chunked-upload-" + hex.EncodeToString(hasher.Sum(nil))
 	fs.Debugf(src, "Starting multipart upload to temp dir %q", uploadDir)
 
-	saveRoot := o.fs.root
-	saveRemote := o.remote
-	o.fs.root = "/"
-	o.fs.srv.SetRoot(strings.Replace(o.fs.endpointURL, "files", "uploads", 1))
-
-	err = o.fs.mkdir(ctx, uploadDir)
+	opts := rest.Opts{
+		Method:     "MKCOL",
+		Path:       uploadDir + "/",
+		NoResponse: true,
+		RootURL:    o.fs.uploadURL,
+	}
+	err = o.fs.pacer.Call(func() (bool, error) {
+		resp, err := o.fs.srv.Call(ctx, &opts)
+		return o.fs.shouldRetry(ctx, resp, err)
+	})
 	if err != nil {
 		return errors.Wrap(err, "making upload directory failed")
 	}
 	defer atexit.OnError(&err, func() {
 		// Try to abort the upload, but ignore the error.
 		fs.Debugf(src, "Cancelling chunked upload")
-		_ = o.fs.pacer.Call(func() (bool, error) {
-			err := o.fs.Purge(ctx, uploadDir)
-			return o.fs.shouldRetry(ctx, nil, err)
-		})
+		_ = o.fs.Purge(ctx, uploadDir)
 	})()
 
-	size := src.Size()
-	var uploadedSize int64 = 0
+	var (
+		size         = src.Size()
+		uploadedSize = int64(0)
+		partObj      = &Object{
+			fs: o.fs,
+		}
+	)
 	for uploadedSize < size {
-		o.fs.root = uploadDir
 		// Upload chunk
-		contentLength := int64(o.fs.opt.ChunkSize)
+		contentLength := int64(partObj.fs.opt.ChunkSize)
 		if size-uploadedSize < contentLength {
 			contentLength = size - uploadedSize
 		}
-		o.remote = fmt.Sprintf("%015d", uploadedSize)
-		chunkPath := o.filePath()
+		partObj.remote = fmt.Sprintf("%s/%015d-%015d", uploadDir, uploadedSize, uploadedSize+contentLength)
 		extraHeaders := map[string]string{}
-		err = o.updateSimple(ctx, io.LimitReader(in, int64(o.fs.opt.ChunkSize)), chunkPath, contentLength, "", extraHeaders, options...)
+		err = partObj.updateSimple(ctx, io.LimitReader(in, int64(partObj.fs.opt.ChunkSize)), partObj.remote, contentLength, "", extraHeaders, o.fs.uploadURL, options...)
 		if err != nil {
 			return errors.Wrap(err, "uploading chunk failed")
 		}
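Chunks are no longer written under the object's own path with a bare offset name; each part now lives in the temporary upload directory under a START-END range name, which is the layout Nextcloud's assembly step expects. How the names come out (hash suffix and chunk size assumed):

package main

import "fmt"

func main() {
	uploadDir := "rclone-chunked-upload-0123abcd" // assumed hash suffix
	var uploadedSize, contentLength int64 = 0, 10 << 20
	for i := 0; i < 3; i++ {
		fmt.Printf("%s/%015d-%015d\n", uploadDir, uploadedSize, uploadedSize+contentLength)
		uploadedSize += contentLength
	}
}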
@@ -1403,14 +1421,13 @@ func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 	}
 
 	// Finish
-	o.fs.root = saveRoot
-	o.remote = saveRemote
 	var resp *http.Response
-	opts := rest.Opts{
+	opts = rest.Opts{
 		Method:     "MOVE",
-		Path:       rest.URLPathEscape(path.Join(uploadDir, ".file")),
+		Path:       o.fs.filePath(path.Join(uploadDir, ".file")),
 		NoResponse: true,
 		Options:    options,
+		RootURL:    o.fs.uploadURL,
 	}
 	destinationURL, err := rest.URLJoin(o.fs.endpoint, o.filePath())
 	if err != nil {
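The finalizing MOVE targets the magic ".file" member of the upload directory and needs an absolute Destination header telling the server where the assembled object should land; rest.URLJoin presumably resolves the object path against the endpoint along these lines (URLs assumed):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	endpoint, _ := url.Parse("https://cloud.example.com/remote.php/dav/files/alice/")
	rel, _ := url.Parse("backup/big.bin") // the object's escaped file path
	fmt.Println(endpoint.ResolveReference(rel))
	// https://cloud.example.com/remote.php/dav/files/alice/backup/big.bin
}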
@@ -1425,11 +1442,6 @@ func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 	if err != nil {
 		return errors.Wrap(err, "finalize chunked upload failed")
 	}
-
-	o.remote = saveRemote
-	o.fs.root = saveRoot
-	o.fs.srv.SetRoot(o.fs.endpointURL)
 
 	return nil
 }
@@ -1,10 +1,10 @@
 // Test Webdav filesystem interface
-package webdav_test
+package webdav
 
 import (
 	"testing"
 
-	"github.com/rclone/rclone/backend/webdav"
+	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fstest"
 	"github.com/rclone/rclone/fstest/fstests"
 )
@@ -13,7 +13,10 @@ import (
 func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestWebdavNextcloud:",
-		NilObject:  (*webdav.Object)(nil),
+		NilObject:  (*Object)(nil),
+		ChunkedUpload: fstests.ChunkedUploadConfig{
+			MinChunkSize: 1 * fs.MebiByte,
+		},
 	})
 }
@@ -24,7 +27,10 @@ func TestIntegration2(t *testing.T) {
 	}
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestWebdavOwncloud:",
-		NilObject:  (*webdav.Object)(nil),
+		NilObject:  (*Object)(nil),
+		ChunkedUpload: fstests.ChunkedUploadConfig{
+			Skip: true,
+		},
 	})
 }
@@ -35,7 +41,10 @@ func TestIntegration3(t *testing.T) {
 	}
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestWebdavRclone:",
-		NilObject:  (*webdav.Object)(nil),
+		NilObject:  (*Object)(nil),
+		ChunkedUpload: fstests.ChunkedUploadConfig{
+			Skip: true,
+		},
 	})
 }
@@ -46,6 +55,10 @@ func TestIntegration4(t *testing.T) {
 	}
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestWebdavNTLM:",
-		NilObject:  (*webdav.Object)(nil),
+		NilObject:  (*Object)(nil),
 	})
 }
+
+func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadChunkSize(cs)
+}
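Note on the test changes: moving the file from package webdav_test into package webdav is what lets it reach the unexported setUploadChunkSize helper, and the exported SetUploadChunkSize wrapper added at the end appears to be the hook the fstests harness looks for (via an interface check) when it varies chunk sizes for the ChunkedUpload tests configured above.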
@@ -17,11 +17,10 @@ start() {
 		nextcloud:latest
 
 	echo type=webdav
-	echo url=http://$(docker_ip)/remote.php/webdav/
+	echo url=http://$(docker_ip)/remote.php/dav/files/$USER/
 	echo user=$USER
 	echo pass=$(rclone obscure $PASS)
-	# the tests don't pass if we use the nextcloud features
-	# echo vendor=nextcloud
+	echo vendor=nextcloud
 	echo _connect=$(docker_ip):80
 }
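The test-server script change lines up with the new endpoint check in setQuirks: the test remote now points at /remote.php/dav/files/$USER/ rather than the classic /webdav endpoint, and vendor=nextcloud is re-enabled so the integration tests actually exercise the chunked code path.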