From 167a67f0b846cdcfb5405f5bc6f3d13057e3161f Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 30 Dec 2022 17:48:50 +0300 Subject: [PATCH] [#460] services/util: Remove `HandleUnaryRequest` There is no need in a wrapper with many from-`interface{}` conversions. Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-node/accounting.go | 6 +- cmd/frostfs-node/container.go | 19 ++-- cmd/frostfs-node/netmap.go | 20 ++-- cmd/frostfs-node/session.go | 5 +- pkg/services/accounting/executor.go | 10 +- pkg/services/accounting/response.go | 37 ------- pkg/services/container/executor.go | 14 ++- pkg/services/container/response.go | 115 -------------------- pkg/services/netmap/executor.go | 11 +- pkg/services/netmap/response.go | 63 ----------- pkg/services/object/response.go | 27 ++--- pkg/services/session/executor.go | 11 +- pkg/services/session/response.go | 37 ------- pkg/services/util/response/client_stream.go | 6 +- pkg/services/util/response/server_stream.go | 6 +- pkg/services/util/response/service.go | 7 +- pkg/services/util/response/unary.go | 21 ---- 17 files changed, 76 insertions(+), 339 deletions(-) delete mode 100644 pkg/services/accounting/response.go delete mode 100644 pkg/services/container/response.go delete mode 100644 pkg/services/netmap/response.go delete mode 100644 pkg/services/session/response.go delete mode 100644 pkg/services/util/response/unary.go diff --git a/cmd/frostfs-node/accounting.go b/cmd/frostfs-node/accounting.go index 6a35f37d..d04f34ff 100644 --- a/cmd/frostfs-node/accounting.go +++ b/cmd/frostfs-node/accounting.go @@ -21,10 +21,8 @@ func initAccountingService(ctx context.Context, c *cfg) { server := accountingTransportGRPC.New( accountingService.NewSignService( &c.key.PrivateKey, - accountingService.NewResponseService( - accountingService.NewExecutionService( - accounting.NewExecutor(balanceMorphWrapper), - ), + accountingService.NewExecutionService( + accounting.NewExecutor(balanceMorphWrapper), c.respSvc, ), ), diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index d5a5afce..9a6cfd02 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -83,16 +83,13 @@ func initContainerService(ctx context.Context, c *cfg) { server := containerTransportGRPC.New( containerService.NewSignService( &c.key.PrivateKey, - containerService.NewResponseService( - &usedSpaceService{ - Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)), - loadWriterProvider: loadRouter, - loadPlacementBuilder: loadPlacementBuilder, - routeBuilder: routeBuilder, - cfg: c, - }, - c.respSvc, - ), + &usedSpaceService{ + Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), + loadWriterProvider: loadRouter, + loadPlacementBuilder: loadPlacementBuilder, + routeBuilder: routeBuilder, + cfg: c, + }, ), ) @@ -575,6 +572,8 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container resp := new(containerV2.AnnounceUsedSpaceResponse) resp.SetBody(respBody) + c.cfg.respSvc.SetMeta(resp) + return resp, nil } diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go index f1ea8b40..b7b09a00 100644 --- a/cmd/frostfs-node/netmap.go +++ b/cmd/frostfs-node/netmap.go @@ -148,17 +148,15 @@ func initNetmapService(ctx context.Context, c *cfg) { server := netmapTransportGRPC.New( netmapService.NewSignService( &c.key.PrivateKey, - netmapService.NewResponseService( - netmapService.NewExecutionService( - c, - c.apiVersion, - &netInfo{ - netState: c.cfgNetmap.state, - magic: c.cfgMorph.client, - morphClientNetMap: c.cfgNetmap.wrapper, - msPerBlockRdr: c.cfgMorph.client.MsPerBlock, - }, - ), + netmapService.NewExecutionService( + c, + c.apiVersion, + &netInfo{ + netState: c.cfgNetmap.state, + magic: c.cfgMorph.client, + morphClientNetMap: c.cfgNetmap.wrapper, + msPerBlockRdr: c.cfgMorph.client.MsPerBlock, + }, c.respSvc, ), ), diff --git a/cmd/frostfs-node/session.go b/cmd/frostfs-node/session.go index 95e3b820..f9c1811a 100644 --- a/cmd/frostfs-node/session.go +++ b/cmd/frostfs-node/session.go @@ -53,10 +53,7 @@ func initSessionService(c *cfg) { server := sessionTransportGRPC.New( sessionSvc.NewSignService( &c.key.PrivateKey, - sessionSvc.NewResponseService( - sessionSvc.NewExecutionService(c.privateTokenStore, c.log), - c.respSvc, - ), + sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log), ), ) diff --git a/pkg/services/accounting/executor.go b/pkg/services/accounting/executor.go index 40284595..b0722cf8 100644 --- a/pkg/services/accounting/executor.go +++ b/pkg/services/accounting/executor.go @@ -5,6 +5,7 @@ import ( "fmt" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" ) type ServiceExecutor interface { @@ -12,13 +13,15 @@ type ServiceExecutor interface { } type executorSvc struct { - exec ServiceExecutor + exec ServiceExecutor + respSvc *response.Service } // NewExecutionService wraps ServiceExecutor and returns Accounting Service interface. -func NewExecutionService(exec ServiceExecutor) Server { +func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server { return &executorSvc{ - exec: exec, + exec: exec, + respSvc: respSvc, } } @@ -31,5 +34,6 @@ func (s *executorSvc) Balance(ctx context.Context, req *accounting.BalanceReques resp := new(accounting.BalanceResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } diff --git a/pkg/services/accounting/response.go b/pkg/services/accounting/response.go deleted file mode 100644 index a78ac6fd..00000000 --- a/pkg/services/accounting/response.go +++ /dev/null @@ -1,37 +0,0 @@ -package accounting - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" -) - -type responseService struct { - respSvc *response.Service - - svc Server -} - -// NewResponseService returns accounting service instance that passes internal service -// call to response service. -func NewResponseService(accSvc Server, respSvc *response.Service) Server { - return &responseService{ - respSvc: respSvc, - svc: accSvc, - } -} - -func (s *responseService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Balance(ctx, req.(*accounting.BalanceRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*accounting.BalanceResponse), nil -} diff --git a/pkg/services/container/executor.go b/pkg/services/container/executor.go index b4705d25..d4ae11d6 100644 --- a/pkg/services/container/executor.go +++ b/pkg/services/container/executor.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" ) type ServiceExecutor interface { @@ -21,12 +22,15 @@ type executorSvc struct { Server exec ServiceExecutor + + respSvc *response.Service } // NewExecutionService wraps ServiceExecutor and returns Container Service interface. -func NewExecutionService(exec ServiceExecutor) Server { +func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server { return &executorSvc{ - exec: exec, + exec: exec, + respSvc: respSvc, } } @@ -44,6 +48,7 @@ func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*cont resp := new(container.PutResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } @@ -61,6 +66,7 @@ func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest) resp := new(container.DeleteResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } @@ -73,6 +79,7 @@ func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*cont resp := new(container.GetResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } @@ -85,6 +92,7 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co resp := new(container.ListResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } @@ -102,6 +110,7 @@ func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExte resp := new(container.SetExtendedACLResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } @@ -114,5 +123,6 @@ func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExte resp := new(container.GetExtendedACLResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } diff --git a/pkg/services/container/response.go b/pkg/services/container/response.go deleted file mode 100644 index 13897453..00000000 --- a/pkg/services/container/response.go +++ /dev/null @@ -1,115 +0,0 @@ -package container - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" -) - -type responseService struct { - respSvc *response.Service - - svc Server -} - -// NewResponseService returns container service instance that passes internal service -// call to response service. -func NewResponseService(cnrSvc Server, respSvc *response.Service) Server { - return &responseService{ - respSvc: respSvc, - svc: cnrSvc, - } -} - -func (s *responseService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Put(ctx, req.(*container.PutRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.PutResponse), nil -} - -func (s *responseService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Delete(ctx, req.(*container.DeleteRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.DeleteResponse), nil -} - -func (s *responseService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Get(ctx, req.(*container.GetRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.GetResponse), nil -} - -func (s *responseService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.List(ctx, req.(*container.ListRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.ListResponse), nil -} - -func (s *responseService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.SetExtendedACLResponse), nil -} - -func (s *responseService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.GetExtendedACLResponse), nil -} - -func (s *responseService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*container.AnnounceUsedSpaceResponse), nil -} diff --git a/pkg/services/netmap/executor.go b/pkg/services/netmap/executor.go index d77c69d4..d1e7a949 100644 --- a/pkg/services/netmap/executor.go +++ b/pkg/services/netmap/executor.go @@ -8,6 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/version" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" versionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" ) @@ -18,6 +19,8 @@ type executorSvc struct { state NodeState netInfo NetworkInfo + + respSvc *response.Service } // NodeState encapsulates information @@ -42,8 +45,8 @@ type NetworkInfo interface { Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error) } -func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) Server { - if s == nil || netInfo == nil || !version.IsValid(v) { +func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo, respSvc *response.Service) Server { + if s == nil || netInfo == nil || !version.IsValid(v) || respSvc == nil { // this should never happen, otherwise it programmers bug panic("can't create netmap execution service") } @@ -51,6 +54,7 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) res := &executorSvc{ state: s, netInfo: netInfo, + respSvc: respSvc, } v.WriteToV2(&res.version) @@ -96,6 +100,7 @@ func (s *executorSvc) LocalNodeInfo( resp := new(netmap.LocalNodeInfoResponse) resp.SetBody(body) + s.respSvc.SetMeta(resp) return resp, nil } @@ -126,6 +131,7 @@ func (s *executorSvc) NetworkInfo( resp := new(netmap.NetworkInfoResponse) resp.SetBody(body) + s.respSvc.SetMeta(resp) return resp, nil } @@ -143,5 +149,6 @@ func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*n resp := new(netmap.SnapshotResponse) resp.SetBody(body) + s.respSvc.SetMeta(resp) return resp, nil } diff --git a/pkg/services/netmap/response.go b/pkg/services/netmap/response.go deleted file mode 100644 index 8b035e46..00000000 --- a/pkg/services/netmap/response.go +++ /dev/null @@ -1,63 +0,0 @@ -package netmap - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" -) - -type responseService struct { - respSvc *response.Service - - svc Server -} - -// NewResponseService returns netmap service instance that passes internal service -// call to response service. -func NewResponseService(nmSvc Server, respSvc *response.Service) Server { - return &responseService{ - respSvc: respSvc, - svc: nmSvc, - } -} - -func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*netmap.LocalNodeInfoResponse), nil -} - -func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*netmap.NetworkInfoResponse), nil -} - -func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*netmap.SnapshotResponse), nil -} diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index def934ea..4eef1dfb 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -91,16 +91,13 @@ func (s *ResponseService) Put() (PutObjectStream, error) { } func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Head(ctx, req.(*object.HeadRequest)) - }, - ) + resp, err := s.svc.Head(ctx, req) if err != nil { return nil, err } - return resp.(*object.HeadResponse), nil + s.respSvc.SetMeta(resp) + return resp, nil } func (s *searchStreamResponser) Send(resp *object.SearchResponse) error { @@ -117,16 +114,13 @@ func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) } func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Delete(ctx, req.(*object.DeleteRequest)) - }, - ) + resp, err := s.svc.Delete(ctx, req) if err != nil { return nil, err } - return resp.(*object.DeleteResponse), nil + s.respSvc.SetMeta(resp) + return resp, nil } func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error { @@ -143,14 +137,11 @@ func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObject } func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest)) - }, - ) + resp, err := s.svc.GetRangeHash(ctx, req) if err != nil { return nil, err } - return resp.(*object.GetRangeHashResponse), nil + s.respSvc.SetMeta(resp) + return resp, nil } diff --git a/pkg/services/session/executor.go b/pkg/services/session/executor.go index 5ad1d651..76c220fa 100644 --- a/pkg/services/session/executor.go +++ b/pkg/services/session/executor.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -17,14 +18,17 @@ type ServiceExecutor interface { type executorSvc struct { exec ServiceExecutor + respSvc *response.Service + log *logger.Logger } // NewExecutionService wraps ServiceExecutor and returns Session Service interface. -func NewExecutionService(exec ServiceExecutor, l *logger.Logger) Server { +func NewExecutionService(exec ServiceExecutor, respSvc *response.Service, l *logger.Logger) Server { return &executorSvc{ - exec: exec, - log: l, + exec: exec, + log: l, + respSvc: respSvc, } } @@ -42,5 +46,6 @@ func (s *executorSvc) Create(ctx context.Context, req *session.CreateRequest) (* resp := new(session.CreateResponse) resp.SetBody(respBody) + s.respSvc.SetMeta(resp) return resp, nil } diff --git a/pkg/services/session/response.go b/pkg/services/session/response.go deleted file mode 100644 index cbf93fb1..00000000 --- a/pkg/services/session/response.go +++ /dev/null @@ -1,37 +0,0 @@ -package session - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" -) - -type responseService struct { - respSvc *response.Service - - svc Server -} - -// NewResponseService returns session service instance that passes internal service -// call to response service. -func NewResponseService(ssSvc Server, respSvc *response.Service) Server { - return &responseService{ - respSvc: respSvc, - svc: ssSvc, - } -} - -func (s *responseService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Create(ctx, req.(*session.CreateRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*session.CreateResponse), nil -} diff --git a/pkg/services/util/response/client_stream.go b/pkg/services/util/response/client_stream.go index b541c73d..962f2bc2 100644 --- a/pkg/services/util/response/client_stream.go +++ b/pkg/services/util/response/client_stream.go @@ -10,7 +10,7 @@ import ( // ClientMessageStreamer represents client-side message streamer // that sets meta values to the response. type ClientMessageStreamer struct { - cfg *cfg + srv *Service send util.RequestMessageWriter @@ -33,7 +33,7 @@ func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.Response return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err) } - setMeta(resp, s.cfg) + s.srv.SetMeta(resp) return resp, nil } @@ -41,7 +41,7 @@ func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.Response // CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance. func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer { return &ClientMessageStreamer{ - cfg: s.cfg, + srv: s, send: sender, close: closer, } diff --git a/pkg/services/util/response/server_stream.go b/pkg/services/util/response/server_stream.go index 8a19fc4e..df1c0017 100644 --- a/pkg/services/util/response/server_stream.go +++ b/pkg/services/util/response/server_stream.go @@ -9,7 +9,7 @@ import ( // ServerMessageStreamer represents server-side message streamer // that sets meta values to all response messages. type ServerMessageStreamer struct { - cfg *cfg + srv *Service recv util.ResponseMessageReader } @@ -22,7 +22,7 @@ func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) { return nil, fmt.Errorf("could not receive response message for signing: %w", err) } - setMeta(m, s.cfg) + s.srv.SetMeta(m) return m, nil } @@ -30,7 +30,7 @@ func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) { // HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result. func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter { return func(resp util.ResponseMessage) error { - setMeta(resp, s.cfg) + s.SetMeta(resp) return respWriter(resp) } diff --git a/pkg/services/util/response/service.go b/pkg/services/util/response/service.go index 87cc8383..a63d5343 100644 --- a/pkg/services/util/response/service.go +++ b/pkg/services/util/response/service.go @@ -44,11 +44,12 @@ func NewService(opts ...Option) *Service { } } -func setMeta(resp util.ResponseMessage, cfg *cfg) { +// SetMeta sets adds meta-header to resp. +func (s *Service) SetMeta(resp util.ResponseMessage) { meta := new(session.ResponseMetaHeader) - meta.SetVersion(&cfg.version) + meta.SetVersion(&s.cfg.version) meta.SetTTL(1) // FIXME: #1160 TTL must be calculated - meta.SetEpoch(cfg.state.CurrentEpoch()) + meta.SetEpoch(s.cfg.state.CurrentEpoch()) if origin := resp.GetMetaHeader(); origin != nil { // FIXME: #1160 what if origin is set by local server? diff --git a/pkg/services/util/response/unary.go b/pkg/services/util/response/unary.go deleted file mode 100644 index 29cb9531..00000000 --- a/pkg/services/util/response/unary.go +++ /dev/null @@ -1,21 +0,0 @@ -package response - -import ( - "context" - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" -) - -// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it. -func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) { - // process request - resp, err := handler(ctx, req) - if err != nil { - return nil, fmt.Errorf("could not handle request: %w", err) - } - - setMeta(resp, s.cfg) - - return resp, nil -}