From 40e7dbf768251ef9eca33f9804e3667ec12d4d7d Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Thu, 31 Mar 2022 10:10:27 +0400 Subject: [PATCH] [#357] Add test events and check of bucket notif conf Signed-off-by: Angira Kekteeva --- api/data/notifications.go | 30 --------- api/handler/notifications.go | 5 +- api/layer/layer.go | 1 + api/layer/notifications.go | 114 +++++++++++++++++++++++++++++--- api/notifications/controller.go | 66 +++++++++++++----- 5 files changed, 157 insertions(+), 59 deletions(-) diff --git a/api/data/notifications.go b/api/data/notifications.go index 655aba8f..6ff38984 100644 --- a/api/data/notifications.go +++ b/api/data/notifications.go @@ -34,36 +34,6 @@ type ( LambdaFunctionConfiguration struct{} ) -var ValidEvents = map[string]struct{}{ - "s3:ReducedRedundancyLostObject": {}, - "s3:ObjectCreated:*": {}, - "s3:ObjectCreated:Put": {}, - "s3:ObjectCreated:Post": {}, - "s3:ObjectCreated:Copy": {}, - "s3:ObjectCreated:CompleteMultipartUpload": {}, - "s3:ObjectRemoved:*": {}, - "s3:ObjectRemoved:Delete": {}, - "s3:ObjectRemoved:DeleteMarkerCreated": {}, - "s3:ObjectRestore:*": {}, - "s3:ObjectRestore:Post": {}, - "s3:ObjectRestore:Completed": {}, - "s3:Replication:*": {}, - "s3:Replication:OperationFailedReplication": {}, - "s3:Replication:OperationNotTracked": {}, - "s3:Replication:OperationMissedThreshold": {}, - "s3:Replication:OperationReplicatedAfterThreshold": {}, - "s3:ObjectRestore:Delete": {}, - "s3:LifecycleTransition": {}, - "s3:IntelligentTiering": {}, - "s3:ObjectAcl:Put": {}, - "s3:LifecycleExpiration:*": {}, - "s3:LifecycleExpiration:Delete": {}, - "s3:LifecycleExpiration:DeleteMarkerCreated": {}, - "s3:ObjectTagging:*": {}, - "s3:ObjectTagging:Put": {}, - "s3:ObjectTagging:Delete": {}, -} - func (n NotificationConfiguration) IsEmpty() bool { return len(n.QueueConfigurations) == 0 && len(n.TopicConfigurations) == 0 && len(n.LambdaFunctionConfigurations) == 0 } diff --git a/api/handler/notifications.go b/api/handler/notifications.go index 825618ad..06608fd9 100644 --- a/api/handler/notifications.go +++ b/api/handler/notifications.go @@ -23,8 +23,9 @@ func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Re } p := &layer.PutBucketNotificationConfigurationParams{ - BktInfo: bktInfo, - Reader: r.Body, + RequestInfo: reqInfo, + BktInfo: bktInfo, + Reader: r.Body, } if err := h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { diff --git a/api/layer/layer.go b/api/layer/layer.go index d0790162..e479021e 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -32,6 +32,7 @@ type ( Notificator interface { Subscribe(context.Context, string, MsgHandler) error Listen(context.Context) + SendTestNotification(topic, bucketName, requestID, HostID string) error } MsgHandler interface { diff --git a/api/layer/notifications.go b/api/layer/notifications.go index 2b0c7515..97c12728 100644 --- a/api/layer/notifications.go +++ b/api/layer/notifications.go @@ -5,8 +5,10 @@ import ( "context" "encoding/xml" "io" + "strings" "github.com/google/uuid" + "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "go.uber.org/zap" @@ -14,11 +16,75 @@ import ( type ( PutBucketNotificationConfigurationParams struct { - BktInfo *data.BucketInfo - Reader io.Reader + RequestInfo *api.ReqInfo + BktInfo *data.BucketInfo + Reader io.Reader } ) +const ( + filterRuleSuffixName = "suffix" + filterRulePrefixName = "prefix" + + EventObjectCreated = "s3:ObjectCreated:*" + EventObjectCreatedPut = "s3:ObjectCreated:Put" + EventObjectCreatedPost = "s3:ObjectCreated:Post" + EventObjectCreatedCopy = "s3:ObjectCreated:Copy" + EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject" + EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload" + EventObjectRemoved = "s3:ObjectRemoved:*" + EventObjectRemovedDelete = "s3:ObjectRemoved:Delete" + EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated" + EventObjectRestore = "s3:ObjectRestore:*" + EventObjectRestorePost = "s3:ObjectRestore:Post" + EventObjectRestoreCompleted = "s3:ObjectRestore:Completed" + EventReplication = "s3:Replication:*" + EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication" + EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked" + EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold" + EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold" + EventObjectRestoreDelete = "s3:ObjectRestore:Delete" + EventLifecycleTransition = "s3:LifecycleTransition" + EventIntelligentTiering = "s3:IntelligentTiering" + EventObjectACLPut = "s3:ObjectAcl:Put" + EventLifecycleExpiration = "s3:LifecycleExpiration:*" + EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete" + EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated" + EventObjectTagging = "s3:ObjectTagging:*" + EventObjectTaggingPut = "s3:ObjectTagging:Put" + EventObjectTaggingDelete = "s3:ObjectTagging:Delete" +) + +var validEvents = map[string]struct{}{ + EventReducedRedundancyLostObject: {}, + EventObjectCreated: {}, + EventObjectCreatedPut: {}, + EventObjectCreatedPost: {}, + EventObjectCreatedCopy: {}, + EventObjectCreatedCompleteMultipartUpload: {}, + EventObjectRemoved: {}, + EventObjectRemovedDelete: {}, + EventObjectRemovedDeleteMarkerCreated: {}, + EventObjectRestore: {}, + EventObjectRestorePost: {}, + EventObjectRestoreCompleted: {}, + EventReplication: {}, + EventReplicationOperationFailedReplication: {}, + EventReplicationOperationNotTracked: {}, + EventReplicationOperationMissedThreshold: {}, + EventReplicationOperationReplicatedAfterThreshold: {}, + EventObjectRestoreDelete: {}, + EventLifecycleTransition: {}, + EventIntelligentTiering: {}, + EventObjectACLPut: {}, + EventLifecycleExpiration: {}, + EventLifecycleExpirationDelete: {}, + EventLifecycleExpirationDeleteMarkerCreated: {}, + EventObjectTagging: {}, + EventObjectTaggingPut: {}, + EventObjectTaggingDelete: {}, +} + func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error { if !n.IsNotificationEnabled() { return errors.GetAPIError(errors.ErrNotificationNotEnabled) @@ -36,7 +102,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu return errors.GetAPIError(errors.ErrMalformedXML) } - if completed, err = n.checkAndCompleteNotificationConfiguration(conf); err != nil { + if completed, err = n.checkBucketConfiguration(conf, p.RequestInfo); err != nil { return err } if completed { @@ -113,31 +179,61 @@ func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, s return conf, nil } -func (n *layer) checkAndCompleteNotificationConfiguration(c *data.NotificationConfiguration) (completed bool, err error) { - if c == nil { +// checkBucketConfiguration checks notification configuration and generates ID for configurations with empty ids. +func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) { + if conf == nil { return } - if c.TopicConfigurations != nil || c.LambdaFunctionConfigurations != nil { + if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil { return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported) } - for i, q := range c.QueueConfigurations { + for i, q := range conf.QueueConfigurations { if err = checkEvents(q.Events); err != nil { return } + + if err = checkRules(q.Filter.Key.FilterRules); err != nil { + return + } + + if err = n.ncontroller.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host); err != nil { + return + } + if q.ID == "" { completed = true - c.QueueConfigurations[i].ID = uuid.NewString() + conf.QueueConfigurations[i].ID = uuid.NewString() } } return } +func checkRules(rules []data.FilterRule) error { + names := make(map[string]struct{}) + + for _, r := range rules { + if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName { + return errors.GetAPIError(errors.ErrFilterNameInvalid) + } + if _, ok := names[r.Name]; ok { + if r.Name == filterRuleSuffixName { + return errors.GetAPIError(errors.ErrFilterNameSuffix) + } + return errors.GetAPIError(errors.ErrFilterNamePrefix) + } + + names[r.Name] = struct{}{} + } + + return nil +} + func checkEvents(events []string) error { for _, e := range events { - if _, ok := data.ValidEvents[e]; !ok { + if _, ok := validEvents[e]; !ok { return errors.GetAPIError(errors.ErrEventNotification) } } diff --git a/api/notifications/controller.go b/api/notifications/controller.go index 8e754cd0..be654b9b 100644 --- a/api/notifications/controller.go +++ b/api/notifications/controller.go @@ -2,6 +2,7 @@ package notifications import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -15,26 +16,37 @@ const ( DefaultTimeout = 30 * time.Second ) -type Options struct { - URL string - TLSCertFilepath string - TLSAuthPrivateKeyFilePath string - Timeout time.Duration - RootCAFiles []string -} +type ( + Options struct { + URL string + TLSCertFilepath string + TLSAuthPrivateKeyFilePath string + Timeout time.Duration + RootCAFiles []string + } -type Controller struct { - logger *zap.Logger - taskQueueConnection *nats.Conn - jsClient nats.JetStreamContext - handlers map[string]Stream - mu sync.RWMutex -} + Controller struct { + logger *zap.Logger + taskQueueConnection *nats.Conn + jsClient nats.JetStreamContext + handlers map[string]Stream + mu sync.RWMutex + } -type Stream struct { - h layer.MsgHandler - ch chan *nats.Msg -} + Stream struct { + h layer.MsgHandler + ch chan *nats.Msg + } + + TestEvent struct { + Service string + Event string + Time time.Time + Bucket string + RequestID string + HostID string + } +) func NewController(p *Options, l *zap.Logger) (*Controller, error) { ncopts := []nats.Option{ @@ -114,3 +126,21 @@ func (c *Controller) Listen(ctx context.Context) { }(stream) } } + +func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string) error { + event := &TestEvent{ + Service: "NeoFS S3", + Event: "s3:TestEvent", + Time: time.Now(), + Bucket: bucketName, + RequestID: requestID, + HostID: HostID, + } + + msg, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("couldn't marshal test event: %w", err) + } + + return c.publish(topic, msg) +}