forked from TrueCloudLab/rclone
pikpak: optimize upload by pre-fetching gcid from API
This commit optimizes the PikPak upload process by pre-fetching the Global Content Identifier (gcid) from the API server before calculating it locally. Previously, a gcid required for uploads was calculated locally. This process was resource-intensive and time-consuming. By first checking for a cached gcid on the server, we can potentially avoid the local calculation entirely. This significantly improves upload speed especially for large files.
This commit is contained in:
parent
afd2663057
commit
471531eb6a
2 changed files with 127 additions and 23 deletions
|
@ -8,13 +8,16 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/backend/pikpak/api"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/lib/rest"
|
||||
)
|
||||
|
||||
|
@ -253,6 +256,37 @@ func (f *Fs) requestShare(ctx context.Context, req *api.RequestShare) (info *api
|
|||
return
|
||||
}
|
||||
|
||||
// getGcid retrieves Gcid cached in API server
|
||||
func (f *Fs) getGcid(ctx context.Context, src fs.ObjectInfo) (gcid string, err error) {
|
||||
cid, err := calcCid(ctx, src)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
params := url.Values{}
|
||||
params.Set("cid", cid)
|
||||
params.Set("file_size", strconv.FormatInt(src.Size(), 10))
|
||||
opts := rest.Opts{
|
||||
Method: "GET",
|
||||
Path: "/drive/v1/resource/cid",
|
||||
Parameters: params,
|
||||
ExtraHeaders: map[string]string{"x-device-id": f.deviceID},
|
||||
}
|
||||
|
||||
info := struct {
|
||||
Gcid string `json:"gcid,omitempty"`
|
||||
}{}
|
||||
var resp *http.Response
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.rst.CallJSON(ctx, &opts, nil, &info)
|
||||
return f.shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return info.Gcid, nil
|
||||
}
|
||||
|
||||
// Read the gcid of in returning a reader which will read the same contents
|
||||
//
|
||||
// The cleanup function should be called when out is finished with
|
||||
|
@ -306,6 +340,9 @@ func readGcid(in io.Reader, size, threshold int64) (gcid string, out io.Reader,
|
|||
return
|
||||
}
|
||||
|
||||
// calcGcid calculates Gcid from reader
|
||||
//
|
||||
// Gcid is a custom hash to index a file contents
|
||||
func calcGcid(r io.Reader, size int64) (string, error) {
|
||||
calcBlockSize := func(j int64) int64 {
|
||||
var psize int64 = 0x40000
|
||||
|
@ -330,3 +367,64 @@ func calcGcid(r io.Reader, size int64) (string, error) {
|
|||
}
|
||||
return hex.EncodeToString(totalHash.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// calcCid calculates Cid from source
|
||||
//
|
||||
// Cid is a simplified version of Gcid
|
||||
func calcCid(ctx context.Context, src fs.ObjectInfo) (cid string, err error) {
|
||||
srcObj := fs.UnWrapObjectInfo(src)
|
||||
if srcObj == nil {
|
||||
return "", fmt.Errorf("failed to unwrap object from src: %s", src)
|
||||
}
|
||||
|
||||
size := src.Size()
|
||||
hash := sha1.New()
|
||||
var rc io.ReadCloser
|
||||
|
||||
readHash := func(start, length int64) (err error) {
|
||||
end := start + length - 1
|
||||
if rc, err = srcObj.Open(ctx, &fs.RangeOption{Start: start, End: end}); err != nil {
|
||||
return fmt.Errorf("failed to open src with range (%d, %d): %w", start, end, err)
|
||||
}
|
||||
defer fs.CheckClose(rc, &err)
|
||||
_, err = io.Copy(hash, rc)
|
||||
return err
|
||||
}
|
||||
|
||||
if size <= 0xF000 { // 61440 = 60KB
|
||||
err = readHash(0, size)
|
||||
} else { // 20KB from three different parts
|
||||
for _, start := range []int64{0, size / 3, size - 0x5000} {
|
||||
err = readHash(start, 0x5000)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to hash: %w", err)
|
||||
}
|
||||
cid = strings.ToUpper(hex.EncodeToString(hash.Sum(nil)))
|
||||
return
|
||||
}
|
||||
|
||||
// randomly generates device id used for request header 'x-device-id'
|
||||
//
|
||||
// original javascript implementation
|
||||
//
|
||||
// return "xxxxxxxxxxxx4xxxyxxxxxxxxxxxxxxx".replace(/[xy]/g, (e) => {
|
||||
// const t = (16 * Math.random()) | 0;
|
||||
// return ("x" == e ? t : (3 & t) | 8).toString(16);
|
||||
// });
|
||||
func genDeviceID() string {
|
||||
base := []byte("xxxxxxxxxxxx4xxxyxxxxxxxxxxxxxxx")
|
||||
for i, char := range base {
|
||||
switch char {
|
||||
case 'x':
|
||||
base[i] = fmt.Sprintf("%x", rand.Intn(16))[0]
|
||||
case 'y':
|
||||
base[i] = fmt.Sprintf("%x", rand.Intn(16)&3|8)[0]
|
||||
}
|
||||
}
|
||||
return string(base)
|
||||
}
|
||||
|
|
|
@ -274,6 +274,7 @@ type Fs struct {
|
|||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
rootFolderID string // the id of the root folder
|
||||
deviceID string // device id used for api requests
|
||||
client *http.Client // authorized client
|
||||
m configmap.Mapper
|
||||
tokenMu *sync.Mutex // when renewing tokens
|
||||
|
@ -489,6 +490,7 @@ func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, err
|
|||
CanHaveEmptyDirectories: true, // can have empty directories
|
||||
NoMultiThreading: true, // can't have multiple threads downloading
|
||||
}).Fill(ctx, f)
|
||||
f.deviceID = genDeviceID()
|
||||
|
||||
if err := f.newClientWithPacer(ctx); err != nil {
|
||||
return nil, err
|
||||
|
@ -1694,32 +1696,36 @@ func (o *Object) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo, wi
|
|||
}
|
||||
|
||||
// Calculate gcid; grabbed from package jottacloud
|
||||
var gcid string
|
||||
if srcObj := fs.UnWrapObjectInfo(src); srcObj != nil && srcObj.Fs().Features().IsLocal {
|
||||
// No buffering; directly calculate gcid from source
|
||||
rc, err := srcObj.Open(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open src: %w", err)
|
||||
}
|
||||
defer fs.CheckClose(rc, &err)
|
||||
gcid, err := o.fs.getGcid(ctx, src)
|
||||
if err != nil || gcid == "" {
|
||||
fs.Debugf(o, "calculating gcid: %v", err)
|
||||
if srcObj := fs.UnWrapObjectInfo(src); srcObj != nil && srcObj.Fs().Features().IsLocal {
|
||||
// No buffering; directly calculate gcid from source
|
||||
rc, err := srcObj.Open(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open src: %w", err)
|
||||
}
|
||||
defer fs.CheckClose(rc, &err)
|
||||
|
||||
if gcid, err = calcGcid(rc, srcObj.Size()); err != nil {
|
||||
return fmt.Errorf("failed to calculate gcid: %w", err)
|
||||
if gcid, err = calcGcid(rc, srcObj.Size()); err != nil {
|
||||
return fmt.Errorf("failed to calculate gcid: %w", err)
|
||||
}
|
||||
} else {
|
||||
// unwrap the accounting from the input, we use wrap to put it
|
||||
// back on after the buffering
|
||||
var wrap accounting.WrapFn
|
||||
in, wrap = accounting.UnWrap(in)
|
||||
var cleanup func()
|
||||
gcid, in, cleanup, err = readGcid(in, size, int64(o.fs.opt.HashMemoryThreshold))
|
||||
defer cleanup()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate gcid: %w", err)
|
||||
}
|
||||
// Wrap the accounting back onto the stream
|
||||
in = wrap(in)
|
||||
}
|
||||
} else {
|
||||
// unwrap the accounting from the input, we use wrap to put it
|
||||
// back on after the buffering
|
||||
var wrap accounting.WrapFn
|
||||
in, wrap = accounting.UnWrap(in)
|
||||
var cleanup func()
|
||||
gcid, in, cleanup, err = readGcid(in, size, int64(o.fs.opt.HashMemoryThreshold))
|
||||
defer cleanup()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate gcid: %w", err)
|
||||
}
|
||||
// Wrap the accounting back onto the stream
|
||||
in = wrap(in)
|
||||
}
|
||||
fs.Debugf(o, "gcid = %s", gcid)
|
||||
|
||||
if !withTemp {
|
||||
info, err := o.fs.upload(ctx, in, leaf, dirID, gcid, size, options...)
|
||||
|
|
Loading…
Reference in a new issue