Change the filter operation when search objects #5
2 changed files with 26 additions and 4 deletions
|
@ -55,7 +55,7 @@ storage:
|
|||
connection_timeout: 5s
|
||||
request_timeout: 5s
|
||||
rebalance_interval: 30s
|
||||
rpc_endpoint: http://morph_chain.frostfs.devenv:30333
|
||||
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
||||
http:
|
||||
addr: :5000
|
||||
debug:
|
||||
|
|
|
@ -699,7 +699,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
|
|||
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
|
||||
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
|
||||
dkirillov marked this conversation as resolved
Outdated
|
||||
|
||||
var prmSearch pool.PrmObjectSearch
|
||||
prmSearch.SetContainerID(d.containerID)
|
||||
|
@ -714,10 +714,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
|||
|
||||
var inErr error
|
||||
err = res.Iterate(func(id oid.ID) bool {
|
||||
if err = d.delete(ctx, id); err != nil {
|
||||
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
|
||||
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
|
||||
isSubPath, err := d.checkIsSubPath(ctx, path, id)
|
||||
if err != nil {
|
||||
inErr = err
|
||||
return true
|
||||
}
|
||||
|
||||
if isSubPath {
|
||||
if err = d.delete(ctx, id); err != nil {
|
||||
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
if err == nil {
|
||||
|
@ -730,6 +739,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
|
||||
var prmHead pool.PrmObjectHead
|
||||
prmHead.SetAddress(d.objectAddress(id))
|
||||
|
||||
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
|
||||
}
|
||||
|
||||
fileInf := newFileInfo(ctx, obj, "")
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If something goes wrong (e.g. in Also we can write
And finally let's keep the similar comment If something goes wrong (e.g. in `newFileInfo`) we theoretically can get panic in `fileInf.Path()[len(path)]`. So we have to use `fileInf.Path() <= path`.
Also we can write
```golang
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
```
And finally let's keep the similar comment
`// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").`
|
||||
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
|
||||
}
|
||||
|
||||
func (d *driver) delete(ctx context.Context, id oid.ID) error {
|
||||
var prm pool.PrmObjectDelete
|
||||
prm.SetAddress(d.objectAddress(id))
|
||||
|
|
Loading…
Reference in a new issue
Shouldn't we also do the same then ?