forked from TrueCloudLab/frostfs-node
[#11] transport: Implement gRPC server handlers of API service
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
265c26150d
commit
c2954aab20
10 changed files with 311 additions and 1 deletions
2
go.mod
2
go.mod
|
@ -16,7 +16,7 @@ require (
|
|||
github.com/multiformats/go-multihash v0.0.13
|
||||
github.com/nspcc-dev/hrw v1.0.9
|
||||
github.com/nspcc-dev/neo-go v0.90.0
|
||||
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200820112910-89e79ebe72b0
|
||||
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200821125006-afd2ce0400ec
|
||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
||||
github.com/nspcc-dev/netmap v1.7.0
|
||||
github.com/nspcc-dev/tzhash v1.4.0 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
32
pkg/network/transport/accounting/grpc/service.go
Normal file
32
pkg/network/transport/accounting/grpc/service.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package accounting
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/accounting"
|
||||
accountingGRPC "github.com/nspcc-dev/neofs-api-go/v2/accounting/grpc"
|
||||
)
|
||||
|
||||
// Server wraps NeoFS API Accounting service and
|
||||
// provides gRPC Accounting service server interface.
|
||||
type Server struct {
|
||||
srv accounting.Service
|
||||
}
|
||||
|
||||
// New creates, initializes and returns Server instance.
|
||||
func New(c accounting.Service) *Server {
|
||||
return &Server{
|
||||
srv: c,
|
||||
}
|
||||
}
|
||||
|
||||
// Balance converts gRPC BalanceRequest message and passes it to internal Accounting service.
|
||||
func (s *Server) Balance(ctx context.Context, req *accountingGRPC.BalanceRequest) (*accountingGRPC.BalanceResponse, error) {
|
||||
resp, err := s.srv.Balance(ctx, accounting.BalanceRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return accounting.BalanceResponseToGRPCMessage(resp), nil
|
||||
}
|
87
pkg/network/transport/container/grpc/service.go
Normal file
87
pkg/network/transport/container/grpc/service.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/container"
|
||||
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
|
||||
)
|
||||
|
||||
// Server wraps NeoFS API Container service and
|
||||
// provides gRPC Container service server interface.
|
||||
type Server struct {
|
||||
srv container.Service
|
||||
}
|
||||
|
||||
// New creates, initializes and returns Server instance.
|
||||
func New(c container.Service) *Server {
|
||||
return &Server{
|
||||
srv: c,
|
||||
}
|
||||
}
|
||||
|
||||
// Put converts gRPC PutRequest message and passes it to internal Container service.
|
||||
func (s *Server) Put(ctx context.Context, req *containerGRPC.PutRequest) (*containerGRPC.PutResponse, error) {
|
||||
resp, err := s.srv.Put(ctx, container.PutRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return container.PutResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// Delete converts gRPC DeleteRequest message and passes it to internal Container service.
|
||||
func (s *Server) Delete(ctx context.Context, req *containerGRPC.DeleteRequest) (*containerGRPC.DeleteResponse, error) {
|
||||
resp, err := s.srv.Delete(ctx, container.DeleteRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return container.DeleteResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// Get converts gRPC GetRequest message and passes it to internal Container service.
|
||||
func (s *Server) Get(ctx context.Context, req *containerGRPC.GetRequest) (*containerGRPC.GetResponse, error) {
|
||||
resp, err := s.srv.Get(ctx, container.GetRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return container.GetResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// List converts gRPC ListRequest message and passes it to internal Container service.
|
||||
func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*containerGRPC.ListResponse, error) {
|
||||
resp, err := s.srv.List(ctx, container.ListRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return container.ListResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// SetExtendedACL converts gRPC SetExtendedACLRequest message and passes it to internal Container service.
|
||||
func (s *Server) SetExtendedACL(ctx context.Context, req *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) {
|
||||
resp, err := s.srv.SetExtendedACL(ctx, container.SetExtendedACLRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return container.SetExtendedACLResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service.
|
||||
func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) {
|
||||
resp, err := s.srv.GetExtendedACL(ctx, container.GetExtendedACLRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return container.GetExtendedACLResponseToGRPCMessage(resp), nil
|
||||
}
|
159
pkg/network/transport/object/grpc/service.go
Normal file
159
pkg/network/transport/object/grpc/service.go
Normal file
|
@ -0,0 +1,159 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
||||
)
|
||||
|
||||
// Server wraps NeoFS API Object service and
|
||||
// provides gRPC Object service server interface.
|
||||
type Server struct {
|
||||
srv object.Service
|
||||
}
|
||||
|
||||
// New creates, initializes and returns Server instance.
|
||||
func New(c object.Service) *Server {
|
||||
return &Server{
|
||||
srv: c,
|
||||
}
|
||||
}
|
||||
|
||||
// Get converts gRPC GetRequest message, opens internal Object service Get stream and overtakes its data
|
||||
// to gRPC stream.
|
||||
func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error {
|
||||
stream, err := s.srv.Get(gStream.Context(), object.GetRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
r, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if err := gStream.Send(object.GetResponseToGRPCMessage(r)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
||||
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
||||
stream, err := s.srv.Put(gStream.Context())
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
req, err := gStream.Recv()
|
||||
if err == nil {
|
||||
if err := stream.Send(object.PutRequestFromGRPCMessage(req)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
resp, err := stream.CloseAndRecv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return gStream.SendAndClose(object.PutResponseToGRPCMessage(resp))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Delete converts gRPC DeleteRequest message and passes it to internal Object service.
|
||||
func (s *Server) Delete(ctx context.Context, req *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) {
|
||||
resp, err := s.srv.Delete(ctx, object.DeleteRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return object.DeleteResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// Head converts gRPC HeadRequest message and passes it to internal Object service.
|
||||
func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) {
|
||||
resp, err := s.srv.Head(ctx, object.HeadRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return object.HeadResponseToGRPCMessage(resp), nil
|
||||
}
|
||||
|
||||
// Search converts gRPC SearchRequest message, opens internal Object service Search stream and overtakes its data
|
||||
// to gRPC stream.
|
||||
func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error {
|
||||
stream, err := s.srv.Search(gStream.Context(), object.SearchRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
r, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if err := gStream.Send(object.SearchResponseToGRPCMessage(r)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetRange converts gRPC GetRangeRequest message, opens internal Object service Search stream and overtakes its data
|
||||
// to gRPC stream.
|
||||
func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error {
|
||||
stream, err := s.srv.GetRange(gStream.Context(), object.GetRangeRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
r, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if err := gStream.Send(object.GetRangeResponseToGRPCMessage(r)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service.
|
||||
func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) {
|
||||
resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return object.GetRangeHashResponseToGRPCMessage(resp), nil
|
||||
}
|
32
pkg/network/transport/session/grpc/service.go
Normal file
32
pkg/network/transport/session/grpc/service.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package accounting
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
sessionGRPC "github.com/nspcc-dev/neofs-api-go/v2/session/grpc"
|
||||
)
|
||||
|
||||
// Server wraps NeoFS API Session service and
|
||||
// provides gRPC Session service server interface.
|
||||
type Server struct {
|
||||
srv session.Service
|
||||
}
|
||||
|
||||
// New creates, initializes and returns Server instance.
|
||||
func New(c session.Service) *Server {
|
||||
return &Server{
|
||||
srv: c,
|
||||
}
|
||||
}
|
||||
|
||||
// Create converts gRPC CreateRequest message and passes it to internal Session service.
|
||||
func (s *Server) Create(ctx context.Context, req *sessionGRPC.CreateRequest) (*sessionGRPC.CreateResponse, error) {
|
||||
resp, err := s.srv.Create(ctx, session.CreateRequestFromGRPCMessage(req))
|
||||
if err != nil {
|
||||
// TODO: think about how we transport errors through gRPC
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return session.CreateResponseToGRPCMessage(resp), nil
|
||||
}
|
Loading…
Reference in a new issue