Change the filter operation when search objects #5
2 changed files with 26 additions and 4 deletions
|
@ -55,7 +55,7 @@ storage:
|
||||||
connection_timeout: 5s
|
connection_timeout: 5s
|
||||||
request_timeout: 5s
|
request_timeout: 5s
|
||||||
rebalance_interval: 30s
|
rebalance_interval: 30s
|
||||||
rpc_endpoint: http://morph_chain.frostfs.devenv:30333
|
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
||||||
http:
|
http:
|
||||||
addr: :5000
|
addr: :5000
|
||||||
debug:
|
debug:
|
||||||
|
|
|
@ -699,7 +699,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
|
||||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
filters := object.NewSearchFilters()
|
filters := object.NewSearchFilters()
|
||||||
filters.AddRootFilter()
|
filters.AddRootFilter()
|
||||||
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
|
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
|
||||||
dkirillov marked this conversation as resolved
Outdated
|
|||||||
|
|
||||||
var prmSearch pool.PrmObjectSearch
|
var prmSearch pool.PrmObjectSearch
|
||||||
prmSearch.SetContainerID(d.containerID)
|
prmSearch.SetContainerID(d.containerID)
|
||||||
|
@ -714,10 +714,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
|
|
||||||
var inErr error
|
var inErr error
|
||||||
err = res.Iterate(func(id oid.ID) bool {
|
err = res.Iterate(func(id oid.ID) bool {
|
||||||
if err = d.delete(ctx, id); err != nil {
|
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
|
||||||
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
|
isSubPath, err := d.checkIsSubPath(ctx, path, id)
|
||||||
|
if err != nil {
|
||||||
|
inErr = err
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if isSubPath {
|
||||||
|
if err = d.delete(ctx, id); err != nil {
|
||||||
|
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -730,6 +739,19 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
|
||||||
|
var prmHead pool.PrmObjectHead
|
||||||
|
prmHead.SetAddress(d.objectAddress(id))
|
||||||
|
|
||||||
|
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fileInf := newFileInfo(ctx, obj, "")
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If something goes wrong (e.g. in Also we can write
And finally let's keep the similar comment If something goes wrong (e.g. in `newFileInfo`) we theoretically can get panic in `fileInf.Path()[len(path)]`. So we have to use `fileInf.Path() <= path`.
Also we can write
```golang
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
```
And finally let's keep the similar comment
`// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").`
|
|||||||
|
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *driver) delete(ctx context.Context, id oid.ID) error {
|
func (d *driver) delete(ctx context.Context, id oid.ID) error {
|
||||||
var prm pool.PrmObjectDelete
|
var prm pool.PrmObjectDelete
|
||||||
prm.SetAddress(d.objectAddress(id))
|
prm.SetAddress(d.objectAddress(id))
|
||||||
|
|
Loading…
Reference in a new issue
Shouldn't we also do the same then ?