Add vendor Infinite Scale with TUS upload support

parent ca8860177e
commit 7c3db32502

5 changed files with 446 additions and 14 deletions
backend/webdav/tus-errors.go (new file)
@@ -0,0 +1,28 @@
package webdav

import (
	"errors"
	"fmt"
)

var (
	ErrChunkSize         = errors.New("tus chunk size must be greater than zero")
	ErrNilLogger         = errors.New("tus logger can't be nil")
	ErrNilStore          = errors.New("tus store can't be nil if Resume is enabled")
	ErrNilUpload         = errors.New("tus upload can't be nil")
	ErrLargeUpload       = errors.New("tus upload body is too large")
	ErrVersionMismatch   = errors.New("tus protocol version mismatch")
	ErrOffsetMismatch    = errors.New("tus upload offset mismatch")
	ErrUploadNotFound    = errors.New("tus upload not found")
	ErrResumeNotEnabled  = errors.New("tus resuming not enabled")
	ErrFingerprintNotSet = errors.New("tus fingerprint not set")
)

// ClientError wraps an unexpected HTTP status code returned by the server.
type ClientError struct {
	Code int
	Body []byte
}

func (c ClientError) Error() string {
	return fmt.Sprintf("unexpected status code: %d", c.Code)
}
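As an aside, a minimal self-contained sketch of how a caller might react to these sentinel errors with errors.Is; the uploadChunk function below is a hypothetical stand-in, not rclone code:

package main

import (
	"errors"
	"fmt"
)

// Mirrors ErrOffsetMismatch from the file above so the sketch compiles on its own.
var ErrOffsetMismatch = errors.New("tus upload offset mismatch")

// uploadChunk is a hypothetical stand-in for the real chunk upload.
func uploadChunk() error { return ErrOffsetMismatch }

func main() {
	if err := uploadChunk(); errors.Is(err, ErrOffsetMismatch) {
		// A real client would query the server's current offset (a HEAD
		// request in the tus protocol) and resume from there.
		fmt.Println("offset mismatch, resync needed:", err)
	}
}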
backend/webdav/tus-upload.go (new file)
@@ -0,0 +1,83 @@
package webdav

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"io"
	"strings"
)

// Metadata maps tus metadata keys to their values.
type Metadata map[string]string

// Upload describes a single tus upload.
type Upload struct {
	stream io.ReadSeeker
	size   int64
	offset int64

	Fingerprint string
	Metadata    Metadata
}

// updateProgress updates the Upload information based on offset.
func (u *Upload) updateProgress(offset int64) {
	u.offset = offset
}

// Finished reports whether this upload is finished.
func (u *Upload) Finished() bool {
	return u.offset >= u.size
}

// Progress returns the progress as a percentage.
func (u *Upload) Progress() int64 {
	return (u.offset * 100) / u.size
}

// Offset returns the current upload offset.
func (u *Upload) Offset() int64 {
	return u.offset
}

// Size returns the size of the upload body.
func (u *Upload) Size() int64 {
	return u.size
}

// EncodedMetadata encodes the upload metadata for the Upload-Metadata
// header: comma-separated "key base64(value)" pairs.
func (u *Upload) EncodedMetadata() string {
	var encoded []string

	for k, v := range u.Metadata {
		encoded = append(encoded, fmt.Sprintf("%s %s", k, b64encode(v)))
	}

	return strings.Join(encoded, ",")
}

func b64encode(s string) string {
	return base64.StdEncoding.EncodeToString([]byte(s))
}

// NewUpload creates a new upload from an io.Reader.
func NewUpload(reader io.Reader, size int64, metadata Metadata, fingerprint string) *Upload {
	stream, ok := reader.(io.ReadSeeker)

	if !ok {
		// Buffer the whole body so chunks can be re-read on retry.
		buf := new(bytes.Buffer)
		_, _ = buf.ReadFrom(reader)
		stream = bytes.NewReader(buf.Bytes())
	}

	if metadata == nil {
		metadata = make(Metadata)
	}

	return &Upload{
		stream: stream,
		size:   size,

		Fingerprint: fingerprint,
		Metadata:    metadata,
	}
}
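For reference, a standalone demonstration of the Upload-Metadata wire format that EncodedMetadata produces; the key and value here are made up for illustration:

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// Each metadata pair is rendered as "key base64(value)"; multiple
	// pairs are joined with commas in the Upload-Metadata header.
	k, v := "filename", "report.pdf" // hypothetical values
	fmt.Printf("%s %s\n", k, base64.StdEncoding.EncodeToString([]byte(v)))
	// Output: filename cmVwb3J0LnBkZg==
}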
backend/webdav/tus-uploader.go (new file)
@@ -0,0 +1,186 @@
package webdav

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/lib/rest"
)

// Uploader pushes an Upload to the server in chunks.
type Uploader struct {
	fs                  *Fs
	url                 string
	upload              *Upload
	offset              int64
	aborted             bool
	uploadSubs          []chan Upload
	notifyChan          chan bool
	overridePatchMethod bool
}

// NotifyUploadProgress subscribes to progress updates.
func (u *Uploader) NotifyUploadProgress(c chan Upload) {
	u.uploadSubs = append(u.uploadSubs, c)
}

func (f *Fs) shouldRetryChunk(ctx context.Context, resp *http.Response, err error, newOff *int64) (bool, error) {
	if resp == nil {
		return f.shouldRetry(ctx, resp, err)
	}

	switch resp.StatusCode {
	case 204:
		if off, err := strconv.ParseInt(resp.Header.Get("Upload-Offset"), 10, 64); err == nil {
			*newOff = off
			return false, nil
		} else {
			return false, err
		}
	case 409:
		return false, ErrOffsetMismatch
	case 412:
		return false, ErrVersionMismatch
	case 413:
		return false, ErrLargeUpload
	}

	return f.shouldRetry(ctx, resp, err)
}

func (u *Uploader) uploadChunk(ctx context.Context, body io.Reader, size int64, offset int64) (int64, error) {
	var method string

	if !u.overridePatchMethod {
		method = "PATCH"
	} else {
		method = "POST"
	}

	extraHeaders := map[string]string{} // FIXME: Use extraHeaders(ctx, src) from Object maybe?
	extraHeaders["Upload-Offset"] = strconv.FormatInt(offset, 10)
	extraHeaders["Tus-Resumable"] = "1.0.0"
	extraHeaders["filetype"] = u.upload.Metadata["filetype"]
	if u.overridePatchMethod {
		extraHeaders["X-HTTP-Method-Override"] = "PATCH"
	}

	url, err := url.Parse(u.url)
	if err != nil {
		return 0, fmt.Errorf("uploadChunk failed, could not parse url %q: %w", u.url, err)
	}

	// FIXME: Use GetBody func as in chunking.go
	opts := rest.Opts{
		Method:        method,
		Path:          url.Path,
		NoResponse:    true,
		RootURL:       fmt.Sprintf("%s://%s", url.Scheme, url.Host),
		ContentLength: &size,
		Body:          body,
		ContentType:   "application/offset+octet-stream",
		ExtraHeaders:  extraHeaders,
	}

	var newOffset int64

	err = u.fs.pacer.CallNoRetry(func() (bool, error) {
		res, err := u.fs.srv.Call(ctx, &opts)
		return u.fs.shouldRetryChunk(ctx, res, err, &newOffset)
	})
	if err != nil {
		// FIXME What do we do here? Remove the entire upload?
		// See https://github.com/tus/tusd/issues/176
		return 0, fmt.Errorf("uploadChunk failed: %w", err)
	}

	return newOffset, nil
}

// Upload uploads the entire body to the server, chunk by chunk.
func (u *Uploader) Upload(ctx context.Context) error {
	cnt := 1

	fs.Debugf(u.fs, "Upload starts")
	for u.offset < u.upload.size && !u.aborted {
		err := u.UploadChunk(ctx, cnt)
		cnt++
		if err != nil {
			return err
		}
	}
	fs.Debugf(u.fs, "-- Upload finished")

	return nil
}

// UploadChunk uploads a single chunk.
func (u *Uploader) UploadChunk(ctx context.Context, cnt int) error {
	chunkSize := u.fs.opt.ChunkSize
	data := make([]byte, chunkSize)

	_, err := u.upload.stream.Seek(u.offset, io.SeekStart)
	if err != nil {
		fs.Errorf(u.fs, "Chunk %d: seek in stream failed: %v", cnt, err)
		return err
	}

	size, err := u.upload.stream.Read(data)
	if err != nil {
		fs.Errorf(u.fs, "Chunk %d: cannot read from data stream: %v", cnt, err)
		return err
	}

	body := bytes.NewBuffer(data[:size])

	newOffset, err := u.uploadChunk(ctx, body, int64(size), u.offset)
	if err == nil {
		fs.Debugf(u.fs, "Uploaded chunk no %d ok, range %d -> %d", cnt, u.offset, newOffset)
	} else {
		fs.Errorf(u.fs, "Uploading chunk no %d failed: %v", cnt, err)
		return err
	}

	u.offset = newOffset
	u.upload.updateProgress(u.offset)

	u.notifyChan <- true

	return nil
}

// broadcastProgress waits for a signal and broadcasts it to all subscribers.
func (u *Uploader) broadcastProgress() {
	for range u.notifyChan {
		for _, c := range u.uploadSubs {
			c <- *u.upload
		}
	}
}

// NewUploader creates a new Uploader.
func NewUploader(f *Fs, url string, upload *Upload, offset int64) *Uploader {
	notifyChan := make(chan bool)

	uploader := &Uploader{
		fs:                  f,
		url:                 url,
		upload:              upload,
		offset:              offset,
		aborted:             false,
		uploadSubs:          nil,
		notifyChan:          notifyChan,
		overridePatchMethod: false,
	}

	go uploader.broadcastProgress()

	return uploader
}
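To make the chunk requests concrete, here is a hedged standalone sketch of the PATCH exchange uploadChunk performs, written against plain net/http rather than rclone's rest client; the upload URL, offset, and payload are hypothetical:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// One tus chunk: PATCH the bytes at the current offset.
	body := bytes.NewReader([]byte("chunk data")) // hypothetical payload
	req, err := http.NewRequest("PATCH", "https://example.com/tus/upload-id", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Tus-Resumable", "1.0.0")
	req.Header.Set("Upload-Offset", "0")
	req.Header.Set("Content-Type", "application/offset+octet-stream")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// On success the server answers 204 No Content and reports the new
	// offset in the Upload-Offset response header.
	fmt.Println(resp.Status, resp.Header.Get("Upload-Offset"))
}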
backend/webdav/tus.go (new file)
@@ -0,0 +1,110 @@
package webdav

/*
	Chunked upload based on the tus protocol for ownCloud Infinite Scale
	See https://tus.io/protocols/resumable-upload
*/

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"path/filepath"
	"strconv"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/lib/rest"
)

// setUploadTusSize sets the chunk size for testing.
func (f *Fs) setUploadTusSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
	old, f.opt.ChunkSize = f.opt.ChunkSize, cs
	return
}

func (o *Object) updateViaTus(ctx context.Context, in io.Reader, contentType string, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
	fn := filepath.Base(src.Remote())
	metadata := map[string]string{
		"filename": fn,
		"mtime":    strconv.FormatInt(src.ModTime(ctx).Unix(), 10),
		"filetype": contentType,
	}

	// Fingerprint is used to identify the upload when resuming. That is not yet implemented.
	fingerprint := ""

	// create an upload from a file
	upload := NewUpload(in, src.Size(), metadata, fingerprint)

	// create the uploader
	uploader, err := o.CreateUploader(ctx, upload)
	if err == nil {
		// start the uploading process
		err = uploader.Upload(ctx)
	}

	return err
}

func (f *Fs) shouldRetryCreateUpload(ctx context.Context, resp *http.Response, err error) (bool, error) {
	if resp == nil {
		return f.shouldRetry(ctx, resp, err)
	}

	switch resp.StatusCode {
	case 201:
		location := resp.Header.Get("Location")
		f.chunksUploadURL = location
		return false, nil
	case 412:
		return false, ErrVersionMismatch
	case 413:
		return false, ErrLargeUpload
	}

	return f.shouldRetry(ctx, resp, err)
}

// CreateUploader creates a new upload on the server and returns an Uploader for it.
func (o *Object) CreateUploader(ctx context.Context, u *Upload) (*Uploader, error) {
	if u == nil {
		return nil, ErrNilUpload
	}

	// if c.Config.Resume && len(u.Fingerprint) == 0 {
	// 	return nil, ErrFingerprintNotSet
	// }

	l := int64(0)
	p := o.filePath()
	// cut the filename off
	dir, _ := filepath.Split(p)
	if dir == "" {
		dir = "/"
	}

	opts := rest.Opts{
		Method:        "POST",
		Path:          dir,
		NoResponse:    true,
		RootURL:       o.fs.endpointURL,
		ContentLength: &l,
		ExtraHeaders:  o.extraHeaders(ctx, o),
	}
	opts.ExtraHeaders["Upload-Length"] = strconv.FormatInt(u.size, 10)
	opts.ExtraHeaders["Upload-Metadata"] = u.EncodedMetadata()
	opts.ExtraHeaders["Tus-Resumable"] = "1.0.0"
	// opts.ExtraHeaders["mtime"] = strconv.FormatInt(src.ModTime(ctx).Unix(), 10)

	// rclone http call
	err := o.fs.pacer.CallNoRetry(func() (bool, error) {
		res, err := o.fs.srv.Call(ctx, &opts)
		return o.fs.shouldRetryCreateUpload(ctx, res, err)
	})
	if err != nil {
		return nil, fmt.Errorf("creating tus upload failed: %w", err)
	}

	uploader := NewUploader(o.fs, o.fs.chunksUploadURL, u, 0)

	return uploader, nil
}
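Similarly, a hedged standalone sketch of the creation POST that CreateUploader issues: a zero-length request announcing the total size and the encoded metadata. The endpoint, length, and metadata values are hypothetical:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Create a tus upload; the request body is empty.
	req, err := http.NewRequest("POST", "https://example.com/remote.php/dav/files/alice/", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Tus-Resumable", "1.0.0")
	req.Header.Set("Upload-Length", "1048576")
	req.Header.Set("Upload-Metadata", "filename cmVwb3J0LnBkZg==")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// A 201 response carries the upload URL for subsequent PATCHes
	// in the Location header.
	fmt.Println(resp.Status, resp.Header.Get("Location"))
}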
backend/webdav/webdav.go (changed)
@@ -84,7 +84,10 @@ func init() {
 			Help: "Nextcloud",
 		}, {
 			Value: "owncloud",
-			Help:  "Owncloud",
+			Help:  "Owncloud 10 PHP based WebDAV server",
+		}, {
+			Value: "InfiniteScale",
+			Help:  "ownCloud Infinite Scale",
 		}, {
 			Value: "sharepoint",
 			Help:  "Sharepoint Online, authenticated by Microsoft account",
@@ -612,6 +615,14 @@ func (f *Fs) setQuirks(ctx context.Context, vendor string) error {
 		f.propsetMtime = true
 		f.hasOCMD5 = true
 		f.hasOCSHA1 = true
+	case "InfiniteScale":
+		f.precision = time.Second
+		f.useOCMtime = true
+		f.propsetMtime = true
+		f.hasOCMD5 = false
+		f.hasOCSHA1 = true
+		f.canChunk = true
+		f.opt.ChunkSize = 10 * fs.Mebi
 	case "nextcloud":
 		f.precision = time.Second
 		f.useOCMtime = true
@@ -1478,24 +1489,38 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		return fmt.Errorf("Update mkParentDir failed: %w", err)
 	}
 
-	if o.shouldUseChunkedUpload(src) {
-		fs.Debugf(src, "Update will use the chunked upload strategy")
-		err = o.updateChunked(ctx, in, src, options...)
+	if o.fs.opt.Vendor == "InfiniteScale" {
+		// Infinite Scale always prefers tus for upload
+		fs.Debugf(src, "Update will use the tus protocol to upload")
+		contentType := fs.MimeType(ctx, src)
+		err = o.updateViaTus(ctx, in, contentType, src, options...)
 		if err != nil {
-			return err
+			fs.Debugf(src, "tus update failed")
+			return fmt.Errorf("tus update failed: %w", err)
 		}
 	} else {
-		fs.Debugf(src, "Update will use the normal upload strategy (no chunks)")
-		contentType := fs.MimeType(ctx, src)
-		filePath := o.filePath()
-		extraHeaders := o.extraHeaders(ctx, src)
-		// TODO: define getBody() to enable low-level HTTP/2 retries
-		err = o.updateSimple(ctx, in, nil, filePath, src.Size(), contentType, extraHeaders, o.fs.endpointURL, options...)
-		if err != nil {
-			return err
+		if o.shouldUseChunkedUpload(src) {
+			if o.fs.opt.Vendor == "nextcloud" {
+				fs.Debugf(src, "Update will use the chunked upload strategy")
+				err = o.updateChunked(ctx, in, src, options...)
+			} else {
+				fs.Debugf(src, "Chunking - unknown vendor")
+			}
+			if err != nil {
+				return err
+			}
+		} else {
+			fs.Debugf(src, "Update will use the normal upload strategy (no chunks)")
+			contentType := fs.MimeType(ctx, src)
+			filePath := o.filePath()
+			extraHeaders := o.extraHeaders(ctx, src)
+			// TODO: define getBody() to enable low-level HTTP/2 retries
+			err = o.updateSimple(ctx, in, nil, filePath, src.Size(), contentType, extraHeaders, o.fs.endpointURL, options...)
+			if err != nil {
+				return fmt.Errorf("unchunked simple update failed: %w", err)
+			}
 		}
 	}
 
 	// read metadata from remote
 	o.hasMetaData = false
 	return o.readMetaData(ctx)