Compare commits

..

No commits in common. "fyrchik/simplify-services" and "master" have entirely different histories.

59 changed files with 1475 additions and 577 deletions

View file

@ -19,8 +19,10 @@ func initAccountingService(c *cfg) {
server := accountingTransportGRPC.New( server := accountingTransportGRPC.New(
accountingService.NewSignService( accountingService.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
accountingService.NewResponseService(
accountingService.NewExecutionService( accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper), accounting.NewExecutor(balanceMorphWrapper),
),
c.respSvc, c.respSvc,
), ),
), ),

View file

@ -582,7 +582,7 @@ func initCfg(appCfg *config.Config) *cfg {
key: key, key: key,
binPublicKey: key.PublicKey().Bytes(), binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr, localAddr: netAddr,
respSvc: response.NewService(netState), respSvc: response.NewService(response.WithNetworkState(netState)),
clientCache: cache.NewSDKClientCache(cacheOpts), clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts), bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts), putClientCache: cache.NewSDKClientCache(cacheOpts),

View file

@ -196,13 +196,16 @@ func initContainerService(c *cfg) {
server := containerTransportGRPC.New( server := containerTransportGRPC.New(
containerService.NewSignService( containerService.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{ &usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
loadWriterProvider: loadRouter, loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder, loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder, routeBuilder: routeBuilder,
cfg: c, cfg: c,
}, },
c.respSvc,
),
), ),
) )
@ -563,8 +566,6 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
resp := new(containerV2.AnnounceUsedSpaceResponse) resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
c.cfg.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }

View file

@ -152,6 +152,7 @@ func initNetmapService(c *cfg) {
server := netmapTransportGRPC.New( server := netmapTransportGRPC.New(
netmapService.NewSignService( netmapService.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
netmapService.NewResponseService(
netmapService.NewExecutionService( netmapService.NewExecutionService(
c, c,
c.apiVersion, c.apiVersion,
@ -161,6 +162,7 @@ func initNetmapService(c *cfg) {
morphClientNetMap: c.cfgNetmap.wrapper, morphClientNetMap: c.cfgNetmap.wrapper,
msPerBlockRdr: c.cfgMorph.client.MsPerBlock, msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
}, },
),
c.respSvc, c.respSvc,
), ),
), ),

View file

@ -198,6 +198,7 @@ func initReputationService(c *cfg) {
server := grpcreputation.New( server := grpcreputation.New(
reputationrpc.NewSignService( reputationrpc.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
reputationrpc.NewResponseService(
&reputationServer{ &reputationServer{
cfg: c, cfg: c,
log: c.log, log: c.log,
@ -205,6 +206,8 @@ func initReputationService(c *cfg) {
intermediateRouter: intermediateTrustRouter, intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder, routeBuilder: localRouteBuilder,
}, },
c.respSvc,
),
), ),
) )
@ -285,7 +288,6 @@ func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputa
resp := new(v2reputation.AnnounceLocalTrustResponse) resp := new(v2reputation.AnnounceLocalTrustResponse)
resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody)) resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -314,7 +316,6 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *
resp := new(v2reputation.AnnounceIntermediateResultResponse) resp := new(v2reputation.AnnounceIntermediateResultResponse)
resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody)) resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }

View file

@ -53,7 +53,10 @@ func initSessionService(c *cfg) {
server := sessionTransportGRPC.New( server := sessionTransportGRPC.New(
sessionSvc.NewSignService( sessionSvc.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log), sessionSvc.NewResponseService(
sessionSvc.NewExecutionService(c.privateTokenStore, c.log),
c.respSvc,
),
), ),
) )

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"github.com/TrueCloudLab/frostfs-api-go/v2/accounting" "github.com/TrueCloudLab/frostfs-api-go/v2/accounting"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
) )
type ServiceExecutor interface { type ServiceExecutor interface {
@ -14,14 +13,12 @@ type ServiceExecutor interface {
type executorSvc struct { type executorSvc struct {
exec ServiceExecutor exec ServiceExecutor
respSvc *response.Service
} }
// NewExecutionService wraps ServiceExecutor and returns Accounting Service interface. // NewExecutionService wraps ServiceExecutor and returns Accounting Service interface.
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server { func NewExecutionService(exec ServiceExecutor) Server {
return &executorSvc{ return &executorSvc{
exec: exec, exec: exec,
respSvc: respSvc,
} }
} }
@ -34,6 +31,5 @@ func (s *executorSvc) Balance(ctx context.Context, req *accounting.BalanceReques
resp := new(accounting.BalanceResponse) resp := new(accounting.BalanceResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }

View file

@ -0,0 +1,37 @@
package accounting
import (
"context"
"github.com/TrueCloudLab/frostfs-api-go/v2/accounting"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns accounting service instance that passes internal service
// call to response service.
func NewResponseService(accSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: accSvc,
}
}
func (s *responseService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*accounting.BalanceResponse), nil
}

View file

@ -22,6 +22,17 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
} }
func (s *signService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) { func (s *signService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := util.WrapResponse(s.svc.Balance(ctx, req)) resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
},
func() util.ResponseMessage {
return new(accounting.BalanceResponse)
},
)
if err != nil {
return nil, err
}
return resp.(*accounting.BalanceResponse), nil
} }

View file

@ -6,7 +6,6 @@ import (
"github.com/TrueCloudLab/frostfs-api-go/v2/container" "github.com/TrueCloudLab/frostfs-api-go/v2/container"
"github.com/TrueCloudLab/frostfs-api-go/v2/session" "github.com/TrueCloudLab/frostfs-api-go/v2/session"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
) )
type ServiceExecutor interface { type ServiceExecutor interface {
@ -22,15 +21,12 @@ type executorSvc struct {
Server Server
exec ServiceExecutor exec ServiceExecutor
respSvc *response.Service
} }
// NewExecutionService wraps ServiceExecutor and returns Container Service interface. // NewExecutionService wraps ServiceExecutor and returns Container Service interface.
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server { func NewExecutionService(exec ServiceExecutor) Server {
return &executorSvc{ return &executorSvc{
exec: exec, exec: exec,
respSvc: respSvc,
} }
} }
@ -48,7 +44,6 @@ func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*cont
resp := new(container.PutResponse) resp := new(container.PutResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -66,7 +61,6 @@ func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest)
resp := new(container.DeleteResponse) resp := new(container.DeleteResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -79,7 +73,6 @@ func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*cont
resp := new(container.GetResponse) resp := new(container.GetResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -92,7 +85,6 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
resp := new(container.ListResponse) resp := new(container.ListResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -110,7 +102,6 @@ func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExte
resp := new(container.SetExtendedACLResponse) resp := new(container.SetExtendedACLResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -123,6 +114,5 @@ func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExte
resp := new(container.GetExtendedACLResponse) resp := new(container.GetExtendedACLResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }

View file

@ -0,0 +1,115 @@
package container
import (
"context"
"github.com/TrueCloudLab/frostfs-api-go/v2/container"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns container service instance that passes internal service
// call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Put(ctx, req.(*container.PutRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.PutResponse), nil
}
func (s *responseService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.DeleteResponse), nil
}
func (s *responseService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Get(ctx, req.(*container.GetRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.GetResponse), nil
}
func (s *responseService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.List(ctx, req.(*container.ListRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.ListResponse), nil
}
func (s *responseService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.SetExtendedACLResponse), nil
}
func (s *responseService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.GetExtendedACLResponse), nil
}
func (s *responseService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.AnnounceUsedSpaceResponse), nil
}

View file

@ -22,64 +22,113 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
} }
func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.PutResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Put(ctx, req.(*container.PutRequest))
},
func() util.ResponseMessage {
return new(container.PutResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Put(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.PutResponse), nil
} }
func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.DeleteResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Delete(ctx, req.(*container.DeleteRequest))
},
func() util.ResponseMessage {
return new(container.DeleteResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.DeleteResponse), nil
} }
func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) { func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.GetResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Get(ctx, req.(*container.GetRequest))
},
func() util.ResponseMessage {
return new(container.GetResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Get(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.GetResponse), nil
} }
func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) { func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.ListResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.List(ctx, req.(*container.ListRequest))
},
func() util.ResponseMessage {
return new(container.ListResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.ListResponse), nil
} }
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) { func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.SetExtendedACLResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
func() util.ResponseMessage {
return new(container.SetExtendedACLResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.SetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.SetExtendedACLResponse), nil
} }
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) { func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.GetExtendedACLResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
func() util.ResponseMessage {
return new(container.GetExtendedACLResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.GetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.GetExtendedACLResponse), nil
} }
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) { func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(container.AnnounceUsedSpaceResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
},
func() util.ResponseMessage {
return new(container.AnnounceUsedSpaceResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.AnnounceUsedSpace(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*container.AnnounceUsedSpaceResponse), nil
} }

View file

@ -8,7 +8,6 @@ import (
"github.com/TrueCloudLab/frostfs-api-go/v2/netmap" "github.com/TrueCloudLab/frostfs-api-go/v2/netmap"
"github.com/TrueCloudLab/frostfs-api-go/v2/refs" "github.com/TrueCloudLab/frostfs-api-go/v2/refs"
"github.com/TrueCloudLab/frostfs-node/pkg/core/version" "github.com/TrueCloudLab/frostfs-node/pkg/core/version"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap" netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
versionsdk "github.com/TrueCloudLab/frostfs-sdk-go/version" versionsdk "github.com/TrueCloudLab/frostfs-sdk-go/version"
) )
@ -19,8 +18,6 @@ type executorSvc struct {
state NodeState state NodeState
netInfo NetworkInfo netInfo NetworkInfo
respSvc *response.Service
} }
// NodeState encapsulates information // NodeState encapsulates information
@ -45,8 +42,8 @@ type NetworkInfo interface {
Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error) Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error)
} }
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo, respSvc *response.Service) Server { func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) Server {
if s == nil || netInfo == nil || !version.IsValid(v) || respSvc == nil { if s == nil || netInfo == nil || !version.IsValid(v) {
// this should never happen, otherwise it programmers bug // this should never happen, otherwise it programmers bug
panic("can't create netmap execution service") panic("can't create netmap execution service")
} }
@ -54,7 +51,6 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
res := &executorSvc{ res := &executorSvc{
state: s, state: s,
netInfo: netInfo, netInfo: netInfo,
respSvc: respSvc,
} }
v.WriteToV2(&res.version) v.WriteToV2(&res.version)
@ -100,7 +96,6 @@ func (s *executorSvc) LocalNodeInfo(
resp := new(netmap.LocalNodeInfoResponse) resp := new(netmap.LocalNodeInfoResponse)
resp.SetBody(body) resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
@ -131,11 +126,10 @@ func (s *executorSvc) NetworkInfo(
resp := new(netmap.NetworkInfoResponse) resp := new(netmap.NetworkInfoResponse)
resp.SetBody(body) resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) { func (s *executorSvc) Snapshot(_ context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
var nm netmap.NetMap var nm netmap.NetMap
err := s.state.ReadCurrentNetMap(&nm) err := s.state.ReadCurrentNetMap(&nm)
@ -149,6 +143,5 @@ func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*n
resp := new(netmap.SnapshotResponse) resp := new(netmap.SnapshotResponse)
resp.SetBody(body) resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }

View file

@ -0,0 +1,63 @@
package netmap
import (
"context"
"github.com/TrueCloudLab/frostfs-api-go/v2/netmap"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns netmap service instance that passes internal service
// call to response service.
func NewResponseService(nmSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: nmSvc,
}
}
func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.LocalNodeInfoResponse), nil
}
func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.NetworkInfoResponse), nil
}
func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.SnapshotResponse), nil
}

View file

@ -24,28 +24,49 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
func (s *signService) LocalNodeInfo( func (s *signService) LocalNodeInfo(
ctx context.Context, ctx context.Context,
req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) { req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(netmap.LocalNodeInfoResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
func() util.ResponseMessage {
return new(netmap.LocalNodeInfoResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.LocalNodeInfo(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*netmap.LocalNodeInfoResponse), nil
} }
func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) { func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(netmap.NetworkInfoResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
},
func() util.ResponseMessage {
return new(netmap.NetworkInfoResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.NetworkInfo(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*netmap.NetworkInfoResponse), nil
} }
func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) { func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(netmap.SnapshotResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
func() util.ResponseMessage {
return new(netmap.SnapshotResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Snapshot(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*netmap.SnapshotResponse), nil
} }

View file

@ -45,30 +45,31 @@ type headerSource struct {
incompleteObjectHeaders bool incompleteObjectHeaders bool
} }
func (c *cfg) initDefault() { func defaultCfg() *cfg {
c.storage = (*localStorage)(nil) return &cfg{
storage: new(localStorage),
}
} }
func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) { func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
var c cfg cfg := defaultCfg()
c.initDefault()
for i := range opts { for i := range opts {
opts[i](&c) opts[i](cfg)
} }
if c.msg == nil { if cfg.msg == nil {
return nil, errors.New("message is not provided") return nil, errors.New("message is not provided")
} }
var res headerSource var res headerSource
err := c.readObjectHeaders(&res) err := cfg.readObjectHeaders(&res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res.requestHeaders = requestHeaders(c.msg) res.requestHeaders = requestHeaders(cfg.msg)
return res, nil return res, nil
} }
@ -100,20 +101,20 @@ func requestHeaders(msg xHeaderSource) []eaclSDK.Header {
var errMissingOID = errors.New("object ID is missing") var errMissingOID = errors.New("object ID is missing")
func (c *cfg) readObjectHeaders(dst *headerSource) error { func (h *cfg) readObjectHeaders(dst *headerSource) error {
switch m := c.msg.(type) { switch m := h.msg.(type) {
default: default:
panic(fmt.Sprintf("unexpected message type %T", c.msg)) panic(fmt.Sprintf("unexpected message type %T", h.msg))
case requestXHeaderSource: case requestXHeaderSource:
switch req := m.req.(type) { switch req := m.req.(type) {
case case
*objectV2.GetRequest, *objectV2.GetRequest,
*objectV2.HeadRequest: *objectV2.HeadRequest:
if c.obj == nil { if h.obj == nil {
return errMissingOID return errMissingOID
} }
objHeaders, completed := c.localObjectHeaders(c.cnr, c.obj) objHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
dst.objectHeaders = objHeaders dst.objectHeaders = objHeaders
dst.incompleteObjectHeaders = !completed dst.incompleteObjectHeaders = !completed
@ -121,18 +122,18 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
*objectV2.GetRangeRequest, *objectV2.GetRangeRequest,
*objectV2.GetRangeHashRequest, *objectV2.GetRangeHashRequest,
*objectV2.DeleteRequest: *objectV2.DeleteRequest:
if c.obj == nil { if h.obj == nil {
return errMissingOID return errMissingOID
} }
dst.objectHeaders = addressHeaders(c.cnr, c.obj) dst.objectHeaders = addressHeaders(h.cnr, h.obj)
case *objectV2.PutRequest: case *objectV2.PutRequest:
if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok { if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
oV2 := new(objectV2.Object) oV2 := new(objectV2.Object)
oV2.SetObjectID(v.GetObjectID()) oV2.SetObjectID(v.GetObjectID())
oV2.SetHeader(v.GetHeader()) oV2.SetHeader(v.GetHeader())
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj) dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
} }
case *objectV2.SearchRequest: case *objectV2.SearchRequest:
cnrV2 := req.GetBody().GetContainerID() cnrV2 := req.GetBody().GetContainerID()
@ -149,7 +150,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
case responseXHeaderSource: case responseXHeaderSource:
switch resp := m.resp.(type) { switch resp := m.resp.(type) {
default: default:
objectHeaders, completed := c.localObjectHeaders(c.cnr, c.obj) objectHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
dst.objectHeaders = objectHeaders dst.objectHeaders = objectHeaders
dst.incompleteObjectHeaders = !completed dst.incompleteObjectHeaders = !completed
@ -159,7 +160,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
oV2.SetObjectID(v.GetObjectID()) oV2.SetObjectID(v.GetObjectID())
oV2.SetHeader(v.GetHeader()) oV2.SetHeader(v.GetHeader())
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj) dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
} }
case *objectV2.HeadResponse: case *objectV2.HeadResponse:
oV2 := new(objectV2.Object) oV2 := new(objectV2.Object)
@ -171,7 +172,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
hdr = new(objectV2.Header) hdr = new(objectV2.Header)
var idV2 refsV2.ContainerID var idV2 refsV2.ContainerID
c.cnr.WriteToV2(&idV2) h.cnr.WriteToV2(&idV2)
hdr.SetContainerID(&idV2) hdr.SetContainerID(&idV2)
hdr.SetVersion(v.GetVersion()) hdr.SetVersion(v.GetVersion())
@ -185,20 +186,20 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
oV2.SetHeader(hdr) oV2.SetHeader(hdr)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj) dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
} }
} }
return nil return nil
} }
func (c *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) { func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
if idObj != nil { if idObj != nil {
var addr oid.Address var addr oid.Address
addr.SetContainer(cnr) addr.SetContainer(cnr)
addr.SetObject(*idObj) addr.SetObject(*idObj)
obj, err := c.storage.Head(addr) obj, err := h.storage.Head(addr)
if err == nil { if err == nil {
return headersFromObject(obj, cnr, idObj), true return headersFromObject(obj, cnr, idObj), true
} }

View file

@ -8,12 +8,14 @@ import (
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
) )
type localStorage engine.StorageEngine type localStorage struct {
ls *engine.StorageEngine
}
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) { func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
if s == nil { if s.ls == nil {
return nil, io.ErrUnexpectedEOF return nil, io.ErrUnexpectedEOF
} }
return engine.Head((*engine.StorageEngine)(s), addr) return engine.Head(s.ls, addr)
} }

View file

@ -14,7 +14,9 @@ func WithObjectStorage(v ObjectStorage) Option {
func WithLocalObjectStorage(v *engine.StorageEngine) Option { func WithLocalObjectStorage(v *engine.StorageEngine) Option {
return func(c *cfg) { return func(c *cfg) {
c.storage = (*localStorage)(v) c.storage = &localStorage{
ls: v,
}
} }
} }

View file

@ -21,7 +21,7 @@ import (
// Service checks basic ACL rules. // Service checks basic ACL rules.
type Service struct { type Service struct {
cfg *cfg
c senderClassifier c senderClassifier
} }
@ -72,17 +72,18 @@ type cfg struct {
next object.ServiceServer next object.ServiceServer
} }
func (c *cfg) initDefault() { func defaultCfg() *cfg {
c.log = &logger.Logger{Logger: zap.L()} return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
} }
// New is a constructor for object ACL checking service. // New is a constructor for object ACL checking service.
func New(opts ...Option) *Service { func New(opts ...Option) Service {
var s Service cfg := defaultCfg()
s.cfg.initDefault()
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](cfg)
} }
panicOnNil := func(v any, name string) { panicOnNil := func(v any, name string) {
@ -91,18 +92,20 @@ func New(opts ...Option) *Service {
} }
} }
panicOnNil(s.cfg.next, "next Service") panicOnNil(cfg.next, "next Service")
panicOnNil(s.cfg.nm, "netmap client") panicOnNil(cfg.nm, "netmap client")
panicOnNil(s.cfg.irFetcher, "inner Ring fetcher") panicOnNil(cfg.irFetcher, "inner Ring fetcher")
panicOnNil(s.cfg.checker, "acl checker") panicOnNil(cfg.checker, "acl checker")
panicOnNil(s.cfg.containers, "container source") panicOnNil(cfg.containers, "container source")
s.c = senderClassifier{ return Service{
log: s.cfg.log, cfg: cfg,
innerRing: s.cfg.irFetcher, c: senderClassifier{
netmap: s.cfg.nm, log: cfg.log,
innerRing: cfg.irFetcher,
netmap: cfg.nm,
},
} }
return &s
} }
// Get implements ServiceServer interface, makes ACL checks and calls // Get implements ServiceServer interface, makes ACL checks and calls

View file

@ -5,7 +5,9 @@ import (
"strconv" "strconv"
objectV2 "github.com/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "github.com/TrueCloudLab/frostfs-api-go/v2/object"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/TrueCloudLab/frostfs-sdk-go/object" "github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
@ -42,17 +44,37 @@ const (
func (exec *execCtx) setLogger(l *logger.Logger) { func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With( exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "DELETE"), zap.String("request", "DELETE"),
zap.Stringer("address", exec.prm.addr), zap.Stringer("address", exec.address()),
zap.Bool("local", exec.prm.common.LocalOnly()), zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil), zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)} )}
} }
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec *execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec *execCtx) containerID() cid.ID {
return exec.prm.addr.Container()
}
func (exec *execCtx) commonParameters() *util.CommonPrm {
return exec.prm.common
}
func (exec *execCtx) newAddress(id oid.ID) oid.Address { func (exec *execCtx) newAddress(id oid.ID) oid.Address {
var a oid.Address var a oid.Address
a.SetObject(id) a.SetObject(id)
a.SetContainer(exec.prm.addr.Container()) a.SetContainer(exec.containerID())
return a return a
} }
@ -218,11 +240,11 @@ func (exec *execCtx) initTombstoneObject() bool {
} }
exec.tombstoneObj = object.New() exec.tombstoneObj = object.New()
exec.tombstoneObj.SetContainerID(exec.prm.addr.Container()) exec.tombstoneObj.SetContainerID(exec.containerID())
exec.tombstoneObj.SetType(object.TypeTombstone) exec.tombstoneObj.SetType(object.TypeTombstone)
exec.tombstoneObj.SetPayload(payload) exec.tombstoneObj.SetPayload(payload)
tokenSession := exec.prm.common.SessionToken() tokenSession := exec.commonParameters().SessionToken()
if tokenSession != nil { if tokenSession != nil {
issuer := tokenSession.Issuer() issuer := tokenSession.Issuer()
exec.tombstoneObj.SetOwnerID(&issuer) exec.tombstoneObj.SetOwnerID(&issuer)

View file

@ -36,7 +36,7 @@ func (exec *execCtx) formTombstone() (ok bool) {
exec.tombstone.SetExpirationEpoch( exec.tombstone.SetExpirationEpoch(
exec.svc.netInfo.CurrentEpoch() + tsLifetime, exec.svc.netInfo.CurrentEpoch() + tsLifetime,
) )
exec.addMembers([]oid.ID{exec.prm.addr.Object()}) exec.addMembers([]oid.ID{exec.address().Object()})
exec.log.Debug("forming split info...") exec.log.Debug("forming split info...")

View file

@ -15,7 +15,7 @@ import (
// Service utility serving requests of Object.Get service. // Service utility serving requests of Object.Get service.
type Service struct { type Service struct {
cfg *cfg
} }
// Option is a Service's constructor option. // Option is a Service's constructor option.
@ -60,21 +60,24 @@ type cfg struct {
keyStorage *util.KeyStorage keyStorage *util.KeyStorage
} }
func (c *cfg) initDefault() { func defaultCfg() *cfg {
c.log = &logger.Logger{Logger: zap.L()} return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
} }
// New creates, initializes and returns utility serving // New creates, initializes and returns utility serving
// Object.Get service requests. // Object.Get service requests.
func New(opts ...Option) *Service { func New(opts ...Option) *Service {
var s Service c := defaultCfg()
s.cfg.initDefault()
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// WithLogger returns option to specify Delete service's logger. // WithLogger returns option to specify Delete service's logger.

View file

@ -24,12 +24,12 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
wr := getsvc.NewSimpleObjectWriter() wr := getsvc.NewSimpleObjectWriter()
p := getsvc.HeadPrm{} p := getsvc.HeadPrm{}
p.SetCommonParameters(exec.prm.common) p.SetCommonParameters(exec.commonParameters())
p.SetHeaderWriter(wr) p.SetHeaderWriter(wr)
p.WithRawFlag(true) p.WithRawFlag(true)
p.WithAddress(addr) p.WithAddress(addr)
err := (*getsvc.Service)(w).Head(exec.ctx, p) err := (*getsvc.Service)(w).Head(exec.context(), p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -38,7 +38,7 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
} }
func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) { func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) {
_, err := w.headAddress(exec, exec.prm.addr) _, err := w.headAddress(exec, exec.address())
var errSplitInfo *object.SplitInfoError var errSplitInfo *object.SplitInfoError
@ -89,11 +89,11 @@ func (w *searchSvcWrapper) splitMembers(exec *execCtx) ([]oid.ID, error) {
p := searchsvc.Prm{} p := searchsvc.Prm{}
p.SetWriter(wr) p.SetWriter(wr)
p.SetCommonParameters(exec.prm.common) p.SetCommonParameters(exec.commonParameters())
p.WithContainerID(exec.prm.addr.Container()) p.WithContainerID(exec.containerID())
p.WithSearchFilters(fs) p.WithSearchFilters(fs)
err := (*searchsvc.Service)(w).Search(exec.ctx, p) err := (*searchsvc.Service)(w).Search(exec.context(), p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -108,7 +108,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
} }
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) { func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
streamer, err := (*putsvc.Service)(w).Put(exec.ctx) streamer, err := (*putsvc.Service)(w).Put(exec.context())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -116,7 +116,7 @@ func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
payload := exec.tombstoneObj.Payload() payload := exec.tombstoneObj.Payload()
initPrm := new(putsvc.PutInitPrm). initPrm := new(putsvc.PutInitPrm).
WithCommonPrm(exec.prm.common). WithCommonPrm(exec.commonParameters()).
WithObject(exec.tombstoneObj.CutPayload()) WithObject(exec.tombstoneObj.CutPayload())
err = streamer.Init(initPrm) err = streamer.Init(initPrm)

View file

@ -9,7 +9,7 @@ import (
// Service implements Delete operation of Object service v2. // Service implements Delete operation of Object service v2.
type Service struct { type Service struct {
cfg *cfg
} }
// Option represents Service constructor option. // Option represents Service constructor option.
@ -21,13 +21,15 @@ type cfg struct {
// NewService constructs Service instance from provided options. // NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service { func NewService(opts ...Option) *Service {
var s Service c := new(cfg)
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// Delete calls internal service. // Delete calls internal service.

View file

@ -33,7 +33,7 @@ func (exec *execCtx) assemble() {
exec.log.Debug("trying to assemble the object...") exec.log.Debug("trying to assemble the object...")
splitInfo := exec.splitInfo splitInfo := exec.splitInfo()
childID, ok := splitInfo.Link() childID, ok := splitInfo.Link()
if !ok { if !ok {
@ -47,7 +47,7 @@ func (exec *execCtx) assemble() {
prev, children := exec.initFromChild(childID) prev, children := exec.initFromChild(childID)
if len(children) > 0 { if len(children) > 0 {
if exec.prm.rng == nil { if exec.ctxRange() == nil {
if ok := exec.writeCollectedHeader(); ok { if ok := exec.writeCollectedHeader(); ok {
exec.overtakePayloadDirectly(children, nil, true) exec.overtakePayloadDirectly(children, nil, true)
} }
@ -100,7 +100,7 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
var payload []byte var payload []byte
if rng := exec.prm.rng; rng != nil { if rng := exec.ctxRange(); rng != nil {
seekOff := rng.GetOffset() seekOff := rng.GetOffset()
seekLen := rng.GetLength() seekLen := rng.GetLength()
seekTo := seekOff + seekLen seekTo := seekOff + seekLen
@ -146,7 +146,7 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
} }
func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK.Range, checkRight bool) { func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK.Range, checkRight bool) {
withRng := len(rngs) > 0 && exec.prm.rng != nil withRng := len(rngs) > 0 && exec.ctxRange() != nil
for i := range children { for i := range children {
var r *objectSDK.Range var r *objectSDK.Range
@ -194,7 +194,7 @@ func (exec *execCtx) buildChainInReverse(prev oid.ID) ([]oid.ID, []objectSDK.Ran
var ( var (
chain = make([]oid.ID, 0) chain = make([]oid.ID, 0)
rngs = make([]objectSDK.Range, 0) rngs = make([]objectSDK.Range, 0)
seekRng = exec.prm.rng seekRng = exec.ctxRange()
from = seekRng.GetOffset() from = seekRng.GetOffset()
to = from + seekRng.GetLength() to = from + seekRng.GetLength()

View file

@ -8,12 +8,12 @@ import (
) )
func (exec *execCtx) executeOnContainer() { func (exec *execCtx) executeOnContainer() {
if exec.prm.common.LocalOnly() { if exec.isLocal() {
exec.log.Debug("return result directly") exec.log.Debug("return result directly")
return return
} }
lookupDepth := exec.prm.common.NetmapLookupDepth() lookupDepth := exec.netmapLookupDepth()
exec.log.Debug("trying to execute in container...", exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth), zap.Uint64("netmap lookup depth", lookupDepth),
@ -47,12 +47,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
traverser, ok := exec.generateTraverser(exec.prm.addr) traverser, ok := exec.generateTraverser(exec.address())
if !ok { if !ok {
return true return true
} }
ctx, cancel := context.WithCancel(exec.ctx) ctx, cancel := context.WithCancel(exec.context())
defer cancel() defer cancel()
exec.status = statusUndefined exec.status = statusUndefined

View file

@ -10,6 +10,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object" objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
@ -29,7 +30,7 @@ type execCtx struct {
statusError statusError
splitInfo *objectSDK.SplitInfo infoSplit *objectSDK.SplitInfo
log *logger.Logger log *logger.Logger
@ -37,7 +38,7 @@ type execCtx struct {
curOff uint64 curOff uint64
headOnly bool head bool
curProcEpoch uint64 curProcEpoch uint64
} }
@ -54,7 +55,7 @@ const (
func headOnly() execOption { func headOnly() execOption {
return func(c *execCtx) { return func(c *execCtx) {
c.headOnly = true c.head = true
} }
} }
@ -66,29 +67,45 @@ func withPayloadRange(r *objectSDK.Range) execOption {
func (exec *execCtx) setLogger(l *logger.Logger) { func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET" req := "GET"
if exec.headOnly { if exec.headOnly() {
req = "HEAD" req = "HEAD"
} else if exec.prm.rng != nil { } else if exec.ctxRange() != nil {
req = "GET_RANGE" req = "GET_RANGE"
} }
exec.log = &logger.Logger{Logger: l.With( exec.log = &logger.Logger{Logger: l.With(
zap.String("request", req), zap.String("request", req),
zap.Stringer("address", exec.prm.addr), zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.prm.raw), zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.prm.common.LocalOnly()), zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil), zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)} )}
} }
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
// isChild checks if reading object is a parent of the given object. // isChild checks if reading object is a parent of the given object.
// Object without reference to the parent (only children with the parent header // Object without reference to the parent (only children with the parent header
// have it) is automatically considered as child: this should be guaranteed by // have it) is automatically considered as child: this should be guaranteed by
// upper level logic. // upper level logic.
func (exec execCtx) isChild(obj *objectSDK.Object) bool { func (exec execCtx) isChild(obj *objectSDK.Object) bool {
par := obj.Parent() par := obj.Parent()
return par == nil || equalAddresses(exec.prm.addr, object.AddressOf(par)) return par == nil || equalAddresses(exec.address(), object.AddressOf(par))
} }
func (exec execCtx) key() (*ecdsa.PrivateKey, error) { func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
@ -111,16 +128,40 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
} }
func (exec *execCtx) canAssemble() bool { func (exec *execCtx) canAssemble() bool {
return exec.svc.assembly && !exec.prm.raw && !exec.headOnly && !exec.prm.common.LocalOnly() return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() && !exec.isLocal()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() cid.ID {
return exec.address().Container()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
} }
func (exec *execCtx) initEpoch() bool { func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.prm.common.NetmapEpoch() exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 { if exec.curProcEpoch > 0 {
return true return true
} }
e, err := exec.svc.epochSource.Epoch() e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch { switch {
default: default:
@ -166,10 +207,10 @@ func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*o
p.objWriter = w p.objWriter = w
p.SetRange(rng) p.SetRange(rng)
p.addr.SetContainer(exec.prm.addr.Container()) p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id) p.addr.SetObject(id)
exec.statusError = exec.svc.get(exec.ctx, p.commonPrm, withPayloadRange(rng)) exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
child := w.Object() child := w.Object()
ok := exec.status == statusOK ok := exec.status == statusOK
@ -187,7 +228,7 @@ func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*o
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) { func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
p := exec.prm p := exec.prm
p.common = p.common.WithLocalOnly(false) p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.prm.addr.Container()) p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id) p.addr.SetObject(id)
prm := HeadPrm{ prm := HeadPrm{
@ -197,7 +238,7 @@ func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w) prm.SetHeaderWriter(w)
err := exec.svc.Head(exec.ctx, prm) err := exec.svc.Head(exec.context(), prm)
switch { switch {
default: default:
@ -257,7 +298,7 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
} }
func (exec *execCtx) writeCollectedHeader() bool { func (exec *execCtx) writeCollectedHeader() bool {
if exec.prm.rng != nil { if exec.ctxRange() != nil {
return true return true
} }
@ -282,7 +323,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
} }
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
if exec.headOnly { if exec.headOnly() {
return true return true
} }
@ -310,6 +351,12 @@ func (exec *execCtx) writeCollectedObject() {
} }
} }
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common // disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts. // parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() { func (exec *execCtx) disableForwarding() {

View file

@ -69,7 +69,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
prm: RangePrm{ prm: RangePrm{
commonPrm: prm, commonPrm: prm,
}, },
splitInfo: object.NewSplitInfo(), infoSplit: object.NewSplitInfo(),
} }
for i := range opts { for i := range opts {

View file

@ -56,7 +56,7 @@ type testClient struct {
type testEpochReceiver uint64 type testEpochReceiver uint64
func (e testEpochReceiver) Epoch() (uint64, error) { func (e testEpochReceiver) currentEpoch() (uint64, error) {
return uint64(e), nil return uint64(e), nil
} }
@ -118,7 +118,7 @@ func newTestClient() *testClient {
} }
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.prm.addr.EncodeToString()] v, ok := c.results[exec.address().EncodeToString()]
if !ok { if !ok {
var errNotFound apistatus.ObjectNotFound var errNotFound apistatus.ObjectNotFound
@ -129,7 +129,7 @@ func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Obj
return nil, v.err return nil, v.err
} }
return cutToRange(v.obj, exec.prm.rng), nil return cutToRange(v.obj, exec.ctxRange()), nil
} }
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) { func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
@ -143,7 +143,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
var ( var (
ok bool ok bool
obj *objectSDK.Object obj *objectSDK.Object
sAddr = exec.prm.addr.EncodeToString() sAddr = exec.address().EncodeToString()
) )
if _, ok = s.inhumed[sAddr]; ok { if _, ok = s.inhumed[sAddr]; ok {
@ -157,7 +157,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
} }
if obj, ok = s.phy[sAddr]; ok { if obj, ok = s.phy[sAddr]; ok {
return cutToRange(obj, exec.prm.rng), nil return cutToRange(obj, exec.ctxRange()), nil
} }
var errNotFound apistatus.ObjectNotFound var errNotFound apistatus.ObjectNotFound
@ -211,7 +211,7 @@ func TestGetLocalOnly(t *testing.T) {
ctx := context.Background() ctx := context.Background()
newSvc := func(storage *testStorage) *Service { newSvc := func(storage *testStorage) *Service {
svc := &Service{} svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false) svc.log = test.NewLogger(false)
svc.localStorage = storage svc.localStorage = storage
svc.assembly = true svc.assembly = true
@ -473,7 +473,7 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&idCnr, cnr) container.CalculateID(&idCnr, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{} svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false) svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage() svc.localStorage = newTestStorage()
svc.assembly = true svc.assembly = true
@ -487,7 +487,7 @@ func TestGetRemoteSmall(t *testing.T) {
}, },
} }
svc.clientCache = c svc.clientCache = c
svc.epochSource = testEpochReceiver(curEpoch) svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc return svc
} }
@ -1150,7 +1150,7 @@ func TestGetFromPastEpoch(t *testing.T) {
c22 := newTestClient() c22 := newTestClient()
c22.addResult(addr, obj, nil) c22.addResult(addr, obj, nil)
svc := &Service{} svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false) svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage() svc.localStorage = newTestStorage()
svc.assembly = true svc.assembly = true
@ -1182,7 +1182,7 @@ func TestGetFromPastEpoch(t *testing.T) {
}, },
} }
svc.epochSource = testEpochReceiver(curEpoch) svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()

View file

@ -34,8 +34,8 @@ func (exec *execCtx) executeLocal() {
exec.err = errRemoved exec.err = errRemoved
case errors.As(err, &errSplitInfo): case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo()) mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo) exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange exec.status = statusOutOfRange
exec.err = errOutOfRange exec.err = errOutOfRange

View file

@ -53,8 +53,8 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
exec.err = errOutOfRange exec.err = errOutOfRange
case errors.As(err, &errSplitInfo): case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo()) mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo) exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
} }
return exec.status != statusUndefined return exec.status != statusUndefined

View file

@ -15,7 +15,7 @@ import (
// Service utility serving requests of Object.Get service. // Service utility serving requests of Object.Get service.
type Service struct { type Service struct {
cfg *cfg
} }
// Option is a Service's constructor option. // Option is a Service's constructor option.
@ -42,32 +42,34 @@ type cfg struct {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
} }
epochSource epochSource currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
keyStore *util.KeyStorage keyStore *util.KeyStorage
} }
type epochSource interface { func defaultCfg() *cfg {
Epoch() (uint64, error) return &cfg{
assembly: true,
log: &logger.Logger{Logger: zap.L()},
localStorage: new(storageEngineWrapper),
clientCache: new(clientCacheWrapper),
} }
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.assembly = true
c.clientCache = new(clientCacheWrapper)
} }
// New creates, initializes and returns utility serving // New creates, initializes and returns utility serving
// Object.Get service requests. // Object.Get service requests.
func New(opts ...Option) *Service { func New(opts ...Option) *Service {
var s Service c := defaultCfg()
s.cfg.initDefault()
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// WithLogger returns option to specify Get service's logger. // WithLogger returns option to specify Get service's logger.
@ -88,7 +90,7 @@ func WithoutAssembly() Option {
// instance. // instance.
func WithLocalStorageEngine(e *engine.StorageEngine) Option { func WithLocalStorageEngine(e *engine.StorageEngine) Option {
return func(c *cfg) { return func(c *cfg) {
c.localStorage = (*storageEngineWrapper)(e) c.localStorage.(*storageEngineWrapper).engine = e
} }
} }
@ -115,7 +117,9 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
// map storage to receive current network state. // map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option { func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) { return func(c *cfg) {
c.epochSource = nmSrc c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
} }
} }

View file

@ -6,6 +6,7 @@ import (
"io" "io"
coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
"github.com/TrueCloudLab/frostfs-node/pkg/core/netmap"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internal "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internal "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
@ -27,7 +28,9 @@ type clientWrapper struct {
client coreclient.MultiAddressClient client coreclient.MultiAddressClient
} }
type storageEngineWrapper engine.StorageEngine type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type partWriter struct { type partWriter struct {
ObjectWriter ObjectWriter
@ -41,6 +44,10 @@ type hasherWrapper struct {
hash io.Writer hash io.Writer
} }
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func NewSimpleObjectWriter() *SimpleObjectWriter { func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{ return &SimpleObjectWriter{
obj: object.New(), obj: object.New(),
@ -80,7 +87,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
} }
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.prm.forwarder != nil { if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client) return exec.prm.forwarder(info, c.client)
} }
@ -89,20 +96,20 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
return nil, err return nil, err
} }
if exec.headOnly { if exec.headOnly() {
var prm internalclient.HeadObjectPrm var prm internalclient.HeadObjectPrm
prm.SetContext(exec.ctx) prm.SetContext(exec.context())
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.prm.addr) prm.SetAddress(exec.address())
prm.SetPrivateKey(key) prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.prm.raw { if exec.isRaw() {
prm.SetRawFlag() prm.SetRawFlag()
} }
@ -115,21 +122,21 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
} }
// we don't specify payload writer because we accumulate // we don't specify payload writer because we accumulate
// the object locally (even huge). // the object locally (even huge).
if rng := exec.prm.rng; rng != nil { if rng := exec.ctxRange(); rng != nil {
var prm internalclient.PayloadRangePrm var prm internalclient.PayloadRangePrm
prm.SetContext(exec.ctx) prm.SetContext(exec.context())
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.prm.addr) prm.SetAddress(exec.address())
prm.SetPrivateKey(key) prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng) prm.SetRange(rng)
if exec.prm.raw { if exec.isRaw() {
prm.SetRawFlag() prm.SetRawFlag()
} }
@ -166,17 +173,17 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm var prm internalclient.GetObjectPrm
prm.SetContext(exec.ctx) prm.SetContext(exec.context())
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.prm.addr) prm.SetAddress(exec.address())
prm.SetPrivateKey(key) prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.prm.raw { if exec.isRaw() {
prm.SetRawFlag() prm.SetRawFlag()
} }
@ -188,25 +195,24 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
return res.Object(), nil return res.Object(), nil
} }
func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
e := (*engine.StorageEngine)(w) if exec.headOnly() {
if exec.headOnly {
var headPrm engine.HeadPrm var headPrm engine.HeadPrm
headPrm.WithAddress(exec.prm.addr) headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.prm.raw) headPrm.WithRaw(exec.isRaw())
r, err := e.Head(headPrm) r, err := e.engine.Head(headPrm)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return r.Header(), nil return r.Header(), nil
} else if rng := exec.prm.rng; rng != nil { } else if rng := exec.ctxRange(); rng != nil {
var getRange engine.RngPrm var getRange engine.RngPrm
getRange.WithAddress(exec.prm.addr) getRange.WithAddress(exec.address())
getRange.WithPayloadRange(rng) getRange.WithPayloadRange(rng)
r, err := e.GetRange(getRange) r, err := e.engine.GetRange(getRange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -214,9 +220,9 @@ func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
return r.Object(), nil return r.Object(), nil
} else { } else {
var getPrm engine.GetPrm var getPrm engine.GetPrm
getPrm.WithAddress(exec.prm.addr) getPrm.WithAddress(exec.address())
r, err := e.Get(getPrm) r, err := e.engine.Get(getPrm)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -244,3 +250,7 @@ func (h *hasherWrapper) WriteChunk(p []byte) error {
_, err := h.hash.Write(p) _, err := h.hash.Write(p)
return err return err
} }
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -13,7 +13,7 @@ import (
// Service implements Get operation of Object service v2. // Service implements Get operation of Object service v2.
type Service struct { type Service struct {
cfg *cfg
} }
// Option represents Service constructor option. // Option represents Service constructor option.
@ -27,13 +27,15 @@ type cfg struct {
// NewService constructs Service instance from provided options. // NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service { func NewService(opts ...Option) *Service {
var s Service c := new(cfg)
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// Get calls internal service and returns v2 object stream. // Get calls internal service and returns v2 object stream.

View file

@ -17,11 +17,14 @@ import (
type preparedObjectTarget interface { type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error WriteObject(*objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error)
} }
type distributedTarget struct { type distributedTarget struct {
traversal traversal traversal traversal
remotePool, localPool util.WorkerPool
obj *objectSDK.Object obj *objectSDK.Object
objMeta object.ContentMeta objMeta object.ContentMeta
@ -29,7 +32,7 @@ type distributedTarget struct {
nodeTargetInitializer func(nodeDesc) preparedObjectTarget nodeTargetInitializer func(nodeDesc) preparedObjectTarget
getWorkerPool func([]byte) (util.WorkerPool, bool) isLocalKey func([]byte) bool
relay func(nodeDesc) error relay func(nodeDesc) error
@ -45,6 +48,9 @@ type traversal struct {
// need of additional broadcast after the object is saved // need of additional broadcast after the object is saved
extraBroadcastEnabled bool extraBroadcastEnabled bool
// mtx protects mExclude map.
mtx sync.RWMutex
// container nodes which was processed during the primary object placement // container nodes which was processed during the primary object placement
mExclude map[string]struct{} mExclude map[string]struct{}
} }
@ -70,17 +76,21 @@ func (x *traversal) submitProcessed(n placement.Node) {
if x.extraBroadcastEnabled { if x.extraBroadcastEnabled {
key := string(n.PublicKey()) key := string(n.PublicKey())
x.mtx.Lock()
if x.mExclude == nil { if x.mExclude == nil {
x.mExclude = make(map[string]struct{}, 1) x.mExclude = make(map[string]struct{}, 1)
} }
x.mExclude[key] = struct{}{} x.mExclude[key] = struct{}{}
x.mtx.Unlock()
} }
} }
// checks if specified node was processed during the primary object placement. // checks if specified node was processed during the primary object placement.
func (x *traversal) processed(n placement.Node) bool { func (x *traversal) processed(n placement.Node) bool {
x.mtx.RLock()
_, ok := x.mExclude[string(n.PublicKey())] _, ok := x.mExclude[string(n.PublicKey())]
x.mtx.RUnlock()
return ok return ok
} }
@ -146,9 +156,10 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
target := t.nodeTargetInitializer(node) target := t.nodeTargetInitializer(node)
err := target.WriteObject(t.obj, t.objMeta) if err := target.WriteObject(t.obj, t.objMeta); err != nil {
if err != nil {
return fmt.Errorf("could not write header: %w", err) return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
} }
return nil return nil
} }
@ -184,12 +195,27 @@ loop:
addr := addrs[i] addr := addrs[i]
workerPool, isLocal := t.getWorkerPool(addr.PublicKey()) isLocal := t.isLocalKey(addr.PublicKey())
var workerPool util.WorkerPool
if isLocal {
workerPool = t.localPool
} else {
workerPool = t.remotePool
}
if err := workerPool.Submit(func() { if err := workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
err := f(nodeDesc{local: isLocal, info: addr}) err := f(nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil { if err != nil {
resErr.Store(err) resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
@ -204,12 +230,6 @@ loop:
break loop break loop
} }
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addrs[i])
} }
wg.Wait() wg.Wait()

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
objectCore "github.com/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "github.com/TrueCloudLab/frostfs-node/pkg/core/object"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
"github.com/TrueCloudLab/frostfs-sdk-go/object" "github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
) )
@ -23,26 +24,40 @@ type ObjectStorage interface {
type localTarget struct { type localTarget struct {
storage ObjectStorage storage ObjectStorage
obj *object.Object
meta objectCore.ContentMeta
} }
func (t localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error { func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
switch meta.Type() { t.obj = obj
t.meta = meta
return nil
}
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
switch t.meta.Type() {
case object.TypeTombstone: case object.TypeTombstone:
err := t.storage.Delete(objectCore.AddressOf(obj), meta.Objects()) err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil { if err != nil {
return fmt.Errorf("could not delete objects from tombstone locally: %w", err) return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
} }
case object.TypeLock: case object.TypeLock:
err := t.storage.Lock(objectCore.AddressOf(obj), meta.Objects()) err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil { if err != nil {
return fmt.Errorf("could not lock object from lock objects locally: %w", err) return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
} }
default: default:
// objects that do not change meta storage // objects that do not change meta storage
} }
if err := t.storage.Put(obj); err != nil { if err := t.storage.Put(t.obj); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err) return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
} }
return nil
id, _ := t.obj.ID()
return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
} }

View file

@ -9,6 +9,7 @@ import (
objectcore "github.com/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "github.com/TrueCloudLab/frostfs-node/pkg/core/object"
internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
"github.com/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/TrueCloudLab/frostfs-sdk-go/object" "github.com/TrueCloudLab/frostfs-sdk-go/object"
) )
@ -22,6 +23,8 @@ type remoteTarget struct {
nodeInfo clientcore.NodeInfo nodeInfo clientcore.NodeInfo
obj *object.Object
clientConstructor ClientConstructor clientConstructor ClientConstructor
} }
@ -41,6 +44,12 @@ type RemotePutPrm struct {
} }
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error { func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
t.obj = obj
return nil
}
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
var sessionInfo *util.SessionInfo var sessionInfo *util.SessionInfo
if tok := t.commonPrm.SessionToken(); tok != nil { if tok := t.commonPrm.SessionToken(); tok != nil {
@ -52,12 +61,12 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
key, err := t.keyStorage.GetKey(sessionInfo) key, err := t.keyStorage.GetKey(sessionInfo)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not receive private key: %w", t, err) return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
} }
c, err := t.clientConstructor.Get(t.nodeInfo) c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
} }
var prm internalclient.PutObjectPrm var prm internalclient.PutObjectPrm
@ -68,14 +77,15 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
prm.SetSessionToken(t.commonPrm.SessionToken()) prm.SetSessionToken(t.commonPrm.SessionToken())
prm.SetBearerToken(t.commonPrm.BearerToken()) prm.SetBearerToken(t.commonPrm.BearerToken())
prm.SetXHeaders(t.commonPrm.XHeaders()) prm.SetXHeaders(t.commonPrm.XHeaders())
prm.SetObject(obj) prm.SetObject(t.obj)
_, err = internalclient.PutObject(prm) res, err := internalclient.PutObject(prm)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
} }
return nil return new(transformer.AccessIdentifiers).
WithSelfID(res.ID()), nil
} }
// NewRemoteSender creates, initializes and returns new RemoteSender instance. // NewRemoteSender creates, initializes and returns new RemoteSender instance.
@ -117,8 +127,9 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return fmt.Errorf("parse client node info: %w", err) return fmt.Errorf("parse client node info: %w", err)
} }
err = t.WriteObject(p.obj, objectcore.ContentMeta{}) if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
if err != nil { return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err) return fmt.Errorf("(%T) could not send object: %w", s, err)
} }

View file

@ -22,7 +22,7 @@ type MaxSizeSource interface {
} }
type Service struct { type Service struct {
cfg *cfg
} }
type Option func(*cfg) type Option func(*cfg)
@ -57,28 +57,31 @@ type cfg struct {
log *logger.Logger log *logger.Logger
} }
func (c *cfg) initDefault() { func defaultCfg() *cfg {
c.remotePool = util.NewPseudoWorkerPool() return &cfg{
c.localPool = util.NewPseudoWorkerPool() remotePool: util.NewPseudoWorkerPool(),
c.log = &logger.Logger{Logger: zap.L()} localPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()},
}
} }
func NewService(opts ...Option) *Service { func NewService(opts ...Option) *Service {
var s Service c := defaultCfg()
s.cfg.initDefault()
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
s.cfg.fmtValidator = object.NewFormatValidator(s.cfg.fmtValidatorOpts...) c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...)
return &s return &Service{
cfg: c,
}
} }
func (p *Service) Put(ctx context.Context) (*Streamer, error) { func (p *Service) Put(ctx context.Context) (*Streamer, error) {
return &Streamer{ return &Streamer{
cfg: &p.cfg, cfg: p.cfg,
ctx: ctx, ctx: ctx,
}, nil }, nil
} }

View file

@ -10,7 +10,6 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
pkgutil "github.com/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "github.com/TrueCloudLab/frostfs-sdk-go/container" containerSDK "github.com/TrueCloudLab/frostfs-sdk-go/container"
"github.com/TrueCloudLab/frostfs-sdk-go/object" "github.com/TrueCloudLab/frostfs-sdk-go/object"
"github.com/TrueCloudLab/frostfs-sdk-go/user" "github.com/TrueCloudLab/frostfs-sdk-go/user"
@ -217,10 +216,11 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
extraBroadcastEnabled: withBroadcast, extraBroadcastEnabled: withBroadcast,
}, },
payload: getPayload(), payload: getPayload(),
getWorkerPool: p.getWorkerPool, remotePool: p.remotePool,
localPool: p.localPool,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local { if node.local {
return localTarget{ return &localTarget{
storage: p.localStore, storage: p.localStore,
} }
} }
@ -239,6 +239,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
relay: relay, relay: relay,
fmt: p.fmtValidator, fmt: p.fmtValidator,
log: p.log, log: p.log,
isLocalKey: p.netmapKeys.IsLocalKey,
} }
} }
@ -275,10 +277,3 @@ func (p *Streamer) Close() (*PutResponse, error) {
id: ids.SelfID(), id: ids.SelfID(),
}, nil }, nil
} }
func (p *Streamer) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
if p.netmapKeys.IsLocalKey(pub) {
return p.localPool, true
}
return p.remotePool, false
}

View file

@ -11,7 +11,7 @@ import (
// Service implements Put operation of Object service v2. // Service implements Put operation of Object service v2.
type Service struct { type Service struct {
cfg *cfg
} }
// Option represents Service constructor option. // Option represents Service constructor option.
@ -24,13 +24,15 @@ type cfg struct {
// NewService constructs Service instance from provided options. // NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service { func NewService(opts ...Option) *Service {
var s Service c := new(cfg)
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// Put calls internal service and returns v2 object streamer. // Put calls internal service and returns v2 object streamer.

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/TrueCloudLab/frostfs-api-go/v2/object" "github.com/TrueCloudLab/frostfs-api-go/v2/object"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response" "github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
) )
@ -15,26 +16,25 @@ type ResponseService struct {
} }
type searchStreamResponser struct { type searchStreamResponser struct {
SearchStream util.ServerStream
respSvc *response.Service respWriter util.ResponseMessageWriter
} }
type getStreamResponser struct { type getStreamResponser struct {
GetObjectStream util.ServerStream
respSvc *response.Service respWriter util.ResponseMessageWriter
} }
type getRangeStreamResponser struct { type getRangeStreamResponser struct {
GetObjectRangeStream util.ServerStream
respSvc *response.Service respWriter util.ResponseMessageWriter
} }
type putStreamResponser struct { type putStreamResponser struct {
stream PutObjectStream stream *response.ClientMessageStreamer
respSvc *response.Service
} }
// NewResponseService returns object service instance that passes internal service // NewResponseService returns object service instance that passes internal service
@ -47,32 +47,29 @@ func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *Respon
} }
func (s *getStreamResponser) Send(resp *object.GetResponse) error { func (s *getStreamResponser) Send(resp *object.GetResponse) error {
s.respSvc.SetMeta(resp) return s.respWriter(resp)
return s.GetObjectStream.Send(resp)
} }
func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error { func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error {
return s.svc.Get(req, &getStreamResponser{ return s.svc.Get(req, &getStreamResponser{
GetObjectStream: stream, ServerStream: stream,
respSvc: s.respSvc, respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
}),
}) })
} }
func (s *putStreamResponser) Send(req *object.PutRequest) error { func (s *putStreamResponser) Send(req *object.PutRequest) error {
if err := s.stream.Send(req); err != nil { return s.stream.Send(req)
return fmt.Errorf("could not send the request: %w", err)
}
return nil
} }
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) { func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv() r, err := s.stream.CloseAndRecv()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not close stream and receive response: %w", err) return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
} }
s.respSvc.SetMeta(r) return r.(*object.PutResponse), nil
return r, nil
} }
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) { func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
@ -82,61 +79,78 @@ func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
} }
return &putStreamResponser{ return &putStreamResponser{
stream: stream, stream: s.respSvc.CreateRequestStreamer(
respSvc: s.respSvc, func(req any) error {
return stream.Send(req.(*object.PutRequest))
},
func() (util.ResponseMessage, error) {
return stream.CloseAndRecv()
},
),
}, nil }, nil
} }
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
resp, err := s.svc.Head(ctx, req) resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
},
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.respSvc.SetMeta(resp) return resp.(*object.HeadResponse), nil
return resp, nil
} }
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error { func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
s.respSvc.SetMeta(resp) return s.respWriter(resp)
return s.SearchStream.Send(resp)
} }
func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error { func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error {
return s.svc.Search(req, &searchStreamResponser{ return s.svc.Search(req, &searchStreamResponser{
SearchStream: stream, ServerStream: stream,
respSvc: s.respSvc, respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
}),
}) })
} }
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := s.svc.Delete(ctx, req) resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
},
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.respSvc.SetMeta(resp) return resp.(*object.DeleteResponse), nil
return resp, nil
} }
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error { func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
s.respSvc.SetMeta(resp) return s.respWriter(resp)
return s.GetObjectRangeStream.Send(resp)
} }
func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return s.svc.GetRange(req, &getRangeStreamResponser{ return s.svc.GetRange(req, &getRangeStreamResponser{
GetObjectRangeStream: stream, ServerStream: stream,
respSvc: s.respSvc, respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
}),
}) })
} }
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := s.svc.GetRangeHash(ctx, req) resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
},
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.respSvc.SetMeta(resp) return resp.(*object.GetRangeHashResponse), nil
return resp, nil
} }

View file

@ -10,12 +10,12 @@ import (
) )
func (exec *execCtx) executeOnContainer() { func (exec *execCtx) executeOnContainer() {
if exec.prm.common.LocalOnly() { if exec.isLocal() {
exec.log.Debug("return result directly") exec.log.Debug("return result directly")
return return
} }
lookupDepth := exec.prm.common.NetmapLookupDepth() lookupDepth := exec.netmapLookupDepth()
exec.log.Debug("trying to execute in container...", exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth), zap.Uint64("netmap lookup depth", lookupDepth),
@ -52,12 +52,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
traverser, ok := exec.generateTraverser(exec.prm.cnr) traverser, ok := exec.generateTraverser(exec.containerID())
if !ok { if !ok {
return true return true
} }
ctx, cancel := context.WithCancel(exec.ctx) ctx, cancel := context.WithCancel(exec.context())
defer cancel() defer cancel()
for { for {

View file

@ -6,6 +6,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id" cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -43,20 +44,44 @@ func (exec *execCtx) prepare() {
func (exec *execCtx) setLogger(l *logger.Logger) { func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With( exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "SEARCH"), zap.String("request", "SEARCH"),
zap.Stringer("container", exec.prm.cnr), zap.Stringer("container", exec.containerID()),
zap.Bool("local", exec.prm.common.LocalOnly()), zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil), zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)} )}
} }
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec *execCtx) containerID() cid.ID {
return exec.prm.cnr
}
func (exec *execCtx) searchFilters() object.SearchFilters {
return exec.prm.filters
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool { func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.prm.common.NetmapEpoch() exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 { if exec.curProcEpoch > 0 {
return true return true
} }
e, err := exec.svc.epochSource.Epoch() e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch { switch {
default: default:

View file

@ -51,7 +51,7 @@ type simpleIDWriter struct {
type testEpochReceiver uint64 type testEpochReceiver uint64
func (e testEpochReceiver) Epoch() (uint64, error) { func (e testEpochReceiver) currentEpoch() (uint64, error) {
return uint64(e), nil return uint64(e), nil
} }
@ -103,7 +103,7 @@ func (c *testClientCache) get(info clientcore.NodeInfo) (searchClient, error) {
} }
func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) { func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
v, ok := s.items[exec.prm.cnr.EncodeToString()] v, ok := s.items[exec.containerID().EncodeToString()]
if !ok { if !ok {
return nil, nil return nil, nil
} }
@ -112,7 +112,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
} }
func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) { func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
v, ok := c.items[exec.prm.cnr.EncodeToString()] v, ok := c.items[exec.containerID().EncodeToString()]
if !ok { if !ok {
return nil, nil return nil, nil
} }
@ -146,7 +146,7 @@ func TestGetLocalOnly(t *testing.T) {
ctx := context.Background() ctx := context.Background()
newSvc := func(storage *testStorage) *Service { newSvc := func(storage *testStorage) *Service {
svc := &Service{} svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false) svc.log = test.NewLogger(false)
svc.localStorage = storage svc.localStorage = storage
@ -248,7 +248,7 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&id, cnr) container.CalculateID(&id, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{} svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false) svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage() svc.localStorage = newTestStorage()
@ -261,7 +261,7 @@ func TestGetRemoteSmall(t *testing.T) {
}, },
} }
svc.clientConstructor = c svc.clientConstructor = c
svc.epochSource = testEpochReceiver(curEpoch) svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc return svc
} }
@ -357,7 +357,7 @@ func TestGetFromPastEpoch(t *testing.T) {
ids22 := generateIDs(10) ids22 := generateIDs(10)
c22.addResult(idCnr, ids22, nil) c22.addResult(idCnr, ids22, nil)
svc := &Service{} svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false) svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage() svc.localStorage = newTestStorage()
@ -388,7 +388,7 @@ func TestGetFromPastEpoch(t *testing.T) {
}, },
} }
svc.epochSource = testEpochReceiver(curEpoch) svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := new(simpleIDWriter) w := new(simpleIDWriter)

View file

@ -15,7 +15,7 @@ import (
// Service is an utility serving requests // Service is an utility serving requests
// of Object.Search service. // of Object.Search service.
type Service struct { type Service struct {
cfg *cfg
} }
// Option is a Service's constructor option. // Option is a Service's constructor option.
@ -46,31 +46,32 @@ type cfg struct {
generateTraverser(cid.ID, uint64) (*placement.Traverser, error) generateTraverser(cid.ID, uint64) (*placement.Traverser, error)
} }
epochSource epochSource currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
keyStore *util.KeyStorage keyStore *util.KeyStorage
} }
type epochSource interface { func defaultCfg() *cfg {
Epoch() (uint64, error) return &cfg{
log: &logger.Logger{Logger: zap.L()},
clientConstructor: new(clientConstructorWrapper),
} }
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.clientConstructor = new(clientConstructorWrapper)
} }
// New creates, initializes and returns utility serving // New creates, initializes and returns utility serving
// Object.Get service requests. // Object.Get service requests.
func New(opts ...Option) *Service { func New(opts ...Option) *Service {
var s Service c := defaultCfg()
s.cfg.initDefault()
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// WithLogger returns option to specify Get service's logger. // WithLogger returns option to specify Get service's logger.
@ -109,7 +110,9 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
// map storage to receive current network state. // map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option { func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) { return func(c *cfg) {
c.epochSource = nmSrc c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
} }
} }

View file

@ -4,6 +4,7 @@ import (
"sync" "sync"
"github.com/TrueCloudLab/frostfs-node/pkg/core/client" "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
"github.com/TrueCloudLab/frostfs-node/pkg/core/netmap"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -34,6 +35,10 @@ type storageEngineWrapper struct {
type traverseGeneratorWrapper util.TraverserGenerator type traverseGeneratorWrapper util.TraverserGenerator
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func newUniqueAddressWriter(w IDListWriter) IDListWriter { func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{ return &uniqueIDWriter{
written: make(map[oid.ID]struct{}), written: make(map[oid.ID]struct{}),
@ -93,7 +98,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
var prm internalclient.SearchObjectsPrm var prm internalclient.SearchObjectsPrm
prm.SetContext(exec.ctx) prm.SetContext(exec.context())
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetPrivateKey(key) prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(exec.prm.common.SessionToken())
@ -101,8 +106,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetContainerID(exec.prm.cnr) prm.SetContainerID(exec.containerID())
prm.SetFilters(exec.prm.filters) prm.SetFilters(exec.searchFilters())
res, err := internalclient.SearchObjects(prm) res, err := internalclient.SearchObjects(prm)
if err != nil { if err != nil {
@ -114,8 +119,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) { func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
var selectPrm engine.SelectPrm var selectPrm engine.SelectPrm
selectPrm.WithFilters(exec.prm.filters) selectPrm.WithFilters(exec.searchFilters())
selectPrm.WithContainerID(exec.prm.cnr) selectPrm.WithContainerID(exec.containerID())
r, err := e.storage.Select(selectPrm) r, err := e.storage.Select(selectPrm)
if err != nil { if err != nil {
@ -138,3 +143,7 @@ func idsFromAddresses(addrs []oid.Address) []oid.ID {
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) { func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch) return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
} }
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -9,7 +9,7 @@ import (
// Service implements Search operation of Object service v2. // Service implements Search operation of Object service v2.
type Service struct { type Service struct {
cfg *cfg
} }
// Option represents Service constructor option. // Option represents Service constructor option.
@ -23,13 +23,15 @@ type cfg struct {
// NewService constructs Service instance from provided options. // NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service { func NewService(opts ...Option) *Service {
var s Service c := new(cfg)
for i := range opts { for i := range opts {
opts[i](&s.cfg) opts[i](c)
} }
return &s return &Service{
cfg: c,
}
} }
// Search calls internal service and returns v2 object stream. // Search calls internal service and returns v2 object stream.

View file

@ -18,29 +18,27 @@ type SignService struct {
} }
type searchStreamSigner struct { type searchStreamSigner struct {
SearchStream util.ServerStream
statusSupported bool
sigSvc *util.SignService respWriter util.ResponseMessageWriter
nonEmptyResp bool // set on first Send call nonEmptyResp bool // set on first Send call
} }
type getStreamSigner struct { type getStreamSigner struct {
GetObjectStream util.ServerStream
statusSupported bool
sigSvc *util.SignService respWriter util.ResponseMessageWriter
} }
type putStreamSigner struct { type putStreamSigner struct {
sigSvc *util.SignService stream *util.RequestMessageStreamer
stream PutObjectStream
statusSupported bool
err error
} }
type getRangeStreamSigner struct { type getRangeStreamSigner struct {
GetObjectRangeStream util.ServerStream
statusSupported bool
sigSvc *util.SignService respWriter util.ResponseMessageWriter
} }
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService { func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
@ -52,50 +50,37 @@ func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
} }
func (s *getStreamSigner) Send(resp *object.GetResponse) error { func (s *getStreamSigner) Send(resp *object.GetResponse) error {
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil { return s.respWriter(resp)
return err
}
return s.GetObjectStream.Send(resp)
} }
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error { func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil { return s.sigSvc.HandleServerStreamRequest(req,
resp := new(object.GetResponse) func(resp util.ResponseMessage) error {
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return stream.Send(resp.(*object.GetResponse))
return stream.Send(resp) },
} func() util.ResponseMessage {
return new(object.GetResponse)
},
func(respWriter util.ResponseMessageWriter) error {
return s.svc.Get(req, &getStreamSigner{ return s.svc.Get(req, &getStreamSigner{
GetObjectStream: stream, ServerStream: stream,
sigSvc: s.sigSvc, respWriter: respWriter,
statusSupported: util.IsStatusSupported(req),
}) })
},
)
} }
func (s *putStreamSigner) Send(req *object.PutRequest) error { func (s *putStreamSigner) Send(req *object.PutRequest) error {
s.statusSupported = util.IsStatusSupported(req) return s.stream.Send(req)
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
return util.ErrAbortStream
}
if s.err = s.stream.Send(req); s.err != nil {
return util.ErrAbortStream
}
return nil
} }
func (s *putStreamSigner) CloseAndRecv() (resp *object.PutResponse, err error) { func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
if s.err != nil { r, err := s.stream.CloseAndRecv()
err = s.err
resp = new(object.PutResponse)
} else {
resp, err = s.stream.CloseAndRecv()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not close stream and receive response: %w", err) return nil, fmt.Errorf("could not receive response: %w", err)
}
} }
return resp, s.sigSvc.SignResponse(s.statusSupported, resp, err) return r.(*object.PutResponse), nil
} }
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) { func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
@ -105,42 +90,58 @@ func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
} }
return &putStreamSigner{ return &putStreamSigner{
stream: stream, stream: s.sigSvc.CreateRequestStreamer(
sigSvc: s.sigSvc, func(req any) error {
return stream.Send(req.(*object.PutRequest))
},
func() (util.ResponseMessage, error) {
return stream.CloseAndRecv()
},
func() util.ResponseMessage {
return new(object.PutResponse)
},
),
}, nil }, nil
} }
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(object.HeadResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Head(ctx, req.(*object.HeadRequest))
},
func() util.ResponseMessage {
return new(object.HeadResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Head(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*object.HeadResponse), nil
} }
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error { func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
s.nonEmptyResp = true s.nonEmptyResp = true
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil { return s.respWriter(resp)
return err
}
return s.SearchStream.Send(resp)
} }
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error { func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil { return s.sigSvc.HandleServerStreamRequest(req,
resp := new(object.SearchResponse) func(resp util.ResponseMessage) error {
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return stream.Send(resp.(*object.SearchResponse))
return stream.Send(resp) },
func() util.ResponseMessage {
return new(object.SearchResponse)
},
func(respWriter util.ResponseMessageWriter) error {
stream := &searchStreamSigner{
ServerStream: stream,
respWriter: respWriter,
} }
ss := &searchStreamSigner{ err := s.svc.Search(req, stream)
SearchStream: stream,
sigSvc: s.sigSvc, if err == nil && !stream.nonEmptyResp {
statusSupported: util.IsStatusSupported(req),
}
err := s.svc.Search(req, ss)
if err == nil && !ss.nonEmptyResp {
// The higher component does not write any response in the case of an empty result (which is correct). // The higher component does not write any response in the case of an empty result (which is correct).
// With the introduction of status returns at least one answer must be signed and sent to the client. // With the introduction of status returns at least one answer must be signed and sent to the client.
// This approach is supported by clients who do not know how to work with statuses (one could make // This approach is supported by clients who do not know how to work with statuses (one could make
@ -148,44 +149,61 @@ func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) err
// answer can be neglected due to the gradual refusal to use the "old" clients). // answer can be neglected due to the gradual refusal to use the "old" clients).
return stream.Send(new(object.SearchResponse)) return stream.Send(new(object.SearchResponse))
} }
return err return err
},
)
} }
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(object.DeleteResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Delete(ctx, req.(*object.DeleteRequest))
},
func() util.ResponseMessage {
return new(object.DeleteResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*object.DeleteResponse), nil
} }
func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error { func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil { return s.respWriter(resp)
return err
}
return s.GetObjectRangeStream.Send(resp)
} }
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil { return s.sigSvc.HandleServerStreamRequest(req,
resp := new(object.GetRangeResponse) func(resp util.ResponseMessage) error {
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return stream.Send(resp.(*object.GetRangeResponse))
return stream.Send(resp) },
} func() util.ResponseMessage {
return new(object.GetRangeResponse)
},
func(respWriter util.ResponseMessageWriter) error {
return s.svc.GetRange(req, &getRangeStreamSigner{ return s.svc.GetRange(req, &getRangeStreamSigner{
GetObjectRangeStream: stream, ServerStream: stream,
sigSvc: s.sigSvc, respWriter: respWriter,
statusSupported: util.IsStatusSupported(req),
}) })
},
)
} }
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(object.GetRangeHashResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
},
func() util.ResponseMessage {
return new(object.GetRangeHashResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.GetRangeHash(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*object.GetRangeHashResponse), nil
} }

View file

@ -51,11 +51,11 @@ func (oiw *objectsInWork) add(addr oid.Address) {
// Policer represents the utility that verifies // Policer represents the utility that verifies
// compliance with the object storage policy. // compliance with the object storage policy.
type Policer struct { type Policer struct {
cfg *cfg
cache *lru.Cache[oid.Address, time.Time] cache *lru.Cache[oid.Address, time.Time]
objsInWork objectsInWork objsInWork *objectsInWork
} }
// Option is an option for Policer constructor. // Option is an option for Policer constructor.
@ -95,33 +95,38 @@ type cfg struct {
rebalanceFreq, evictDuration time.Duration rebalanceFreq, evictDuration time.Duration
} }
func (c *cfg) initDefault() { func defaultCfg() *cfg {
c.log = &logger.Logger{Logger: zap.L()} return &cfg{
c.batchSize = 10 log: &logger.Logger{Logger: zap.L()},
c.cacheSize = 1024 // 1024 * address size = 1024 * 64 = 64 MiB batchSize: 10,
c.rebalanceFreq = 1 * time.Second cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
c.evictDuration = 30 * time.Second rebalanceFreq: 1 * time.Second,
evictDuration: 30 * time.Second,
}
} }
// New creates, initializes and returns Policer instance. // New creates, initializes and returns Policer instance.
func New(opts ...Option) *Policer { func New(opts ...Option) *Policer {
var p Policer c := defaultCfg()
p.cfg.initDefault()
for i := range opts { for i := range opts {
opts[i](&p.cfg) opts[i](c)
} }
p.log = &logger.Logger{Logger: p.cfg.log.With(zap.String("component", "Object Policer"))} c.log = &logger.Logger{Logger: c.log.With(zap.String("component", "Object Policer"))}
cache, err := lru.New[oid.Address, time.Time](int(p.cacheSize)) cache, err := lru.New[oid.Address, time.Time](int(c.cacheSize))
if err != nil { if err != nil {
panic(err) panic(err)
} }
p.cache = cache return &Policer{
p.objsInWork.objs = make(map[oid.Address]struct{}, p.maxCapacity) cfg: c,
return &p cache: cache,
objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity),
},
}
} }
// WithHeadTimeout returns option to set Head timeout of Policer. // WithHeadTimeout returns option to set Head timeout of Policer.

View file

@ -0,0 +1,50 @@
package reputationrpc
import (
"context"
"github.com/TrueCloudLab/frostfs-api-go/v2/reputation"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns reputation service server instance that passes
// internal service call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceLocalTrustResponse), nil
}
func (s *responseService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
}

View file

@ -22,19 +22,33 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
} }
func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) { func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(reputation.AnnounceLocalTrustResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceLocalTrustResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.AnnounceLocalTrust(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*reputation.AnnounceLocalTrustResponse), nil
} }
func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) { func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(reputation.AnnounceIntermediateResultResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceIntermediateResultResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.AnnounceIntermediateResult(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*reputation.AnnounceIntermediateResultResponse), nil
} }

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"github.com/TrueCloudLab/frostfs-api-go/v2/session" "github.com/TrueCloudLab/frostfs-api-go/v2/session"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -17,17 +16,14 @@ type ServiceExecutor interface {
type executorSvc struct { type executorSvc struct {
exec ServiceExecutor exec ServiceExecutor
respSvc *response.Service
log *logger.Logger log *logger.Logger
} }
// NewExecutionService wraps ServiceExecutor and returns Session Service interface. // NewExecutionService wraps ServiceExecutor and returns Session Service interface.
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service, l *logger.Logger) Server { func NewExecutionService(exec ServiceExecutor, l *logger.Logger) Server {
return &executorSvc{ return &executorSvc{
exec: exec, exec: exec,
log: l, log: l,
respSvc: respSvc,
} }
} }
@ -45,6 +41,5 @@ func (s *executorSvc) Create(ctx context.Context, req *session.CreateRequest) (*
resp := new(session.CreateResponse) resp := new(session.CreateResponse)
resp.SetBody(respBody) resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }

View file

@ -0,0 +1,37 @@
package session
import (
"context"
"github.com/TrueCloudLab/frostfs-api-go/v2/session"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns session service instance that passes internal service
// call to response service.
func NewResponseService(ssSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: ssSvc,
}
}
func (s *responseService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Create(ctx, req.(*session.CreateRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*session.CreateResponse), nil
}

View file

@ -22,10 +22,17 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
} }
func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) { func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
resp := new(session.CreateResponse) func(ctx context.Context, req any) (util.ResponseMessage, error) {
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return s.svc.Create(ctx, req.(*session.CreateRequest))
},
func() util.ResponseMessage {
return new(session.CreateResponse)
},
)
if err != nil {
return nil, err
} }
resp, err := util.WrapResponse(s.svc.Create(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err) return resp.(*session.CreateResponse), nil
} }

View file

@ -0,0 +1,47 @@
package response
import (
"fmt"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
)
// ClientMessageStreamer represents client-side message streamer
// that sets meta values to the response.
type ClientMessageStreamer struct {
cfg *cfg
send util.RequestMessageWriter
close util.ClientStreamCloser
}
// Send calls send method of internal streamer.
func (s *ClientMessageStreamer) Send(req any) error {
if err := s.send(req); err != nil {
return fmt.Errorf("(%T) could not send the request: %w", s, err)
}
return nil
}
// CloseAndRecv closes internal stream, receivers the response,
// sets meta values and returns the result.
func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) {
resp, err := s.close()
if err != nil {
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
}
setMeta(resp, s.cfg)
return resp, nil
}
// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance.
func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer {
return &ClientMessageStreamer{
cfg: s.cfg,
send: sender,
close: closer,
}
}

View file

@ -0,0 +1,37 @@
package response
import (
"fmt"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
)
// ServerMessageStreamer represents server-side message streamer
// that sets meta values to all response messages.
type ServerMessageStreamer struct {
cfg *cfg
recv util.ResponseMessageReader
}
// Recv calls Recv method of internal streamer, sets response meta
// values and returns the response.
func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) {
m, err := s.recv()
if err != nil {
return nil, fmt.Errorf("could not receive response message for signing: %w", err)
}
setMeta(m, s.cfg)
return m, nil
}
// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result.
func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter {
return func(resp util.ResponseMessage) error {
setMeta(resp, s.cfg)
return respWriter(resp)
}
}

View file

@ -11,24 +11,44 @@ import (
// Service represents universal v2 service // Service represents universal v2 service
// that sets response meta header values. // that sets response meta header values.
type Service struct { type Service struct {
cfg *cfg
}
// Option is an option of Service constructor.
type Option func(*cfg)
type cfg struct {
version refs.Version version refs.Version
state netmap.State state netmap.State
} }
// NewService creates, initializes and returns Service instance. func defaultCfg() *cfg {
func NewService(nmState netmap.State) *Service { var c cfg
s := &Service{state: nmState}
version.Current().WriteToV2(&s.version) version.Current().WriteToV2(&c.version)
return s
return &c
} }
// SetMeta sets adds meta-header to resp. // NewService creates, initializes and returns Service instance.
func (s *Service) SetMeta(resp util.ResponseMessage) { func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func setMeta(resp util.ResponseMessage, cfg *cfg) {
meta := new(session.ResponseMetaHeader) meta := new(session.ResponseMetaHeader)
meta.SetVersion(&s.version) meta.SetVersion(&cfg.version)
meta.SetTTL(1) // FIXME: #1160 TTL must be calculated meta.SetTTL(1) // FIXME: #1160 TTL must be calculated
meta.SetEpoch(s.state.CurrentEpoch()) meta.SetEpoch(cfg.state.CurrentEpoch())
if origin := resp.GetMetaHeader(); origin != nil { if origin := resp.GetMetaHeader(); origin != nil {
// FIXME: #1160 what if origin is set by local server? // FIXME: #1160 what if origin is set by local server?
@ -37,3 +57,10 @@ func (s *Service) SetMeta(resp util.ResponseMessage) {
resp.SetMetaHeader(meta) resp.SetMetaHeader(meta)
} }
// WithNetworkState returns option to set network state of Service.
func WithNetworkState(v netmap.State) Option {
return func(c *cfg) {
c.state = v
}
}

View file

@ -0,0 +1,21 @@
package response
import (
"context"
"fmt"
"github.com/TrueCloudLab/frostfs-node/pkg/services/util"
)
// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it.
func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) {
// process request
resp, err := handler(ctx, req)
if err != nil {
return nil, fmt.Errorf("could not handle request: %w", err)
}
setMeta(resp, s.cfg)
return resp, nil
}

View file

@ -1,6 +1,7 @@
package util package util
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
@ -20,59 +21,198 @@ type ResponseMessage interface {
SetMetaHeader(*session.ResponseMetaHeader) SetMetaHeader(*session.ResponseMetaHeader)
} }
type UnaryHandler func(context.Context, any) (ResponseMessage, error)
type SignService struct { type SignService struct {
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
} }
type ResponseMessageWriter func(ResponseMessage) error
type ServerStreamHandler func(context.Context, any) (ResponseMessageReader, error)
type ResponseMessageReader func() (ResponseMessage, error)
var ErrAbortStream = errors.New("abort message stream") var ErrAbortStream = errors.New("abort message stream")
type ResponseConstructor func() ResponseMessage
type RequestMessageWriter func(any) error
type ClientStreamCloser func() (ResponseMessage, error)
type RequestMessageStreamer struct {
key *ecdsa.PrivateKey
send RequestMessageWriter
close ClientStreamCloser
respCons ResponseConstructor
statusSupported bool
sendErr error
}
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService { func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
return &SignService{ return &SignService{
key: key, key: key,
} }
} }
// SignResponse response with private key via signature.SignServiceMessage. func (s *RequestMessageStreamer) Send(req any) error {
// The signature error affects the result depending on the protocol version: // req argument should be strengthen with type RequestMessage
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed. s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now
// - otherwise, returns error in order to transport it directly.
func (s *SignService) SignResponse(statusSupported bool, resp ResponseMessage, err error) error { var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = s.send(req)
}
if err != nil {
if !s.statusSupported {
return err
}
s.sendErr = err
return ErrAbortStream
}
return nil
}
func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) {
var (
resp ResponseMessage
err error
)
if s.sendErr != nil {
err = s.sendErr
} else {
resp, err = s.close()
if err != nil {
err = fmt.Errorf("could not close stream and receive response: %w", err)
}
}
if err != nil {
if !s.statusSupported {
return nil, err
}
resp = s.respCons()
setStatusV2(resp, err)
}
if err = signResponse(s.key, resp, s.statusSupported); err != nil {
return nil, err
}
return resp, nil
}
func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser, blankResp ResponseConstructor) *RequestMessageStreamer {
return &RequestMessageStreamer{
key: s.key,
send: sender,
close: closer,
respCons: blankResp,
}
}
func (s *SignService) HandleServerStreamRequest(
req any,
respWriter ResponseMessageWriter,
blankResp ResponseConstructor,
respWriterCaller func(ResponseMessageWriter) error,
) error {
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
// req argument should be strengthen with type RequestMessage
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = respWriterCaller(func(resp ResponseMessage) error {
if err := signResponse(s.key, resp, statusSupported); err != nil {
return err
}
return respWriter(resp)
})
}
if err != nil { if err != nil {
if !statusSupported { if !statusSupported {
return err return err
} }
resp := blankResp()
setStatusV2(resp, err)
_ = signResponse(s.key, resp, false) // panics or returns nil with false arg
return respWriter(resp)
}
return nil
}
func (s *SignService) HandleUnaryRequest(ctx context.Context, req any, handler UnaryHandler, blankResp ResponseConstructor) (ResponseMessage, error) {
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
// req argument should be strengthen with type RequestMessage
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var (
resp ResponseMessage
err error
)
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
var sigErr apistatus.SignatureVerification
sigErr.SetMessage(err.Error())
err = sigErr
} else {
// process request
resp, err = handler(ctx, req)
}
if err != nil {
if !statusSupported {
return nil, err
}
resp = blankResp()
setStatusV2(resp, err) setStatusV2(resp, err)
} }
err = signature.SignServiceMessage(s.key, resp) // sign the response
if err != nil { if err = signResponse(s.key, resp, statusSupported); err != nil {
return fmt.Errorf("could not sign response: %w", err) return nil, err
} }
return nil return resp, nil
} }
func (s *SignService) VerifyRequest(req RequestMessage) error { func isStatusSupported(req RequestMessage) bool {
if err := signature.VerifyServiceMessage(req); err != nil {
var sigErr apistatus.SignatureVerification
sigErr.SetMessage(err.Error())
return sigErr
}
return nil
}
// WrapResponse creates an appropriate response struct if it is nil.
func WrapResponse[T any](resp *T, err error) (*T, error) {
if resp != nil {
return resp, err
}
return new(T), nil
}
// IsStatusSupported returns true iff request version implies expecting status return.
// This allows us to handle protocol versions <=2.10 (API statuses was introduced in 2.11 only).
func IsStatusSupported(req RequestMessage) bool {
version := req.GetMetaHeader().GetVersion() version := req.GetMetaHeader().GetVersion()
mjr := version.GetMajor() mjr := version.GetMajor()
@ -88,3 +228,22 @@ func setStatusV2(resp ResponseMessage, err error) {
session.SetStatus(resp, apistatus.ToStatusV2(apistatus.ErrToStatus(err))) session.SetStatus(resp, apistatus.ToStatusV2(apistatus.ErrToStatus(err)))
} }
// signs response with private key via signature.SignServiceMessage.
// The signature error affects the result depending on the protocol version:
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed;
// - otherwise, returns error in order to transport it directly.
func signResponse(key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
err := signature.SignServiceMessage(key, resp)
if err != nil {
err = fmt.Errorf("could not sign response: %w", err)
if statusSupported {
// We can't pass this error as status code since response will be unsigned.
// Isn't expected in practice, so panic is ok here.
panic(err)
}
}
return err
}