[] grpc: Use new mechanism of message conversion

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-03-15 13:59:28 +03:00 committed by Alex Vanin
parent 718a2fad26
commit 9eaba52660
8 changed files with 126 additions and 36 deletions
pkg/network/transport
accounting/grpc
container/grpc
netmap/grpc
object/grpc
session/grpc

View file

@ -23,11 +23,16 @@ func New(c accountingsvc.Server) *Server {
// Balance converts gRPC BalanceRequest message and passes it to internal Accounting service.
func (s *Server) Balance(ctx context.Context, req *accountingGRPC.BalanceRequest) (*accountingGRPC.BalanceResponse, error) {
resp, err := s.srv.Balance(ctx, accounting.BalanceRequestFromGRPCMessage(req))
balReq := new(accounting.BalanceRequest)
if err := balReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Balance(ctx, balReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return accounting.BalanceResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*accountingGRPC.BalanceResponse), nil
}

View file

@ -23,77 +23,112 @@ func New(c containersvc.Server) *Server {
// Put converts gRPC PutRequest message and passes it to internal Container service.
func (s *Server) Put(ctx context.Context, req *containerGRPC.PutRequest) (*containerGRPC.PutResponse, error) {
resp, err := s.srv.Put(ctx, container.PutRequestFromGRPCMessage(req))
putReq := new(container.PutRequest)
if err := putReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Put(ctx, putReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.PutResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.PutResponse), nil
}
// Delete converts gRPC DeleteRequest message and passes it to internal Container service.
func (s *Server) Delete(ctx context.Context, req *containerGRPC.DeleteRequest) (*containerGRPC.DeleteResponse, error) {
resp, err := s.srv.Delete(ctx, container.DeleteRequestFromGRPCMessage(req))
delReq := new(container.DeleteRequest)
if err := delReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Delete(ctx, delReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.DeleteResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.DeleteResponse), nil
}
// Get converts gRPC GetRequest message and passes it to internal Container service.
func (s *Server) Get(ctx context.Context, req *containerGRPC.GetRequest) (*containerGRPC.GetResponse, error) {
resp, err := s.srv.Get(ctx, container.GetRequestFromGRPCMessage(req))
getReq := new(container.GetRequest)
if err := getReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Get(ctx, getReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.GetResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.GetResponse), nil
}
// List converts gRPC ListRequest message and passes it to internal Container service.
func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*containerGRPC.ListResponse, error) {
resp, err := s.srv.List(ctx, container.ListRequestFromGRPCMessage(req))
listReq := new(container.ListRequest)
if err := listReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.List(ctx, listReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.ListResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
}
// SetExtendedACL converts gRPC SetExtendedACLRequest message and passes it to internal Container service.
func (s *Server) SetExtendedACL(ctx context.Context, req *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) {
resp, err := s.srv.SetExtendedACL(ctx, container.SetExtendedACLRequestFromGRPCMessage(req))
setEACLReq := new(container.SetExtendedACLRequest)
if err := setEACLReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.SetExtendedACL(ctx, setEACLReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.SetExtendedACLResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.SetExtendedACLResponse), nil
}
// GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service.
func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) {
resp, err := s.srv.GetExtendedACL(ctx, container.GetExtendedACLRequestFromGRPCMessage(req))
getEACLReq := new(container.GetExtendedACLRequest)
if err := getEACLReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.GetExtendedACL(ctx, getEACLReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.GetExtendedACLResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.GetExtendedACLResponse), nil
}
// AnnounceUsedSpace converts gRPC AnnounceUsedSpaceRequest message and passes it to internal Container service.
func (s *Server) AnnounceUsedSpace(ctx context.Context, req *containerGRPC.AnnounceUsedSpaceRequest) (*containerGRPC.AnnounceUsedSpaceResponse, error) {
resp, err := s.srv.AnnounceUsedSpace(ctx, container.AnnounceUsedSpaceRequestFromGRPCMessage(req))
announceReq := new(container.AnnounceUsedSpaceRequest)
if err := announceReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.AnnounceUsedSpace(ctx, announceReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return container.AnnounceUsedSpaceResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*containerGRPC.AnnounceUsedSpaceResponse), nil
}

View file

@ -25,22 +25,32 @@ func New(c netmapsvc.Server) *Server {
func (s *Server) LocalNodeInfo(
ctx context.Context,
req *netmapGRPC.LocalNodeInfoRequest) (*netmapGRPC.LocalNodeInfoResponse, error) {
resp, err := s.srv.LocalNodeInfo(ctx, netmap.LocalNodeInfoRequestFromGRPCMessage(req))
nodeInfoReq := new(netmap.LocalNodeInfoRequest)
if err := nodeInfoReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.LocalNodeInfo(ctx, nodeInfoReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return netmap.LocalNodeInfoResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*netmapGRPC.LocalNodeInfoResponse), nil
}
// NetworkInfo converts gRPC request message and passes it to internal netmap service.
func (s *Server) NetworkInfo(ctx context.Context, req *netmapGRPC.NetworkInfoRequest) (*netmapGRPC.NetworkInfoResponse, error) {
resp, err := s.srv.NetworkInfo(ctx, netmap.NetworkInfoRequestFromGRPCMessage(req))
netInfoReq := new(netmap.NetworkInfoRequest)
if err := netInfoReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.NetworkInfo(ctx, netInfoReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return netmap.NetworkInfoResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*netmapGRPC.NetworkInfoResponse), nil
}

View file

@ -11,16 +11,21 @@ type getStreamerV2 struct {
func (s *getStreamerV2) Send(resp *object.GetResponse) error {
return s.ObjectService_GetServer.Send(
object.GetResponseToGRPCMessage(resp),
resp.ToGRPCMessage().(*objectGRPC.GetResponse),
)
}
// Get converts gRPC GetRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error {
getReq := new(object.GetRequest)
if err := getReq.FromGRPCMessage(req); err != nil {
return err
}
// TODO: think about how we transport errors through gRPC
return s.srv.Get(
object.GetRequestFromGRPCMessage(req),
getReq,
&getStreamerV2{
ObjectService_GetServer: gStream,
},

View file

@ -11,16 +11,21 @@ type getRangeStreamerV2 struct {
func (s *getRangeStreamerV2) Send(resp *object.GetRangeResponse) error {
return s.ObjectService_GetRangeServer.Send(
object.GetRangeResponseToGRPCMessage(resp),
resp.ToGRPCMessage().(*objectGRPC.GetRangeResponse),
)
}
// GetRange converts gRPC GetRangeRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error {
getRngReq := new(object.GetRangeRequest)
if err := getRngReq.FromGRPCMessage(req); err != nil {
return err
}
// TODO: think about how we transport errors through gRPC
return s.srv.GetRange(
object.GetRangeRequestFromGRPCMessage(req),
getRngReq,
&getRangeStreamerV2{
ObjectService_GetRangeServer: gStream,
},

View file

@ -11,16 +11,21 @@ type searchStreamerV2 struct {
func (s *searchStreamerV2) Send(resp *object.SearchResponse) error {
return s.ObjectService_SearchServer.Send(
object.SearchResponseToGRPCMessage(resp),
resp.ToGRPCMessage().(*objectGRPC.SearchResponse),
)
}
// Search converts gRPC SearchRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error {
searchReq := new(object.SearchRequest)
if err := searchReq.FromGRPCMessage(req); err != nil {
return err
}
// TODO: think about how we transport errors through gRPC
return s.srv.Search(
object.SearchRequestFromGRPCMessage(req),
searchReq,
&searchStreamerV2{
ObjectService_SearchServer: gStream,
},

View file

@ -40,13 +40,18 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
return err
}
return gStream.SendAndClose(object.PutResponseToGRPCMessage(resp))
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PutResponse))
}
return err
}
if err := stream.Send(object.PutRequestFromGRPCMessage(req)); err != nil {
putReq := new(object.PutRequest)
if err := putReq.FromGRPCMessage(req); err != nil {
return err
}
if err := stream.Send(putReq); err != nil {
return err
}
}
@ -54,33 +59,48 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
// Delete converts gRPC DeleteRequest message and passes it to internal Object service.
func (s *Server) Delete(ctx context.Context, req *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) {
resp, err := s.srv.Delete(ctx, object.DeleteRequestFromGRPCMessage(req))
delReq := new(object.DeleteRequest)
if err := delReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Delete(ctx, delReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return object.DeleteResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*objectGRPC.DeleteResponse), nil
}
// Head converts gRPC HeadRequest message and passes it to internal Object service.
func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) {
resp, err := s.srv.Head(ctx, object.HeadRequestFromGRPCMessage(req))
searchReq := new(object.HeadRequest)
if err := searchReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Head(ctx, searchReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return object.HeadResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*objectGRPC.HeadResponse), nil
}
// GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service.
func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) {
resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req))
hashRngReq := new(object.GetRangeHashRequest)
if err := hashRngReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.GetRangeHash(ctx, hashRngReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return object.GetRangeHashResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*objectGRPC.GetRangeHashResponse), nil
}

View file

@ -23,11 +23,16 @@ func New(c sessionsvc.Server) *Server {
// Create converts gRPC CreateRequest message and passes it to internal Session service.
func (s *Server) Create(ctx context.Context, req *sessionGRPC.CreateRequest) (*sessionGRPC.CreateResponse, error) {
resp, err := s.srv.Create(ctx, session.CreateRequestFromGRPCMessage(req))
createReq := new(session.CreateRequest)
if err := createReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Create(ctx, createReq)
if err != nil {
// TODO: think about how we transport errors through gRPC
return nil, err
}
return session.CreateResponseToGRPCMessage(resp), nil
return resp.ToGRPCMessage().(*sessionGRPC.CreateResponse), nil
}