From 44548212858db1d13a27c9b89cfe55041cde1e00 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Thu, 17 Feb 2022 21:58:51 +0300 Subject: [PATCH] [#340] Add notification configuration handlers Signed-off-by: Angira Kekteeva --- api/cache/system.go | 18 +++++ api/data/info.go | 9 ++- api/data/notifications.go | 69 +++++++++++++++++ api/errors/errors.go | 14 ++++ api/handler/not_support.go | 4 - api/handler/notifications.go | 63 +++++++++++++++ api/handler/unimplemented.go | 4 - api/layer/layer.go | 3 + api/layer/notifications.go | 146 +++++++++++++++++++++++++++++++++++ 9 files changed, 320 insertions(+), 10 deletions(-) create mode 100644 api/data/notifications.go create mode 100644 api/handler/notifications.go create mode 100644 api/layer/notifications.go diff --git a/api/cache/system.go b/api/cache/system.go index 71d8325..5b9a5e6 100644 --- a/api/cache/system.go +++ b/api/cache/system.go @@ -61,6 +61,20 @@ func (o *SystemCache) GetCORS(key string) *data.CORSConfiguration { return result } +func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*data.NotificationConfiguration) + if !ok { + return nil + } + + return result +} + // PutObject puts an object to cache. func (o *SystemCache) PutObject(key string, obj *data.ObjectInfo) error { return o.cache.Set(key, obj) @@ -70,6 +84,10 @@ func (o *SystemCache) PutCORS(key string, obj *data.CORSConfiguration) error { return o.cache.Set(key, obj) } +func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error { + return o.cache.Set(key, obj) +} + // Delete deletes an object from cache. func (o *SystemCache) Delete(key string) bool { return o.cache.Remove(key) diff --git a/api/data/info.go b/api/data/info.go index a2b3ad7..77a57d9 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -10,8 +10,9 @@ import ( ) const ( - bktVersionSettingsObject = ".s3-versioning-settings" - bktCORSConfigurationObject = ".s3-cors" + bktVersionSettingsObject = ".s3-versioning-settings" + bktCORSConfigurationObject = ".s3-cors" + bktNotificationConfigurationObject = ".s3-notifications" ) type ( @@ -65,6 +66,10 @@ func (b *BucketInfo) SettingsObjectName() string { return bktVersionSettingsObje // CORSObjectName returns system name for bucket CORS configuration file. func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject } +func (b *BucketInfo) NotificationConfigurationObjectName() string { + return bktNotificationConfigurationObject +} + // Version returns object version from ObjectInfo. func (o *ObjectInfo) Version() string { return o.ID.String() } diff --git a/api/data/notifications.go b/api/data/notifications.go new file mode 100644 index 0000000..655aba8 --- /dev/null +++ b/api/data/notifications.go @@ -0,0 +1,69 @@ +package data + +type ( + NotificationConfiguration struct { + QueueConfigurations []QueueConfiguration `xml:"QueueConfiguration" json:"QueueConfigurations"` + // Not supported topics + TopicConfigurations []TopicConfiguration `xml:"TopicConfiguration" json:"TopicConfigurations"` + LambdaFunctionConfigurations []LambdaFunctionConfiguration `xml:"CloudFunctionConfiguration" json:"CloudFunctionConfigurations"` + } + + QueueConfiguration struct { + ID string `xml:"Id" json:"Id"` + QueueArn string `xml:"Queue" json:"Queue"` + Events []string `xml:"Event" json:"Events"` + Filter Filter `xml:"Filter" json:"Filter"` + } + + Filter struct { + Key Key `xml:"S3Key" json:"S3Key"` + } + + Key struct { + FilterRules []FilterRule `xml:"FilterRule" json:"FilterRules"` + } + + FilterRule struct { + Name string `xml:"Name" json:"Name"` + Value string `xml:"Value" json:"Value"` + } + + // TopicConfiguration and LambdaFunctionConfiguration -- we don't support these configurations + // but we need them to detect in notification configurations in requests. + TopicConfiguration struct{} + LambdaFunctionConfiguration struct{} +) + +var ValidEvents = map[string]struct{}{ + "s3:ReducedRedundancyLostObject": {}, + "s3:ObjectCreated:*": {}, + "s3:ObjectCreated:Put": {}, + "s3:ObjectCreated:Post": {}, + "s3:ObjectCreated:Copy": {}, + "s3:ObjectCreated:CompleteMultipartUpload": {}, + "s3:ObjectRemoved:*": {}, + "s3:ObjectRemoved:Delete": {}, + "s3:ObjectRemoved:DeleteMarkerCreated": {}, + "s3:ObjectRestore:*": {}, + "s3:ObjectRestore:Post": {}, + "s3:ObjectRestore:Completed": {}, + "s3:Replication:*": {}, + "s3:Replication:OperationFailedReplication": {}, + "s3:Replication:OperationNotTracked": {}, + "s3:Replication:OperationMissedThreshold": {}, + "s3:Replication:OperationReplicatedAfterThreshold": {}, + "s3:ObjectRestore:Delete": {}, + "s3:LifecycleTransition": {}, + "s3:IntelligentTiering": {}, + "s3:ObjectAcl:Put": {}, + "s3:LifecycleExpiration:*": {}, + "s3:LifecycleExpiration:Delete": {}, + "s3:LifecycleExpiration:DeleteMarkerCreated": {}, + "s3:ObjectTagging:*": {}, + "s3:ObjectTagging:Put": {}, + "s3:ObjectTagging:Delete": {}, +} + +func (n NotificationConfiguration) IsEmpty() bool { + return len(n.QueueConfigurations) == 0 && len(n.TopicConfigurations) == 0 && len(n.LambdaFunctionConfigurations) == 0 +} diff --git a/api/errors/errors.go b/api/errors/errors.go index b990a20..61133c0 100644 --- a/api/errors/errors.go +++ b/api/errors/errors.go @@ -153,6 +153,7 @@ const ( ErrInvalidToken // Bucket notification related errors. + ErrNotificationNotEnabled ErrEventNotification ErrARNNotification ErrRegionNotification @@ -162,6 +163,7 @@ const ( ErrFilterNameSuffix ErrFilterValueInvalid ErrOverlappingConfigs + ErrNotificationTopicNotSupported // S3 extended errors. ErrContentSHA256Mismatch @@ -869,6 +871,12 @@ var errorCodes = errorCodeMap{ Description: "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied", HTTPStatusCode: http.StatusBadRequest, }, + ErrNotificationNotEnabled: { + ErrCode: ErrNotificationNotEnabled, + Code: "InvalidRequest", + Description: "Notifications are not enabled in the gateway. Please connect to the other gateway", + HTTPStatusCode: http.StatusBadRequest, + }, // Bucket notification related errors. ErrEventNotification: { ErrCode: ErrEventNotification, @@ -876,6 +884,12 @@ var errorCodes = errorCodeMap{ Description: "A specified event is not supported for notifications.", HTTPStatusCode: http.StatusBadRequest, }, + ErrNotificationTopicNotSupported: { + ErrCode: ErrNotificationTopicNotSupported, + Code: "InvalidArgument", + Description: "SNS and Lambda configurations are not supported ", + HTTPStatusCode: http.StatusBadRequest, + }, ErrARNNotification: { ErrCode: ErrARNNotification, Code: "InvalidArgument", diff --git a/api/handler/not_support.go b/api/handler/not_support.go index 201488d..d747bd8 100644 --- a/api/handler/not_support.go +++ b/api/handler/not_support.go @@ -18,7 +18,3 @@ func (h *handler) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Re func (h *handler) DeleteBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported)) } - -func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { - h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported)) -} diff --git a/api/handler/notifications.go b/api/handler/notifications.go new file mode 100644 index 0000000..983024a --- /dev/null +++ b/api/handler/notifications.go @@ -0,0 +1,63 @@ +package handler + +import ( + "encoding/xml" + "net/http" + + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/data" + "github.com/nspcc-dev/neofs-s3-gw/api/layer" +) + +type NotificationConfiguration struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguation"` + NotificationConfiguration data.NotificationConfiguration +} + +func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + reqInfo := api.GetReqInfo(r.Context()) + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + + p := &layer.PutBucketNotificationConfigurationParams{ + BktInfo: bktInfo, + Reader: r.Body, + } + + if err := h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { + h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err) + return + } +} + +func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + reqInfo := api.GetReqInfo(r.Context()) + + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + conf, err := h.obj.GetBucketNotificationConfiguration(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "could not get bucket notification configuration", reqInfo, err) + return + } + + if err = api.EncodeToResponse(w, conf); err != nil { + h.logAndSendError(w, "could not encode bucket notification configuration to response", reqInfo, err) + return + } +} diff --git a/api/handler/unimplemented.go b/api/handler/unimplemented.go index a6ce2fd..0615d62 100644 --- a/api/handler/unimplemented.go +++ b/api/handler/unimplemented.go @@ -63,10 +63,6 @@ func (h *handler) GetBucketObjectLockConfigHandler(w http.ResponseWriter, r *htt h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) } -func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { - h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) -} - func (h *handler) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) } diff --git a/api/layer/layer.go b/api/layer/layer.go index 6c71078..1b9a0dd 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -235,6 +235,9 @@ type ( AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) + + PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error + GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) } ) diff --git a/api/layer/notifications.go b/api/layer/notifications.go new file mode 100644 index 0000000..2b0c751 --- /dev/null +++ b/api/layer/notifications.go @@ -0,0 +1,146 @@ +package layer + +import ( + "bytes" + "context" + "encoding/xml" + "io" + + "github.com/google/uuid" + "github.com/nspcc-dev/neofs-s3-gw/api/data" + "github.com/nspcc-dev/neofs-s3-gw/api/errors" + "go.uber.org/zap" +) + +type ( + PutBucketNotificationConfigurationParams struct { + BktInfo *data.BucketInfo + Reader io.Reader + } +) + +func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error { + if !n.IsNotificationEnabled() { + return errors.GetAPIError(errors.ErrNotificationNotEnabled) + } + + var ( + buf bytes.Buffer + tee = io.TeeReader(p.Reader, &buf) + conf = &data.NotificationConfiguration{} + completed bool + err error + ) + + if err = xml.NewDecoder(tee).Decode(conf); err != nil { + return errors.GetAPIError(errors.ErrMalformedXML) + } + + if completed, err = n.checkAndCompleteNotificationConfiguration(conf); err != nil { + return err + } + if completed { + confXML, err := xml.Marshal(conf) + if err != nil { + return err + } + buf.Reset() + buf.Write(confXML) + } + + s := &PutSystemObjectParams{ + BktInfo: p.BktInfo, + ObjName: p.BktInfo.NotificationConfigurationObjectName(), + Metadata: map[string]string{}, + Prefix: "", + Reader: &buf, + } + + obj, err := n.putSystemObjectIntoNeoFS(ctx, s) + if err != nil { + return err + } + + if obj.Size == 0 && !conf.IsEmpty() { + return errors.GetAPIError(errors.ErrInternalError) + } + + if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), conf); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return nil +} + +func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) { + if !n.IsNotificationEnabled() { + return nil, errors.GetAPIError(errors.ErrNotificationNotEnabled) + } + conf, err := n.getNotificationConf(ctx, bktInfo, bktInfo.NotificationConfigurationObjectName()) + if err != nil { + if errors.IsS3Error(err, errors.ErrNoSuchKey) { + return &data.NotificationConfiguration{}, nil + } + return nil, err + } + + return conf, nil +} + +func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, sysName string) (*data.NotificationConfiguration, error) { + if conf := n.systemCache.GetNotificationConfiguration(systemObjectKey(bkt, sysName)); conf != nil { + return conf, nil + } + + obj, err := n.getSystemObjectFromNeoFS(ctx, bkt, sysName) + if err != nil { + return nil, err + } + + conf := &data.NotificationConfiguration{} + + if err = xml.Unmarshal(obj.Payload(), &conf); err != nil { + return nil, err + } + + if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(bkt, sysName), conf); err != nil { + n.log.Warn("couldn't put system meta to objects cache", + zap.Stringer("object id", obj.ID()), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + } + + return conf, nil +} + +func (n *layer) checkAndCompleteNotificationConfiguration(c *data.NotificationConfiguration) (completed bool, err error) { + if c == nil { + return + } + + if c.TopicConfigurations != nil || c.LambdaFunctionConfigurations != nil { + return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported) + } + + for i, q := range c.QueueConfigurations { + if err = checkEvents(q.Events); err != nil { + return + } + if q.ID == "" { + completed = true + c.QueueConfigurations[i].ID = uuid.NewString() + } + } + + return +} + +func checkEvents(events []string) error { + for _, e := range events { + if _, ok := data.ValidEvents[e]; !ok { + return errors.GetAPIError(errors.ErrEventNotification) + } + } + + return nil +}