forked from TrueCloudLab/frostfs-node
[#84] Remove mocks and debug code from neofs-node services
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
b5aef1011b
commit
0e7e0bd2d6
5 changed files with 3 additions and 256 deletions
|
@ -29,19 +29,13 @@ func initAccountingService(c *cfg) {
|
||||||
balanceMorphWrapper, err := wrapper.New(balanceClient)
|
balanceMorphWrapper, err := wrapper.New(balanceClient)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
metaHdr := new(session.ResponseMetaHeader)
|
|
||||||
xHdr := new(session.XHeader)
|
|
||||||
xHdr.SetKey("test X-Header key")
|
|
||||||
xHdr.SetValue("test X-Header value")
|
|
||||||
metaHdr.SetXHeaders([]*session.XHeader{xHdr})
|
|
||||||
|
|
||||||
accountingGRPC.RegisterAccountingServiceServer(c.cfgGRPC.server,
|
accountingGRPC.RegisterAccountingServiceServer(c.cfgGRPC.server,
|
||||||
accountingTransportGRPC.New(
|
accountingTransportGRPC.New(
|
||||||
accountingService.NewSignService(
|
accountingService.NewSignService(
|
||||||
c.key,
|
c.key,
|
||||||
accountingService.NewExecutionService(
|
accountingService.NewExecutionService(
|
||||||
accounting.NewExecutor(balanceMorphWrapper),
|
accounting.NewExecutor(balanceMorphWrapper),
|
||||||
metaHdr,
|
new(session.ResponseMetaHeader),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
|
|
@ -28,19 +28,13 @@ func initContainerService(c *cfg) {
|
||||||
c.cfgObject.cnrStorage = wrap // use RPC node as source of containers
|
c.cfgObject.cnrStorage = wrap // use RPC node as source of containers
|
||||||
c.cfgObject.cnrClient = wrap
|
c.cfgObject.cnrClient = wrap
|
||||||
|
|
||||||
metaHdr := new(session.ResponseMetaHeader)
|
|
||||||
xHdr := new(session.XHeader)
|
|
||||||
xHdr.SetKey("test X-Header key")
|
|
||||||
xHdr.SetValue("test X-Header value")
|
|
||||||
metaHdr.SetXHeaders([]*session.XHeader{xHdr})
|
|
||||||
|
|
||||||
containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server,
|
containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server,
|
||||||
containerTransportGRPC.New(
|
containerTransportGRPC.New(
|
||||||
containerService.NewSignService(
|
containerService.NewSignService(
|
||||||
c.key,
|
c.key,
|
||||||
containerService.NewExecutionService(
|
containerService.NewExecutionService(
|
||||||
containerMorph.NewExecutor(cnrClient),
|
containerMorph.NewExecutor(cnrClient),
|
||||||
metaHdr,
|
new(session.ResponseMetaHeader),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
||||||
|
@ -33,9 +32,7 @@ import (
|
||||||
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type objectSvc struct {
|
type objectSvc struct {
|
||||||
|
@ -163,19 +160,6 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
|
||||||
return s.rngHash.GetRangeHash(ctx, req)
|
return s.rngHash.GetRangeHash(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteHandler struct {
|
|
||||||
log *logger.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *deleteHandler) DeleteObjects(list ...*objectSDK.Address) {
|
|
||||||
for i := range list {
|
|
||||||
s.log.Info("object is marked for removal",
|
|
||||||
zap.String("CID", base58.Encode(list[i].GetContainerID().ToV2().GetValue())),
|
|
||||||
zap.String("ID", base58.Encode(list[i].GetObjectID().ToV2().GetValue())),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func initObjectService(c *cfg) {
|
func initObjectService(c *cfg) {
|
||||||
ls := localstore.New(
|
ls := localstore.New(
|
||||||
c.cfgObject.blobstorage,
|
c.cfgObject.blobstorage,
|
||||||
|
|
|
@ -11,19 +11,13 @@ import (
|
||||||
func initSessionService(c *cfg) {
|
func initSessionService(c *cfg) {
|
||||||
c.privateTokenStore = storage.New()
|
c.privateTokenStore = storage.New()
|
||||||
|
|
||||||
metaHdr := new(session.ResponseMetaHeader)
|
|
||||||
xHdr := new(session.XHeader)
|
|
||||||
xHdr.SetKey("test X-Header key")
|
|
||||||
xHdr.SetValue("test X-Header value")
|
|
||||||
metaHdr.SetXHeaders([]*session.XHeader{xHdr})
|
|
||||||
|
|
||||||
sessionGRPC.RegisterSessionServiceServer(c.cfgGRPC.server,
|
sessionGRPC.RegisterSessionServiceServer(c.cfgGRPC.server,
|
||||||
sessionTransportGRPC.New(
|
sessionTransportGRPC.New(
|
||||||
sessionSvc.NewSignService(
|
sessionSvc.NewSignService(
|
||||||
c.key,
|
c.key,
|
||||||
sessionSvc.NewExecutionService(
|
sessionSvc.NewExecutionService(
|
||||||
c.privateTokenStore,
|
c.privateTokenStore,
|
||||||
metaHdr,
|
new(session.ResponseMetaHeader),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
|
|
@ -1,219 +0,0 @@
|
||||||
package object
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type GetObjectBodyStreamer interface {
|
|
||||||
Recv() (*object.GetResponseBody, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type PutObjectBodyStreamer interface {
|
|
||||||
Send(*object.PutRequestBody) error
|
|
||||||
CloseAndRecv() (*object.PutResponseBody, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type SearchObjectBodyStreamer interface {
|
|
||||||
Recv() (*object.SearchResponseBody, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type GetRangeObjectBodyStreamer interface {
|
|
||||||
Recv() (*object.GetRangeResponseBody, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type ServiceExecutor interface {
|
|
||||||
Get(context.Context, *object.GetRequestBody) (GetObjectBodyStreamer, error)
|
|
||||||
Put(context.Context) (PutObjectBodyStreamer, error)
|
|
||||||
Head(context.Context, *object.HeadRequestBody) (*object.HeadResponseBody, error)
|
|
||||||
Search(context.Context, *object.SearchRequestBody) (SearchObjectBodyStreamer, error)
|
|
||||||
Delete(context.Context, *object.DeleteRequestBody) (*object.DeleteResponseBody, error)
|
|
||||||
GetRange(context.Context, *object.GetRangeRequestBody) (GetRangeObjectBodyStreamer, error)
|
|
||||||
GetRangeHash(context.Context, *object.GetRangeHashRequestBody) (*object.GetRangeHashResponseBody, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type executorSvc struct {
|
|
||||||
exec ServiceExecutor
|
|
||||||
|
|
||||||
metaHeader *session.ResponseMetaHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
type searchStreamer struct {
|
|
||||||
bodyStreamer SearchObjectBodyStreamer
|
|
||||||
|
|
||||||
metaHdr *session.ResponseMetaHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
type getStreamer struct {
|
|
||||||
bodyStreamer GetObjectBodyStreamer
|
|
||||||
|
|
||||||
metaHdr *session.ResponseMetaHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
type putStreamer struct {
|
|
||||||
bodyStreamer PutObjectBodyStreamer
|
|
||||||
|
|
||||||
metaHdr *session.ResponseMetaHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
type rangeStreamer struct {
|
|
||||||
bodyStreamer GetRangeObjectBodyStreamer
|
|
||||||
|
|
||||||
metaHdr *session.ResponseMetaHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewExecutionService wraps ServiceExecutor and returns Object Service interface.
|
|
||||||
//
|
|
||||||
// Passed meta header is attached to all responses.
|
|
||||||
func NewExecutionService(exec ServiceExecutor, metaHdr *session.ResponseMetaHeader) object.Service {
|
|
||||||
return &executorSvc{
|
|
||||||
exec: exec,
|
|
||||||
metaHeader: metaHdr,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *getStreamer) Recv() (*object.GetResponse, error) {
|
|
||||||
body, err := s.bodyStreamer.Recv()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not receive response body")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.GetResponse)
|
|
||||||
resp.SetBody(body)
|
|
||||||
resp.SetMetaHeader(s.metaHdr)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
|
|
||||||
bodyStream, err := s.exec.Get(ctx, req.GetBody())
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute Get request")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &getStreamer{
|
|
||||||
bodyStreamer: bodyStream,
|
|
||||||
metaHdr: s.metaHeader,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *putStreamer) Send(req *object.PutRequest) error {
|
|
||||||
return s.bodyStreamer.Send(req.GetBody())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *putStreamer) CloseAndRecv() (*object.PutResponse, error) {
|
|
||||||
body, err := s.bodyStreamer.CloseAndRecv()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not receive response body")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.PutResponse)
|
|
||||||
resp.SetBody(body)
|
|
||||||
resp.SetMetaHeader(s.metaHdr)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) {
|
|
||||||
bodyStream, err := s.exec.Put(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute Put request")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &putStreamer{
|
|
||||||
bodyStreamer: bodyStream,
|
|
||||||
metaHdr: s.metaHeader,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
|
||||||
respBody, err := s.exec.Head(ctx, req.GetBody())
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute Head request")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.HeadResponse)
|
|
||||||
resp.SetBody(respBody)
|
|
||||||
resp.SetMetaHeader(s.metaHeader)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *searchStreamer) Recv() (*object.SearchResponse, error) {
|
|
||||||
body, err := s.bodyStreamer.Recv()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not receive response body")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.SearchResponse)
|
|
||||||
resp.SetBody(body)
|
|
||||||
resp.SetMetaHeader(s.metaHdr)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
|
|
||||||
bodyStream, err := s.exec.Search(ctx, req.GetBody())
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute Search request")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &searchStreamer{
|
|
||||||
bodyStreamer: bodyStream,
|
|
||||||
metaHdr: s.metaHeader,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
|
||||||
respBody, err := s.exec.Delete(ctx, req.GetBody())
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute Delete request")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.DeleteResponse)
|
|
||||||
resp.SetBody(respBody)
|
|
||||||
resp.SetMetaHeader(s.metaHeader)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *rangeStreamer) Recv() (*object.GetRangeResponse, error) {
|
|
||||||
body, err := s.bodyStreamer.Recv()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not receive response body")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.GetRangeResponse)
|
|
||||||
resp.SetBody(body)
|
|
||||||
resp.SetMetaHeader(s.metaHdr)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
|
|
||||||
bodyStream, err := s.exec.GetRange(ctx, req.GetBody())
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute GetRange request")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &rangeStreamer{
|
|
||||||
bodyStreamer: bodyStream,
|
|
||||||
metaHdr: s.metaHeader,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *executorSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
|
||||||
respBody, err := s.exec.GetRangeHash(ctx, req.GetBody())
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not execute GetRangeHash request")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(object.GetRangeHashResponse)
|
|
||||||
resp.SetBody(respBody)
|
|
||||||
resp.SetMetaHeader(s.metaHeader)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
Loading…
Reference in a new issue