[#45] object/search: Add filtering parent objects

In previous implementation object.Search services allowed to search only
physically stored objects. This limitation did not allow getting the ID of
the split object.

Extend search execution logic with parent object filtering. Parent objects
that passed filters are now included in the result

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
support/v0.27
Leonard Lyubich 2020-09-28 18:11:11 +03:00 committed by Alex Vanin
parent 88459963fb
commit 08b9ae547a
1 changed files with 30 additions and 25 deletions

View File

@ -3,7 +3,8 @@ package searchsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
@ -16,10 +17,23 @@ type localStream struct {
storage *localstore.Storage
}
func (s *localStream) stream(ctx context.Context, ch chan<- []*object.ID) error {
idList := make([]*object.ID, 0)
type searchQueryFilter struct {
localstore.FilterPipeline
if err := s.storage.Iterate(newFilterPipeline(s.query), func(meta *localstore.ObjectMeta) bool {
query query.Query
ch chan<- []*objectSDK.ID
}
func (s *localStream) stream(ctx context.Context, ch chan<- []*objectSDK.ID) error {
idList := make([]*objectSDK.ID, 0)
filter := &searchQueryFilter{
query: s.query,
ch: ch,
}
if err := s.storage.Iterate(filter, func(meta *localstore.ObjectMeta) bool {
select {
case <-ctx.Done():
return true
@ -37,28 +51,19 @@ func (s *localStream) stream(ctx context.Context, ch chan<- []*object.ID) error
return nil
}
func newFilterPipeline(q query.Query) localstore.FilterPipeline {
res := localstore.NewFilter(&localstore.FilterParams{
Name: "SEARCH_OBJECTS_FILTER",
FilterFunc: func(context.Context, *localstore.ObjectMeta) *localstore.FilterResult {
return localstore.ResultPass()
},
})
func (f *searchQueryFilter) Pass(ctx context.Context, meta *localstore.ObjectMeta) *localstore.FilterResult {
loop:
for obj := meta.Head(); obj.GetID() != nil; obj = object.NewFromSDK(obj.GetParent()) {
if !f.query.Match(obj) {
continue
}
if err := res.PutSubFilter(localstore.SubFilterParams{
FilterPipeline: localstore.NewFilter(&localstore.FilterParams{
FilterFunc: func(_ context.Context, o *localstore.ObjectMeta) *localstore.FilterResult {
if !q.Match(o.Head()) {
return localstore.ResultFail()
}
return localstore.ResultPass()
},
}),
OnIgnore: localstore.CodeFail,
OnFail: localstore.CodeFail,
}); err != nil {
panic(errors.Wrap(err, "could not create all pass including filter"))
select {
case <-ctx.Done():
break loop
case f.ch <- []*objectSDK.ID{obj.GetID()}:
}
}
return res
return localstore.ResultPass()
}