Compare commits

...

16 commits

Author SHA1 Message Date
1d3d0b7cc1 [#6] services/object: Remove mutex from traversal struct
Make it being used from a single thread.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
b3dd9bcb89 [#6] services/object: Simplify local/remote targets
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
f161630cf6 [#6] services/util: Do not panic in sign function
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
cfd1b1d349 [#6] services/util: Remove remaining stream wrappers
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
0283071b64 [#6] services/util: Remove SignService.HandleUnaryRequest
There is no need in a wrapper with many from-`interface{}` conversions.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
761a0aaecc [#6] services/util: Simplify response.Service
It has only 1 parameter.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
5d13f70cb0 [#6] services/util: Remove CreateRequestStreamer
There is no need in a wrapper with many from-`interface{}` conversions.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
cb2c579000 [#6] services/util: Remove HandleServerStreamRequest
There is no need in a wrapper with many from-`interface{}` conversions.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
4a080ecbb7 [#6] services/util: Remove HandleUnaryRequest
There is no need in a wrapper with many from-`interface{}` conversions.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
41866dfcaa [#6] services/object: Reduce distibutedTarget memory footprint
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
b8f8462f38 [#6] services/policer: Reduce the amount of indirect pointers
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
724346a704 [#6] services/object: Simplify storage wrappers
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
9ca20ad80f [#6] services/object: Remove pointer unneeded indirections
All `Service` are accessed by pointer.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:03 +03:00
c1cd32ecca [#6] eacl/v2: Rename cfg receiver
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:03:02 +03:00
9fbcc0e43c [#6] services/object: Remove nmSrc wrapper
It calls a single method from source without any processing.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:02:06 +03:00
882d010870 [#6] services/object: Remove useless helpers
We have lots of small _private_ methods on `execCtx` whose sole purpose
is to just return a struct field.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-03-21 13:02:05 +03:00
59 changed files with 576 additions and 1474 deletions

View file

@ -19,10 +19,8 @@ func initAccountingService(c *cfg) {
server := accountingTransportGRPC.New(
accountingService.NewSignService(
&c.key.PrivateKey,
accountingService.NewResponseService(
accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper),
),
c.respSvc,
),
),

View file

@ -590,7 +590,7 @@ func initCfg(appCfg *config.Config) *cfg {
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(response.WithNetworkState(netState)),
respSvc: response.NewService(netState),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),

View file

@ -197,16 +197,13 @@ func initContainerService(c *cfg) {
server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
),
)
@ -568,6 +565,8 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
c.cfg.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -153,7 +153,6 @@ func initNetmapService(c *cfg) {
server := netmapTransportGRPC.New(
netmapService.NewSignService(
&c.key.PrivateKey,
netmapService.NewResponseService(
netmapService.NewExecutionService(
c,
c.apiVersion,
@ -163,7 +162,6 @@ func initNetmapService(c *cfg) {
morphClientNetMap: c.cfgNetmap.wrapper,
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
},
),
c.respSvc,
),
),

View file

@ -199,7 +199,6 @@ func initReputationService(c *cfg) {
server := grpcreputation.New(
reputationrpc.NewSignService(
&c.key.PrivateKey,
reputationrpc.NewResponseService(
&reputationServer{
cfg: c,
log: c.log,
@ -207,8 +206,6 @@ func initReputationService(c *cfg) {
intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
},
c.respSvc,
),
),
)
@ -289,6 +286,7 @@ func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputa
resp := new(v2reputation.AnnounceLocalTrustResponse)
resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -317,6 +315,7 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *
resp := new(v2reputation.AnnounceIntermediateResultResponse)
resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -53,10 +53,7 @@ func initSessionService(c *cfg) {
server := sessionTransportGRPC.New(
sessionSvc.NewSignService(
&c.key.PrivateKey,
sessionSvc.NewResponseService(
sessionSvc.NewExecutionService(c.privateTokenStore, c.log),
c.respSvc,
),
sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log),
),
)

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type ServiceExecutor interface {
@ -13,12 +14,14 @@ type ServiceExecutor interface {
type executorSvc struct {
exec ServiceExecutor
respSvc *response.Service
}
// NewExecutionService wraps ServiceExecutor and returns Accounting Service interface.
func NewExecutionService(exec ServiceExecutor) Server {
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
return &executorSvc{
exec: exec,
respSvc: respSvc,
}
}
@ -31,5 +34,6 @@ func (s *executorSvc) Balance(ctx context.Context, req *accounting.BalanceReques
resp := new(accounting.BalanceResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -1,37 +0,0 @@
package accounting
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns accounting service instance that passes internal service
// call to response service.
func NewResponseService(accSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: accSvc,
}
}
func (s *responseService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*accounting.BalanceResponse), nil
}

View file

@ -22,17 +22,6 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Balance(ctx, req.(*accounting.BalanceRequest))
},
func() util.ResponseMessage {
return new(accounting.BalanceResponse)
},
)
if err != nil {
return nil, err
}
return resp.(*accounting.BalanceResponse), nil
resp, err := util.WrapResponse(s.svc.Balance(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type ServiceExecutor interface {
@ -21,12 +22,15 @@ type executorSvc struct {
Server
exec ServiceExecutor
respSvc *response.Service
}
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
func NewExecutionService(exec ServiceExecutor) Server {
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
return &executorSvc{
exec: exec,
respSvc: respSvc,
}
}
@ -44,6 +48,7 @@ func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*cont
resp := new(container.PutResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -61,6 +66,7 @@ func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest)
resp := new(container.DeleteResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -73,6 +79,7 @@ func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*cont
resp := new(container.GetResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -85,6 +92,7 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
resp := new(container.ListResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -102,6 +110,7 @@ func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExte
resp := new(container.SetExtendedACLResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -114,5 +123,6 @@ func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExte
resp := new(container.GetExtendedACLResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -1,115 +0,0 @@
package container
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns container service instance that passes internal service
// call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Put(ctx, req.(*container.PutRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.PutResponse), nil
}
func (s *responseService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.DeleteResponse), nil
}
func (s *responseService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Get(ctx, req.(*container.GetRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.GetResponse), nil
}
func (s *responseService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.List(ctx, req.(*container.ListRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.ListResponse), nil
}
func (s *responseService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.SetExtendedACLResponse), nil
}
func (s *responseService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.GetExtendedACLResponse), nil
}
func (s *responseService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*container.AnnounceUsedSpaceResponse), nil
}

View file

@ -22,113 +22,64 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Put(ctx, req.(*container.PutRequest))
},
func() util.ResponseMessage {
return new(container.PutResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.PutResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.PutResponse), nil
resp, err := util.WrapResponse(s.svc.Put(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*container.DeleteRequest))
},
func() util.ResponseMessage {
return new(container.DeleteResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.DeleteResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.DeleteResponse), nil
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Get(ctx, req.(*container.GetRequest))
},
func() util.ResponseMessage {
return new(container.GetResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.GetResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.GetResponse), nil
resp, err := util.WrapResponse(s.svc.Get(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.List(ctx, req.(*container.ListRequest))
},
func() util.ResponseMessage {
return new(container.ListResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.ListResponse), nil
resp, err := util.WrapResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
func() util.ResponseMessage {
return new(container.SetExtendedACLResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.SetExtendedACLResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.SetExtendedACLResponse), nil
resp, err := util.WrapResponse(s.svc.SetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
func() util.ResponseMessage {
return new(container.GetExtendedACLResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.GetExtendedACLResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.GetExtendedACLResponse), nil
resp, err := util.WrapResponse(s.svc.GetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest))
},
func() util.ResponseMessage {
return new(container.AnnounceUsedSpaceResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.AnnounceUsedSpaceResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*container.AnnounceUsedSpaceResponse), nil
resp, err := util.WrapResponse(s.svc.AnnounceUsedSpace(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/version"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
versionsdk "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
)
@ -18,6 +19,8 @@ type executorSvc struct {
state NodeState
netInfo NetworkInfo
respSvc *response.Service
}
// NodeState encapsulates information
@ -42,8 +45,8 @@ type NetworkInfo interface {
Dump(versionsdk.Version) (*netmapSDK.NetworkInfo, error)
}
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo) Server {
if s == nil || netInfo == nil || !version.IsValid(v) {
func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo, respSvc *response.Service) Server {
if s == nil || netInfo == nil || !version.IsValid(v) || respSvc == nil {
// this should never happen, otherwise it programmers bug
panic("can't create netmap execution service")
}
@ -51,6 +54,7 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo)
res := &executorSvc{
state: s,
netInfo: netInfo,
respSvc: respSvc,
}
v.WriteToV2(&res.version)
@ -96,6 +100,7 @@ func (s *executorSvc) LocalNodeInfo(
resp := new(netmap.LocalNodeInfoResponse)
resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil
}
@ -126,10 +131,11 @@ func (s *executorSvc) NetworkInfo(
resp := new(netmap.NetworkInfoResponse)
resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) Snapshot(_ context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
func (s *executorSvc) Snapshot(_ context.Context, _ *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
var nm netmap.NetMap
err := s.state.ReadCurrentNetMap(&nm)
@ -143,5 +149,6 @@ func (s *executorSvc) Snapshot(_ context.Context, req *netmap.SnapshotRequest) (
resp := new(netmap.SnapshotResponse)
resp.SetBody(body)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -1,63 +0,0 @@
package netmap
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns netmap service instance that passes internal service
// call to response service.
func NewResponseService(nmSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: nmSvc,
}
}
func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.LocalNodeInfoResponse), nil
}
func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.NetworkInfoResponse), nil
}
func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.SnapshotResponse), nil
}

View file

@ -24,49 +24,28 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
func (s *signService) LocalNodeInfo(
ctx context.Context,
req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
func() util.ResponseMessage {
return new(netmap.LocalNodeInfoResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(netmap.LocalNodeInfoResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*netmap.LocalNodeInfoResponse), nil
resp, err := util.WrapResponse(s.svc.LocalNodeInfo(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
},
func() util.ResponseMessage {
return new(netmap.NetworkInfoResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(netmap.NetworkInfoResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*netmap.NetworkInfoResponse), nil
resp, err := util.WrapResponse(s.svc.NetworkInfo(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
func() util.ResponseMessage {
return new(netmap.SnapshotResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(netmap.SnapshotResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*netmap.SnapshotResponse), nil
resp, err := util.WrapResponse(s.svc.Snapshot(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}

View file

@ -45,31 +45,30 @@ type headerSource struct {
incompleteObjectHeaders bool
}
func defaultCfg() *cfg {
return &cfg{
storage: new(localStorage),
}
func (c *cfg) initDefault() {
c.storage = (*localStorage)(nil)
}
func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
cfg := defaultCfg()
var c cfg
c.initDefault()
for i := range opts {
opts[i](cfg)
opts[i](&c)
}
if cfg.msg == nil {
if c.msg == nil {
return nil, errors.New("message is not provided")
}
var res headerSource
err := cfg.readObjectHeaders(&res)
err := c.readObjectHeaders(&res)
if err != nil {
return nil, err
}
res.requestHeaders = requestHeaders(cfg.msg)
res.requestHeaders = requestHeaders(c.msg)
return res, nil
}
@ -102,20 +101,20 @@ func requestHeaders(msg xHeaderSource) []eaclSDK.Header {
var errMissingOID = errors.New("object ID is missing")
// nolint: funlen
func (h *cfg) readObjectHeaders(dst *headerSource) error {
switch m := h.msg.(type) {
func (c *cfg) readObjectHeaders(dst *headerSource) error {
switch m := c.msg.(type) {
default:
panic(fmt.Sprintf("unexpected message type %T", h.msg))
panic(fmt.Sprintf("unexpected message type %T", c.msg))
case requestXHeaderSource:
switch req := m.req.(type) {
case
*objectV2.GetRequest,
*objectV2.HeadRequest:
if h.obj == nil {
if c.obj == nil {
return errMissingOID
}
objHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
objHeaders, completed := c.localObjectHeaders(c.cnr, c.obj)
dst.objectHeaders = objHeaders
dst.incompleteObjectHeaders = !completed
@ -123,18 +122,18 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error {
*objectV2.GetRangeRequest,
*objectV2.GetRangeHashRequest,
*objectV2.DeleteRequest:
if h.obj == nil {
if c.obj == nil {
return errMissingOID
}
dst.objectHeaders = addressHeaders(h.cnr, h.obj)
dst.objectHeaders = addressHeaders(c.cnr, c.obj)
case *objectV2.PutRequest:
if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
oV2 := new(objectV2.Object)
oV2.SetObjectID(v.GetObjectID())
oV2.SetHeader(v.GetHeader())
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
}
case *objectV2.SearchRequest:
cnrV2 := req.GetBody().GetContainerID()
@ -151,7 +150,7 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error {
case responseXHeaderSource:
switch resp := m.resp.(type) {
default:
objectHeaders, completed := h.localObjectHeaders(h.cnr, h.obj)
objectHeaders, completed := c.localObjectHeaders(c.cnr, c.obj)
dst.objectHeaders = objectHeaders
dst.incompleteObjectHeaders = !completed
@ -161,7 +160,7 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error {
oV2.SetObjectID(v.GetObjectID())
oV2.SetHeader(v.GetHeader())
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
}
case *objectV2.HeadResponse:
oV2 := new(objectV2.Object)
@ -173,7 +172,7 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error {
hdr = new(objectV2.Header)
var idV2 refsV2.ContainerID
h.cnr.WriteToV2(&idV2)
c.cnr.WriteToV2(&idV2)
hdr.SetContainerID(&idV2)
hdr.SetVersion(v.GetVersion())
@ -187,20 +186,20 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error {
oV2.SetHeader(hdr)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj)
dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), c.cnr, c.obj)
}
}
return nil
}
func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
func (c *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, bool) {
if idObj != nil {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(*idObj)
obj, err := h.storage.Head(addr)
obj, err := c.storage.Head(addr)
if err == nil {
return headersFromObject(obj, cnr, idObj), true
}

View file

@ -8,14 +8,12 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type localStorage struct {
ls *engine.StorageEngine
}
type localStorage engine.StorageEngine
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
if s.ls == nil {
if s == nil {
return nil, io.ErrUnexpectedEOF
}
return engine.Head(s.ls, addr)
return engine.Head((*engine.StorageEngine)(s), addr)
}

View file

@ -14,9 +14,7 @@ func WithObjectStorage(v ObjectStorage) Option {
func WithLocalObjectStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {
c.storage = &localStorage{
ls: v,
}
c.storage = (*localStorage)(v)
}
}

View file

@ -21,7 +21,7 @@ import (
// Service checks basic ACL rules.
type Service struct {
*cfg
cfg
c senderClassifier
}
@ -72,18 +72,17 @@ type cfg struct {
next object.ServiceServer
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
}
// New is a constructor for object ACL checking service.
func New(opts ...Option) Service {
cfg := defaultCfg()
func New(opts ...Option) *Service {
var s Service
s.cfg.initDefault()
for i := range opts {
opts[i](cfg)
opts[i](&s.cfg)
}
panicOnNil := func(v any, name string) {
@ -92,20 +91,18 @@ func New(opts ...Option) Service {
}
}
panicOnNil(cfg.next, "next Service")
panicOnNil(cfg.nm, "netmap client")
panicOnNil(cfg.irFetcher, "inner Ring fetcher")
panicOnNil(cfg.checker, "acl checker")
panicOnNil(cfg.containers, "container source")
panicOnNil(s.cfg.next, "next Service")
panicOnNil(s.cfg.nm, "netmap client")
panicOnNil(s.cfg.irFetcher, "inner Ring fetcher")
panicOnNil(s.cfg.checker, "acl checker")
panicOnNil(s.cfg.containers, "container source")
return Service{
cfg: cfg,
c: senderClassifier{
log: cfg.log,
innerRing: cfg.irFetcher,
netmap: cfg.nm,
},
s.c = senderClassifier{
log: s.cfg.log,
innerRing: s.cfg.irFetcher,
netmap: s.cfg.nm,
}
return &s
}
// Get implements ServiceServer interface, makes ACL checks and calls

View file

@ -5,9 +5,7 @@ import (
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@ -45,37 +43,17 @@ const (
func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "DELETE"),
zap.Stringer("address", exec.address()),
zap.Bool("local", exec.isLocal()),
zap.Stringer("address", exec.prm.addr),
zap.Bool("local", exec.prm.common.LocalOnly()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec *execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec *execCtx) containerID() cid.ID {
return exec.prm.addr.Container()
}
func (exec *execCtx) commonParameters() *util.CommonPrm {
return exec.prm.common
}
func (exec *execCtx) newAddress(id oid.ID) oid.Address {
var a oid.Address
a.SetObject(id)
a.SetContainer(exec.containerID())
a.SetContainer(exec.prm.addr.Container())
return a
}
@ -241,11 +219,11 @@ func (exec *execCtx) initTombstoneObject() bool {
}
exec.tombstoneObj = object.New()
exec.tombstoneObj.SetContainerID(exec.containerID())
exec.tombstoneObj.SetContainerID(exec.prm.addr.Container())
exec.tombstoneObj.SetType(object.TypeTombstone)
exec.tombstoneObj.SetPayload(payload)
tokenSession := exec.commonParameters().SessionToken()
tokenSession := exec.prm.common.SessionToken()
if tokenSession != nil {
issuer := tokenSession.Issuer()
exec.tombstoneObj.SetOwnerID(&issuer)

View file

@ -36,7 +36,7 @@ func (exec *execCtx) formTombstone() (ok bool) {
exec.tombstone.SetExpirationEpoch(
exec.svc.netInfo.CurrentEpoch() + tsLifetime,
)
exec.addMembers([]oid.ID{exec.address().Object()})
exec.addMembers([]oid.ID{exec.prm.addr.Object()})
exec.log.Debug("forming split info...")

View file

@ -15,7 +15,7 @@ import (
// Service utility serving requests of Object.Get service.
type Service struct {
*cfg
cfg
}
// Option is a Service's constructor option.
@ -60,24 +60,21 @@ type cfg struct {
keyStorage *util.KeyStorage
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
}
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
c := defaultCfg()
var s Service
s.cfg.initDefault()
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// WithLogger returns option to specify Delete service's logger.

View file

@ -24,12 +24,12 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
wr := getsvc.NewSimpleObjectWriter()
p := getsvc.HeadPrm{}
p.SetCommonParameters(exec.commonParameters())
p.SetCommonParameters(exec.prm.common)
p.SetHeaderWriter(wr)
p.WithRawFlag(true)
p.WithAddress(addr)
err := (*getsvc.Service)(w).Head(exec.context(), p)
err := (*getsvc.Service)(w).Head(exec.ctx, p)
if err != nil {
return nil, err
}
@ -38,7 +38,7 @@ func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.O
}
func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) {
_, err := w.headAddress(exec, exec.address())
_, err := w.headAddress(exec, exec.prm.addr)
var errSplitInfo *object.SplitInfoError
@ -89,11 +89,11 @@ func (w *searchSvcWrapper) splitMembers(exec *execCtx) ([]oid.ID, error) {
p := searchsvc.Prm{}
p.SetWriter(wr)
p.SetCommonParameters(exec.commonParameters())
p.WithContainerID(exec.containerID())
p.SetCommonParameters(exec.prm.common)
p.WithContainerID(exec.prm.addr.Container())
p.WithSearchFilters(fs)
err := (*searchsvc.Service)(w).Search(exec.context(), p)
err := (*searchsvc.Service)(w).Search(exec.ctx, p)
if err != nil {
return nil, err
}
@ -108,7 +108,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
}
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
streamer, err := (*putsvc.Service)(w).Put(exec.context())
streamer, err := (*putsvc.Service)(w).Put(exec.ctx)
if err != nil {
return nil, err
}
@ -116,7 +116,7 @@ func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
payload := exec.tombstoneObj.Payload()
initPrm := new(putsvc.PutInitPrm).
WithCommonPrm(exec.commonParameters()).
WithCommonPrm(exec.prm.common).
WithObject(exec.tombstoneObj.CutPayload())
err = streamer.Init(initPrm)

View file

@ -9,7 +9,7 @@ import (
// Service implements Delete operation of Object service v2.
type Service struct {
*cfg
cfg
}
// Option represents Service constructor option.
@ -21,15 +21,13 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
var s Service
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// Delete calls internal service.

View file

@ -36,26 +36,26 @@ func (exec *execCtx) assemble() {
exec.log.Debug("trying to assemble the object...")
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
assembler := newAssembler(exec.prm.addr, exec.splitInfo, exec.prm.rng, exec)
exec.log.Debug("assembling splitted object...",
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
zap.Stringer("address", exec.prm.addr),
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
zap.Uint64("range_length", exec.prm.rng.GetLength()),
)
defer exec.log.Debug("assembling splitted object completed",
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
zap.Stringer("address", exec.prm.addr),
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
zap.Uint64("range_length", exec.prm.rng.GetLength()),
)
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter)
obj, err := assembler.Assemble(exec.ctx, exec.prm.objWriter)
if err != nil {
exec.log.Warn("failed to assemble splitted object",
zap.Error(err),
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
zap.Stringer("address", exec.prm.addr),
zap.Uint64("range_offset", exec.prm.rng.GetOffset()),
zap.Uint64("range_length", exec.prm.rng.GetLength()),
)
}
@ -98,7 +98,7 @@ func equalAddresses(a, b oid.Address) bool {
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.containerID())
p.addr.SetContainer(exec.prm.addr.Container())
p.addr.SetObject(id)
prm := HeadPrm{
@ -108,7 +108,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
//nolint: contextcheck
err := exec.svc.Head(exec.context(), prm)
err := exec.svc.Head(exec.ctx, prm)
if err != nil {
return nil, err
@ -125,11 +125,11 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
p.objWriter = w
p.SetRange(rng)
p.addr.SetContainer(exec.containerID())
p.addr.SetContainer(exec.prm.addr.Container())
p.addr.SetObject(id)
//nolint: contextcheck
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
statusError := exec.svc.get(exec.ctx, p.commonPrm, withPayloadRange(rng))
if statusError.err != nil {
return nil, statusError.err

View file

@ -8,12 +8,12 @@ import (
)
func (exec *execCtx) executeOnContainer() {
if exec.isLocal() {
if exec.prm.common.LocalOnly() {
exec.log.Debug("return result directly")
return
}
lookupDepth := exec.netmapLookupDepth()
lookupDepth := exec.prm.common.NetmapLookupDepth()
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
@ -47,12 +47,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
zap.Uint64("number", exec.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.address())
traverser, ok := exec.generateTraverser(exec.prm.addr)
if !ok {
return true
}
ctx, cancel := context.WithCancel(exec.context())
ctx, cancel := context.WithCancel(exec.ctx)
defer cancel()
exec.status = statusUndefined

View file

@ -8,7 +8,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@ -29,13 +28,13 @@ type execCtx struct {
statusError
infoSplit *objectSDK.SplitInfo
splitInfo *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
head bool
headOnly bool
curProcEpoch uint64
}
@ -52,7 +51,7 @@ const (
func headOnly() execOption {
return func(c *execCtx) {
c.head = true
c.headOnly = true
}
}
@ -64,38 +63,22 @@ func withPayloadRange(r *objectSDK.Range) execOption {
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.headOnly() {
if exec.headOnly {
req = "HEAD"
} else if exec.ctxRange() != nil {
} else if exec.prm.rng != nil {
req = "GET_RANGE"
}
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
zap.Stringer("address", exec.prm.addr),
zap.Bool("raw", exec.prm.raw),
zap.Bool("local", exec.prm.common.LocalOnly()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
if exec.prm.signerKey != nil {
// the key has already been requested and
@ -116,40 +99,16 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
}
func (exec *execCtx) canAssemble() bool {
return !exec.isRaw() && !exec.headOnly()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() cid.ID {
return exec.address().Container()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
return !exec.prm.raw && !exec.headOnly
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
exec.curProcEpoch = exec.prm.common.NetmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.currentEpochReceiver.currentEpoch()
e, err := exec.svc.epochSource.Epoch()
switch {
default:
@ -218,12 +177,12 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
}
func (exec *execCtx) writeCollectedHeader() bool {
if exec.ctxRange() != nil {
if exec.prm.rng != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
exec.context(),
exec.ctx,
exec.collectedObject.CutPayload(),
)
@ -244,11 +203,11 @@ func (exec *execCtx) writeCollectedHeader() bool {
}
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
if exec.headOnly() {
if exec.headOnly {
return true
}
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
err := exec.prm.objWriter.WriteChunk(exec.ctx, obj.Payload())
switch {
default:
@ -272,12 +231,6 @@ func (exec *execCtx) writeCollectedObject() {
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {

View file

@ -69,7 +69,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
prm: RangePrm{
commonPrm: prm,
},
infoSplit: object.NewSplitInfo(),
splitInfo: object.NewSplitInfo(),
}
for i := range opts {

View file

@ -56,7 +56,7 @@ type testClient struct {
type testEpochReceiver uint64
func (e testEpochReceiver) currentEpoch() (uint64, error) {
func (e testEpochReceiver) Epoch() (uint64, error) {
return uint64(e), nil
}
@ -118,7 +118,7 @@ func newTestClient() *testClient {
}
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().EncodeToString()]
v, ok := c.results[exec.prm.addr.EncodeToString()]
if !ok {
var errNotFound apistatus.ObjectNotFound
@ -129,7 +129,7 @@ func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Obj
return nil, v.err
}
return cutToRange(v.obj, exec.ctxRange()), nil
return cutToRange(v.obj, exec.prm.rng), nil
}
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
@ -143,7 +143,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
var (
ok bool
obj *objectSDK.Object
sAddr = exec.address().EncodeToString()
sAddr = exec.prm.addr.EncodeToString()
)
if _, ok = s.inhumed[sAddr]; ok {
@ -157,7 +157,7 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
}
if obj, ok = s.phy[sAddr]; ok {
return cutToRange(obj, exec.ctxRange()), nil
return cutToRange(obj, exec.prm.rng), nil
}
var errNotFound apistatus.ObjectNotFound
@ -245,7 +245,7 @@ func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{cfg: new(cfg)}
svc := &Service{}
svc.log = test.NewLogger(false)
svc.localStorage = storage
@ -506,7 +506,7 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&idCnr, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{cfg: new(cfg)}
svc := &Service{}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -519,7 +519,7 @@ func TestGetRemoteSmall(t *testing.T) {
},
}
svc.clientCache = c
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
svc.epochSource = testEpochReceiver(curEpoch)
return svc
}
@ -1639,7 +1639,7 @@ func TestGetFromPastEpoch(t *testing.T) {
c22 := newTestClient()
c22.addResult(addr, obj, nil)
svc := &Service{cfg: new(cfg)}
svc := &Service{}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -1670,7 +1670,7 @@ func TestGetFromPastEpoch(t *testing.T) {
},
}
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
svc.epochSource = testEpochReceiver(curEpoch)
w := NewSimpleObjectWriter()

View file

@ -34,8 +34,8 @@ func (exec *execCtx) executeLocal() {
exec.err = errRemoved
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo)
case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange
exec.err = errOutOfRange

View file

@ -54,8 +54,8 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
exec.err = errOutOfRange
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
mergeSplitInfo(exec.splitInfo, errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.splitInfo)
}
return exec.status != statusUndefined

View file

@ -15,7 +15,7 @@ import (
// Service utility serving requests of Object.Get service.
type Service struct {
*cfg
cfg
}
// Option is a Service's constructor option.
@ -40,33 +40,31 @@ type cfg struct {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
}
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
epochSource epochSource
keyStore *util.KeyStorage
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
localStorage: new(storageEngineWrapper),
clientCache: new(clientCacheWrapper),
type epochSource interface {
Epoch() (uint64, error)
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.clientCache = new(clientCacheWrapper)
}
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
c := defaultCfg()
var s Service
s.cfg.initDefault()
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// WithLogger returns option to specify Get service's logger.
@ -80,7 +78,7 @@ func WithLogger(l *logger.Logger) Option {
// instance.
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
return func(c *cfg) {
c.localStorage.(*storageEngineWrapper).engine = e
c.localStorage = (*storageEngineWrapper)(e)
}
}
@ -107,9 +105,7 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
c.epochSource = nmSrc
}
}

View file

@ -7,7 +7,6 @@ import (
"io"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internal "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
@ -29,9 +28,7 @@ type clientWrapper struct {
client coreclient.MultiAddressClient
}
type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type storageEngineWrapper engine.StorageEngine
type partWriter struct {
ObjectWriter
@ -45,10 +42,6 @@ type hasherWrapper struct {
hash io.Writer
}
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.New(),
@ -89,7 +82,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
// nolint: funlen
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
if exec.prm.forwarder != nil {
return exec.prm.forwarder(info, c.client)
}
@ -98,20 +91,20 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
return nil, err
}
if exec.headOnly() {
if exec.headOnly {
var prm internalclient.HeadObjectPrm
prm.SetContext(exec.context())
prm.SetContext(exec.ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetAddress(exec.prm.addr)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
if exec.prm.raw {
prm.SetRawFlag()
}
@ -124,21 +117,21 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
if rng := exec.prm.rng; rng != nil {
var prm internalclient.PayloadRangePrm
prm.SetContext(exec.context())
prm.SetContext(exec.ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetAddress(exec.prm.addr)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
if exec.prm.raw {
prm.SetRawFlag()
}
@ -175,17 +168,17 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm
prm.SetContext(exec.context())
prm.SetContext(exec.ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetAddress(exec.prm.addr)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
if exec.prm.raw {
prm.SetRawFlag()
}
@ -197,24 +190,25 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
return res.Object(), nil
}
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
e := (*engine.StorageEngine)(w)
if exec.headOnly {
var headPrm engine.HeadPrm
headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.isRaw())
headPrm.WithAddress(exec.prm.addr)
headPrm.WithRaw(exec.prm.raw)
r, err := e.engine.Head(headPrm)
r, err := e.Head(headPrm)
if err != nil {
return nil, err
}
return r.Header(), nil
} else if rng := exec.ctxRange(); rng != nil {
} else if rng := exec.prm.rng; rng != nil {
var getRange engine.RngPrm
getRange.WithAddress(exec.address())
getRange.WithAddress(exec.prm.addr)
getRange.WithPayloadRange(rng)
r, err := e.engine.GetRange(getRange)
r, err := e.GetRange(getRange)
if err != nil {
return nil, err
}
@ -222,9 +216,9 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
return r.Object(), nil
} else {
var getPrm engine.GetPrm
getPrm.WithAddress(exec.address())
getPrm.WithAddress(exec.prm.addr)
r, err := e.engine.Get(getPrm)
r, err := e.Get(getPrm)
if err != nil {
return nil, err
}
@ -252,7 +246,3 @@ func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p)
return err
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -13,7 +13,7 @@ import (
// Service implements Get operation of Object service v2.
type Service struct {
*cfg
cfg
}
// Option represents Service constructor option.
@ -27,15 +27,13 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
var s Service
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// Get calls internal service and returns v2 object stream.

View file

@ -17,14 +17,11 @@ import (
type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error)
}
type distributedTarget struct {
traversal traversal
remotePool, localPool util.WorkerPool
obj *objectSDK.Object
objMeta object.ContentMeta
@ -32,7 +29,7 @@ type distributedTarget struct {
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
isLocalKey func([]byte) bool
getWorkerPool func([]byte) (util.WorkerPool, bool)
relay func(nodeDesc) error
@ -48,9 +45,6 @@ type traversal struct {
// need of additional broadcast after the object is saved
extraBroadcastEnabled bool
// mtx protects mExclude map.
mtx sync.RWMutex
// container nodes which was processed during the primary object placement
mExclude map[string]struct{}
}
@ -76,21 +70,17 @@ func (x *traversal) submitProcessed(n placement.Node) {
if x.extraBroadcastEnabled {
key := string(n.PublicKey())
x.mtx.Lock()
if x.mExclude == nil {
x.mExclude = make(map[string]struct{}, 1)
}
x.mExclude[key] = struct{}{}
x.mtx.Unlock()
}
}
// checks if specified node was processed during the primary object placement.
func (x *traversal) processed(n placement.Node) bool {
x.mtx.RLock()
_, ok := x.mExclude[string(n.PublicKey())]
x.mtx.RUnlock()
return ok
}
@ -156,10 +146,9 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
target := t.nodeTargetInitializer(node)
if err := target.WriteObject(t.obj, t.objMeta); err != nil {
err := target.WriteObject(t.obj, t.objMeta)
if err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
}
return nil
}
@ -196,27 +185,12 @@ loop:
addr := addrs[i]
isLocal := t.isLocalKey(addr.PublicKey())
var workerPool util.WorkerPool
if isLocal {
workerPool = t.localPool
} else {
workerPool = t.remotePool
}
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
if err := workerPool.Submit(func() {
defer wg.Done()
err := f(nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil {
resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
@ -231,6 +205,12 @@ loop:
break loop
}
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addrs[i])
}
wg.Wait()

View file

@ -4,7 +4,6 @@ import (
"fmt"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -26,40 +25,26 @@ type ObjectStorage interface {
type localTarget struct {
storage ObjectStorage
obj *object.Object
meta objectCore.ContentMeta
}
func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
t.obj = obj
t.meta = meta
return nil
}
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
switch t.meta.Type() {
func (t localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
switch meta.Type() {
case object.TypeTombstone:
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
err := t.storage.Delete(objectCore.AddressOf(obj), meta.Objects())
if err != nil {
return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}
case object.TypeLock:
err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
err := t.storage.Lock(objectCore.AddressOf(obj), meta.Objects())
if err != nil {
return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
}
default:
// objects that do not change meta storage
}
if err := t.storage.Put(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
if err := t.storage.Put(obj); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
}
id, _ := t.obj.ID()
return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
return nil
}

View file

@ -10,7 +10,6 @@ import (
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
@ -25,8 +24,6 @@ type remoteTarget struct {
nodeInfo clientcore.NodeInfo
obj *object.Object
clientConstructor ClientConstructor
}
@ -46,15 +43,9 @@ type RemotePutPrm struct {
}
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
t.obj = obj
return nil
}
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
}
var prm internalclient.PutObjectPrm
@ -65,15 +56,14 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
prm.SetSessionToken(t.commonPrm.SessionToken())
prm.SetBearerToken(t.commonPrm.BearerToken())
prm.SetXHeaders(t.commonPrm.XHeaders())
prm.SetObject(t.obj)
prm.SetObject(obj)
res, err := internalclient.PutObject(prm)
_, err = internalclient.PutObject(prm)
if err != nil {
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
}
return new(transformer.AccessIdentifiers).
WithSelfID(res.ID()), nil
return nil
}
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
@ -120,9 +110,8 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return fmt.Errorf("parse client node info: %w", err)
}
if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil {
err = t.WriteObject(p.obj, objectcore.ContentMeta{})
if err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err)
}

View file

@ -22,7 +22,7 @@ type MaxSizeSource interface {
}
type Service struct {
*cfg
cfg
}
type Option func(*cfg)
@ -57,31 +57,28 @@ type cfg struct {
log *logger.Logger
}
func defaultCfg() *cfg {
return &cfg{
remotePool: util.NewPseudoWorkerPool(),
localPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()},
}
func (c *cfg) initDefault() {
c.remotePool = util.NewPseudoWorkerPool()
c.localPool = util.NewPseudoWorkerPool()
c.log = &logger.Logger{Logger: zap.L()}
}
func NewService(opts ...Option) *Service {
c := defaultCfg()
var s Service
s.cfg.initDefault()
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...)
s.cfg.fmtValidator = object.NewFormatValidator(s.cfg.fmtValidatorOpts...)
return &Service{
cfg: c,
}
return &s
}
func (p *Service) Put(ctx context.Context) (*Streamer, error) {
return &Streamer{
cfg: p.cfg,
cfg: &p.cfg,
ctx: ctx,
}, nil
}

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -222,11 +223,10 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
extraBroadcastEnabled: withBroadcast,
},
payload: getPayload(),
remotePool: p.remotePool,
localPool: p.localPool,
getWorkerPool: p.getWorkerPool,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return &localTarget{
return localTarget{
storage: p.localStore,
}
}
@ -245,8 +245,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
relay: relay,
fmt: p.fmtValidator,
log: p.log,
isLocalKey: p.netmapKeys.IsLocalKey,
}
}
@ -283,3 +281,10 @@ func (p *Streamer) Close() (*PutResponse, error) {
id: ids.SelfID(),
}, nil
}
func (p *Streamer) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
if p.netmapKeys.IsLocalKey(pub) {
return p.localPool, true
}
return p.remotePool, false
}

View file

@ -11,7 +11,7 @@ import (
// Service implements Put operation of Object service v2.
type Service struct {
*cfg
cfg
}
// Option represents Service constructor option.
@ -24,15 +24,13 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
var s Service
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// Put calls internal service and returns v2 object streamer.

View file

@ -5,7 +5,6 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
@ -16,25 +15,26 @@ type ResponseService struct {
}
type searchStreamResponser struct {
util.ServerStream
SearchStream
respWriter util.ResponseMessageWriter
respSvc *response.Service
}
type getStreamResponser struct {
util.ServerStream
GetObjectStream
respWriter util.ResponseMessageWriter
respSvc *response.Service
}
type getRangeStreamResponser struct {
util.ServerStream
GetObjectRangeStream
respWriter util.ResponseMessageWriter
respSvc *response.Service
}
type putStreamResponser struct {
stream *response.ClientMessageStreamer
stream PutObjectStream
respSvc *response.Service
}
// NewResponseService returns object service instance that passes internal service
@ -47,29 +47,32 @@ func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *Respon
}
func (s *getStreamResponser) Send(resp *object.GetResponse) error {
return s.respWriter(resp)
s.respSvc.SetMeta(resp)
return s.GetObjectStream.Send(resp)
}
func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error {
return s.svc.Get(req, &getStreamResponser{
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
}),
GetObjectStream: stream,
respSvc: s.respSvc,
})
}
func (s *putStreamResponser) Send(req *object.PutRequest) error {
return s.stream.Send(req)
if err := s.stream.Send(req); err != nil {
return fmt.Errorf("could not send the request: %w", err)
}
return nil
}
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
}
return r.(*object.PutResponse), nil
s.respSvc.SetMeta(r)
return r, nil
}
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
@ -79,78 +82,61 @@ func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
}
return &putStreamResponser{
stream: s.respSvc.CreateRequestStreamer(
func(req any) error {
return stream.Send(req.(*object.PutRequest))
},
func() (util.ResponseMessage, error) {
return stream.CloseAndRecv()
},
),
stream: stream,
respSvc: s.respSvc,
}, nil
}
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
},
)
resp, err := s.svc.Head(ctx, req)
if err != nil {
return nil, err
}
return resp.(*object.HeadResponse), nil
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
return s.respWriter(resp)
s.respSvc.SetMeta(resp)
return s.SearchStream.Send(resp)
}
func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error {
return s.svc.Search(req, &searchStreamResponser{
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
}),
SearchStream: stream,
respSvc: s.respSvc,
})
}
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
},
)
resp, err := s.svc.Delete(ctx, req)
if err != nil {
return nil, err
}
return resp.(*object.DeleteResponse), nil
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
return s.respWriter(resp)
s.respSvc.SetMeta(resp)
return s.GetObjectRangeStream.Send(resp)
}
func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return s.svc.GetRange(req, &getRangeStreamResponser{
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
}),
GetObjectRangeStream: stream,
respSvc: s.respSvc,
})
}
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
},
)
resp, err := s.svc.GetRangeHash(ctx, req)
if err != nil {
return nil, err
}
return resp.(*object.GetRangeHashResponse), nil
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -10,12 +10,12 @@ import (
)
func (exec *execCtx) executeOnContainer() {
if exec.isLocal() {
if exec.prm.common.LocalOnly() {
exec.log.Debug("return result directly")
return
}
lookupDepth := exec.netmapLookupDepth()
lookupDepth := exec.prm.common.NetmapLookupDepth()
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
@ -52,12 +52,12 @@ func (exec *execCtx) processCurrentEpoch() bool {
zap.Uint64("number", exec.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.containerID())
traverser, ok := exec.generateTraverser(exec.prm.cnr)
if !ok {
return true
}
ctx, cancel := context.WithCancel(exec.context())
ctx, cancel := context.WithCancel(exec.ctx)
defer cancel()
for {

View file

@ -6,7 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -45,44 +44,20 @@ func (exec *execCtx) prepare() {
func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", "SEARCH"),
zap.Stringer("container", exec.containerID()),
zap.Bool("local", exec.isLocal()),
zap.Stringer("container", exec.prm.cnr),
zap.Bool("local", exec.prm.common.LocalOnly()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec *execCtx) containerID() cid.ID {
return exec.prm.cnr
}
func (exec *execCtx) searchFilters() object.SearchFilters {
return exec.prm.filters
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
exec.curProcEpoch = exec.prm.common.NetmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.currentEpochReceiver.currentEpoch()
e, err := exec.svc.epochSource.Epoch()
switch {
default:

View file

@ -51,7 +51,7 @@ type simpleIDWriter struct {
type testEpochReceiver uint64
func (e testEpochReceiver) currentEpoch() (uint64, error) {
func (e testEpochReceiver) Epoch() (uint64, error) {
return uint64(e), nil
}
@ -103,7 +103,7 @@ func (c *testClientCache) get(info clientcore.NodeInfo) (searchClient, error) {
}
func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
v, ok := s.items[exec.containerID().EncodeToString()]
v, ok := s.items[exec.prm.cnr.EncodeToString()]
if !ok {
return nil, nil
}
@ -112,7 +112,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
}
func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
v, ok := c.items[exec.containerID().EncodeToString()]
v, ok := c.items[exec.prm.cnr.EncodeToString()]
if !ok {
return nil, nil
}
@ -146,7 +146,7 @@ func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{cfg: new(cfg)}
svc := &Service{}
svc.log = test.NewLogger(false)
svc.localStorage = storage
@ -248,7 +248,7 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&id, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{cfg: new(cfg)}
svc := &Service{}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -261,7 +261,7 @@ func TestGetRemoteSmall(t *testing.T) {
},
}
svc.clientConstructor = c
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
svc.epochSource = testEpochReceiver(curEpoch)
return svc
}
@ -357,7 +357,7 @@ func TestGetFromPastEpoch(t *testing.T) {
ids22 := generateIDs(10)
c22.addResult(idCnr, ids22, nil)
svc := &Service{cfg: new(cfg)}
svc := &Service{}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
@ -388,7 +388,7 @@ func TestGetFromPastEpoch(t *testing.T) {
},
}
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
svc.epochSource = testEpochReceiver(curEpoch)
w := new(simpleIDWriter)

View file

@ -15,7 +15,7 @@ import (
// Service is an utility serving requests
// of Object.Search service.
type Service struct {
*cfg
cfg
}
// Option is a Service's constructor option.
@ -46,32 +46,31 @@ type cfg struct {
generateTraverser(cid.ID, uint64) (*placement.Traverser, error)
}
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
epochSource epochSource
keyStore *util.KeyStorage
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
clientConstructor: new(clientConstructorWrapper),
type epochSource interface {
Epoch() (uint64, error)
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.clientConstructor = new(clientConstructorWrapper)
}
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
c := defaultCfg()
var s Service
s.cfg.initDefault()
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// WithLogger returns option to specify Get service's logger.
@ -110,9 +109,7 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
c.epochSource = nmSrc
}
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -35,10 +34,6 @@ type storageEngineWrapper struct {
type traverseGeneratorWrapper util.TraverserGenerator
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{
written: make(map[oid.ID]struct{}),
@ -98,7 +93,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
var prm internalclient.SearchObjectsPrm
prm.SetContext(exec.context())
prm.SetContext(exec.ctx)
prm.SetClient(c.client)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
@ -106,8 +101,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
prm.SetTTL(exec.prm.common.TTL())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetContainerID(exec.containerID())
prm.SetFilters(exec.searchFilters())
prm.SetContainerID(exec.prm.cnr)
prm.SetFilters(exec.prm.filters)
res, err := internalclient.SearchObjects(prm)
if err != nil {
@ -119,8 +114,8 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
var selectPrm engine.SelectPrm
selectPrm.WithFilters(exec.searchFilters())
selectPrm.WithContainerID(exec.containerID())
selectPrm.WithFilters(exec.prm.filters)
selectPrm.WithContainerID(exec.prm.cnr)
r, err := e.storage.Select(selectPrm)
if err != nil {
@ -143,7 +138,3 @@ func idsFromAddresses(addrs []oid.Address) []oid.ID {
func (e *traverseGeneratorWrapper) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, error) {
return (*util.TraverserGenerator)(e).GenerateTraverser(cnr, nil, epoch)
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -9,7 +9,7 @@ import (
// Service implements Search operation of Object service v2.
type Service struct {
*cfg
cfg
}
// Option represents Service constructor option.
@ -23,15 +23,13 @@ type cfg struct {
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
var s Service
for i := range opts {
opts[i](c)
opts[i](&s.cfg)
}
return &Service{
cfg: c,
}
return &s
}
// Search calls internal service and returns v2 object stream.

View file

@ -18,27 +18,29 @@ type SignService struct {
}
type searchStreamSigner struct {
util.ServerStream
respWriter util.ResponseMessageWriter
SearchStream
statusSupported bool
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
}
type getStreamSigner struct {
util.ServerStream
respWriter util.ResponseMessageWriter
GetObjectStream
statusSupported bool
sigSvc *util.SignService
}
type putStreamSigner struct {
stream *util.RequestMessageStreamer
sigSvc *util.SignService
stream PutObjectStream
statusSupported bool
err error
}
type getRangeStreamSigner struct {
util.ServerStream
respWriter util.ResponseMessageWriter
GetObjectRangeStream
statusSupported bool
sigSvc *util.SignService
}
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
@ -50,37 +52,50 @@ func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
}
func (s *getStreamSigner) Send(resp *object.GetResponse) error {
return s.respWriter(resp)
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
return err
}
return s.GetObjectStream.Send(resp)
}
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
return s.sigSvc.HandleServerStreamRequest(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
},
func() util.ResponseMessage {
return new(object.GetResponse)
},
func(respWriter util.ResponseMessageWriter) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.GetResponse)
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return stream.Send(resp)
}
return s.svc.Get(req, &getStreamSigner{
ServerStream: stream,
respWriter: respWriter,
GetObjectStream: stream,
sigSvc: s.sigSvc,
statusSupported: util.IsStatusSupported(req),
})
},
)
}
func (s *putStreamSigner) Send(req *object.PutRequest) error {
return s.stream.Send(req)
s.statusSupported = util.IsStatusSupported(req)
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
return util.ErrAbortStream
}
if s.err = s.stream.Send(req); s.err != nil {
return util.ErrAbortStream
}
return nil
}
func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv()
func (s *putStreamSigner) CloseAndRecv() (resp *object.PutResponse, err error) {
if s.err != nil {
err = s.err
resp = new(object.PutResponse)
} else {
resp, err = s.stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("could not receive response: %w", err)
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
}
}
return r.(*object.PutResponse), nil
return resp, s.sigSvc.SignResponse(s.statusSupported, resp, err)
}
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
@ -90,58 +105,42 @@ func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
}
return &putStreamSigner{
stream: s.sigSvc.CreateRequestStreamer(
func(req any) error {
return stream.Send(req.(*object.PutRequest))
},
func() (util.ResponseMessage, error) {
return stream.CloseAndRecv()
},
func() util.ResponseMessage {
return new(object.PutResponse)
},
),
stream: stream,
sigSvc: s.sigSvc,
}, nil
}
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
},
func() util.ResponseMessage {
return new(object.HeadResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.HeadResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*object.HeadResponse), nil
resp, err := util.WrapResponse(s.svc.Head(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
s.nonEmptyResp = true
return s.respWriter(resp)
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
return err
}
return s.SearchStream.Send(resp)
}
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
return s.sigSvc.HandleServerStreamRequest(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
},
func() util.ResponseMessage {
return new(object.SearchResponse)
},
func(respWriter util.ResponseMessageWriter) error {
stream := &searchStreamSigner{
ServerStream: stream,
respWriter: respWriter,
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.SearchResponse)
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return stream.Send(resp)
}
err := s.svc.Search(req, stream)
if err == nil && !stream.nonEmptyResp {
ss := &searchStreamSigner{
SearchStream: stream,
sigSvc: s.sigSvc,
statusSupported: util.IsStatusSupported(req),
}
err := s.svc.Search(req, ss)
if err == nil && !ss.nonEmptyResp {
// The higher component does not write any response in the case of an empty result (which is correct).
// With the introduction of status returns at least one answer must be signed and sent to the client.
// This approach is supported by clients who do not know how to work with statuses (one could make
@ -149,61 +148,44 @@ func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) err
// answer can be neglected due to the gradual refusal to use the "old" clients).
return stream.Send(new(object.SearchResponse))
}
return err
},
)
}
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
},
func() util.ResponseMessage {
return new(object.DeleteResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.DeleteResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*object.DeleteResponse), nil
resp, err := util.WrapResponse(s.svc.Delete(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
return s.respWriter(resp)
if err := s.sigSvc.SignResponse(s.statusSupported, resp, nil); err != nil {
return err
}
return s.GetObjectRangeStream.Send(resp)
}
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return s.sigSvc.HandleServerStreamRequest(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
},
func() util.ResponseMessage {
return new(object.GetRangeResponse)
},
func(respWriter util.ResponseMessageWriter) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.GetRangeResponse)
_ = s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
return stream.Send(resp)
}
return s.svc.GetRange(req, &getRangeStreamSigner{
ServerStream: stream,
respWriter: respWriter,
GetObjectRangeStream: stream,
sigSvc: s.sigSvc,
statusSupported: util.IsStatusSupported(req),
})
},
)
}
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))
},
func() util.ResponseMessage {
return new(object.GetRangeHashResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.GetRangeHashResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*object.GetRangeHashResponse), nil
resp, err := util.WrapResponse(s.svc.GetRangeHash(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}

View file

@ -51,11 +51,11 @@ func (oiw *objectsInWork) add(addr oid.Address) {
// Policer represents the utility that verifies
// compliance with the object storage policy.
type Policer struct {
*cfg
cfg
cache *lru.Cache[oid.Address, time.Time]
objsInWork *objectsInWork
objsInWork objectsInWork
}
// Option is an option for Policer constructor.
@ -95,38 +95,33 @@ type cfg struct {
rebalanceFreq, evictDuration time.Duration
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
batchSize: 10,
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
rebalanceFreq: 1 * time.Second,
evictDuration: 30 * time.Second,
}
func (c *cfg) initDefault() {
c.log = &logger.Logger{Logger: zap.L()}
c.batchSize = 10
c.cacheSize = 1024 // 1024 * address size = 1024 * 64 = 64 MiB
c.rebalanceFreq = 1 * time.Second
c.evictDuration = 30 * time.Second
}
// New creates, initializes and returns Policer instance.
func New(opts ...Option) *Policer {
c := defaultCfg()
var p Policer
p.cfg.initDefault()
for i := range opts {
opts[i](c)
opts[i](&p.cfg)
}
c.log = &logger.Logger{Logger: c.log.With(zap.String("component", "Object Policer"))}
p.log = &logger.Logger{Logger: p.cfg.log.With(zap.String("component", "Object Policer"))}
cache, err := lru.New[oid.Address, time.Time](int(c.cacheSize))
cache, err := lru.New[oid.Address, time.Time](int(p.cacheSize))
if err != nil {
panic(err)
}
return &Policer{
cfg: c,
cache: cache,
objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity),
},
}
p.cache = cache
p.objsInWork.objs = make(map[oid.Address]struct{}, p.maxCapacity)
return &p
}
// WithHeadTimeout returns option to set Head timeout of Policer.

View file

@ -1,50 +0,0 @@
package reputationrpc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns reputation service server instance that passes
// internal service call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceLocalTrustResponse), nil
}
func (s *responseService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
}

View file

@ -22,33 +22,19 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceLocalTrustResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(reputation.AnnounceLocalTrustResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*reputation.AnnounceLocalTrustResponse), nil
resp, err := util.WrapResponse(s.svc.AnnounceLocalTrust(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceIntermediateResultResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(reputation.AnnounceIntermediateResultResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
resp, err := util.WrapResponse(s.svc.AnnounceIntermediateResult(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -16,14 +17,17 @@ type ServiceExecutor interface {
type executorSvc struct {
exec ServiceExecutor
respSvc *response.Service
log *logger.Logger
}
// NewExecutionService wraps ServiceExecutor and returns Session Service interface.
func NewExecutionService(exec ServiceExecutor, l *logger.Logger) Server {
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service, l *logger.Logger) Server {
return &executorSvc{
exec: exec,
log: l,
respSvc: respSvc,
}
}
@ -41,5 +45,6 @@ func (s *executorSvc) Create(ctx context.Context, req *session.CreateRequest) (*
resp := new(session.CreateResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}

View file

@ -1,37 +0,0 @@
package session
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns session service instance that passes internal service
// call to response service.
func NewResponseService(ssSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: ssSvc,
}
}
func (s *responseService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Create(ctx, req.(*session.CreateRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*session.CreateResponse), nil
}

View file

@ -22,17 +22,10 @@ func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
}
func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.Create(ctx, req.(*session.CreateRequest))
},
func() util.ResponseMessage {
return new(session.CreateResponse)
},
)
if err != nil {
return nil, err
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(session.CreateResponse)
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}
return resp.(*session.CreateResponse), nil
resp, err := util.WrapResponse(s.svc.Create(ctx, req))
return resp, s.sigSvc.SignResponse(util.IsStatusSupported(req), resp, err)
}

View file

@ -1,47 +0,0 @@
package response
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// ClientMessageStreamer represents client-side message streamer
// that sets meta values to the response.
type ClientMessageStreamer struct {
cfg *cfg
send util.RequestMessageWriter
close util.ClientStreamCloser
}
// Send calls send method of internal streamer.
func (s *ClientMessageStreamer) Send(req any) error {
if err := s.send(req); err != nil {
return fmt.Errorf("(%T) could not send the request: %w", s, err)
}
return nil
}
// CloseAndRecv closes internal stream, receivers the response,
// sets meta values and returns the result.
func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) {
resp, err := s.close()
if err != nil {
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
}
setMeta(resp, s.cfg)
return resp, nil
}
// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance.
func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer {
return &ClientMessageStreamer{
cfg: s.cfg,
send: sender,
close: closer,
}
}

View file

@ -1,37 +0,0 @@
package response
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// ServerMessageStreamer represents server-side message streamer
// that sets meta values to all response messages.
type ServerMessageStreamer struct {
cfg *cfg
recv util.ResponseMessageReader
}
// Recv calls Recv method of internal streamer, sets response meta
// values and returns the response.
func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) {
m, err := s.recv()
if err != nil {
return nil, fmt.Errorf("could not receive response message for signing: %w", err)
}
setMeta(m, s.cfg)
return m, nil
}
// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result.
func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter {
return func(resp util.ResponseMessage) error {
setMeta(resp, s.cfg)
return respWriter(resp)
}
}

View file

@ -11,44 +11,24 @@ import (
// Service represents universal v2 service
// that sets response meta header values.
type Service struct {
cfg *cfg
}
// Option is an option of Service constructor.
type Option func(*cfg)
type cfg struct {
version refs.Version
state netmap.State
}
func defaultCfg() *cfg {
var c cfg
version.Current().WriteToV2(&c.version)
return &c
}
// NewService creates, initializes and returns Service instance.
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
func NewService(nmState netmap.State) *Service {
s := &Service{state: nmState}
version.Current().WriteToV2(&s.version)
return s
}
return &Service{
cfg: c,
}
}
func setMeta(resp util.ResponseMessage, cfg *cfg) {
// SetMeta sets adds meta-header to resp.
func (s *Service) SetMeta(resp util.ResponseMessage) {
meta := new(session.ResponseMetaHeader)
meta.SetVersion(&cfg.version)
meta.SetVersion(&s.version)
meta.SetTTL(1) // FIXME: #1160 TTL must be calculated
meta.SetEpoch(cfg.state.CurrentEpoch())
meta.SetEpoch(s.state.CurrentEpoch())
if origin := resp.GetMetaHeader(); origin != nil {
// FIXME: #1160 what if origin is set by local server?
@ -57,10 +37,3 @@ func setMeta(resp util.ResponseMessage, cfg *cfg) {
resp.SetMetaHeader(meta)
}
// WithNetworkState returns option to set network state of Service.
func WithNetworkState(v netmap.State) Option {
return func(c *cfg) {
c.state = v
}
}

View file

@ -1,21 +0,0 @@
package response
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it.
func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) {
// process request
resp, err := handler(ctx, req)
if err != nil {
return nil, fmt.Errorf("could not handle request: %w", err)
}
setMeta(resp, s.cfg)
return resp, nil
}

View file

@ -1,7 +1,6 @@
package util
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
@ -21,198 +20,59 @@ type ResponseMessage interface {
SetMetaHeader(*session.ResponseMetaHeader)
}
type UnaryHandler func(context.Context, any) (ResponseMessage, error)
type SignService struct {
key *ecdsa.PrivateKey
}
type ResponseMessageWriter func(ResponseMessage) error
type ServerStreamHandler func(context.Context, any) (ResponseMessageReader, error)
type ResponseMessageReader func() (ResponseMessage, error)
var ErrAbortStream = errors.New("abort message stream")
type ResponseConstructor func() ResponseMessage
type RequestMessageWriter func(any) error
type ClientStreamCloser func() (ResponseMessage, error)
type RequestMessageStreamer struct {
key *ecdsa.PrivateKey
send RequestMessageWriter
close ClientStreamCloser
respCons ResponseConstructor
statusSupported bool
sendErr error
}
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
return &SignService{
key: key,
}
}
func (s *RequestMessageStreamer) Send(req any) error {
// req argument should be strengthen with type RequestMessage
s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = s.send(req)
}
if err != nil {
if !s.statusSupported {
return err
}
s.sendErr = err
return ErrAbortStream
}
return nil
}
func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) {
var (
resp ResponseMessage
err error
)
if s.sendErr != nil {
err = s.sendErr
} else {
resp, err = s.close()
if err != nil {
err = fmt.Errorf("could not close stream and receive response: %w", err)
}
}
if err != nil {
if !s.statusSupported {
return nil, err
}
resp = s.respCons()
setStatusV2(resp, err)
}
if err = signResponse(s.key, resp, s.statusSupported); err != nil {
return nil, err
}
return resp, nil
}
func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser, blankResp ResponseConstructor) *RequestMessageStreamer {
return &RequestMessageStreamer{
key: s.key,
send: sender,
close: closer,
respCons: blankResp,
}
}
func (s *SignService) HandleServerStreamRequest(
req any,
respWriter ResponseMessageWriter,
blankResp ResponseConstructor,
respWriterCaller func(ResponseMessageWriter) error,
) error {
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
// req argument should be strengthen with type RequestMessage
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = respWriterCaller(func(resp ResponseMessage) error {
if err := signResponse(s.key, resp, statusSupported); err != nil {
return err
}
return respWriter(resp)
})
}
// SignResponse response with private key via signature.SignServiceMessage.
// The signature error affects the result depending on the protocol version:
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed.
// - otherwise, returns error in order to transport it directly.
func (s *SignService) SignResponse(statusSupported bool, resp ResponseMessage, err error) error {
if err != nil {
if !statusSupported {
return err
}
resp := blankResp()
setStatusV2(resp, err)
}
_ = signResponse(s.key, resp, false) // panics or returns nil with false arg
return respWriter(resp)
err = signature.SignServiceMessage(s.key, resp)
if err != nil {
return fmt.Errorf("could not sign response: %w", err)
}
return nil
}
func (s *SignService) HandleUnaryRequest(ctx context.Context, req any, handler UnaryHandler, blankResp ResponseConstructor) (ResponseMessage, error) {
// handle protocol versions <=2.10 (API statuses was introduced in 2.11 only)
// req argument should be strengthen with type RequestMessage
statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now
var (
resp ResponseMessage
err error
)
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
func (s *SignService) VerifyRequest(req RequestMessage) error {
if err := signature.VerifyServiceMessage(req); err != nil {
var sigErr apistatus.SignatureVerification
sigErr.SetMessage(err.Error())
err = sigErr
} else {
// process request
resp, err = handler(ctx, req)
return sigErr
}
return nil
}
if err != nil {
if !statusSupported {
return nil, err
// WrapResponse creates an appropriate response struct if it is nil.
func WrapResponse[T any](resp *T, err error) (*T, error) {
if resp != nil {
return resp, err
}
return new(T), nil
}
resp = blankResp()
setStatusV2(resp, err)
}
// sign the response
if err = signResponse(s.key, resp, statusSupported); err != nil {
return nil, err
}
return resp, nil
}
func isStatusSupported(req RequestMessage) bool {
// IsStatusSupported returns true iff request version implies expecting status return.
// This allows us to handle protocol versions <=2.10 (API statuses was introduced in 2.11 only).
func IsStatusSupported(req RequestMessage) bool {
version := req.GetMetaHeader().GetVersion()
mjr := version.GetMajor()
@ -228,22 +88,3 @@ func setStatusV2(resp ResponseMessage, err error) {
session.SetStatus(resp, apistatus.ToStatusV2(apistatus.ErrToStatus(err)))
}
// signs response with private key via signature.SignServiceMessage.
// The signature error affects the result depending on the protocol version:
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed;
// - otherwise, returns error in order to transport it directly.
func signResponse(key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
err := signature.SignServiceMessage(key, resp)
if err != nil {
err = fmt.Errorf("could not sign response: %w", err)
if statusSupported {
// We can't pass this error as status code since response will be unsigned.
// Isn't expected in practice, so panic is ok here.
panic(err)
}
}
return err
}