From 65eb1181e9f63f6c72d56933459e760f60d0aa36 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 22 Aug 2020 14:03:45 +0300 Subject: [PATCH] [#11] Use gRPC handlers in neofs-node application Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 13 ++++- cmd/neofs-node/grpc.go | 123 ++++++++++++++++++++++----------------- cmd/neofs-node/main.go | 8 ++- 3 files changed, 87 insertions(+), 57 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 0acde3f8e..98e5df170 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -1,11 +1,22 @@ package main +import ( + "context" + "sync" +) + type cfg struct { + ctx context.Context + + wg *sync.WaitGroup + grpcAddr string } func defaultCfg() *cfg { return &cfg{ - grpcAddr: ":50501", + ctx: context.Background(), + wg: new(sync.WaitGroup), + grpcAddr: "127.0.0.1:50501", } } diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index fec514595..1acdb754d 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -2,112 +2,129 @@ package main import ( "context" + "fmt" "net" - accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/accounting" + accountingGRPC "github.com/nspcc-dev/neofs-api-go/v2/accounting/grpc" + containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container" container "github.com/nspcc-dev/neofs-api-go/v2/container/grpc" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object" object "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + sessionGRPC "github.com/nspcc-dev/neofs-api-go/v2/session" session "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" + accountingTransport "github.com/nspcc-dev/neofs-node/pkg/network/transport/accounting/grpc" + containerTransport "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc" + objectTransport "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" + sessionTransport "github.com/nspcc-dev/neofs-node/pkg/network/transport/session/grpc" "github.com/pkg/errors" "google.golang.org/grpc" ) -type unimplementedServer struct { - accServer - cnrServer - objServer -} +type accountingSvc struct{} -type accServer struct{} +type sessionSvc struct{} -type sesServer struct{} +type containerSvc struct{} -type cnrServer struct{} - -type objServer struct{} +type objectSvc struct{} func unimplementedErr(srv, call string) error { - return errors.Errorf("unimplemented service call %s.%s", srv, call) + return errors.Errorf("unimplemented API service call %s.%s", srv, call) } -func (*accServer) Balance(context.Context, *accounting.BalanceRequest) (*accounting.BalanceResponse, error) { +func (s *accountingSvc) Balance(context.Context, *accounting.BalanceRequest) (*accounting.BalanceResponse, error) { return nil, unimplementedErr("Accounting", "Balance") } -func (*sesServer) Create(context.Context, *session.CreateRequest) (*session.CreateResponse, error) { +func (s *sessionSvc) Create(context.Context, *sessionGRPC.CreateRequest) (*sessionGRPC.CreateResponse, error) { return nil, unimplementedErr("Session", "Create") } -func (*cnrServer) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { - return nil, unimplementedErr("Contianer", "Put") +func (s *containerSvc) Put(context.Context, *containerGRPC.PutRequest) (*containerGRPC.PutResponse, error) { + return nil, unimplementedErr("Container", "Put") } -func (*cnrServer) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) { - return nil, unimplementedErr("Contianer", "Delete") +func (s *containerSvc) Delete(context.Context, *containerGRPC.DeleteRequest) (*containerGRPC.DeleteResponse, error) { + return nil, unimplementedErr("Container", "Delete") } -func (*cnrServer) Get(context.Context, *container.GetRequest) (*container.GetResponse, error) { - return nil, unimplementedErr("Contianer", "Get") +func (s *containerSvc) Get(context.Context, *containerGRPC.GetRequest) (*containerGRPC.GetResponse, error) { + return nil, unimplementedErr("Container", "Get") } -func (*cnrServer) List(context.Context, *container.ListRequest) (*container.ListResponse, error) { - return nil, unimplementedErr("Contianer", "List") +func (s *containerSvc) List(context.Context, *containerGRPC.ListRequest) (*containerGRPC.ListResponse, error) { + return nil, unimplementedErr("Container", "List") } -func (*cnrServer) SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) { - return nil, unimplementedErr("Contianer", "SetExtendedACL") +func (s *containerSvc) SetExtendedACL(context.Context, *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) { + return nil, unimplementedErr("Container", "SetExtendedACL") } -func (*cnrServer) GetExtendedACL(context.Context, *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) { - return nil, unimplementedErr("Contianer", "GetExtendedACL") +func (s *containerSvc) GetExtendedACL(context.Context, *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) { + return nil, unimplementedErr("Container", "GetExtendedACL") } -func (*objServer) Get(*object.GetRequest, object.ObjectService_GetServer) error { - return unimplementedErr("Object", "Get") +func (s *objectSvc) Get(context.Context, *objectGRPC.GetRequest) (objectGRPC.GetObjectStreamer, error) { + return nil, unimplementedErr("Object", "Get") } -func (*objServer) Put(object.ObjectService_PutServer) error { - return unimplementedErr("Object", "Put") +func (s *objectSvc) Put(context.Context) (objectGRPC.PutObjectStreamer, error) { + return nil, unimplementedErr("Object", "Put") } -func (*objServer) Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) { +func (s *objectSvc) Head(context.Context, *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) { + return nil, unimplementedErr("Object", "Put") +} + +func (s *objectSvc) Search(context.Context, *objectGRPC.SearchRequest) (objectGRPC.SearchObjectStreamer, error) { + return nil, unimplementedErr("Object", "Search") +} + +func (s *objectSvc) Delete(context.Context, *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) { return nil, unimplementedErr("Object", "Delete") } -func (*objServer) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) { - return nil, unimplementedErr("Object", "Head") +func (s *objectSvc) GetRange(context.Context, *objectGRPC.GetRangeRequest) (objectGRPC.GetRangeObjectStreamer, error) { + return nil, unimplementedErr("Object", "GetRange") } -func (*objServer) Search(*object.SearchRequest, object.ObjectService_SearchServer) error { - return unimplementedErr("Object", "Search") -} - -func (*objServer) GetRange(*object.GetRangeRequest, object.ObjectService_GetRangeServer) error { - return unimplementedErr("Object", "GetRange") -} - -func (*objServer) GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { +func (s *objectSvc) GetRangeHash(context.Context, *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) { return nil, unimplementedErr("Object", "GetRangeHash") } -func serveGRPC(c *cfg) error { +func serveGRPC(c *cfg) { lis, err := net.Listen("tcp", c.grpcAddr) fatalOnErr(err) srv := grpc.NewServer() - s := new(unimplementedServer) + accountingGRPC.RegisterAccountingServiceServer(srv, accountingTransport.New(new(accountingSvc))) + container.RegisterContainerServiceServer(srv, containerTransport.New(new(containerSvc))) + session.RegisterSessionServiceServer(srv, sessionTransport.New(new(sessionSvc))) + object.RegisterObjectServiceServer(srv, objectTransport.New(new(objectSvc))) - accounting.RegisterAccountingServiceServer(srv, s) - container.RegisterContainerServiceServer(srv, s) - session.RegisterSessionServiceServer(srv, s) - object.RegisterObjectServiceServer(srv, s) + go func() { + c.wg.Add(1) + defer func() { + c.wg.Done() + }() - if err := srv.Serve(lis); err != nil { - return err - } + if err := srv.Serve(lis); err != nil { + fmt.Println("gRPC server error", err) + } + }() - lis.Close() + go func() { + c.wg.Add(1) + defer func() { + fmt.Println("gRPC server stopped gracefully") + fmt.Println("net listener stopped", lis.Addr()) + c.wg.Done() + }() - return nil + <-c.ctx.Done() + + srv.GracefulStop() + }() } diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index f904f33b3..2f83e9d60 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -15,9 +15,11 @@ func fatalOnErr(err error) { func main() { c := defaultCfg() - fatalOnErr(serveGRPC(c)) + c.ctx = grace.NewGracefulContext(nil) - ctx := grace.NewGracefulContext(nil) + serveGRPC(c) - <-ctx.Done() + <-c.ctx.Done() + + c.wg.Wait() }