webdav: add chunked uploading option for nextcloud
parent f4068d406b
commit fc6bd0dd77
1 changed file with 139 additions and 18 deletions
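The commit drives Nextcloud's chunked upload API (v1): chunks are PUT into a temporary collection under remote.php/dav/uploads/, and a final MOVE of the special ".file" member asks the server to assemble them at the destination. As orientation before the diff, here is a minimal standalone sketch of that protocol in Go — the server URL, user, credentials, and file names are invented placeholders, and real code needs the retries and error handling that the backend below gets from rclone's rest client and pacer.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"os"
)

// do sends a request with placeholder credentials and fails on any
// non-2xx response.
func do(req *http.Request) error {
	req.SetBasicAuth("user", "secret") // placeholder
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("%s %s: %s", req.Method, req.URL, resp.Status)
	}
	return nil
}

func main() {
	const server = "https://nextcloud.example.com" // placeholder
	uploadDir := server + "/remote.php/dav/uploads/user/chunked-upload-demo"
	dest := server + "/remote.php/dav/files/user/big.bin"

	f, err := os.Open("big.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// 1. MKCOL a temporary upload collection.
	req, _ := http.NewRequest("MKCOL", uploadDir, nil)
	if err := do(req); err != nil {
		panic(err)
	}

	// 2. PUT each chunk under a zero-padded offset so the names sort
	// in file order.
	buf := make([]byte, 10*1024*1024)
	for offset := int64(0); ; {
		n, rerr := io.ReadFull(f, buf)
		if n > 0 {
			req, _ := http.NewRequest("PUT",
				fmt.Sprintf("%s/%015d", uploadDir, offset),
				bytes.NewReader(buf[:n]))
			if err := do(req); err != nil {
				panic(err)
			}
			offset += int64(n)
		}
		if rerr == io.EOF || rerr == io.ErrUnexpectedEOF {
			break
		} else if rerr != nil {
			panic(rerr)
		}
	}

	// 3. MOVE the assembled ".file" onto the real path; the server
	// concatenates the chunks server-side.
	req, _ = http.NewRequest("MOVE", uploadDir+"/.file", nil)
	req.Header.Set("Destination", dest)
	if err := do(req); err != nil {
		panic(err)
	}
}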
@@ -10,7 +10,9 @@ package webdav
 import (
 	"bytes"
 	"context"
+	"crypto/md5"
 	"crypto/tls"
+	"encoding/hex"
 	"encoding/xml"
 	"fmt"
 	"io"
@@ -34,6 +36,7 @@ import (
 	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/fs/fshttp"
 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/atexit"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/rest"
@@ -113,6 +116,14 @@ func init() {
 			Name:     config.ConfigEncoding,
 			Help:     configEncodingHelp,
 			Advanced: true,
+		}, {
+			Name: "chunk_size",
+			Help: `Chunk size to use for uploading (Nextcloud only)
+
+Set to 0 to disable chunked uploading.
+`,
+			Advanced: true,
+			Default:  fs.SizeSuffix(0), // off by default
 		}},
 	})
 }
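With this option registered, rclone's standard config machinery should expose it as chunk_size in a remote definition (and, assuming the usual flag derivation, as --webdav-chunk-size on the command line). A hypothetical remote (name and URL invented) might look like:

[nextcloud]
type = webdav
url = https://nextcloud.example.com/remote.php/dav/files/user
vendor = nextcloud
user = user
chunk_size = 10M

Since the default is fs.SizeSuffix(0), existing remotes keep the old single-PUT behaviour unless they opt in.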
@@ -126,6 +137,7 @@ type Options struct {
 	BearerToken        string               `config:"bearer_token"`
 	BearerTokenCommand string               `config:"bearer_token_command"`
 	Enc                encoder.MultiEncoder `config:"encoding"`
+	ChunkSize          fs.SizeSuffix        `config:"chunk_size"`
 }

 // Fs represents a remote webdav
@@ -146,6 +158,7 @@ type Fs struct {
 	hasMD5     bool       // set if can use owncloud style checksums for MD5
 	hasSHA1    bool       // set if can use owncloud style checksums for SHA1
 	ntlmAuthMu sync.Mutex // mutex to serialize NTLM auth roundtrips
+	canChunk   bool       // set if nextcloud and chunk_size is set
 }

 // Object describes a webdav object
@@ -513,6 +526,9 @@ func (f *Fs) setQuirks(ctx context.Context, vendor string) error {
 		f.precision = time.Second
 		f.useOCMtime = true
 		f.hasSHA1 = true
+		if f.opt.ChunkSize != 0 {
+			f.canChunk = true
+		}
 	case "sharepoint":
 		// To mount sharepoint, two Cookies are required
 		// They have to be set instead of BasicAuth
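Note that canChunk is only set in the "nextcloud" case of setQuirks, so a non-zero chunk_size has no effect for the owncloud, sharepoint, or other vendors.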
@@ -1262,36 +1278,63 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 		return errors.Wrap(err, "Update mkParentDir failed")
 	}

-	size := src.Size()
-	var resp *http.Response
-	opts := rest.Opts{
-		Method:        "PUT",
-		Path:          o.filePath(),
-		Body:          in,
-		NoResponse:    true,
-		ContentLength: &size, // FIXME this isn't necessary with owncloud - See https://github.com/nextcloud/nextcloud-snap/issues/365
-		ContentType:   fs.MimeType(ctx, src),
-		Options:       options,
+	if o.fs.canChunk {
+		err = o.updateChunked(ctx, in, src, options...)
+		if err != nil {
+			return err
+		}
+	} else {
+		size := src.Size()
+		contentType := fs.MimeType(ctx, src)
+		filePath := o.filePath()
+		extraHeaders := o.extraHeaders(ctx, src)
+		err = o.updateSimple(ctx, in, filePath, size, contentType, extraHeaders, options...)
+		if err != nil {
+			return err
+		}
 	}
+
+	// read metadata from remote
+	o.hasMetaData = false
+	return o.readMetaData(ctx)
+}
+
+func (o *Object) extraHeaders(ctx context.Context, src fs.ObjectInfo) map[string]string {
+	extraHeaders := map[string]string{}
 	if o.fs.useOCMtime || o.fs.hasMD5 || o.fs.hasSHA1 {
-		opts.ExtraHeaders = map[string]string{}
 		if o.fs.useOCMtime {
-			opts.ExtraHeaders["X-OC-Mtime"] = fmt.Sprintf("%d", src.ModTime(ctx).Unix())
+			extraHeaders["X-OC-Mtime"] = fmt.Sprintf("%d", src.ModTime(ctx).Unix())
 		}
 		// Set one upload checksum
 		// Owncloud uses one checksum only to check the upload and stores its own SHA1 and MD5
 		// Nextcloud stores the checksum you supply (SHA1 or MD5) but only stores one
 		if o.fs.hasSHA1 {
 			if sha1, _ := src.Hash(ctx, hash.SHA1); sha1 != "" {
-				opts.ExtraHeaders["OC-Checksum"] = "SHA1:" + sha1
+				extraHeaders["OC-Checksum"] = "SHA1:" + sha1
 			}
 		}
-		if o.fs.hasMD5 && opts.ExtraHeaders["OC-Checksum"] == "" {
+		if o.fs.hasMD5 && extraHeaders["OC-Checksum"] == "" {
 			if md5, _ := src.Hash(ctx, hash.MD5); md5 != "" {
-				opts.ExtraHeaders["OC-Checksum"] = "MD5:" + md5
+				extraHeaders["OC-Checksum"] = "MD5:" + md5
 			}
 		}
 	}
+	return extraHeaders
+}
+
+// Standard update
+func (o *Object) updateSimple(ctx context.Context, in io.Reader, filePath string, size int64, contentType string, extraHeaders map[string]string, options ...fs.OpenOption) (err error) {
+	var resp *http.Response
+	opts := rest.Opts{
+		Method:        "PUT",
+		Path:          filePath,
+		Body:          in,
+		NoResponse:    true,
+		ContentLength: &size, // FIXME this isn't necessary with owncloud - See https://github.com/nextcloud/nextcloud-snap/issues/365
+		ContentType:   contentType,
+		Options:       options,
+		ExtraHeaders:  extraHeaders,
+	}
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		resp, err = o.fs.srv.Call(ctx, &opts)
 		return o.fs.shouldRetry(ctx, resp, err)
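Factoring the header construction out of Update lets the chunked path below reuse it for the finalizing MOVE. The logic itself is unchanged: X-OC-Mtime carries the source modtime as Unix seconds (e.g. 1620993600 for 2021-05-14T12:00:00Z), and at most one OC-Checksum header is sent, preferring SHA1 and falling back to MD5 only when no SHA1 is available.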
@@ -1307,9 +1350,87 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 		_ = o.Remove(ctx)
 		return err
 	}
-	// read metadata from remote
-	o.hasMetaData = false
-	return o.readMetaData(ctx)
+	return nil
 }

+// Chunked update for Nextcloud (see
+// https://docs.nextcloud.com/server/20/developer_manual/client_apis/WebDAV/chunking.html)
+func (o *Object) updateChunked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
+	hasher := md5.New()
+	_, err = hasher.Write([]byte(o.filePath()))
+	if err != nil {
+		return errors.Wrap(err, "Chunked upload couldn't hash URL")
+	}
+	uploadDir := "chunked-upload-" + hex.EncodeToString(hasher.Sum(nil))
+	fs.Debugf(src, "Starting multipart upload to temp dir %q", uploadDir)
+
+	saveRoot := o.fs.root
+	saveRemote := o.remote
+	o.fs.root = "/"
+	o.fs.srv.SetRoot(strings.Replace(o.fs.endpointURL, "files", "uploads", 1))
+
+	err = o.fs.mkdir(ctx, uploadDir)
+	if err != nil {
+		return errors.Wrap(err, "Making upload directory failed")
+	}
+	defer atexit.OnError(&err, func() {
+		// Try to abort the upload, but ignore the error.
+		fs.Debugf(src, "Cancelling chunked upload")
+		_ = o.fs.pacer.Call(func() (bool, error) {
+			err := o.fs.Purge(ctx, uploadDir)
+			return o.fs.shouldRetry(ctx, nil, err)
+		})
+	})()
+
+	size := src.Size()
+	var uploadedSize int64 = 0
+	for uploadedSize < size {
+		o.fs.root = uploadDir
+		// Upload chunk
+		contentLength := int64(o.fs.opt.ChunkSize)
+		if size-uploadedSize < contentLength {
+			contentLength = size - uploadedSize
+		}
+		o.remote = fmt.Sprintf("%015d", uploadedSize)
+		chunkPath := o.filePath()
+		extraHeaders := map[string]string{}
+		err = o.updateSimple(ctx, io.LimitReader(in, int64(o.fs.opt.ChunkSize)), chunkPath, contentLength, "", extraHeaders, options...)
+		if err != nil {
+			return errors.Wrap(err, "Uploading chunk failed")
+		}
+		uploadedSize += contentLength
+	}
+
+	// Finish
+	o.fs.root = saveRoot
+	o.remote = saveRemote
+	var resp *http.Response
+	opts := rest.Opts{
+		Method:     "MOVE",
+		Path:       rest.URLPathEscape(path.Join(uploadDir, ".file")),
+		NoResponse: true,
+		Options:    options,
+	}
+	destinationURL, err := rest.URLJoin(o.fs.endpoint, o.filePath())
+	if err != nil {
+		return errors.Wrap(err, "Finalize chunked upload couldn't join URL")
+	}
+	opts.ExtraHeaders = o.extraHeaders(ctx, src)
+	opts.ExtraHeaders["Destination"] = destinationURL.String()
+	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err = o.fs.srv.Call(ctx, &opts)
+		return o.fs.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return errors.Wrap(err, "Finalize chunked upload failed")
+	}
+
+	o.remote = saveRemote
+	o.fs.root = saveRoot
+	o.fs.srv.SetRoot(o.fs.endpointURL)
+
+	return nil
+}
+
 // Remove an object
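To make the chunk loop concrete: with chunk_size = 10M, a hypothetical 25 MiB upload issues three PUTs named after their zero-padded byte offsets, so the chunk names sort lexically in file order and the server can concatenate them. A standalone sketch of just that naming arithmetic (sizes invented):

package main

import "fmt"

func main() {
	const chunkSize = 10 * 1024 * 1024 // chunk_size = 10M
	const size = 25 * 1024 * 1024     // hypothetical 25 MiB source file
	for uploaded := int64(0); uploaded < size; {
		n := int64(chunkSize)
		if size-uploaded < n {
			n = size - uploaded // final, short chunk
		}
		fmt.Printf("PUT %015d (%d bytes)\n", uploaded, n)
		uploaded += n
	}
	// Output:
	// PUT 000000000000000 (10485760 bytes)
	// PUT 000000010485760 (10485760 bytes)
	// PUT 000000020971520 (5242880 bytes)
}

Note also that the loop temporarily repoints o.fs.root and o.remote at the upload directory and restores them afterwards, and that the atexit.OnError defer purges the half-finished upload directory if an error escapes.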