From 624e8cd3cb9ecad79c0000d78124c14d7173f755 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 1 Oct 2020 11:52:21 +0300 Subject: [PATCH] [#58] object/search: Refactor RelationSearcher implementation Signed-off-by: Leonard Lyubich --- pkg/services/object/search/relation.go | 64 ++++++++++++++++++++++++++ pkg/services/object/search/service.go | 43 ----------------- 2 files changed, 64 insertions(+), 43 deletions(-) create mode 100644 pkg/services/object/search/relation.go diff --git a/pkg/services/object/search/relation.go b/pkg/services/object/search/relation.go new file mode 100644 index 000000000..99da04c26 --- /dev/null +++ b/pkg/services/object/search/relation.go @@ -0,0 +1,64 @@ +package searchsvc + +import ( + "context" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" + queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1" + "github.com/pkg/errors" +) + +type RelationSearcher struct { + svc *Service + + queryGenerator func(*object.Address) query.Query +} + +func (s *RelationSearcher) SearchRelation(ctx context.Context, addr *object.Address) (*object.ID, error) { + streamer, err := s.svc.Search(ctx, new(Prm). + WithContainerID(addr.GetContainerID()). + WithSearchQuery(s.queryGenerator(addr)), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not create search streamer", s) + } + + res, err := readFullStream(streamer, 1) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not read full search stream", s) + } else if ln := len(res); ln != 1 { + return nil, errors.Errorf("(%T) unexpected amount of found objects %d", s, ln) + } + + return res[0], nil +} + +func readFullStream(s *Streamer, cap int) ([]*object.ID, error) { + res := make([]*object.ID, 0, cap) + + for { + r, err := s.Recv() + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + return nil, errors.Wrapf(err, "(%s) could not receive search result", "readFullStream") + } + + res = append(res, r.IDList()...) + } + + return res, nil +} + +func NewRightChildSearcher(svc *Service) *RelationSearcher { + return &RelationSearcher{ + svc: svc, + queryGenerator: func(addr *object.Address) query.Query { + return queryV1.NewRightChildQuery(addr.GetObjectID()) + }, + } +} diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 62f06fcd2..d16bd8f2d 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -2,7 +2,6 @@ package searchsvc import ( "context" - "io" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -10,10 +9,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" - "github.com/pkg/errors" ) type Service struct { @@ -64,46 +61,6 @@ func (p *Service) Search(ctx context.Context, prm *Prm) (*Streamer, error) { }, nil } -func (p *Service) SearchRightChild(ctx context.Context, addr *object.Address) (*object.ID, error) { - streamer, err := p.Search(ctx, new(Prm). - WithContainerID(addr.GetContainerID()). - WithSearchQuery( - query.NewRightChildQuery(addr.GetObjectID()), - ), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not create search streamer", p) - } - - res, err := readFullStream(streamer, 1) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not read full search stream", p) - } else if ln := len(res); ln != 1 { - return nil, errors.Errorf("(%T) unexpected amount of found objects %d", p, ln) - } - - return res[0], nil -} - -func readFullStream(s *Streamer, cap int) ([]*object.ID, error) { - res := make([]*object.ID, 0, cap) - - for { - r, err := s.Recv() - if err != nil { - if errors.Is(errors.Cause(err), io.EOF) { - break - } - - return nil, errors.Wrapf(err, "(%s) could not receive search result", "readFullStream") - } - - res = append(res, r.IDList()...) - } - - return res, nil -} - func WithKeyStorage(v *objutil.KeyStorage) Option { return func(c *cfg) { c.keyStorage = v