From 611a29f6828b5303fbf734ea61eb22674711e8a0 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 10 Dec 2020 15:26:40 +0300 Subject: [PATCH] [#241] object/search: Refactor service processing Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 23 +- pkg/network/transport/object/grpc/search.go | 28 ++ pkg/network/transport/object/grpc/service.go | 25 -- pkg/services/object/acl/acl.go | 36 +- pkg/services/object/response.go | 40 +-- pkg/services/object/search/container.go | 50 +++ pkg/services/object/search/exec.go | 147 ++++++++ pkg/services/object/search/local.go | 43 +-- pkg/services/object/search/prm.go | 56 ++-- pkg/services/object/search/relation.go | 80 ----- pkg/services/object/search/remote.go | 62 +--- pkg/services/object/search/res.go | 13 - pkg/services/object/search/search.go | 50 +++ pkg/services/object/search/search_test.go | 336 +++++++++++++++++++ pkg/services/object/search/service.go | 135 +++----- pkg/services/object/search/streamer.go | 186 ---------- pkg/services/object/search/util.go | 109 ++++++ pkg/services/object/search/v2/service.go | 35 +- pkg/services/object/search/v2/streamer.go | 31 +- pkg/services/object/search/v2/util.go | 75 +++-- pkg/services/object/server.go | 8 +- pkg/services/object/sign.go | 37 +- pkg/services/object/transport_splitter.go | 72 ++-- 23 files changed, 1020 insertions(+), 657 deletions(-) create mode 100644 pkg/network/transport/object/grpc/search.go create mode 100644 pkg/services/object/search/container.go create mode 100644 pkg/services/object/search/exec.go delete mode 100644 pkg/services/object/search/relation.go delete mode 100644 pkg/services/object/search/res.go create mode 100644 pkg/services/object/search/search.go create mode 100644 pkg/services/object/search/search_test.go delete mode 100644 pkg/services/object/search/streamer.go create mode 100644 pkg/services/object/search/util.go diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index fc9e27044f..0fa413d0c9 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -140,8 +140,8 @@ func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object. return s.get.Head(ctx, req) } -func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { - return s.search.Search(ctx, req) +func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error { + return s.search.Search(req, stream) } func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error { @@ -307,22 +307,23 @@ func initObjectService(c *cfg) { putsvcV2.WithInternalService(sPut), ) - sSearch := searchsvc.NewService( - searchsvc.WithKeyStorage(keyStorage), - searchsvc.WithClientCache(clientCache), - searchsvc.WithLocalStorage(ls), - searchsvc.WithContainerSource(c.cfgObject.cnrStorage), - searchsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), - searchsvc.WithLocalAddressSource(c), - searchsvc.WithWorkerPool(c.cfgObject.pool.search), + sSearch := searchsvc.New( searchsvc.WithLogger(c.log), + searchsvc.WithLocalStorageEngine(ls), + searchsvc.WithClientCache(clientCache), searchsvc.WithClientOptions( client.WithDialTimeout(c.viper.GetDuration(cfgObjectSearchDialTimeout)), ), + searchsvc.WithTraverserGenerator( + traverseGen.WithTraverseOptions( + placement.WithoutSuccessTracking(), + ), + ), ) sSearchV2 := searchsvcV2.NewService( searchsvcV2.WithInternalService(sSearch), + searchsvcV2.WithKeyStorage(keyStorage), ) sHead := headsvc.NewService( @@ -364,7 +365,7 @@ func initObjectService(c *cfg) { deletesvc.WithPutService(sPut), deletesvc.WithOwnerID(nodeOwner), deletesvc.WithLinkingHeader( - headsvc.NewRelationHeader(searchsvc.NewLinkingSearcher(sSearch), sHead), + headsvc.NewRelationHeader(nil, sHead), ), deletesvc.WithLogger(c.log), ) diff --git a/pkg/network/transport/object/grpc/search.go b/pkg/network/transport/object/grpc/search.go new file mode 100644 index 0000000000..ab77201d59 --- /dev/null +++ b/pkg/network/transport/object/grpc/search.go @@ -0,0 +1,28 @@ +package object + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" +) + +type searchStreamerV2 struct { + objectGRPC.ObjectService_SearchServer +} + +func (s *searchStreamerV2) Send(resp *object.SearchResponse) error { + return s.ObjectService_SearchServer.Send( + object.SearchResponseToGRPCMessage(resp), + ) +} + +// Search converts gRPC SearchRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error { + // TODO: think about how we transport errors through gRPC + return s.srv.Search( + object.SearchRequestFromGRPCMessage(req), + &searchStreamerV2{ + ObjectService_SearchServer: gStream, + }, + ) +} diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index a0429a91c1..370a5cf5f1 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -74,31 +74,6 @@ func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*object return object.HeadResponseToGRPCMessage(resp), nil } -// Search converts gRPC SearchRequest message, opens internal Object service Search stream and overtakes its data -// to gRPC stream. -func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error { - stream, err := s.srv.Search(gStream.Context(), object.SearchRequestFromGRPCMessage(req)) - if err != nil { - // TODO: think about how we transport errors through gRPC - return err - } - - for { - r, err := stream.Recv() - if err != nil { - if errors.Is(errors.Cause(err), io.EOF) { - return nil - } - - return err - } - - if err := gStream.Send(object.SearchResponseToGRPCMessage(r)); err != nil { - return err - } - } -} - // GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service. func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) { resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req)) diff --git a/pkg/services/object/acl/acl.go b/pkg/services/object/acl/acl.go index 40514ba1a8..9fe4f73dbb 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -54,11 +54,11 @@ type ( } searchStreamBasicChecker struct { - object.SearchObjectStreamer - } + objectSvc.SearchStream - getRangeStreamBasicChecker struct { - object.GetRangeObjectStreamer + info requestInfo + + *eACLCfg } requestInfo struct { @@ -209,15 +209,12 @@ func (b Service) Head( return resp, err } -func (b Service) Search( - ctx context.Context, - request *object.SearchRequest) (object.SearchObjectStreamer, error) { - +func (b Service) Search(request *object.SearchRequest, stream objectSvc.SearchStream) error { var cid *container.ID cid, err := getContainerIDFromRequest(request) if err != nil { - return nil, err + return err } req := metaWithToken{ @@ -228,17 +225,20 @@ func (b Service) Search( reqInfo, err := b.findRequestInfo(req, cid, acl.OperationSearch) if err != nil { - return nil, err + return err } if !basicACLCheck(reqInfo) { - return nil, basicACLErr(reqInfo) + return basicACLErr(reqInfo) } else if !eACLCheck(request, reqInfo, b.eACLCfg) { - return nil, eACLErr(reqInfo) + return eACLErr(reqInfo) } - stream, err := b.next.Search(ctx, request) - return searchStreamBasicChecker{stream}, err + return b.next.Search(request, &searchStreamBasicChecker{ + SearchStream: stream, + info: reqInfo, + eACLCfg: b.eACLCfg, + }) } func (b Service) Delete( @@ -390,6 +390,14 @@ func (g *rangeStreamBasicChecker) Send(resp *object.GetRangeResponse) error { return g.GetObjectRangeStream.Send(resp) } +func (g *searchStreamBasicChecker) Send(resp *object.SearchResponse) error { + if !eACLCheck(resp, g.info, g.eACLCfg) { + return eACLErr(g.info) + } + + return g.SearchStream.Send(resp) +} + func (b Service) findRequestInfo( req metaWithToken, cid *container.ID, diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index f7b43bebd4..f60eb9a699 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -16,7 +16,9 @@ type ResponseService struct { } type searchStreamResponser struct { - stream *response.ServerMessageStreamer + util.ServerStream + + respWriter util.ResponseMessageWriter } type getStreamResponser struct { @@ -101,35 +103,17 @@ func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*o return resp.(*object.HeadResponse), nil } -func (s *searchStreamResponser) Recv() (*object.SearchResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive response", s) - } - - return r.(*object.SearchResponse), nil +func (s *searchStreamResponser) Send(resp *object.SearchResponse) error { + return s.respWriter(resp) } -func (s *ResponseService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { - stream, err := s.respSvc.HandleServerStreamRequest(ctx, req, - func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { - stream, err := s.svc.Search(ctx, req.(*object.SearchRequest)) - if err != nil { - return nil, err - } - - return func() (util.ResponseMessage, error) { - return stream.Recv() - }, nil - }, - ) - if err != nil { - return nil, err - } - - return &searchStreamResponser{ - stream: stream, - }, nil +func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error { + return s.svc.Search(req, &searchStreamResponser{ + ServerStream: stream, + respWriter: s.respSvc.HandleServerStreamRequest_(func(resp util.ResponseMessage) error { + return stream.Send(resp.(*object.SearchResponse)) + }), + }) } func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go new file mode 100644 index 0000000000..b7f0eb80aa --- /dev/null +++ b/pkg/services/object/search/container.go @@ -0,0 +1,50 @@ +package searchsvc + +import ( + "context" + + "go.uber.org/zap" +) + +func (exec *execCtx) executeOnContainer() { + if exec.isLocal() { + exec.log.Debug("return result directly") + return + } + + exec.log.Debug("trying to execute in container...") + + traverser, ok := exec.generateTraverser(exec.containerID()) + if !ok { + return + } + + ctx, cancel := context.WithCancel(exec.context()) + defer cancel() + +loop: + for { + addrs := traverser.Next() + if len(addrs) == 0 { + exec.log.Debug("no more nodes, abort placement iteration") + break + } + + for i := range addrs { + select { + case <-ctx.Done(): + exec.log.Debug("interrupt placement iteration by context", + zap.String("error", ctx.Err().Error()), + ) + break loop + default: + } + + // TODO: consider parallel execution + exec.processNode(ctx, addrs[i]) + } + } + + exec.status = statusOK + exec.err = nil +} diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go new file mode 100644 index 0000000000..1526b20137 --- /dev/null +++ b/pkg/services/object/search/exec.go @@ -0,0 +1,147 @@ +package searchsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +type statusError struct { + status int + err error +} + +type execCtx struct { + svc *Service + + ctx context.Context + + prm Prm + + statusError + + log *logger.Logger +} + +const ( + statusUndefined int = iota + statusOK +) + +func (exec *execCtx) prepare() { + if _, ok := exec.prm.writer.(*uniqueIDWriter); !ok { + exec.prm.writer = newUniqueAddressWriter(exec.prm.writer) + } + + fs := exec.prm.SearchFilters() + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, exec.containerID()) + exec.prm.WithSearchFilters(fs) +} + +func (exec *execCtx) setLogger(l *logger.Logger) { + exec.log = l.With( + zap.String("request", "SEARCH"), + zap.Stringer("container", exec.containerID()), + zap.Bool("local", exec.isLocal()), + zap.Bool("with session", exec.prm.common.SessionToken() != nil), + zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), + ) +} + +func (exec execCtx) context() context.Context { + return exec.ctx +} + +func (exec execCtx) isLocal() bool { + return exec.prm.common.LocalOnly() +} + +func (exec execCtx) key() *ecdsa.PrivateKey { + return exec.prm.key +} + +func (exec execCtx) callOptions() []client.CallOption { + return exec.prm.callOpts +} + +func (exec execCtx) remotePrm() *client.SearchObjectParams { + return &exec.prm.SearchObjectParams +} + +func (exec *execCtx) containerID() *container.ID { + return exec.prm.ContainerID() +} + +func (exec *execCtx) searchFilters() objectSDK.SearchFilters { + return exec.prm.SearchFilters() +} + +func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser, bool) { + t, err := exec.svc.traverserGenerator.generateTraverser(cid) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not generate container traverser", + zap.String("error", err.Error()), + ) + + return nil, false + case err == nil: + return t, true + } +} + +func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) { + ipAddr, err := node.IPAddrString() + + log := exec.log.With(zap.Stringer("node", node)) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + log.Debug("could not calculate node IP address") + case err == nil: + c, err := exec.svc.clientCache.get(exec.key(), ipAddr) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + log.Debug("could not construct remote node client") + case err == nil: + return c, true + } + } + + return nil, false +} + +func (exec *execCtx) writeIDList(ids []*objectSDK.ID) { + err := exec.prm.writer.WriteIDs(ids) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not write object identifiers", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + } +} diff --git a/pkg/services/object/search/local.go b/pkg/services/object/search/local.go index 4e98964791..1e47769212 100644 --- a/pkg/services/object/search/local.go +++ b/pkg/services/object/search/local.go @@ -1,41 +1,22 @@ package searchsvc import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/pkg/container" - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" - "github.com/pkg/errors" + "go.uber.org/zap" ) -type localStream struct { - query query.Query +func (exec *execCtx) executeLocal() { + ids, err := exec.svc.localStorage.search(exec) - storage *engine.StorageEngine - - cid *container.ID -} - -func (s *localStream) stream(ctx context.Context, ch chan<- []*objectSDK.ID) error { - fs := s.query.ToSearchFilters() - - addrList, err := engine.Select(s.storage, s.cid, fs) if err != nil { - return errors.Wrapf(err, "(%T) could not select objects from local storage", s) + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("local operation failed", + zap.String("error", err.Error()), + ) + + return } - idList := make([]*objectSDK.ID, 0, len(addrList)) - - for i := range addrList { - idList = append(idList, addrList[i].ObjectID()) - } - - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- idList: - return nil - } + exec.writeIDList(ids) } diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go index e65a4f8015..a458f9bae2 100644 --- a/pkg/services/object/search/prm.go +++ b/pkg/services/object/search/prm.go @@ -1,39 +1,49 @@ package searchsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/container" - "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) +// Prm groups parameters of Get service call. type Prm struct { + writer IDListWriter + + // TODO: replace key and callOpts to CommonPrm + key *ecdsa.PrivateKey + + callOpts []client.CallOption + common *util.CommonPrm - cid *container.ID - - query query.Query + client.SearchObjectParams } -func (p *Prm) WithContainerID(v *container.ID) *Prm { - if p != nil { - p.cid = v - } - - return p +// IDListWriter is an interface of target component +// to write list of object identifiers. +type IDListWriter interface { + WriteIDs([]*objectSDK.ID) error } -func (p *Prm) WithSearchQuery(v query.Query) *Prm { - if p != nil { - p.query = v - } - - return p +// SetPrivateKey sets private key to use during execution. +func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) { + p.key = key } -func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm { - if p != nil { - p.common = v - } - - return p +// SetRemoteCallOptions sets call options remote remote client calls. +func (p *Prm) SetRemoteCallOptions(opts ...client.CallOption) { + p.callOpts = opts +} + +// SetCommonParameters sets common parameters of the operation. +func (p *Prm) SetCommonParameters(common *util.CommonPrm) { + p.common = common +} + +// SetWriter sets target component to write list of object identifiers. +func (p *Prm) SetWriter(w IDListWriter) { + p.writer = w } diff --git a/pkg/services/object/search/relation.go b/pkg/services/object/search/relation.go deleted file mode 100644 index 95b55d971a..0000000000 --- a/pkg/services/object/search/relation.go +++ /dev/null @@ -1,80 +0,0 @@ -package searchsvc - -import ( - "context" - "io" - - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" - queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" -) - -type RelationSearcher struct { - svc *Service - - queryGenerator func(*object.Address) query.Query -} - -var ErrRelationNotFound = errors.New("relation not found") - -func (s *RelationSearcher) SearchRelation(ctx context.Context, addr *object.Address, prm *util.CommonPrm) (*object.ID, error) { - streamer, err := s.svc.Search(ctx, new(Prm). - WithContainerID(addr.ContainerID()).WithCommonPrm(prm). - WithSearchQuery(s.queryGenerator(addr)), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not create search streamer", s) - } - - res, err := readFullStream(streamer, 1) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not read full search stream", s) - } else if ln := len(res); ln != 1 { - if ln == 0 { - return nil, ErrRelationNotFound - } - - return nil, errors.Errorf("(%T) unexpected amount of found objects %d", s, ln) - } - - return res[0], nil -} - -func readFullStream(s *Streamer, cap int) ([]*object.ID, error) { - res := make([]*object.ID, 0, cap) - - for { - r, err := s.Recv() - if err != nil { - if errors.Is(errors.Cause(err), io.EOF) { - break - } - - return nil, errors.Wrapf(err, "(%s) could not receive search result", "readFullStream") - } - - res = append(res, r.IDList()...) - } - - return res, nil -} - -func NewRightChildSearcher(svc *Service) *RelationSearcher { - return &RelationSearcher{ - svc: svc, - queryGenerator: func(addr *object.Address) query.Query { - return queryV1.NewRightChildQuery(addr.ObjectID()) - }, - } -} - -func NewLinkingSearcher(svc *Service) *RelationSearcher { - return &RelationSearcher{ - svc: svc, - queryGenerator: func(addr *object.Address) query.Query { - return queryV1.NewLinkingQuery(addr.ObjectID()) - }, - } -} diff --git a/pkg/services/object/search/remote.go b/pkg/services/object/search/remote.go index 95fa5fb063..c5db7bdbd7 100644 --- a/pkg/services/object/search/remote.go +++ b/pkg/services/object/search/remote.go @@ -3,55 +3,29 @@ package searchsvc import ( "context" - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/network/cache" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" + "go.uber.org/zap" ) -type remoteStream struct { - prm *Prm +func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) { + log := exec.log.With(zap.Stringer("remote node", addr)) - keyStorage *util.KeyStorage + log.Debug("processing node...") - addr *network.Address + client, ok := exec.remoteClient(addr) + if !ok { + return + } - clientCache *cache.ClientCache + ids, err := client.searchObjects(exec) - clientOpts []client.Option -} - -func (s *remoteStream) stream(ctx context.Context, ch chan<- []*object.ID) error { - key, err := s.keyStorage.GetKey(s.prm.common.SessionToken()) - if err != nil { - return errors.Wrapf(err, "(%T) could not receive private key", s) - } - - addr, err := s.addr.IPAddrString() - if err != nil { - return err - } - - c, err := s.clientCache.Get(key, addr, s.clientOpts...) - if err != nil { - return errors.Wrapf(err, "(%T) could not create SDK client %s", s, addr) - } - - // TODO: add writer parameter to SDK client - id, err := c.SearchObject(ctx, new(client.SearchObjectParams). - WithContainerID(s.prm.cid). - WithSearchFilters(s.prm.query.ToSearchFilters()), - client.WithTTL(1), // FIXME: use constant - client.WithSession(s.prm.common.SessionToken()), - client.WithBearer(s.prm.common.BearerToken()), - ) - if err != nil { - return errors.Wrapf(err, "(%T) could not search objects in %s", s, addr) - } - - ch <- id - - return nil + if err != nil { + exec.log.Debug("local operation failed", + zap.String("error", err.Error()), + ) + + return + } + + exec.writeIDList(ids) } diff --git a/pkg/services/object/search/res.go b/pkg/services/object/search/res.go deleted file mode 100644 index be99ffe601..0000000000 --- a/pkg/services/object/search/res.go +++ /dev/null @@ -1,13 +0,0 @@ -package searchsvc - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg/object" -) - -type Response struct { - idList []*object.ID -} - -func (r *Response) IDList() []*object.ID { - return r.idList -} diff --git a/pkg/services/object/search/search.go b/pkg/services/object/search/search.go new file mode 100644 index 0000000000..5e1249a1b9 --- /dev/null +++ b/pkg/services/object/search/search.go @@ -0,0 +1,50 @@ +package searchsvc + +import ( + "context" + + "go.uber.org/zap" +) + +// Search serves a request to select the objects. +func (s *Service) Search(ctx context.Context, prm Prm) error { + exec := &execCtx{ + svc: s, + ctx: ctx, + prm: prm, + } + + exec.prepare() + + exec.setLogger(s.log) + + exec.execute() + + return exec.statusError.err +} + +func (exec *execCtx) execute() { + exec.log.Debug("serving request...") + + // perform local operation + exec.executeLocal() + + exec.analyzeStatus(true) +} + +func (exec *execCtx) analyzeStatus(execCnr bool) { + // analyze local result + switch exec.status { + default: + exec.log.Debug("operation finished with error", + zap.String("error", exec.err.Error()), + ) + case statusOK: + exec.log.Debug("operation finished successfully") + } + + if execCnr { + exec.executeOnContainer() + exec.analyzeStatus(false) + } +} diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go new file mode 100644 index 0000000000..20e2c12857 --- /dev/null +++ b/pkg/services/object/search/search_test.go @@ -0,0 +1,336 @@ +package searchsvc + +import ( + "context" + "crypto/ecdsa" + "crypto/rand" + "crypto/sha256" + "fmt" + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util/logger/test" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +type idsErr struct { + ids []*objectSDK.ID + err error +} + +type testStorage struct { + items map[string]idsErr +} + +type testTraverserGenerator struct { + c *container.Container + b placement.Builder +} + +type testPlacementBuilder struct { + vectors map[string][]netmap.Nodes +} + +type testClientCache struct { + clients map[string]*testStorage +} + +type simpleIDWriter struct { + ids []*objectSDK.ID +} + +func (s *simpleIDWriter) WriteIDs(ids []*objectSDK.ID) error { + s.ids = append(s.ids, ids...) + return nil +} + +func newTestStorage() *testStorage { + return &testStorage{ + items: make(map[string]idsErr), + } +} + +func (g *testTraverserGenerator) generateTraverser(_ *container.ID) (*placement.Traverser, error) { + return placement.NewTraverser( + placement.ForContainer(g.c), + placement.UseBuilder(g.b), + placement.WithoutSuccessTracking(), + ) +} + +func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap.PlacementPolicy) ([]netmap.Nodes, error) { + vs, ok := p.vectors[addr.String()] + if !ok { + return nil, errors.New("vectors for address not found") + } + + return vs, nil +} + +func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) { + v, ok := c.clients[addr] + if !ok { + return nil, errors.New("could not construct client") + } + + return v, nil +} + +func (s *testStorage) search(exec *execCtx) ([]*objectSDK.ID, error) { + v, ok := s.items[exec.containerID().String()] + if !ok { + return nil, nil + } + + return v.ids, v.err +} + +func (c *testStorage) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) { + v, ok := c.items[exec.containerID().String()] + if !ok { + return nil, nil + } + + return v.ids, v.err +} + +func (c *testStorage) addResult(addr *container.ID, ids []*objectSDK.ID, err error) { + c.items[addr.String()] = idsErr{ + ids: ids, + err: err, + } +} + +func testSHA256() (cs [sha256.Size]byte) { + rand.Read(cs[:]) + return cs +} + +func generateCID() *container.ID { + cid := container.NewID() + cid.SetSHA256(testSHA256()) + + return cid +} + +func generateIDs(num int) []*objectSDK.ID { + res := make([]*objectSDK.ID, num) + + for i := 0; i < num; i++ { + res[i] = objectSDK.NewID() + res[i].SetSHA256(testSHA256()) + } + + return res +} + +func TestGetLocalOnly(t *testing.T) { + ctx := context.Background() + + newSvc := func(storage *testStorage) *Service { + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = storage + + return svc + } + + newPrm := func(cid *container.ID, w IDListWriter) Prm { + p := Prm{} + p.WithContainerID(cid) + p.SetWriter(w) + p.common = new(util.CommonPrm).WithLocalOnly(true) + + return p + } + + t.Run("OK", func(t *testing.T) { + storage := newTestStorage() + svc := newSvc(storage) + + cid := generateCID() + ids := generateIDs(10) + storage.addResult(cid, ids, nil) + + w := new(simpleIDWriter) + p := newPrm(cid, w) + + err := svc.Search(ctx, p) + require.NoError(t, err) + require.Equal(t, ids, w.ids) + }) + + t.Run("FAIL", func(t *testing.T) { + storage := newTestStorage() + svc := newSvc(storage) + + cid := generateCID() + testErr := errors.New("any error") + storage.addResult(cid, nil, testErr) + + w := new(simpleIDWriter) + p := newPrm(cid, w) + + err := svc.Search(ctx, p) + require.True(t, errors.Is(err, testErr)) + }) +} + +func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) { + mNodes := make([]netmap.Nodes, len(dim)) + mAddr := make([][]string, len(dim)) + + for i := range dim { + ns := make([]netmap.NodeInfo, dim[i]) + as := make([]string, dim[i]) + + for j := 0; j < dim[i]; j++ { + a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s", + strconv.Itoa(i), + strconv.Itoa(60000+j), + ) + + var err error + na, err := network.AddressFromString(a) + require.NoError(t, err) + + as[j], err = na.IPAddrString() + require.NoError(t, err) + + ni := netmap.NewNodeInfo() + ni.SetAddress(a) + + ns[j] = *ni + } + + mNodes[i] = netmap.NodesFromInfo(ns) + mAddr[i] = as + } + + return mNodes, mAddr +} + +// +// func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) { +// curID := generateID() +// var prevID *objectSDK.ID +// +// addr := objectSDK.NewAddress() +// addr.SetContainerID(cid) +// +// res := make([]*object.RawObject, 0, ln) +// ids := make([]*objectSDK.ID, 0, ln) +// payload := make([]byte, 0, ln*10) +// +// for i := 0; i < ln; i++ { +// ids = append(ids, curID) +// addr.SetObjectID(curID) +// +// payloadPart := make([]byte, 10) +// rand.Read(payloadPart) +// +// o := generateObject(addr, prevID, []byte{byte(i)}) +// o.SetPayload(payloadPart) +// o.SetPayloadSize(uint64(len(payloadPart))) +// o.SetID(curID) +// +// payload = append(payload, payloadPart...) +// +// res = append(res, o) +// +// prevID = curID +// curID = generateID() +// } +// +// return res, ids, payload +// } + +func TestGetRemoteSmall(t *testing.T) { + ctx := context.Background() + + placementDim := []int{2} + + rs := make([]*netmap.Replica, 0, len(placementDim)) + for i := range placementDim { + r := netmap.NewReplica() + r.SetCount(uint32(placementDim[i])) + + rs = append(rs, r) + } + + pp := netmap.NewPlacementPolicy() + pp.SetReplicas(rs...) + + cnr := container.New(container.WithPolicy(pp)) + cid := container.CalculateID(cnr) + + newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = newTestStorage() + + svc.traverserGenerator = &testTraverserGenerator{ + c: cnr, + b: b, + } + svc.clientCache = c + + return svc + } + + newPrm := func(cid *container.ID, w IDListWriter) Prm { + p := Prm{} + p.WithContainerID(cid) + p.SetWriter(w) + p.common = new(util.CommonPrm).WithLocalOnly(false) + + return p + } + + t.Run("OK", func(t *testing.T) { + addr := objectSDK.NewAddress() + addr.SetContainerID(cid) + + ns, as := testNodeMatrix(t, placementDim) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + }, + } + + c1 := newTestStorage() + ids1 := generateIDs(10) + c1.addResult(cid, ids1, nil) + + c2 := newTestStorage() + ids2 := generateIDs(10) + c2.addResult(cid, ids2, nil) + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testStorage{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + w := new(simpleIDWriter) + + p := newPrm(cid, w) + + err := svc.Search(ctx, p) + require.NoError(t, err) + require.Len(t, w.ids, len(ids1)+len(ids2)) + + for _, id := range append(ids1, ids2...) { + require.Contains(t, w.ids, id) + } + }) +} diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index e66b2bb2b4..f90311a760 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -1,56 +1,58 @@ package searchsvc import ( - "context" - "sync" + "crypto/ecdsa" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/container" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) +// Service is an utility serving requests +// of Object.Search service. type Service struct { *cfg } +// Option is a Service's constructor option. type Option func(*cfg) +type searchClient interface { + searchObjects(*execCtx) ([]*object.ID, error) +} + type cfg struct { - keyStorage *objutil.KeyStorage - - localStore *engine.StorageEngine - - cnrSrc container.Source - - netMapSrc netmap.Source - - workerPool util.WorkerPool - - localAddrSrc network.LocalAddressSource - - clientCache *cache.ClientCache - log *logger.Logger - clientOpts []client.Option + localStorage interface { + search(*execCtx) ([]*object.ID, error) + } + + clientCache interface { + get(*ecdsa.PrivateKey, string) (searchClient, error) + } + + traverserGenerator interface { + generateTraverser(*container.ID) (*placement.Traverser, error) + } } func defaultCfg() *cfg { return &cfg{ - workerPool: new(util.SyncWorkerPool), - log: zap.L(), + log: zap.L(), + clientCache: new(clientCacheWrapper), } } -func NewService(opts ...Option) *Service { +// New creates, initializes and returns utility serving +// Object.Get service requests. +func New(opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -62,66 +64,39 @@ func NewService(opts ...Option) *Service { } } -func (p *Service) Search(ctx context.Context, prm *Prm) (*Streamer, error) { - return &Streamer{ - cfg: p.cfg, - once: new(sync.Once), - prm: prm, - ctx: ctx, - cache: make([][]*object.ID, 0, 10), - }, nil -} - -func WithKeyStorage(v *objutil.KeyStorage) Option { - return func(c *cfg) { - c.keyStorage = v - } -} - -func WithLocalStorage(v *engine.StorageEngine) Option { - return func(c *cfg) { - c.localStore = v - } -} - -func WithContainerSource(v container.Source) Option { - return func(c *cfg) { - c.cnrSrc = v - } -} - -func WithNetworkMapSource(v netmap.Source) Option { - return func(c *cfg) { - c.netMapSrc = v - } -} - -func WithWorkerPool(v util.WorkerPool) Option { - return func(c *cfg) { - c.workerPool = v - } -} - -func WithLocalAddressSource(v network.LocalAddressSource) Option { - return func(c *cfg) { - c.localAddrSrc = v - } -} - -func WithClientCache(v *cache.ClientCache) Option { - return func(c *cfg) { - c.clientCache = v - } -} - +// WithLogger returns option to specify Get service's logger. func WithLogger(l *logger.Logger) Option { return func(c *cfg) { - c.log = l + c.log = l.With(zap.String("component", "Object.Get service")) } } -func WithClientOptions(opts ...client.Option) Option { +// WithLocalStorageEngine returns option to set local storage +// instance. +func WithLocalStorageEngine(e *engine.StorageEngine) Option { return func(c *cfg) { - c.clientOpts = opts + c.localStorage = (*storageEngineWrapper)(e) + } +} + +// WithClientCache returns option to set cache of remote node clients. +func WithClientCache(v *cache.ClientCache) Option { + return func(c *cfg) { + c.clientCache.(*clientCacheWrapper).cache = v + } +} + +// WithClientOptions returns option to specify options of remote node clients. +func WithClientOptions(opts ...client.Option) Option { + return func(c *cfg) { + c.clientCache.(*clientCacheWrapper).opts = opts + } +} + +// WithTraverserGenerator returns option to set generator of +// placement traverser to get the objects from containers. +func WithTraverserGenerator(t *util.TraverserGenerator) Option { + return func(c *cfg) { + c.traverserGenerator = (*traverseGeneratorWrapper)(t) } } diff --git a/pkg/services/object/search/streamer.go b/pkg/services/object/search/streamer.go deleted file mode 100644 index 472bf42135..0000000000 --- a/pkg/services/object/search/streamer.go +++ /dev/null @@ -1,186 +0,0 @@ -package searchsvc - -import ( - "context" - "io" - "sync" - - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/network" - svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/pkg/errors" -) - -type Streamer struct { - *cfg - - once *sync.Once - - prm *Prm - - traverser *placement.Traverser - - ctx context.Context - - ch chan []*object.ID - - cache [][]*object.ID -} - -func (p *Streamer) Recv() (*Response, error) { - var err error - - p.once.Do(func() { - if err = p.preparePrm(p.prm); err == nil { - go p.start(p.prm) - } - }) - - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not start streaming", p) - } - - select { - case <-p.ctx.Done(): - return nil, errors.Wrapf(p.ctx.Err(), "(%T) context is done", p) - case v, ok := <-p.ch: - if !ok { - return nil, io.EOF - } - - v = p.cutCached(v) - - return &Response{ - idList: v, - }, nil - } -} - -func (p *Streamer) cutCached(ids []*object.ID) []*object.ID { -loop: - for i := 0; i < len(ids); i++ { - for j := range p.cache { - for k := range p.cache[j] { - if ids[i].Equal(p.cache[j][k]) { - ids = append(ids[:i], ids[i+1:]...) - - i-- - - continue loop - } - } - } - } - - if len(ids) > 0 { - p.cache = append(p.cache, ids) - } - - return ids -} - -func (p *Streamer) preparePrm(prm *Prm) error { - var err error - - // get latest network map - nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) - if err != nil { - return errors.Wrapf(err, "(%T) could not get latest network map", p) - } - - // get container to store the object - cnr, err := p.cnrSrc.Get(prm.cid) - if err != nil { - return errors.Wrapf(err, "(%T) could not get container by ID", p) - } - - // allocate placement traverser options - traverseOpts := make([]placement.Option, 0, 4) - - // add common options - traverseOpts = append(traverseOpts, - // set processing container - placement.ForContainer(cnr), - ) - - // create placement builder from network map - builder := placement.NewNetworkMapBuilder(nm) - - if prm.common.LocalOnly() { - // restrict success count to 1 stored copy (to local storage) - traverseOpts = append(traverseOpts, placement.SuccessAfter(1)) - - // use local-only placement builder - builder = svcutil.NewLocalPlacement(builder, p.localAddrSrc) - } - - // set placement builder - traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) - - // build placement traverser - if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { - return errors.Wrapf(err, "(%T) could not build placement traverser", p) - } - - p.ch = make(chan []*object.ID) - - return nil -} - -func (p *Streamer) start(prm *Prm) { - defer close(p.ch) - -loop: - for { - addrs := p.traverser.Next() - if len(addrs) == 0 { - break - } - - wg := new(sync.WaitGroup) - - for i := range addrs { - wg.Add(1) - - addr := addrs[i] - - if err := p.workerPool.Submit(func() { - defer wg.Done() - - var streamer interface { - stream(context.Context, chan<- []*object.ID) error - } - - if network.IsLocalAddress(p.localAddrSrc, addr) { - streamer = &localStream{ - query: prm.query, - storage: p.localStore, - cid: prm.cid, - } - } else { - streamer = &remoteStream{ - prm: prm, - keyStorage: p.keyStorage, - addr: addr, - clientCache: p.clientCache, - clientOpts: p.clientOpts, - } - } - - if err := streamer.stream(p.ctx, p.ch); err != nil { - svcutil.LogServiceError(p.log, "SEARCH", addr, err) - } - }); err != nil { - wg.Done() - - svcutil.LogWorkerPoolError(p.log, "SEARCH", err) - - break loop - } - } - - wg.Wait() - } -} diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go new file mode 100644 index 0000000000..737f1940a6 --- /dev/null +++ b/pkg/services/object/search/util.go @@ -0,0 +1,109 @@ +package searchsvc + +import ( + "crypto/ecdsa" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network/cache" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" +) + +type uniqueIDWriter struct { + mtx sync.Mutex + + written map[string]struct{} + + writer IDListWriter +} + +type clientCacheWrapper struct { + cache *cache.ClientCache + + opts []client.Option +} + +type clientWrapper struct { + client *client.Client +} + +type storageEngineWrapper engine.StorageEngine + +type traverseGeneratorWrapper util.TraverserGenerator + +func newUniqueAddressWriter(w IDListWriter) IDListWriter { + return &uniqueIDWriter{ + written: make(map[string]struct{}), + writer: w, + } +} + +func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error { + w.mtx.Lock() + + for i := 0; i < len(list); i++ { // don't use range, slice mutates in body + s := list[i].String() + // standard stringer is quite costly, it is better + // to facilitate the calculation of the key + + if _, ok := w.written[s]; !ok { + // mark address as processed + w.written[s] = struct{}{} + continue + } + + // exclude processed address + list = append(list[:i], list[i+1:]...) + i-- + } + + w.mtx.Unlock() + + return w.writer.WriteIDs(list) +} + +func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (searchClient, error) { + clt, err := c.cache.Get(key, addr, c.opts...) + + return &clientWrapper{ + client: clt, + }, err +} + +func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) { + return c.client.SearchObject(exec.context(), + exec.remotePrm(), + exec.callOptions()...) +} + +func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) { + r, err := (*engine.StorageEngine)(e).Select(new(engine.SelectPrm). + WithFilters(exec.searchFilters()), + ) + if err != nil { + return nil, err + } + + return idsFromAddresses(r.AddressList()), nil +} + +func idsFromAddresses(addrs []*objectSDK.Address) []*objectSDK.ID { + ids := make([]*objectSDK.ID, len(addrs)) + + for i := range addrs { + ids[i] = addrs[i].ObjectID() + } + + return ids +} + +func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID) (*placement.Traverser, error) { + a := objectSDK.NewAddress() + a.SetContainerID(cid) + + return (*util.TraverserGenerator)(e).GenerateTraverser(a) +} diff --git a/pkg/services/object/search/v2/service.go b/pkg/services/object/search/v2/service.go index 75c020baa5..0d6bc3e168 100644 --- a/pkg/services/object/search/v2/service.go +++ b/pkg/services/object/search/v2/service.go @@ -1,11 +1,10 @@ package searchsvc import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/v2/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" - "github.com/pkg/errors" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) // Service implements Search operation of Object service v2. @@ -18,6 +17,8 @@ type Option func(*cfg) type cfg struct { svc *searchsvc.Service + + keyStorage *objutil.KeyStorage } // NewService constructs Service instance from provided options. @@ -33,25 +34,27 @@ func NewService(opts ...Option) *Service { } } -// Search calls internal service and returns v2 search object streamer. -func (s *Service) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { - prm, err := toPrm(req.GetBody(), req) +// Get calls internal service and returns v2 object stream. +func (s *Service) Search(req *objectV2.SearchRequest, stream objectSvc.SearchStream) error { + p, err := s.toPrm(req, stream) if err != nil { - return nil, errors.Wrapf(err, "(%T) could not convert search parameters", s) + return err } - stream, err := s.svc.Search(ctx, prm) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not open object search stream", s) - } - - return &streamer{ - stream: stream, - }, nil + return s.svc.Search(stream.Context(), *p) } +// WithInternalService returns option to set entity +// that handles request payload. func WithInternalService(v *searchsvc.Service) Option { return func(c *cfg) { c.svc = v } } + +// WithKeyStorage returns option to set local private key storage. +func WithKeyStorage(ks *objutil.KeyStorage) Option { + return func(c *cfg) { + c.keyStorage = ks + } +} diff --git a/pkg/services/object/search/v2/streamer.go b/pkg/services/object/search/v2/streamer.go index e97f6dae09..ebbb1494fe 100644 --- a/pkg/services/object/search/v2/streamer.go +++ b/pkg/services/object/search/v2/streamer.go @@ -1,26 +1,29 @@ package searchsvc import ( - "io" - + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/v2/object" - searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" - "github.com/pkg/errors" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" ) -type streamer struct { - stream *searchsvc.Streamer +type streamWriter struct { + stream objectSvc.SearchStream } -func (s *streamer) Recv() (*object.SearchResponse, error) { - r, err := s.stream.Recv() - if err != nil { - if errors.Is(errors.Cause(err), io.EOF) { - return nil, io.EOF - } +func (s *streamWriter) WriteIDs(ids []*objectSDK.ID) error { + r := new(object.SearchResponse) - return nil, errors.Wrapf(err, "(%T) could not receive search response", s) + body := new(object.SearchResponseBody) + r.SetBody(body) + + idsV2 := make([]*refs.ObjectID, len(ids)) + + for i := range ids { + idsV2[i] = ids[i].ToV2() } - return fromResponse(r), nil + body.SetIDList(idsV2) + + return s.stream.Send(r) } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index d6e579a4a3..ad3652e403 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -1,50 +1,61 @@ package searchsvc import ( + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/session" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" - "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" - queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" ) -func toPrm(body *object.SearchRequestBody, req *object.SearchRequest) (*searchsvc.Prm, error) { - var q query.Query +func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) { + meta := req.GetMetaHeader() - switch v := body.GetVersion(); v { - default: - return nil, errors.Errorf("unsupported query version #%d", v) - case 1: - q = queryV1.New( - objectSDK.NewSearchFiltersFromV2(body.GetFilters()), - ) + key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) + if err != nil { + return nil, err } - return new(searchsvc.Prm). - WithContainerID( - container.NewIDFromV2(body.GetContainerID()), - ). - WithSearchQuery(q). - WithCommonPrm(util.CommonPrmFromV2(req)), nil + p := new(searchsvc.Prm) + p.SetPrivateKey(key) + p.SetCommonParameters(commonParameters(meta)) + p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) + p.SetWriter(&streamWriter{ + stream: stream, + }) + + body := req.GetBody() + p.WithContainerID(container.NewIDFromV2(body.GetContainerID())) + p.WithSearchFilters(objectSDK.NewSearchFiltersFromV2(body.GetFilters())) + + return p, nil } -func fromResponse(r *searchsvc.Response) *object.SearchResponse { - ids := r.IDList() - idsV2 := make([]*refs.ObjectID, 0, len(ids)) +// can be shared accross all services +func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption { + xHdrs := meta.GetXHeaders() - for i := range ids { - idsV2 = append(idsV2, ids[i].ToV2()) + opts := make([]client.CallOption, 0, 3+len(xHdrs)) + + opts = append(opts, + client.WithBearer(token.NewBearerTokenFromV2(meta.GetBearerToken())), + client.WithSession(token.NewSessionTokenFromV2(meta.GetSessionToken())), + client.WithTTL(meta.GetTTL()-1), + ) + + for i := range xHdrs { + opts = append(opts, client.WithXHeader(pkg.NewXHeaderFromV2(xHdrs[i]))) } - body := new(object.SearchResponseBody) - body.SetIDList(idsV2) - - resp := new(object.SearchResponse) - resp.SetBody(body) - - return resp + return opts +} + +func commonParameters(meta *session.RequestMetaHeader) *util.CommonPrm { + return new(util.CommonPrm). + WithLocalOnly(meta.GetTTL() <= 1) } diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 50b30656b0..3c6a6e2782 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -19,13 +19,19 @@ type GetObjectRangeStream interface { Send(*object.GetRangeResponse) error } +// SearchStream is an interface of NeoFS API v2 compatible search streamer. +type SearchStream interface { + util.ServerStream + Send(*object.SearchResponse) error +} + // ServiceServer is an interface of utility // serving v2 Object service. type ServiceServer interface { Get(*object.GetRequest, GetObjectStream) error Put(context.Context) (object.PutObjectStreamer, error) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) - Search(context.Context, *object.SearchRequest) (object.SearchObjectStreamer, error) + Search(*object.SearchRequest, SearchStream) error Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) GetRange(*object.GetRangeRequest, GetObjectRangeStream) error GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index c9f3957099..4d4220f1ad 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -18,7 +18,9 @@ type SignService struct { } type searchStreamSigner struct { - stream *util.ResponseMessageStreamer + util.ServerStream + + respWriter util.ResponseMessageWriter } type getStreamSigner struct { @@ -109,35 +111,24 @@ func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*objec return resp.(*object.HeadResponse), nil } -func (s *searchStreamSigner) Recv() (*object.SearchResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response") - } - - return r.(*object.SearchResponse), nil +func (s *searchStreamSigner) Send(resp *object.SearchResponse) error { + return s.respWriter(resp) } -func (s *SignService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { - stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req, - func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { - stream, err := s.svc.Search(ctx, req.(*object.SearchRequest)) - if err != nil { - return nil, err - } - - return func() (util.ResponseMessage, error) { - return stream.Recv() - }, nil +func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error { + respWriter, err := s.sigSvc.HandleServerStreamRequest_(req, + func(resp util.ResponseMessage) error { + return stream.Send(resp.(*object.SearchResponse)) }, ) if err != nil { - return nil, err + return err } - return &searchStreamSigner{ - stream: stream, - }, nil + return s.svc.Search(req, &searchStreamSigner{ + ServerStream: stream, + respWriter: respWriter, + }) } func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index f1f339456d..b3b9ad0bcc 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -5,7 +5,6 @@ import ( "context" "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-node/pkg/services/util" "github.com/pkg/errors" ) @@ -30,10 +29,11 @@ type ( chunkSize int } - searchStreamBasicChecker struct { - next object.SearchObjectStreamer - resp *object.SearchResponse - list []*refs.ObjectID + searchStreamMsgSizeCtrl struct { + util.ServerStream + + stream SearchStream + addrAmount uint64 } @@ -100,13 +100,12 @@ func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest return c.next.Head(ctx, request) } -func (c TransportSplitter) Search(ctx context.Context, request *object.SearchRequest) (object.SearchObjectStreamer, error) { - stream, err := c.next.Search(ctx, request) - - return &searchStreamBasicChecker{ - next: stream, - addrAmount: c.addrAmount, - }, err +func (c TransportSplitter) Search(req *object.SearchRequest, stream SearchStream) error { + return c.next.Search(req, &searchStreamMsgSizeCtrl{ + ServerStream: stream, + stream: stream, + addrAmount: c.addrAmount, + }) } func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { @@ -154,34 +153,35 @@ func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.Get return c.next.GetRangeHash(ctx, request) } -func (s *searchStreamBasicChecker) Recv() (*object.SearchResponse, error) { - if s.resp == nil { - resp, err := s.next.Recv() - if err != nil { - return resp, err +func (s *searchStreamMsgSizeCtrl) Send(resp *object.SearchResponse) error { + body := resp.GetBody() + ids := body.GetIDList() + + var newResp *object.SearchResponse + + for ln := uint64(len(ids)); len(ids) > 0; { + if newResp == nil { + newResp = new(object.SearchResponse) + newResp.SetBody(body) } - s.resp = resp - s.list = s.resp.GetBody().GetIDList() + cut := s.addrAmount + if cut > ln { + cut = ln + } + + body.SetIDList(ids[:cut]) + newResp.SetMetaHeader(resp.GetMetaHeader()) + newResp.SetVerificationHeader(resp.GetVerificationHeader()) + + if err := s.stream.Send(newResp); err != nil { + return err + } + + ids = ids[cut:] } - chunk := s.list[:min(int(s.addrAmount), len(s.list))] - s.list = s.list[len(chunk):] - - body := new(object.SearchResponseBody) - body.SetIDList(chunk) - - resp := new(object.SearchResponse) - resp.SetVerificationHeader(s.resp.GetVerificationHeader()) - resp.SetMetaHeader(s.resp.GetMetaHeader()) - resp.SetBody(body) - - if len(s.list) == 0 { - s.list = nil - s.resp = nil - } - - return resp, nil + return nil } func min(a, b int) int {