diff --git a/api/layer/layer.go b/api/layer/layer.go index e479021..a3862b9 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -32,6 +32,7 @@ type ( Notificator interface { Subscribe(context.Context, string, MsgHandler) error Listen(context.Context) + SendNotifications(topics map[string]string, p *SendNotificationParams) error SendTestNotification(topic, bucketName, requestID, HostID string) error } @@ -234,6 +235,8 @@ type ( PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) + + SendNotifications(ctx context.Context, p *SendNotificationParams) error } ) diff --git a/api/layer/notifications.go b/api/layer/notifications.go index 97c1272..c1bac87 100644 --- a/api/layer/notifications.go +++ b/api/layer/notifications.go @@ -20,6 +20,14 @@ type ( BktInfo *data.BucketInfo Reader io.Reader } + + SendNotificationParams struct { + Event string + ObjInfo *data.ObjectInfo + BktInfo *data.BucketInfo + ReqInfo *api.ReqInfo + User string + } ) const ( @@ -179,6 +187,29 @@ func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, s return conf, nil } +func (n *layer) SendNotifications(ctx context.Context, p *SendNotificationParams) error { + if !n.IsNotificationEnabled() { + return nil + } + + conf, err := n.getNotificationConf(ctx, p.BktInfo, p.BktInfo.NotificationConfigurationObjectName()) + if err != nil { + return err + } + if conf.IsEmpty() { + return nil + } + + box, err := GetBoxData(ctx) + if err == nil { + p.User = box.Gate.BearerToken.OwnerID().String() + } + + topics := filterSubjects(conf, p.Event, p.ObjInfo.Name) + + return n.ncontroller.SendNotifications(topics, p) +} + // checkBucketConfiguration checks notification configuration and generates ID for configurations with empty ids. func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) { if conf == nil { @@ -211,6 +242,40 @@ func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r return } +func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string { + topics := make(map[string]string) + + for _, t := range conf.QueueConfigurations { + event := false + for _, e := range t.Events { + // the second condition is comparison with events ending with *: + // s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char + if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) { + event = true + break + } + } + + if !event { + continue + } + + filter := true + for _, f := range t.Filter.Key.FilterRules { + if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) || + f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) { + filter = false + break + } + } + if filter { + topics[t.ID] = t.QueueArn + } + } + + return topics +} + func checkRules(rules []data.FilterRule) error { names := make(map[string]struct{}) diff --git a/api/notifications/controller.go b/api/notifications/controller.go index be654b9..57f7b3f 100644 --- a/api/notifications/controller.go +++ b/api/notifications/controller.go @@ -8,12 +8,21 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "go.uber.org/zap" ) const ( DefaultTimeout = 30 * time.Second + + // EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications. + EventVersion23 = "2.3" + // EventVersion22 is used for replication notifications. + EventVersion22 = "2.2" + // EventVersion21 is used for all other notification types. + EventVersion21 = "2.1" ) type ( @@ -46,6 +55,51 @@ type ( RequestID string HostID string } + + Event struct { + Records []EventRecord `json:"Records"` + } + + EventRecord struct { + EventVersion string `json:"eventVersion"` + EventSource string `json:"eventSource"` // neofs:s3 + AWSRegion string `json:"awsRegion,omitempty"` // empty + EventTime time.Time `json:"eventTime"` + EventName string `json:"eventName"` + UserIdentity UserIdentity `json:"userIdentity"` + RequestParameters RequestParameters `json:"requestParameters"` + ResponseElements map[string]string `json:"responseElements"` + S3 S3Entity `json:"s3"` + } + + UserIdentity struct { + PrincipalID string `json:"principalId"` + } + + RequestParameters struct { + SourceIPAddress string `json:"sourceIPAddress"` + } + + S3Entity struct { + SchemaVersion string `json:"s3SchemaVersion"` + ConfigurationID string `json:"configurationId,omitempty"` + Bucket Bucket `json:"bucket"` + Object Object `json:"object"` + } + + Bucket struct { + Name string `json:"name"` + OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"` + Arn string `json:"arn,omitempty"` + } + + Object struct { + Key string `json:"key"` + Size int64 `json:"size,omitempty"` + VersionID string `json:"versionId,omitempty"` + ETag string `json:"eTag,omitempty"` + Sequencer string `json:"sequencer,omitempty"` + } ) func NewController(p *Options, l *zap.Logger) (*Controller, error) { @@ -127,6 +181,23 @@ func (c *Controller) Listen(ctx context.Context) { } } +func (c *Controller) SendNotifications(topics map[string]string, p *layer.SendNotificationParams) error { + event := prepareEvent(p.Event, p.User, p.BktInfo, p.ObjInfo, p.ReqInfo) + + for id, topic := range topics { + event.Records[0].S3.ConfigurationID = id + msg, err := json.Marshal(event) + if err != nil { + c.logger.Error("couldn't marshal an event", zap.String("subject", topic), zap.Error(err)) + } + if err = c.publish(topic, msg); err != nil { + c.logger.Error("couldn't send an event to topic", zap.String("subject", topic), zap.Error(err)) + } + } + + return nil +} + func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string) error { event := &TestEvent{ Service: "NeoFS S3", @@ -144,3 +215,48 @@ func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID s return c.publish(topic, msg) } + +func prepareEvent(eventName string, initiatorID string, bkt *data.BucketInfo, obj *data.ObjectInfo, reqInfo *api.ReqInfo) *Event { + return &Event{ + Records: []EventRecord{ + { + EventVersion: EventVersion21, + EventSource: "neofs:s3", + AWSRegion: "", + EventTime: time.Now(), + EventName: eventName, + UserIdentity: UserIdentity{ + PrincipalID: initiatorID, + }, + RequestParameters: RequestParameters{ + SourceIPAddress: reqInfo.RemoteHost, + }, + ResponseElements: nil, + S3: S3Entity{ + SchemaVersion: "1.0", + // ConfigurationID is skipped and will be placed later + Bucket: Bucket{ + Name: bkt.Name, + OwnerIdentity: UserIdentity{PrincipalID: bkt.Owner.String()}, + Arn: bkt.Name, + }, + Object: Object{ + Key: obj.Name, + Size: obj.Size, + VersionID: obj.Version(), + ETag: obj.HashSum, + Sequencer: "", + }, + }, + }, + }, + } +} + +func (c *Controller) publish(topic string, msg []byte) error { + if _, err := c.jsClient.Publish(topic, msg); err != nil { + return fmt.Errorf("couldn't send event: %w", err) + } + + return nil +}