frostfs-s3-gw/api/notifications/controller.go

264 lines
6.6 KiB
Go
Raw Permalink Normal View History

package notifications
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
)
const (
DefaultTimeout = 30 * time.Second
// EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications.
EventVersion23 = "2.3"
// EventVersion22 is used for replication notifications.
EventVersion22 = "2.2"
// EventVersion21 is used for all other notification types.
EventVersion21 = "2.1"
)
type (
Options struct {
URL string
TLSCertFilepath string
TLSAuthPrivateKeyFilePath string
Timeout time.Duration
RootCAFiles []string
}
Controller struct {
logger *zap.Logger
taskQueueConnection *nats.Conn
jsClient nats.JetStreamContext
handlers map[string]Stream
mu sync.RWMutex
}
Stream struct {
h layer.MsgHandler
ch chan *nats.Msg
}
TestEvent struct {
Service string
Event string
Time time.Time
Bucket string
RequestID string
HostID string
}
Event struct {
Records []EventRecord `json:"Records"`
}
EventRecord struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"` // frostfs:s3
AWSRegion string `json:"awsRegion,omitempty"` // empty
EventTime time.Time `json:"eventTime"`
EventName string `json:"eventName"`
UserIdentity UserIdentity `json:"userIdentity"`
RequestParameters RequestParameters `json:"requestParameters"`
ResponseElements map[string]string `json:"responseElements"`
S3 S3Entity `json:"s3"`
}
UserIdentity struct {
PrincipalID string `json:"principalId"`
}
RequestParameters struct {
SourceIPAddress string `json:"sourceIPAddress"`
}
S3Entity struct {
SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationID string `json:"configurationId,omitempty"`
Bucket Bucket `json:"bucket"`
Object Object `json:"object"`
}
Bucket struct {
Name string `json:"name"`
OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"`
Arn string `json:"arn,omitempty"`
}
Object struct {
Key string `json:"key"`
Size uint64 `json:"size,omitempty"`
VersionID string `json:"versionId,omitempty"`
ETag string `json:"eTag,omitempty"`
Sequencer string `json:"sequencer,omitempty"`
}
)
func NewController(p *Options, l *zap.Logger) (*Controller, error) {
ncopts := []nats.Option{
nats.Timeout(p.Timeout),
}
if len(p.TLSCertFilepath) != 0 && len(p.TLSAuthPrivateKeyFilePath) != 0 {
ncopts = append(ncopts, nats.ClientCert(p.TLSCertFilepath, p.TLSAuthPrivateKeyFilePath))
}
if len(p.RootCAFiles) != 0 {
ncopts = append(ncopts, nats.RootCAs(p.RootCAFiles...))
}
nc, err := nats.Connect(p.URL, ncopts...)
if err != nil {
return nil, fmt.Errorf("connect to nats: %w", err)
}
js, err := nc.JetStream()
if err != nil {
return nil, fmt.Errorf("get jet stream: %w", err)
}
return &Controller{
logger: l,
taskQueueConnection: nc,
jsClient: js,
handlers: make(map[string]Stream),
}, nil
}
func (c *Controller) Subscribe(_ context.Context, topic string, handler layer.MsgHandler) error {
ch := make(chan *nats.Msg, 1)
c.mu.RLock()
_, ok := c.handlers[topic]
c.mu.RUnlock()
if ok {
return fmt.Errorf("already subscribed to topic '%s'", topic)
}
if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil {
return fmt.Errorf("add stream: %w", err)
}
if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil {
return fmt.Errorf("could not subscribe: %w", err)
}
c.mu.Lock()
c.handlers[topic] = Stream{
h: handler,
ch: ch,
}
c.mu.Unlock()
return nil
}
func (c *Controller) Listen(ctx context.Context) {
c.mu.RLock()
defer c.mu.RUnlock()
for _, stream := range c.handlers {
go func(stream Stream) {
for {
select {
case msg := <-stream.ch:
if err := stream.h.HandleMessage(ctx, msg); err != nil {
c.logger.Error(logs.CouldNotHandleMessage, zap.Error(err))
} else if err = msg.Ack(); err != nil {
c.logger.Error(logs.CouldNotACKMessage, zap.Error(err))
}
case <-ctx.Done():
return
}
}
}(stream)
}
}
func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error {
event := prepareEvent(p)
for id, topic := range topics {
event.Records[0].S3.ConfigurationID = id
msg, err := json.Marshal(event)
if err != nil {
c.logger.Error(logs.CouldntMarshalAnEvent, zap.String("subject", topic), zap.Error(err))
}
if err = c.publish(topic, msg); err != nil {
c.logger.Error(logs.CouldntSendAnEventToTopic, zap.String("subject", topic), zap.Error(err))
}
}
return nil
}
func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error {
event := &TestEvent{
Service: "FrostFS S3",
Event: "s3:TestEvent",
Time: now,
Bucket: bucketName,
RequestID: requestID,
HostID: HostID,
}
msg, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("couldn't marshal test event: %w", err)
}
return c.publish(topic, msg)
}
func prepareEvent(p *handler.SendNotificationParams) *Event {
return &Event{
Records: []EventRecord{
{
EventVersion: EventVersion21,
EventSource: "frostfs:s3",
AWSRegion: "",
EventTime: p.Time,
EventName: p.Event,
UserIdentity: UserIdentity{
PrincipalID: p.User,
},
RequestParameters: RequestParameters{
SourceIPAddress: p.ReqInfo.RemoteHost,
},
ResponseElements: nil,
S3: S3Entity{
SchemaVersion: "1.0",
// ConfigurationID is skipped and will be placed later
Bucket: Bucket{
Name: p.BktInfo.Name,
OwnerIdentity: UserIdentity{PrincipalID: p.BktInfo.Owner.String()},
Arn: p.BktInfo.Name,
},
Object: Object{
Key: p.NotificationInfo.Name,
Size: p.NotificationInfo.Size,
VersionID: p.NotificationInfo.Version,
ETag: p.NotificationInfo.HashSum,
Sequencer: "",
},
},
},
},
}
}
func (c *Controller) publish(topic string, msg []byte) error {
if _, err := c.jsClient.Publish(topic, msg); err != nil {
return fmt.Errorf("couldn't send event: %w", err)
}
return nil
}