From 94a5de58c81fe5cb8268ced7ac9255e8cff79679 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 17 Nov 2023 09:28:05 +0000 Subject: [PATCH] linkbox: pre-merge fixes - convert to directoryCache - makes backend much more efficient - don't force --low-level-retries to 2 - don't wrap paced calls in pacer - fix shouldRetry - fix file list searching mechanism --- backend/linkbox/linkbox.go | 785 ++++++++++++++++++------------------ fstest/test_all/config.yaml | 2 + 2 files changed, 390 insertions(+), 397 deletions(-) diff --git a/backend/linkbox/linkbox.go b/backend/linkbox/linkbox.go index d61fcb1a7..75445ea72 100644 --- a/backend/linkbox/linkbox.go +++ b/backend/linkbox/linkbox.go @@ -1,16 +1,25 @@ // Package linkbox provides an interface to the linkbox.to Cloud storage system. +// +// API docs: https://www.linkbox.to/api-docs package linkbox +/* + Extras + - PublicLink - NO - sharing doesn't share the actual file, only a page with it on + - Move - YES - have Move and Rename file APIs so is possible + - MoveDir - NO - probably not possible - have Move but no Rename +*/ + import ( "bytes" "context" "crypto/md5" - "errors" "fmt" "io" "net/http" "net/url" "path" + "regexp" "strconv" "strings" "time" @@ -21,17 +30,18 @@ import ( "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/dircache" "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/rest" ) const ( - retriesAmmount = 2 - maxEntitiesPerPage = 64 + maxEntitiesPerPage = 1024 minSleep = 200 * time.Millisecond maxSleep = 2 * time.Second pacerBurst = 1 linkboxAPIURL = "https://www.linkbox.to/api/open/" + rootID = "0" // ID of root directory ) func init() { @@ -58,10 +68,11 @@ type Options struct { type Fs struct { name string root string - opt Options // options for this backend - features *fs.Features // optional features - ci *fs.ConfigInfo // global config - srv *rest.Client // the connection to the server + opt Options // options for this backend + features *fs.Features // optional features + ci *fs.ConfigInfo // global config + srv *rest.Client // the connection to the server + dirCache *dircache.DirCache // Map of directory path to directory id pacer *fs.Pacer } @@ -73,14 +84,16 @@ type Object struct { modTime time.Time contentType string fullURL string - pid int + dirID int64 + itemID string // and these IDs are for files + id int64 // these IDs appear to apply to directories isDir bool - id string } // NewFs creates a new Fs object from the name and root. It connects to // the host specified in the config file. func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) { + root = strings.Trim(root, "/") // Parse config into Options struct opt := new(Options) @@ -92,37 +105,53 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ci := fs.GetConfig(ctx) f := &Fs{ - name: name, - opt: *opt, - ci: ci, - srv: rest.NewClient(fshttp.NewClient(ctx)), + name: name, + opt: *opt, + root: root, + ci: ci, + srv: rest.NewClient(fshttp.NewClient(ctx)), + pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep))), } - - f.pacer.SetRetries(retriesAmmount) + f.dirCache = dircache.New(root, rootID, f) f.features = (&fs.Features{ CanHaveEmptyDirectories: true, CaseInsensitive: true, }).Fill(ctx, f) - // Check to see if the root actually an existing file - remote := path.Base(root) - f.root = path.Dir(root) - if f.root == "." { - f.root = "" - } - _, err = f.NewObject(ctx, remote) + // Find the current root + err = f.dirCache.FindRoot(ctx, false) if err != nil { - if errors.Is(err, fs.ErrorObjectNotFound) || errors.Is(err, fs.ErrorNotAFile) || errors.Is(err, fs.ErrorIsDir) { - // File doesn't exist so return old f - f.root = root + // Assume it is a file + newRoot, remote := dircache.SplitPath(root) + tempF := *f + tempF.dirCache = dircache.New(newRoot, rootID, &tempF) + tempF.root = newRoot + // Make new Fs which is the parent + err = tempF.dirCache.FindRoot(ctx, false) + if err != nil { + // No root so return old f return f, nil } - return nil, err + _, err := tempF.NewObject(ctx, remote) + if err != nil { + if err == fs.ErrorObjectNotFound { + // File doesn't exist so return old f + return f, nil + } + return nil, err + } + f.features.Fill(ctx, &tempF) + // XXX: update the old f here instead of returning tempF, since + // `features` were already filled with functions having *f as a receiver. + // See https://github.com/rclone/rclone/issues/2182 + f.dirCache = tempF.dirCache + f.root = tempF.root + // return an error with an fs which points to the parent + return f, fs.ErrorIsFile } - // return an error with an fs which points to the parent - return f, fs.ErrorIsFile + return f, nil } type entity struct { @@ -130,169 +159,135 @@ type entity struct { Name string `json:"name"` URL string `json:"url"` Ctime int64 `json:"ctime"` - Size int `json:"size"` - ID int `json:"id"` - Pid int `json:"pid"` + Size int64 `json:"size"` + ID int64 `json:"id"` + Pid int64 `json:"pid"` ItemID string `json:"item_id"` } + +// Return true if the entity is a directory +func (e *entity) isDir() bool { + return e.Type == "dir" || e.Type == "sdir" +} + type data struct { Entities []entity `json:"list"` } type fileSearchRes struct { - SearchData data `json:"data"` - Status int `json:"status"` - Message string `json:"msg"` + response + SearchData data `json:"data"` } -func (f *Fs) getIDByDir(ctx context.Context, dir string) (int, error) { - var pid int - var err error - err = f.pacer.Call(func() (bool, error) { - pid, err = f._getIDByDir(ctx, dir) - return f.shouldRetry(ctx, err) - }) - - if fserrors.IsRetryError(err) { - fs.Debugf(f, "getting ID of Dir error: retrying: pid = {%d}, dir = {%s}, err = {%s}", pid, dir, err) - err = fs.ErrorDirNotFound - } - - return pid, err -} - -func (f *Fs) _getIDByDir(ctx context.Context, dir string) (int, error) { - if dir == "" || dir == "/" { - return 0, nil // we assume that it is root directory - } - - path := strings.TrimPrefix(dir, "/") - dirs := strings.Split(path, "/") - pid := 0 - - for level, tdir := range dirs { - pageNumber := 0 - numberOfEntities := maxEntitiesPerPage - - for numberOfEntities == maxEntitiesPerPage { - pageNumber++ - opts := makeSearchQuery("", pid, f.opt.Token, pageNumber) - responseResult := fileSearchRes{} - err := getUnmarshaledResponse(ctx, f, opts, &responseResult) - if err != nil { - return 0, fmt.Errorf("error in unmurshaling response from linkbox.to: %w", err) - } - - numberOfEntities = len(responseResult.SearchData.Entities) - if len(responseResult.SearchData.Entities) == 0 { - return 0, fs.ErrorDirNotFound - } - - for _, entity := range responseResult.SearchData.Entities { - if entity.Pid == pid && (entity.Type == "dir" || entity.Type == "sdir") && strings.EqualFold(entity.Name, tdir) { - pid = entity.ID - if level == len(dirs)-1 { - return pid, nil - } - } - } - - if pageNumber > 100000 { - return 0, fmt.Errorf("too many results") - } - - } - } - - // fs.Debugf(f, "getIDByDir fs.ErrorDirNotFound dir = {%s} path = {%s}", dir, path) - - return 0, fs.ErrorDirNotFound +// Set an object info from an entity +func (o *Object) set(e *entity) { + o.modTime = time.Unix(e.Ctime, 0) + o.contentType = e.Type + o.size = e.Size + o.fullURL = e.URL + o.isDir = e.isDir() + o.id = e.ID + o.itemID = e.ItemID + o.dirID = e.Pid } +// Call linkbox with the query in opts and return result +// +// This will be checked for error and an error will be returned if Status != 1 func getUnmarshaledResponse(ctx context.Context, f *Fs, opts *rest.Opts, result interface{}) error { err := f.pacer.Call(func() (bool, error) { - _, err := f.srv.CallJSON(ctx, opts, nil, &result) - return f.shouldRetry(ctx, err) + resp, err := f.srv.CallJSON(ctx, opts, nil, &result) + return f.shouldRetry(ctx, resp, err) }) - return err -} - -func makeSearchQuery(name string, pid int, token string, pageNubmer int) *rest.Opts { - return &rest.Opts{ - Method: "GET", - RootURL: linkboxAPIURL, - Path: "file_search", - Parameters: url.Values{ - "token": {token}, - "name": {name}, - "pid": {strconv.Itoa(pid)}, - "pageNo": {strconv.Itoa(pageNubmer)}, - "pageSize": {strconv.Itoa(maxEntitiesPerPage)}, - }, - } -} - -func (f *Fs) getFilesByDir(ctx context.Context, dir string) ([]*Object, error) { - var responseResult fileSearchRes - var files []*Object - var numberOfEntities int - - fullPath := path.Join(f.root, dir) - fullPath = strings.TrimPrefix(fullPath, "/") - - pid, err := f.getIDByDir(ctx, fullPath) - if err != nil { - fs.Debugf(f, "getting files list error: dir = {%s} fullPath = {%s} pid = {%d} err = {%s}", dir, fullPath, pid, err) - - return nil, err + return err } + responser := result.(responser) + if responser.IsError() { + return responser + } + return nil +} - pageNumber := 0 - numberOfEntities = maxEntitiesPerPage +// list the objects into the function supplied +// +// If directories is set it only sends directories +// User function to process a File item from listAll +// +// Should return true to finish processing +type listAllFn func(*entity) bool +// Search is a bit fussy about which characters match +// +// If the name doesn't match this then do an dir list instead +var searchOK = regexp.MustCompile(`^[a-zA-Z0-9_ .]+$`) + +// Lists the directory required calling the user function on each item found +// +// If the user fn ever returns true then it early exits with found = true +// +// If you set name then search ignores dirID. name is a substring +// search also so name="dir" matches "sub dir" also. This filters it +// down so it only returns items in dirID +func (f *Fs) listAll(ctx context.Context, dirID string, name string, fn listAllFn) (found bool, err error) { + var ( + pageNumber = 0 + numberOfEntities = maxEntitiesPerPage + ) + name = strings.TrimSpace(name) // search doesn't like spaces + if !searchOK.MatchString(name) { + // If name isn't good then do an unbounded search + name = "" + } +OUTER: for numberOfEntities == maxEntitiesPerPage { pageNumber++ - opts := makeSearchQuery("", pid, f.opt.Token, pageNumber) - - responseResult = fileSearchRes{} - err = getUnmarshaledResponse(ctx, f, opts, &responseResult) - if err != nil { - return nil, fmt.Errorf("getting files failed with error in unmurshaling response from linkbox.to: %w", err) - + opts := &rest.Opts{ + Method: "GET", + RootURL: linkboxAPIURL, + Path: "file_search", + Parameters: url.Values{ + "token": {f.opt.Token}, + "name": {name}, + "pid": {dirID}, + "pageNo": {itoa(pageNumber)}, + "pageSize": {itoa64(maxEntitiesPerPage)}, + }, } - if responseResult.Status != 1 { - return nil, fmt.Errorf("parsing failed: %s", responseResult.Message) + var responseResult fileSearchRes + err = getUnmarshaledResponse(ctx, f, opts, &responseResult) + if err != nil { + return false, fmt.Errorf("getting files failed: %w", err) + } numberOfEntities = len(responseResult.SearchData.Entities) for _, entity := range responseResult.SearchData.Entities { - if entity.Pid != pid { - fs.Debugf(f, "getFilesByDir error with entity.Name {%s} dir {%s}", entity.Name, dir) + if itoa64(entity.Pid) != dirID { + // when name != "" this returns from all directories, so ignore not this one + continue } - file := &Object{ - fs: f, - remote: entity.Name, - modTime: time.Unix(entity.Ctime, 0), - contentType: entity.Type, - size: int64(entity.Size), - fullURL: entity.URL, - isDir: entity.Type == "dir" || entity.Type == "sdir", - id: entity.ItemID, - pid: entity.Pid, + if fn(&entity) { + found = true + break OUTER } - - files = append(files, file) } - if pageNumber > 100000 { - return files, fmt.Errorf("too many results") + return false, fmt.Errorf("too many results") } - } + return found, nil +} - return files, nil +// Turn 64 bit int to string +func itoa64(i int64) string { + return strconv.FormatInt(i, 10) +} + +// Turn int to string +func itoa(i int) string { + return itoa64(int64(i)) } func splitDirAndName(remote string) (dir string, name string) { @@ -310,6 +305,62 @@ func splitDirAndName(remote string) (dir string, name string) { return dir, name } +// FindLeaf finds a directory of name leaf in the folder with ID directoryID +func (f *Fs) FindLeaf(ctx context.Context, directoryID, leaf string) (directoryIDOut string, found bool, err error) { + // Find the leaf in directoryID + found, err = f.listAll(ctx, directoryID, leaf, func(entity *entity) bool { + if entity.isDir() && strings.EqualFold(entity.Name, leaf) { + directoryIDOut = itoa64(entity.ID) + return true + } + return false + }) + return directoryIDOut, found, err +} + +// Returned from "folder_create" +type folderCreateRes struct { + response + Data struct { + DirID int64 `json:"dirId"` + } `json:"data"` +} + +// CreateDir makes a directory with dirID as parent and name leaf +func (f *Fs) CreateDir(ctx context.Context, dirID, leaf string) (newID string, err error) { + // fs.Debugf(f, "CreateDir(%q, %q)\n", dirID, leaf) + opts := &rest.Opts{ + Method: "GET", + RootURL: linkboxAPIURL, + Path: "folder_create", + Parameters: url.Values{ + "token": {f.opt.Token}, + "name": {leaf}, + "pid": {dirID}, + "isShare": {"0"}, + "canInvite": {"1"}, + "canShare": {"1"}, + "withBodyImg": {"1"}, + "desc": {""}, + }, + } + + response := folderCreateRes{} + err = getUnmarshaledResponse(ctx, f, opts, &response) + if err != nil { + // response status 1501 means that directory already exists + if response.Status == 1501 { + return newID, fmt.Errorf("couldn't find already created directory: %w", fs.ErrorDirNotFound) + } + return newID, fmt.Errorf("CreateDir failed: %w", err) + + } + if response.Data.DirID == 0 { + return newID, fmt.Errorf("API returned 0 for ID of newly created directory") + } + return itoa64(response.Data.DirID), nil +} + // List the objects and directories in dir into entries. The // entries can be returned in any order but should be for a // complete directory. @@ -321,84 +372,57 @@ func splitDirAndName(remote string) (dir string, name string) { // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { // fs.Debugf(f, "List method dir = {%s}", dir) - - objects, err := f.getFilesByDir(ctx, dir) + directoryID, err := f.dirCache.FindDir(ctx, dir, false) if err != nil { return nil, err } - - for _, obj := range objects { - prefix := "" - if dir != "" { - prefix = dir + "/" - } - - if obj.isDir { - entries = append(entries, fs.NewDir(prefix+obj.remote, obj.modTime)) + _, err = f.listAll(ctx, directoryID, "", func(entity *entity) bool { + remote := path.Join(dir, entity.Name) + if entity.isDir() { + id := itoa64(entity.ID) + modTime := time.Unix(entity.Ctime, 0) + d := fs.NewDir(remote, modTime).SetID(id).SetParentID(itoa64(entity.Pid)) + entries = append(entries, d) + // cache the directory ID for later lookups + f.dirCache.Put(remote, id) } else { - obj.remote = prefix + obj.remote - entries = append(entries, obj) + o := &Object{ + fs: f, + remote: remote, + } + o.set(entity) + entries = append(entries, o) } + return false + }) + if err != nil { + return nil, err } - return entries, nil } -func getObject(ctx context.Context, f *Fs, name string, pid int, token string) (entity, error) { - var err error - var entity entity - err = f.pacer.Call(func() (bool, error) { - entity, err = _getObject(ctx, f, name, pid, token) - return f.shouldRetry(ctx, err) - }) - // fs.Debugf(f, "getObject: name = {%s}, pid = {%d}, err = {%#v}", name, pid, err) - - if fserrors.IsRetryError(err) { - fs.Debugf(f, "getObject IsRetryError: name = {%s}, pid = {%d}, err = {%#v}", name, pid, err) - - err = fs.ErrorObjectNotFound - } - - return entity, err -} - -func _getObject(ctx context.Context, f *Fs, name string, pid int, token string) (entity, error) { - pageNumber := 0 - numberOfEntities := maxEntitiesPerPage - - for numberOfEntities == maxEntitiesPerPage { - pageNumber++ - opts := makeSearchQuery("", pid, token, pageNumber) - - searchResponse := fileSearchRes{} - err := getUnmarshaledResponse(ctx, f, opts, &searchResponse) - if err != nil { - return entity{}, fmt.Errorf("unable to create new object: %w", err) - } - if searchResponse.Status != 1 { - return entity{}, fmt.Errorf("unable to create new object: %s", searchResponse.Message) - } - numberOfEntities = len(searchResponse.SearchData.Entities) - - // fs.Debugf(f, "getObject numberOfEntities {%d} name {%s}", numberOfEntities, name) - - for _, obj := range searchResponse.SearchData.Entities { - // fs.Debugf(f, "getObject entity.Name {%s} name {%s}", obj.Name, name) - if obj.Pid == pid && strings.EqualFold(obj.Name, name) { - // fs.Debugf(f, "getObject found entity.Name {%s} name {%s}", obj.Name, name) - if obj.Type == "dir" || obj.Type == "sdir" { - return entity{}, fs.ErrorIsDir - } - return obj, nil +// get an entity with leaf from dirID +func getEntity(ctx context.Context, f *Fs, leaf string, directoryID string, token string) (*entity, error) { + var result *entity + var resultErr = fs.ErrorObjectNotFound + _, err := f.listAll(ctx, directoryID, leaf, func(entity *entity) bool { + if strings.EqualFold(entity.Name, leaf) { + // fs.Debugf(f, "getObject found entity.Name {%s} name {%s}", entity.Name, name) + if entity.isDir() { + result = nil + resultErr = fs.ErrorIsDir + } else { + result = entity + resultErr = nil } + return true } - - if pageNumber > 100000 { - return entity{}, fmt.Errorf("too many results") - } + return false + }) + if err != nil { + return nil, err } - - return entity{}, fs.ErrorObjectNotFound + return result, resultErr } // NewObject finds the Object at remote. If it can't be found @@ -408,145 +432,72 @@ func _getObject(ctx context.Context, f *Fs, name string, pid int, token string) // ErrorIsDir if possible without doing any extra work, // otherwise ErrorObjectNotFound. func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { - var newObject entity - var dir, name string - - fullPath := path.Join(f.root, remote) - dir, name = splitDirAndName(fullPath) - - dirID, err := f.getIDByDir(ctx, dir) + leaf, dirID, err := f.dirCache.FindPath(ctx, remote, false) if err != nil { - return nil, fs.ErrorObjectNotFound - } - - newObject, err = getObject(ctx, f, name, dirID, f.opt.Token) - if err != nil { - // fs.Debugf(f, "NewObject getObject error = {%s}", err) - + if err == fs.ErrorDirNotFound { + return nil, fs.ErrorObjectNotFound + } return nil, err } - if newObject == (entity{}) { - return nil, fs.ErrorObjectNotFound + entity, err := getEntity(ctx, f, leaf, dirID, f.opt.Token) + if err != nil { + return nil, err } - - return &Object{ - fs: f, - remote: name, - modTime: time.Unix(newObject.Ctime, 0), - fullURL: newObject.URL, - size: int64(newObject.Size), - id: newObject.ItemID, - pid: newObject.Pid, - }, nil + o := &Object{ + fs: f, + remote: remote, + } + o.set(entity) + return o, nil } // Mkdir makes the directory (container, bucket) // // Shouldn't return an error if it already exists func (f *Fs) Mkdir(ctx context.Context, dir string) error { - var pdir, name string - - fullPath := path.Join(f.root, dir) - if fullPath == "" { - return nil - } - - fullPath = strings.TrimPrefix(fullPath, "/") - - dirs := strings.Split(fullPath, "/") - dirs = append([]string{""}, dirs...) - - for i, dirName := range dirs { - pdir = path.Join(pdir, dirName) - name = dirs[i+1] - pid, err := f.getIDByDir(ctx, pdir) - if err != nil { - return err - } - - opts := &rest.Opts{ - Method: "GET", - RootURL: linkboxAPIURL, - Path: "folder_create", - Parameters: url.Values{ - "token": {f.opt.Token}, - "name": {name}, - "pid": {strconv.Itoa(pid)}, - "isShare": {"0"}, - "canInvite": {"1"}, - "canShare": {"1"}, - "withBodyImg": {"1"}, - "desc": {""}, - }, - } - - response := getResponse{} - - err = getUnmarshaledResponse(ctx, f, opts, &response) - if err != nil { - return fmt.Errorf("Mkdir error in unmurshaling response from linkbox.to: %w", err) - - } - - if i+1 == len(dirs)-1 { - break - } - - // response status 1501 means that directory already exists - if response.Status != 1 && response.Status != 1501 { - return fmt.Errorf("could not create dir[%s]: %s", dir, response.Message) - } - - } - return nil + _, err := f.dirCache.FindDir(ctx, dir, true) + return err } func (f *Fs) purgeCheck(ctx context.Context, dir string, check bool) error { - fullPath := path.Join(f.root, dir) - - if fullPath == "" { - return fs.ErrorDirNotFound + if check { + entries, err := f.List(ctx, dir) + if err != nil { + return err + } + if len(entries) != 0 { + return fs.ErrorDirectoryNotEmpty + } } - fullPath = strings.TrimPrefix(fullPath, "/") - dirIDs, err := f.getIDByDir(ctx, fullPath) - + directoryID, err := f.dirCache.FindDir(ctx, dir, false) if err != nil { return err } - - entries, err := f.List(ctx, dir) - if err != nil { - return err - } - - if len(entries) != 0 && check { - return fs.ErrorDirectoryNotEmpty - } - opts := &rest.Opts{ Method: "GET", RootURL: linkboxAPIURL, Path: "folder_del", Parameters: url.Values{ "token": {f.opt.Token}, - "dirIds": {strconv.Itoa(dirIDs)}, + "dirIds": {directoryID}, }, } - response := getResponse{} + response := response{} err = getUnmarshaledResponse(ctx, f, opts, &response) - if err != nil { - return fmt.Errorf("purging error in unmurshaling response from linkbox.to: %w", err) - + // Linkbox has some odd error returns here + if response.Status == 403 || response.Status == 500 { + return fs.ErrorDirNotFound + } + return fmt.Errorf("purge error: %w", err) } - if response.Status != 1 { - // it can be some different error, but Linkbox - // returns very few statuses - return fs.ErrorDirExists + f.dirCache.FlushDir(dir) + if err != nil { + return err } return nil } @@ -569,11 +520,11 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo downloadURL := o.fullURL if downloadURL == "" { _, name := splitDirAndName(o.Remote()) - newObject, err := getObject(ctx, o.fs, name, o.pid, o.fs.opt.Token) + newObject, err := getEntity(ctx, o.fs, name, itoa64(o.dirID), o.fs.opt.Token) if err != nil { return nil, err } - if newObject == (entity{}) { + if newObject == nil { // fs.Debugf(o.fs, "Open entity is empty: name = {%s}", name) return nil, fs.ErrorObjectNotFound } @@ -590,7 +541,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo err := o.fs.pacer.Call(func() (bool, error) { var err error res, err = o.fs.srv.Call(ctx, opts) - return o.fs.shouldRetry(ctx, err) + return o.fs.shouldRetry(ctx, res, err) }) if err != nil { @@ -605,17 +556,33 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo // When called from outside an Fs by rclone, src.Size() will always be >= 0. // But for unknown-sized objects (indicated by src.Size() == -1), Upload should either // return an error or update the object properly (rather than e.g. calling panic). -func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - if src.Size() == 0 { +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { + size := src.Size() + if size == 0 { return fs.ErrorCantUploadEmptyFiles + } else if size < 0 { + return fmt.Errorf("can't upload files of unknown length") } remote := o.Remote() - tmpObject, err := o.fs.NewObject(ctx, remote) - if err == nil { - // fs.Debugf(o.fs, "Update: removing old file") - _ = tmpObject.Remove(ctx) + // remove the file if it exists + if o.itemID != "" { + fs.Debugf(o, "Update: removing old file") + err = o.Remove(ctx) + if err != nil { + fs.Errorf(o, "Update: failed to remove existing file: %v", err) + } + o.itemID = "" + } else { + tmpObject, err := o.fs.NewObject(ctx, remote) + if err == nil { + fs.Debugf(o, "Update: removing old file") + err = tmpObject.Remove(ctx) + if err != nil { + fs.Errorf(o, "Update: failed to remove existing file: %v", err) + } + } } first10m := io.LimitReader(in, 10_485_760) @@ -633,17 +600,19 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op Parameters: url.Values{ "token": {o.fs.opt.Token}, "fileMd5ofPre10m": {fmt.Sprintf("%x", md5.Sum(first10mBytes))}, - "fileSize": {strconv.FormatInt(src.Size(), 10)}, + "fileSize": {itoa64(size)}, }, } - getFistStepResult := getUploadURLResponse{} - err = getUnmarshaledResponse(ctx, o.fs, opts, &getFistStepResult) + getFirstStepResult := getUploadURLResponse{} + err = getUnmarshaledResponse(ctx, o.fs, opts, &getFirstStepResult) if err != nil { - return fmt.Errorf("Update err in unmarshaling response: %w", err) + if getFirstStepResult.Status != 600 { + return fmt.Errorf("Update err in unmarshaling response: %w", err) + } } - switch getFistStepResult.Status { + switch getFirstStepResult.Status { case 1: // upload file using link from first step var res *http.Response @@ -651,15 +620,16 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op file := io.MultiReader(bytes.NewReader(first10mBytes), in) opts := &rest.Opts{ - Method: "PUT", - RootURL: getFistStepResult.Data.SignURL, - Options: options, - Body: file, + Method: "PUT", + RootURL: getFirstStepResult.Data.SignURL, + Options: options, + Body: file, + ContentLength: &size, } err = o.fs.pacer.CallNoRetry(func() (bool, error) { res, err = o.fs.srv.Call(ctx, opts) - return o.fs.shouldRetry(ctx, err) + return o.fs.shouldRetry(ctx, res, err) }) if err != nil { @@ -675,14 +645,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Status means that we don't need to upload file // We need only to make second step default: - return fmt.Errorf("got unexpected message from Linkbox: %s", getFistStepResult.Message) + return fmt.Errorf("got unexpected message from Linkbox: %s", getFirstStepResult.Message) } - fullPath := path.Join(o.fs.root, remote) - fullPath = strings.TrimPrefix(fullPath, "/") - - pdir, name := splitDirAndName(fullPath) - pid, err := o.fs.getIDByDir(ctx, pdir) + leaf, dirID, err := o.fs.dirCache.FindPath(ctx, remote, false) if err != nil { return err } @@ -696,34 +662,38 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op Parameters: url.Values{ "token": {o.fs.opt.Token}, "fileMd5ofPre10m": {fmt.Sprintf("%x", md5.Sum(first10mBytes))}, - "fileSize": {strconv.FormatInt(src.Size(), 10)}, - "pid": {strconv.Itoa(pid)}, - "diyName": {name}, + "fileSize": {itoa64(size)}, + "pid": {dirID}, + "diyName": {leaf}, }, } getSecondStepResult := getUploadURLResponse{} err = getUnmarshaledResponse(ctx, o.fs, opts, &getSecondStepResult) if err != nil { - return fmt.Errorf("Update err in unmarshaling response: %w", err) - } - if getSecondStepResult.Status != 1 { - return fmt.Errorf("get bad status from linkbox: %s", getSecondStepResult.Message) + return fmt.Errorf("Update second step failed: %w", err) } - newObject, err := getObject(ctx, o.fs, name, pid, o.fs.opt.Token) + // Try a few times to read the object after upload for eventual consistency + const maxTries = 10 + var sleepTime = 100 * time.Millisecond + var entity *entity + for try := 1; try <= maxTries; try++ { + entity, err = getEntity(ctx, o.fs, leaf, dirID, o.fs.opt.Token) + if err == nil { + break + } + if err != fs.ErrorObjectNotFound { + return fmt.Errorf("Update failed to read object: %w", err) + } + fs.Debugf(o, "Trying to read object after upload: try again in %v (%d/%d)", sleepTime, try, maxTries) + time.Sleep(sleepTime) + sleepTime *= 2 + } if err != nil { - return fs.ErrorObjectNotFound + return err } - if newObject == (entity{}) { - return fs.ErrorObjectNotFound - } - - o.pid = pid - o.remote = remote - o.modTime = time.Unix(newObject.Ctime, 0) - o.size = src.Size() - + o.set(entity) return nil } @@ -735,21 +705,15 @@ func (o *Object) Remove(ctx context.Context) error { Path: "file_del", Parameters: url.Values{ "token": {o.fs.opt.Token}, - "itemIds": {o.id}, + "itemIds": {o.itemID}, }, } - requestResult := getUploadURLResponse{} err := getUnmarshaledResponse(ctx, o.fs, opts, &requestResult) if err != nil { return fmt.Errorf("could not Remove: %w", err) } - - if requestResult.Status != 1 { - return fmt.Errorf("got unexpected message from Linkbox: %s", requestResult.Message) - } - return nil } @@ -834,18 +798,36 @@ func (f *Fs) Hashes() hash.Set { "status": 1 } */ -type getResponse struct { + +// All messages have these items +type response struct { Message string `json:"msg"` Status int `json:"status"` } +// IsError returns whether response represents an error +func (r *response) IsError() bool { + return r.Status != 1 +} + +// Error returns the error state of this response +func (r *response) Error() string { + return fmt.Sprintf("Linkbox error %d: %s", r.Status, r.Message) +} + +// responser is interface covering the response so we can use it when it is embedded. +type responser interface { + IsError() bool + Error() string +} + type getUploadURLData struct { SignURL string `json:"signUrl"` } type getUploadURLResponse struct { + response Data getUploadURLData `json:"data"` - getResponse } // Put in to the remote path with the modTime given of the given size @@ -882,25 +864,34 @@ func (f *Fs) Purge(ctx context.Context, dir string) error { return f.purgeCheck(ctx, dir, false) } +// retryErrorCodes is a slice of error codes that we will retry +var retryErrorCodes = []int{ + 429, // Too Many Requests. + 500, // Internal Server Error + 502, // Bad Gateway + 503, // Service Unavailable + 504, // Gateway Timeout + 509, // Bandwidth Limit Exceeded +} + // shouldRetry determines whether a given err rates being retried -func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) { - if err == fs.ErrorDirNotFound { - // fs.Debugf(nil, "retry with %v", err) - - return true, err +func (f *Fs) shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err } + return fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err +} - if err == fs.ErrorObjectNotFound { - // fs.Debugf(nil, "retry with %v", err) - - return true, err - } - return false, err +// DirCacheFlush resets the directory cache - used in testing as an +// optional interface +func (f *Fs) DirCacheFlush() { + f.dirCache.ResetRoot() } // Check the interfaces are satisfied var ( - _ fs.Fs = &Fs{} - _ fs.Purger = &Fs{} - _ fs.Object = &Object{} + _ fs.Fs = &Fs{} + _ fs.Purger = &Fs{} + _ fs.DirCacheFlusher = &Fs{} + _ fs.Object = &Object{} ) diff --git a/fstest/test_all/config.yaml b/fstest/test_all/config.yaml index b6aef1e7b..3c77100c0 100644 --- a/fstest/test_all/config.yaml +++ b/fstest/test_all/config.yaml @@ -379,6 +379,8 @@ backends: fastlist: false ignore: - TestIntegration/FsMkdir/FsEncoding/invalid_UTF-8 + - TestRWFileHandleWriteNoWrite + - TestCaseInsensitiveMoveFile - backend: "premiumizeme" remote: "TestPremiumizeMe:" fastlist: false