forked from TrueCloudLab/rclone
backend: pcloud: Implement OpenWriterAt feature
This commit is contained in:
parent
258092f9c6
commit
4c1cb0622e
3 changed files with 315 additions and 20 deletions
|
@ -109,6 +109,37 @@ type Hashes struct {
|
|||
SHA256 string `json:"sha256"`
|
||||
}
|
||||
|
||||
// FileTruncateResponse is the response from /file_truncate
|
||||
type FileTruncateResponse struct {
|
||||
Error
|
||||
}
|
||||
|
||||
// FileCloseResponse is the response from /file_close
|
||||
type FileCloseResponse struct {
|
||||
Error
|
||||
}
|
||||
|
||||
// FileOpenResponse is the response from /file_open
|
||||
type FileOpenResponse struct {
|
||||
Error
|
||||
Fileid int64 `json:"fileid"`
|
||||
FileDescriptor int64 `json:"fd"`
|
||||
}
|
||||
|
||||
// FileChecksumResponse is the response from /file_checksum
|
||||
type FileChecksumResponse struct {
|
||||
Error
|
||||
MD5 string `json:"md5"`
|
||||
SHA1 string `json:"sha1"`
|
||||
SHA256 string `json:"sha256"`
|
||||
}
|
||||
|
||||
// FilePWriteResponse is the response from /file_pwrite
|
||||
type FilePWriteResponse struct {
|
||||
Error
|
||||
Bytes int64 `json:"bytes"`
|
||||
}
|
||||
|
||||
// UploadFileResponse is the response from /uploadfile
|
||||
type UploadFileResponse struct {
|
||||
Error
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -146,7 +147,8 @@ we have to rely on user password authentication for it.`,
|
|||
Help: "Your pcloud password.",
|
||||
IsPassword: true,
|
||||
Advanced: true,
|
||||
}}...),
|
||||
},
|
||||
}...),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -161,15 +163,16 @@ type Options struct {
|
|||
|
||||
// Fs represents a remote pcloud
|
||||
type Fs struct {
|
||||
name string // name of this remote
|
||||
root string // the path we are working on
|
||||
opt Options // parsed options
|
||||
features *fs.Features // optional features
|
||||
srv *rest.Client // the connection to the server
|
||||
cleanupSrv *rest.Client // the connection used for the cleanup method
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||
name string // name of this remote
|
||||
root string // the path we are working on
|
||||
opt Options // parsed options
|
||||
features *fs.Features // optional features
|
||||
ts *oauthutil.TokenSource // the token source, used to create new clients
|
||||
srv *rest.Client // the connection to the server
|
||||
cleanupSrv *rest.Client // the connection used for the cleanup method
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||
}
|
||||
|
||||
// Object describes a pcloud object
|
||||
|
@ -317,6 +320,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
ts: ts,
|
||||
srv: rest.NewClient(oAuthClient).SetRoot("https://" + opt.Hostname),
|
||||
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||
}
|
||||
|
@ -326,6 +330,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
f.features = (&fs.Features{
|
||||
CaseInsensitive: false,
|
||||
CanHaveEmptyDirectories: true,
|
||||
PartialUploads: true,
|
||||
}).Fill(ctx, f)
|
||||
if !canCleanup {
|
||||
f.features.CleanUp = nil
|
||||
|
@ -333,7 +338,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
f.srv.SetErrorHandler(errorHandler)
|
||||
|
||||
// Renew the token in the background
|
||||
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
|
||||
f.tokenRenewer = oauthutil.NewRenew(f.String(), f.ts, func() error {
|
||||
_, err := f.readMetaDataForPath(ctx, "")
|
||||
return err
|
||||
})
|
||||
|
@ -375,6 +380,56 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
return f, nil
|
||||
}
|
||||
|
||||
// OpenWriterAt opens with a handle for random access writes
|
||||
//
|
||||
// Pass in the remote desired and the size if known.
|
||||
//
|
||||
// It truncates any existing object
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
||||
client, err := f.newSingleConnClient(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create client: %w", err)
|
||||
}
|
||||
// init an empty file
|
||||
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("resolve src: %w", err)
|
||||
}
|
||||
openResult, err := fileOpenNew(ctx, client, f, directoryID, leaf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open file: %w", err)
|
||||
}
|
||||
|
||||
writer := &writerAt{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
fs: f,
|
||||
size: size,
|
||||
remote: remote,
|
||||
fd: openResult.FileDescriptor,
|
||||
fileID: openResult.Fileid,
|
||||
}
|
||||
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
// Create a new http client, accepting keep-alive headers, limited to single connection.
|
||||
// Necessary for pcloud fileops API, as it binds the session to the underlying TCP connection.
|
||||
// File descriptors are only valid within the same connection and auto-closed when the connection is closed,
|
||||
// hence we need a separate client (with single connection) for each fd to avoid all sorts of errors and race conditions.
|
||||
func (f *Fs) newSingleConnClient(ctx context.Context) (*rest.Client, error) {
|
||||
baseClient := fshttp.NewClient(ctx)
|
||||
baseClient.Transport = fshttp.NewTransportCustom(ctx, func(t *http.Transport) {
|
||||
t.MaxConnsPerHost = 1
|
||||
t.DisableKeepAlives = false
|
||||
})
|
||||
// Set our own http client in the context
|
||||
ctx = oauthutil.Context(ctx, baseClient)
|
||||
// create a new oauth client, re-use the token source
|
||||
oAuthClient := oauth2.NewClient(ctx, f.ts)
|
||||
return rest.NewClient(oAuthClient).SetRoot("https://" + f.opt.Hostname), nil
|
||||
}
|
||||
|
||||
// Return an Object from a path
|
||||
//
|
||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||
|
@ -1098,14 +1153,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return o.setModTime(ctx, fileIDtoNumber(o.id), filename, directoryID, modTime)
|
||||
}
|
||||
|
||||
func (o *Object) setModTime(
|
||||
ctx context.Context,
|
||||
fileID, filename, directoryID string,
|
||||
modTime time.Time,
|
||||
) error {
|
||||
fileID := fileIDtoNumber(o.id)
|
||||
filename = o.fs.opt.Enc.FromStandardName(filename)
|
||||
opts := rest.Opts{
|
||||
Method: "PUT",
|
||||
|
@ -1124,7 +1172,7 @@ func (o *Object) setModTime(
|
|||
opts.Parameters.Set("mtime", strconv.FormatInt(modTime.Unix(), 10))
|
||||
|
||||
result := &api.ItemResult{}
|
||||
err := o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err := o.fs.srv.CallJSON(ctx, &opts, nil, result)
|
||||
err = result.Error.Update(err)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
|
|
216
backend/pcloud/writer_at.go
Normal file
216
backend/pcloud/writer_at.go
Normal file
|
@ -0,0 +1,216 @@
|
|||
package pcloud
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/backend/pcloud/api"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/lib/rest"
|
||||
)
|
||||
|
||||
// writerAt implements fs.WriterAtCloser, adding the OpenWrtierAt feature to pcloud.
|
||||
type writerAt struct {
|
||||
ctx context.Context
|
||||
client *rest.Client
|
||||
fs *Fs
|
||||
size int64
|
||||
remote string
|
||||
fd int64
|
||||
fileID int64
|
||||
}
|
||||
|
||||
// Close implements WriterAt.Close.
|
||||
func (c *writerAt) Close() error {
|
||||
// close fd
|
||||
if _, err := c.fileClose(c.ctx); err != nil {
|
||||
return fmt.Errorf("close fd: %w", err)
|
||||
}
|
||||
|
||||
// Avoiding race conditions: Depending on the tcp connection, there might be
|
||||
// caching issues when checking the size immediately after write.
|
||||
// Hence we try avoiding them by checking the resulting size on a different connection.
|
||||
if c.size < 0 {
|
||||
// Without knowing the size, we cannot do size checks.
|
||||
// Falling back to a sleep of 1s for sake of hope.
|
||||
time.Sleep(1 * time.Second)
|
||||
return nil
|
||||
}
|
||||
sizeOk := false
|
||||
sizeLastSeen := int64(0)
|
||||
for retry := 0; retry < 5; retry++ {
|
||||
fs.Debugf(c.remote, "checking file size: try %d/5", retry)
|
||||
obj, err := c.fs.NewObject(c.ctx, c.remote)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get uploaded obj: %w", err)
|
||||
}
|
||||
sizeLastSeen = obj.Size()
|
||||
if obj.Size() == c.size {
|
||||
sizeOk = true
|
||||
break
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
if !sizeOk {
|
||||
return fmt.Errorf("incorrect size after upload: got %d, want %d", sizeLastSeen, c.size)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteAt implements fs.WriteAt.
|
||||
func (c *writerAt) WriteAt(buffer []byte, offset int64) (n int, err error) {
|
||||
contentLength := len(buffer)
|
||||
|
||||
inSHA1Bytes := sha1.Sum(buffer)
|
||||
inSHA1 := hex.EncodeToString(inSHA1Bytes[:])
|
||||
|
||||
// get target hash
|
||||
outChecksum, err := c.fileChecksum(c.ctx, offset, int64(contentLength))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
outSHA1 := outChecksum.SHA1
|
||||
|
||||
if outSHA1 == "" || inSHA1 == "" {
|
||||
return 0, fmt.Errorf("expect both hashes to be filled: src: %q, target: %q", inSHA1, outSHA1)
|
||||
}
|
||||
|
||||
// check hash of buffer, skip if fits
|
||||
if inSHA1 == outSHA1 {
|
||||
return contentLength, nil
|
||||
}
|
||||
|
||||
// upload buffer with offset if necessary
|
||||
if _, err := c.filePWrite(c.ctx, offset, buffer); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return contentLength, nil
|
||||
}
|
||||
|
||||
// Call pcloud file_open using folderid and name with O_CREAT and O_WRITE flags, see [API Doc.]
|
||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_open.html
|
||||
func fileOpenNew(ctx context.Context, c *rest.Client, srcFs *Fs, directoryID, filename string) (*api.FileOpenResponse, error) {
|
||||
opts := rest.Opts{
|
||||
Method: "PUT",
|
||||
Path: "/file_open",
|
||||
Parameters: url.Values{},
|
||||
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
||||
ExtraHeaders: map[string]string{
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
}
|
||||
filename = srcFs.opt.Enc.FromStandardName(filename)
|
||||
opts.Parameters.Set("name", filename)
|
||||
opts.Parameters.Set("folderid", dirIDtoNumber(directoryID))
|
||||
opts.Parameters.Set("flags", "0x0042") // O_CREAT, O_WRITE
|
||||
|
||||
result := &api.FileOpenResponse{}
|
||||
err := srcFs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err := c.CallJSON(ctx, &opts, nil, result)
|
||||
err = result.Error.Update(err)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open new file descriptor: %w", err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Call pcloud file_checksum, see [API Doc.]
|
||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_checksum.html
|
||||
func (c *writerAt) fileChecksum(
|
||||
ctx context.Context,
|
||||
offset, count int64,
|
||||
) (*api.FileChecksumResponse, error) {
|
||||
opts := rest.Opts{
|
||||
Method: "PUT",
|
||||
Path: "/file_checksum",
|
||||
Parameters: url.Values{},
|
||||
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
||||
ExtraHeaders: map[string]string{
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
}
|
||||
opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
|
||||
opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
|
||||
opts.Parameters.Set("count", strconv.FormatInt(count, 10))
|
||||
|
||||
result := &api.FileChecksumResponse{}
|
||||
err := c.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err := c.client.CallJSON(ctx, &opts, nil, result)
|
||||
err = result.Error.Update(err)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("checksum of fd %d with offset %d and size %d: %w", c.fd, offset, count, err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Call pcloud file_pwrite, see [API Doc.]
|
||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_pwrite.html
|
||||
func (c *writerAt) filePWrite(
|
||||
ctx context.Context,
|
||||
offset int64,
|
||||
buf []byte,
|
||||
) (*api.FilePWriteResponse, error) {
|
||||
contentLength := int64(len(buf))
|
||||
opts := rest.Opts{
|
||||
Method: "PUT",
|
||||
Path: "/file_pwrite",
|
||||
Body: bytes.NewReader(buf),
|
||||
ContentLength: &contentLength,
|
||||
Parameters: url.Values{},
|
||||
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
||||
Close: false,
|
||||
ExtraHeaders: map[string]string{
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
}
|
||||
opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
|
||||
opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
|
||||
|
||||
result := &api.FilePWriteResponse{}
|
||||
err := c.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err := c.client.CallJSON(ctx, &opts, nil, result)
|
||||
err = result.Error.Update(err)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("write %d bytes to fd %d with offset %d: %w", contentLength, c.fd, offset, err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Call pcloud file_close, see [API Doc.]
|
||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_close.html
|
||||
func (c *writerAt) fileClose(ctx context.Context) (*api.FileCloseResponse, error) {
|
||||
opts := rest.Opts{
|
||||
Method: "PUT",
|
||||
Path: "/file_close",
|
||||
Parameters: url.Values{},
|
||||
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
||||
Close: true,
|
||||
}
|
||||
opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
|
||||
|
||||
result := &api.FileCloseResponse{}
|
||||
err := c.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err := c.client.CallJSON(ctx, &opts, nil, result)
|
||||
err = result.Error.Update(err)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("close file descriptor: %w", err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
Loading…
Reference in a new issue