From 6bede7d836eead238dda38c3d21944c15b8d029f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 22 Oct 2020 14:04:46 +0300 Subject: [PATCH] [#83] services/util: Implement response service Create response package. Implement response Service that sets values of response meta header. Signed-off-by: Leonard Lyubich --- pkg/services/util/response/client_stream.go | 45 ++++++++++++++++ pkg/services/util/response/server_stream.go | 42 +++++++++++++++ pkg/services/util/response/service.go | 58 +++++++++++++++++++++ pkg/services/util/response/unary.go | 21 ++++++++ 4 files changed, 166 insertions(+) create mode 100644 pkg/services/util/response/client_stream.go create mode 100644 pkg/services/util/response/server_stream.go create mode 100644 pkg/services/util/response/service.go create mode 100644 pkg/services/util/response/unary.go diff --git a/pkg/services/util/response/client_stream.go b/pkg/services/util/response/client_stream.go new file mode 100644 index 000000000..2e8196280 --- /dev/null +++ b/pkg/services/util/response/client_stream.go @@ -0,0 +1,45 @@ +package response + +import ( + "github.com/nspcc-dev/neofs-node/pkg/services/util" + "github.com/pkg/errors" +) + +// ClientMessageStreamer represents client-side message streamer +// that sets meta values to the response. +type ClientMessageStreamer struct { + cfg *cfg + + send util.RequestMessageWriter + + close util.ClientStreamCloser +} + +// Recv calls send method of internal streamer. +func (s *ClientMessageStreamer) Send(req interface{}) error { + return errors.Wrapf( + s.send(req), + "(%T) could not send the request", s) +} + +// CloseAndRecv closes internal stream, receivers the response, +// sets meta values and returns the result. +func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) { + resp, err := s.close() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not close stream and receive response", s) + } + + setMeta(resp, s.cfg) + + return resp, nil +} + +// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance. +func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer { + return &ClientMessageStreamer{ + cfg: s.cfg, + send: sender, + close: closer, + } +} diff --git a/pkg/services/util/response/server_stream.go b/pkg/services/util/response/server_stream.go new file mode 100644 index 000000000..fbc1a9615 --- /dev/null +++ b/pkg/services/util/response/server_stream.go @@ -0,0 +1,42 @@ +package response + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/util" + "github.com/pkg/errors" +) + +// ServerMessageStreamer represents server-side message streamer +// that sets meta values to all response messages. +type ServerMessageStreamer struct { + cfg *cfg + + recv util.ResponseMessageReader +} + +// Recv calls Recv method of internal streamer, sets response meta +// values and returns the response. +func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) { + m, err := s.recv() + if err != nil { + return nil, errors.Wrap(err, "could not receive response message for signing") + } + + setMeta(m, s.cfg) + + return m, nil +} + +// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result. +func (s *Service) HandleServerStreamRequest(ctx context.Context, req interface{}, handler util.ServerStreamHandler) (*ServerMessageStreamer, error) { + msgRdr, err := handler(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not create message reader") + } + + return &ServerMessageStreamer{ + cfg: s.cfg, + recv: msgRdr, + }, nil +} diff --git a/pkg/services/util/response/service.go b/pkg/services/util/response/service.go new file mode 100644 index 000000000..29e91e3ec --- /dev/null +++ b/pkg/services/util/response/service.go @@ -0,0 +1,58 @@ +package response + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-node/pkg/services/util" +) + +// Service represents universal v2 service +// that sets response meta header values. +type Service struct { + cfg *cfg +} + +// Option is an option of Service constructor. +type Option func(*cfg) + +type cfg struct { + version *refs.Version + + // TODO: neofs-node#83 add network state +} + +func defaultCfg() *cfg { + return &cfg{ + version: pkg.SDKVersion().ToV2(), + } +} + +// NewService creates, initializes and returns Service instance. +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func setMeta(resp util.ResponseMessage, cfg *cfg) { + meta := new(session.ResponseMetaHeader) + meta.SetVersion(cfg.version) + meta.SetTTL(1) // FIXME: TTL must be calculated + + // TODO: neofs-node#83 + // meta.SetEpoch() + + if origin := resp.GetMetaHeader(); origin != nil { + // FIXME: what if origin is set by local server? + meta.SetOrigin(origin) + } + + resp.SetMetaHeader(meta) +} diff --git a/pkg/services/util/response/unary.go b/pkg/services/util/response/unary.go new file mode 100644 index 000000000..0ca84b4e2 --- /dev/null +++ b/pkg/services/util/response/unary.go @@ -0,0 +1,21 @@ +package response + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/util" + "github.com/pkg/errors" +) + +// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it. +func (s *Service) HandleUnaryRequest(ctx context.Context, req interface{}, handler util.UnaryHandler) (util.ResponseMessage, error) { + // process request + resp, err := handler(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "could not handle request") + } + + setMeta(resp, s.cfg) + + return resp, nil +}