[#11] Add leading slash to FilePath attribute #16

Merged
alexvanin merged 1 commit from KurlesHS/rclone:fix/add_leading_slash into tcl/master 2025-02-21 14:40:48 +00:00

View file

@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"math"
@ -155,6 +156,8 @@ func init() {
})
}
var errMalformedObject = errors.New("malformed object")
// Options defines the configuration for this backend
type Options struct {
FrostfsEndpoint string `config:"endpoint"`
@ -292,7 +295,7 @@ func NewFs(ctx context.Context, name string, root string, m configmap.Mapper) (f
return f, nil
}
func newObject(f *Fs, obj object.Object, container string) *Object {
func newObject(f *Fs, obj object.Object, container string) (*Object, error) {
// we should not include rootDirectory into remote name
prefix := f.rootDirectory
if prefix != "" {
@ -326,9 +329,12 @@ func newObject(f *Fs, obj object.Object, container string) *Object {
}
}
if objInfo.filePath == "" {
objInfo.filePath = objInfo.name
// We expect that the FilePath attribute is present in the object and that it starts with a leading slash
if objInfo.filePath == "" || objInfo.filePath[0] != '/' {
return nil, errMalformedObject
}
// Don't include a leading slash in the resulting object's file path.
dkirillov marked this conversation as resolved
Review

Why so?

Why so?
Review

Here, we create an rclone object from a FrostFS object. The internal implementation of this backend assumes that the object.remote and fs.rootDirectory fields of the object will not have a leading slash. We ensure that a slash will be added to these fields later when storing, deleting, and searching for objects in FrostFS. This is handled by the findObjects, deleteByPrefix, and fillHeaders methods.

Here, we create an rclone object from a FrostFS object. The internal implementation of this backend assumes that the `object.remote` and `fs.rootDirectory` fields of the object will not have a leading slash. We ensure that a slash will be added to these fields later when storing, deleting, and searching for objects in FrostFS. This is handled by the `findObjects`, `deleteByPrefix`, and `fillHeaders` methods.
objInfo.filePath = objInfo.filePath[1:]
objInfo.remote = objInfo.filePath
if strings.Contains(objInfo.remote, prefix) {
@ -339,7 +345,7 @@ func newObject(f *Fs, obj object.Object, container string) *Object {
}
}
return objInfo
return objInfo, nil
}
// MimeType of an Object if known, "" otherwise
@ -630,7 +636,7 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
_ = f.pool.DeleteObject(ctx, prmDelete)
}
return newObject(f, obj, ""), nil
return newObject(f, obj, "")
}
func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, options ...fs.OpenOption) map[string]string {
@ -641,7 +647,7 @@ func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, option
})
}
headers := map[string]string{object.AttributeFilePath: filePath}
headers := map[string]string{object.AttributeFilePath: "/" + filePath}
for _, option := range options {
key, value := option.Header()
@ -717,8 +723,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return fmt.Errorf("fetch head object: %w", err)
}
objInfo := newObject(o.fs, obj, "")
var objInfo *Object
if objInfo, err = newObject(o.fs, obj, ""); err != nil {
return err
}
o.filePath = objInfo.filePath
o.remote = objInfo.remote
@ -768,7 +776,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return nil, fmt.Errorf("head object: %w", err)
}
return newObject(f, obj, ""), nil
return newObject(f, obj, "")
}
func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Chain, cnrID cid.ID) error {
@ -1056,7 +1064,11 @@ func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, direct
return nil, err
}
objInf := newObject(f, obj, rootDirName)
objInf, err := newObject(f, obj, rootDirName)
if err != nil {
// skip an erroneous object
continue
}
if !recursive {
withoutPath := strings.TrimPrefix(objInf.filePath, containerPath)
@ -1115,7 +1127,7 @@ func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) {
func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath string) ([]oid.ID, error) {
return f.findObjects(ctx, cnrID, searchFilter{
Header: object.AttributeFilePath,
Value: filePath,
Value: "/" + filePath,
MatchType: object.MatchStringEqual,
})
}
@ -1123,7 +1135,7 @@ func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath str
func (f *Fs) findObjectsPrefix(ctx context.Context, cnrID cid.ID, prefix string) ([]oid.ID, error) {
return f.findObjects(ctx, cnrID, searchFilter{
Header: object.AttributeFilePath,
Value: prefix,
Value: "/" + prefix,
MatchType: object.MatchCommonPrefix,
})
}
@ -1142,7 +1154,7 @@ func (f *Fs) findObjects(ctx context.Context, cnrID cid.ID, filters ...searchFil
func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) error {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
filters.AddFilter(object.AttributeFilePath, "/"+prefix, object.MatchCommonPrefix)
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(cnrID)