Change the filter operation when search objects #5

Merged
alexvanin merged 1 commit from :fix/delete_image into tcl/master 2024-01-31 11:28:26 +00:00
2 changed files with 26 additions and 4 deletions

View file

@ -55,7 +55,7 @@ storage:
connection_timeout: 5s
request_timeout: 5s
rebalance_interval: 30s
rpc_endpoint: http://morph_chain.frostfs.devenv:30333
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
http:
addr: :5000
debug:

View file

@ -699,7 +699,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
func (d *driver) Delete(ctx context.Context, path string) error {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
dkirillov marked this conversation as resolved Outdated

Shouldn't we also do the same then ?

Shouldn't we also do [the same](https://git.frostfs.info/TrueCloudLab/distribution/src/commit/5c26d0c9c17e4149415a020579478cd0b97e438b/registry/storage/driver/s3-aws/s3.go#L944-L947) then ?
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(d.containerID)
@ -714,10 +714,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
var inErr error
err = res.Iterate(func(id oid.ID) bool {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
isSubPath, err := d.checkIsSubPath(ctx, path, id)
if err != nil {
inErr = err
return true
}
if isSubPath {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
return true
}
}
return false
})
if err == nil {
@ -730,6 +739,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil
}
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
}
fileInf := newFileInfo(ctx, obj, "")
dkirillov marked this conversation as resolved Outdated

If something goes wrong (e.g. in newFileInfo) we theoretically can get panic in fileInf.Path()[len(path)]. So we have to use fileInf.Path() <= path.

Also we can write

fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil

And finally let's keep the similar comment
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").

If something goes wrong (e.g. in `newFileInfo`) we theoretically can get panic in `fileInf.Path()[len(path)]`. So we have to use `fileInf.Path() <= path`. Also we can write ```golang fileInf := newFileInfo(ctx, obj, "") return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil ``` And finally let's keep the similar comment `// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").`
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
}
func (d *driver) delete(ctx context.Context, id oid.ID) error {
var prm pool.PrmObjectDelete
prm.SetAddress(d.objectAddress(id))