diff --git a/go.mod b/go.mod index 1993ed041..f82ba9f5f 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.90.0 - github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200820112910-89e79ebe72b0 + github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200821125006-afd2ce0400ec github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/netmap v1.7.0 github.com/nspcc-dev/tzhash v1.4.0 // indirect diff --git a/go.sum b/go.sum index 5f3644aee..5eb9c3e80 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/network/transport/accounting/grpc/.gitkeep b/pkg/network/transport/accounting/grpc/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/network/transport/accounting/grpc/service.go b/pkg/network/transport/accounting/grpc/service.go new file mode 100644 index 000000000..b2fd0e413 --- /dev/null +++ b/pkg/network/transport/accounting/grpc/service.go @@ -0,0 +1,32 @@ +package accounting + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/accounting" + accountingGRPC "github.com/nspcc-dev/neofs-api-go/v2/accounting/grpc" +) + +// Server wraps NeoFS API Accounting service and +// provides gRPC Accounting service server interface. +type Server struct { + srv accounting.Service +} + +// New creates, initializes and returns Server instance. +func New(c accounting.Service) *Server { + return &Server{ + srv: c, + } +} + +// Balance converts gRPC BalanceRequest message and passes it to internal Accounting service. +func (s *Server) Balance(ctx context.Context, req *accountingGRPC.BalanceRequest) (*accountingGRPC.BalanceResponse, error) { + resp, err := s.srv.Balance(ctx, accounting.BalanceRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return accounting.BalanceResponseToGRPCMessage(resp), nil +} diff --git a/pkg/network/transport/container/grpc/.gitkeep b/pkg/network/transport/container/grpc/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/network/transport/container/grpc/service.go b/pkg/network/transport/container/grpc/service.go new file mode 100644 index 000000000..d4583111a --- /dev/null +++ b/pkg/network/transport/container/grpc/service.go @@ -0,0 +1,87 @@ +package container + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/container" + containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc" +) + +// Server wraps NeoFS API Container service and +// provides gRPC Container service server interface. +type Server struct { + srv container.Service +} + +// New creates, initializes and returns Server instance. +func New(c container.Service) *Server { + return &Server{ + srv: c, + } +} + +// Put converts gRPC PutRequest message and passes it to internal Container service. +func (s *Server) Put(ctx context.Context, req *containerGRPC.PutRequest) (*containerGRPC.PutResponse, error) { + resp, err := s.srv.Put(ctx, container.PutRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return container.PutResponseToGRPCMessage(resp), nil +} + +// Delete converts gRPC DeleteRequest message and passes it to internal Container service. +func (s *Server) Delete(ctx context.Context, req *containerGRPC.DeleteRequest) (*containerGRPC.DeleteResponse, error) { + resp, err := s.srv.Delete(ctx, container.DeleteRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return container.DeleteResponseToGRPCMessage(resp), nil +} + +// Get converts gRPC GetRequest message and passes it to internal Container service. +func (s *Server) Get(ctx context.Context, req *containerGRPC.GetRequest) (*containerGRPC.GetResponse, error) { + resp, err := s.srv.Get(ctx, container.GetRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return container.GetResponseToGRPCMessage(resp), nil +} + +// List converts gRPC ListRequest message and passes it to internal Container service. +func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*containerGRPC.ListResponse, error) { + resp, err := s.srv.List(ctx, container.ListRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return container.ListResponseToGRPCMessage(resp), nil +} + +// SetExtendedACL converts gRPC SetExtendedACLRequest message and passes it to internal Container service. +func (s *Server) SetExtendedACL(ctx context.Context, req *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) { + resp, err := s.srv.SetExtendedACL(ctx, container.SetExtendedACLRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return container.SetExtendedACLResponseToGRPCMessage(resp), nil +} + +// GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service. +func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) { + resp, err := s.srv.GetExtendedACL(ctx, container.GetExtendedACLRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return container.GetExtendedACLResponseToGRPCMessage(resp), nil +} diff --git a/pkg/network/transport/object/grpc/.gitkeep b/pkg/network/transport/object/grpc/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go new file mode 100644 index 000000000..a993c6758 --- /dev/null +++ b/pkg/network/transport/object/grpc/service.go @@ -0,0 +1,159 @@ +package object + +import ( + "context" + "io" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" +) + +// Server wraps NeoFS API Object service and +// provides gRPC Object service server interface. +type Server struct { + srv object.Service +} + +// New creates, initializes and returns Server instance. +func New(c object.Service) *Server { + return &Server{ + srv: c, + } +} + +// Get converts gRPC GetRequest message, opens internal Object service Get stream and overtakes its data +// to gRPC stream. +func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error { + stream, err := s.srv.Get(gStream.Context(), object.GetRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return err + } + + for { + r, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + + if err := gStream.Send(object.GetResponseToGRPCMessage(r)); err != nil { + return err + } + } +} + +// Put opens internal Object service Put stream and overtakes data from gRPC stream to it. +func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { + stream, err := s.srv.Put(gStream.Context()) + if err != nil { + // TODO: think about how we transport errors through gRPC + return err + } + + for { + req, err := gStream.Recv() + if err == nil { + if err := stream.Send(object.PutRequestFromGRPCMessage(req)); err != nil { + return err + } + } + + if err == io.EOF { + resp, err := stream.CloseAndRecv() + if err != nil { + return err + } + + return gStream.SendAndClose(object.PutResponseToGRPCMessage(resp)) + } + + return err + } +} + +// Delete converts gRPC DeleteRequest message and passes it to internal Object service. +func (s *Server) Delete(ctx context.Context, req *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) { + resp, err := s.srv.Delete(ctx, object.DeleteRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return object.DeleteResponseToGRPCMessage(resp), nil +} + +// Head converts gRPC HeadRequest message and passes it to internal Object service. +func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) { + resp, err := s.srv.Head(ctx, object.HeadRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return object.HeadResponseToGRPCMessage(resp), nil +} + +// Search converts gRPC SearchRequest message, opens internal Object service Search stream and overtakes its data +// to gRPC stream. +func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error { + stream, err := s.srv.Search(gStream.Context(), object.SearchRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return err + } + + for { + r, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + + if err := gStream.Send(object.SearchResponseToGRPCMessage(r)); err != nil { + return err + } + } +} + +// GetRange converts gRPC GetRangeRequest message, opens internal Object service Search stream and overtakes its data +// to gRPC stream. +func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error { + stream, err := s.srv.GetRange(gStream.Context(), object.GetRangeRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return err + } + + for { + r, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + + if err := gStream.Send(object.GetRangeResponseToGRPCMessage(r)); err != nil { + return err + } + } +} + +// GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service. +func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) { + resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return object.GetRangeHashResponseToGRPCMessage(resp), nil +} diff --git a/pkg/network/transport/session/grpc/.gitkeep b/pkg/network/transport/session/grpc/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/network/transport/session/grpc/service.go b/pkg/network/transport/session/grpc/service.go new file mode 100644 index 000000000..3a248dbc8 --- /dev/null +++ b/pkg/network/transport/session/grpc/service.go @@ -0,0 +1,32 @@ +package accounting + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/session" + sessionGRPC "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" +) + +// Server wraps NeoFS API Session service and +// provides gRPC Session service server interface. +type Server struct { + srv session.Service +} + +// New creates, initializes and returns Server instance. +func New(c session.Service) *Server { + return &Server{ + srv: c, + } +} + +// Create converts gRPC CreateRequest message and passes it to internal Session service. +func (s *Server) Create(ctx context.Context, req *sessionGRPC.CreateRequest) (*sessionGRPC.CreateResponse, error) { + resp, err := s.srv.Create(ctx, session.CreateRequestFromGRPCMessage(req)) + if err != nil { + // TODO: think about how we transport errors through gRPC + return nil, err + } + + return session.CreateResponseToGRPCMessage(resp), nil +}