frostfs-node/pkg/services/notificator/nats/service.go

130 lines
3.1 KiB
Go

package nats
import (
"context"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
)
// Writer is a NATS object notification writer.
// It handles NATS JetStream connections and allows
// sending string representation of the address to
// the NATS server.
//
// For correct operation must be created via New function.
// new(Writer) or Writer{} construction leads to undefined
// behaviour and is not safe.
type Writer struct {
js nats.JetStreamContext
nc *nats.Conn
m sync.RWMutex
createdStreams map[string]struct{}
opts
}
type opts struct {
log *logger.Logger
nOpts []nats.Option
}
type Option func(*opts)
var errConnIsClosed = errors.New("connection to the server is closed")
// Notify sends object address's string representation to the provided topic.
// Uses first 4 bytes of object ID as a message ID to support 'exactly once'
// message delivery.
//
// Returns error only if:
// 1. underlying connection was closed and has not been established again;
// 2. NATS server could not respond that it has saved the message.
func (n *Writer) Notify(topic string, address oid.Address) error {
if !n.nc.IsConnected() {
return errConnIsClosed
}
// use first 4 byte of the encoded string as
// message ID for the 'exactly once' delivery
messageID := address.Object().EncodeToString()[:4]
// check if the stream was previously created
n.m.RLock()
_, created := n.createdStreams[topic]
n.m.RUnlock()
if !created {
_, err := n.js.AddStream(&nats.StreamConfig{
Name: topic,
})
if err != nil {
return fmt.Errorf("could not add stream: %w", err)
}
n.m.Lock()
n.createdStreams[topic] = struct{}{}
n.m.Unlock()
}
_, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID))
return err
}
// New creates new Writer.
func New(oo ...Option) *Writer {
w := &Writer{
createdStreams: make(map[string]struct{}),
opts: opts{
log: &logger.Logger{Logger: zap.L()},
nOpts: make([]nats.Option, 0, len(oo)+3),
},
}
for _, o := range oo {
o(&w.opts)
}
w.opts.nOpts = append(w.opts.nOpts,
nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
w.log.Error(logs.NatsNatsConnectionWasLost, zap.Error(err))
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
w.log.Warn(logs.NatsNatsReconnectedToTheServer)
}),
)
return w
}
// Connect tries to connect to a specified NATS endpoint.
//
// Connection is closed when passed context is done.
func (n *Writer) Connect(ctx context.Context, endpoint string) error {
nc, err := nats.Connect(endpoint, n.opts.nOpts...)
if err != nil {
return fmt.Errorf("could not connect to server: %w", err)
}
n.nc = nc
// usage w/o options is error-free
n.js, _ = nc.JetStream()
go func() {
<-ctx.Done()
n.opts.log.Info(logs.NatsNatsClosingConnectionAsTheContextIsDone)
nc.Close()
}()
return nil
}