[#1244] nats: Split client creation into 2 stages

Create and connect to an endpoint using separate functions.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-03-18 18:13:12 +03:00 committed by Alex Vanin
parent 2b0460c532
commit 414ba6e0a2
4 changed files with 47 additions and 36 deletions

View file

@ -89,6 +89,7 @@ func initApp(c *cfg) {
} }
func bootUp(c *cfg) { func bootUp(c *cfg) {
connectNats(c)
serveGRPC(c) serveGRPC(c)
makeAndWaitNotaryDeposit(c) makeAndWaitNotaryDeposit(c)
bootstrapNode(c) bootstrapNode(c)

View file

@ -115,9 +115,7 @@ func initNotifications(c *cfg) {
topic = pubKey topic = pubKey
} }
natsSvc, err := nats.New( natsSvc := nats.New(
c.ctx,
nodeconfig.Notification(c.appCfg).Endpoint(),
nats.WithConnectionName("NeoFS Storage Node: "+pubKey), // connection name is used in the server side logs nats.WithConnectionName("NeoFS Storage Node: "+pubKey), // connection name is used in the server side logs
nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()), nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()),
nats.WithClientCert( nats.WithClientCert(
@ -127,9 +125,6 @@ func initNotifications(c *cfg) {
nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()), nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()),
nats.WithLogger(c.log), nats.WithLogger(c.log),
) )
if err != nil {
panic("could not created object notificator: " + err.Error())
}
c.cfgNotifications = cfgNotifications{ c.cfgNotifications = cfgNotifications{
enabled: true, enabled: true,
@ -158,3 +153,15 @@ func initNotifications(c *cfg) {
}) })
} }
} }
func connectNats(c *cfg) {
if !c.cfgNotifications.enabled {
return
}
endpoint := nodeconfig.Notification(c.appCfg).Endpoint()
err := c.cfgNotifications.nw.w.Connect(c.ctx, endpoint)
if err != nil {
panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err))
}
}

View file

@ -33,6 +33,6 @@ func WithConnectionName(name string) Option {
func WithLogger(logger *zap.Logger) Option { func WithLogger(logger *zap.Logger) Option {
return func(o *opts) { return func(o *opts) {
o.logger = logger o.log = logger
} }
} }

View file

@ -20,17 +20,17 @@ import (
// new(Writer) or Writer{} construction leads to undefined // new(Writer) or Writer{} construction leads to undefined
// behaviour and is not safe. // behaviour and is not safe.
type Writer struct { type Writer struct {
log *zap.Logger js nats.JetStreamContext
js nats.JetStreamContext nc *nats.Conn
nc *nats.Conn
m *sync.RWMutex m *sync.RWMutex
createdStreams map[string]struct{} createdStreams map[string]struct{}
opts
} }
type opts struct { type opts struct {
logger *zap.Logger log *zap.Logger
nOpts []nats.Option nOpts []nats.Option
} }
type Option func(*opts) type Option func(*opts)
@ -79,51 +79,54 @@ func (n *Writer) Notify(topic string, address *addressSDK.Address) error {
return nil return nil
} }
// New creates and inits new Writer. // New creates new Writer.
// Connection is closed when passed context is done. func New(oo ...Option) *Writer {
// w := &Writer{
// Returns error only if fails to open connection to a NATS server m: &sync.RWMutex{},
// with provided configuration. createdStreams: make(map[string]struct{}),
func New(ctx context.Context, endpoint string, oo ...Option) (*Writer, error) { opts: opts{
opts := &opts{ log: zap.L(),
logger: zap.L(), nOpts: make([]nats.Option, 0, len(oo)+3),
nOpts: make([]nats.Option, 0, len(oo)+3), },
} }
for _, o := range oo { for _, o := range oo {
o(opts) o(&w.opts)
} }
opts.nOpts = append(opts.nOpts, w.opts.nOpts = append(w.opts.nOpts,
nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop
nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
opts.logger.Error("nats: connection was lost", zap.Error(err)) w.log.Error("nats: connection was lost", zap.Error(err))
}), }),
nats.ReconnectHandler(func(conn *nats.Conn) { nats.ReconnectHandler(func(conn *nats.Conn) {
opts.logger.Warn("nats: reconnected to the server") w.log.Warn("nats: reconnected to the server")
}), }),
) )
nc, err := nats.Connect(endpoint, opts.nOpts...) return w
}
// Connect tries to connect to a specified NATS endpoint.
//
// Connection is closed when passed context is done.
func (n *Writer) Connect(ctx context.Context, endpoint string) error {
nc, err := nats.Connect(endpoint, n.opts.nOpts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not connect to server: %w", err) return fmt.Errorf("could not connect to server: %w", err)
} }
n.nc = nc
// usage w/o options is error-free // usage w/o options is error-free
js, _ := nc.JetStream() n.js, _ = nc.JetStream()
go func() { go func() {
<-ctx.Done() <-ctx.Done()
opts.logger.Info("nats: closing connection as the context is done") n.opts.log.Info("nats: closing connection as the context is done")
nc.Close() nc.Close()
}() }()
return &Writer{ return nil
js: js,
nc: nc,
log: opts.logger,
m: &sync.RWMutex{},
createdStreams: make(map[string]struct{}),
}, nil
} }