pikpak: improve upload reliability and resolve potential file conflicts

This attempts to resolve upload conflicts by implementing cancel/cleanup on failed
uploads

* fix panic error on defer cancel upload
* increase pacer min sleep from 10 to 100 ms
* stop using uploadByForm()
* introduce force sleep before and after async tasks
* use pacer's retry scheme instead of manual implementation

Fixes #7787
This commit is contained in:
wiserain 2024-06-11 02:08:07 +09:00 committed by GitHub
parent d2af114139
commit ce5024bf33
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 87 additions and 36 deletions

View file

@ -251,10 +251,12 @@ type Media struct {
// FileParams includes parameters for instant open // FileParams includes parameters for instant open
type FileParams struct { type FileParams struct {
DeviceID string `json:"device_id,omitempty"`
Duration int64 `json:"duration,omitempty,string"` // in seconds Duration int64 `json:"duration,omitempty,string"` // in seconds
Height int `json:"height,omitempty,string"` Height int `json:"height,omitempty,string"`
Platform string `json:"platform,omitempty"` // "Upload" Platform string `json:"platform,omitempty"` // "Upload"
PlatformIcon string `json:"platform_icon,omitempty"` PlatformIcon string `json:"platform_icon,omitempty"`
TaskID string `json:"task_id"`
URL string `json:"url,omitempty"` URL string `json:"url,omitempty"`
Width int `json:"width,omitempty,string"` Width int `json:"width,omitempty,string"`
} }

View file

@ -9,7 +9,9 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url"
"os" "os"
"strconv"
"github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/backend/pikpak/api"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
@ -141,9 +143,8 @@ func (f *Fs) getFile(ctx context.Context, ID string) (info *api.File, err error)
var resp *http.Response var resp *http.Response
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) resp, err = f.rst.CallJSON(ctx, &opts, nil, &info)
if err == nil && info.Phase != api.PhaseTypeComplete { if err == nil && !info.Links.ApplicationOctetStream.Valid() {
// could be pending right after file is created/uploaded. return true, errors.New("no link")
return true, errors.New("not PHASE_TYPE_COMPLETE")
} }
return f.shouldRetry(ctx, resp, err) return f.shouldRetry(ctx, resp, err)
}) })
@ -167,6 +168,45 @@ func (f *Fs) patchFile(ctx context.Context, ID string, req *api.File) (info *api
return return
} }
// getTask gets api.Task from API for the ID passed
func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api.Task, err error) {
opts := rest.Opts{
Method: "GET",
Path: "/drive/v1/tasks/" + ID,
}
var resp *http.Response
err = f.pacer.Call(func() (bool, error) {
resp, err = f.rst.CallJSON(ctx, &opts, nil, &info)
if checkPhase {
if err == nil && info.Phase != api.PhaseTypeComplete {
// could be pending right after file is created/uploaded.
return true, errors.New(info.Phase)
}
}
return f.shouldRetry(ctx, resp, err)
})
return
}
// deleteTask remove a task having the specified ID
func (f *Fs) deleteTask(ctx context.Context, ID string, deleteFiles bool) (err error) {
params := url.Values{}
params.Set("delete_files", strconv.FormatBool(deleteFiles))
params.Set("task_ids", ID)
opts := rest.Opts{
Method: "DELETE",
Path: "/drive/v1/tasks",
Parameters: params,
NoResponse: true,
}
var resp *http.Response
err = f.pacer.Call(func() (bool, error) {
resp, err = f.rst.CallJSON(ctx, &opts, nil, nil)
return f.shouldRetry(ctx, resp, err)
})
return
}
// getAbout gets drive#quota information from server // getAbout gets drive#quota information from server
func (f *Fs) getAbout(ctx context.Context) (info *api.About, err error) { func (f *Fs) getAbout(ctx context.Context) (info *api.About, err error) {
opts := rest.Opts{ opts := rest.Opts{

View file

@ -53,6 +53,7 @@ import (
"github.com/rclone/rclone/fs/config/obscure" "github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/dircache" "github.com/rclone/rclone/lib/dircache"
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/oauthutil"
@ -66,8 +67,9 @@ import (
const ( const (
rcloneClientID = "YNxT9w7GMdWvEOKa" rcloneClientID = "YNxT9w7GMdWvEOKa"
rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E" rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E"
minSleep = 10 * time.Millisecond minSleep = 100 * time.Millisecond
maxSleep = 2 * time.Second maxSleep = 2 * time.Second
waitTime = 500 * time.Millisecond
decayConstant = 2 // bigger for slower decay, exponential decayConstant = 2 // bigger for slower decay, exponential
rootURL = "https://api-drive.mypikpak.com" rootURL = "https://api-drive.mypikpak.com"
) )
@ -1175,12 +1177,13 @@ func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, resumable *api
return return
} }
func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str string, size int64, options ...fs.OpenOption) (*api.File, error) { func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str string, size int64, options ...fs.OpenOption) (info *api.File, err error) {
// determine upload type // determine upload type
uploadType := api.UploadTypeResumable uploadType := api.UploadTypeResumable
if size >= 0 && size < int64(5*fs.Mebi) { // if size >= 0 && size < int64(5*fs.Mebi) {
uploadType = api.UploadTypeForm // uploadType = api.UploadTypeForm
} // }
// stop using uploadByForm() cause it is not as reliable as uploadByResumable() for a large number of small files
// request upload ticket to API // request upload ticket to API
req := api.RequestNewFile{ req := api.RequestNewFile{
@ -1195,29 +1198,46 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri
if uploadType == api.UploadTypeResumable { if uploadType == api.UploadTypeResumable {
req.Resumable = map[string]string{"provider": "PROVIDER_ALIYUN"} req.Resumable = map[string]string{"provider": "PROVIDER_ALIYUN"}
} }
newfile, err := f.requestNewFile(ctx, &req) new, err := f.requestNewFile(ctx, &req)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create a new file: %w", err) return nil, fmt.Errorf("failed to create a new file: %w", err)
} }
if newfile.File == nil { if new.File == nil {
return nil, fmt.Errorf("invalid response: %+v", newfile) return nil, fmt.Errorf("invalid response: %+v", new)
} else if newfile.File.Phase == api.PhaseTypeComplete { } else if new.File.Phase == api.PhaseTypeComplete {
// early return; in case of zero-byte objects // early return; in case of zero-byte objects
return newfile.File, nil return new.File, nil
} }
if uploadType == api.UploadTypeForm && newfile.Form != nil { defer atexit.OnError(&err, func() {
err = f.uploadByForm(ctx, in, req.Name, size, newfile.Form, options...) fs.Debugf(leaf, "canceling upload: %v", err)
} else if uploadType == api.UploadTypeResumable && newfile.Resumable != nil { if cancelErr := f.deleteObjects(ctx, []string{new.File.ID}, false); cancelErr != nil {
err = f.uploadByResumable(ctx, in, newfile.Resumable) fs.Logf(leaf, "failed to cancel upload: %v", cancelErr)
}
if cancelErr := f.deleteTask(ctx, new.Task.ID, false); cancelErr != nil {
fs.Logf(leaf, "failed to cancel upload: %v", cancelErr)
}
fs.Debugf(leaf, "waiting %v for the cancellation to be effective", waitTime)
time.Sleep(waitTime)
})()
if uploadType == api.UploadTypeForm && new.Form != nil {
err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...)
} else if uploadType == api.UploadTypeResumable && new.Resumable != nil {
err = f.uploadByResumable(ctx, in, new.Resumable)
} else { } else {
return nil, fmt.Errorf("unable to proceed upload: %+v", newfile) err = fmt.Errorf("no method available for uploading: %+v", new)
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to upload: %w", err) return nil, fmt.Errorf("failed to upload: %w", err)
} }
return newfile.File, nil fs.Debugf(leaf, "sleeping for %v before checking upload status", waitTime)
time.Sleep(waitTime)
if _, err = f.getTask(ctx, new.Task.ID, true); err != nil {
return nil, fmt.Errorf("unable to complete the upload: %w", err)
}
return new.File, nil
} }
// Put the object // Put the object
@ -1470,22 +1490,11 @@ func (o *Object) setMetaDataWithLink(ctx context.Context) error {
return nil return nil
} }
// fetch download link with retry scheme
// 1 initial attempt and 2 retries are reasonable based on empirical analysis
retries := 2
for i := 1; i <= retries+1; i++ {
info, err := o.fs.getFile(ctx, o.id) info, err := o.fs.getFile(ctx, o.id)
if err != nil { if err != nil {
return fmt.Errorf("can't fetch download link: %w", err) return err
} }
if err = o.setMetaData(info); err == nil && o.link.Valid() { return o.setMetaData(info)
return nil
}
if i <= retries {
time.Sleep(time.Duration(200*i) * time.Millisecond)
}
}
return errors.New("can't download - no link to download")
} }
// readMetaData gets the metadata if it hasn't already been fetched // readMetaData gets the metadata if it hasn't already been fetched
@ -1619,14 +1628,14 @@ func (o *Object) open(ctx context.Context, url string, options ...fs.OpenOption)
// Open an object for read // Open an object for read
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
if o.id == "" { if o.id == "" {
return nil, errors.New("can't download - no id") return nil, errors.New("can't download: no id")
} }
if o.size == 0 { if o.size == 0 {
// zero-byte objects may have no download link // zero-byte objects may have no download link
return io.NopCloser(bytes.NewBuffer([]byte(nil))), nil return io.NopCloser(bytes.NewBuffer([]byte(nil))), nil
} }
if err = o.setMetaDataWithLink(ctx); err != nil { if err = o.setMetaDataWithLink(ctx); err != nil {
return nil, err return nil, fmt.Errorf("can't download: %w", err)
} }
return o.open(ctx, o.link.URL, options...) return o.open(ctx, o.link.URL, options...)
} }