diff --git a/cmd/neofs-node/accounting.go b/cmd/neofs-node/accounting.go index 5e99f2cc..28f7cb48 100644 --- a/cmd/neofs-node/accounting.go +++ b/cmd/neofs-node/accounting.go @@ -29,19 +29,13 @@ func initAccountingService(c *cfg) { balanceMorphWrapper, err := wrapper.New(balanceClient) fatalOnErr(err) - metaHdr := new(session.ResponseMetaHeader) - xHdr := new(session.XHeader) - xHdr.SetKey("test X-Header key") - xHdr.SetValue("test X-Header value") - metaHdr.SetXHeaders([]*session.XHeader{xHdr}) - accountingGRPC.RegisterAccountingServiceServer(c.cfgGRPC.server, accountingTransportGRPC.New( accountingService.NewSignService( c.key, accountingService.NewExecutionService( accounting.NewExecutor(balanceMorphWrapper), - metaHdr, + new(session.ResponseMetaHeader), ), ), ), diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 51b20f62..71e05a77 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -28,19 +28,13 @@ func initContainerService(c *cfg) { c.cfgObject.cnrStorage = wrap // use RPC node as source of containers c.cfgObject.cnrClient = wrap - metaHdr := new(session.ResponseMetaHeader) - xHdr := new(session.XHeader) - xHdr.SetKey("test X-Header key") - xHdr.SetValue("test X-Header value") - metaHdr.SetXHeaders([]*session.XHeader{xHdr}) - containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server, containerTransportGRPC.New( containerService.NewSignService( c.key, containerService.NewExecutionService( containerMorph.NewExecutor(cnrClient), - metaHdr, + new(session.ResponseMetaHeader), ), ), ), diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b307dad1..fb035d15 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/mr-tron/base58" - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" @@ -33,9 +32,7 @@ import ( searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc" - "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/panjf2000/ants/v2" - "go.uber.org/zap" ) type objectSvc struct { @@ -163,19 +160,6 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe return s.rngHash.GetRangeHash(ctx, req) } -type deleteHandler struct { - log *logger.Logger -} - -func (s *deleteHandler) DeleteObjects(list ...*objectSDK.Address) { - for i := range list { - s.log.Info("object is marked for removal", - zap.String("CID", base58.Encode(list[i].GetContainerID().ToV2().GetValue())), - zap.String("ID", base58.Encode(list[i].GetObjectID().ToV2().GetValue())), - ) - } -} - func initObjectService(c *cfg) { ls := localstore.New( c.cfgObject.blobstorage, diff --git a/cmd/neofs-node/session.go b/cmd/neofs-node/session.go index 46f492ee..8432f227 100644 --- a/cmd/neofs-node/session.go +++ b/cmd/neofs-node/session.go @@ -11,19 +11,13 @@ import ( func initSessionService(c *cfg) { c.privateTokenStore = storage.New() - metaHdr := new(session.ResponseMetaHeader) - xHdr := new(session.XHeader) - xHdr.SetKey("test X-Header key") - xHdr.SetValue("test X-Header value") - metaHdr.SetXHeaders([]*session.XHeader{xHdr}) - sessionGRPC.RegisterSessionServiceServer(c.cfgGRPC.server, sessionTransportGRPC.New( sessionSvc.NewSignService( c.key, sessionSvc.NewExecutionService( c.privateTokenStore, - metaHdr, + new(session.ResponseMetaHeader), ), ), ), diff --git a/pkg/services/object/executor.go b/pkg/services/object/executor.go deleted file mode 100644 index 2e84db60..00000000 --- a/pkg/services/object/executor.go +++ /dev/null @@ -1,219 +0,0 @@ -package object - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/session" - "github.com/pkg/errors" -) - -type GetObjectBodyStreamer interface { - Recv() (*object.GetResponseBody, error) -} - -type PutObjectBodyStreamer interface { - Send(*object.PutRequestBody) error - CloseAndRecv() (*object.PutResponseBody, error) -} - -type SearchObjectBodyStreamer interface { - Recv() (*object.SearchResponseBody, error) -} - -type GetRangeObjectBodyStreamer interface { - Recv() (*object.GetRangeResponseBody, error) -} - -type ServiceExecutor interface { - Get(context.Context, *object.GetRequestBody) (GetObjectBodyStreamer, error) - Put(context.Context) (PutObjectBodyStreamer, error) - Head(context.Context, *object.HeadRequestBody) (*object.HeadResponseBody, error) - Search(context.Context, *object.SearchRequestBody) (SearchObjectBodyStreamer, error) - Delete(context.Context, *object.DeleteRequestBody) (*object.DeleteResponseBody, error) - GetRange(context.Context, *object.GetRangeRequestBody) (GetRangeObjectBodyStreamer, error) - GetRangeHash(context.Context, *object.GetRangeHashRequestBody) (*object.GetRangeHashResponseBody, error) -} - -type executorSvc struct { - exec ServiceExecutor - - metaHeader *session.ResponseMetaHeader -} - -type searchStreamer struct { - bodyStreamer SearchObjectBodyStreamer - - metaHdr *session.ResponseMetaHeader -} - -type getStreamer struct { - bodyStreamer GetObjectBodyStreamer - - metaHdr *session.ResponseMetaHeader -} - -type putStreamer struct { - bodyStreamer PutObjectBodyStreamer - - metaHdr *session.ResponseMetaHeader -} - -type rangeStreamer struct { - bodyStreamer GetRangeObjectBodyStreamer - - metaHdr *session.ResponseMetaHeader -} - -// NewExecutionService wraps ServiceExecutor and returns Object Service interface. -// -// Passed meta header is attached to all responses. -func NewExecutionService(exec ServiceExecutor, metaHdr *session.ResponseMetaHeader) object.Service { - return &executorSvc{ - exec: exec, - metaHeader: metaHdr, - } -} - -func (s *getStreamer) Recv() (*object.GetResponse, error) { - body, err := s.bodyStreamer.Recv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response body") - } - - resp := new(object.GetResponse) - resp.SetBody(body) - resp.SetMetaHeader(s.metaHdr) - - return resp, nil -} - -func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { - bodyStream, err := s.exec.Get(ctx, req.GetBody()) - if err != nil { - return nil, errors.Wrap(err, "could not execute Get request") - } - - return &getStreamer{ - bodyStreamer: bodyStream, - metaHdr: s.metaHeader, - }, nil -} - -func (s *putStreamer) Send(req *object.PutRequest) error { - return s.bodyStreamer.Send(req.GetBody()) -} - -func (s *putStreamer) CloseAndRecv() (*object.PutResponse, error) { - body, err := s.bodyStreamer.CloseAndRecv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response body") - } - - resp := new(object.PutResponse) - resp.SetBody(body) - resp.SetMetaHeader(s.metaHdr) - - return resp, nil -} - -func (s *executorSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) { - bodyStream, err := s.exec.Put(ctx) - if err != nil { - return nil, errors.Wrap(err, "could not execute Put request") - } - - return &putStreamer{ - bodyStreamer: bodyStream, - metaHdr: s.metaHeader, - }, nil -} - -func (s *executorSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { - respBody, err := s.exec.Head(ctx, req.GetBody()) - if err != nil { - return nil, errors.Wrap(err, "could not execute Head request") - } - - resp := new(object.HeadResponse) - resp.SetBody(respBody) - resp.SetMetaHeader(s.metaHeader) - - return resp, nil -} - -func (s *searchStreamer) Recv() (*object.SearchResponse, error) { - body, err := s.bodyStreamer.Recv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response body") - } - - resp := new(object.SearchResponse) - resp.SetBody(body) - resp.SetMetaHeader(s.metaHdr) - - return resp, nil -} - -func (s *executorSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { - bodyStream, err := s.exec.Search(ctx, req.GetBody()) - if err != nil { - return nil, errors.Wrap(err, "could not execute Search request") - } - - return &searchStreamer{ - bodyStreamer: bodyStream, - metaHdr: s.metaHeader, - }, nil -} - -func (s *executorSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { - respBody, err := s.exec.Delete(ctx, req.GetBody()) - if err != nil { - return nil, errors.Wrap(err, "could not execute Delete request") - } - - resp := new(object.DeleteResponse) - resp.SetBody(respBody) - resp.SetMetaHeader(s.metaHeader) - - return resp, nil -} - -func (s *rangeStreamer) Recv() (*object.GetRangeResponse, error) { - body, err := s.bodyStreamer.Recv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response body") - } - - resp := new(object.GetRangeResponse) - resp.SetBody(body) - resp.SetMetaHeader(s.metaHdr) - - return resp, nil -} - -func (s *executorSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - bodyStream, err := s.exec.GetRange(ctx, req.GetBody()) - if err != nil { - return nil, errors.Wrap(err, "could not execute GetRange request") - } - - return &rangeStreamer{ - bodyStreamer: bodyStream, - metaHdr: s.metaHeader, - }, nil -} - -func (s *executorSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - respBody, err := s.exec.GetRangeHash(ctx, req.GetBody()) - if err != nil { - return nil, errors.Wrap(err, "could not execute GetRangeHash request") - } - - resp := new(object.GetRangeHashResponse) - resp.SetBody(respBody) - resp.SetMetaHeader(s.metaHeader) - - return resp, nil -}