From 414ba6e0a20583eef2a0f982fa903b67732bebfc Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 18 Mar 2022 18:13:12 +0300 Subject: [PATCH] [#1244] nats: Split client creation into 2 stages Create and connect to an endpoint using separate functions. Signed-off-by: Evgenii Stratonikov --- cmd/neofs-node/main.go | 1 + cmd/neofs-node/notificator.go | 19 +++++--- pkg/services/notificator/nats/options.go | 2 +- pkg/services/notificator/nats/service.go | 61 +++++++++++++----------- 4 files changed, 47 insertions(+), 36 deletions(-) diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 4647ed59ba..6cc5ebad53 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -89,6 +89,7 @@ func initApp(c *cfg) { } func bootUp(c *cfg) { + connectNats(c) serveGRPC(c) makeAndWaitNotaryDeposit(c) bootstrapNode(c) diff --git a/cmd/neofs-node/notificator.go b/cmd/neofs-node/notificator.go index 7c02e8b985..9944fc05aa 100644 --- a/cmd/neofs-node/notificator.go +++ b/cmd/neofs-node/notificator.go @@ -115,9 +115,7 @@ func initNotifications(c *cfg) { topic = pubKey } - natsSvc, err := nats.New( - c.ctx, - nodeconfig.Notification(c.appCfg).Endpoint(), + natsSvc := nats.New( nats.WithConnectionName("NeoFS Storage Node: "+pubKey), // connection name is used in the server side logs nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()), nats.WithClientCert( @@ -127,9 +125,6 @@ func initNotifications(c *cfg) { nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()), nats.WithLogger(c.log), ) - if err != nil { - panic("could not created object notificator: " + err.Error()) - } c.cfgNotifications = cfgNotifications{ enabled: true, @@ -158,3 +153,15 @@ func initNotifications(c *cfg) { }) } } + +func connectNats(c *cfg) { + if !c.cfgNotifications.enabled { + return + } + + endpoint := nodeconfig.Notification(c.appCfg).Endpoint() + err := c.cfgNotifications.nw.w.Connect(c.ctx, endpoint) + if err != nil { + panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err)) + } +} diff --git a/pkg/services/notificator/nats/options.go b/pkg/services/notificator/nats/options.go index 8f87b6bd45..278705ac55 100644 --- a/pkg/services/notificator/nats/options.go +++ b/pkg/services/notificator/nats/options.go @@ -33,6 +33,6 @@ func WithConnectionName(name string) Option { func WithLogger(logger *zap.Logger) Option { return func(o *opts) { - o.logger = logger + o.log = logger } } diff --git a/pkg/services/notificator/nats/service.go b/pkg/services/notificator/nats/service.go index 1bb5cc7ae2..5b4363878f 100644 --- a/pkg/services/notificator/nats/service.go +++ b/pkg/services/notificator/nats/service.go @@ -20,17 +20,17 @@ import ( // new(Writer) or Writer{} construction leads to undefined // behaviour and is not safe. type Writer struct { - log *zap.Logger - js nats.JetStreamContext - nc *nats.Conn + js nats.JetStreamContext + nc *nats.Conn m *sync.RWMutex createdStreams map[string]struct{} + opts } type opts struct { - logger *zap.Logger - nOpts []nats.Option + log *zap.Logger + nOpts []nats.Option } type Option func(*opts) @@ -79,51 +79,54 @@ func (n *Writer) Notify(topic string, address *addressSDK.Address) error { return nil } -// New creates and inits new Writer. -// Connection is closed when passed context is done. -// -// Returns error only if fails to open connection to a NATS server -// with provided configuration. -func New(ctx context.Context, endpoint string, oo ...Option) (*Writer, error) { - opts := &opts{ - logger: zap.L(), - nOpts: make([]nats.Option, 0, len(oo)+3), +// New creates new Writer. +func New(oo ...Option) *Writer { + w := &Writer{ + m: &sync.RWMutex{}, + createdStreams: make(map[string]struct{}), + opts: opts{ + log: zap.L(), + nOpts: make([]nats.Option, 0, len(oo)+3), + }, } for _, o := range oo { - o(opts) + o(&w.opts) } - opts.nOpts = append(opts.nOpts, + w.opts.nOpts = append(w.opts.nOpts, nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { - opts.logger.Error("nats: connection was lost", zap.Error(err)) + w.log.Error("nats: connection was lost", zap.Error(err)) }), nats.ReconnectHandler(func(conn *nats.Conn) { - opts.logger.Warn("nats: reconnected to the server") + w.log.Warn("nats: reconnected to the server") }), ) - nc, err := nats.Connect(endpoint, opts.nOpts...) + return w +} + +// Connect tries to connect to a specified NATS endpoint. +// +// Connection is closed when passed context is done. +func (n *Writer) Connect(ctx context.Context, endpoint string) error { + nc, err := nats.Connect(endpoint, n.opts.nOpts...) if err != nil { - return nil, fmt.Errorf("could not connect to server: %w", err) + return fmt.Errorf("could not connect to server: %w", err) } + n.nc = nc + // usage w/o options is error-free - js, _ := nc.JetStream() + n.js, _ = nc.JetStream() go func() { <-ctx.Done() - opts.logger.Info("nats: closing connection as the context is done") + n.opts.log.Info("nats: closing connection as the context is done") nc.Close() }() - return &Writer{ - js: js, - nc: nc, - log: opts.logger, - m: &sync.RWMutex{}, - createdStreams: make(map[string]struct{}), - }, nil + return nil }