forked from TrueCloudLab/frostfs-node
Compare commits
1 commit
fyrchik/si
...
master
Author | SHA1 | Date | |
---|---|---|---|
9aeea0b974 |
61 changed files with 1481 additions and 594 deletions
|
@ -4,7 +4,7 @@
|
||||||
# options for analysis running
|
# options for analysis running
|
||||||
run:
|
run:
|
||||||
# timeout for analysis, e.g. 30s, 5m, default is 1m
|
# timeout for analysis, e.g. 30s, 5m, default is 1m
|
||||||
timeout: 5m
|
timeout: 10m
|
||||||
|
|
||||||
# include test files or not, default is true
|
# include test files or not, default is true
|
||||||
tests: false
|
tests: false
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
pipeline:
|
|
||||||
# Kludge for non-root containers under WoodPecker
|
|
||||||
fix-ownership:
|
|
||||||
image: alpine:latest
|
|
||||||
commands: chown -R 1234:1234 .
|
|
||||||
|
|
||||||
full-pre-commit:
|
|
||||||
image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
|
|
||||||
commands:
|
|
||||||
- export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
|
|
||||||
- pre-commit run -a
|
|
|
@ -19,8 +19,10 @@ func initAccountingService(c *cfg) {
|
||||||
server := accountingTransportGRPC.New(
|
server := accountingTransportGRPC.New(
|
||||||
accountingService.NewSignService(
|
accountingService.NewSignService(
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
|
accountingService.NewResponseService(
|
||||||
accountingService.NewExecutionService(
|
accountingService.NewExecutionService(
|
||||||
accounting.NewExecutor(balanceMorphWrapper),
|
accounting.NewExecutor(balanceMorphWrapper),
|
||||||
|
),
|
||||||
c.respSvc,
|
c.respSvc,
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
|
|
@ -590,7 +590,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
key: key,
|
key: key,
|
||||||
binPublicKey: key.PublicKey().Bytes(),
|
binPublicKey: key.PublicKey().Bytes(),
|
||||||
localAddr: netAddr,
|
localAddr: netAddr,
|
||||||
respSvc: response.NewService(netState),
|
respSvc: response.NewService(response.WithNetworkState(netState)),
|
||||||
clientCache: cache.NewSDKClientCache(cacheOpts),
|
clientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
|
|
|
@ -197,13 +197,16 @@ func initContainerService(c *cfg) {
|
||||||
server := containerTransportGRPC.New(
|
server := containerTransportGRPC.New(
|
||||||
containerService.NewSignService(
|
containerService.NewSignService(
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
|
containerService.NewResponseService(
|
||||||
&usedSpaceService{
|
&usedSpaceService{
|
||||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
|
||||||
loadWriterProvider: loadRouter,
|
loadWriterProvider: loadRouter,
|
||||||
loadPlacementBuilder: loadPlacementBuilder,
|
loadPlacementBuilder: loadPlacementBuilder,
|
||||||
routeBuilder: routeBuilder,
|
routeBuilder: routeBuilder,
|
||||||
cfg: c,
|
cfg: c,
|
||||||
},
|
},
|
||||||
|
c.respSvc,
|
||||||
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -565,8 +568,6 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
|
||||||
resp := new(containerV2.AnnounceUsedSpaceResponse)
|
resp := new(containerV2.AnnounceUsedSpaceResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
c.cfg.respSvc.SetMeta(resp)
|
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -153,6 +153,7 @@ func initNetmapService(c *cfg) {
|
||||||
server := netmapTransportGRPC.New(
|
server := netmapTransportGRPC.New(
|
||||||
netmapService.NewSignService(
|
netmapService.NewSignService(
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
|
netmapService.NewResponseService(
|
||||||
netmapService.NewExecutionService(
|
netmapService.NewExecutionService(
|
||||||
c,
|
c,
|
||||||
c.apiVersion,
|
c.apiVersion,
|
||||||
|
@ -162,6 +163,7 @@ func initNetmapService(c *cfg) {
|
||||||
morphClientNetMap: c.cfgNetmap.wrapper,
|
morphClientNetMap: c.cfgNetmap.wrapper,
|
||||||
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
|
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
|
||||||
},
|
},
|
||||||
|
),
|
||||||
c.respSvc,
|
c.respSvc,
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
|
|
@ -199,6 +199,7 @@ func initReputationService(c *cfg) {
|
||||||
server := grpcreputation.New(
|
server := grpcreputation.New(
|
||||||
reputationrpc.NewSignService(
|
reputationrpc.NewSignService(
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
|
reputationrpc.NewResponseService(
|
||||||
&reputationServer{
|
&reputationServer{
|
||||||
cfg: c,
|
cfg: c,
|
||||||
log: c.log,
|
log: c.log,
|
||||||
|
@ -206,6 +207,8 @@ func initReputationService(c *cfg) {
|
||||||
intermediateRouter: intermediateTrustRouter,
|
intermediateRouter: intermediateTrustRouter,
|
||||||
routeBuilder: localRouteBuilder,
|
routeBuilder: localRouteBuilder,
|
||||||
},
|
},
|
||||||
|
c.respSvc,
|
||||||
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -286,7 +289,6 @@ func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputa
|
||||||
resp := new(v2reputation.AnnounceLocalTrustResponse)
|
resp := new(v2reputation.AnnounceLocalTrustResponse)
|
||||||
resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))
|
resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,7 +317,6 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *
|
||||||
resp := new(v2reputation.AnnounceIntermediateResultResponse)
|
resp := new(v2reputation.AnnounceIntermediateResultResponse)
|
||||||
resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))
|
resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,10 @@ func initSessionService(c *cfg) {
|
||||||
server := sessionTransportGRPC.New(
|
server := sessionTransportGRPC.New(
|
||||||
sessionSvc.NewSignService(
|
sessionSvc.NewSignService(
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log),
|
sessionSvc.NewResponseService(
|
||||||
|
sessionSvc.NewExecutionService(c.privateTokenStore, c.log),
|
||||||
|
c.respSvc,
|
||||||
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServiceExecutor interface {
|
type ServiceExecutor interface {
|
||||||
|
@ -14,14 +13,12 @@ type ServiceExecutor interface {
|
||||||
|
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
exec ServiceExecutor
|
exec ServiceExecutor
|
||||||
respSvc *response.Service
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutionService wraps ServiceExecutor and returns Accounting Service interface.
|
// NewExecutionService wraps ServiceExecutor and returns Accounting Service interface.
|
||||||
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
|
func NewExecutionService(exec ServiceExecutor) Server {
|
||||||
return &executorSvc{
|
return &executorSvc{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
respSvc: respSvc,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,6 +31,5 @@ func (s *executorSvc) Balance(ctx context.Context, req *accounting.BalanceReques
|
||||||
resp := new(accounting.BalanceResponse)
|
resp := new(accounting.BalanceResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
37
pkg/services/accounting/response.go
Normal file
37
pkg/services/accounting/response.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package accounting
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
)
|
||||||
|
|
||||||
|
type responseService struct {
|
||||||
|
respSvc *response.Service
|
||||||
|
|
||||||
|
svc Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResponseService returns accounting service instance that passes internal service
|
||||||
|
// call to response service.
|
||||||
|
func NewResponseService(accSvc Server, respSvc *response.Service) Server {
|
||||||
|
return &responseService{
|
||||||
|
respSvc: respSvc,
|
||||||
|
svc: accSvc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*accounting.BalanceResponse), nil
|
||||||
|
}
|
|
@ -22,6 +22,17 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
|
func (s *signService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
|
||||||
resp, err := util.WrapResponse(s.svc.Balance(ctx, req))
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(accounting.BalanceResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*accounting.BalanceResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServiceExecutor interface {
|
type ServiceExecutor interface {
|
||||||
|
@ -22,15 +21,12 @@ type executorSvc struct {
|
||||||
Server
|
Server
|
||||||
|
|
||||||
exec ServiceExecutor
|
exec ServiceExecutor
|
||||||
|
|
||||||
respSvc *response.Service
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
|
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
|
||||||
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
|
func NewExecutionService(exec ServiceExecutor) Server {
|
||||||
return &executorSvc{
|
return &executorSvc{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
respSvc: respSvc,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +44,6 @@ func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*cont
|
||||||
resp := new(container.PutResponse)
|
resp := new(container.PutResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +61,6 @@ func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest)
|
||||||
resp := new(container.DeleteResponse)
|
resp := new(container.DeleteResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +73,6 @@ func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*cont
|
||||||
resp := new(container.GetResponse)
|
resp := new(container.GetResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +85,6 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
resp := new(container.ListResponse)
|
resp := new(container.ListResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,7 +102,6 @@ func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExte
|
||||||
resp := new(container.SetExtendedACLResponse)
|
resp := new(container.SetExtendedACLResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +114,5 @@ func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExte
|
||||||
resp := new(container.GetExtendedACLResponse)
|
resp := new(container.GetExtendedACLResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
115
pkg/services/container/response.go
Normal file
115
pkg/services/container/response.go
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
)
|
||||||
|
|
||||||
|
type responseService struct {
|
||||||
|
respSvc *response.Service
|
||||||
|
|
||||||
|
svc Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResponseService returns container service instance that passes internal service
|
||||||
|
// call to response service.
|
||||||
|
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
|
||||||
|
return &responseService{
|
||||||
|
respSvc: respSvc,
|
||||||
|
svc: cnrSvc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Put(ctx, req.(*container.PutRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.PutResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.DeleteResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Get(ctx, req.(*container.GetRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.GetResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.List(ctx, req.(*container.ListRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.ListResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.SetExtendedACLResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.GetExtendedACLResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*container.AnnounceUsedSpaceResponse), nil
|
||||||
|
}
|
|
@ -22,64 +22,113 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.PutResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Put(ctx, req.(*container.PutRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.PutResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Put(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.PutResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.DeleteResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.DeleteResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.DeleteResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.GetResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Get(ctx, req.(*container.GetRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.GetResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Get(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.GetResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.ListResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.List(ctx, req.(*container.ListRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.ListResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.List(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.ListResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.SetExtendedACLResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.SetExtendedACLResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.SetExtendedACL(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.SetExtendedACLResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
|
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.GetExtendedACLResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.GetExtendedACLResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.GetExtendedACL(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.GetExtendedACLResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(container.AnnounceUsedSpaceResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(container.AnnounceUsedSpaceResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.AnnounceUsedSpace(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*container.AnnounceUsedSpaceResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/version"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
versionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
versionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
)
|
)
|
||||||
|
@ -19,8 +18,6 @@ type executorSvc struct {
|
||||||
state NodeState
|
state NodeState
|
||||||
|
|
||||||
netInfo NetworkInfo
|
netInfo NetworkInfo
|
||||||
|
|
||||||
respSvc *response.Service
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeState encapsulates information
|
// NodeState encapsulates information
|
||||||
|
@ -45,8 +42,8 @@ type NetworkInfo interface {
|
||||||
Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error)
|
Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo, respSvc *response.Service) Server {
|
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) Server {
|
||||||
if s == nil || netInfo == nil || !version.IsValid(v) || respSvc == nil {
|
if s == nil || netInfo == nil || !version.IsValid(v) {
|
||||||
// this should never happen, otherwise it programmers bug
|
// this should never happen, otherwise it programmers bug
|
||||||
panic("can't create netmap execution service")
|
panic("can't create netmap execution service")
|
||||||
}
|
}
|
||||||
|
@ -54,7 +51,6 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
|
||||||
res := &executorSvc{
|
res := &executorSvc{
|
||||||
state: s,
|
state: s,
|
||||||
netInfo: netInfo,
|
netInfo: netInfo,
|
||||||
respSvc: respSvc,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
v.WriteToV2(&res.version)
|
v.WriteToV2(&res.version)
|
||||||
|
@ -100,7 +96,6 @@ func (s *executorSvc) LocalNodeInfo(
|
||||||
resp := new(netmap.LocalNodeInfoResponse)
|
resp := new(netmap.LocalNodeInfoResponse)
|
||||||
resp.SetBody(body)
|
resp.SetBody(body)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,11 +126,10 @@ func (s *executorSvc) NetworkInfo(
|
||||||
resp := new(netmap.NetworkInfoResponse)
|
resp := new(netmap.NetworkInfoResponse)
|
||||||
resp.SetBody(body)
|
resp.SetBody(body)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
|
func (s *executorSvc) Snapshot(_ context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
|
||||||
var nm netmap.NetMap
|
var nm netmap.NetMap
|
||||||
|
|
||||||
err := s.state.ReadCurrentNetMap(&nm)
|
err := s.state.ReadCurrentNetMap(&nm)
|
||||||
|
@ -149,6 +143,5 @@ func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*n
|
||||||
resp := new(netmap.SnapshotResponse)
|
resp := new(netmap.SnapshotResponse)
|
||||||
resp.SetBody(body)
|
resp.SetBody(body)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
63
pkg/services/netmap/response.go
Normal file
63
pkg/services/netmap/response.go
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package netmap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
)
|
||||||
|
|
||||||
|
type responseService struct {
|
||||||
|
respSvc *response.Service
|
||||||
|
|
||||||
|
svc Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResponseService returns netmap service instance that passes internal service
|
||||||
|
// call to response service.
|
||||||
|
func NewResponseService(nmSvc Server, respSvc *response.Service) Server {
|
||||||
|
return &responseService{
|
||||||
|
respSvc: respSvc,
|
||||||
|
svc: nmSvc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*netmap.LocalNodeInfoResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*netmap.NetworkInfoResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*netmap.SnapshotResponse), nil
|
||||||
|
}
|
|
@ -24,28 +24,49 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
|
||||||
func (s *signService) LocalNodeInfo(
|
func (s *signService) LocalNodeInfo(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
|
req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(netmap.LocalNodeInfoResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(netmap.LocalNodeInfoResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.LocalNodeInfo(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*netmap.LocalNodeInfoResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(netmap.NetworkInfoResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(netmap.NetworkInfoResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.NetworkInfo(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*netmap.NetworkInfoResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
|
func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(netmap.SnapshotResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(netmap.SnapshotResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Snapshot(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*netmap.SnapshotResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,30 +45,31 @@ type headerSource struct {
|
||||||
incompleteObjectHeaders bool
|
incompleteObjectHeaders bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
func defaultCfg() *cfg {
|
||||||
c.storage = (*localStorage)(nil)
|
return &cfg{
|
||||||
|
storage: new(localStorage),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
|
func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
|
||||||
var c cfg
|
cfg := defaultCfg()
|
||||||
c.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&c)
|
opts[i](cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.msg == nil {
|
if cfg.msg == nil {
|
||||||
return nil, errors.New("message is not provided")
|
return nil, errors.New("message is not provided")
|
||||||
}
|
}
|
||||||
|
|
||||||
var res headerSource
|
var res headerSource
|
||||||
|
|
||||||
err := c.readObjectHeaders(&res)
|
err := cfg.readObjectHeaders(&res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res.requestHeaders = requestHeaders(c.msg)
|
res.requestHeaders = requestHeaders(cfg.msg)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
@ -101,20 +102,20 @@ func requestHeaders(msg xHeaderSource) []eaclSDK.Header {
|
||||||
var errMissingOID = errors.New("object ID is missing")
|
var errMissingOID = errors.New("object ID is missing")
|
||||||
|
|
||||||
// nolint: funlen
|
// nolint: funlen
|
||||||
func (c *cfg) readObjectHeaders(dst *headerSource) error {
|
func (h *cfg) readObjectHeaders(dst *headerSource) error {
|
||||||
switch m := c.msg.(type) {
|
switch m := h.msg.(type) {
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("unexpected message type %T", c.msg))
|
panic(fmt.Sprintf("unexpected message type %T", h.msg))
|
||||||
case requestXHeaderSource:
|
case requestXHeaderSource:
|
||||||
switch req := m.req.(type) {
|
switch req := m.req.(type) {
|
||||||
case
|
case
|
||||||
*objectV2.GetRequest,
|
*objectV2.GetRequest,
|
||||||
*objectV2.HeadRequest:
|
*objectV2.HeadRequest:
|
||||||
if c.obj == nil {
|
if h.obj == nil {
|
||||||
return errMissingOID
|
return errMissingOID
|
||||||
}
|
}
|
||||||
|
|
||||||
objHeaders, completed := c.localObjectHeaders(c.cnr, c.obj)
|
objHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
|
||||||
|
|
||||||
dst.objectHeaders = objHeaders
|
dst.objectHeaders = objHeaders
|
||||||
dst.incompleteObjectHeaders = !completed
|
dst.incompleteObjectHeaders = !completed
|
||||||
|
@ -122,18 +123,18 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
|
||||||
*objectV2.GetRangeRequest,
|
*objectV2.GetRangeRequest,
|
||||||
*objectV2.GetRangeHashRequest,
|
*objectV2.GetRangeHashRequest,
|
||||||
*objectV2.DeleteRequest:
|
*objectV2.DeleteRequest:
|
||||||
if c.obj == nil {
|
if h.obj == nil {
|
||||||
return errMissingOID
|
return errMissingOID
|
||||||
}
|
}
|
||||||
|
|
||||||
dst.objectHeaders = addressHeaders(c.cnr, c.obj)
|
dst.objectHeaders = addressHeaders(h.cnr, h.obj)
|
||||||
case *objectV2.PutRequest:
|
case *objectV2.PutRequest:
|
||||||
if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
|
if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
|
||||||
oV2 := new(objectV2.Object)
|
oV2 := new(objectV2.Object)
|
||||||
oV2.SetObjectID(v.GetObjectID())
|
oV2.SetObjectID(v.GetObjectID())
|
||||||
oV2.SetHeader(v.GetHeader())
|
oV2.SetHeader(v.GetHeader())
|
||||||
|
|
||||||
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
|
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
|
||||||
}
|
}
|
||||||
case *objectV2.SearchRequest:
|
case *objectV2.SearchRequest:
|
||||||
cnrV2 := req.GetBody().GetContainerID()
|
cnrV2 := req.GetBody().GetContainerID()
|
||||||
|
@ -150,7 +151,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
|
||||||
case responseXHeaderSource:
|
case responseXHeaderSource:
|
||||||
switch resp := m.resp.(type) {
|
switch resp := m.resp.(type) {
|
||||||
default:
|
default:
|
||||||
objectHeaders, completed := c.localObjectHeaders(c.cnr, c.obj)
|
objectHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
|
||||||
|
|
||||||
dst.objectHeaders = objectHeaders
|
dst.objectHeaders = objectHeaders
|
||||||
dst.incompleteObjectHeaders = !completed
|
dst.incompleteObjectHeaders = !completed
|
||||||
|
@ -160,7 +161,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
|
||||||
oV2.SetObjectID(v.GetObjectID())
|
oV2.SetObjectID(v.GetObjectID())
|
||||||
oV2.SetHeader(v.GetHeader())
|
oV2.SetHeader(v.GetHeader())
|
||||||
|
|
||||||
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
|
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
|
||||||
}
|
}
|
||||||
case *objectV2.HeadResponse:
|
case *objectV2.HeadResponse:
|
||||||
oV2 := new(objectV2.Object)
|
oV2 := new(objectV2.Object)
|
||||||
|
@ -172,7 +173,7 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
|
||||||
hdr = new(objectV2.Header)
|
hdr = new(objectV2.Header)
|
||||||
|
|
||||||
var idV2 refsV2.ContainerID
|
var idV2 refsV2.ContainerID
|
||||||
c.cnr.WriteToV2(&idV2)
|
h.cnr.WriteToV2(&idV2)
|
||||||
|
|
||||||
hdr.SetContainerID(&idV2)
|
hdr.SetContainerID(&idV2)
|
||||||
hdr.SetVersion(v.GetVersion())
|
hdr.SetVersion(v.GetVersion())
|
||||||
|
@ -186,20 +187,20 @@ func (c *cfg) readObjectHeaders(dst *headerSource) error {
|
||||||
|
|
||||||
oV2.SetHeader(hdr)
|
oV2.SetHeader(hdr)
|
||||||
|
|
||||||
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
|
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
|
func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
|
||||||
if idObj != nil {
|
if idObj != nil {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(cnr)
|
addr.SetContainer(cnr)
|
||||||
addr.SetObject(*idObj)
|
addr.SetObject(*idObj)
|
||||||
|
|
||||||
obj, err := c.storage.Head(addr)
|
obj, err := h.storage.Head(addr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return headersFromObject(obj, cnr, idObj), true
|
return headersFromObject(obj, cnr, idObj), true
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,12 +8,14 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
type localStorage engine.StorageEngine
|
type localStorage struct {
|
||||||
|
ls *engine.StorageEngine
|
||||||
|
}
|
||||||
|
|
||||||
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
|
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
|
||||||
if s == nil {
|
if s.ls == nil {
|
||||||
return nil, io.ErrUnexpectedEOF
|
return nil, io.ErrUnexpectedEOF
|
||||||
}
|
}
|
||||||
|
|
||||||
return engine.Head((*engine.StorageEngine)(s), addr)
|
return engine.Head(s.ls, addr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,9 @@ func WithObjectStorage(v ObjectStorage) Option {
|
||||||
|
|
||||||
func WithLocalObjectStorage(v *engine.StorageEngine) Option {
|
func WithLocalObjectStorage(v *engine.StorageEngine) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.storage = (*localStorage)(v)
|
c.storage = &localStorage{
|
||||||
|
ls: v,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ import (
|
||||||
|
|
||||||
// Service checks basic ACL rules.
|
// Service checks basic ACL rules.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
|
|
||||||
c senderClassifier
|
c senderClassifier
|
||||||
}
|
}
|
||||||
|
@ -72,17 +72,18 @@ type cfg struct {
|
||||||
next object.ServiceServer
|
next object.ServiceServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
func defaultCfg() *cfg {
|
||||||
c.log = &logger.Logger{Logger: zap.L()}
|
return &cfg{
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New is a constructor for object ACL checking service.
|
// New is a constructor for object ACL checking service.
|
||||||
func New(opts ...Option) *Service {
|
func New(opts ...Option) Service {
|
||||||
var s Service
|
cfg := defaultCfg()
|
||||||
s.cfg.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
panicOnNil := func(v any, name string) {
|
panicOnNil := func(v any, name string) {
|
||||||
|
@ -91,18 +92,20 @@ func New(opts ...Option) *Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
panicOnNil(s.cfg.next, "next Service")
|
panicOnNil(cfg.next, "next Service")
|
||||||
panicOnNil(s.cfg.nm, "netmap client")
|
panicOnNil(cfg.nm, "netmap client")
|
||||||
panicOnNil(s.cfg.irFetcher, "inner Ring fetcher")
|
panicOnNil(cfg.irFetcher, "inner Ring fetcher")
|
||||||
panicOnNil(s.cfg.checker, "acl checker")
|
panicOnNil(cfg.checker, "acl checker")
|
||||||
panicOnNil(s.cfg.containers, "container source")
|
panicOnNil(cfg.containers, "container source")
|
||||||
|
|
||||||
s.c = senderClassifier{
|
return Service{
|
||||||
log: s.cfg.log,
|
cfg: cfg,
|
||||||
innerRing: s.cfg.irFetcher,
|
c: senderClassifier{
|
||||||
netmap: s.cfg.nm,
|
log: cfg.log,
|
||||||
|
innerRing: cfg.irFetcher,
|
||||||
|
netmap: cfg.nm,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
return &s
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get implements ServiceServer interface, makes ACL checks and calls
|
// Get implements ServiceServer interface, makes ACL checks and calls
|
||||||
|
|
|
@ -5,7 +5,9 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -43,17 +45,37 @@ const (
|
||||||
func (exec *execCtx) setLogger(l *logger.Logger) {
|
func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||||
exec.log = &logger.Logger{Logger: l.With(
|
exec.log = &logger.Logger{Logger: l.With(
|
||||||
zap.String("request", "DELETE"),
|
zap.String("request", "DELETE"),
|
||||||
zap.Stringer("address", exec.prm.addr),
|
zap.Stringer("address", exec.address()),
|
||||||
zap.Bool("local", exec.prm.common.LocalOnly()),
|
zap.Bool("local", exec.isLocal()),
|
||||||
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
||||||
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
||||||
)}
|
)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) context() context.Context {
|
||||||
|
return exec.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) isLocal() bool {
|
||||||
|
return exec.prm.common.LocalOnly()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) address() oid.Address {
|
||||||
|
return exec.prm.addr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) containerID() cid.ID {
|
||||||
|
return exec.prm.addr.Container()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) commonParameters() *util.CommonPrm {
|
||||||
|
return exec.prm.common
|
||||||
|
}
|
||||||
|
|
||||||
func (exec *execCtx) newAddress(id oid.ID) oid.Address {
|
func (exec *execCtx) newAddress(id oid.ID) oid.Address {
|
||||||
var a oid.Address
|
var a oid.Address
|
||||||
a.SetObject(id)
|
a.SetObject(id)
|
||||||
a.SetContainer(exec.prm.addr.Container())
|
a.SetContainer(exec.containerID())
|
||||||
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
@ -219,11 +241,11 @@ func (exec *execCtx) initTombstoneObject() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.tombstoneObj = object.New()
|
exec.tombstoneObj = object.New()
|
||||||
exec.tombstoneObj.SetContainerID(exec.prm.addr.Container())
|
exec.tombstoneObj.SetContainerID(exec.containerID())
|
||||||
exec.tombstoneObj.SetType(object.TypeTombstone)
|
exec.tombstoneObj.SetType(object.TypeTombstone)
|
||||||
exec.tombstoneObj.SetPayload(payload)
|
exec.tombstoneObj.SetPayload(payload)
|
||||||
|
|
||||||
tokenSession := exec.prm.common.SessionToken()
|
tokenSession := exec.commonParameters().SessionToken()
|
||||||
if tokenSession != nil {
|
if tokenSession != nil {
|
||||||
issuer := tokenSession.Issuer()
|
issuer := tokenSession.Issuer()
|
||||||
exec.tombstoneObj.SetOwnerID(&issuer)
|
exec.tombstoneObj.SetOwnerID(&issuer)
|
||||||
|
|
|
@ -36,7 +36,7 @@ func (exec *execCtx) formTombstone() (ok bool) {
|
||||||
exec.tombstone.SetExpirationEpoch(
|
exec.tombstone.SetExpirationEpoch(
|
||||||
exec.svc.netInfo.CurrentEpoch() + tsLifetime,
|
exec.svc.netInfo.CurrentEpoch() + tsLifetime,
|
||||||
)
|
)
|
||||||
exec.addMembers([]oid.ID{exec.prm.addr.Object()})
|
exec.addMembers([]oid.ID{exec.address().Object()})
|
||||||
|
|
||||||
exec.log.Debug("forming split info...")
|
exec.log.Debug("forming split info...")
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
|
|
||||||
// Service utility serving requests of Object.Get service.
|
// Service utility serving requests of Object.Get service.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is a Service's constructor option.
|
// Option is a Service's constructor option.
|
||||||
|
@ -60,21 +60,24 @@ type cfg struct {
|
||||||
keyStorage *util.KeyStorage
|
keyStorage *util.KeyStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
func defaultCfg() *cfg {
|
||||||
c.log = &logger.Logger{Logger: zap.L()}
|
return &cfg{
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns utility serving
|
// New creates, initializes and returns utility serving
|
||||||
// Object.Get service requests.
|
// Object.Get service requests.
|
||||||
func New(opts ...Option) *Service {
|
func New(opts ...Option) *Service {
|
||||||
var s Service
|
c := defaultCfg()
|
||||||
s.cfg.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger returns option to specify Delete service's logger.
|
// WithLogger returns option to specify Delete service's logger.
|
||||||
|
|
|
@ -24,12 +24,12 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
|
||||||
wr := getsvc.NewSimpleObjectWriter()
|
wr := getsvc.NewSimpleObjectWriter()
|
||||||
|
|
||||||
p := getsvc.HeadPrm{}
|
p := getsvc.HeadPrm{}
|
||||||
p.SetCommonParameters(exec.prm.common)
|
p.SetCommonParameters(exec.commonParameters())
|
||||||
p.SetHeaderWriter(wr)
|
p.SetHeaderWriter(wr)
|
||||||
p.WithRawFlag(true)
|
p.WithRawFlag(true)
|
||||||
p.WithAddress(addr)
|
p.WithAddress(addr)
|
||||||
|
|
||||||
err := (*getsvc.Service)(w).Head(exec.ctx, p)
|
err := (*getsvc.Service)(w).Head(exec.context(), p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) {
|
func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) {
|
||||||
_, err := w.headAddress(exec, exec.prm.addr)
|
_, err := w.headAddress(exec, exec.address())
|
||||||
|
|
||||||
var errSplitInfo *object.SplitInfoError
|
var errSplitInfo *object.SplitInfoError
|
||||||
|
|
||||||
|
@ -89,11 +89,11 @@ func (w *searchSvcWrapper) splitMembers(exec *execCtx) ([]oid.ID, error) {
|
||||||
|
|
||||||
p := searchsvc.Prm{}
|
p := searchsvc.Prm{}
|
||||||
p.SetWriter(wr)
|
p.SetWriter(wr)
|
||||||
p.SetCommonParameters(exec.prm.common)
|
p.SetCommonParameters(exec.commonParameters())
|
||||||
p.WithContainerID(exec.prm.addr.Container())
|
p.WithContainerID(exec.containerID())
|
||||||
p.WithSearchFilters(fs)
|
p.WithSearchFilters(fs)
|
||||||
|
|
||||||
err := (*searchsvc.Service)(w).Search(exec.ctx, p)
|
err := (*searchsvc.Service)(w).Search(exec.context(), p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -108,7 +108,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
|
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
|
||||||
streamer, err := (*putsvc.Service)(w).Put(exec.ctx)
|
streamer, err := (*putsvc.Service)(w).Put(exec.context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
|
||||||
payload := exec.tombstoneObj.Payload()
|
payload := exec.tombstoneObj.Payload()
|
||||||
|
|
||||||
initPrm := new(putsvc.PutInitPrm).
|
initPrm := new(putsvc.PutInitPrm).
|
||||||
WithCommonPrm(exec.prm.common).
|
WithCommonPrm(exec.commonParameters()).
|
||||||
WithObject(exec.tombstoneObj.CutPayload())
|
WithObject(exec.tombstoneObj.CutPayload())
|
||||||
|
|
||||||
err = streamer.Init(initPrm)
|
err = streamer.Init(initPrm)
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
|
|
||||||
// Service implements Delete operation of Object service v2.
|
// Service implements Delete operation of Object service v2.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option represents Service constructor option.
|
// Option represents Service constructor option.
|
||||||
|
@ -21,13 +21,15 @@ type cfg struct {
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
// NewService constructs Service instance from provided options.
|
||||||
func NewService(opts ...Option) *Service {
|
func NewService(opts ...Option) *Service {
|
||||||
var s Service
|
c := new(cfg)
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete calls internal service.
|
// Delete calls internal service.
|
||||||
|
|
|
@ -36,26 +36,26 @@ func (exec *execCtx) assemble() {
|
||||||
|
|
||||||
exec.log.Debug("trying to assemble the object...")
|
exec.log.Debug("trying to assemble the object...")
|
||||||
|
|
||||||
assembler := newAssembler(exec.prm.addr, exec.splitInfo, exec.prm.rng, exec)
|
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
|
||||||
|
|
||||||
exec.log.Debug("assembling splitted object...",
|
exec.log.Debug("assembling splitted object...",
|
||||||
zap.Stringer("address", exec.prm.addr),
|
zap.Stringer("address", exec.address()),
|
||||||
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
|
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", exec.prm.rng.GetLength()),
|
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
defer exec.log.Debug("assembling splitted object completed",
|
defer exec.log.Debug("assembling splitted object completed",
|
||||||
zap.Stringer("address", exec.prm.addr),
|
zap.Stringer("address", exec.address()),
|
||||||
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
|
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", exec.prm.rng.GetLength()),
|
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
|
||||||
obj, err := assembler.Assemble(exec.ctx, exec.prm.objWriter)
|
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exec.log.Warn("failed to assemble splitted object",
|
exec.log.Warn("failed to assemble splitted object",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Stringer("address", exec.prm.addr),
|
zap.Stringer("address", exec.address()),
|
||||||
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
|
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", exec.prm.rng.GetLength()),
|
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ func equalAddresses(a, b oid.Address) bool {
|
||||||
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
|
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
|
||||||
p := exec.prm
|
p := exec.prm
|
||||||
p.common = p.common.WithLocalOnly(false)
|
p.common = p.common.WithLocalOnly(false)
|
||||||
p.addr.SetContainer(exec.prm.addr.Container())
|
p.addr.SetContainer(exec.containerID())
|
||||||
p.addr.SetObject(id)
|
p.addr.SetObject(id)
|
||||||
|
|
||||||
prm := HeadPrm{
|
prm := HeadPrm{
|
||||||
|
@ -108,7 +108,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
|
||||||
w := NewSimpleObjectWriter()
|
w := NewSimpleObjectWriter()
|
||||||
prm.SetHeaderWriter(w)
|
prm.SetHeaderWriter(w)
|
||||||
//nolint: contextcheck
|
//nolint: contextcheck
|
||||||
err := exec.svc.Head(exec.ctx, prm)
|
err := exec.svc.Head(exec.context(), prm)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -125,11 +125,11 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
|
||||||
p.objWriter = w
|
p.objWriter = w
|
||||||
p.SetRange(rng)
|
p.SetRange(rng)
|
||||||
|
|
||||||
p.addr.SetContainer(exec.prm.addr.Container())
|
p.addr.SetContainer(exec.containerID())
|
||||||
p.addr.SetObject(id)
|
p.addr.SetObject(id)
|
||||||
|
|
||||||
//nolint: contextcheck
|
//nolint: contextcheck
|
||||||
statusError := exec.svc.get(exec.ctx, p.commonPrm, withPayloadRange(rng))
|
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
|
||||||
|
|
||||||
if statusError.err != nil {
|
if statusError.err != nil {
|
||||||
return nil, statusError.err
|
return nil, statusError.err
|
||||||
|
|
|
@ -8,12 +8,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) executeOnContainer() {
|
func (exec *execCtx) executeOnContainer() {
|
||||||
if exec.prm.common.LocalOnly() {
|
if exec.isLocal() {
|
||||||
exec.log.Debug("return result directly")
|
exec.log.Debug("return result directly")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lookupDepth := exec.prm.common.NetmapLookupDepth()
|
lookupDepth := exec.netmapLookupDepth()
|
||||||
|
|
||||||
exec.log.Debug("trying to execute in container...",
|
exec.log.Debug("trying to execute in container...",
|
||||||
zap.Uint64("netmap lookup depth", lookupDepth),
|
zap.Uint64("netmap lookup depth", lookupDepth),
|
||||||
|
@ -47,12 +47,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
zap.Uint64("number", exec.curProcEpoch),
|
zap.Uint64("number", exec.curProcEpoch),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverser, ok := exec.generateTraverser(exec.prm.addr)
|
traverser, ok := exec.generateTraverser(exec.address())
|
||||||
if !ok {
|
if !ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(exec.ctx)
|
ctx, cancel := context.WithCancel(exec.context())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
exec.status = statusUndefined
|
exec.status = statusUndefined
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -28,13 +29,13 @@ type execCtx struct {
|
||||||
|
|
||||||
statusError
|
statusError
|
||||||
|
|
||||||
splitInfo *objectSDK.SplitInfo
|
infoSplit *objectSDK.SplitInfo
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
collectedObject *objectSDK.Object
|
collectedObject *objectSDK.Object
|
||||||
|
|
||||||
headOnly bool
|
head bool
|
||||||
|
|
||||||
curProcEpoch uint64
|
curProcEpoch uint64
|
||||||
}
|
}
|
||||||
|
@ -51,7 +52,7 @@ const (
|
||||||
|
|
||||||
func headOnly() execOption {
|
func headOnly() execOption {
|
||||||
return func(c *execCtx) {
|
return func(c *execCtx) {
|
||||||
c.headOnly = true
|
c.head = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,22 +64,38 @@ func withPayloadRange(r *objectSDK.Range) execOption {
|
||||||
|
|
||||||
func (exec *execCtx) setLogger(l *logger.Logger) {
|
func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||||
req := "GET"
|
req := "GET"
|
||||||
if exec.headOnly {
|
if exec.headOnly() {
|
||||||
req = "HEAD"
|
req = "HEAD"
|
||||||
} else if exec.prm.rng != nil {
|
} else if exec.ctxRange() != nil {
|
||||||
req = "GET_RANGE"
|
req = "GET_RANGE"
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.log = &logger.Logger{Logger: l.With(
|
exec.log = &logger.Logger{Logger: l.With(
|
||||||
zap.String("request", req),
|
zap.String("request", req),
|
||||||
zap.Stringer("address", exec.prm.addr),
|
zap.Stringer("address", exec.address()),
|
||||||
zap.Bool("raw", exec.prm.raw),
|
zap.Bool("raw", exec.isRaw()),
|
||||||
zap.Bool("local", exec.prm.common.LocalOnly()),
|
zap.Bool("local", exec.isLocal()),
|
||||||
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
||||||
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
||||||
)}
|
)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) context() context.Context {
|
||||||
|
return exec.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) isLocal() bool {
|
||||||
|
return exec.prm.common.LocalOnly()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) isRaw() bool {
|
||||||
|
return exec.prm.raw
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) address() oid.Address {
|
||||||
|
return exec.prm.addr
|
||||||
|
}
|
||||||
|
|
||||||
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
||||||
if exec.prm.signerKey != nil {
|
if exec.prm.signerKey != nil {
|
||||||
// the key has already been requested and
|
// the key has already been requested and
|
||||||
|
@ -99,16 +116,40 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) canAssemble() bool {
|
func (exec *execCtx) canAssemble() bool {
|
||||||
return !exec.prm.raw && !exec.headOnly
|
return !exec.isRaw() && !exec.headOnly()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
|
||||||
|
return exec.infoSplit
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) containerID() cid.ID {
|
||||||
|
return exec.address().Container()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) ctxRange() *objectSDK.Range {
|
||||||
|
return exec.prm.rng
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) headOnly() bool {
|
||||||
|
return exec.head
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) netmapEpoch() uint64 {
|
||||||
|
return exec.prm.common.NetmapEpoch()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) netmapLookupDepth() uint64 {
|
||||||
|
return exec.prm.common.NetmapLookupDepth()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) initEpoch() bool {
|
func (exec *execCtx) initEpoch() bool {
|
||||||
exec.curProcEpoch = exec.prm.common.NetmapEpoch()
|
exec.curProcEpoch = exec.netmapEpoch()
|
||||||
if exec.curProcEpoch > 0 {
|
if exec.curProcEpoch > 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err := exec.svc.epochSource.Epoch()
|
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
@ -177,12 +218,12 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) writeCollectedHeader() bool {
|
func (exec *execCtx) writeCollectedHeader() bool {
|
||||||
if exec.prm.rng != nil {
|
if exec.ctxRange() != nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := exec.prm.objWriter.WriteHeader(
|
err := exec.prm.objWriter.WriteHeader(
|
||||||
exec.ctx,
|
exec.context(),
|
||||||
exec.collectedObject.CutPayload(),
|
exec.collectedObject.CutPayload(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -203,11 +244,11 @@ func (exec *execCtx) writeCollectedHeader() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
|
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
|
||||||
if exec.headOnly {
|
if exec.headOnly() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := exec.prm.objWriter.WriteChunk(exec.ctx, obj.Payload())
|
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
@ -231,6 +272,12 @@ func (exec *execCtx) writeCollectedObject() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isForwardingEnabled returns true if common execution
|
||||||
|
// parameters has request forwarding closure set.
|
||||||
|
func (exec execCtx) isForwardingEnabled() bool {
|
||||||
|
return exec.prm.forwarder != nil
|
||||||
|
}
|
||||||
|
|
||||||
// disableForwarding removes request forwarding closure from common
|
// disableForwarding removes request forwarding closure from common
|
||||||
// parameters, so it won't be inherited in new execution contexts.
|
// parameters, so it won't be inherited in new execution contexts.
|
||||||
func (exec *execCtx) disableForwarding() {
|
func (exec *execCtx) disableForwarding() {
|
||||||
|
|
|
@ -69,7 +69,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
|
||||||
prm: RangePrm{
|
prm: RangePrm{
|
||||||
commonPrm: prm,
|
commonPrm: prm,
|
||||||
},
|
},
|
||||||
splitInfo: object.NewSplitInfo(),
|
infoSplit: object.NewSplitInfo(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
|
|
|
@ -56,7 +56,7 @@ type testClient struct {
|
||||||
|
|
||||||
type testEpochReceiver uint64
|
type testEpochReceiver uint64
|
||||||
|
|
||||||
func (e testEpochReceiver) Epoch() (uint64, error) {
|
func (e testEpochReceiver) currentEpoch() (uint64, error) {
|
||||||
return uint64(e), nil
|
return uint64(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ func newTestClient() *testClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||||
v, ok := c.results[exec.prm.addr.EncodeToString()]
|
v, ok := c.results[exec.address().EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Obj
|
||||||
return nil, v.err
|
return nil, v.err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cutToRange(v.obj, exec.prm.rng), nil
|
return cutToRange(v.obj, exec.ctxRange()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
|
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
|
||||||
|
@ -143,7 +143,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
|
||||||
var (
|
var (
|
||||||
ok bool
|
ok bool
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
sAddr = exec.prm.addr.EncodeToString()
|
sAddr = exec.address().EncodeToString()
|
||||||
)
|
)
|
||||||
|
|
||||||
if _, ok = s.inhumed[sAddr]; ok {
|
if _, ok = s.inhumed[sAddr]; ok {
|
||||||
|
@ -157,7 +157,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj, ok = s.phy[sAddr]; ok {
|
if obj, ok = s.phy[sAddr]; ok {
|
||||||
return cutToRange(obj, exec.prm.rng), nil
|
return cutToRange(obj, exec.ctxRange()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
@ -245,7 +245,7 @@ func TestGetLocalOnly(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
newSvc := func(storage *testStorage) *Service {
|
newSvc := func(storage *testStorage) *Service {
|
||||||
svc := &Service{}
|
svc := &Service{cfg: new(cfg)}
|
||||||
svc.log = test.NewLogger(false)
|
svc.log = test.NewLogger(false)
|
||||||
svc.localStorage = storage
|
svc.localStorage = storage
|
||||||
|
|
||||||
|
@ -506,7 +506,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
container.CalculateID(&idCnr, cnr)
|
container.CalculateID(&idCnr, cnr)
|
||||||
|
|
||||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||||
svc := &Service{}
|
svc := &Service{cfg: new(cfg)}
|
||||||
svc.log = test.NewLogger(false)
|
svc.log = test.NewLogger(false)
|
||||||
svc.localStorage = newTestStorage()
|
svc.localStorage = newTestStorage()
|
||||||
|
|
||||||
|
@ -519,7 +519,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
svc.clientCache = c
|
svc.clientCache = c
|
||||||
svc.epochSource = testEpochReceiver(curEpoch)
|
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||||
|
|
||||||
return svc
|
return svc
|
||||||
}
|
}
|
||||||
|
@ -1639,7 +1639,7 @@ func TestGetFromPastEpoch(t *testing.T) {
|
||||||
c22 := newTestClient()
|
c22 := newTestClient()
|
||||||
c22.addResult(addr, obj, nil)
|
c22.addResult(addr, obj, nil)
|
||||||
|
|
||||||
svc := &Service{}
|
svc := &Service{cfg: new(cfg)}
|
||||||
svc.log = test.NewLogger(false)
|
svc.log = test.NewLogger(false)
|
||||||
svc.localStorage = newTestStorage()
|
svc.localStorage = newTestStorage()
|
||||||
|
|
||||||
|
@ -1670,7 +1670,7 @@ func TestGetFromPastEpoch(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
svc.epochSource = testEpochReceiver(curEpoch)
|
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||||
|
|
||||||
w := NewSimpleObjectWriter()
|
w := NewSimpleObjectWriter()
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,8 @@ func (exec *execCtx) executeLocal() {
|
||||||
exec.err = errRemoved
|
exec.err = errRemoved
|
||||||
case errors.As(err, &errSplitInfo):
|
case errors.As(err, &errSplitInfo):
|
||||||
exec.status = statusVIRTUAL
|
exec.status = statusVIRTUAL
|
||||||
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo())
|
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo)
|
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
exec.status = statusOutOfRange
|
exec.status = statusOutOfRange
|
||||||
exec.err = errOutOfRange
|
exec.err = errOutOfRange
|
||||||
|
|
|
@ -54,8 +54,8 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
|
||||||
exec.err = errOutOfRange
|
exec.err = errOutOfRange
|
||||||
case errors.As(err, &errSplitInfo):
|
case errors.As(err, &errSplitInfo):
|
||||||
exec.status = statusVIRTUAL
|
exec.status = statusVIRTUAL
|
||||||
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo())
|
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo)
|
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
|
||||||
}
|
}
|
||||||
|
|
||||||
return exec.status != statusUndefined
|
return exec.status != statusUndefined
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
|
|
||||||
// Service utility serving requests of Object.Get service.
|
// Service utility serving requests of Object.Get service.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is a Service's constructor option.
|
// Option is a Service's constructor option.
|
||||||
|
@ -40,31 +40,33 @@ type cfg struct {
|
||||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
epochSource epochSource
|
currentEpochReceiver interface {
|
||||||
|
currentEpoch() (uint64, error)
|
||||||
|
}
|
||||||
|
|
||||||
keyStore *util.KeyStorage
|
keyStore *util.KeyStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
type epochSource interface {
|
func defaultCfg() *cfg {
|
||||||
Epoch() (uint64, error)
|
return &cfg{
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
localStorage: new(storageEngineWrapper),
|
||||||
|
clientCache: new(clientCacheWrapper),
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
|
||||||
c.log = &logger.Logger{Logger: zap.L()}
|
|
||||||
c.clientCache = new(clientCacheWrapper)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns utility serving
|
// New creates, initializes and returns utility serving
|
||||||
// Object.Get service requests.
|
// Object.Get service requests.
|
||||||
func New(opts ...Option) *Service {
|
func New(opts ...Option) *Service {
|
||||||
var s Service
|
c := defaultCfg()
|
||||||
s.cfg.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger returns option to specify Get service's logger.
|
// WithLogger returns option to specify Get service's logger.
|
||||||
|
@ -78,7 +80,7 @@ func WithLogger(l *logger.Logger) Option {
|
||||||
// instance.
|
// instance.
|
||||||
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.localStorage = (*storageEngineWrapper)(e)
|
c.localStorage.(*storageEngineWrapper).engine = e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +107,9 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
|
||||||
// map storage to receive current network state.
|
// map storage to receive current network state.
|
||||||
func WithNetMapSource(nmSrc netmap.Source) Option {
|
func WithNetMapSource(nmSrc netmap.Source) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.epochSource = nmSrc
|
c.currentEpochReceiver = &nmSrcWrapper{
|
||||||
|
nmSrc: nmSrc,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
internal "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internal "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
|
@ -28,7 +29,9 @@ type clientWrapper struct {
|
||||||
client coreclient.MultiAddressClient
|
client coreclient.MultiAddressClient
|
||||||
}
|
}
|
||||||
|
|
||||||
type storageEngineWrapper engine.StorageEngine
|
type storageEngineWrapper struct {
|
||||||
|
engine *engine.StorageEngine
|
||||||
|
}
|
||||||
|
|
||||||
type partWriter struct {
|
type partWriter struct {
|
||||||
ObjectWriter
|
ObjectWriter
|
||||||
|
@ -42,6 +45,10 @@ type hasherWrapper struct {
|
||||||
hash io.Writer
|
hash io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nmSrcWrapper struct {
|
||||||
|
nmSrc netmap.Source
|
||||||
|
}
|
||||||
|
|
||||||
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
||||||
return &SimpleObjectWriter{
|
return &SimpleObjectWriter{
|
||||||
obj: object.New(),
|
obj: object.New(),
|
||||||
|
@ -82,7 +89,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
||||||
|
|
||||||
// nolint: funlen
|
// nolint: funlen
|
||||||
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
||||||
if exec.prm.forwarder != nil {
|
if exec.isForwardingEnabled() {
|
||||||
return exec.prm.forwarder(info, c.client)
|
return exec.prm.forwarder(info, c.client)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,20 +98,20 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if exec.headOnly {
|
if exec.headOnly() {
|
||||||
var prm internalclient.HeadObjectPrm
|
var prm internalclient.HeadObjectPrm
|
||||||
|
|
||||||
prm.SetContext(exec.ctx)
|
prm.SetContext(exec.context())
|
||||||
prm.SetClient(c.client)
|
prm.SetClient(c.client)
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
prm.SetTTL(exec.prm.common.TTL())
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||||
prm.SetAddress(exec.prm.addr)
|
prm.SetAddress(exec.address())
|
||||||
prm.SetPrivateKey(key)
|
prm.SetPrivateKey(key)
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||||
|
|
||||||
if exec.prm.raw {
|
if exec.isRaw() {
|
||||||
prm.SetRawFlag()
|
prm.SetRawFlag()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,21 +124,21 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
||||||
}
|
}
|
||||||
// we don't specify payload writer because we accumulate
|
// we don't specify payload writer because we accumulate
|
||||||
// the object locally (even huge).
|
// the object locally (even huge).
|
||||||
if rng := exec.prm.rng; rng != nil {
|
if rng := exec.ctxRange(); rng != nil {
|
||||||
var prm internalclient.PayloadRangePrm
|
var prm internalclient.PayloadRangePrm
|
||||||
|
|
||||||
prm.SetContext(exec.ctx)
|
prm.SetContext(exec.context())
|
||||||
prm.SetClient(c.client)
|
prm.SetClient(c.client)
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
prm.SetTTL(exec.prm.common.TTL())
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||||
prm.SetAddress(exec.prm.addr)
|
prm.SetAddress(exec.address())
|
||||||
prm.SetPrivateKey(key)
|
prm.SetPrivateKey(key)
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||||
prm.SetRange(rng)
|
prm.SetRange(rng)
|
||||||
|
|
||||||
if exec.prm.raw {
|
if exec.isRaw() {
|
||||||
prm.SetRawFlag()
|
prm.SetRawFlag()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,17 +175,17 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
||||||
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||||
var prm internalclient.GetObjectPrm
|
var prm internalclient.GetObjectPrm
|
||||||
|
|
||||||
prm.SetContext(exec.ctx)
|
prm.SetContext(exec.context())
|
||||||
prm.SetClient(c.client)
|
prm.SetClient(c.client)
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
prm.SetTTL(exec.prm.common.TTL())
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||||
prm.SetAddress(exec.prm.addr)
|
prm.SetAddress(exec.address())
|
||||||
prm.SetPrivateKey(key)
|
prm.SetPrivateKey(key)
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||||
|
|
||||||
if exec.prm.raw {
|
if exec.isRaw() {
|
||||||
prm.SetRawFlag()
|
prm.SetRawFlag()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,25 +197,24 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
|
||||||
return res.Object(), nil
|
return res.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
||||||
e := (*engine.StorageEngine)(w)
|
if exec.headOnly() {
|
||||||
if exec.headOnly {
|
|
||||||
var headPrm engine.HeadPrm
|
var headPrm engine.HeadPrm
|
||||||
headPrm.WithAddress(exec.prm.addr)
|
headPrm.WithAddress(exec.address())
|
||||||
headPrm.WithRaw(exec.prm.raw)
|
headPrm.WithRaw(exec.isRaw())
|
||||||
|
|
||||||
r, err := e.Head(headPrm)
|
r, err := e.engine.Head(headPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.Header(), nil
|
return r.Header(), nil
|
||||||
} else if rng := exec.prm.rng; rng != nil {
|
} else if rng := exec.ctxRange(); rng != nil {
|
||||||
var getRange engine.RngPrm
|
var getRange engine.RngPrm
|
||||||
getRange.WithAddress(exec.prm.addr)
|
getRange.WithAddress(exec.address())
|
||||||
getRange.WithPayloadRange(rng)
|
getRange.WithPayloadRange(rng)
|
||||||
|
|
||||||
r, err := e.GetRange(getRange)
|
r, err := e.engine.GetRange(getRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -216,9 +222,9 @@ func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
||||||
return r.Object(), nil
|
return r.Object(), nil
|
||||||
} else {
|
} else {
|
||||||
var getPrm engine.GetPrm
|
var getPrm engine.GetPrm
|
||||||
getPrm.WithAddress(exec.prm.addr)
|
getPrm.WithAddress(exec.address())
|
||||||
|
|
||||||
r, err := e.Get(getPrm)
|
r, err := e.engine.Get(getPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -246,3 +252,7 @@ func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
|
||||||
_, err := h.hash.Write(p)
|
_, err := h.hash.Write(p)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
||||||
|
return n.nmSrc.Epoch()
|
||||||
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
|
|
||||||
// Service implements Get operation of Object service v2.
|
// Service implements Get operation of Object service v2.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option represents Service constructor option.
|
// Option represents Service constructor option.
|
||||||
|
@ -27,13 +27,15 @@ type cfg struct {
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
// NewService constructs Service instance from provided options.
|
||||||
func NewService(opts ...Option) *Service {
|
func NewService(opts ...Option) *Service {
|
||||||
var s Service
|
c := new(cfg)
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get calls internal service and returns v2 object stream.
|
// Get calls internal service and returns v2 object stream.
|
||||||
|
|
|
@ -17,11 +17,14 @@ import (
|
||||||
|
|
||||||
type preparedObjectTarget interface {
|
type preparedObjectTarget interface {
|
||||||
WriteObject(*objectSDK.Object, object.ContentMeta) error
|
WriteObject(*objectSDK.Object, object.ContentMeta) error
|
||||||
|
Close() (*transformer.AccessIdentifiers, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type distributedTarget struct {
|
type distributedTarget struct {
|
||||||
traversal traversal
|
traversal traversal
|
||||||
|
|
||||||
|
remotePool, localPool util.WorkerPool
|
||||||
|
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
objMeta object.ContentMeta
|
objMeta object.ContentMeta
|
||||||
|
|
||||||
|
@ -29,7 +32,7 @@ type distributedTarget struct {
|
||||||
|
|
||||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||||
|
|
||||||
getWorkerPool func([]byte) (util.WorkerPool, bool)
|
isLocalKey func([]byte) bool
|
||||||
|
|
||||||
relay func(nodeDesc) error
|
relay func(nodeDesc) error
|
||||||
|
|
||||||
|
@ -45,6 +48,9 @@ type traversal struct {
|
||||||
// need of additional broadcast after the object is saved
|
// need of additional broadcast after the object is saved
|
||||||
extraBroadcastEnabled bool
|
extraBroadcastEnabled bool
|
||||||
|
|
||||||
|
// mtx protects mExclude map.
|
||||||
|
mtx sync.RWMutex
|
||||||
|
|
||||||
// container nodes which was processed during the primary object placement
|
// container nodes which was processed during the primary object placement
|
||||||
mExclude map[string]struct{}
|
mExclude map[string]struct{}
|
||||||
}
|
}
|
||||||
|
@ -70,17 +76,21 @@ func (x *traversal) submitProcessed(n placement.Node) {
|
||||||
if x.extraBroadcastEnabled {
|
if x.extraBroadcastEnabled {
|
||||||
key := string(n.PublicKey())
|
key := string(n.PublicKey())
|
||||||
|
|
||||||
|
x.mtx.Lock()
|
||||||
if x.mExclude == nil {
|
if x.mExclude == nil {
|
||||||
x.mExclude = make(map[string]struct{}, 1)
|
x.mExclude = make(map[string]struct{}, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
x.mExclude[key] = struct{}{}
|
x.mExclude[key] = struct{}{}
|
||||||
|
x.mtx.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// checks if specified node was processed during the primary object placement.
|
// checks if specified node was processed during the primary object placement.
|
||||||
func (x *traversal) processed(n placement.Node) bool {
|
func (x *traversal) processed(n placement.Node) bool {
|
||||||
|
x.mtx.RLock()
|
||||||
_, ok := x.mExclude[string(n.PublicKey())]
|
_, ok := x.mExclude[string(n.PublicKey())]
|
||||||
|
x.mtx.RUnlock()
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,9 +156,10 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
|
||||||
|
|
||||||
target := t.nodeTargetInitializer(node)
|
target := t.nodeTargetInitializer(node)
|
||||||
|
|
||||||
err := target.WriteObject(t.obj, t.objMeta)
|
if err := target.WriteObject(t.obj, t.objMeta); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not write header: %w", err)
|
return fmt.Errorf("could not write header: %w", err)
|
||||||
|
} else if _, err := target.Close(); err != nil {
|
||||||
|
return fmt.Errorf("could not close object stream: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -185,12 +196,27 @@ loop:
|
||||||
|
|
||||||
addr := addrs[i]
|
addr := addrs[i]
|
||||||
|
|
||||||
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
|
isLocal := t.isLocalKey(addr.PublicKey())
|
||||||
|
|
||||||
|
var workerPool util.WorkerPool
|
||||||
|
|
||||||
|
if isLocal {
|
||||||
|
workerPool = t.localPool
|
||||||
|
} else {
|
||||||
|
workerPool = t.remotePool
|
||||||
|
}
|
||||||
|
|
||||||
if err := workerPool.Submit(func() {
|
if err := workerPool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := f(nodeDesc{local: isLocal, info: addr})
|
err := f(nodeDesc{local: isLocal, info: addr})
|
||||||
|
|
||||||
|
// mark the container node as processed in order to exclude it
|
||||||
|
// in subsequent container broadcast. Note that we don't
|
||||||
|
// process this node during broadcast if primary placement
|
||||||
|
// on it failed.
|
||||||
|
t.traversal.submitProcessed(addr)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resErr.Store(err)
|
resErr.Store(err)
|
||||||
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
|
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
|
||||||
|
@ -205,12 +231,6 @@ loop:
|
||||||
|
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the container node as processed in order to exclude it
|
|
||||||
// in subsequent container broadcast. Note that we don't
|
|
||||||
// process this node during broadcast if primary placement
|
|
||||||
// on it failed.
|
|
||||||
t.traversal.submitProcessed(addrs[i])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -25,26 +26,40 @@ type ObjectStorage interface {
|
||||||
|
|
||||||
type localTarget struct {
|
type localTarget struct {
|
||||||
storage ObjectStorage
|
storage ObjectStorage
|
||||||
|
|
||||||
|
obj *object.Object
|
||||||
|
meta objectCore.ContentMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
|
func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
|
||||||
switch meta.Type() {
|
t.obj = obj
|
||||||
|
t.meta = meta
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
|
switch t.meta.Type() {
|
||||||
case object.TypeTombstone:
|
case object.TypeTombstone:
|
||||||
err := t.storage.Delete(objectCore.AddressOf(obj), meta.Objects())
|
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||||
}
|
}
|
||||||
case object.TypeLock:
|
case object.TypeLock:
|
||||||
err := t.storage.Lock(objectCore.AddressOf(obj), meta.Objects())
|
err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
// objects that do not change meta storage
|
// objects that do not change meta storage
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.storage.Put(obj); err != nil {
|
if err := t.storage.Put(t.obj); err != nil {
|
||||||
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
id, _ := t.obj.ID()
|
||||||
|
|
||||||
|
return new(transformer.AccessIdentifiers).
|
||||||
|
WithSelfID(id), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
@ -24,6 +25,8 @@ type remoteTarget struct {
|
||||||
|
|
||||||
nodeInfo clientcore.NodeInfo
|
nodeInfo clientcore.NodeInfo
|
||||||
|
|
||||||
|
obj *object.Object
|
||||||
|
|
||||||
clientConstructor ClientConstructor
|
clientConstructor ClientConstructor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,9 +46,15 @@ type RemotePutPrm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
|
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
|
||||||
|
t.obj = obj
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
c, err := t.clientConstructor.Get(t.nodeInfo)
|
c, err := t.clientConstructor.Get(t.nodeInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm internalclient.PutObjectPrm
|
var prm internalclient.PutObjectPrm
|
||||||
|
@ -56,14 +65,15 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
|
||||||
prm.SetSessionToken(t.commonPrm.SessionToken())
|
prm.SetSessionToken(t.commonPrm.SessionToken())
|
||||||
prm.SetBearerToken(t.commonPrm.BearerToken())
|
prm.SetBearerToken(t.commonPrm.BearerToken())
|
||||||
prm.SetXHeaders(t.commonPrm.XHeaders())
|
prm.SetXHeaders(t.commonPrm.XHeaders())
|
||||||
prm.SetObject(obj)
|
prm.SetObject(t.obj)
|
||||||
|
|
||||||
_, err = internalclient.PutObject(prm)
|
res, err := internalclient.PutObject(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return new(transformer.AccessIdentifiers).
|
||||||
|
WithSelfID(res.ID()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
|
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
|
||||||
|
@ -110,8 +120,9 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
||||||
return fmt.Errorf("parse client node info: %w", err)
|
return fmt.Errorf("parse client node info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = t.WriteObject(p.obj, objectcore.ContentMeta{})
|
if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("(%T) could not send object header: %w", s, err)
|
||||||
|
} else if _, err := t.Close(); err != nil {
|
||||||
return fmt.Errorf("(%T) could not send object: %w", s, err)
|
return fmt.Errorf("(%T) could not send object: %w", s, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ type MaxSizeSource interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
@ -57,28 +57,31 @@ type cfg struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
func defaultCfg() *cfg {
|
||||||
c.remotePool = util.NewPseudoWorkerPool()
|
return &cfg{
|
||||||
c.localPool = util.NewPseudoWorkerPool()
|
remotePool: util.NewPseudoWorkerPool(),
|
||||||
c.log = &logger.Logger{Logger: zap.L()}
|
localPool: util.NewPseudoWorkerPool(),
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(opts ...Option) *Service {
|
func NewService(opts ...Option) *Service {
|
||||||
var s Service
|
c := defaultCfg()
|
||||||
s.cfg.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.cfg.fmtValidator = object.NewFormatValidator(s.cfg.fmtValidatorOpts...)
|
c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...)
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Service) Put(ctx context.Context) (*Streamer, error) {
|
func (p *Service) Put(ctx context.Context) (*Streamer, error) {
|
||||||
return &Streamer{
|
return &Streamer{
|
||||||
cfg: &p.cfg,
|
cfg: p.cfg,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
|
||||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
@ -223,10 +222,11 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
extraBroadcastEnabled: withBroadcast,
|
extraBroadcastEnabled: withBroadcast,
|
||||||
},
|
},
|
||||||
payload: getPayload(),
|
payload: getPayload(),
|
||||||
getWorkerPool: p.getWorkerPool,
|
remotePool: p.remotePool,
|
||||||
|
localPool: p.localPool,
|
||||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||||
if node.local {
|
if node.local {
|
||||||
return localTarget{
|
return &localTarget{
|
||||||
storage: p.localStore,
|
storage: p.localStore,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,6 +245,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
relay: relay,
|
relay: relay,
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
log: p.log,
|
log: p.log,
|
||||||
|
|
||||||
|
isLocalKey: p.netmapKeys.IsLocalKey,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,10 +283,3 @@ func (p *Streamer) Close() (*PutResponse, error) {
|
||||||
id: ids.SelfID(),
|
id: ids.SelfID(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Streamer) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
|
|
||||||
if p.netmapKeys.IsLocalKey(pub) {
|
|
||||||
return p.localPool, true
|
|
||||||
}
|
|
||||||
return p.remotePool, false
|
|
||||||
}
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
// Service implements Put operation of Object service v2.
|
// Service implements Put operation of Object service v2.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option represents Service constructor option.
|
// Option represents Service constructor option.
|
||||||
|
@ -24,13 +24,15 @@ type cfg struct {
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
// NewService constructs Service instance from provided options.
|
||||||
func NewService(opts ...Option) *Service {
|
func NewService(opts ...Option) *Service {
|
||||||
var s Service
|
c := new(cfg)
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put calls internal service and returns v2 object streamer.
|
// Put calls internal service and returns v2 object streamer.
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,26 +16,25 @@ type ResponseService struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type searchStreamResponser struct {
|
type searchStreamResponser struct {
|
||||||
SearchStream
|
util.ServerStream
|
||||||
|
|
||||||
respSvc *response.Service
|
respWriter util.ResponseMessageWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
type getStreamResponser struct {
|
type getStreamResponser struct {
|
||||||
GetObjectStream
|
util.ServerStream
|
||||||
|
|
||||||
respSvc *response.Service
|
respWriter util.ResponseMessageWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
type getRangeStreamResponser struct {
|
type getRangeStreamResponser struct {
|
||||||
GetObjectRangeStream
|
util.ServerStream
|
||||||
|
|
||||||
respSvc *response.Service
|
respWriter util.ResponseMessageWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
type putStreamResponser struct {
|
type putStreamResponser struct {
|
||||||
stream PutObjectStream
|
stream *response.ClientMessageStreamer
|
||||||
respSvc *response.Service
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewResponseService returns object service instance that passes internal service
|
// NewResponseService returns object service instance that passes internal service
|
||||||
|
@ -47,32 +47,29 @@ func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *Respon
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *getStreamResponser) Send(resp *object.GetResponse) error {
|
func (s *getStreamResponser) Send(resp *object.GetResponse) error {
|
||||||
s.respSvc.SetMeta(resp)
|
return s.respWriter(resp)
|
||||||
return s.GetObjectStream.Send(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error {
|
func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error {
|
||||||
return s.svc.Get(req, &getStreamResponser{
|
return s.svc.Get(req, &getStreamResponser{
|
||||||
GetObjectStream: stream,
|
ServerStream: stream,
|
||||||
respSvc: s.respSvc,
|
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
|
||||||
|
return stream.Send(resp.(*object.GetResponse))
|
||||||
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamResponser) Send(req *object.PutRequest) error {
|
func (s *putStreamResponser) Send(req *object.PutRequest) error {
|
||||||
if err := s.stream.Send(req); err != nil {
|
return s.stream.Send(req)
|
||||||
return fmt.Errorf("could not send the request: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
|
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
|
||||||
r, err := s.stream.CloseAndRecv()
|
r, err := s.stream.CloseAndRecv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.respSvc.SetMeta(r)
|
return r.(*object.PutResponse), nil
|
||||||
return r, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
|
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
|
@ -82,61 +79,78 @@ func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &putStreamResponser{
|
return &putStreamResponser{
|
||||||
stream: stream,
|
stream: s.respSvc.CreateRequestStreamer(
|
||||||
respSvc: s.respSvc,
|
func(req any) error {
|
||||||
|
return stream.Send(req.(*object.PutRequest))
|
||||||
|
},
|
||||||
|
func() (util.ResponseMessage, error) {
|
||||||
|
return stream.CloseAndRecv()
|
||||||
|
},
|
||||||
|
),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
resp, err := s.svc.Head(ctx, req)
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Head(ctx, req.(*object.HeadRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
return resp.(*object.HeadResponse), nil
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
|
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
|
||||||
s.respSvc.SetMeta(resp)
|
return s.respWriter(resp)
|
||||||
return s.SearchStream.Send(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error {
|
func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error {
|
||||||
return s.svc.Search(req, &searchStreamResponser{
|
return s.svc.Search(req, &searchStreamResponser{
|
||||||
SearchStream: stream,
|
ServerStream: stream,
|
||||||
respSvc: s.respSvc,
|
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
|
||||||
|
return stream.Send(resp.(*object.SearchResponse))
|
||||||
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||||
resp, err := s.svc.Delete(ctx, req)
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
return resp.(*object.DeleteResponse), nil
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
|
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
|
||||||
s.respSvc.SetMeta(resp)
|
return s.respWriter(resp)
|
||||||
return s.GetObjectRangeStream.Send(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
||||||
return s.svc.GetRange(req, &getRangeStreamResponser{
|
return s.svc.GetRange(req, &getRangeStreamResponser{
|
||||||
GetObjectRangeStream: stream,
|
ServerStream: stream,
|
||||||
respSvc: s.respSvc,
|
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
|
||||||
|
return stream.Send(resp.(*object.GetRangeResponse))
|
||||||
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||||
resp, err := s.svc.GetRangeHash(ctx, req)
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
return resp.(*object.GetRangeHashResponse), nil
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,12 +10,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) executeOnContainer() {
|
func (exec *execCtx) executeOnContainer() {
|
||||||
if exec.prm.common.LocalOnly() {
|
if exec.isLocal() {
|
||||||
exec.log.Debug("return result directly")
|
exec.log.Debug("return result directly")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lookupDepth := exec.prm.common.NetmapLookupDepth()
|
lookupDepth := exec.netmapLookupDepth()
|
||||||
|
|
||||||
exec.log.Debug("trying to execute in container...",
|
exec.log.Debug("trying to execute in container...",
|
||||||
zap.Uint64("netmap lookup depth", lookupDepth),
|
zap.Uint64("netmap lookup depth", lookupDepth),
|
||||||
|
@ -52,12 +52,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
zap.Uint64("number", exec.curProcEpoch),
|
zap.Uint64("number", exec.curProcEpoch),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverser, ok := exec.generateTraverser(exec.prm.cnr)
|
traverser, ok := exec.generateTraverser(exec.containerID())
|
||||||
if !ok {
|
if !ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(exec.ctx)
|
ctx, cancel := context.WithCancel(exec.context())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -44,20 +45,44 @@ func (exec *execCtx) prepare() {
|
||||||
func (exec *execCtx) setLogger(l *logger.Logger) {
|
func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||||
exec.log = &logger.Logger{Logger: l.With(
|
exec.log = &logger.Logger{Logger: l.With(
|
||||||
zap.String("request", "SEARCH"),
|
zap.String("request", "SEARCH"),
|
||||||
zap.Stringer("container", exec.prm.cnr),
|
zap.Stringer("container", exec.containerID()),
|
||||||
zap.Bool("local", exec.prm.common.LocalOnly()),
|
zap.Bool("local", exec.isLocal()),
|
||||||
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
||||||
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
||||||
)}
|
)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) context() context.Context {
|
||||||
|
return exec.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec execCtx) isLocal() bool {
|
||||||
|
return exec.prm.common.LocalOnly()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) containerID() cid.ID {
|
||||||
|
return exec.prm.cnr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) searchFilters() object.SearchFilters {
|
||||||
|
return exec.prm.filters
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) netmapEpoch() uint64 {
|
||||||
|
return exec.prm.common.NetmapEpoch()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) netmapLookupDepth() uint64 {
|
||||||
|
return exec.prm.common.NetmapLookupDepth()
|
||||||
|
}
|
||||||
|
|
||||||
func (exec *execCtx) initEpoch() bool {
|
func (exec *execCtx) initEpoch() bool {
|
||||||
exec.curProcEpoch = exec.prm.common.NetmapEpoch()
|
exec.curProcEpoch = exec.netmapEpoch()
|
||||||
if exec.curProcEpoch > 0 {
|
if exec.curProcEpoch > 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err := exec.svc.epochSource.Epoch()
|
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -51,7 +51,7 @@ type simpleIDWriter struct {
|
||||||
|
|
||||||
type testEpochReceiver uint64
|
type testEpochReceiver uint64
|
||||||
|
|
||||||
func (e testEpochReceiver) Epoch() (uint64, error) {
|
func (e testEpochReceiver) currentEpoch() (uint64, error) {
|
||||||
return uint64(e), nil
|
return uint64(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ func (c *testClientCache) get(info clientcore.NodeInfo) (searchClient, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
|
func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
|
||||||
v, ok := s.items[exec.prm.cnr.EncodeToString()]
|
v, ok := s.items[exec.containerID().EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
|
func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
|
||||||
v, ok := c.items[exec.prm.cnr.EncodeToString()]
|
v, ok := c.items[exec.containerID().EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -146,7 +146,7 @@ func TestGetLocalOnly(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
newSvc := func(storage *testStorage) *Service {
|
newSvc := func(storage *testStorage) *Service {
|
||||||
svc := &Service{}
|
svc := &Service{cfg: new(cfg)}
|
||||||
svc.log = test.NewLogger(false)
|
svc.log = test.NewLogger(false)
|
||||||
svc.localStorage = storage
|
svc.localStorage = storage
|
||||||
|
|
||||||
|
@ -248,7 +248,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
container.CalculateID(&id, cnr)
|
container.CalculateID(&id, cnr)
|
||||||
|
|
||||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||||
svc := &Service{}
|
svc := &Service{cfg: new(cfg)}
|
||||||
svc.log = test.NewLogger(false)
|
svc.log = test.NewLogger(false)
|
||||||
svc.localStorage = newTestStorage()
|
svc.localStorage = newTestStorage()
|
||||||
|
|
||||||
|
@ -261,7 +261,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
svc.clientConstructor = c
|
svc.clientConstructor = c
|
||||||
svc.epochSource = testEpochReceiver(curEpoch)
|
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||||
|
|
||||||
return svc
|
return svc
|
||||||
}
|
}
|
||||||
|
@ -357,7 +357,7 @@ func TestGetFromPastEpoch(t *testing.T) {
|
||||||
ids22 := generateIDs(10)
|
ids22 := generateIDs(10)
|
||||||
c22.addResult(idCnr, ids22, nil)
|
c22.addResult(idCnr, ids22, nil)
|
||||||
|
|
||||||
svc := &Service{}
|
svc := &Service{cfg: new(cfg)}
|
||||||
svc.log = test.NewLogger(false)
|
svc.log = test.NewLogger(false)
|
||||||
svc.localStorage = newTestStorage()
|
svc.localStorage = newTestStorage()
|
||||||
|
|
||||||
|
@ -388,7 +388,7 @@ func TestGetFromPastEpoch(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
svc.epochSource = testEpochReceiver(curEpoch)
|
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||||
|
|
||||||
w := new(simpleIDWriter)
|
w := new(simpleIDWriter)
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
// Service is an utility serving requests
|
// Service is an utility serving requests
|
||||||
// of Object.Search service.
|
// of Object.Search service.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is a Service's constructor option.
|
// Option is a Service's constructor option.
|
||||||
|
@ -46,31 +46,32 @@ type cfg struct {
|
||||||
generateTraverser(cid.ID, uint64) (*placement.Traverser, error)
|
generateTraverser(cid.ID, uint64) (*placement.Traverser, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
epochSource epochSource
|
currentEpochReceiver interface {
|
||||||
|
currentEpoch() (uint64, error)
|
||||||
|
}
|
||||||
|
|
||||||
keyStore *util.KeyStorage
|
keyStore *util.KeyStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
type epochSource interface {
|
func defaultCfg() *cfg {
|
||||||
Epoch() (uint64, error)
|
return &cfg{
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
clientConstructor: new(clientConstructorWrapper),
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
|
||||||
c.log = &logger.Logger{Logger: zap.L()}
|
|
||||||
c.clientConstructor = new(clientConstructorWrapper)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns utility serving
|
// New creates, initializes and returns utility serving
|
||||||
// Object.Get service requests.
|
// Object.Get service requests.
|
||||||
func New(opts ...Option) *Service {
|
func New(opts ...Option) *Service {
|
||||||
var s Service
|
c := defaultCfg()
|
||||||
s.cfg.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger returns option to specify Get service's logger.
|
// WithLogger returns option to specify Get service's logger.
|
||||||
|
@ -109,7 +110,9 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
|
||||||
// map storage to receive current network state.
|
// map storage to receive current network state.
|
||||||
func WithNetMapSource(nmSrc netmap.Source) Option {
|
func WithNetMapSource(nmSrc netmap.Source) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.epochSource = nmSrc
|
c.currentEpochReceiver = &nmSrcWrapper{
|
||||||
|
nmSrc: nmSrc,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -34,6 +35,10 @@ type storageEngineWrapper struct {
|
||||||
|
|
||||||
type traverseGeneratorWrapper util.TraverserGenerator
|
type traverseGeneratorWrapper util.TraverserGenerator
|
||||||
|
|
||||||
|
type nmSrcWrapper struct {
|
||||||
|
nmSrc netmap.Source
|
||||||
|
}
|
||||||
|
|
||||||
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
|
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
|
||||||
return &uniqueIDWriter{
|
return &uniqueIDWriter{
|
||||||
written: make(map[oid.ID]struct{}),
|
written: make(map[oid.ID]struct{}),
|
||||||
|
@ -93,7 +98,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
|
||||||
|
|
||||||
var prm internalclient.SearchObjectsPrm
|
var prm internalclient.SearchObjectsPrm
|
||||||
|
|
||||||
prm.SetContext(exec.ctx)
|
prm.SetContext(exec.context())
|
||||||
prm.SetClient(c.client)
|
prm.SetClient(c.client)
|
||||||
prm.SetPrivateKey(key)
|
prm.SetPrivateKey(key)
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||||
|
@ -101,8 +106,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
prm.SetTTL(exec.prm.common.TTL())
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||||
prm.SetContainerID(exec.prm.cnr)
|
prm.SetContainerID(exec.containerID())
|
||||||
prm.SetFilters(exec.prm.filters)
|
prm.SetFilters(exec.searchFilters())
|
||||||
|
|
||||||
res, err := internalclient.SearchObjects(prm)
|
res, err := internalclient.SearchObjects(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -114,8 +119,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
|
||||||
|
|
||||||
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
||||||
var selectPrm engine.SelectPrm
|
var selectPrm engine.SelectPrm
|
||||||
selectPrm.WithFilters(exec.prm.filters)
|
selectPrm.WithFilters(exec.searchFilters())
|
||||||
selectPrm.WithContainerID(exec.prm.cnr)
|
selectPrm.WithContainerID(exec.containerID())
|
||||||
|
|
||||||
r, err := e.storage.Select(selectPrm)
|
r, err := e.storage.Select(selectPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -138,3 +143,7 @@ func idsFromAddresses(addrs []oid.Address) []oid.ID {
|
||||||
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
|
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
|
||||||
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
|
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
||||||
|
return n.nmSrc.Epoch()
|
||||||
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
|
|
||||||
// Service implements Search operation of Object service v2.
|
// Service implements Search operation of Object service v2.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
*cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option represents Service constructor option.
|
// Option represents Service constructor option.
|
||||||
|
@ -23,13 +23,15 @@ type cfg struct {
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
// NewService constructs Service instance from provided options.
|
||||||
func NewService(opts ...Option) *Service {
|
func NewService(opts ...Option) *Service {
|
||||||
var s Service
|
c := new(cfg)
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&s.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Search calls internal service and returns v2 object stream.
|
// Search calls internal service and returns v2 object stream.
|
||||||
|
|
|
@ -18,29 +18,27 @@ type SignService struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type searchStreamSigner struct {
|
type searchStreamSigner struct {
|
||||||
SearchStream
|
util.ServerStream
|
||||||
statusSupported bool
|
|
||||||
sigSvc *util.SignService
|
respWriter util.ResponseMessageWriter
|
||||||
|
|
||||||
nonEmptyResp bool // set on first Send call
|
nonEmptyResp bool // set on first Send call
|
||||||
}
|
}
|
||||||
|
|
||||||
type getStreamSigner struct {
|
type getStreamSigner struct {
|
||||||
GetObjectStream
|
util.ServerStream
|
||||||
statusSupported bool
|
|
||||||
sigSvc *util.SignService
|
respWriter util.ResponseMessageWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
type putStreamSigner struct {
|
type putStreamSigner struct {
|
||||||
sigSvc *util.SignService
|
stream *util.RequestMessageStreamer
|
||||||
stream PutObjectStream
|
|
||||||
statusSupported bool
|
|
||||||
err error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type getRangeStreamSigner struct {
|
type getRangeStreamSigner struct {
|
||||||
GetObjectRangeStream
|
util.ServerStream
|
||||||
statusSupported bool
|
|
||||||
sigSvc *util.SignService
|
respWriter util.ResponseMessageWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
|
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
|
||||||
|
@ -52,50 +50,37 @@ func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *getStreamSigner) Send(resp *object.GetResponse) error {
|
func (s *getStreamSigner) Send(resp *object.GetResponse) error {
|
||||||
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
|
return s.respWriter(resp)
|
||||||
return err
|
|
||||||
}
|
|
||||||
return s.GetObjectStream.Send(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
|
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
return s.sigSvc.HandleServerStreamRequest(req,
|
||||||
resp := new(object.GetResponse)
|
func(resp util.ResponseMessage) error {
|
||||||
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return stream.Send(resp.(*object.GetResponse))
|
||||||
return stream.Send(resp)
|
},
|
||||||
}
|
func() util.ResponseMessage {
|
||||||
|
return new(object.GetResponse)
|
||||||
|
},
|
||||||
|
func(respWriter util.ResponseMessageWriter) error {
|
||||||
return s.svc.Get(req, &getStreamSigner{
|
return s.svc.Get(req, &getStreamSigner{
|
||||||
GetObjectStream: stream,
|
ServerStream: stream,
|
||||||
sigSvc: s.sigSvc,
|
respWriter: respWriter,
|
||||||
statusSupported: util.IsStatusSupported(req),
|
|
||||||
})
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamSigner) Send(req *object.PutRequest) error {
|
func (s *putStreamSigner) Send(req *object.PutRequest) error {
|
||||||
s.statusSupported = util.IsStatusSupported(req)
|
return s.stream.Send(req)
|
||||||
|
|
||||||
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
|
|
||||||
return util.ErrAbortStream
|
|
||||||
}
|
|
||||||
if s.err = s.stream.Send(req); s.err != nil {
|
|
||||||
return util.ErrAbortStream
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamSigner) CloseAndRecv() (resp *object.PutResponse, err error) {
|
func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
|
||||||
if s.err != nil {
|
r, err := s.stream.CloseAndRecv()
|
||||||
err = s.err
|
|
||||||
resp = new(object.PutResponse)
|
|
||||||
} else {
|
|
||||||
resp, err = s.stream.CloseAndRecv()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
return nil, fmt.Errorf("could not receive response: %w", err)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, s.sigSvc.SignResponse(s.statusSupported, resp, err)
|
return r.(*object.PutResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
|
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
|
@ -105,42 +90,58 @@ func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &putStreamSigner{
|
return &putStreamSigner{
|
||||||
stream: stream,
|
stream: s.sigSvc.CreateRequestStreamer(
|
||||||
sigSvc: s.sigSvc,
|
func(req any) error {
|
||||||
|
return stream.Send(req.(*object.PutRequest))
|
||||||
|
},
|
||||||
|
func() (util.ResponseMessage, error) {
|
||||||
|
return stream.CloseAndRecv()
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(object.PutResponse)
|
||||||
|
},
|
||||||
|
),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(object.HeadResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Head(ctx, req.(*object.HeadRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(object.HeadResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Head(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*object.HeadResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
|
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
|
||||||
s.nonEmptyResp = true
|
s.nonEmptyResp = true
|
||||||
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
|
return s.respWriter(resp)
|
||||||
return err
|
|
||||||
}
|
|
||||||
return s.SearchStream.Send(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
|
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
return s.sigSvc.HandleServerStreamRequest(req,
|
||||||
resp := new(object.SearchResponse)
|
func(resp util.ResponseMessage) error {
|
||||||
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return stream.Send(resp.(*object.SearchResponse))
|
||||||
return stream.Send(resp)
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(object.SearchResponse)
|
||||||
|
},
|
||||||
|
func(respWriter util.ResponseMessageWriter) error {
|
||||||
|
stream := &searchStreamSigner{
|
||||||
|
ServerStream: stream,
|
||||||
|
respWriter: respWriter,
|
||||||
}
|
}
|
||||||
|
|
||||||
ss := &searchStreamSigner{
|
err := s.svc.Search(req, stream)
|
||||||
SearchStream: stream,
|
|
||||||
sigSvc: s.sigSvc,
|
if err == nil && !stream.nonEmptyResp {
|
||||||
statusSupported: util.IsStatusSupported(req),
|
|
||||||
}
|
|
||||||
err := s.svc.Search(req, ss)
|
|
||||||
if err == nil && !ss.nonEmptyResp {
|
|
||||||
// The higher component does not write any response in the case of an empty result (which is correct).
|
// The higher component does not write any response in the case of an empty result (which is correct).
|
||||||
// With the introduction of status returns at least one answer must be signed and sent to the client.
|
// With the introduction of status returns at least one answer must be signed and sent to the client.
|
||||||
// This approach is supported by clients who do not know how to work with statuses (one could make
|
// This approach is supported by clients who do not know how to work with statuses (one could make
|
||||||
|
@ -148,44 +149,61 @@ func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) err
|
||||||
// answer can be neglected due to the gradual refusal to use the "old" clients).
|
// answer can be neglected due to the gradual refusal to use the "old" clients).
|
||||||
return stream.Send(new(object.SearchResponse))
|
return stream.Send(new(object.SearchResponse))
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(object.DeleteResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(object.DeleteResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*object.DeleteResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
|
func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
|
||||||
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
|
return s.respWriter(resp)
|
||||||
return err
|
|
||||||
}
|
|
||||||
return s.GetObjectRangeStream.Send(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
return s.sigSvc.HandleServerStreamRequest(req,
|
||||||
resp := new(object.GetRangeResponse)
|
func(resp util.ResponseMessage) error {
|
||||||
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return stream.Send(resp.(*object.GetRangeResponse))
|
||||||
return stream.Send(resp)
|
},
|
||||||
}
|
func() util.ResponseMessage {
|
||||||
|
return new(object.GetRangeResponse)
|
||||||
|
},
|
||||||
|
func(respWriter util.ResponseMessageWriter) error {
|
||||||
return s.svc.GetRange(req, &getRangeStreamSigner{
|
return s.svc.GetRange(req, &getRangeStreamSigner{
|
||||||
GetObjectRangeStream: stream,
|
ServerStream: stream,
|
||||||
sigSvc: s.sigSvc,
|
respWriter: respWriter,
|
||||||
statusSupported: util.IsStatusSupported(req),
|
|
||||||
})
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(object.GetRangeHashResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(object.GetRangeHashResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.GetRangeHash(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*object.GetRangeHashResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,11 +51,11 @@ func (oiw *objectsInWork) add(addr oid.Address) {
|
||||||
// Policer represents the utility that verifies
|
// Policer represents the utility that verifies
|
||||||
// compliance with the object storage policy.
|
// compliance with the object storage policy.
|
||||||
type Policer struct {
|
type Policer struct {
|
||||||
cfg
|
*cfg
|
||||||
|
|
||||||
cache *lru.Cache[oid.Address, time.Time]
|
cache *lru.Cache[oid.Address, time.Time]
|
||||||
|
|
||||||
objsInWork objectsInWork
|
objsInWork *objectsInWork
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is an option for Policer constructor.
|
// Option is an option for Policer constructor.
|
||||||
|
@ -95,33 +95,38 @@ type cfg struct {
|
||||||
rebalanceFreq, evictDuration time.Duration
|
rebalanceFreq, evictDuration time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) initDefault() {
|
func defaultCfg() *cfg {
|
||||||
c.log = &logger.Logger{Logger: zap.L()}
|
return &cfg{
|
||||||
c.batchSize = 10
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
c.cacheSize = 1024 // 1024 * address size = 1024 * 64 = 64 MiB
|
batchSize: 10,
|
||||||
c.rebalanceFreq = 1 * time.Second
|
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
||||||
c.evictDuration = 30 * time.Second
|
rebalanceFreq: 1 * time.Second,
|
||||||
|
evictDuration: 30 * time.Second,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns Policer instance.
|
// New creates, initializes and returns Policer instance.
|
||||||
func New(opts ...Option) *Policer {
|
func New(opts ...Option) *Policer {
|
||||||
var p Policer
|
c := defaultCfg()
|
||||||
p.cfg.initDefault()
|
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&p.cfg)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.log = &logger.Logger{Logger: p.cfg.log.With(zap.String("component", "Object Policer"))}
|
c.log = &logger.Logger{Logger: c.log.With(zap.String("component", "Object Policer"))}
|
||||||
|
|
||||||
cache, err := lru.New[oid.Address, time.Time](int(p.cacheSize))
|
cache, err := lru.New[oid.Address, time.Time](int(c.cacheSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.cache = cache
|
return &Policer{
|
||||||
p.objsInWork.objs = make(map[oid.Address]struct{}, p.maxCapacity)
|
cfg: c,
|
||||||
return &p
|
cache: cache,
|
||||||
|
objsInWork: &objectsInWork{
|
||||||
|
objs: make(map[oid.Address]struct{}, c.maxCapacity),
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithHeadTimeout returns option to set Head timeout of Policer.
|
// WithHeadTimeout returns option to set Head timeout of Policer.
|
||||||
|
|
50
pkg/services/reputation/rpc/response.go
Normal file
50
pkg/services/reputation/rpc/response.go
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
package reputationrpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
)
|
||||||
|
|
||||||
|
type responseService struct {
|
||||||
|
respSvc *response.Service
|
||||||
|
|
||||||
|
svc Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResponseService returns reputation service server instance that passes
|
||||||
|
// internal service call to response service.
|
||||||
|
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
|
||||||
|
return &responseService{
|
||||||
|
respSvc: respSvc,
|
||||||
|
svc: cnrSvc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*reputation.AnnounceLocalTrustResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
|
||||||
|
}
|
|
@ -22,19 +22,33 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
|
func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(reputation.AnnounceLocalTrustResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(reputation.AnnounceLocalTrustResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.AnnounceLocalTrust(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*reputation.AnnounceLocalTrustResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
|
func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(reputation.AnnounceIntermediateResultResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(reputation.AnnounceIntermediateResultResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.AnnounceIntermediateResult(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -17,17 +16,14 @@ type ServiceExecutor interface {
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
exec ServiceExecutor
|
exec ServiceExecutor
|
||||||
|
|
||||||
respSvc *response.Service
|
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutionService wraps ServiceExecutor and returns Session Service interface.
|
// NewExecutionService wraps ServiceExecutor and returns Session Service interface.
|
||||||
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service, l *logger.Logger) Server {
|
func NewExecutionService(exec ServiceExecutor, l *logger.Logger) Server {
|
||||||
return &executorSvc{
|
return &executorSvc{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
log: l,
|
log: l,
|
||||||
respSvc: respSvc,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,6 +41,5 @@ func (s *executorSvc) Create(ctx context.Context, req *session.CreateRequest) (*
|
||||||
resp := new(session.CreateResponse)
|
resp := new(session.CreateResponse)
|
||||||
resp.SetBody(respBody)
|
resp.SetBody(respBody)
|
||||||
|
|
||||||
s.respSvc.SetMeta(resp)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
37
pkg/services/session/response.go
Normal file
37
pkg/services/session/response.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package session
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
)
|
||||||
|
|
||||||
|
type responseService struct {
|
||||||
|
respSvc *response.Service
|
||||||
|
|
||||||
|
svc Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResponseService returns session service instance that passes internal service
|
||||||
|
// call to response service.
|
||||||
|
func NewResponseService(ssSvc Server, respSvc *response.Service) Server {
|
||||||
|
return &responseService{
|
||||||
|
respSvc: respSvc,
|
||||||
|
svc: ssSvc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *responseService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.Create(ctx, req.(*session.CreateRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*session.CreateResponse), nil
|
||||||
|
}
|
|
@ -22,10 +22,17 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
|
func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
resp := new(session.CreateResponse)
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return s.svc.Create(ctx, req.(*session.CreateRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(session.CreateResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := util.WrapResponse(s.svc.Create(ctx, req))
|
|
||||||
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
|
return resp.(*session.CreateResponse), nil
|
||||||
}
|
}
|
||||||
|
|
47
pkg/services/util/response/client_stream.go
Normal file
47
pkg/services/util/response/client_stream.go
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
package response
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClientMessageStreamer represents client-side message streamer
|
||||||
|
// that sets meta values to the response.
|
||||||
|
type ClientMessageStreamer struct {
|
||||||
|
cfg *cfg
|
||||||
|
|
||||||
|
send util.RequestMessageWriter
|
||||||
|
|
||||||
|
close util.ClientStreamCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send calls send method of internal streamer.
|
||||||
|
func (s *ClientMessageStreamer) Send(req any) error {
|
||||||
|
if err := s.send(req); err != nil {
|
||||||
|
return fmt.Errorf("(%T) could not send the request: %w", s, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseAndRecv closes internal stream, receivers the response,
|
||||||
|
// sets meta values and returns the result.
|
||||||
|
func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) {
|
||||||
|
resp, err := s.close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
setMeta(resp, s.cfg)
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance.
|
||||||
|
func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer {
|
||||||
|
return &ClientMessageStreamer{
|
||||||
|
cfg: s.cfg,
|
||||||
|
send: sender,
|
||||||
|
close: closer,
|
||||||
|
}
|
||||||
|
}
|
37
pkg/services/util/response/server_stream.go
Normal file
37
pkg/services/util/response/server_stream.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package response
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServerMessageStreamer represents server-side message streamer
|
||||||
|
// that sets meta values to all response messages.
|
||||||
|
type ServerMessageStreamer struct {
|
||||||
|
cfg *cfg
|
||||||
|
|
||||||
|
recv util.ResponseMessageReader
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recv calls Recv method of internal streamer, sets response meta
|
||||||
|
// values and returns the response.
|
||||||
|
func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) {
|
||||||
|
m, err := s.recv()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not receive response message for signing: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
setMeta(m, s.cfg)
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result.
|
||||||
|
func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter {
|
||||||
|
return func(resp util.ResponseMessage) error {
|
||||||
|
setMeta(resp, s.cfg)
|
||||||
|
|
||||||
|
return respWriter(resp)
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,24 +11,44 @@ import (
|
||||||
// Service represents universal v2 service
|
// Service represents universal v2 service
|
||||||
// that sets response meta header values.
|
// that sets response meta header values.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
|
cfg *cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option is an option of Service constructor.
|
||||||
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
version refs.Version
|
version refs.Version
|
||||||
|
|
||||||
state netmap.State
|
state netmap.State
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService creates, initializes and returns Service instance.
|
func defaultCfg() *cfg {
|
||||||
func NewService(nmState netmap.State) *Service {
|
var c cfg
|
||||||
s := &Service{state: nmState}
|
|
||||||
version.Current().WriteToV2(&s.version)
|
version.Current().WriteToV2(&c.version)
|
||||||
return s
|
|
||||||
|
return &c
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetMeta sets adds meta-header to resp.
|
// NewService creates, initializes and returns Service instance.
|
||||||
func (s *Service) SetMeta(resp util.ResponseMessage) {
|
func NewService(opts ...Option) *Service {
|
||||||
|
c := defaultCfg()
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Service{
|
||||||
|
cfg: c,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setMeta(resp util.ResponseMessage, cfg *cfg) {
|
||||||
meta := new(session.ResponseMetaHeader)
|
meta := new(session.ResponseMetaHeader)
|
||||||
meta.SetVersion(&s.version)
|
meta.SetVersion(&cfg.version)
|
||||||
meta.SetTTL(1) // FIXME: #1160 TTL must be calculated
|
meta.SetTTL(1) // FIXME: #1160 TTL must be calculated
|
||||||
meta.SetEpoch(s.state.CurrentEpoch())
|
meta.SetEpoch(cfg.state.CurrentEpoch())
|
||||||
|
|
||||||
if origin := resp.GetMetaHeader(); origin != nil {
|
if origin := resp.GetMetaHeader(); origin != nil {
|
||||||
// FIXME: #1160 what if origin is set by local server?
|
// FIXME: #1160 what if origin is set by local server?
|
||||||
|
@ -37,3 +57,10 @@ func (s *Service) SetMeta(resp util.ResponseMessage) {
|
||||||
|
|
||||||
resp.SetMetaHeader(meta)
|
resp.SetMetaHeader(meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithNetworkState returns option to set network state of Service.
|
||||||
|
func WithNetworkState(v netmap.State) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.state = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
21
pkg/services/util/response/unary.go
Normal file
21
pkg/services/util/response/unary.go
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package response
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it.
|
||||||
|
func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) {
|
||||||
|
// process request
|
||||||
|
resp, err := handler(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not handle request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
setMeta(resp, s.cfg)
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -20,59 +21,198 @@ type ResponseMessage interface {
|
||||||
SetMetaHeader(*session.ResponseMetaHeader)
|
SetMetaHeader(*session.ResponseMetaHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type UnaryHandler func(context.Context, any) (ResponseMessage, error)
|
||||||
|
|
||||||
type SignService struct {
|
type SignService struct {
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ResponseMessageWriter func(ResponseMessage) error
|
||||||
|
|
||||||
|
type ServerStreamHandler func(context.Context, any) (ResponseMessageReader, error)
|
||||||
|
|
||||||
|
type ResponseMessageReader func() (ResponseMessage, error)
|
||||||
|
|
||||||
var ErrAbortStream = errors.New("abort message stream")
|
var ErrAbortStream = errors.New("abort message stream")
|
||||||
|
|
||||||
|
type ResponseConstructor func() ResponseMessage
|
||||||
|
|
||||||
|
type RequestMessageWriter func(any) error
|
||||||
|
|
||||||
|
type ClientStreamCloser func() (ResponseMessage, error)
|
||||||
|
|
||||||
|
type RequestMessageStreamer struct {
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
|
||||||
|
send RequestMessageWriter
|
||||||
|
|
||||||
|
close ClientStreamCloser
|
||||||
|
|
||||||
|
respCons ResponseConstructor
|
||||||
|
|
||||||
|
statusSupported bool
|
||||||
|
|
||||||
|
sendErr error
|
||||||
|
}
|
||||||
|
|
||||||
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
|
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
|
||||||
return &SignService{
|
return &SignService{
|
||||||
key: key,
|
key: key,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SignResponse response with private key via signature.SignServiceMessage.
|
func (s *RequestMessageStreamer) Send(req any) error {
|
||||||
// The signature error affects the result depending on the protocol version:
|
// req argument should be strengthen with type RequestMessage
|
||||||
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed.
|
s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now
|
||||||
// - otherwise, returns error in order to transport it directly.
|
|
||||||
func (s *SignService) SignResponse(statusSupported bool, resp ResponseMessage, err error) error {
|
var err error
|
||||||
|
|
||||||
|
// verify request signatures
|
||||||
|
if err = signature.VerifyServiceMessage(req); err != nil {
|
||||||
|
err = fmt.Errorf("could not verify request: %w", err)
|
||||||
|
} else {
|
||||||
|
err = s.send(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !s.statusSupported {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sendErr = err
|
||||||
|
|
||||||
|
return ErrAbortStream
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) {
|
||||||
|
var (
|
||||||
|
resp ResponseMessage
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if s.sendErr != nil {
|
||||||
|
err = s.sendErr
|
||||||
|
} else {
|
||||||
|
resp, err = s.close()
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("could not close stream and receive response: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !s.statusSupported {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = s.respCons()
|
||||||
|
|
||||||
|
setStatusV2(resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = signResponse(s.key, resp, s.statusSupported); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser, blankResp ResponseConstructor) *RequestMessageStreamer {
|
||||||
|
return &RequestMessageStreamer{
|
||||||
|
key: s.key,
|
||||||
|
send: sender,
|
||||||
|
close: closer,
|
||||||
|
|
||||||
|
respCons: blankResp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SignService) HandleServerStreamRequest(
|
||||||
|
req any,
|
||||||
|
respWriter ResponseMessageWriter,
|
||||||
|
blankResp ResponseConstructor,
|
||||||
|
respWriterCaller func(ResponseMessageWriter) error,
|
||||||
|
) error {
|
||||||
|
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
|
||||||
|
|
||||||
|
// req argument should be strengthen with type RequestMessage
|
||||||
|
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// verify request signatures
|
||||||
|
if err = signature.VerifyServiceMessage(req); err != nil {
|
||||||
|
err = fmt.Errorf("could not verify request: %w", err)
|
||||||
|
} else {
|
||||||
|
err = respWriterCaller(func(resp ResponseMessage) error {
|
||||||
|
if err := signResponse(s.key, resp, statusSupported); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return respWriter(resp)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !statusSupported {
|
if !statusSupported {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resp := blankResp()
|
||||||
|
|
||||||
|
setStatusV2(resp, err)
|
||||||
|
|
||||||
|
_ = signResponse(s.key, resp, false) // panics or returns nil with false arg
|
||||||
|
|
||||||
|
return respWriter(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SignService) HandleUnaryRequest(ctx context.Context, req any, handler UnaryHandler, blankResp ResponseConstructor) (ResponseMessage, error) {
|
||||||
|
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
|
||||||
|
|
||||||
|
// req argument should be strengthen with type RequestMessage
|
||||||
|
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
|
||||||
|
|
||||||
|
var (
|
||||||
|
resp ResponseMessage
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
// verify request signatures
|
||||||
|
if err = signature.VerifyServiceMessage(req); err != nil {
|
||||||
|
var sigErr apistatus.SignatureVerification
|
||||||
|
sigErr.SetMessage(err.Error())
|
||||||
|
|
||||||
|
err = sigErr
|
||||||
|
} else {
|
||||||
|
// process request
|
||||||
|
resp, err = handler(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !statusSupported {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = blankResp()
|
||||||
|
|
||||||
setStatusV2(resp, err)
|
setStatusV2(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = signature.SignServiceMessage(s.key, resp)
|
// sign the response
|
||||||
if err != nil {
|
if err = signResponse(s.key, resp, statusSupported); err != nil {
|
||||||
return fmt.Errorf("could not sign response: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) VerifyRequest(req RequestMessage) error {
|
func isStatusSupported(req RequestMessage) bool {
|
||||||
if err := signature.VerifyServiceMessage(req); err != nil {
|
|
||||||
var sigErr apistatus.SignatureVerification
|
|
||||||
sigErr.SetMessage(err.Error())
|
|
||||||
return sigErr
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WrapResponse creates an appropriate response struct if it is nil.
|
|
||||||
func WrapResponse[T any](resp *T, err error) (*T, error) {
|
|
||||||
if resp != nil {
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
return new(T), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsStatusSupported returns true iff request version implies expecting status return.
|
|
||||||
// This allows us to handle protocol versions <=2.10 (API statuses was introduced in 2.11 only).
|
|
||||||
func IsStatusSupported(req RequestMessage) bool {
|
|
||||||
version := req.GetMetaHeader().GetVersion()
|
version := req.GetMetaHeader().GetVersion()
|
||||||
|
|
||||||
mjr := version.GetMajor()
|
mjr := version.GetMajor()
|
||||||
|
@ -88,3 +228,22 @@ func setStatusV2(resp ResponseMessage, err error) {
|
||||||
|
|
||||||
session.SetStatus(resp, apistatus.ToStatusV2(apistatus.ErrToStatus(err)))
|
session.SetStatus(resp, apistatus.ToStatusV2(apistatus.ErrToStatus(err)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// signs response with private key via signature.SignServiceMessage.
|
||||||
|
// The signature error affects the result depending on the protocol version:
|
||||||
|
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed;
|
||||||
|
// - otherwise, returns error in order to transport it directly.
|
||||||
|
func signResponse(key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
|
||||||
|
err := signature.SignServiceMessage(key, resp)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("could not sign response: %w", err)
|
||||||
|
|
||||||
|
if statusSupported {
|
||||||
|
// We can't pass this error as status code since response will be unsigned.
|
||||||
|
// Isn't expected in practice, so panic is ok here.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue