pikpak: improves data consistency by ensuring async tasks complete

Similar to uploads implemented in commit ce5024bf33, 
this change ensures most asynchronous file operations (copy, move, delete, 
purge, and cleanup) complete before proceeding with subsequent actions. 
This reduces the risk of data inconsistencies and improves overall reliability.
This commit is contained in:
wiserain 2024-06-20 00:07:05 +09:00 committed by GitHub
parent 9f1a7cfa67
commit cbccad9491
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 33 additions and 21 deletions

View file

@ -12,6 +12,7 @@ import (
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"time"
"github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/backend/pikpak/api"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
@ -82,19 +83,21 @@ func (f *Fs) getVIPInfo(ctx context.Context) (info *api.VIP, err error) {
// action can be one of batch{Copy,Delete,Trash,Untrash} // action can be one of batch{Copy,Delete,Trash,Untrash}
func (f *Fs) requestBatchAction(ctx context.Context, action string, req *api.RequestBatch) (err error) { func (f *Fs) requestBatchAction(ctx context.Context, action string, req *api.RequestBatch) (err error) {
opts := rest.Opts{ opts := rest.Opts{
Method: "POST", Method: "POST",
Path: "/drive/v1/files:" + action, Path: "/drive/v1/files:" + action,
NoResponse: true, // Only returns `{"task_id":""}
} }
info := struct {
TaskID string `json:"task_id"`
}{}
var resp *http.Response var resp *http.Response
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
resp, err = f.rst.CallJSON(ctx, &opts, &req, nil) resp, err = f.rst.CallJSON(ctx, &opts, &req, &info)
return f.shouldRetry(ctx, resp, err) return f.shouldRetry(ctx, resp, err)
}) })
if err != nil { if err != nil {
return fmt.Errorf("batch action %q failed: %w", action, err) return fmt.Errorf("batch action %q failed: %w", action, err)
} }
return nil return f.waitTask(ctx, info.TaskID)
} }
// requestNewTask requests a new api.NewTask and returns api.Task // requestNewTask requests a new api.NewTask and returns api.Task
@ -179,8 +182,8 @@ func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api
resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) resp, err = f.rst.CallJSON(ctx, &opts, nil, &info)
if checkPhase { if checkPhase {
if err == nil && info.Phase != api.PhaseTypeComplete { if err == nil && info.Phase != api.PhaseTypeComplete {
// could be pending right after file is created/uploaded. // could be pending right after the task is created
return true, errors.New(info.Phase) return true, fmt.Errorf("%s (%s) is still in %s", info.Name, info.Type, info.Phase)
} }
} }
return f.shouldRetry(ctx, resp, err) return f.shouldRetry(ctx, resp, err)
@ -188,6 +191,18 @@ func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api
return return
} }
// waitTask waits for async tasks to be completed
func (f *Fs) waitTask(ctx context.Context, ID string) (err error) {
time.Sleep(taskWaitTime)
if info, err := f.getTask(ctx, ID, true); err != nil {
if info == nil {
return fmt.Errorf("can't verify the task is completed: %q", ID)
}
return fmt.Errorf("can't verify the task is completed: %#v", info)
}
return
}
// deleteTask remove a task having the specified ID // deleteTask remove a task having the specified ID
func (f *Fs) deleteTask(ctx context.Context, ID string, deleteFiles bool) (err error) { func (f *Fs) deleteTask(ctx context.Context, ID string, deleteFiles bool) (err error) {
params := url.Values{} params := url.Values{}

View file

@ -69,7 +69,7 @@ const (
rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E" rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E"
minSleep = 100 * time.Millisecond minSleep = 100 * time.Millisecond
maxSleep = 2 * time.Second maxSleep = 2 * time.Second
waitTime = 500 * time.Millisecond taskWaitTime = 500 * time.Millisecond
decayConstant = 2 // bigger for slower decay, exponential decayConstant = 2 // bigger for slower decay, exponential
rootURL = "https://api-drive.mypikpak.com" rootURL = "https://api-drive.mypikpak.com"
minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize) minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize)
@ -917,19 +917,21 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
// CleanUp empties the trash // CleanUp empties the trash
func (f *Fs) CleanUp(ctx context.Context) (err error) { func (f *Fs) CleanUp(ctx context.Context) (err error) {
opts := rest.Opts{ opts := rest.Opts{
Method: "PATCH", Method: "PATCH",
Path: "/drive/v1/files/trash:empty", Path: "/drive/v1/files/trash:empty",
NoResponse: true, // Only returns `{"task_id":""}
} }
info := struct {
TaskID string `json:"task_id"`
}{}
var resp *http.Response var resp *http.Response
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
resp, err = f.rst.Call(ctx, &opts) resp, err = f.rst.CallJSON(ctx, &opts, nil, &info)
return f.shouldRetry(ctx, resp, err) return f.shouldRetry(ctx, resp, err)
}) })
if err != nil { if err != nil {
return fmt.Errorf("couldn't empty trash: %w", err) return fmt.Errorf("couldn't empty trash: %w", err)
} }
return nil return f.waitTask(ctx, info.TaskID)
} }
// Move the object // Move the object
@ -1262,8 +1264,8 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri
if cancelErr := f.deleteTask(ctx, new.Task.ID, false); cancelErr != nil { if cancelErr := f.deleteTask(ctx, new.Task.ID, false); cancelErr != nil {
fs.Logf(leaf, "failed to cancel upload: %v", cancelErr) fs.Logf(leaf, "failed to cancel upload: %v", cancelErr)
} }
fs.Debugf(leaf, "waiting %v for the cancellation to be effective", waitTime) fs.Debugf(leaf, "waiting %v for the cancellation to be effective", taskWaitTime)
time.Sleep(waitTime) time.Sleep(taskWaitTime)
})() })()
if uploadType == api.UploadTypeForm && new.Form != nil { if uploadType == api.UploadTypeForm && new.Form != nil {
@ -1277,12 +1279,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to upload: %w", err) return nil, fmt.Errorf("failed to upload: %w", err)
} }
fs.Debugf(leaf, "sleeping for %v before checking upload status", waitTime) return new.File, f.waitTask(ctx, new.Task.ID)
time.Sleep(waitTime)
if _, err = f.getTask(ctx, new.Task.ID, true); err != nil {
return nil, fmt.Errorf("unable to complete the upload: %w", err)
}
return new.File, nil
} }
// Put the object // Put the object