diff --git a/cmd/registry/config-dev-frostfs.yml b/cmd/registry/config-dev-frostfs.yml index 4c9b7ce2..f0463860 100644 --- a/cmd/registry/config-dev-frostfs.yml +++ b/cmd/registry/config-dev-frostfs.yml @@ -55,7 +55,7 @@ storage: connection_timeout: 5s request_timeout: 5s rebalance_interval: 30s - rpc_endpoint: http://morph_chain.frostfs.devenv:30333 + rpc_endpoint: http://morph-chain.frostfs.devenv:30333 http: addr: :5000 debug: diff --git a/registry/storage/driver/frostfs/frostfs.go b/registry/storage/driver/frostfs/frostfs.go index 9212669d..690948ce 100644 --- a/registry/storage/driver/frostfs/frostfs.go +++ b/registry/storage/driver/frostfs/frostfs.go @@ -700,7 +700,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e func (d *driver) Delete(ctx context.Context, path string) error { filters := object.NewSearchFilters() filters.AddRootFilter() - filters.AddFilter(attributeFilePath, path, object.MatchStringEqual) + filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix) var prmSearch pool.PrmObjectSearch prmSearch.SetContainerID(d.containerID) @@ -715,10 +715,19 @@ func (d *driver) Delete(ctx context.Context, path string) error { var inErr error err = res.Iterate(func(id oid.ID) bool { - if err = d.delete(ctx, id); err != nil { - inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err) + // Check if a key is a subpath (so that deleting "/a" does not delete "/ab"). + isSubPath, err := d.checkIsSubPath(ctx, path, id) + if err != nil { + inErr = err return true } + + if isSubPath { + if err = d.delete(ctx, id); err != nil { + inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err) + return true + } + } return false }) if err == nil { @@ -731,6 +740,19 @@ func (d *driver) Delete(ctx context.Context, path string) error { return nil } +func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) { + var prmHead pool.PrmObjectHead + prmHead.SetAddress(d.objectAddress(id)) + + obj, err := d.sdkPool.HeadObject(ctx, prmHead) + if err != nil { + return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err) + } + + fileInf := newFileInfo(ctx, obj, "") + return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil +} + func (d *driver) delete(ctx context.Context, id oid.ID) error { var prm pool.PrmObjectDelete prm.SetAddress(d.objectAddress(id))