zoho: switch to large file upload API for larger files, fix missing URL encoding of filenames for the upload API
This commit is contained in:
parent
7daed30754
commit
c172742cef
3 changed files with 147 additions and 32 deletions
|
@ -180,11 +180,38 @@ func (ui *UploadInfo) GetUploadFileInfo() (*UploadFileInfo, error) {
|
|||
return &ufi, nil
|
||||
}
|
||||
|
||||
// LargeUploadInfo is once again a slightly different version of UploadInfo
|
||||
// returned as part of an LargeUploadResponse by the large file upload API.
|
||||
type LargeUploadInfo struct {
|
||||
Attributes struct {
|
||||
ParentID string `json:"parent_id"`
|
||||
FileName string `json:"file_name"`
|
||||
RessourceID string `json:"resource_id"`
|
||||
FileInfo string `json:"file_info"`
|
||||
} `json:"attributes"`
|
||||
}
|
||||
|
||||
// GetUploadFileInfo decodes the embedded FileInfo
|
||||
func (ui *LargeUploadInfo) GetUploadFileInfo() (*UploadFileInfo, error) {
|
||||
var ufi UploadFileInfo
|
||||
err := json.Unmarshal([]byte(ui.Attributes.FileInfo), &ufi)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode FileInfo: %w", err)
|
||||
}
|
||||
return &ufi, nil
|
||||
}
|
||||
|
||||
// UploadResponse is the response to a file Upload
|
||||
type UploadResponse struct {
|
||||
Uploads []UploadInfo `json:"data"`
|
||||
}
|
||||
|
||||
// LargeUploadResponse is the response returned by large file upload API.
|
||||
type LargeUploadResponse struct {
|
||||
Uploads []LargeUploadInfo `json:"data"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// WriteMetadataRequest is used to write metadata for a
|
||||
// single item
|
||||
type WriteMetadataRequest struct {
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
|
@ -63,6 +64,7 @@ var (
|
|||
}
|
||||
rootURL = "https://workdrive.zoho.eu/api/v1"
|
||||
downloadURL = "https://download.zoho.eu/v1/workdrive"
|
||||
uploadURL = "http://upload.zoho.eu/workdrive-api/v1/"
|
||||
accountsURL = "https://accounts.zoho.eu"
|
||||
)
|
||||
|
||||
|
@ -202,14 +204,15 @@ type Options struct {
|
|||
|
||||
// Fs represents a remote workdrive
|
||||
type Fs struct {
|
||||
name string // name of this remote
|
||||
root string // the path we are working on
|
||||
opt Options // parsed options
|
||||
features *fs.Features // optional features
|
||||
srv *rest.Client // the connection to the server
|
||||
downloadsrv *rest.Client // the connection to the Download server
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
name string // name of this remote
|
||||
root string // the path we are working on
|
||||
opt Options // parsed options
|
||||
features *fs.Features // optional features
|
||||
srv *rest.Client // the connection to the server
|
||||
downloadsrv *rest.Client // the connection to the download server
|
||||
uploadsrv *rest.Client // the connection to the upload server
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
}
|
||||
|
||||
// Object describes a Zoho WorkDrive object
|
||||
|
@ -232,7 +235,8 @@ func setupRegion(m configmap.Mapper) error {
|
|||
return errors.New("no region set")
|
||||
}
|
||||
rootURL = fmt.Sprintf("https://workdrive.zoho.%s/api/v1", region)
|
||||
downloadURL = fmt.Sprintf("https://download.zoho.%s/v1/workdrive",region)
|
||||
downloadURL = fmt.Sprintf("https://download.zoho.%s/v1/workdrive", region)
|
||||
uploadURL = fmt.Sprintf("https://upload.zoho.%s/workdrive-api/v1", region)
|
||||
accountsURL = fmt.Sprintf("https://accounts.zoho.%s", region)
|
||||
oauthConfig.Endpoint.AuthURL = fmt.Sprintf("https://accounts.zoho.%s/oauth/v2/auth", region)
|
||||
oauthConfig.Endpoint.TokenURL = fmt.Sprintf("https://accounts.zoho.%s/oauth/v2/token", region)
|
||||
|
@ -405,12 +409,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
}
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
|
||||
downloadsrv: rest.NewClient(oAuthClient).SetRoot(downloadURL),
|
||||
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
|
||||
downloadsrv: rest.NewClient(oAuthClient).SetRoot(downloadURL),
|
||||
uploadsrv: rest.NewClient(oAuthClient).SetRoot(uploadURL),
|
||||
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||
}
|
||||
f.features = (&fs.Features{
|
||||
CanHaveEmptyDirectories: true,
|
||||
|
@ -648,9 +653,61 @@ func (f *Fs) createObject(ctx context.Context, remote string, size int64, modTim
|
|||
return
|
||||
}
|
||||
|
||||
func (f *Fs) uploadLargeFile(ctx context.Context, name string, parent string, size int64, in io.Reader, options ...fs.OpenOption) (*api.Item, error) {
|
||||
opts := rest.Opts{
|
||||
Method: "POST",
|
||||
Path: "/stream/upload",
|
||||
Body: in,
|
||||
ContentLength: &size,
|
||||
ContentType: "application/octet-stream",
|
||||
Options: options,
|
||||
ExtraHeaders: map[string]string{
|
||||
"x-filename": url.QueryEscape(name),
|
||||
"x-parent_id": parent,
|
||||
"override-name-exist": "true",
|
||||
"upload-id": uuid.New().String(),
|
||||
"x-streammode": "1",
|
||||
},
|
||||
}
|
||||
|
||||
var err error
|
||||
var resp *http.Response
|
||||
var uploadResponse *api.LargeUploadResponse
|
||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err = f.uploadsrv.CallJSON(ctx, &opts, nil, &uploadResponse)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("upload large error: %v", err)
|
||||
}
|
||||
if len(uploadResponse.Uploads) != 1 {
|
||||
return nil, errors.New("upload: invalid response")
|
||||
}
|
||||
upload := uploadResponse.Uploads[0]
|
||||
uploadInfo, err := upload.GetUploadFileInfo()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("upload error: %w", err)
|
||||
}
|
||||
|
||||
// Fill in the api.Item from the api.UploadFileInfo
|
||||
var info api.Item
|
||||
info.ID = upload.Attributes.RessourceID
|
||||
info.Attributes.Name = upload.Attributes.FileName
|
||||
// info.Attributes.Type = not used
|
||||
info.Attributes.IsFolder = false
|
||||
// info.Attributes.CreatedTime = not used
|
||||
info.Attributes.ModifiedTime = uploadInfo.GetModTime()
|
||||
// info.Attributes.UploadedTime = 0 not used
|
||||
info.Attributes.StorageInfo.Size = uploadInfo.Size
|
||||
info.Attributes.StorageInfo.FileCount = 0
|
||||
info.Attributes.StorageInfo.FolderCount = 0
|
||||
|
||||
return &info, nil
|
||||
}
|
||||
|
||||
func (f *Fs) upload(ctx context.Context, name string, parent string, size int64, in io.Reader, options ...fs.OpenOption) (*api.Item, error) {
|
||||
params := url.Values{}
|
||||
params.Set("filename", name)
|
||||
params.Set("filename", url.QueryEscape(name))
|
||||
params.Set("parent_id", parent)
|
||||
params.Set("override-name-exist", strconv.FormatBool(true))
|
||||
formReader, contentType, overhead, err := rest.MultipartUpload(ctx, in, nil, "content", name)
|
||||
|
@ -710,21 +767,40 @@ func (f *Fs) upload(ctx context.Context, name string, parent string, size int64,
|
|||
//
|
||||
// The new object may have been created if an error is returned
|
||||
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
size := src.Size()
|
||||
remote := src.Remote()
|
||||
existingObj, err := f.NewObject(ctx, src.Remote())
|
||||
switch err {
|
||||
case nil:
|
||||
return existingObj, existingObj.Update(ctx, in, src, options...)
|
||||
case fs.ErrorObjectNotFound:
|
||||
size := src.Size()
|
||||
remote := src.Remote()
|
||||
|
||||
// Create the directory for the object if it doesn't exist
|
||||
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
|
||||
if err != nil {
|
||||
// Create the directory for the object if it doesn't exist
|
||||
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// use normal upload API for small sizes (<10MiB)
|
||||
if size < 10*1024*1024 {
|
||||
info, err := f.upload(ctx, f.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return f.newObjectWithInfo(ctx, remote, info)
|
||||
}
|
||||
|
||||
// large file API otherwise
|
||||
info, err := f.uploadLargeFile(ctx, f.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return f.newObjectWithInfo(ctx, remote, info)
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Upload the file
|
||||
info, err := f.upload(ctx, f.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f.newObjectWithInfo(ctx, remote, info)
|
||||
}
|
||||
|
||||
// Mkdir creates the container if it doesn't exist
|
||||
|
@ -1188,11 +1264,22 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
return err
|
||||
}
|
||||
|
||||
// Overwrite the old file
|
||||
info, err := o.fs.upload(ctx, o.fs.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
|
||||
// use normal upload API for small sizes (<10MiB)
|
||||
if size < 10*1024*1024 {
|
||||
info, err := o.fs.upload(ctx, o.fs.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return o.setMetaData(info)
|
||||
}
|
||||
|
||||
// large file API otherwise
|
||||
info, err := o.fs.uploadLargeFile(ctx, o.fs.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return o.setMetaData(info)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,8 @@ import (
|
|||
// TestIntegration runs integration tests against the remote
|
||||
func TestIntegration(t *testing.T) {
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: "TestZoho:",
|
||||
NilObject: (*zoho.Object)(nil),
|
||||
RemoteName: "TestZoho:",
|
||||
SkipInvalidUTF8: true,
|
||||
NilObject: (*zoho.Object)(nil),
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue