Compare commits

..

1 commit

Author SHA1 Message Date
9aeea0b974 [#153] ci: Minor pipeline fixes
- We can skip full pre-commit run
- On a very slow agent golangci run may take up to 10 minutes

Signed-off-by: Stanislav Bogatyrev <s.bogatyrev@yadro.com>
2023-03-21 09:33:45 +00:00
61 changed files with 1481 additions and 594 deletions

View file

@ -4,7 +4,7 @@
# options for analysis running
run:
# timeout for analysis, e.g. 30s, 5m, default is 1m
timeout: 5m
timeout: 10m
# include test files or not, default is true
tests: false

View file

@ -1,11 +0,0 @@
pipeline:
# Kludge for non-root containers under WoodPecker
fix-ownership:
image: alpine:latest
commands: chown -R 1234:1234 .
full-pre-commit:
image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
commands:
- export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
- pre-commit run -a

View file

@ -19,8 +19,10 @@ func initAccountingService(c *cfg) {
server := accountingTransportGRPC.New(
accountingService.NewSignService(
&c.key.PrivateKey,
accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper),
accountingService.NewResponseService(
accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper),
),
c.respSvc,
),
),

View file

@ -590,7 +590,7 @@ func initCfg(appCfg *config.Config) *cfg {
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(netState),
respSvc: response.NewService(response.WithNetworkState(netState)),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),

View file

@ -197,13 +197,16 @@ func initContainerService(c *cfg) {
server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
),
)
@ -565,8 +568,6 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
c.cfg.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -153,15 +153,17 @@ func initNetmapService(c *cfg) {
server := netmapTransportGRPC.New(
netmapService.NewSignService(
&c.key.PrivateKey,
netmapService.NewExecutionService(
c,
c.apiVersion,
&netInfo{
netState: c.cfgNetmap.state,
magic: c.cfgMorph.client,
morphClientNetMap: c.cfgNetmap.wrapper,
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
},
netmapService.NewResponseService(
netmapService.NewExecutionService(
c,
c.apiVersion,
&netInfo{
netState: c.cfgNetmap.state,
magic: c.cfgMorph.client,
morphClientNetMap: c.cfgNetmap.wrapper,
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
},
),
c.respSvc,
),
),

View file

@ -199,13 +199,16 @@ func initReputationService(c *cfg) {
server := grpcreputation.New(
reputationrpc.NewSignService(
&c.key.PrivateKey,
&reputationServer{
cfg: c,
log: c.log,
localRouter: localTrustRouter,
intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
},
reputationrpc.NewResponseService(
&reputationServer{
cfg: c,
log: c.log,
localRouter: localTrustRouter,
intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
},
c.respSvc,
),
),
)
@ -286,7 +289,6 @@ func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputa
resp := new(v2reputation.AnnounceLocalTrustResponse)
resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -315,7 +317,6 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *
resp := new(v2reputation.AnnounceIntermediateResultResponse)
resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -53,7 +53,10 @@ func initSessionService(c *cfg) {
server := sessionTransportGRPC.New(
sessionSvc.NewSignService(
&c.key.PrivateKey,
sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log),
sessionSvc.NewResponseService(
sessionSvc.NewExecutionService(c.privateTokenStore, c.log),
c.respSvc,
),
),
)

View file

@ -5,7 +5,6 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type ServiceExecutor interface {
@ -13,15 +12,13 @@ type ServiceExecutor interface {
}
type executorSvc struct {
exec ServiceExecutor
respSvc *response.Service
exec ServiceExecutor
}
// NewExecutionService wraps ServiceExecutor and returns Accounting Service interface.
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
func NewExecutionService(exec ServiceExecutor) Server {
return &executorSvc{
exec: exec,
respSvc: respSvc,
exec: exec,
}
}
@ -34,6 +31,5 @@ func (s *executorSvc) Balance(ctx context.Context, req *accounting.BalanceReques
resp := new(accounting.BalanceResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -0,0 +1,37 @@
package accounting
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns accounting service instance that passes internal service
// call to response service.
func NewResponseService(accSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: accSvc,
}
}
func (s *responseService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*accounting.BalanceResponse), nil
}

View file

@ -22,6 +22,17 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := util.WrapResponse(s.svc.Balance(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
},
func() util.ResponseMessage {
return new(accounting.BalanceResponse)
},
)
if err != nil {
return nil, err
}
return resp.(*accounting.BalanceResponse), nil
}

View file

@ -6,7 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type ServiceExecutor interface {
@ -22,15 +21,12 @@ type executorSvc struct {
Server
exec ServiceExecutor
respSvc *response.Service
}
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
func NewExecutionService(exec ServiceExecutor) Server {
return &executorSvc{
exec: exec,
respSvc: respSvc,
exec: exec,
}
}
@ -48,7 +44,6 @@ func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*cont
resp := new(container.PutResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -66,7 +61,6 @@ func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest)
resp := new(container.DeleteResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -79,7 +73,6 @@ func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*cont
resp := new(container.GetResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -92,7 +85,6 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
resp := new(container.ListResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -110,7 +102,6 @@ func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExte
resp := new(container.SetExtendedACLResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -123,6 +114,5 @@ func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExte
resp := new(container.GetExtendedACLResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -0,0 +1,115 @@
package container
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns container service instance that passes internal service
// call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Put(ctx, req.(*container.PutRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.PutResponse), nil
}
func (s *responseService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.DeleteResponse), nil
}
func (s *responseService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Get(ctx, req.(*container.GetRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.GetResponse), nil
}
func (s *responseService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.List(ctx, req.(*container.ListRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.ListResponse), nil
}
func (s *responseService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.SetExtendedACLResponse), nil
}
func (s *responseService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.GetExtendedACLResponse), nil
}
func (s *responseService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.AnnounceUsedSpaceResponse), nil
}

View file

@ -22,64 +22,113 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.PutResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Put(ctx, req.(*container.PutRequest))
},
func() util.ResponseMessage {
return new(container.PutResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Put(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.PutResponse), nil
}
func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.DeleteResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
},
func() util.ResponseMessage {
return new(container.DeleteResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.DeleteResponse), nil
}
func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.GetResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Get(ctx, req.(*container.GetRequest))
},
func() util.ResponseMessage {
return new(container.GetResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Get(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.GetResponse), nil
}
func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.List(ctx, req.(*container.ListRequest))
},
func() util.ResponseMessage {
return new(container.ListResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.ListResponse), nil
}
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.SetExtendedACLResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
func() util.ResponseMessage {
return new(container.SetExtendedACLResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.SetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.SetExtendedACLResponse), nil
}
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.GetExtendedACLResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
func() util.ResponseMessage {
return new(container.GetExtendedACLResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.GetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.GetExtendedACLResponse), nil
}
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.AnnounceUsedSpaceResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
},
func() util.ResponseMessage {
return new(container.AnnounceUsedSpaceResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.AnnounceUsedSpace(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*container.AnnounceUsedSpaceResponse), nil
}

View file

@ -8,7 +8,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/version"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
versionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
)
@ -19,8 +18,6 @@ type executorSvc struct {
state NodeState
netInfo NetworkInfo
respSvc *response.Service
}
// NodeState encapsulates information
@ -45,8 +42,8 @@ type NetworkInfo interface {
Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error)
}
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo, respSvc *response.Service) Server {
if s == nil || netInfo == nil || !version.IsValid(v) || respSvc == nil {
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) Server {
if s == nil || netInfo == nil || !version.IsValid(v) {
// this should never happen, otherwise it programmers bug
panic("can't create netmap execution service")
}
@ -54,7 +51,6 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
res := &executorSvc{
state: s,
netInfo: netInfo,
respSvc: respSvc,
}
v.WriteToV2(&res.version)
@ -100,7 +96,6 @@ func (s *executorSvc) LocalNodeInfo(
resp := new(netmap.LocalNodeInfoResponse)
resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -131,11 +126,10 @@ func (s *executorSvc) NetworkInfo(
resp := new(netmap.NetworkInfoResponse)
resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
func (s *executorSvc) Snapshot(_ context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
var nm netmap.NetMap
err := s.state.ReadCurrentNetMap(&nm)
@ -149,6 +143,5 @@ func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*n
resp := new(netmap.SnapshotResponse)
resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -0,0 +1,63 @@
package netmap
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns netmap service instance that passes internal service
// call to response service.
func NewResponseService(nmSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: nmSvc,
}
}
func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.LocalNodeInfoResponse), nil
}
func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.NetworkInfoResponse), nil
}
func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.SnapshotResponse), nil
}

View file

@ -24,28 +24,49 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
func (s *signService) LocalNodeInfo(
ctx context.Context,
req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(netmap.LocalNodeInfoResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
func() util.ResponseMessage {
return new(netmap.LocalNodeInfoResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.LocalNodeInfo(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*netmap.LocalNodeInfoResponse), nil
}
func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(netmap.NetworkInfoResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
},
func() util.ResponseMessage {
return new(netmap.NetworkInfoResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.NetworkInfo(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*netmap.NetworkInfoResponse), nil
}
func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(netmap.SnapshotResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
func() util.ResponseMessage {
return new(netmap.SnapshotResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Snapshot(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*netmap.SnapshotResponse), nil
}

View file

@ -45,30 +45,31 @@ type headerSource struct {
incompleteObjectHeaders bool
}
func (c *cfg) initDefault() {
c.storage = (*localStorage)(nil)
func defaultCfg() *cfg {
return &cfg{
storage: new(localStorage),
}
}
func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
var c cfg
c.initDefault()
cfg := defaultCfg()
for i := range opts {
opts[i](&c)
opts[i](cfg)
}
if c.msg == nil {
if cfg.msg == nil {
return nil, errors.New("message is not provided")
}
var res headerSource
err := c.readObjectHeaders(&res)
err := cfg.readObjectHeaders(&res)
if err != nil {
return nil, err
}
res.requestHeaders = requestHeaders(c.msg)
res.requestHeaders = requestHeaders(cfg.msg)
return res, nil
}
@ -101,20 +102,20 @@ func requestHeaders(msg xHeaderSource) []eaclSDK.Header {
var errMissingOID = errors.New("object ID is missing")
// nolint: funlen
func (c *cfg) readObjectHeaders(dst *headerSource) error {
switch m := c.msg.(type) {
func (h *cfg) readObjectHeaders(dst *headerSource) error {
switch m := h.msg.(type) {
default:
panic(fmt.Sprintf("unexpected message type %T", c.msg))
panic(fmt.Sprintf("unexpected message type %T", h.msg))
case requestXHeaderSource:
switch req := m.req.(type) {
case
*objectV2.GetRequest,
*objectV2.HeadRequest:
if c.obj == nil {
if h.obj == nil {
return errMissingOID
}
objHeaders, completed := c.localObjectHeaders(c.cnr, c.obj)
objHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
dst.objectHeaders = objHeaders
dst.incompleteObjectHeaders = !completed
@ -122,18 +123,18 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
*objectV2.GetRangeRequest,
*objectV2.GetRangeHashRequest,
*objectV2.DeleteRequest:
if c.obj == nil {
if h.obj == nil {
return errMissingOID
}
dst.objectHeaders = addressHeaders(c.cnr, c.obj)
dst.objectHeaders = addressHeaders(h.cnr, h.obj)
case *objectV2.PutRequest:
if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
oV2 := new(objectV2.Object)
oV2.SetObjectID(v.GetObjectID())
oV2.SetHeader(v.GetHeader())
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
}
case *objectV2.SearchRequest:
cnrV2 := req.GetBody().GetContainerID()
@ -150,7 +151,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
case responseXHeaderSource:
switch resp := m.resp.(type) {
default:
objectHeaders, completed := c.localObjectHeaders(c.cnr, c.obj)
objectHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
dst.objectHeaders = objectHeaders
dst.incompleteObjectHeaders = !completed
@ -160,7 +161,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
oV2.SetObjectID(v.GetObjectID())
oV2.SetHeader(v.GetHeader())
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
}
case *objectV2.HeadResponse:
oV2 := new(objectV2.Object)
@ -172,7 +173,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
hdr = new(objectV2.Header)
var idV2 refsV2.ContainerID
c.cnr.WriteToV2(&idV2)
h.cnr.WriteToV2(&idV2)
hdr.SetContainerID(&idV2)
hdr.SetVersion(v.GetVersion())
@ -186,20 +187,20 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
oV2.SetHeader(hdr)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
}
}
return nil
}
func (c *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
if idObj != nil {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(*idObj)
obj, err := c.storage.Head(addr)
obj, err := h.storage.Head(addr)
if err == nil {
return headersFromObject(obj, cnr, idObj), true
}

View file

@ -8,12 +8,14 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type localStorage engine.StorageEngine
type localStorage struct {
ls *engine.StorageEngine
}
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
if s == nil {
if s.ls == nil {
return nil, io.ErrUnexpectedEOF
}
return engine.Head((*engine.StorageEngine)(s), addr)
return engine.Head(s.ls, addr)
}

View file

@ -14,7 +14,9 @@ func WithObjectStorage(v ObjectStorage) Option {
func WithLocalObjectStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {
c.storage = (*localStorage)(v)
c.storage = &localStorage{
ls: v,
}
}
}

View file

@ -21,7 +21,7 @@ import (
// Service checks basic ACL rules.
type Service struct {
cfg
*cfg
c senderClassifier
}
@ -72,17 +72,18 @@ type cfg struct {
next object.ServiceServer
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
}
// New is a constructor for object ACL checking service.
func New(opts ...Option) *Service {
var s Service
s.cfg.initDefault()
func New(opts ...Option) Service {
cfg := defaultCfg()
for i := range opts {
opts[i](&s.cfg)
opts[i](cfg)
}
panicOnNil := func(v any, name string) {
@ -91,18 +92,20 @@ func New(opts ...Option) *Service {
}
}
panicOnNil(s.cfg.next, "next Service")
panicOnNil(s.cfg.nm, "netmap client")
panicOnNil(s.cfg.irFetcher, "inner Ring fetcher")
panicOnNil(s.cfg.checker, "acl checker")
panicOnNil(s.cfg.containers, "container source")
panicOnNil(cfg.next, "next Service")
panicOnNil(cfg.nm, "netmap client")
panicOnNil(cfg.irFetcher, "inner Ring fetcher")
panicOnNil(cfg.checker, "acl checker")
panicOnNil(cfg.containers, "container source")
s.c = senderClassifier{
log: s.cfg.log,
innerRing: s.cfg.irFetcher,
netmap: s.cfg.nm,
return Service{
cfg: cfg,
c: senderClassifier{
log: cfg.log,
innerRing: cfg.irFetcher,
netmap: cfg.nm,
},
}
return &s
}
// Get implements ServiceServer interface, makes ACL checks and calls

View file

@ -5,7 +5,9 @@ import (
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@ -43,17 +45,37 @@ const (
func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "DELETE"),
zap.Stringer("address", exec.prm.addr),
zap.Bool("local", exec.prm.common.LocalOnly()),
zap.Stringer("address", exec.address()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec *execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec *execCtx) containerID() cid.ID {
return exec.prm.addr.Container()
}
func (exec *execCtx) commonParameters() *util.CommonPrm {
return exec.prm.common
}
func (exec *execCtx) newAddress(id oid.ID) oid.Address {
var a oid.Address
a.SetObject(id)
a.SetContainer(exec.prm.addr.Container())
a.SetContainer(exec.containerID())
return a
}
@ -219,11 +241,11 @@ func (exec *execCtx) initTombstoneObject() bool {
}
exec.tombstoneObj = object.New()
exec.tombstoneObj.SetContainerID(exec.prm.addr.Container())
exec.tombstoneObj.SetContainerID(exec.containerID())
exec.tombstoneObj.SetType(object.TypeTombstone)
exec.tombstoneObj.SetPayload(payload)
tokenSession := exec.prm.common.SessionToken()
tokenSession := exec.commonParameters().SessionToken()
if tokenSession != nil {
issuer := tokenSession.Issuer()
exec.tombstoneObj.SetOwnerID(&issuer)

View file

@ -36,7 +36,7 @@ func (exec *execCtx) formTombstone() (ok bool) {
exec.tombstone.SetExpirationEpoch(
exec.svc.netInfo.CurrentEpoch() + tsLifetime,
)
exec.addMembers([]oid.ID{exec.prm.addr.Object()})
exec.addMembers([]oid.ID{exec.address().Object()})
exec.log.Debug("forming split info...")

View file

@ -15,7 +15,7 @@ import (
// Service utility serving requests of Object.Get service.
type Service struct {
cfg
*cfg
}
// Option is a Service's constructor option.
@ -60,21 +60,24 @@ type cfg struct {
keyStorage *util.KeyStorage
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
}
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
var s Service
s.cfg.initDefault()
c := defaultCfg()
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// WithLogger returns option to specify Delete service's logger.

View file

@ -24,12 +24,12 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
wr := getsvc.NewSimpleObjectWriter()
p := getsvc.HeadPrm{}
p.SetCommonParameters(exec.prm.common)
p.SetCommonParameters(exec.commonParameters())
p.SetHeaderWriter(wr)
p.WithRawFlag(true)
p.WithAddress(addr)
err := (*getsvc.Service)(w).Head(exec.ctx, p)
err := (*getsvc.Service)(w).Head(exec.context(), p)
if err != nil {
return nil, err
}
@ -38,7 +38,7 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
}
func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) {
_, err := w.headAddress(exec, exec.prm.addr)
_, err := w.headAddress(exec, exec.address())
var errSplitInfo *object.SplitInfoError
@ -89,11 +89,11 @@ func (w *searchSvcWrapper) splitMembers(exec *execCtx) ([]oid.ID, error) {
p := searchsvc.Prm{}
p.SetWriter(wr)
p.SetCommonParameters(exec.prm.common)
p.WithContainerID(exec.prm.addr.Container())
p.SetCommonParameters(exec.commonParameters())
p.WithContainerID(exec.containerID())
p.WithSearchFilters(fs)
err := (*searchsvc.Service)(w).Search(exec.ctx, p)
err := (*searchsvc.Service)(w).Search(exec.context(), p)
if err != nil {
return nil, err
}
@ -108,7 +108,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
}
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
streamer, err := (*putsvc.Service)(w).Put(exec.ctx)
streamer, err := (*putsvc.Service)(w).Put(exec.context())
if err != nil {
return nil, err
}
@ -116,7 +116,7 @@ func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
payload := exec.tombstoneObj.Payload()
initPrm := new(putsvc.PutInitPrm).
WithCommonPrm(exec.prm.common).
WithCommonPrm(exec.commonParameters()).
WithObject(exec.tombstoneObj.CutPayload())
err = streamer.Init(initPrm)

View file

@ -9,7 +9,7 @@ import (
// Service implements Delete operation of Object service v2.
type Service struct {
cfg
*cfg
}
// Option represents Service constructor option.
@ -21,13 +21,15 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
var s Service
c := new(cfg)
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// Delete calls internal service.

View file

@ -36,26 +36,26 @@ func (exec *execCtx) assemble() {
exec.log.Debug("trying to assemble the object...")
assembler := newAssembler(exec.prm.addr, exec.splitInfo, exec.prm.rng, exec)
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
exec.log.Debug("assembling splitted object...",
zap.Stringer("address", exec.prm.addr),
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
zap.Uint64("range_length", exec.prm.rng.GetLength()),
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
defer exec.log.Debug("assembling splitted object completed",
zap.Stringer("address", exec.prm.addr),
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
zap.Uint64("range_length", exec.prm.rng.GetLength()),
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(exec.ctx, exec.prm.objWriter)
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter)
if err != nil {
exec.log.Warn("failed to assemble splitted object",
zap.Error(err),
zap.Stringer("address", exec.prm.addr),
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
zap.Uint64("range_length", exec.prm.rng.GetLength()),
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
}
@ -98,7 +98,7 @@ func equalAddresses(a, b oid.Address) bool {
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.prm.addr.Container())
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
prm := HeadPrm{
@ -108,7 +108,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
//nolint: contextcheck
err := exec.svc.Head(exec.ctx, prm)
err := exec.svc.Head(exec.context(), prm)
if err != nil {
return nil, err
@ -125,11 +125,11 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
p.objWriter = w
p.SetRange(rng)
p.addr.SetContainer(exec.prm.addr.Container())
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
//nolint: contextcheck
statusError := exec.svc.get(exec.ctx, p.commonPrm, withPayloadRange(rng))
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
if statusError.err != nil {
return nil, statusError.err

View file

@ -8,12 +8,12 @@ import (
)
func (exec *execCtx) executeOnContainer() {
if exec.prm.common.LocalOnly() {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
}
lookupDepth := exec.prm.common.NetmapLookupDepth()
lookupDepth := exec.netmapLookupDepth()
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
@ -47,12 +47,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
zap.Uint64("number", exec.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.prm.addr)
traverser, ok := exec.generateTraverser(exec.address())
if !ok {
return true
}
ctx, cancel := context.WithCancel(exec.ctx)
ctx, cancel := context.WithCancel(exec.context())
defer cancel()
exec.status = statusUndefined

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@ -28,13 +29,13 @@ type execCtx struct {
statusError
splitInfo *objectSDK.SplitInfo
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
headOnly bool
head bool
curProcEpoch uint64
}
@ -51,7 +52,7 @@ const (
func headOnly() execOption {
return func(c *execCtx) {
c.headOnly = true
c.head = true
}
}
@ -63,22 +64,38 @@ func withPayloadRange(r *objectSDK.Range) execOption {
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.headOnly {
if exec.headOnly() {
req = "HEAD"
} else if exec.prm.rng != nil {
} else if exec.ctxRange() != nil {
req = "GET_RANGE"
}
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", exec.prm.addr),
zap.Bool("raw", exec.prm.raw),
zap.Bool("local", exec.prm.common.LocalOnly()),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
if exec.prm.signerKey != nil {
// the key has already been requested and
@ -99,16 +116,40 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
}
func (exec *execCtx) canAssemble() bool {
return !exec.prm.raw && !exec.headOnly
return !exec.isRaw() && !exec.headOnly()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() cid.ID {
return exec.address().Container()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.prm.common.NetmapEpoch()
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.epochSource.Epoch()
e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch {
default:
@ -177,12 +218,12 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
}
func (exec *execCtx) writeCollectedHeader() bool {
if exec.prm.rng != nil {
if exec.ctxRange() != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
exec.ctx,
exec.context(),
exec.collectedObject.CutPayload(),
)
@ -203,11 +244,11 @@ func (exec *execCtx) writeCollectedHeader() bool {
}
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
if exec.headOnly {
if exec.headOnly() {
return true
}
err := exec.prm.objWriter.WriteChunk(exec.ctx, obj.Payload())
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
switch {
default:
@ -231,6 +272,12 @@ func (exec *execCtx) writeCollectedObject() {
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {

View file

@ -69,7 +69,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
prm: RangePrm{
commonPrm: prm,
},
splitInfo: object.NewSplitInfo(),
infoSplit: object.NewSplitInfo(),
}
for i := range opts {

View file

@ -56,7 +56,7 @@ type testClient struct {
type testEpochReceiver uint64
func (e testEpochReceiver) Epoch() (uint64, error) {
func (e testEpochReceiver) currentEpoch() (uint64, error) {
return uint64(e), nil
}
@ -118,7 +118,7 @@ func newTestClient() *testClient {
}
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.prm.addr.EncodeToString()]
v, ok := c.results[exec.address().EncodeToString()]
if !ok {
var errNotFound apistatus.ObjectNotFound
@ -129,7 +129,7 @@ func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Obj
return nil, v.err
}
return cutToRange(v.obj, exec.prm.rng), nil
return cutToRange(v.obj, exec.ctxRange()), nil
}
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
@ -143,7 +143,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
var (
ok bool
obj *objectSDK.Object
sAddr = exec.prm.addr.EncodeToString()
sAddr = exec.address().EncodeToString()
)
if _, ok = s.inhumed[sAddr]; ok {
@ -157,7 +157,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
}
if obj, ok = s.phy[sAddr]; ok {
return cutToRange(obj, exec.prm.rng), nil
return cutToRange(obj, exec.ctxRange()), nil
}
var errNotFound apistatus.ObjectNotFound
@ -245,7 +245,7 @@ func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{}
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = storage
@ -506,7 +506,7 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&idCnr, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{}
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -519,7 +519,7 @@ func TestGetRemoteSmall(t *testing.T) {
},
}
svc.clientCache = c
svc.epochSource = testEpochReceiver(curEpoch)
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc
}
@ -1639,7 +1639,7 @@ func TestGetFromPastEpoch(t *testing.T) {
c22 := newTestClient()
c22.addResult(addr, obj, nil)
svc := &Service{}
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -1670,7 +1670,7 @@ func TestGetFromPastEpoch(t *testing.T) {
},
}
svc.epochSource = testEpochReceiver(curEpoch)
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := NewSimpleObjectWriter()

View file

@ -34,8 +34,8 @@ func (exec *execCtx) executeLocal() {
exec.err = errRemoved
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo)
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange
exec.err = errOutOfRange

View file

@ -54,8 +54,8 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
exec.err = errOutOfRange
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo)
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
}
return exec.status != statusUndefined

View file

@ -15,7 +15,7 @@ import (
// Service utility serving requests of Object.Get service.
type Service struct {
cfg
*cfg
}
// Option is a Service's constructor option.
@ -40,31 +40,33 @@ type cfg struct {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
}
epochSource epochSource
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
keyStore *util.KeyStorage
}
type epochSource interface {
Epoch() (uint64, error)
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.clientCache = new(clientCacheWrapper)
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
localStorage: new(storageEngineWrapper),
clientCache: new(clientCacheWrapper),
}
}
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
var s Service
s.cfg.initDefault()
c := defaultCfg()
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// WithLogger returns option to specify Get service's logger.
@ -78,7 +80,7 @@ func WithLogger(l *logger.Logger) Option {
// instance.
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
return func(c *cfg) {
c.localStorage = (*storageEngineWrapper)(e)
c.localStorage.(*storageEngineWrapper).engine = e
}
}
@ -105,7 +107,9 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.epochSource = nmSrc
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
}
}

View file

@ -7,6 +7,7 @@ import (
"io"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internal "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
@ -28,7 +29,9 @@ type clientWrapper struct {
client coreclient.MultiAddressClient
}
type storageEngineWrapper engine.StorageEngine
type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type partWriter struct {
ObjectWriter
@ -42,6 +45,10 @@ type hasherWrapper struct {
hash io.Writer
}
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.New(),
@ -82,7 +89,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
// nolint: funlen
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.prm.forwarder != nil {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client)
}
@ -91,20 +98,20 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
return nil, err
}
if exec.headOnly {
if exec.headOnly() {
var prm internalclient.HeadObjectPrm
prm.SetContext(exec.ctx)
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.prm.addr)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.prm.raw {
if exec.isRaw() {
prm.SetRawFlag()
}
@ -117,21 +124,21 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.prm.rng; rng != nil {
if rng := exec.ctxRange(); rng != nil {
var prm internalclient.PayloadRangePrm
prm.SetContext(exec.ctx)
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.prm.addr)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.prm.raw {
if exec.isRaw() {
prm.SetRawFlag()
}
@ -168,17 +175,17 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm
prm.SetContext(exec.ctx)
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.prm.addr)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.prm.raw {
if exec.isRaw() {
prm.SetRawFlag()
}
@ -190,25 +197,24 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
return res.Object(), nil
}
func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
e := (*engine.StorageEngine)(w)
if exec.headOnly {
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
var headPrm engine.HeadPrm
headPrm.WithAddress(exec.prm.addr)
headPrm.WithRaw(exec.prm.raw)
headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.isRaw())
r, err := e.Head(headPrm)
r, err := e.engine.Head(headPrm)
if err != nil {
return nil, err
}
return r.Header(), nil
} else if rng := exec.prm.rng; rng != nil {
} else if rng := exec.ctxRange(); rng != nil {
var getRange engine.RngPrm
getRange.WithAddress(exec.prm.addr)
getRange.WithAddress(exec.address())
getRange.WithPayloadRange(rng)
r, err := e.GetRange(getRange)
r, err := e.engine.GetRange(getRange)
if err != nil {
return nil, err
}
@ -216,9 +222,9 @@ func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
return r.Object(), nil
} else {
var getPrm engine.GetPrm
getPrm.WithAddress(exec.prm.addr)
getPrm.WithAddress(exec.address())
r, err := e.Get(getPrm)
r, err := e.engine.Get(getPrm)
if err != nil {
return nil, err
}
@ -246,3 +252,7 @@ func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p)
return err
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -13,7 +13,7 @@ import (
// Service implements Get operation of Object service v2.
type Service struct {
cfg
*cfg
}
// Option represents Service constructor option.
@ -27,13 +27,15 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
var s Service
c := new(cfg)
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// Get calls internal service and returns v2 object stream.

View file

@ -17,11 +17,14 @@ import (
type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error)
}
type distributedTarget struct {
traversal traversal
remotePool, localPool util.WorkerPool
obj *objectSDK.Object
objMeta object.ContentMeta
@ -29,7 +32,7 @@ type distributedTarget struct {
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
getWorkerPool func([]byte) (util.WorkerPool, bool)
isLocalKey func([]byte) bool
relay func(nodeDesc) error
@ -45,6 +48,9 @@ type traversal struct {
// need of additional broadcast after the object is saved
extraBroadcastEnabled bool
// mtx protects mExclude map.
mtx sync.RWMutex
// container nodes which was processed during the primary object placement
mExclude map[string]struct{}
}
@ -70,17 +76,21 @@ func (x *traversal) submitProcessed(n placement.Node) {
if x.extraBroadcastEnabled {
key := string(n.PublicKey())
x.mtx.Lock()
if x.mExclude == nil {
x.mExclude = make(map[string]struct{}, 1)
}
x.mExclude[key] = struct{}{}
x.mtx.Unlock()
}
}
// checks if specified node was processed during the primary object placement.
func (x *traversal) processed(n placement.Node) bool {
x.mtx.RLock()
_, ok := x.mExclude[string(n.PublicKey())]
x.mtx.RUnlock()
return ok
}
@ -146,9 +156,10 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
target := t.nodeTargetInitializer(node)
err := target.WriteObject(t.obj, t.objMeta)
if err != nil {
if err := target.WriteObject(t.obj, t.objMeta); err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
}
return nil
}
@ -185,12 +196,27 @@ loop:
addr := addrs[i]
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
isLocal := t.isLocalKey(addr.PublicKey())
var workerPool util.WorkerPool
if isLocal {
workerPool = t.localPool
} else {
workerPool = t.remotePool
}
if err := workerPool.Submit(func() {
defer wg.Done()
err := f(nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil {
resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
@ -205,12 +231,6 @@ loop:
break loop
}
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addrs[i])
}
wg.Wait()

View file

@ -4,6 +4,7 @@ import (
"fmt"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -25,26 +26,40 @@ type ObjectStorage interface {
type localTarget struct {
storage ObjectStorage
obj *object.Object
meta objectCore.ContentMeta
}
func (t localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
switch meta.Type() {
func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
t.obj = obj
t.meta = meta
return nil
}
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
switch t.meta.Type() {
case object.TypeTombstone:
err := t.storage.Delete(objectCore.AddressOf(obj), meta.Objects())
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil {
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}
case object.TypeLock:
err := t.storage.Lock(objectCore.AddressOf(obj), meta.Objects())
err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil {
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
}
default:
// objects that do not change meta storage
}
if err := t.storage.Put(obj); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
if err := t.storage.Put(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
}
return nil
id, _ := t.obj.ID()
return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
}

View file

@ -10,6 +10,7 @@ import (
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
@ -24,6 +25,8 @@ type remoteTarget struct {
nodeInfo clientcore.NodeInfo
obj *object.Object
clientConstructor ClientConstructor
}
@ -43,9 +46,15 @@ type RemotePutPrm struct {
}
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
t.obj = obj
return nil
}
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil {
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
}
var prm internalclient.PutObjectPrm
@ -56,14 +65,15 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
prm.SetSessionToken(t.commonPrm.SessionToken())
prm.SetBearerToken(t.commonPrm.BearerToken())
prm.SetXHeaders(t.commonPrm.XHeaders())
prm.SetObject(obj)
prm.SetObject(t.obj)
_, err = internalclient.PutObject(prm)
res, err := internalclient.PutObject(prm)
if err != nil {
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
}
return nil
return new(transformer.AccessIdentifiers).
WithSelfID(res.ID()), nil
}
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
@ -110,8 +120,9 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return fmt.Errorf("parse client node info: %w", err)
}
err = t.WriteObject(p.obj, objectcore.ContentMeta{})
if err != nil {
if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err)
}

View file

@ -22,7 +22,7 @@ type MaxSizeSource interface {
}
type Service struct {
cfg
*cfg
}
type Option func(*cfg)
@ -57,28 +57,31 @@ type cfg struct {
log *logger.Logger
}
func (c *cfg) initDefault() {
c.remotePool = util.NewPseudoWorkerPool()
c.localPool = util.NewPseudoWorkerPool()
c.log = &logger.Logger{Logger: zap.L()}
func defaultCfg() *cfg {
return &cfg{
remotePool: util.NewPseudoWorkerPool(),
localPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()},
}
}
func NewService(opts ...Option) *Service {
var s Service
s.cfg.initDefault()
c := defaultCfg()
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
s.cfg.fmtValidator = object.NewFormatValidator(s.cfg.fmtValidatorOpts...)
c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...)
return &s
return &Service{
cfg: c,
}
}
func (p *Service) Put(ctx context.Context) (*Streamer, error) {
return &Streamer{
cfg: &p.cfg,
cfg: p.cfg,
ctx: ctx,
}, nil
}

View file

@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -222,11 +221,12 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
extraBroadcastEnabled: withBroadcast,
},
payload: getPayload(),
getWorkerPool: p.getWorkerPool,
payload: getPayload(),
remotePool: p.remotePool,
localPool: p.localPool,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{
return &localTarget{
storage: p.localStore,
}
}
@ -245,6 +245,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
relay: relay,
fmt: p.fmtValidator,
log: p.log,
isLocalKey: p.netmapKeys.IsLocalKey,
}
}
@ -281,10 +283,3 @@ func (p *Streamer) Close() (*PutResponse, error) {
id: ids.SelfID(),
}, nil
}
func (p *Streamer) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
if p.netmapKeys.IsLocalKey(pub) {
return p.localPool, true
}
return p.remotePool, false
}

View file

@ -11,7 +11,7 @@ import (
// Service implements Put operation of Object service v2.
type Service struct {
cfg
*cfg
}
// Option represents Service constructor option.
@ -24,13 +24,15 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
var s Service
c := new(cfg)
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// Put calls internal service and returns v2 object streamer.

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
@ -15,26 +16,25 @@ type ResponseService struct {
}
type searchStreamResponser struct {
SearchStream
util.ServerStream
respSvc *response.Service
respWriter util.ResponseMessageWriter
}
type getStreamResponser struct {
GetObjectStream
util.ServerStream
respSvc *response.Service
respWriter util.ResponseMessageWriter
}
type getRangeStreamResponser struct {
GetObjectRangeStream
util.ServerStream
respSvc *response.Service
respWriter util.ResponseMessageWriter
}
type putStreamResponser struct {
stream PutObjectStream
respSvc *response.Service
stream *response.ClientMessageStreamer
}
// NewResponseService returns object service instance that passes internal service
@ -47,32 +47,29 @@ func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *Respon
}
func (s *getStreamResponser) Send(resp *object.GetResponse) error {
s.respSvc.SetMeta(resp)
return s.GetObjectStream.Send(resp)
return s.respWriter(resp)
}
func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error {
return s.svc.Get(req, &getStreamResponser{
GetObjectStream: stream,
respSvc: s.respSvc,
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
}),
})
}
func (s *putStreamResponser) Send(req *object.PutRequest) error {
if err := s.stream.Send(req); err != nil {
return fmt.Errorf("could not send the request: %w", err)
}
return nil
return s.stream.Send(req)
}
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
}
s.respSvc.SetMeta(r)
return r, nil
return r.(*object.PutResponse), nil
}
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
@ -82,61 +79,78 @@ func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
}
return &putStreamResponser{
stream: stream,
respSvc: s.respSvc,
stream: s.respSvc.CreateRequestStreamer(
func(req any) error {
return stream.Send(req.(*object.PutRequest))
},
func() (util.ResponseMessage, error) {
return stream.CloseAndRecv()
},
),
}, nil
}
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
resp, err := s.svc.Head(ctx, req)
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
},
)
if err != nil {
return nil, err
}
s.respSvc.SetMeta(resp)
return resp, nil
return resp.(*object.HeadResponse), nil
}
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
s.respSvc.SetMeta(resp)
return s.SearchStream.Send(resp)
return s.respWriter(resp)
}
func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error {
return s.svc.Search(req, &searchStreamResponser{
SearchStream: stream,
respSvc: s.respSvc,
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
}),
})
}
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := s.svc.Delete(ctx, req)
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
},
)
if err != nil {
return nil, err
}
s.respSvc.SetMeta(resp)
return resp, nil
return resp.(*object.DeleteResponse), nil
}
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
s.respSvc.SetMeta(resp)
return s.GetObjectRangeStream.Send(resp)
return s.respWriter(resp)
}
func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return s.svc.GetRange(req, &getRangeStreamResponser{
GetObjectRangeStream: stream,
respSvc: s.respSvc,
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
}),
})
}
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := s.svc.GetRangeHash(ctx, req)
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
},
)
if err != nil {
return nil, err
}
s.respSvc.SetMeta(resp)
return resp, nil
return resp.(*object.GetRangeHashResponse), nil
}

View file

@ -10,12 +10,12 @@ import (
)
func (exec *execCtx) executeOnContainer() {
if exec.prm.common.LocalOnly() {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
}
lookupDepth := exec.prm.common.NetmapLookupDepth()
lookupDepth := exec.netmapLookupDepth()
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
@ -52,12 +52,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
zap.Uint64("number", exec.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.prm.cnr)
traverser, ok := exec.generateTraverser(exec.containerID())
if !ok {
return true
}
ctx, cancel := context.WithCancel(exec.ctx)
ctx, cancel := context.WithCancel(exec.context())
defer cancel()
for {

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -44,20 +45,44 @@ func (exec *execCtx) prepare() {
func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "SEARCH"),
zap.Stringer("container", exec.prm.cnr),
zap.Bool("local", exec.prm.common.LocalOnly()),
zap.Stringer("container", exec.containerID()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec *execCtx) containerID() cid.ID {
return exec.prm.cnr
}
func (exec *execCtx) searchFilters() object.SearchFilters {
return exec.prm.filters
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.prm.common.NetmapEpoch()
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.epochSource.Epoch()
e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch {
default:

View file

@ -51,7 +51,7 @@ type simpleIDWriter struct {
type testEpochReceiver uint64
func (e testEpochReceiver) Epoch() (uint64, error) {
func (e testEpochReceiver) currentEpoch() (uint64, error) {
return uint64(e), nil
}
@ -103,7 +103,7 @@ func (c *testClientCache) get(info clientcore.NodeInfo) (searchClient, error) {
}
func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
v, ok := s.items[exec.prm.cnr.EncodeToString()]
v, ok := s.items[exec.containerID().EncodeToString()]
if !ok {
return nil, nil
}
@ -112,7 +112,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
}
func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
v, ok := c.items[exec.prm.cnr.EncodeToString()]
v, ok := c.items[exec.containerID().EncodeToString()]
if !ok {
return nil, nil
}
@ -146,7 +146,7 @@ func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{}
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = storage
@ -248,7 +248,7 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&id, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{}
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -261,7 +261,7 @@ func TestGetRemoteSmall(t *testing.T) {
},
}
svc.clientConstructor = c
svc.epochSource = testEpochReceiver(curEpoch)
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc
}
@ -357,7 +357,7 @@ func TestGetFromPastEpoch(t *testing.T) {
ids22 := generateIDs(10)
c22.addResult(idCnr, ids22, nil)
svc := &Service{}
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -388,7 +388,7 @@ func TestGetFromPastEpoch(t *testing.T) {
},
}
svc.epochSource = testEpochReceiver(curEpoch)
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := new(simpleIDWriter)

View file

@ -15,7 +15,7 @@ import (
// Service is an utility serving requests
// of Object.Search service.
type Service struct {
cfg
*cfg
}
// Option is a Service's constructor option.
@ -46,31 +46,32 @@ type cfg struct {
generateTraverser(cid.ID, uint64) (*placement.Traverser, error)
}
epochSource epochSource
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
keyStore *util.KeyStorage
}
type epochSource interface {
Epoch() (uint64, error)
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.clientConstructor = new(clientConstructorWrapper)
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
clientConstructor: new(clientConstructorWrapper),
}
}
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
var s Service
s.cfg.initDefault()
c := defaultCfg()
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// WithLogger returns option to specify Get service's logger.
@ -109,7 +110,9 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.epochSource = nmSrc
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
}
}

View file

@ -4,6 +4,7 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -34,6 +35,10 @@ type storageEngineWrapper struct {
type traverseGeneratorWrapper util.TraverserGenerator
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{
written: make(map[oid.ID]struct{}),
@ -93,7 +98,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
var prm internalclient.SearchObjectsPrm
prm.SetContext(exec.ctx)
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
@ -101,8 +106,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
prm.SetTTL(exec.prm.common.TTL())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetContainerID(exec.prm.cnr)
prm.SetFilters(exec.prm.filters)
prm.SetContainerID(exec.containerID())
prm.SetFilters(exec.searchFilters())
res, err := internalclient.SearchObjects(prm)
if err != nil {
@ -114,8 +119,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
var selectPrm engine.SelectPrm
selectPrm.WithFilters(exec.prm.filters)
selectPrm.WithContainerID(exec.prm.cnr)
selectPrm.WithFilters(exec.searchFilters())
selectPrm.WithContainerID(exec.containerID())
r, err := e.storage.Select(selectPrm)
if err != nil {
@ -138,3 +143,7 @@ func idsFromAddresses(addrs []oid.Address) []oid.ID {
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -9,7 +9,7 @@ import (
// Service implements Search operation of Object service v2.
type Service struct {
cfg
*cfg
}
// Option represents Service constructor option.
@ -23,13 +23,15 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
var s Service
c := new(cfg)
for i := range opts {
opts[i](&s.cfg)
opts[i](c)
}
return &s
return &Service{
cfg: c,
}
}
// Search calls internal service and returns v2 object stream.

View file

@ -18,29 +18,27 @@ type SignService struct {
}
type searchStreamSigner struct {
SearchStream
statusSupported bool
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
util.ServerStream
respWriter util.ResponseMessageWriter
nonEmptyResp bool // set on first Send call
}
type getStreamSigner struct {
GetObjectStream
statusSupported bool
sigSvc *util.SignService
util.ServerStream
respWriter util.ResponseMessageWriter
}
type putStreamSigner struct {
sigSvc *util.SignService
stream PutObjectStream
statusSupported bool
err error
stream *util.RequestMessageStreamer
}
type getRangeStreamSigner struct {
GetObjectRangeStream
statusSupported bool
sigSvc *util.SignService
util.ServerStream
respWriter util.ResponseMessageWriter
}
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
@ -52,50 +50,37 @@ func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
}
func (s *getStreamSigner) Send(resp *object.GetResponse) error {
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
return err
}
return s.GetObjectStream.Send(resp)
return s.respWriter(resp)
}
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.GetResponse)
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return stream.Send(resp)
}
return s.svc.Get(req, &getStreamSigner{
GetObjectStream: stream,
sigSvc: s.sigSvc,
statusSupported: util.IsStatusSupported(req),
})
return s.sigSvc.HandleServerStreamRequest(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
},
func() util.ResponseMessage {
return new(object.GetResponse)
},
func(respWriter util.ResponseMessageWriter) error {
return s.svc.Get(req, &getStreamSigner{
ServerStream: stream,
respWriter: respWriter,
})
},
)
}
func (s *putStreamSigner) Send(req *object.PutRequest) error {
s.statusSupported = util.IsStatusSupported(req)
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
return util.ErrAbortStream
}
if s.err = s.stream.Send(req); s.err != nil {
return util.ErrAbortStream
}
return nil
return s.stream.Send(req)
}
func (s *putStreamSigner) CloseAndRecv() (resp *object.PutResponse, err error) {
if s.err != nil {
err = s.err
resp = new(object.PutResponse)
} else {
resp, err = s.stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
}
func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("could not receive response: %w", err)
}
return resp, s.sigSvc.SignResponse(s.statusSupported, resp, err)
return r.(*object.PutResponse), nil
}
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
@ -105,87 +90,120 @@ func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
}
return &putStreamSigner{
stream: stream,
sigSvc: s.sigSvc,
stream: s.sigSvc.CreateRequestStreamer(
func(req any) error {
return stream.Send(req.(*object.PutRequest))
},
func() (util.ResponseMessage, error) {
return stream.CloseAndRecv()
},
func() util.ResponseMessage {
return new(object.PutResponse)
},
),
}, nil
}
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.HeadResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
},
func() util.ResponseMessage {
return new(object.HeadResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Head(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*object.HeadResponse), nil
}
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
s.nonEmptyResp = true
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
return err
}
return s.SearchStream.Send(resp)
return s.respWriter(resp)
}
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.SearchResponse)
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return stream.Send(resp)
}
return s.sigSvc.HandleServerStreamRequest(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
},
func() util.ResponseMessage {
return new(object.SearchResponse)
},
func(respWriter util.ResponseMessageWriter) error {
stream := &searchStreamSigner{
ServerStream: stream,
respWriter: respWriter,
}
ss := &searchStreamSigner{
SearchStream: stream,
sigSvc: s.sigSvc,
statusSupported: util.IsStatusSupported(req),
}
err := s.svc.Search(req, ss)
if err == nil && !ss.nonEmptyResp {
// The higher component does not write any response in the case of an empty result (which is correct).
// With the introduction of status returns at least one answer must be signed and sent to the client.
// This approach is supported by clients who do not know how to work with statuses (one could make
// a switch according to the protocol version from the request, but the costs of sending an empty
// answer can be neglected due to the gradual refusal to use the "old" clients).
return stream.Send(new(object.SearchResponse))
}
return err
err := s.svc.Search(req, stream)
if err == nil && !stream.nonEmptyResp {
// The higher component does not write any response in the case of an empty result (which is correct).
// With the introduction of status returns at least one answer must be signed and sent to the client.
// This approach is supported by clients who do not know how to work with statuses (one could make
// a switch according to the protocol version from the request, but the costs of sending an empty
// answer can be neglected due to the gradual refusal to use the "old" clients).
return stream.Send(new(object.SearchResponse))
}
return err
},
)
}
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.DeleteResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
},
func() util.ResponseMessage {
return new(object.DeleteResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*object.DeleteResponse), nil
}
func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
return err
}
return s.GetObjectRangeStream.Send(resp)
return s.respWriter(resp)
}
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.GetRangeResponse)
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return stream.Send(resp)
}
return s.svc.GetRange(req, &getRangeStreamSigner{
GetObjectRangeStream: stream,
sigSvc: s.sigSvc,
statusSupported: util.IsStatusSupported(req),
})
return s.sigSvc.HandleServerStreamRequest(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
},
func() util.ResponseMessage {
return new(object.GetRangeResponse)
},
func(respWriter util.ResponseMessageWriter) error {
return s.svc.GetRange(req, &getRangeStreamSigner{
ServerStream: stream,
respWriter: respWriter,
})
},
)
}
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.GetRangeHashResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
},
func() util.ResponseMessage {
return new(object.GetRangeHashResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.GetRangeHash(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*object.GetRangeHashResponse), nil
}

View file

@ -51,11 +51,11 @@ func (oiw *objectsInWork) add(addr oid.Address) {
// Policer represents the utility that verifies
// compliance with the object storage policy.
type Policer struct {
cfg
*cfg
cache *lru.Cache[oid.Address, time.Time]
objsInWork objectsInWork
objsInWork *objectsInWork
}
// Option is an option for Policer constructor.
@ -95,33 +95,38 @@ type cfg struct {
rebalanceFreq, evictDuration time.Duration
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.batchSize = 10
c.cacheSize = 1024 // 1024 * address size = 1024 * 64 = 64 MiB
c.rebalanceFreq = 1 * time.Second
c.evictDuration = 30 * time.Second
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
batchSize: 10,
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
rebalanceFreq: 1 * time.Second,
evictDuration: 30 * time.Second,
}
}
// New creates, initializes and returns Policer instance.
func New(opts ...Option) *Policer {
var p Policer
p.cfg.initDefault()
c := defaultCfg()
for i := range opts {
opts[i](&p.cfg)
opts[i](c)
}
p.log = &logger.Logger{Logger: p.cfg.log.With(zap.String("component", "Object Policer"))}
c.log = &logger.Logger{Logger: c.log.With(zap.String("component", "Object Policer"))}
cache, err := lru.New[oid.Address, time.Time](int(p.cacheSize))
cache, err := lru.New[oid.Address, time.Time](int(c.cacheSize))
if err != nil {
panic(err)
}
p.cache = cache
p.objsInWork.objs = make(map[oid.Address]struct{}, p.maxCapacity)
return &p
return &Policer{
cfg: c,
cache: cache,
objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity),
},
}
}
// WithHeadTimeout returns option to set Head timeout of Policer.

View file

@ -0,0 +1,50 @@
package reputationrpc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns reputation service server instance that passes
// internal service call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceLocalTrustResponse), nil
}
func (s *responseService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
}

View file

@ -22,19 +22,33 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(reputation.AnnounceLocalTrustResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceLocalTrustResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.AnnounceLocalTrust(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*reputation.AnnounceLocalTrustResponse), nil
}
func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(reputation.AnnounceIntermediateResultResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceIntermediateResultResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.AnnounceIntermediateResult(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
}

View file

@ -5,7 +5,6 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -17,17 +16,14 @@ type ServiceExecutor interface {
type executorSvc struct {
exec ServiceExecutor
respSvc *response.Service
log *logger.Logger
}
// NewExecutionService wraps ServiceExecutor and returns Session Service interface.
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service, l *logger.Logger) Server {
func NewExecutionService(exec ServiceExecutor, l *logger.Logger) Server {
return &executorSvc{
exec: exec,
log: l,
respSvc: respSvc,
exec: exec,
log: l,
}
}
@ -45,6 +41,5 @@ func (s *executorSvc) Create(ctx context.Context, req *session.CreateRequest) (*
resp := new(session.CreateResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -0,0 +1,37 @@
package session
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns session service instance that passes internal service
// call to response service.
func NewResponseService(ssSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: ssSvc,
}
}
func (s *responseService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Create(ctx, req.(*session.CreateRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*session.CreateResponse), nil
}

View file

@ -22,10 +22,17 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(session.CreateResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Create(ctx, req.(*session.CreateRequest))
},
func() util.ResponseMessage {
return new(session.CreateResponse)
},
)
if err != nil {
return nil, err
}
resp, err := util.WrapResponse(s.svc.Create(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return resp.(*session.CreateResponse), nil
}

View file

@ -0,0 +1,47 @@
package response
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// ClientMessageStreamer represents client-side message streamer
// that sets meta values to the response.
type ClientMessageStreamer struct {
cfg *cfg
send util.RequestMessageWriter
close util.ClientStreamCloser
}
// Send calls send method of internal streamer.
func (s *ClientMessageStreamer) Send(req any) error {
if err := s.send(req); err != nil {
return fmt.Errorf("(%T) could not send the request: %w", s, err)
}
return nil
}
// CloseAndRecv closes internal stream, receivers the response,
// sets meta values and returns the result.
func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) {
resp, err := s.close()
if err != nil {
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
}
setMeta(resp, s.cfg)
return resp, nil
}
// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance.
func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer {
return &ClientMessageStreamer{
cfg: s.cfg,
send: sender,
close: closer,
}
}

View file

@ -0,0 +1,37 @@
package response
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// ServerMessageStreamer represents server-side message streamer
// that sets meta values to all response messages.
type ServerMessageStreamer struct {
cfg *cfg
recv util.ResponseMessageReader
}
// Recv calls Recv method of internal streamer, sets response meta
// values and returns the response.
func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) {
m, err := s.recv()
if err != nil {
return nil, fmt.Errorf("could not receive response message for signing: %w", err)
}
setMeta(m, s.cfg)
return m, nil
}
// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result.
func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter {
return func(resp util.ResponseMessage) error {
setMeta(resp, s.cfg)
return respWriter(resp)
}
}

View file

@ -11,24 +11,44 @@ import (
// Service represents universal v2 service
// that sets response meta header values.
type Service struct {
cfg *cfg
}
// Option is an option of Service constructor.
type Option func(*cfg)
type cfg struct {
version refs.Version
state netmap.State
}
// NewService creates, initializes and returns Service instance.
func NewService(nmState netmap.State) *Service {
s := &Service{state: nmState}
version.Current().WriteToV2(&s.version)
return s
func defaultCfg() *cfg {
var c cfg
version.Current().WriteToV2(&c.version)
return &c
}
// SetMeta sets adds meta-header to resp.
func (s *Service) SetMeta(resp util.ResponseMessage) {
// NewService creates, initializes and returns Service instance.
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func setMeta(resp util.ResponseMessage, cfg *cfg) {
meta := new(session.ResponseMetaHeader)
meta.SetVersion(&s.version)
meta.SetVersion(&cfg.version)
meta.SetTTL(1) // FIXME: #1160 TTL must be calculated
meta.SetEpoch(s.state.CurrentEpoch())
meta.SetEpoch(cfg.state.CurrentEpoch())
if origin := resp.GetMetaHeader(); origin != nil {
// FIXME: #1160 what if origin is set by local server?
@ -37,3 +57,10 @@ func (s *Service) SetMeta(resp util.ResponseMessage) {
resp.SetMetaHeader(meta)
}
// WithNetworkState returns option to set network state of Service.
func WithNetworkState(v netmap.State) Option {
return func(c *cfg) {
c.state = v
}
}

View file

@ -0,0 +1,21 @@
package response
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it.
func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) {
// process request
resp, err := handler(ctx, req)
if err != nil {
return nil, fmt.Errorf("could not handle request: %w", err)
}
setMeta(resp, s.cfg)
return resp, nil
}

View file

@ -1,6 +1,7 @@
package util
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
@ -20,59 +21,198 @@ type ResponseMessage interface {
SetMetaHeader(*session.ResponseMetaHeader)
}
type UnaryHandler func(context.Context, any) (ResponseMessage, error)
type SignService struct {
key *ecdsa.PrivateKey
}
type ResponseMessageWriter func(ResponseMessage) error
type ServerStreamHandler func(context.Context, any) (ResponseMessageReader, error)
type ResponseMessageReader func() (ResponseMessage, error)
var ErrAbortStream = errors.New("abort message stream")
type ResponseConstructor func() ResponseMessage
type RequestMessageWriter func(any) error
type ClientStreamCloser func() (ResponseMessage, error)
type RequestMessageStreamer struct {
key *ecdsa.PrivateKey
send RequestMessageWriter
close ClientStreamCloser
respCons ResponseConstructor
statusSupported bool
sendErr error
}
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
return &SignService{
key: key,
}
}
// SignResponse response with private key via signature.SignServiceMessage.
// The signature error affects the result depending on the protocol version:
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed.
// - otherwise, returns error in order to transport it directly.
func (s *SignService) SignResponse(statusSupported bool, resp ResponseMessage, err error) error {
func (s *RequestMessageStreamer) Send(req any) error {
// req argument should be strengthen with type RequestMessage
s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = s.send(req)
}
if err != nil {
if !s.statusSupported {
return err
}
s.sendErr = err
return ErrAbortStream
}
return nil
}
func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) {
var (
resp ResponseMessage
err error
)
if s.sendErr != nil {
err = s.sendErr
} else {
resp, err = s.close()
if err != nil {
err = fmt.Errorf("could not close stream and receive response: %w", err)
}
}
if err != nil {
if !s.statusSupported {
return nil, err
}
resp = s.respCons()
setStatusV2(resp, err)
}
if err = signResponse(s.key, resp, s.statusSupported); err != nil {
return nil, err
}
return resp, nil
}
func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser, blankResp ResponseConstructor) *RequestMessageStreamer {
return &RequestMessageStreamer{
key: s.key,
send: sender,
close: closer,
respCons: blankResp,
}
}
func (s *SignService) HandleServerStreamRequest(
req any,
respWriter ResponseMessageWriter,
blankResp ResponseConstructor,
respWriterCaller func(ResponseMessageWriter) error,
) error {
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
// req argument should be strengthen with type RequestMessage
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = respWriterCaller(func(resp ResponseMessage) error {
if err := signResponse(s.key, resp, statusSupported); err != nil {
return err
}
return respWriter(resp)
})
}
if err != nil {
if !statusSupported {
return err
}
resp := blankResp()
setStatusV2(resp, err)
_ = signResponse(s.key, resp, false) // panics or returns nil with false arg
return respWriter(resp)
}
return nil
}
func (s *SignService) HandleUnaryRequest(ctx context.Context, req any, handler UnaryHandler, blankResp ResponseConstructor) (ResponseMessage, error) {
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
// req argument should be strengthen with type RequestMessage
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var (
resp ResponseMessage
err error
)
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
var sigErr apistatus.SignatureVerification
sigErr.SetMessage(err.Error())
err = sigErr
} else {
// process request
resp, err = handler(ctx, req)
}
if err != nil {
if !statusSupported {
return nil, err
}
resp = blankResp()
setStatusV2(resp, err)
}
err = signature.SignServiceMessage(s.key, resp)
if err != nil {
return fmt.Errorf("could not sign response: %w", err)
// sign the response
if err = signResponse(s.key, resp, statusSupported); err != nil {
return nil, err
}
return nil
return resp, nil
}
func (s *SignService) VerifyRequest(req RequestMessage) error {
if err := signature.VerifyServiceMessage(req); err != nil {
var sigErr apistatus.SignatureVerification
sigErr.SetMessage(err.Error())
return sigErr
}
return nil
}
// WrapResponse creates an appropriate response struct if it is nil.
func WrapResponse[T any](resp *T, err error) (*T, error) {
if resp != nil {
return resp, err
}
return new(T), nil
}
// IsStatusSupported returns true iff request version implies expecting status return.
// This allows us to handle protocol versions <=2.10 (API statuses was introduced in 2.11 only).
func IsStatusSupported(req RequestMessage) bool {
func isStatusSupported(req RequestMessage) bool {
version := req.GetMetaHeader().GetVersion()
mjr := version.GetMajor()
@ -88,3 +228,22 @@ func setStatusV2(resp ResponseMessage, err error) {
session.SetStatus(resp, apistatus.ToStatusV2(apistatus.ErrToStatus(err)))
}
// signs response with private key via signature.SignServiceMessage.
// The signature error affects the result depending on the protocol version:
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed;
// - otherwise, returns error in order to transport it directly.
func signResponse(key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
err := signature.SignServiceMessage(key, resp)
if err != nil {
err = fmt.Errorf("could not sign response: %w", err)
if statusSupported {
// We can't pass this error as status code since response will be unsigned.
// Isn't expected in practice, so panic is ok here.
panic(err)
}
}
return err
}