[#5] Change the filter operation when search objects

The delete function should work recursively
and delete all objects by path and by subpaths.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
This commit is contained in:
Roman Loginov 2024-01-25 09:09:25 +03:00
parent 5c26d0c9c1
commit 42eafd1fff
2 changed files with 26 additions and 4 deletions

View file

@ -55,7 +55,7 @@ storage:
connection_timeout: 5s
request_timeout: 5s
rebalance_interval: 30s
rpc_endpoint: http://morph_chain.frostfs.devenv:30333
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
http:
addr: :5000
debug:

View file

@ -699,7 +699,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
func (d *driver) Delete(ctx context.Context, path string) error {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(d.containerID)
@ -714,10 +714,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
var inErr error
err = res.Iterate(func(id oid.ID) bool {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
isSubPath, err := d.checkIsSubPath(ctx, path, id)
if err != nil {
inErr = err
return true
}
if isSubPath {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
return true
}
}
return false
})
if err == nil {
@ -730,6 +739,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil
}
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
}
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
}
func (d *driver) delete(ctx context.Context, id oid.ID) error {
var prm pool.PrmObjectDelete
prm.SetAddress(d.objectAddress(id))