forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
17 changed files with 76 additions and 339 deletions
|
@ -21,10 +21,8 @@ func initAccountingService(ctx context.Context, c *cfg) {
|
|||
server := accountingTransportGRPC.New(
|
||||
accountingService.NewSignService(
|
||||
&c.key.PrivateKey,
|
||||
accountingService.NewResponseService(
|
||||
accountingService.NewExecutionService(
|
||||
accounting.NewExecutor(balanceMorphWrapper),
|
||||
),
|
||||
accountingService.NewExecutionService(
|
||||
accounting.NewExecutor(balanceMorphWrapper),
|
||||
c.respSvc,
|
||||
),
|
||||
),
|
||||
|
|
|
@ -83,16 +83,13 @@ func initContainerService(ctx context.Context, c *cfg) {
|
|||
server := containerTransportGRPC.New(
|
||||
containerService.NewSignService(
|
||||
&c.key.PrivateKey,
|
||||
containerService.NewResponseService(
|
||||
&usedSpaceService{
|
||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
|
||||
loadWriterProvider: loadRouter,
|
||||
loadPlacementBuilder: loadPlacementBuilder,
|
||||
routeBuilder: routeBuilder,
|
||||
cfg: c,
|
||||
},
|
||||
c.respSvc,
|
||||
),
|
||||
&usedSpaceService{
|
||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
||||
loadWriterProvider: loadRouter,
|
||||
loadPlacementBuilder: loadPlacementBuilder,
|
||||
routeBuilder: routeBuilder,
|
||||
cfg: c,
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
|
@ -575,6 +572,8 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
|
|||
resp := new(containerV2.AnnounceUsedSpaceResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
c.cfg.respSvc.SetMeta(resp)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -148,17 +148,15 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
|||
server := netmapTransportGRPC.New(
|
||||
netmapService.NewSignService(
|
||||
&c.key.PrivateKey,
|
||||
netmapService.NewResponseService(
|
||||
netmapService.NewExecutionService(
|
||||
c,
|
||||
c.apiVersion,
|
||||
&netInfo{
|
||||
netState: c.cfgNetmap.state,
|
||||
magic: c.cfgMorph.client,
|
||||
morphClientNetMap: c.cfgNetmap.wrapper,
|
||||
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
|
||||
},
|
||||
),
|
||||
netmapService.NewExecutionService(
|
||||
c,
|
||||
c.apiVersion,
|
||||
&netInfo{
|
||||
netState: c.cfgNetmap.state,
|
||||
magic: c.cfgMorph.client,
|
||||
morphClientNetMap: c.cfgNetmap.wrapper,
|
||||
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
|
||||
},
|
||||
c.respSvc,
|
||||
),
|
||||
),
|
||||
|
|
|
@ -53,10 +53,7 @@ func initSessionService(c *cfg) {
|
|||
server := sessionTransportGRPC.New(
|
||||
sessionSvc.NewSignService(
|
||||
&c.key.PrivateKey,
|
||||
sessionSvc.NewResponseService(
|
||||
sessionSvc.NewExecutionService(c.privateTokenStore, c.log),
|
||||
c.respSvc,
|
||||
),
|
||||
sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log),
|
||||
),
|
||||
)
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
)
|
||||
|
||||
type ServiceExecutor interface {
|
||||
|
@ -12,13 +13,15 @@ type ServiceExecutor interface {
|
|||
}
|
||||
|
||||
type executorSvc struct {
|
||||
exec ServiceExecutor
|
||||
exec ServiceExecutor
|
||||
respSvc *response.Service
|
||||
}
|
||||
|
||||
// NewExecutionService wraps ServiceExecutor and returns Accounting Service interface.
|
||||
func NewExecutionService(exec ServiceExecutor) Server {
|
||||
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
|
||||
return &executorSvc{
|
||||
exec: exec,
|
||||
exec: exec,
|
||||
respSvc: respSvc,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,5 +34,6 @@ func (s *executorSvc) Balance(ctx context.Context, req *accounting.BalanceReques
|
|||
resp := new(accounting.BalanceResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
package accounting
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
)
|
||||
|
||||
type responseService struct {
|
||||
respSvc *response.Service
|
||||
|
||||
svc Server
|
||||
}
|
||||
|
||||
// NewResponseService returns accounting service instance that passes internal service
|
||||
// call to response service.
|
||||
func NewResponseService(accSvc Server, respSvc *response.Service) Server {
|
||||
return &responseService{
|
||||
respSvc: respSvc,
|
||||
svc: accSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responseService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*accounting.BalanceResponse), nil
|
||||
}
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
)
|
||||
|
||||
type ServiceExecutor interface {
|
||||
|
@ -21,12 +22,15 @@ type executorSvc struct {
|
|||
Server
|
||||
|
||||
exec ServiceExecutor
|
||||
|
||||
respSvc *response.Service
|
||||
}
|
||||
|
||||
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
|
||||
func NewExecutionService(exec ServiceExecutor) Server {
|
||||
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
|
||||
return &executorSvc{
|
||||
exec: exec,
|
||||
exec: exec,
|
||||
respSvc: respSvc,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,6 +48,7 @@ func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*cont
|
|||
resp := new(container.PutResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -61,6 +66,7 @@ func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest)
|
|||
resp := new(container.DeleteResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -73,6 +79,7 @@ func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*cont
|
|||
resp := new(container.GetResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -85,6 +92,7 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
resp := new(container.ListResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -102,6 +110,7 @@ func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExte
|
|||
resp := new(container.SetExtendedACLResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -114,5 +123,6 @@ func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExte
|
|||
resp := new(container.GetExtendedACLResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
)
|
||||
|
||||
type responseService struct {
|
||||
respSvc *response.Service
|
||||
|
||||
svc Server
|
||||
}
|
||||
|
||||
// NewResponseService returns container service instance that passes internal service
|
||||
// call to response service.
|
||||
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
|
||||
return &responseService{
|
||||
respSvc: respSvc,
|
||||
svc: cnrSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responseService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Put(ctx, req.(*container.PutRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.PutResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.DeleteResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Get(ctx, req.(*container.GetRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.GetResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.List(ctx, req.(*container.ListRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.ListResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.SetExtendedACLResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.GetExtendedACLResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*container.AnnounceUsedSpaceResponse), nil
|
||||
}
|
|
@ -8,6 +8,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/version"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
versionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
)
|
||||
|
@ -18,6 +19,8 @@ type executorSvc struct {
|
|||
state NodeState
|
||||
|
||||
netInfo NetworkInfo
|
||||
|
||||
respSvc *response.Service
|
||||
}
|
||||
|
||||
// NodeState encapsulates information
|
||||
|
@ -42,8 +45,8 @@ type NetworkInfo interface {
|
|||
Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error)
|
||||
}
|
||||
|
||||
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) Server {
|
||||
if s == nil || netInfo == nil || !version.IsValid(v) {
|
||||
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo, respSvc *response.Service) Server {
|
||||
if s == nil || netInfo == nil || !version.IsValid(v) || respSvc == nil {
|
||||
// this should never happen, otherwise it programmers bug
|
||||
panic("can't create netmap execution service")
|
||||
}
|
||||
|
@ -51,6 +54,7 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo)
|
|||
res := &executorSvc{
|
||||
state: s,
|
||||
netInfo: netInfo,
|
||||
respSvc: respSvc,
|
||||
}
|
||||
|
||||
v.WriteToV2(&res.version)
|
||||
|
@ -96,6 +100,7 @@ func (s *executorSvc) LocalNodeInfo(
|
|||
resp := new(netmap.LocalNodeInfoResponse)
|
||||
resp.SetBody(body)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -126,6 +131,7 @@ func (s *executorSvc) NetworkInfo(
|
|||
resp := new(netmap.NetworkInfoResponse)
|
||||
resp.SetBody(body)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -143,5 +149,6 @@ func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*n
|
|||
resp := new(netmap.SnapshotResponse)
|
||||
resp.SetBody(body)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -1,63 +0,0 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
)
|
||||
|
||||
type responseService struct {
|
||||
respSvc *response.Service
|
||||
|
||||
svc Server
|
||||
}
|
||||
|
||||
// NewResponseService returns netmap service instance that passes internal service
|
||||
// call to response service.
|
||||
func NewResponseService(nmSvc Server, respSvc *response.Service) Server {
|
||||
return &responseService{
|
||||
respSvc: respSvc,
|
||||
svc: nmSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*netmap.LocalNodeInfoResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*netmap.NetworkInfoResponse), nil
|
||||
}
|
||||
|
||||
func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*netmap.SnapshotResponse), nil
|
||||
}
|
|
@ -91,16 +91,13 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
|
|||
}
|
||||
|
||||
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Head(ctx, req.(*object.HeadRequest))
|
||||
},
|
||||
)
|
||||
resp, err := s.svc.Head(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*object.HeadResponse), nil
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
|
||||
|
@ -117,16 +114,13 @@ func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream)
|
|||
}
|
||||
|
||||
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
|
||||
},
|
||||
)
|
||||
resp, err := s.svc.Delete(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*object.DeleteResponse), nil
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
|
||||
|
@ -143,14 +137,11 @@ func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObject
|
|||
}
|
||||
|
||||
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
|
||||
},
|
||||
)
|
||||
resp, err := s.svc.GetRangeHash(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*object.GetRangeHashResponse), nil
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -17,14 +18,17 @@ type ServiceExecutor interface {
|
|||
type executorSvc struct {
|
||||
exec ServiceExecutor
|
||||
|
||||
respSvc *response.Service
|
||||
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
// NewExecutionService wraps ServiceExecutor and returns Session Service interface.
|
||||
func NewExecutionService(exec ServiceExecutor, l *logger.Logger) Server {
|
||||
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service, l *logger.Logger) Server {
|
||||
return &executorSvc{
|
||||
exec: exec,
|
||||
log: l,
|
||||
exec: exec,
|
||||
log: l,
|
||||
respSvc: respSvc,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -42,5 +46,6 @@ func (s *executorSvc) Create(ctx context.Context, req *session.CreateRequest) (*
|
|||
resp := new(session.CreateResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
)
|
||||
|
||||
type responseService struct {
|
||||
respSvc *response.Service
|
||||
|
||||
svc Server
|
||||
}
|
||||
|
||||
// NewResponseService returns session service instance that passes internal service
|
||||
// call to response service.
|
||||
func NewResponseService(ssSvc Server, respSvc *response.Service) Server {
|
||||
return &responseService{
|
||||
respSvc: respSvc,
|
||||
svc: ssSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responseService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
|
||||
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||
return s.svc.Create(ctx, req.(*session.CreateRequest))
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.(*session.CreateResponse), nil
|
||||
}
|
|
@ -10,7 +10,7 @@ import (
|
|||
// ClientMessageStreamer represents client-side message streamer
|
||||
// that sets meta values to the response.
|
||||
type ClientMessageStreamer struct {
|
||||
cfg *cfg
|
||||
srv *Service
|
||||
|
||||
send util.RequestMessageWriter
|
||||
|
||||
|
@ -33,7 +33,7 @@ func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.Response
|
|||
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
|
||||
}
|
||||
|
||||
setMeta(resp, s.cfg)
|
||||
s.srv.SetMeta(resp)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.Response
|
|||
// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance.
|
||||
func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer {
|
||||
return &ClientMessageStreamer{
|
||||
cfg: s.cfg,
|
||||
srv: s,
|
||||
send: sender,
|
||||
close: closer,
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
// ServerMessageStreamer represents server-side message streamer
|
||||
// that sets meta values to all response messages.
|
||||
type ServerMessageStreamer struct {
|
||||
cfg *cfg
|
||||
srv *Service
|
||||
|
||||
recv util.ResponseMessageReader
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) {
|
|||
return nil, fmt.Errorf("could not receive response message for signing: %w", err)
|
||||
}
|
||||
|
||||
setMeta(m, s.cfg)
|
||||
s.srv.SetMeta(m)
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) {
|
|||
// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result.
|
||||
func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter {
|
||||
return func(resp util.ResponseMessage) error {
|
||||
setMeta(resp, s.cfg)
|
||||
s.SetMeta(resp)
|
||||
|
||||
return respWriter(resp)
|
||||
}
|
||||
|
|
|
@ -44,11 +44,12 @@ func NewService(opts ...Option) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
func setMeta(resp util.ResponseMessage, cfg *cfg) {
|
||||
// SetMeta sets adds meta-header to resp.
|
||||
func (s *Service) SetMeta(resp util.ResponseMessage) {
|
||||
meta := new(session.ResponseMetaHeader)
|
||||
meta.SetVersion(&cfg.version)
|
||||
meta.SetVersion(&s.cfg.version)
|
||||
meta.SetTTL(1) // FIXME: #1160 TTL must be calculated
|
||||
meta.SetEpoch(cfg.state.CurrentEpoch())
|
||||
meta.SetEpoch(s.cfg.state.CurrentEpoch())
|
||||
|
||||
if origin := resp.GetMetaHeader(); origin != nil {
|
||||
// FIXME: #1160 what if origin is set by local server?
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
package response
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
)
|
||||
|
||||
// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it.
|
||||
func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) {
|
||||
// process request
|
||||
resp, err := handler(ctx, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not handle request: %w", err)
|
||||
}
|
||||
|
||||
setMeta(resp, s.cfg)
|
||||
|
||||
return resp, nil
|
||||
}
|
Loading…
Reference in a new issue