diff --git a/backend/frostfs/frostfs.go b/backend/frostfs/frostfs.go index dc5bbdd6a..8d0d8566b 100644 --- a/backend/frostfs/frostfs.go +++ b/backend/frostfs/frostfs.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "encoding/hex" + "errors" "fmt" "io" "math" @@ -155,6 +156,8 @@ func init() { }) } +var errMalformedObject = errors.New("malformed object") + // Options defines the configuration for this backend type Options struct { FrostfsEndpoint string `config:"endpoint"` @@ -292,7 +295,7 @@ func NewFs(ctx context.Context, name string, root string, m configmap.Mapper) (f return f, nil } -func newObject(f *Fs, obj object.Object, container string) *Object { +func newObject(f *Fs, obj object.Object, container string) (*Object, error) { // we should not include rootDirectory into remote name prefix := f.rootDirectory if prefix != "" { @@ -326,9 +329,12 @@ func newObject(f *Fs, obj object.Object, container string) *Object { } } - if objInfo.filePath == "" { - objInfo.filePath = objInfo.name + // We expect that the FilePath attribute is present in the object and that it starts with a leading slash + if objInfo.filePath == "" || objInfo.filePath[0] != '/' { + return nil, errMalformedObject } + // Don't include a leading slash in the resulting object's file path. + objInfo.filePath = objInfo.filePath[1:] objInfo.remote = objInfo.filePath if strings.Contains(objInfo.remote, prefix) { @@ -339,7 +345,7 @@ func newObject(f *Fs, obj object.Object, container string) *Object { } } - return objInfo + return objInfo, nil } // MimeType of an Object if known, "" otherwise @@ -631,7 +637,7 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . _ = f.pool.DeleteObject(ctx, prmDelete) } - return newObject(f, obj, ""), nil + return newObject(f, obj, "") } func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, options ...fs.OpenOption) map[string]string { @@ -642,7 +648,7 @@ func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, option }) } - headers := map[string]string{object.AttributeFilePath: filePath} + headers := map[string]string{object.AttributeFilePath: "/" + filePath} for _, option := range options { key, value := option.Header() @@ -718,8 +724,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if err != nil { return fmt.Errorf("fetch head object: %w", err) } - - objInfo := newObject(o.fs, obj, "") + var objInfo *Object + if objInfo, err = newObject(o.fs, obj, ""); err != nil { + return err + } o.filePath = objInfo.filePath o.remote = objInfo.remote @@ -769,7 +777,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { return nil, fmt.Errorf("head object: %w", err) } - return newObject(f, obj, ""), nil + return newObject(f, obj, "") } func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Chain, cnrID cid.ID) error { @@ -1053,7 +1061,11 @@ func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, direct return nil, err } - objInf := newObject(f, obj, rootDirName) + objInf, err := newObject(f, obj, rootDirName) + if err != nil { + // skip an erroneous object + continue + } if !recursive { withoutPath := strings.TrimPrefix(objInf.filePath, containerPath) @@ -1112,7 +1124,7 @@ func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) { func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath string) ([]oid.ID, error) { return f.findObjects(ctx, cnrID, searchFilter{ Header: object.AttributeFilePath, - Value: filePath, + Value: "/" + filePath, MatchType: object.MatchStringEqual, }) } @@ -1120,7 +1132,7 @@ func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath str func (f *Fs) findObjectsPrefix(ctx context.Context, cnrID cid.ID, prefix string) ([]oid.ID, error) { return f.findObjects(ctx, cnrID, searchFilter{ Header: object.AttributeFilePath, - Value: prefix, + Value: "/" + prefix, MatchType: object.MatchCommonPrefix, }) } @@ -1139,7 +1151,7 @@ func (f *Fs) findObjects(ctx context.Context, cnrID cid.ID, filters ...searchFil func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) error { filters := object.NewSearchFilters() filters.AddRootFilter() - filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) + filters.AddFilter(object.AttributeFilePath, "/"+prefix, object.MatchCommonPrefix) var prmSearch pool.PrmObjectSearch prmSearch.SetContainerID(cnrID)