[#5] Change the filter operation when search objects

The delete function should work recursively
and delete all objects by path and by subpaths.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
This commit is contained in:
Roman Loginov 2024-01-25 09:09:25 +03:00
parent 5284b39b5f
commit 953388ac54
2 changed files with 26 additions and 4 deletions

View file

@ -55,7 +55,7 @@ storage:
connection_timeout: 5s connection_timeout: 5s
request_timeout: 5s request_timeout: 5s
rebalance_interval: 30s rebalance_interval: 30s
rpc_endpoint: http://morph_chain.frostfs.devenv:30333 rpc_endpoint: http://morph-chain.frostfs.devenv:30333
http: http:
addr: :5000 addr: :5000
debug: debug:

View file

@ -700,7 +700,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
func (d *driver) Delete(ctx context.Context, path string) error { func (d *driver) Delete(ctx context.Context, path string) error {
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddRootFilter() filters.AddRootFilter()
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual) filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
var prmSearch pool.PrmObjectSearch var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(d.containerID) prmSearch.SetContainerID(d.containerID)
@ -715,10 +715,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
var inErr error var inErr error
err = res.Iterate(func(id oid.ID) bool { err = res.Iterate(func(id oid.ID) bool {
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
isSubPath, err := d.checkIsSubPath(ctx, path, id)
if err != nil {
inErr = err
return true
}
if isSubPath {
if err = d.delete(ctx, id); err != nil { if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err) inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
return true return true
} }
}
return false return false
}) })
if err == nil { if err == nil {
@ -731,6 +740,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil return nil
} }
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
}
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
}
func (d *driver) delete(ctx context.Context, id oid.ID) error { func (d *driver) delete(ctx context.Context, id oid.ID) error {
var prm pool.PrmObjectDelete var prm pool.PrmObjectDelete
prm.SetAddress(d.objectAddress(id)) prm.SetAddress(d.objectAddress(id))