From 79c6f52a2787043a96327dff4dbc5f39a54c0abf Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 1 Mar 2021 14:51:38 +0300 Subject: [PATCH] [#1183] node: Add NATS client Signed-off-by: Pavel Karpy --- go.mod | 4 +- go.sum | Bin 93844 -> 95855 bytes pkg/services/notificator/nats/options.go | 38 +++++++ pkg/services/notificator/nats/service.go | 129 +++++++++++++++++++++++ 4 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 pkg/services/notificator/nats/options.go create mode 100644 pkg/services/notificator/nats/service.go diff --git a/go.mod b/go.mod index a7eb3a811..124854498 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,12 @@ require ( github.com/google/go-github/v39 v39.2.0 github.com/google/uuid v1.2.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/klauspost/compress v1.13.1 + github.com/klauspost/compress v1.13.4 github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.4.0 + github.com/nats-io/nats-server/v2 v2.7.2 // indirect + github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.98.0 github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 diff --git a/go.sum b/go.sum index eb1242673d77d94207c110b543a5209695f39caf..c3cccf6d287c268a1989cda5edd566fc4700b58e 100644 GIT binary patch delta 1840 zcmbuA&5Pq?9LFiBhs9k)aTaA?=Cqg6_Ib&Z7hz%3OeRf}PSdn$rxg!*Ym%mE+9qk! zUWPp_cv`74;9nsOGXKTk3Ld?9Rb0^ZDt3llt+IoHxjf`Ke7@h`>+|{dZ$AI&&97cW zfZF%rM_}!{Hj>%3G#T}5Wu=bk0DIa@;g1AiLR>D77J(?b$8@r}h#nNEGi5Tp@v$Fx z{?c6eX5#NJU>TI*CCnMnvYfF4NOi{o=uU@9Jmh_0#Zb$v(z1wWPY7D|;&lZ)yx|3@ zpIK!)jw(?w3F692=H}kJd&O!i2@ht)Jgh@Bj>d)&Fa~Il_R_;*PcJ%ZCoG0zXVK03 zn-5pQ*^hZ}>%>fx(v~w?rj-TSU!XE7!xAVXfDnKXZx05e5@B7IVL4(Uj=nU#Ft~Ku zz~*$+AFHa{M|iFG>f9hq=VUZ3M&nrLD5Q$bY5#AgC;yZ?`FFjF7nh^hxk`LGTXVH> zMq5JlQ0?)2Tk%D*FnG#Fl*%cZVR(yW7KxScwX#?8i927o^Ow<{FlBT}E~?@Y8-e1; z5yC*SS(fTe4zgS?&PFV#5OY2%CTk_rr@H(f$y@M_^VANbc?ue|)in<~IE;MGn9H-u ziBN4;tBG6eOpjXu89S!QUMo=c;>*T?c&9`NZcD{ty2LTa1PpXAgRgQW>Fr3YXK5{T z;#^7e)l?%4k!2@7cA08KuWGGBS~{^0suTYIEAE6^)xwyHkCRnL9@q zRi9A$0p>KwzkHak7uB6t%6zNfY8WvQ z72{+x`R?8s_3i!r&G|1|{PSB9+oiEl zDy6im38$k)m-IJ}f4hJ7_OJJD?roPWB!Htx$+S(1Vj0dMH1)-!Lig)p;lbV>g)m?d zi%NaAqKr)O)^|u11F!$Rbw>V?44J@HTh1Z!RA?&nS7J?PvzWvzk8PFg~4RO y@A8vpe~+Hb|4VT5wI8y)+vC_6)y1}dkYRLY+&)i^(TQz(f)*qH_5-Sn>jMBe1zuJF diff --git a/pkg/services/notificator/nats/options.go b/pkg/services/notificator/nats/options.go new file mode 100644 index 000000000..8f87b6bd4 --- /dev/null +++ b/pkg/services/notificator/nats/options.go @@ -0,0 +1,38 @@ +package nats + +import ( + "time" + + "github.com/nats-io/nats.go" + "go.uber.org/zap" +) + +func WithClientCert(certPath, keyPath string) Option { + return func(o *opts) { + o.nOpts = append(o.nOpts, nats.ClientCert(certPath, keyPath)) + } +} + +func WithRootCA(paths ...string) Option { + return func(o *opts) { + o.nOpts = append(o.nOpts, nats.RootCAs(paths...)) + } +} + +func WithTimeout(timeout time.Duration) Option { + return func(o *opts) { + o.nOpts = append(o.nOpts, nats.Timeout(timeout)) + } +} + +func WithConnectionName(name string) Option { + return func(o *opts) { + o.nOpts = append(o.nOpts, nats.Name(name)) + } +} + +func WithLogger(logger *zap.Logger) Option { + return func(o *opts) { + o.logger = logger + } +} diff --git a/pkg/services/notificator/nats/service.go b/pkg/services/notificator/nats/service.go new file mode 100644 index 000000000..28fb707d1 --- /dev/null +++ b/pkg/services/notificator/nats/service.go @@ -0,0 +1,129 @@ +package nats + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/nats-io/nats.go" + addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + "go.uber.org/zap" +) + +// Writer is a NATS object notification writer. +// It handles NATS JetStream connections and allows +// sending string representation of the address to +// the NATS server. +// +// For correct operation must be created via New function. +// new(Writer) or Writer{} construction leads to undefined +// behaviour and is not safe. +type Writer struct { + log *zap.Logger + js nats.JetStreamContext + nc *nats.Conn + + m *sync.RWMutex + createdStreams map[string]struct{} +} + +type opts struct { + logger *zap.Logger + nOpts []nats.Option +} + +type Option func(*opts) + +var errConnIsClosed = errors.New("connection to the server is closed") + +// Notify sends object address's string representation to the provided topic. +// Uses first 4 bytes of object ID as a message ID to support 'exactly once message' +// delivery. +// +// Returns error only if: +// 1. underlying connection was closed and has not established again; +// 2. NATS server could not respond that is has saved the message. +func (n *Writer) Notify(topic string, address *addressSDK.Address) error { + if !n.nc.IsConnected() { + return errConnIsClosed + } + + // use first 4 byte of the encoded string as + // message ID for the exactly-once delivery + messageID := address.ObjectID().String()[:4] + + // check if the stream was previously created + n.m.RLock() + _, created := n.createdStreams[topic] + n.m.RUnlock() + + if !created { + _, err := n.js.AddStream(&nats.StreamConfig{ + Name: topic, + }) + if err != nil { + return fmt.Errorf("could not add stream: %w", err) + } + + n.m.Lock() + n.createdStreams[topic] = struct{}{} + n.m.Unlock() + } + + _, err := n.js.Publish(topic, []byte(address.String()), nats.MsgId(messageID)) + if err != nil { + return err + } + + return nil +} + +// New creates and inits new Writer. +// Connection is closed when passed context is done. +// +// Returns error only if fails to open connection to a NATS server +// with provided configuration. +func New(ctx context.Context, endpoint string, oo ...Option) (*Writer, error) { + opts := &opts{ + logger: zap.L(), + nOpts: make([]nats.Option, 0, len(oo)+3), + } + + for _, o := range oo { + o(opts) + } + + opts.nOpts = append(opts.nOpts, + nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop + nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { + opts.logger.Error("nats: connection was lost", zap.Error(err)) + }), + nats.ReconnectHandler(func(conn *nats.Conn) { + opts.logger.Warn("nats: reconnected to the server") + }), + ) + + nc, err := nats.Connect(endpoint, opts.nOpts...) + if err != nil { + return nil, fmt.Errorf("could not connect to server: %w", err) + } + + // usage w/o options is error-free + js, _ := nc.JetStream() + + go func() { + <-ctx.Done() + opts.logger.Info("nats: closing connection as the context is done") + + nc.Close() + }() + + return &Writer{ + js: js, + nc: nc, + log: opts.logger, + m: &sync.RWMutex{}, + createdStreams: make(map[string]struct{}), + }, nil +}