From 94caa2247e6ec2dbb0392ebecb0d39f48956e0ae Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 29 Apr 2022 16:08:22 +0300 Subject: [PATCH] [#391] Refactor notifications Signed-off-by: Denis Kirillov --- api/handler/api.go | 30 ++- api/handler/copy.go | 6 +- api/handler/delete.go | 12 +- api/handler/multipart_upload.go | 6 +- api/handler/notifications.go | 229 +++++++++++++++++- api/{layer => handler}/notifications_test.go | 2 +- api/handler/put.go | 12 +- api/layer/layer.go | 12 +- api/layer/notifications.go | 240 +------------------ api/notifications/controller.go | 5 +- cmd/s3-gw/app.go | 3 +- 11 files changed, 279 insertions(+), 278 deletions(-) rename api/{layer => handler}/notifications_test.go (99%) diff --git a/api/handler/api.go b/api/handler/api.go index 58f5a9c..27bc5c2 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -11,15 +11,22 @@ import ( type ( handler struct { - log *zap.Logger - obj layer.Client - cfg *Config + log *zap.Logger + obj layer.Client + notificator Notificator + cfg *Config + } + + Notificator interface { + SendNotifications(topics map[string]string, p *SendNotificationParams) error + SendTestNotification(topic, bucketName, requestID, HostID string) error } // Config contains data which handler needs to keep. Config struct { - DefaultPolicy *netmap.PlacementPolicy - DefaultMaxAge int + DefaultPolicy *netmap.PlacementPolicy + DefaultMaxAge int + NotificatorEnabled bool } ) @@ -29,7 +36,7 @@ const DefaultPolicy = "REP 3" var _ api.Handler = (*handler)(nil) // New creates new api.Handler using given logger and client. -func New(log *zap.Logger, obj layer.Client, cfg *Config) (api.Handler, error) { +func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg *Config) (api.Handler, error) { switch { case obj == nil: return nil, errors.New("empty NeoFS Object Layer") @@ -37,9 +44,14 @@ func New(log *zap.Logger, obj layer.Client, cfg *Config) (api.Handler, error) { return nil, errors.New("empty logger") } + if cfg.NotificatorEnabled && notificator == nil { + return nil, errors.New("empty notificator") + } + return &handler{ - log: log, - obj: obj, - cfg: cfg, + log: log, + obj: obj, + cfg: cfg, + notificator: notificator, }, nil } diff --git a/api/handler/copy.go b/api/handler/copy.go index 81b8112..6f65d00 100644 --- a/api/handler/copy.go +++ b/api/handler/copy.go @@ -134,13 +134,13 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { zap.String("object", info.Name), zap.Stringer("object_id", info.ID)) - s := &layer.SendNotificationParams{ - Event: layer.EventObjectCreatedCopy, + s := &SendNotificationParams{ + Event: EventObjectCreatedCopy, ObjInfo: info, BktInfo: dstBktInfo, ReqInfo: reqInfo, } - if err := h.obj.SendNotifications(r.Context(), s); err != nil { + if err = h.sendNotifications(r.Context(), s); err != nil { h.log.Error("couldn't send notification: %w", zap.Error(err)) } } diff --git a/api/handler/delete.go b/api/handler/delete.go index 3cfc7ba..03b2491 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -97,11 +97,11 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { zap.Error(err)) } - var m *layer.SendNotificationParams + var m *SendNotificationParams if bktSettings.VersioningEnabled && len(versionID) == 0 { - m = &layer.SendNotificationParams{ - Event: layer.EventObjectRemovedDeleteMarkerCreated, + m = &SendNotificationParams{ + Event: EventObjectRemovedDeleteMarkerCreated, ObjInfo: &data.ObjectInfo{ Name: reqInfo.ObjectName, HashSum: deletedObject.DeleteMarkerEtag, @@ -117,8 +117,8 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { } } - m = &layer.SendNotificationParams{ - Event: layer.EventObjectRemovedDelete, + m = &SendNotificationParams{ + Event: EventObjectRemovedDelete, ObjInfo: &data.ObjectInfo{ Name: reqInfo.ObjectName, ID: &objID, @@ -128,7 +128,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { } } - if err := h.obj.SendNotifications(r.Context(), m); err != nil { + if err = h.sendNotifications(r.Context(), m); err != nil { h.log.Error("couldn't send notification: %w", zap.Error(err)) } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index d61e097..770169a 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -436,13 +436,13 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. } } - s := &layer.SendNotificationParams{ - Event: layer.EventObjectCreatedCompleteMultipartUpload, + s := &SendNotificationParams{ + Event: EventObjectCreatedCompleteMultipartUpload, ObjInfo: objInfo, BktInfo: bktInfo, ReqInfo: reqInfo, } - if err := h.obj.SendNotifications(r.Context(), s); err != nil { + if err = h.sendNotifications(r.Context(), s); err != nil { h.log.Error("couldn't send notification: %w", zap.Error(err)) } diff --git a/api/handler/notifications.go b/api/handler/notifications.go index 06608fd..2fdfd4d 100644 --- a/api/handler/notifications.go +++ b/api/handler/notifications.go @@ -1,17 +1,95 @@ package handler import ( + "context" "encoding/xml" "net/http" + "strings" + "github.com/google/uuid" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" + "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "go.uber.org/zap" ) -type NotificationConfiguration struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguation"` - NotificationConfiguration data.NotificationConfiguration +type ( + SendNotificationParams struct { + Event string + ObjInfo *data.ObjectInfo + BktInfo *data.BucketInfo + ReqInfo *api.ReqInfo + User string + } + + NotificationConfiguration struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguation"` + NotificationConfiguration data.NotificationConfiguration + } +) + +const ( + filterRuleSuffixName = "suffix" + filterRulePrefixName = "prefix" + + EventObjectCreated = "s3:ObjectCreated:*" + EventObjectCreatedPut = "s3:ObjectCreated:Put" + EventObjectCreatedPost = "s3:ObjectCreated:Post" + EventObjectCreatedCopy = "s3:ObjectCreated:Copy" + EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject" + EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload" + EventObjectRemoved = "s3:ObjectRemoved:*" + EventObjectRemovedDelete = "s3:ObjectRemoved:Delete" + EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated" + EventObjectRestore = "s3:ObjectRestore:*" + EventObjectRestorePost = "s3:ObjectRestore:Post" + EventObjectRestoreCompleted = "s3:ObjectRestore:Completed" + EventReplication = "s3:Replication:*" + EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication" + EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked" + EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold" + EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold" + EventObjectRestoreDelete = "s3:ObjectRestore:Delete" + EventLifecycleTransition = "s3:LifecycleTransition" + EventIntelligentTiering = "s3:IntelligentTiering" + EventObjectACLPut = "s3:ObjectAcl:Put" + EventLifecycleExpiration = "s3:LifecycleExpiration:*" + EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete" + EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated" + EventObjectTagging = "s3:ObjectTagging:*" + EventObjectTaggingPut = "s3:ObjectTagging:Put" + EventObjectTaggingDelete = "s3:ObjectTagging:Delete" +) + +var validEvents = map[string]struct{}{ + EventReducedRedundancyLostObject: {}, + EventObjectCreated: {}, + EventObjectCreatedPut: {}, + EventObjectCreatedPost: {}, + EventObjectCreatedCopy: {}, + EventObjectCreatedCompleteMultipartUpload: {}, + EventObjectRemoved: {}, + EventObjectRemovedDelete: {}, + EventObjectRemovedDeleteMarkerCreated: {}, + EventObjectRestore: {}, + EventObjectRestorePost: {}, + EventObjectRestoreCompleted: {}, + EventReplication: {}, + EventReplicationOperationFailedReplication: {}, + EventReplicationOperationNotTracked: {}, + EventReplicationOperationMissedThreshold: {}, + EventReplicationOperationReplicatedAfterThreshold: {}, + EventObjectRestoreDelete: {}, + EventLifecycleTransition: {}, + EventIntelligentTiering: {}, + EventObjectACLPut: {}, + EventLifecycleExpiration: {}, + EventLifecycleExpirationDelete: {}, + EventLifecycleExpirationDeleteMarkerCreated: {}, + EventObjectTagging: {}, + EventObjectTaggingPut: {}, + EventObjectTaggingDelete: {}, } func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { @@ -22,13 +100,24 @@ func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Re return } - p := &layer.PutBucketNotificationConfigurationParams{ - RequestInfo: reqInfo, - BktInfo: bktInfo, - Reader: r.Body, + conf := &data.NotificationConfiguration{} + if err = xml.NewDecoder(r.Body).Decode(conf); err != nil { + h.logAndSendError(w, "couldn't decode notification configuration", reqInfo, errors.GetAPIError(errors.ErrMalformedXML)) + return } - if err := h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { + if _, err = h.checkBucketConfiguration(conf, reqInfo); err != nil { + h.logAndSendError(w, "couldn't check bucket configuration", reqInfo, err) + return + } + + p := &layer.PutBucketNotificationConfigurationParams{ + RequestInfo: reqInfo, + BktInfo: bktInfo, + Configuration: conf, + } + + if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err) return } @@ -54,3 +143,127 @@ func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Re return } } + +func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationParams) error { + if !h.cfg.NotificatorEnabled { + h.log.Debug("could not send notification because notificator is disabled", zap.String("event", p.Event)) + return nil + } + + conf, err := h.obj.GetBucketNotificationConfiguration(ctx, p.BktInfo) + if err != nil { + return err + } + if conf.IsEmpty() { + return nil + } + + box, err := layer.GetBoxData(ctx) + if err == nil { + p.User = box.Gate.BearerToken.OwnerID().String() + } + + topics := filterSubjects(conf, p.Event, p.ObjInfo.Name) + + return h.notificator.SendNotifications(topics, p) +} + +// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids. +func (h *handler) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) { + if conf == nil { + return + } + + if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil { + return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported) + } + + for i, q := range conf.QueueConfigurations { + if err = checkEvents(q.Events); err != nil { + return + } + + if err = checkRules(q.Filter.Key.FilterRules); err != nil { + return + } + + if h.cfg.NotificatorEnabled { + if err = h.notificator.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host); err != nil { + return + } + } else { + h.log.Warn("failed to send test event because notifications is disabled") + } + + if q.ID == "" { + completed = true + conf.QueueConfigurations[i].ID = uuid.NewString() + } + } + + return +} + +func checkRules(rules []data.FilterRule) error { + names := make(map[string]struct{}) + + for _, r := range rules { + if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName { + return errors.GetAPIError(errors.ErrFilterNameInvalid) + } + if _, ok := names[r.Name]; ok { + if r.Name == filterRuleSuffixName { + return errors.GetAPIError(errors.ErrFilterNameSuffix) + } + return errors.GetAPIError(errors.ErrFilterNamePrefix) + } + + names[r.Name] = struct{}{} + } + + return nil +} + +func checkEvents(events []string) error { + for _, e := range events { + if _, ok := validEvents[e]; !ok { + return errors.GetAPIError(errors.ErrEventNotification) + } + } + + return nil +} + +func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string { + topics := make(map[string]string) + + for _, t := range conf.QueueConfigurations { + event := false + for _, e := range t.Events { + // the second condition is comparison with the events ending with *: + // s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char + if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) { + event = true + break + } + } + + if !event { + continue + } + + filter := true + for _, f := range t.Filter.Key.FilterRules { + if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) || + f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) { + filter = false + break + } + } + if filter { + topics[t.ID] = t.QueueArn + } + } + + return topics +} diff --git a/api/layer/notifications_test.go b/api/handler/notifications_test.go similarity index 99% rename from api/layer/notifications_test.go rename to api/handler/notifications_test.go index 69abb9d..c0e1c3a 100644 --- a/api/layer/notifications_test.go +++ b/api/handler/notifications_test.go @@ -1,4 +1,4 @@ -package layer +package handler import ( "testing" diff --git a/api/handler/put.go b/api/handler/put.go index 24c115d..4169ad7 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -237,13 +237,13 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { return } - s := &layer.SendNotificationParams{ - Event: layer.EventObjectCreatedPut, + s := &SendNotificationParams{ + Event: EventObjectCreatedPut, ObjInfo: info, BktInfo: bktInfo, ReqInfo: reqInfo, } - if err := h.obj.SendNotifications(r.Context(), s); err != nil { + if err = h.sendNotifications(r.Context(), s); err != nil { h.log.Error("couldn't send notification: %w", zap.Error(err)) } @@ -354,13 +354,13 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) { return } - s := &layer.SendNotificationParams{ - Event: layer.EventObjectCreatedPost, + s := &SendNotificationParams{ + Event: EventObjectCreatedPost, ObjInfo: info, BktInfo: bktInfo, ReqInfo: reqInfo, } - if err := h.obj.SendNotifications(r.Context(), s); err != nil { + if err = h.sendNotifications(r.Context(), s); err != nil { h.log.Error("couldn't send notification: %w", zap.Error(err)) } diff --git a/api/layer/layer.go b/api/layer/layer.go index e7a6e31..67fb740 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -28,11 +28,9 @@ import ( ) type ( - Notificator interface { + EventListener interface { Subscribe(context.Context, string, MsgHandler) error Listen(context.Context) - SendNotifications(topics map[string]string, p *SendNotificationParams) error - SendTestNotification(topic, bucketName, requestID, HostID string) error } MsgHandler interface { @@ -46,7 +44,7 @@ type ( log *zap.Logger anonKey AnonymousKey resolver *resolver.BucketResolver - ncontroller Notificator + ncontroller EventListener listsCache *cache.ObjectsListCache objCache *cache.ObjectsCache namesCache *cache.ObjectsNameCache @@ -192,7 +190,7 @@ type ( // Client provides S3 API client interface. Client interface { - Initialize(ctx context.Context, c Notificator) error + Initialize(ctx context.Context, c EventListener) error EphemeralKey() *keys.PublicKey GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) @@ -241,8 +239,6 @@ type ( PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) - - SendNotifications(ctx context.Context, p *SendNotificationParams) error } ) @@ -290,7 +286,7 @@ func (n *layer) EphemeralKey() *keys.PublicKey { return n.anonKey.Key.PublicKey() } -func (n *layer) Initialize(ctx context.Context, c Notificator) error { +func (n *layer) Initialize(ctx context.Context, c EventListener) error { if n.IsNotificationEnabled() { return fmt.Errorf("already initialized") } diff --git a/api/layer/notifications.go b/api/layer/notifications.go index 4b2a482..008204b 100644 --- a/api/layer/notifications.go +++ b/api/layer/notifications.go @@ -4,130 +4,30 @@ import ( "bytes" "context" "encoding/xml" - "io" - "strings" - "github.com/google/uuid" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "go.uber.org/zap" ) -type ( - PutBucketNotificationConfigurationParams struct { - RequestInfo *api.ReqInfo - BktInfo *data.BucketInfo - Reader io.Reader - } - - SendNotificationParams struct { - Event string - ObjInfo *data.ObjectInfo - BktInfo *data.BucketInfo - ReqInfo *api.ReqInfo - User string - } -) - -const ( - filterRuleSuffixName = "suffix" - filterRulePrefixName = "prefix" - - EventObjectCreated = "s3:ObjectCreated:*" - EventObjectCreatedPut = "s3:ObjectCreated:Put" - EventObjectCreatedPost = "s3:ObjectCreated:Post" - EventObjectCreatedCopy = "s3:ObjectCreated:Copy" - EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject" - EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload" - EventObjectRemoved = "s3:ObjectRemoved:*" - EventObjectRemovedDelete = "s3:ObjectRemoved:Delete" - EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated" - EventObjectRestore = "s3:ObjectRestore:*" - EventObjectRestorePost = "s3:ObjectRestore:Post" - EventObjectRestoreCompleted = "s3:ObjectRestore:Completed" - EventReplication = "s3:Replication:*" - EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication" - EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked" - EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold" - EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold" - EventObjectRestoreDelete = "s3:ObjectRestore:Delete" - EventLifecycleTransition = "s3:LifecycleTransition" - EventIntelligentTiering = "s3:IntelligentTiering" - EventObjectACLPut = "s3:ObjectAcl:Put" - EventLifecycleExpiration = "s3:LifecycleExpiration:*" - EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete" - EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated" - EventObjectTagging = "s3:ObjectTagging:*" - EventObjectTaggingPut = "s3:ObjectTagging:Put" - EventObjectTaggingDelete = "s3:ObjectTagging:Delete" -) - -var validEvents = map[string]struct{}{ - EventReducedRedundancyLostObject: {}, - EventObjectCreated: {}, - EventObjectCreatedPut: {}, - EventObjectCreatedPost: {}, - EventObjectCreatedCopy: {}, - EventObjectCreatedCompleteMultipartUpload: {}, - EventObjectRemoved: {}, - EventObjectRemovedDelete: {}, - EventObjectRemovedDeleteMarkerCreated: {}, - EventObjectRestore: {}, - EventObjectRestorePost: {}, - EventObjectRestoreCompleted: {}, - EventReplication: {}, - EventReplicationOperationFailedReplication: {}, - EventReplicationOperationNotTracked: {}, - EventReplicationOperationMissedThreshold: {}, - EventReplicationOperationReplicatedAfterThreshold: {}, - EventObjectRestoreDelete: {}, - EventLifecycleTransition: {}, - EventIntelligentTiering: {}, - EventObjectACLPut: {}, - EventLifecycleExpiration: {}, - EventLifecycleExpirationDelete: {}, - EventLifecycleExpirationDeleteMarkerCreated: {}, - EventObjectTagging: {}, - EventObjectTaggingPut: {}, - EventObjectTaggingDelete: {}, +type PutBucketNotificationConfigurationParams struct { + RequestInfo *api.ReqInfo + BktInfo *data.BucketInfo + Configuration *data.NotificationConfiguration } func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error { - if !n.IsNotificationEnabled() { - return errors.GetAPIError(errors.ErrNotificationNotEnabled) - } - - var ( - buf bytes.Buffer - tee = io.TeeReader(p.Reader, &buf) - conf = &data.NotificationConfiguration{} - completed bool - err error - ) - - if err = xml.NewDecoder(tee).Decode(conf); err != nil { - return errors.GetAPIError(errors.ErrMalformedXML) - } - - if completed, err = n.checkBucketConfiguration(conf, p.RequestInfo); err != nil { + confXML, err := xml.Marshal(p.Configuration) + if err != nil { return err } - if completed { - confXML, err := xml.Marshal(conf) - if err != nil { - return err - } - buf.Reset() - buf.Write(confXML) - } s := &PutSystemObjectParams{ BktInfo: p.BktInfo, ObjName: p.BktInfo.NotificationConfigurationObjectName(), Metadata: map[string]string{}, - Prefix: "", - Reader: &buf, + Reader: bytes.NewReader(confXML), } obj, err := n.putSystemObjectIntoNeoFS(ctx, s) @@ -135,11 +35,11 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu return err } - if obj.Size == 0 && !conf.IsEmpty() { + if obj.Size == 0 && !p.Configuration.IsEmpty() { return errors.GetAPIError(errors.ErrInternalError) } - if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), conf); err != nil { + if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } @@ -147,9 +47,6 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu } func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) { - if !n.IsNotificationEnabled() { - return nil, errors.GetAPIError(errors.ErrNotificationNotEnabled) - } conf, err := n.getNotificationConf(ctx, bktInfo, bktInfo.NotificationConfigurationObjectName()) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchKey) { @@ -187,122 +84,3 @@ func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, s return conf, nil } - -func (n *layer) SendNotifications(ctx context.Context, p *SendNotificationParams) error { - if !n.IsNotificationEnabled() { - return nil - } - - conf, err := n.getNotificationConf(ctx, p.BktInfo, p.BktInfo.NotificationConfigurationObjectName()) - if err != nil { - return err - } - if conf.IsEmpty() { - return nil - } - - box, err := GetBoxData(ctx) - if err == nil { - p.User = box.Gate.BearerToken.OwnerID().String() - } - - topics := filterSubjects(conf, p.Event, p.ObjInfo.Name) - - return n.ncontroller.SendNotifications(topics, p) -} - -// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids. -func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) { - if conf == nil { - return - } - - if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil { - return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported) - } - - for i, q := range conf.QueueConfigurations { - if err = checkEvents(q.Events); err != nil { - return - } - - if err = checkRules(q.Filter.Key.FilterRules); err != nil { - return - } - - if err = n.ncontroller.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host); err != nil { - return - } - - if q.ID == "" { - completed = true - conf.QueueConfigurations[i].ID = uuid.NewString() - } - } - - return -} - -func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string { - topics := make(map[string]string) - - for _, t := range conf.QueueConfigurations { - event := false - for _, e := range t.Events { - // the second condition is comparison with the events ending with *: - // s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char - if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) { - event = true - break - } - } - - if !event { - continue - } - - filter := true - for _, f := range t.Filter.Key.FilterRules { - if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) || - f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) { - filter = false - break - } - } - if filter { - topics[t.ID] = t.QueueArn - } - } - - return topics -} - -func checkRules(rules []data.FilterRule) error { - names := make(map[string]struct{}) - - for _, r := range rules { - if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName { - return errors.GetAPIError(errors.ErrFilterNameInvalid) - } - if _, ok := names[r.Name]; ok { - if r.Name == filterRuleSuffixName { - return errors.GetAPIError(errors.ErrFilterNameSuffix) - } - return errors.GetAPIError(errors.ErrFilterNamePrefix) - } - - names[r.Name] = struct{}{} - } - - return nil -} - -func checkEvents(events []string) error { - for _, e := range events { - if _, ok := validEvents[e]; !ok { - return errors.GetAPIError(errors.ErrEventNotification) - } - } - - return nil -} diff --git a/api/notifications/controller.go b/api/notifications/controller.go index 0c54a5a..1bf0409 100644 --- a/api/notifications/controller.go +++ b/api/notifications/controller.go @@ -8,6 +8,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/nspcc-dev/neofs-s3-gw/api/handler" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "go.uber.org/zap" ) @@ -179,7 +180,7 @@ func (c *Controller) Listen(ctx context.Context) { } } -func (c *Controller) SendNotifications(topics map[string]string, p *layer.SendNotificationParams) error { +func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error { event := prepareEvent(p) for id, topic := range topics { @@ -214,7 +215,7 @@ func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID s return c.publish(topic, msg) } -func prepareEvent(p *layer.SendNotificationParams) *Event { +func prepareEvent(p *handler.SendNotificationParams) *Event { return &Event{ Records: []EventRecord{ { diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 7f92b25..47067e5 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -170,7 +170,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { ctr = auth.New(neofs.NewAuthmateNeoFS(conns), key, getAccessBoxCacheConfig(v, l)) handlerOptions := getHandlerOptions(v, l) - if caller, err = handler.New(l, obj, handlerOptions); err != nil { + if caller, err = handler.New(l, obj, nc, handlerOptions); err != nil { l.Fatal("could not initialize API handler", zap.Error(err)) } @@ -386,6 +386,7 @@ func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config { } cfg.DefaultMaxAge = defaultMaxAge + cfg.NotificatorEnabled = v.GetBool(cfgEnableNATS) return &cfg }