diff --git a/pkg/network/muxer/listener.go b/pkg/network/muxer/listener.go deleted file mode 100644 index 9ba669951..000000000 --- a/pkg/network/muxer/listener.go +++ /dev/null @@ -1,51 +0,0 @@ -package muxer - -import ( - "net" - - manet "github.com/multiformats/go-multiaddr-net" - "github.com/pkg/errors" -) - -type netListenerAdapter struct { - manet.Listener -} - -var errNothingAccept = errors.New("nothing to accept") - -// Accept waits for and returns the next connection to the listener. -func (l *netListenerAdapter) Accept() (net.Conn, error) { - if l.Listener == nil { - return nil, errNothingAccept - } - - return l.Listener.Accept() -} - -// Close closes the listener. -// Any blocked Accept operations will be unblocked and return errors. -func (l *netListenerAdapter) Close() error { - if l.Listener == nil { - return nil - } - - return l.Listener.Close() -} - -// Addr returns the net.Listener's network address. -func (l *netListenerAdapter) Addr() net.Addr { - if l.Listener == nil { - return (*net.TCPAddr)(nil) - } - - return l.Listener.Addr() -} - -// NetListener turns this Listener into a net.Listener. -// -// * Connections returned from Accept implement multiaddr-net Conn. -// * Calling WrapNetListener on the net.Listener returned by this function will -// return the original (underlying) multiaddr-net Listener. -func NetListener(l manet.Listener) net.Listener { - return &netListenerAdapter{Listener: l} -} diff --git a/pkg/network/muxer/muxer.go b/pkg/network/muxer/muxer.go deleted file mode 100644 index 3f8640382..000000000 --- a/pkg/network/muxer/muxer.go +++ /dev/null @@ -1,239 +0,0 @@ -package muxer - -import ( - "context" - "net" - "strings" - "sync/atomic" - "time" - - "github.com/multiformats/go-multiaddr" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/soheilhy/cmux" - "github.com/valyala/fasthttp" - "go.uber.org/zap" - "google.golang.org/grpc" -) - -type ( - // StoreParams groups the parameters of network connections muxer constructor. - Params struct { - Logger *zap.Logger - API *fasthttp.Server - Address multiaddr.Multiaddr - ShutdownTTL time.Duration - P2P *grpc.Server - } - - // Mux is an interface of network connections muxer. - Mux interface { - Start(ctx context.Context) - Stop() - } - - muxer struct { - maddr multiaddr.Multiaddr - run *int32 - lis net.Listener - log *zap.Logger - ttl time.Duration - - p2p *grpc.Server - api *fasthttp.Server - - done chan struct{} - } -) - -const ( - // we close listener, that's why we ignore this errors - errClosedConnection = "use of closed network connection" - errMuxListenerClose = "mux: listener closed" - errHTTPServerClosed = "http: Server closed" -) - -var ( - ignoredErrors = []string{ - errClosedConnection, - errMuxListenerClose, - errHTTPServerClosed, - } -) - -// New constructs network connections muxer and returns Mux interface. -func New(p Params) Mux { - return &muxer{ - maddr: p.Address, - ttl: p.ShutdownTTL, - run: new(int32), - api: p.API, - p2p: p.P2P, - log: p.Logger, - - done: make(chan struct{}), - } -} - -func needCatch(err error) bool { - if err == nil || containsErr(err) { - return false - } - - return true -} - -func containsErr(err error) bool { - for _, msg := range ignoredErrors { - if strings.Contains(err.Error(), msg) { - return true - } - } - - return false -} - -func (m *muxer) Start(ctx context.Context) { - var err error - - // if already started - ignore - if !atomic.CompareAndSwapInt32(m.run, 0, 1) { - m.log.Warn("already started") - return - } else if m.lis != nil { - m.log.Info("try close old listener") - if err = m.lis.Close(); err != nil { - m.log.Fatal("could not close old listener", - zap.Error(err)) - } - } - - if m.lis, err = network.Listen(m.maddr); err != nil { - m.log.Fatal("could not close old listener", - zap.Error(err)) - } - - mux := cmux.New(m.lis) - mux.HandleError(func(e error) bool { - if needCatch(e) { - m.log.Error("error-handler: something went wrong", - zap.Error(e)) - } - return true - }) - - // trpcL := mux.Match(cmux.Any()) // Any means anything that is not yet matched. - hLis := mux.Match(cmux.HTTP1Fast()) - gLis := mux.Match(cmux.HTTP2()) - pLis := mux.Match(cmux.Any()) - - m.log.Debug("delay context worker") - - go func() { - <-ctx.Done() - m.Stop() - }() - - m.log.Debug("delay tcp") - - go func() { - m.log.Debug("tcp: serve") - loop: - for { - select { - case <-ctx.Done(): - break loop - default: - } - - con, err := pLis.Accept() - if err != nil { - break loop - } - - _ = con.Close() - } - - m.log.Debug("tcp: stopped") - }() - - m.log.Debug("delay p2p") - - go func() { - if m.p2p == nil { - m.log.Info("p2p: service is empty") - return - } - - m.log.Debug("p2p: serve") - - if err := m.p2p.Serve(gLis); needCatch(err) { - m.log.Error("p2p: something went wrong", - zap.Error(err)) - } - - m.log.Debug("p2p: stopped") - }() - - m.log.Debug("delay api") - - go func() { - if m.api == nil { - m.log.Info("api: service is empty") - return - } - - m.log.Debug("api: serve") - - if err := m.api.Serve(hLis); needCatch(err) { - m.log.Error("rpc: something went wrong", - zap.Error(err)) - } - - m.log.Debug("rpc: stopped") - }() - - m.log.Debug("delay serve") - - go func() { - defer func() { close(m.done) }() - - m.log.Debug("mux: serve") - - if err := mux.Serve(); needCatch(err) { - m.log.Fatal("mux: something went wrong", - zap.Error(err)) - } - - m.log.Debug("mux: stopped") - }() -} - -func (m *muxer) Stop() { - if !atomic.CompareAndSwapInt32(m.run, 1, 0) { - m.log.Warn("already stopped") - return - } - - if err := m.lis.Close(); err != nil { - m.log.Error("could not close connection", - zap.Error(err)) - } - - m.log.Debug("lis: close ok") - - <-m.done // muxer stopped - - if m.api != nil { - if err := m.api.Shutdown(); needCatch(err) { - m.log.Error("api: could not shutdown", - zap.Error(err)) - } - - m.log.Debug("api: shutdown ok") - } - - if m.p2p != nil { - m.p2p.GracefulStop() - m.log.Debug("p2p: shutdown ok") - } -} diff --git a/pkg/network/muxer/muxer_test.go b/pkg/network/muxer/muxer_test.go deleted file mode 100644 index 79a7e1f32..000000000 --- a/pkg/network/muxer/muxer_test.go +++ /dev/null @@ -1,396 +0,0 @@ -package muxer - -import ( - "context" - "net" - "net/http" - "os" - "reflect" - "strings" - "sync" - "testing" - "time" - - "bou.ke/monkey" - "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" - "github.com/pkg/errors" - "github.com/soheilhy/cmux" - "github.com/spf13/viper" - "github.com/stretchr/testify/require" - "github.com/valyala/fasthttp" - "go.uber.org/atomic" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc" -) - -type ( - errListener struct { - net.TCPListener - } - - syncListener struct { - sync.Mutex - net.Listener - } - - errMuxer struct { - handleError func(error) bool - } - - testWriter struct{} - - // service is used to implement GreaterServer. - service struct{} -) - -const MIMEApplicationJSON = "application/json" - -// Hello is simple handler -func (*service) Hello(ctx context.Context, req *HelloRequest) (*HelloResponse, error) { - return &HelloResponse{ - Message: "Hello " + req.Name, - }, nil -} - -func (testWriter) Sync() error { return nil } -func (testWriter) Write(p []byte) (n int, err error) { return len(p), nil } - -func (errMuxer) Match(...cmux.Matcher) net.Listener { - return &errListener{} -} - -func (errMuxer) MatchWithWriters(...cmux.MatchWriter) net.Listener { - return &errListener{} -} - -func (errMuxer) Serve() error { - return errors.New("cmux.Serve error") -} - -func (e *errMuxer) HandleError(h cmux.ErrorHandler) { - e.handleError = h -} - -func (errMuxer) SetReadTimeout(time.Duration) { - panic("implement me") -} - -func (l *syncListener) Close() error { - l.Lock() - err := l.Listener.Close() - l.Unlock() - return err -} - -func (errListener) Close() error { return errors.New("close error") } - -func testMultiAddr(is *require.Assertions) multiaddr.Multiaddr { - mAddr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/0") - is.NoError(err) - return mAddr -} - -func testLogger() *zap.Logger { - encoderCfg := zapcore.EncoderConfig{ - MessageKey: "msg", - LevelKey: "level", - NameKey: "logger", - EncodeLevel: zapcore.LowercaseLevelEncoder, - EncodeTime: zapcore.ISO8601TimeEncoder, - EncodeDuration: zapcore.StringDurationEncoder, - } - core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), testWriter{}, zap.DPanicLevel) - return zap.New(core).WithOptions() -} - -func testHTTPServer() *fasthttp.Server { - return &fasthttp.Server{Handler: func(ctx *fasthttp.RequestCtx) {}} -} - -func TestSuite(t *testing.T) { - t.Run("it should run, stop and not panic", func(t *testing.T) { - var ( - is = require.New(t) - v = viper.New() - g = grpc.NewServer() - l = testLogger() - a = testMultiAddr(is) - s = time.Second - err error - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - v.SetDefault("api.address", "/ip4/0.0.0.0/tcp/0") - v.SetDefault("api.shutdown_timeout", time.Second) - - m := New(Params{ - Logger: l, - Address: a, - ShutdownTTL: s, - API: testHTTPServer(), - P2P: g, - }) - - is.NotPanics(func() { - m.Start(ctx) - }) - - res, err := http.Post("http://"+m.(*muxer).lis.Addr().String(), MIMEApplicationJSON, strings.NewReader(`{ - "jsonrpc": "2.0", - "id": 1 - "method": "get_version", - "params": [], - }`)) - is.NoError(err) - defer res.Body.Close() - - time.Sleep(100 * time.Millisecond) - - is.NotPanics(m.Stop) - }) - - t.Run("it should work with gRPC", func(t *testing.T) { - var ( - is = require.New(t) - g = grpc.NewServer() - l = testLogger() - s = time.Second - err error - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - addr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/63090") - is.NoError(err) - - RegisterGreeterServer(g, &service{}) - - m := New(Params{ - Logger: l, - Address: addr, - ShutdownTTL: s, - P2P: g, - }) - - is.NotPanics(func() { - m.Start(ctx) - }) - - a, err := manet.ToNetAddr(addr) - require.NoError(t, err) - - con, err := grpc.DialContext(ctx, a.String(), grpc.WithInsecure()) - require.NoError(t, err) - - res, err := NewGreeterClient(con).Hello(ctx, &HelloRequest{Name: "test"}) - is.NoError(err) - is.Contains(res.Message, "test") - - time.Sleep(100 * time.Millisecond) - - is.NotPanics(m.Stop) - }) - - t.Run("it should not start if already started", func(t *testing.T) { - var ( - is = require.New(t) - g = grpc.NewServer() - l = testLogger() - a = testMultiAddr(is) - s = time.Second - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - m := New(Params{ - Logger: l, - Address: a, - ShutdownTTL: s, - API: testHTTPServer(), - P2P: g, - }) - is.NotNil(m) - - mux, ok := m.(*muxer) - is.True(ok) - is.NotNil(mux) - - *mux.run = 1 - - is.NotPanics(func() { - mux.Start(ctx) - }) - - *mux.run = 0 - - is.NotPanics(mux.Stop) - }) - - t.Run("it should fail on close listener", func(t *testing.T) { - var ( - is = require.New(t) - g = grpc.NewServer() - l = testLogger() - a = testMultiAddr(is) - s = time.Second - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - m := New(Params{ - Logger: l, - Address: a, - ShutdownTTL: s, - API: testHTTPServer(), - P2P: g, - }) - is.NotNil(m) - - mux, ok := m.(*muxer) - is.True(ok) - is.NotNil(mux) - - mux.lis = &errListener{} - - exit := atomic.NewInt32(0) - - monkey.Patch(os.Exit, func(v int) { exit.Store(int32(v)) }) - - is.NotPanics(func() { - mux.Start(ctx) - }) - is.Equal(int32(1), exit.Load()) - }) - - t.Run("it should fail on create/close Listener without handlers", func(t *testing.T) { - var ( - is = require.New(t) - l = testLogger() - err error - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mux := new(muxer) - mux.log = l - mux.run = new(int32) - mux.done = make(chan struct{}) - mux.maddr, err = multiaddr.NewMultiaddr("/ip4/1.1.1.1/tcp/2") - is.NoError(err) - - mux.lis, err = net.ListenTCP("tcp", nil) - is.NoError(err) - - exit := atomic.NewInt32(0) - monkey.Patch(os.Exit, func(v int) { - exit.Store(int32(v)) - }) - - m := &errMuxer{handleError: func(e error) bool { return true }} - monkey.Patch(cmux.New, func(net.Listener) cmux.CMux { - // prevent panic: - mux.lis, err = net.ListenTCP("tcp", nil) - return m - }) - - mux.Start(ctx) - // c.So(mux.Start, ShouldNotPanic) - - m.handleError(errors.New("test")) - - is.Equal(int32(1), exit.Load()) - - mux.lis = &errListener{} - *mux.run = 1 - - is.NotPanics(mux.Stop) - }) - - t.Run("it should fail on create/close Listener with handlers", func(t *testing.T) { - var ( - is = require.New(t) - g = grpc.NewServer() - l = testLogger() - err error - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mux := new(muxer) - mux.api = testHTTPServer() - mux.p2p = g - mux.log = l - mux.run = new(int32) - mux.done = make(chan struct{}) - mux.maddr, err = multiaddr.NewMultiaddr("/ip4/1.1.1.1/tcp/2") - is.NoError(err) - - mu := new(sync.Mutex) - - exit := atomic.NewInt32(0) - monkey.Patch(os.Exit, func(v int) { - exit.Store(int32(v)) - - mu.Lock() - if l, ok := mux.lis.(*syncListener); ok { - l.Lock() - l.Listener, _ = net.ListenTCP("tcp", nil) - l.Unlock() - } - mu.Unlock() - }) - - m := &errMuxer{handleError: func(e error) bool { return true }} - monkey.Patch(cmux.New, func(net.Listener) cmux.CMux { - // prevent panic: - return m - }) - - is.NotPanics(func() { - mux.Start(ctx) - }) - - m.handleError(errors.New("test")) - - is.Equal(int32(1), exit.Load()) - - mu.Lock() - mux.lis = &syncListener{Listener: &errListener{}} - mu.Unlock() - *mux.run = 1 - - monkey.PatchInstanceMethod(reflect.TypeOf(&http.Server{}), "Shutdown", func(*http.Server, context.Context) error { - return errors.New("http.Shutdown error") - }) - - is.NotPanics(mux.Stop) - }) - - t.Run("should not panic when work with nil listener", func(t *testing.T) { - var ( - is = require.New(t) - err error - ) - - lis := NetListener(nil) - is.NotPanics(func() { - is.NoError(lis.Close()) - }) - is.NotPanics(func() { - lis.Addr() - }) - is.NotPanics(func() { - _, err = lis.Accept() - is.EqualError(err, errNothingAccept.Error()) - }) - }) -} diff --git a/pkg/network/muxer/muxer_test.pb.go b/pkg/network/muxer/muxer_test.pb.go deleted file mode 100644 index a7a905a37..000000000 Binary files a/pkg/network/muxer/muxer_test.pb.go and /dev/null differ diff --git a/pkg/network/muxer/muxer_test.proto b/pkg/network/muxer/muxer_test.proto deleted file mode 100644 index 3079865c8..000000000 --- a/pkg/network/muxer/muxer_test.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; -option go_package = "github.com/nspcc-dev/neofs-node/pkg/network/muxer"; - -package muxer; - -// The Greater service definition. -service Greeter { - rpc Hello(HelloRequest) returns (HelloResponse); -} - -// Request message example -message HelloRequest { - string name = 1; -} - -message HelloResponse { - string message = 1; -}