From cbccad9491f4206be1ba71cc7b2d03995eb35cde Mon Sep 17 00:00:00 2001 From: wiserain Date: Thu, 20 Jun 2024 00:07:05 +0900 Subject: [PATCH] pikpak: improves data consistency by ensuring async tasks complete Similar to uploads implemented in commit ce5024bf3317dbbe2072abda15036e2d8507d3fd, this change ensures most asynchronous file operations (copy, move, delete, purge, and cleanup) complete before proceeding with subsequent actions. This reduces the risk of data inconsistencies and improves overall reliability. --- backend/pikpak/helper.go | 29 ++++++++++++++++++++++------- backend/pikpak/pikpak.go | 25 +++++++++++-------------- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/backend/pikpak/helper.go b/backend/pikpak/helper.go index 6351ef1b4..2e0306d2b 100644 --- a/backend/pikpak/helper.go +++ b/backend/pikpak/helper.go @@ -12,6 +12,7 @@ import ( "net/url" "os" "strconv" + "time" "github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/lib/rest" @@ -82,19 +83,21 @@ func (f *Fs) getVIPInfo(ctx context.Context) (info *api.VIP, err error) { // action can be one of batch{Copy,Delete,Trash,Untrash} func (f *Fs) requestBatchAction(ctx context.Context, action string, req *api.RequestBatch) (err error) { opts := rest.Opts{ - Method: "POST", - Path: "/drive/v1/files:" + action, - NoResponse: true, // Only returns `{"task_id":""} + Method: "POST", + Path: "/drive/v1/files:" + action, } + info := struct { + TaskID string `json:"task_id"` + }{} var resp *http.Response err = f.pacer.Call(func() (bool, error) { - resp, err = f.rst.CallJSON(ctx, &opts, &req, nil) + resp, err = f.rst.CallJSON(ctx, &opts, &req, &info) return f.shouldRetry(ctx, resp, err) }) if err != nil { return fmt.Errorf("batch action %q failed: %w", action, err) } - return nil + return f.waitTask(ctx, info.TaskID) } // requestNewTask requests a new api.NewTask and returns api.Task @@ -179,8 +182,8 @@ func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) if checkPhase { if err == nil && info.Phase != api.PhaseTypeComplete { - // could be pending right after file is created/uploaded. - return true, errors.New(info.Phase) + // could be pending right after the task is created + return true, fmt.Errorf("%s (%s) is still in %s", info.Name, info.Type, info.Phase) } } return f.shouldRetry(ctx, resp, err) @@ -188,6 +191,18 @@ func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api return } +// waitTask waits for async tasks to be completed +func (f *Fs) waitTask(ctx context.Context, ID string) (err error) { + time.Sleep(taskWaitTime) + if info, err := f.getTask(ctx, ID, true); err != nil { + if info == nil { + return fmt.Errorf("can't verify the task is completed: %q", ID) + } + return fmt.Errorf("can't verify the task is completed: %#v", info) + } + return +} + // deleteTask remove a task having the specified ID func (f *Fs) deleteTask(ctx context.Context, ID string, deleteFiles bool) (err error) { params := url.Values{} diff --git a/backend/pikpak/pikpak.go b/backend/pikpak/pikpak.go index 4a3fbd3d9..565180cde 100644 --- a/backend/pikpak/pikpak.go +++ b/backend/pikpak/pikpak.go @@ -69,7 +69,7 @@ const ( rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E" minSleep = 100 * time.Millisecond maxSleep = 2 * time.Second - waitTime = 500 * time.Millisecond + taskWaitTime = 500 * time.Millisecond decayConstant = 2 // bigger for slower decay, exponential rootURL = "https://api-drive.mypikpak.com" minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize) @@ -917,19 +917,21 @@ func (f *Fs) Purge(ctx context.Context, dir string) error { // CleanUp empties the trash func (f *Fs) CleanUp(ctx context.Context) (err error) { opts := rest.Opts{ - Method: "PATCH", - Path: "/drive/v1/files/trash:empty", - NoResponse: true, // Only returns `{"task_id":""} + Method: "PATCH", + Path: "/drive/v1/files/trash:empty", } + info := struct { + TaskID string `json:"task_id"` + }{} var resp *http.Response err = f.pacer.Call(func() (bool, error) { - resp, err = f.rst.Call(ctx, &opts) + resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) return f.shouldRetry(ctx, resp, err) }) if err != nil { return fmt.Errorf("couldn't empty trash: %w", err) } - return nil + return f.waitTask(ctx, info.TaskID) } // Move the object @@ -1262,8 +1264,8 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri if cancelErr := f.deleteTask(ctx, new.Task.ID, false); cancelErr != nil { fs.Logf(leaf, "failed to cancel upload: %v", cancelErr) } - fs.Debugf(leaf, "waiting %v for the cancellation to be effective", waitTime) - time.Sleep(waitTime) + fs.Debugf(leaf, "waiting %v for the cancellation to be effective", taskWaitTime) + time.Sleep(taskWaitTime) })() if uploadType == api.UploadTypeForm && new.Form != nil { @@ -1277,12 +1279,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri if err != nil { return nil, fmt.Errorf("failed to upload: %w", err) } - fs.Debugf(leaf, "sleeping for %v before checking upload status", waitTime) - time.Sleep(waitTime) - if _, err = f.getTask(ctx, new.Task.ID, true); err != nil { - return nil, fmt.Errorf("unable to complete the upload: %w", err) - } - return new.File, nil + return new.File, f.waitTask(ctx, new.Task.ID) } // Put the object