forked from TrueCloudLab/frostfs-node
129 lines
3.1 KiB
Go
129 lines
3.1 KiB
Go
package nats
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/nats-io/nats.go"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Writer is a NATS object notification writer.
|
|
// It handles NATS JetStream connections and allows
|
|
// sending string representation of the address to
|
|
// the NATS server.
|
|
//
|
|
// For correct operation must be created via New function.
|
|
// new(Writer) or Writer{} construction leads to undefined
|
|
// behaviour and is not safe.
|
|
type Writer struct {
|
|
js nats.JetStreamContext
|
|
nc *nats.Conn
|
|
|
|
m sync.RWMutex
|
|
createdStreams map[string]struct{}
|
|
opts
|
|
}
|
|
|
|
type opts struct {
|
|
log *logger.Logger
|
|
nOpts []nats.Option
|
|
}
|
|
|
|
// Option is a functional option that adjusts the Writer settings.
// Options are passed to New, which applies them in the given order.
type Option func(*opts)
|
|
|
|
// errConnIsClosed is returned by Notify when the NATS connection
// is not in the connected state.
var errConnIsClosed = errors.New("connection to the server is closed")
|
|
|
|
// Notify sends object address's string representation to the provided topic.
|
|
// Uses first 4 bytes of object ID as a message ID to support 'exactly once'
|
|
// message delivery.
|
|
//
|
|
// Returns error only if:
|
|
// 1. underlying connection was closed and has not been established again;
|
|
// 2. NATS server could not respond that it has saved the message.
|
|
func (n *Writer) Notify(topic string, address oid.Address) error {
|
|
if !n.nc.IsConnected() {
|
|
return errConnIsClosed
|
|
}
|
|
|
|
// use first 4 byte of the encoded string as
|
|
// message ID for the 'exactly once' delivery
|
|
messageID := address.Object().EncodeToString()[:4]
|
|
|
|
// check if the stream was previously created
|
|
n.m.RLock()
|
|
_, created := n.createdStreams[topic]
|
|
n.m.RUnlock()
|
|
|
|
if !created {
|
|
_, err := n.js.AddStream(&nats.StreamConfig{
|
|
Name: topic,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not add stream: %w", err)
|
|
}
|
|
|
|
n.m.Lock()
|
|
n.createdStreams[topic] = struct{}{}
|
|
n.m.Unlock()
|
|
}
|
|
|
|
_, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID))
|
|
return err
|
|
}
|
|
|
|
// New creates new Writer.
|
|
func New(oo ...Option) *Writer {
|
|
w := &Writer{
|
|
createdStreams: make(map[string]struct{}),
|
|
opts: opts{
|
|
log: &logger.Logger{Logger: zap.L()},
|
|
nOpts: make([]nats.Option, 0, len(oo)+3),
|
|
},
|
|
}
|
|
|
|
for _, o := range oo {
|
|
o(&w.opts)
|
|
}
|
|
|
|
w.opts.nOpts = append(w.opts.nOpts,
|
|
nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop
|
|
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
|
w.log.Error(logs.NatsNatsConnectionWasLost, zap.Error(err))
|
|
}),
|
|
nats.ReconnectHandler(func(_ *nats.Conn) {
|
|
w.log.Warn(logs.NatsNatsReconnectedToTheServer)
|
|
}),
|
|
)
|
|
|
|
return w
|
|
}
|
|
|
|
// Connect tries to connect to a specified NATS endpoint.
|
|
//
|
|
// Connection is closed when passed context is done.
|
|
func (n *Writer) Connect(ctx context.Context, endpoint string) error {
|
|
nc, err := nats.Connect(endpoint, n.opts.nOpts...)
|
|
if err != nil {
|
|
return fmt.Errorf("could not connect to server: %w", err)
|
|
}
|
|
|
|
n.nc = nc
|
|
|
|
// usage w/o options is error-free
|
|
n.js, _ = nc.JetStream()
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
n.opts.log.Info(logs.NatsNatsClosingConnectionAsTheContextIsDone)
|
|
|
|
nc.Close()
|
|
}()
|
|
|
|
return nil
|
|
}
|