[#11] Add leading slash to FilePath attr

Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
Aleksey Kravchenko 2025-02-17 15:19:15 +03:00 committed by Alexey Vanin
parent c719874fd9
commit 5557f6af23

View file

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -155,6 +156,8 @@ func init() {
}) })
} }
var errMalformedObject = errors.New("malformed object")
// Options defines the configuration for this backend // Options defines the configuration for this backend
type Options struct { type Options struct {
FrostfsEndpoint string `config:"endpoint"` FrostfsEndpoint string `config:"endpoint"`
@ -292,7 +295,7 @@ func NewFs(ctx context.Context, name string, root string, m configmap.Mapper) (f
return f, nil return f, nil
} }
func newObject(f *Fs, obj object.Object, container string) *Object { func newObject(f *Fs, obj object.Object, container string) (*Object, error) {
// we should not include rootDirectory into remote name // we should not include rootDirectory into remote name
prefix := f.rootDirectory prefix := f.rootDirectory
if prefix != "" { if prefix != "" {
@ -326,9 +329,12 @@ func newObject(f *Fs, obj object.Object, container string) *Object {
} }
} }
if objInfo.filePath == "" { // We expect that the FilePath attribute is present in the object and that it starts with a leading slash
objInfo.filePath = objInfo.name if objInfo.filePath == "" || objInfo.filePath[0] != '/' {
return nil, errMalformedObject
} }
// Don't include a leading slash in the resulting object's file path.
objInfo.filePath = objInfo.filePath[1:]
objInfo.remote = objInfo.filePath objInfo.remote = objInfo.filePath
if strings.Contains(objInfo.remote, prefix) { if strings.Contains(objInfo.remote, prefix) {
@ -339,7 +345,7 @@ func newObject(f *Fs, obj object.Object, container string) *Object {
} }
} }
return objInfo return objInfo, nil
} }
// MimeType of an Object if known, "" otherwise // MimeType of an Object if known, "" otherwise
@ -631,7 +637,7 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
_ = f.pool.DeleteObject(ctx, prmDelete) _ = f.pool.DeleteObject(ctx, prmDelete)
} }
return newObject(f, obj, ""), nil return newObject(f, obj, "")
} }
func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, options ...fs.OpenOption) map[string]string { func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, options ...fs.OpenOption) map[string]string {
@ -642,7 +648,7 @@ func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, option
}) })
} }
headers := map[string]string{object.AttributeFilePath: filePath} headers := map[string]string{object.AttributeFilePath: "/" + filePath}
for _, option := range options { for _, option := range options {
key, value := option.Header() key, value := option.Header()
@ -718,8 +724,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil { if err != nil {
return fmt.Errorf("fetch head object: %w", err) return fmt.Errorf("fetch head object: %w", err)
} }
var objInfo *Object
objInfo := newObject(o.fs, obj, "") if objInfo, err = newObject(o.fs, obj, ""); err != nil {
return err
}
o.filePath = objInfo.filePath o.filePath = objInfo.filePath
o.remote = objInfo.remote o.remote = objInfo.remote
@ -769,7 +777,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return nil, fmt.Errorf("head object: %w", err) return nil, fmt.Errorf("head object: %w", err)
} }
return newObject(f, obj, ""), nil return newObject(f, obj, "")
} }
func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Chain, cnrID cid.ID) error { func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Chain, cnrID cid.ID) error {
@ -1053,7 +1061,11 @@ func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, direct
return nil, err return nil, err
} }
objInf := newObject(f, obj, rootDirName) objInf, err := newObject(f, obj, rootDirName)
if err != nil {
// skip an erroneous object
continue
}
if !recursive { if !recursive {
withoutPath := strings.TrimPrefix(objInf.filePath, containerPath) withoutPath := strings.TrimPrefix(objInf.filePath, containerPath)
@ -1112,7 +1124,7 @@ func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) {
func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath string) ([]oid.ID, error) { func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath string) ([]oid.ID, error) {
return f.findObjects(ctx, cnrID, searchFilter{ return f.findObjects(ctx, cnrID, searchFilter{
Header: object.AttributeFilePath, Header: object.AttributeFilePath,
Value: filePath, Value: "/" + filePath,
MatchType: object.MatchStringEqual, MatchType: object.MatchStringEqual,
}) })
} }
@ -1120,7 +1132,7 @@ func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath str
func (f *Fs) findObjectsPrefix(ctx context.Context, cnrID cid.ID, prefix string) ([]oid.ID, error) { func (f *Fs) findObjectsPrefix(ctx context.Context, cnrID cid.ID, prefix string) ([]oid.ID, error) {
return f.findObjects(ctx, cnrID, searchFilter{ return f.findObjects(ctx, cnrID, searchFilter{
Header: object.AttributeFilePath, Header: object.AttributeFilePath,
Value: prefix, Value: "/" + prefix,
MatchType: object.MatchCommonPrefix, MatchType: object.MatchCommonPrefix,
}) })
} }
@ -1139,7 +1151,7 @@ func (f *Fs) findObjects(ctx context.Context, cnrID cid.ID, filters ...searchFil
func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) error { func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) error {
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddRootFilter() filters.AddRootFilter()
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) filters.AddFilter(object.AttributeFilePath, "/"+prefix, object.MatchCommonPrefix)
var prmSearch pool.PrmObjectSearch var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(cnrID) prmSearch.SetContainerID(cnrID)