From 9432782ce6ddfe18bae0e699b1906876b965b7d6 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 25 Jun 2024 15:24:29 +0300 Subject: [PATCH] [#401] Drop notifications Signed-off-by: Denis Kirillov --- CHANGELOG.md | 1 + api/cache/cache_test.go | 18 -- api/cache/system.go | 20 --- api/data/info.go | 27 +-- api/data/notifications.go | 42 ----- api/handler/acl.go | 15 +- api/handler/api.go | 37 ++-- api/handler/copy.go | 12 +- api/handler/delete.go | 39 ----- api/handler/handlers_test.go | 4 - api/handler/multipart_upload.go | 17 +- api/handler/notifications.go | 274 ------------------------------ api/handler/notifications_test.go | 115 ------------- api/handler/put.go | 24 +-- api/handler/tagging.go | 39 +---- api/handler/unimplemented.go | 8 + api/layer/cache.go | 21 --- api/layer/layer.go | 42 +---- api/layer/notifications.go | 89 ---------- api/layer/tagging.go | 22 +-- api/layer/tree_mock.go | 8 - api/layer/tree_service.go | 11 -- api/notifications/controller.go | 263 ---------------------------- cmd/s3-gw/app.go | 41 +---- cmd/s3-gw/app_settings.go | 22 --- config/config.env | 10 +- config/config.yaml | 10 +- docs/configuration.md | 55 ++---- docs/tree_service.md | 1 - go.mod | 4 - go.sum | 13 -- internal/logs/logs.go | 11 -- pkg/service/tree/tree.go | 33 +--- 33 files changed, 66 insertions(+), 1282 deletions(-) delete mode 100644 api/data/notifications.go delete mode 100644 api/handler/notifications.go delete mode 100644 api/handler/notifications_test.go delete mode 100644 api/layer/notifications.go delete mode 100644 api/notifications/controller.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 487ba9f..2b87321 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This document outlines major changes between releases. ### Removed - Remove control api (#406) +- Remove notifications (#401) ## [0.29.0] - Zemu - 2024-05-27 diff --git a/api/cache/cache_test.go b/api/cache/cache_test.go index b4666c1..31aceba 100644 --- a/api/cache/cache_test.go +++ b/api/cache/cache_test.go @@ -182,24 +182,6 @@ func TestSettingsCacheType(t *testing.T) { assertInvalidCacheEntry(t, cache.GetSettings(key), observedLog) } -func TestNotificationConfigurationCacheType(t *testing.T) { - logger, observedLog := getObservedLogger() - cache := NewSystemCache(DefaultSystemConfig(logger)) - - key := "key" - notificationConfig := &data.NotificationConfiguration{} - - err := cache.PutNotificationConfiguration(key, notificationConfig) - require.NoError(t, err) - val := cache.GetNotificationConfiguration(key) - require.Equal(t, notificationConfig, val) - require.Equal(t, 0, observedLog.Len()) - - err = cache.cache.Set(key, "tmp") - require.NoError(t, err) - assertInvalidCacheEntry(t, cache.GetNotificationConfiguration(key), observedLog) -} - func TestFrostFSIDSubjectCacheType(t *testing.T) { logger, observedLog := getObservedLogger() cache := NewFrostfsIDCache(DefaultFrostfsIDConfig(logger)) diff --git a/api/cache/system.go b/api/cache/system.go index 292ae58..c0f51a5 100644 --- a/api/cache/system.go +++ b/api/cache/system.go @@ -104,22 +104,6 @@ func (o *SystemCache) GetSettings(key string) *data.BucketSettings { return result } -func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration { - entry, err := o.cache.Get(key) - if err != nil { - return nil - } - - result, ok := entry.(*data.NotificationConfiguration) - if !ok { - o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)), - zap.String("expected", fmt.Sprintf("%T", result))) - return nil - } - - return result -} - // GetTagging returns tags of a bucket or an object. func (o *SystemCache) GetTagging(key string) map[string]string { entry, err := o.cache.Get(key) @@ -153,10 +137,6 @@ func (o *SystemCache) PutSettings(key string, settings *data.BucketSettings) err return o.cache.Set(key, settings) } -func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error { - return o.cache.Set(key, obj) -} - // PutTagging puts tags of a bucket or an object. func (o *SystemCache) PutTagging(key string, tagSet map[string]string) error { return o.cache.Set(key, tagSet) diff --git a/api/data/info.go b/api/data/info.go index 041e001..3d85788 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -12,9 +12,8 @@ import ( ) const ( - bktSettingsObject = ".s3-settings" - bktCORSConfigurationObject = ".s3-cors" - bktNotificationConfigurationObject = ".s3-notifications" + bktSettingsObject = ".s3-settings" + bktCORSConfigurationObject = ".s3-cors" VersioningUnversioned = "Unversioned" VersioningEnabled = "Enabled" @@ -52,14 +51,6 @@ type ( Headers map[string]string } - // NotificationInfo store info to send s3 notification. - NotificationInfo struct { - Name string - Version string - Size uint64 - HashSum string - } - // BucketSettings stores settings such as versioning. BucketSettings struct { Versioning string @@ -93,26 +84,12 @@ type ( } ) -// NotificationInfoFromObject creates new NotificationInfo from ObjectInfo. -func NotificationInfoFromObject(objInfo *ObjectInfo, md5Enabled bool) *NotificationInfo { - return &NotificationInfo{ - Name: objInfo.Name, - Version: objInfo.VersionID(), - Size: objInfo.Size, - HashSum: Quote(objInfo.ETag(md5Enabled)), - } -} - // SettingsObjectName is a system name for a bucket settings file. func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject } // CORSObjectName returns a system name for a bucket CORS configuration file. func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject } -func (b *BucketInfo) NotificationConfigurationObjectName() string { - return bktNotificationConfigurationObject -} - // VersionID returns object version from ObjectInfo. func (o *ObjectInfo) VersionID() string { return o.ID.EncodeToString() } diff --git a/api/data/notifications.go b/api/data/notifications.go deleted file mode 100644 index 0a894e5..0000000 --- a/api/data/notifications.go +++ /dev/null @@ -1,42 +0,0 @@ -package data - -import "encoding/xml" - -type ( - NotificationConfiguration struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguration" json:"-"` - QueueConfigurations []QueueConfiguration `xml:"QueueConfiguration" json:"QueueConfigurations"` - // Not supported topics - TopicConfigurations []TopicConfiguration `xml:"TopicConfiguration" json:"TopicConfigurations"` - LambdaFunctionConfigurations []LambdaFunctionConfiguration `xml:"CloudFunctionConfiguration" json:"CloudFunctionConfigurations"` - } - - QueueConfiguration struct { - ID string `xml:"Id" json:"Id"` - QueueArn string `xml:"Queue" json:"Queue"` - Events []string `xml:"Event" json:"Events"` - Filter Filter `xml:"Filter" json:"Filter"` - } - - Filter struct { - Key Key `xml:"S3Key" json:"S3Key"` - } - - Key struct { - FilterRules []FilterRule `xml:"FilterRule" json:"FilterRules"` - } - - FilterRule struct { - Name string `xml:"Name" json:"Name"` - Value string `xml:"Value" json:"Value"` - } - - // TopicConfiguration and LambdaFunctionConfiguration -- we don't support these configurations, - // but we need them to detect in notification configurations in requests. - TopicConfiguration struct{} - LambdaFunctionConfiguration struct{} -) - -func (n NotificationConfiguration) IsEmpty() bool { - return len(n.QueueConfigurations) == 0 && len(n.TopicConfigurations) == 0 && len(n.LambdaFunctionConfigurations) == 0 -} diff --git a/api/handler/acl.go b/api/handler/acl.go index 4219c27..7933340 100644 --- a/api/handler/acl.go +++ b/api/handler/acl.go @@ -616,22 +616,11 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) { return } - updated, err := h.updateBucketACL(r, astObject, bktInfo, token) - if err != nil { + if _, err = h.updateBucketACL(r, astObject, bktInfo, token); err != nil { h.logAndSendError(w, "could not update bucket acl", reqInfo, err) return } - if updated { - s := &SendNotificationParams{ - Event: EventObjectACLPut, - NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()), - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - } + w.WriteHeader(http.StatusOK) } diff --git a/api/handler/api.go b/api/handler/api.go index 7734d80..2d9e01d 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -11,7 +11,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" @@ -20,17 +19,11 @@ import ( type ( handler struct { - log *zap.Logger - obj layer.Client - notificator Notificator - cfg Config - ape APE - frostfsid FrostFSID - } - - Notificator interface { - SendNotifications(topics map[string]string, p *SendNotificationParams) error - SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error + log *zap.Logger + obj layer.Client + cfg Config + ape APE + frostfsid FrostFSID } // Config contains data which handler needs to keep. @@ -41,7 +34,6 @@ type ( DefaultCopiesNumbers(namespace string) []uint32 NewXMLDecoder(io.Reader) *xml.Decoder DefaultMaxAge() int - NotificatorEnabled() bool ResolveZoneList() []string IsResolveListAllow() bool BypassContentEncodingInChunks() bool @@ -76,7 +68,7 @@ const ( var _ api.Handler = (*handler)(nil) // New creates new api.Handler using given logger and client. -func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) { +func New(log *zap.Logger, obj layer.Client, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) { switch { case obj == nil: return nil, errors.New("empty FrostFS Object Layer") @@ -88,19 +80,12 @@ func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config, return nil, errors.New("empty frostfsid") } - if !cfg.NotificatorEnabled() { - log.Warn(logs.NotificatorIsDisabledS3WontProduceNotificationEvents) - } else if notificator == nil { - return nil, errors.New("empty notificator") - } - return &handler{ - log: log, - obj: obj, - cfg: cfg, - ape: storage, - notificator: notificator, - frostfsid: ffsid, + log: log, + obj: obj, + cfg: cfg, + ape: storage, + frostfsid: ffsid, }, nil } diff --git a/api/handler/copy.go b/api/handler/copy.go index 5b1fea8..fbd931c 100644 --- a/api/handler/copy.go +++ b/api/handler/copy.go @@ -268,7 +268,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { TagSet: tagSet, NodeVersion: extendedDstObjInfo.NodeVersion, } - if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { + if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { h.logAndSendError(w, "could not upload object tagging", reqInfo, err) return } @@ -276,16 +276,6 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { h.reqLogger(ctx).Info(logs.ObjectIsCopied, zap.Stringer("object_id", dstObjInfo.ID)) - s := &SendNotificationParams{ - Event: EventObjectCreatedCopy, - NotificationInfo: data.NotificationInfoFromObject(dstObjInfo, h.cfg.MD5Enabled()), - BktInfo: dstBktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - if dstEncryptionParams.Enabled() { addSSECHeaders(w.Header(), r.Header) } diff --git a/api/handler/delete.go b/api/handler/delete.go index f67d021..99ec696 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -8,16 +8,12 @@ import ( "strings" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" - "go.uber.org/zap" ) // limitation of AWS https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html @@ -101,41 +97,6 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { return } - var m *SendNotificationParams - - if bktSettings.VersioningEnabled() && len(versionID) == 0 { - m = &SendNotificationParams{ - Event: EventObjectRemovedDeleteMarkerCreated, - NotificationInfo: &data.NotificationInfo{ - Name: reqInfo.ObjectName, - HashSum: deletedObject.DeleteMarkerEtag, - }, - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - } else { - var objID oid.ID - if len(versionID) != 0 { - if err = objID.DecodeString(versionID); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - } - - m = &SendNotificationParams{ - Event: EventObjectRemovedDelete, - NotificationInfo: &data.NotificationInfo{ - Name: reqInfo.ObjectName, - Version: objID.EncodeToString(), - }, - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - } - - if err = h.sendNotifications(ctx, m); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - if deletedObject.VersionID != "" { w.Header().Set(api.AmzVersionID, deletedObject.VersionID) } diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index bd02faa..7a1b6c4 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -104,10 +104,6 @@ func (c *configMock) DefaultMaxAge() int { return 0 } -func (c *configMock) NotificatorEnabled() bool { - return false -} - func (c *configMock) ResolveZoneList() []string { return []string{} } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 76bf787..169f776 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -13,7 +13,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "github.com/google/uuid" "go.uber.org/zap" ) @@ -456,7 +455,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. // Start complete multipart upload which may take some time to fetch object // and re-upload it part by part. - objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo) + objInfo, err := h.completeMultipartUpload(r, c, bktInfo) if err != nil { h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...) @@ -478,7 +477,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. } } -func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *middleware.ReqInfo) (*data.ObjectInfo, error) { +func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) { ctx := r.Context() uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c) if err != nil { @@ -496,7 +495,7 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult TagSet: uploadData.TagSet, NodeVersion: extendedObjInfo.NodeVersion, } - if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { + if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err) } } @@ -528,16 +527,6 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult } } - s := &SendNotificationParams{ - Event: EventObjectCreatedCompleteMultipartUpload, - NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()), - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - return objInfo, nil } diff --git a/api/handler/notifications.go b/api/handler/notifications.go deleted file mode 100644 index 8ef4c7a..0000000 --- a/api/handler/notifications.go +++ /dev/null @@ -1,274 +0,0 @@ -package handler - -import ( - "context" - "fmt" - "net/http" - "strings" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" - "github.com/google/uuid" -) - -type ( - SendNotificationParams struct { - Event string - NotificationInfo *data.NotificationInfo - BktInfo *data.BucketInfo - ReqInfo *middleware.ReqInfo - User string - Time time.Time - } -) - -const ( - filterRuleSuffixName = "suffix" - filterRulePrefixName = "prefix" - - EventObjectCreated = "s3:ObjectCreated:*" - EventObjectCreatedPut = "s3:ObjectCreated:Put" - EventObjectCreatedPost = "s3:ObjectCreated:Post" - EventObjectCreatedCopy = "s3:ObjectCreated:Copy" - EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject" - EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload" - EventObjectRemoved = "s3:ObjectRemoved:*" - EventObjectRemovedDelete = "s3:ObjectRemoved:Delete" - EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated" - EventObjectRestore = "s3:ObjectRestore:*" - EventObjectRestorePost = "s3:ObjectRestore:Post" - EventObjectRestoreCompleted = "s3:ObjectRestore:Completed" - EventReplication = "s3:Replication:*" - EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication" - EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked" - EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold" - EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold" - EventObjectRestoreDelete = "s3:ObjectRestore:Delete" - EventLifecycleTransition = "s3:LifecycleTransition" - EventIntelligentTiering = "s3:IntelligentTiering" - EventObjectACLPut = "s3:ObjectAcl:Put" - EventLifecycleExpiration = "s3:LifecycleExpiration:*" - EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete" - EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated" - EventObjectTagging = "s3:ObjectTagging:*" - EventObjectTaggingPut = "s3:ObjectTagging:Put" - EventObjectTaggingDelete = "s3:ObjectTagging:Delete" -) - -var validEvents = map[string]struct{}{ - EventReducedRedundancyLostObject: {}, - EventObjectCreated: {}, - EventObjectCreatedPut: {}, - EventObjectCreatedPost: {}, - EventObjectCreatedCopy: {}, - EventObjectCreatedCompleteMultipartUpload: {}, - EventObjectRemoved: {}, - EventObjectRemovedDelete: {}, - EventObjectRemovedDeleteMarkerCreated: {}, - EventObjectRestore: {}, - EventObjectRestorePost: {}, - EventObjectRestoreCompleted: {}, - EventReplication: {}, - EventReplicationOperationFailedReplication: {}, - EventReplicationOperationNotTracked: {}, - EventReplicationOperationMissedThreshold: {}, - EventReplicationOperationReplicatedAfterThreshold: {}, - EventObjectRestoreDelete: {}, - EventLifecycleTransition: {}, - EventIntelligentTiering: {}, - EventObjectACLPut: {}, - EventLifecycleExpiration: {}, - EventLifecycleExpirationDelete: {}, - EventLifecycleExpirationDeleteMarkerCreated: {}, - EventObjectTagging: {}, - EventObjectTaggingPut: {}, - EventObjectTaggingDelete: {}, -} - -func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { - reqInfo := middleware.GetReqInfo(r.Context()) - bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) - if err != nil { - h.logAndSendError(w, "could not get bucket info", reqInfo, err) - return - } - - conf := &data.NotificationConfiguration{} - if err = h.cfg.NewXMLDecoder(r.Body).Decode(conf); err != nil { - h.logAndSendError(w, "couldn't decode notification configuration", reqInfo, errors.GetAPIError(errors.ErrMalformedXML)) - return - } - - if _, err = h.checkBucketConfiguration(r.Context(), conf, reqInfo); err != nil { - h.logAndSendError(w, "couldn't check bucket configuration", reqInfo, err) - return - } - - p := &layer.PutBucketNotificationConfigurationParams{ - RequestInfo: reqInfo, - BktInfo: bktInfo, - Configuration: conf, - } - - p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint) - if err != nil { - h.logAndSendError(w, "invalid copies number", reqInfo, err) - return - } - - if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { - h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err) - return - } -} - -func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { - reqInfo := middleware.GetReqInfo(r.Context()) - - bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) - if err != nil { - h.logAndSendError(w, "could not get bucket info", reqInfo, err) - return - } - - conf, err := h.obj.GetBucketNotificationConfiguration(r.Context(), bktInfo) - if err != nil { - h.logAndSendError(w, "could not get bucket notification configuration", reqInfo, err) - return - } - - if err = middleware.EncodeToResponse(w, conf); err != nil { - h.logAndSendError(w, "could not encode bucket notification configuration to response", reqInfo, err) - return - } -} - -func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationParams) error { - if !h.cfg.NotificatorEnabled() { - return nil - } - - conf, err := h.obj.GetBucketNotificationConfiguration(ctx, p.BktInfo) - if err != nil { - return fmt.Errorf("failed to get notification configuration: %w", err) - } - if conf.IsEmpty() { - return nil - } - - box, err := middleware.GetBoxData(ctx) - if err == nil && box.Gate.BearerToken != nil { - p.User = bearer.ResolveIssuer(*box.Gate.BearerToken).EncodeToString() - } - - p.Time = layer.TimeNow(ctx) - - topics := filterSubjects(conf, p.Event, p.NotificationInfo.Name) - - return h.notificator.SendNotifications(topics, p) -} - -// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids. -func (h *handler) checkBucketConfiguration(ctx context.Context, conf *data.NotificationConfiguration, r *middleware.ReqInfo) (completed bool, err error) { - if conf == nil { - return - } - - if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil { - return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported) - } - - for i, q := range conf.QueueConfigurations { - if err = checkEvents(q.Events); err != nil { - return - } - - if err = checkRules(q.Filter.Key.FilterRules); err != nil { - return - } - - if h.cfg.NotificatorEnabled() { - if err = h.notificator.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host, layer.TimeNow(ctx)); err != nil { - return - } - } else { - h.reqLogger(ctx).Warn(logs.FailedToSendTestEventBecauseNotificationsIsDisabled) - } - - if q.ID == "" { - completed = true - conf.QueueConfigurations[i].ID = uuid.NewString() - } - } - - return -} - -func checkRules(rules []data.FilterRule) error { - names := make(map[string]struct{}) - - for _, r := range rules { - if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName { - return errors.GetAPIError(errors.ErrFilterNameInvalid) - } - if _, ok := names[r.Name]; ok { - if r.Name == filterRuleSuffixName { - return errors.GetAPIError(errors.ErrFilterNameSuffix) - } - return errors.GetAPIError(errors.ErrFilterNamePrefix) - } - - names[r.Name] = struct{}{} - } - - return nil -} - -func checkEvents(events []string) error { - for _, e := range events { - if _, ok := validEvents[e]; !ok { - return errors.GetAPIError(errors.ErrEventNotification) - } - } - - return nil -} - -func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string { - topics := make(map[string]string) - - for _, t := range conf.QueueConfigurations { - event := false - for _, e := range t.Events { - // the second condition is comparison with the events ending with *: - // s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char - if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) { - event = true - break - } - } - - if !event { - continue - } - - filter := true - for _, f := range t.Filter.Key.FilterRules { - if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) || - f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) { - filter = false - break - } - } - if filter { - topics[t.ID] = t.QueueArn - } - } - - return topics -} diff --git a/api/handler/notifications_test.go b/api/handler/notifications_test.go deleted file mode 100644 index 82aa5bf..0000000 --- a/api/handler/notifications_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package handler - -import ( - "testing" - - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" - "github.com/stretchr/testify/require" -) - -func TestFilterSubjects(t *testing.T) { - config := &data.NotificationConfiguration{ - QueueConfigurations: []data.QueueConfiguration{ - { - ID: "test1", - QueueArn: "test1", - Events: []string{EventObjectCreated, EventObjectRemovedDelete}, - }, - { - ID: "test2", - QueueArn: "test2", - Events: []string{EventObjectTagging}, - Filter: data.Filter{Key: data.Key{FilterRules: []data.FilterRule{ - {Name: "prefix", Value: "dir/"}, - {Name: "suffix", Value: ".png"}, - }}}, - }, - }, - } - - t.Run("no topics because suitable events not found", func(t *testing.T) { - topics := filterSubjects(config, EventObjectACLPut, "dir/a.png") - require.Empty(t, topics) - }) - - t.Run("no topics because of not suitable prefix", func(t *testing.T) { - topics := filterSubjects(config, EventObjectTaggingPut, "dirw/cat.png") - require.Empty(t, topics) - }) - - t.Run("no topics because of not suitable suffix", func(t *testing.T) { - topics := filterSubjects(config, EventObjectTaggingPut, "a.jpg") - require.Empty(t, topics) - }) - - t.Run("filter topics from queue configs without prefix suffix filter and exact event", func(t *testing.T) { - topics := filterSubjects(config, EventObjectCreatedPut, "dir/a.png") - require.Contains(t, topics, "test1") - require.Len(t, topics, 1) - require.Equal(t, topics["test1"], "test1") - }) - - t.Run("filter topics from queue configs with prefix suffix filter and '*' ending event", func(t *testing.T) { - topics := filterSubjects(config, EventObjectTaggingPut, "dir/a.png") - require.Contains(t, topics, "test2") - require.Len(t, topics, 1) - require.Equal(t, topics["test2"], "test2") - }) -} - -func TestCheckRules(t *testing.T) { - t.Run("correct rules with prefix and suffix", func(t *testing.T) { - rules := []data.FilterRule{ - {Name: "prefix", Value: "asd"}, - {Name: "suffix", Value: "asd"}, - } - err := checkRules(rules) - require.NoError(t, err) - }) - - t.Run("correct rules with prefix", func(t *testing.T) { - rules := []data.FilterRule{ - {Name: "prefix", Value: "asd"}, - } - err := checkRules(rules) - require.NoError(t, err) - }) - - t.Run("correct rules with suffix", func(t *testing.T) { - rules := []data.FilterRule{ - {Name: "suffix", Value: "asd"}, - } - err := checkRules(rules) - require.NoError(t, err) - }) - - t.Run("incorrect rules with wrong name", func(t *testing.T) { - rules := []data.FilterRule{ - {Name: "prefix", Value: "sdf"}, - {Name: "sfx", Value: "asd"}, - } - err := checkRules(rules) - require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameInvalid)) - }) - - t.Run("incorrect rules with repeating suffix", func(t *testing.T) { - rules := []data.FilterRule{ - {Name: "suffix", Value: "asd"}, - {Name: "suffix", Value: "asdf"}, - {Name: "prefix", Value: "jk"}, - } - err := checkRules(rules) - require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameSuffix)) - }) - - t.Run("incorrect rules with repeating prefix", func(t *testing.T) { - rules := []data.FilterRule{ - {Name: "suffix", Value: "ds"}, - {Name: "prefix", Value: "asd"}, - {Name: "prefix", Value: "asdf"}, - } - err := checkRules(rules) - require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNamePrefix)) - }) -} diff --git a/api/handler/put.go b/api/handler/put.go index 9e9a4a0..2c180f9 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -292,16 +292,6 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { } objInfo := extendedObjInfo.ObjectInfo - s := &SendNotificationParams{ - Event: EventObjectCreatedPut, - NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()), - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - if needUpdateEACLTable { if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil { h.logAndSendError(w, "could not get new eacl table", reqInfo, err) @@ -319,7 +309,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { TagSet: tagSet, NodeVersion: extendedObjInfo.NodeVersion, } - if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { + if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { h.logAndSendError(w, "could not upload object tagging", reqInfo, err) return } @@ -560,16 +550,6 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) { } objInfo := extendedObjInfo.ObjectInfo - s := &SendNotificationParams{ - Event: EventObjectCreatedPost, - NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()), - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - if acl := auth.MultipartFormValue(r, "acl"); acl != "" { r.Header.Set(api.AmzACL, acl) r.Header.Set(api.AmzGrantFullControl, "") @@ -592,7 +572,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) { NodeVersion: extendedObjInfo.NodeVersion, } - if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { + if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { h.logAndSendError(w, "could not upload object tagging", reqInfo, err) return } diff --git a/api/handler/tagging.go b/api/handler/tagging.go index 3900e75..4018d88 100644 --- a/api/handler/tagging.go +++ b/api/handler/tagging.go @@ -10,8 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" - "go.uber.org/zap" ) const ( @@ -46,27 +44,12 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request }, TagSet: tagSet, } - nodeVersion, err := h.obj.PutObjectTagging(ctx, tagPrm) - if err != nil { + + if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { h.logAndSendError(w, "could not put object tagging", reqInfo, err) return } - s := &SendNotificationParams{ - Event: EventObjectTaggingPut, - NotificationInfo: &data.NotificationInfo{ - Name: nodeVersion.FilePath, - Size: nodeVersion.Size, - Version: nodeVersion.OID.EncodeToString(), - HashSum: nodeVersion.ETag, - }, - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - w.WriteHeader(http.StatusOK) } @@ -123,27 +106,11 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), } - nodeVersion, err := h.obj.DeleteObjectTagging(ctx, p) - if err != nil { + if err = h.obj.DeleteObjectTagging(ctx, p); err != nil { h.logAndSendError(w, "could not delete object tagging", reqInfo, err) return } - s := &SendNotificationParams{ - Event: EventObjectTaggingDelete, - NotificationInfo: &data.NotificationInfo{ - Name: nodeVersion.FilePath, - Size: nodeVersion.Size, - Version: nodeVersion.OID.EncodeToString(), - HashSum: nodeVersion.ETag, - }, - BktInfo: bktInfo, - ReqInfo: reqInfo, - } - if err = h.sendNotifications(ctx, s); err != nil { - h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) - } - w.WriteHeader(http.StatusNoContent) } diff --git a/api/handler/unimplemented.go b/api/handler/unimplemented.go index fccdfc3..0c90789 100644 --- a/api/handler/unimplemented.go +++ b/api/handler/unimplemented.go @@ -58,3 +58,11 @@ func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Reque func (h *handler) PutBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) } + +func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) +} + +func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) +} diff --git a/api/layer/cache.go b/api/layer/cache.go index 23cbd9a..0618dee 100644 --- a/api/layer/cache.go +++ b/api/layer/cache.go @@ -257,24 +257,3 @@ func (c *Cache) PutCORS(owner user.ID, bkt *data.BucketInfo, cors *data.CORSConf func (c *Cache) DeleteCORS(bktInfo *data.BucketInfo) { c.systemCache.Delete(bktInfo.Name + bktInfo.CORSObjectName()) } - -func (c *Cache) GetNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo) *data.NotificationConfiguration { - key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName() - - if !c.accessCache.Get(owner, key) { - return nil - } - - return c.systemCache.GetNotificationConfiguration(key) -} - -func (c *Cache) PutNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo, configuration *data.NotificationConfiguration) { - key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName() - if err := c.systemCache.PutNotificationConfiguration(key, configuration); err != nil { - c.logger.Warn(logs.CouldntCacheNotificationConfiguration, zap.String("bucket", bktInfo.Name), zap.Error(err)) - } - - if err := c.accessCache.Put(owner, key); err != nil { - c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err)) - } -} diff --git a/api/layer/layer.go b/api/layer/layer.go index eae130f..e81c3e4 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -27,23 +27,11 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" - "github.com/nats-io/nats.go" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" ) type ( - EventListener interface { - Subscribe(context.Context, string, MsgHandler) error - Listen(context.Context) - } - - MsgHandler interface { - HandleMessage(context.Context, *nats.Msg) error - } - - MsgHandlerFunc func(context.Context, *nats.Msg) error - BucketResolver interface { Resolve(ctx context.Context, name string) (cid.ID, error) } @@ -61,7 +49,6 @@ type ( log *zap.Logger anonKey AnonymousKey resolver BucketResolver - ncontroller EventListener cache *Cache treeService TreeService features FeatureSettings @@ -215,7 +202,6 @@ type ( // Client provides S3 API client interface. Client interface { - Initialize(ctx context.Context, c EventListener) error EphemeralKey() *keys.PublicKey GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) @@ -245,8 +231,8 @@ type ( DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error) - PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (*data.NodeVersion, error) - DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error) + PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) error + DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error) @@ -266,9 +252,6 @@ type ( AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) - PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error - GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) - // Compound methods for optimizations // GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation. @@ -300,10 +283,6 @@ func (t *VersionedObject) String() string { return t.Name + ":" + t.VersionID } -func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error { - return f(ctx, msg) -} - func (p HeadObjectParams) Versioned() bool { return len(p.VersionID) > 0 } @@ -327,23 +306,6 @@ func (n *layer) EphemeralKey() *keys.PublicKey { return n.anonKey.Key.PublicKey() } -func (n *layer) Initialize(ctx context.Context, c EventListener) error { - if n.IsNotificationEnabled() { - return fmt.Errorf("already initialized") - } - - // todo add notification handlers (e.g. for lifecycles) - - c.Listen(ctx) - - n.ncontroller = c - return nil -} - -func (n *layer) IsNotificationEnabled() bool { - return n.ncontroller != nil -} - // IsAuthenticatedRequest checks if access box exists in the current request. func IsAuthenticatedRequest(ctx context.Context) bool { _, err := middleware.GetBoxData(ctx) diff --git a/api/layer/notifications.go b/api/layer/notifications.go deleted file mode 100644 index ecd7276..0000000 --- a/api/layer/notifications.go +++ /dev/null @@ -1,89 +0,0 @@ -package layer - -import ( - "bytes" - "context" - "encoding/xml" - errorsStd "errors" - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" - "go.uber.org/zap" -) - -type PutBucketNotificationConfigurationParams struct { - RequestInfo *middleware.ReqInfo - BktInfo *data.BucketInfo - Configuration *data.NotificationConfiguration - CopiesNumbers []uint32 -} - -func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error { - confXML, err := xml.Marshal(p.Configuration) - if err != nil { - return fmt.Errorf("marshal notify configuration: %w", err) - } - - prm := PrmObjectCreate{ - Container: p.BktInfo.CID, - Payload: bytes.NewReader(confXML), - Filepath: p.BktInfo.NotificationConfigurationObjectName(), - CreationTime: TimeNow(ctx), - CopiesNumber: p.CopiesNumbers, - } - - _, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo) - if err != nil { - return err - } - - objIDToDelete, err := n.treeService.PutNotificationConfigurationNode(ctx, p.BktInfo, objID) - objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) - if err != nil && !objIDToDeleteNotFound { - return err - } - - if !objIDToDeleteNotFound { - if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil { - n.reqLogger(ctx).Error(logs.CouldntDeleteNotificationConfigurationObject, zap.Error(err), - zap.String("cid", p.BktInfo.CID.EncodeToString()), - zap.String("oid", objIDToDelete.EncodeToString())) - } - } - - n.cache.PutNotificationConfiguration(n.BearerOwner(ctx), p.BktInfo, p.Configuration) - - return nil -} - -func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) { - owner := n.BearerOwner(ctx) - if conf := n.cache.GetNotificationConfiguration(owner, bktInfo); conf != nil { - return conf, nil - } - - objID, err := n.treeService.GetNotificationConfigurationNode(ctx, bktInfo) - objIDNotFound := errorsStd.Is(err, ErrNodeNotFound) - if err != nil && !objIDNotFound { - return nil, err - } - - conf := &data.NotificationConfiguration{} - - if !objIDNotFound { - obj, err := n.objectGet(ctx, bktInfo, objID) - if err != nil { - return nil, err - } - - if err = xml.Unmarshal(obj.Payload(), &conf); err != nil { - return nil, fmt.Errorf("unmarshal notify configuration: %w", err) - } - } - - n.cache.PutNotificationConfiguration(owner, bktInfo, conf) - - return conf, nil -} diff --git a/api/layer/tagging.go b/api/layer/tagging.go index ba10653..7ef7b3d 100644 --- a/api/layer/tagging.go +++ b/api/layer/tagging.go @@ -50,12 +50,12 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingPa return p.ObjectVersion.VersionID, tags, nil } -func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (nodeVersion *data.NodeVersion, err error) { - nodeVersion = p.NodeVersion +func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (err error) { + nodeVersion := p.NodeVersion if nodeVersion == nil { nodeVersion, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjectVersion) if err != nil { - return nil, err + return err } } p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString() @@ -63,35 +63,35 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingPa err = n.treeService.PutObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion, p.TagSet) if err != nil { if errors.Is(err, ErrNodeNotFound) { - return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error()) + return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error()) } - return nil, err + return err } n.cache.PutTagging(n.BearerOwner(ctx), objectTaggingCacheKey(p.ObjectVersion), p.TagSet) - return nodeVersion, nil + return nil } -func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error) { +func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error { version, err := n.getNodeVersion(ctx, p) if err != nil { - return nil, err + return err } err = n.treeService.DeleteObjectTagging(ctx, p.BktInfo, version) if err != nil { if errors.Is(err, ErrNodeNotFound) { - return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error()) + return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error()) } - return nil, err + return err } p.VersionID = version.OID.EncodeToString() n.cache.DeleteTagging(objectTaggingCacheKey(p)) - return version, nil + return nil } func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 3a02e4f..4497fd0 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -110,14 +110,6 @@ func (t *TreeServiceMock) GetSettingsNode(_ context.Context, bktInfo *data.Bucke return settings, nil } -func (t *TreeServiceMock) GetNotificationConfigurationNode(context.Context, *data.BucketInfo) (oid.ID, error) { - panic("implement me") -} - -func (t *TreeServiceMock) PutNotificationConfigurationNode(context.Context, *data.BucketInfo, oid.ID) (oid.ID, error) { - panic("implement me") -} - func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { systemMap, ok := t.system[bktInfo.CID.EncodeToString()] if !ok { diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 1ec59c7..001bad4 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -18,17 +18,6 @@ type TreeService interface { // If tree node is not found returns ErrNodeNotFound error. GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) - // GetNotificationConfigurationNode gets an object id that corresponds to object with bucket CORS. - // - // If tree node is not found returns ErrNodeNotFound error. - GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) - - // PutNotificationConfigurationNode puts a node to a system tree - // and returns objectID of a previous notif config which must be deleted in FrostFS. - // - // If object id to remove is not found returns ErrNoNodeToRemove error. - PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) - // GetBucketCORS gets an object id that corresponds to object with bucket CORS. // // If object id is not found returns ErrNodeNotFound error. diff --git a/api/notifications/controller.go b/api/notifications/controller.go deleted file mode 100644 index 1ce6919..0000000 --- a/api/notifications/controller.go +++ /dev/null @@ -1,263 +0,0 @@ -package notifications - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" - "github.com/nats-io/nats.go" - "go.uber.org/zap" -) - -const ( - DefaultTimeout = 30 * time.Second - - // EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications. - EventVersion23 = "2.3" - // EventVersion22 is used for replication notifications. - EventVersion22 = "2.2" - // EventVersion21 is used for all other notification types. - EventVersion21 = "2.1" -) - -type ( - Options struct { - URL string - TLSCertFilepath string - TLSAuthPrivateKeyFilePath string - Timeout time.Duration - RootCAFiles []string - } - - Controller struct { - logger *zap.Logger - taskQueueConnection *nats.Conn - jsClient nats.JetStreamContext - handlers map[string]Stream - mu sync.RWMutex - } - - Stream struct { - h layer.MsgHandler - ch chan *nats.Msg - } - - TestEvent struct { - Service string - Event string - Time time.Time - Bucket string - RequestID string - HostID string - } - - Event struct { - Records []EventRecord `json:"Records"` - } - - EventRecord struct { - EventVersion string `json:"eventVersion"` - EventSource string `json:"eventSource"` // frostfs:s3 - AWSRegion string `json:"awsRegion,omitempty"` // empty - EventTime time.Time `json:"eventTime"` - EventName string `json:"eventName"` - UserIdentity UserIdentity `json:"userIdentity"` - RequestParameters RequestParameters `json:"requestParameters"` - ResponseElements map[string]string `json:"responseElements"` - S3 S3Entity `json:"s3"` - } - - UserIdentity struct { - PrincipalID string `json:"principalId"` - } - - RequestParameters struct { - SourceIPAddress string `json:"sourceIPAddress"` - } - - S3Entity struct { - SchemaVersion string `json:"s3SchemaVersion"` - ConfigurationID string `json:"configurationId,omitempty"` - Bucket Bucket `json:"bucket"` - Object Object `json:"object"` - } - - Bucket struct { - Name string `json:"name"` - OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"` - Arn string `json:"arn,omitempty"` - } - - Object struct { - Key string `json:"key"` - Size uint64 `json:"size,omitempty"` - VersionID string `json:"versionId,omitempty"` - ETag string `json:"eTag,omitempty"` - Sequencer string `json:"sequencer,omitempty"` - } -) - -func NewController(p *Options, l *zap.Logger) (*Controller, error) { - ncopts := []nats.Option{ - nats.Timeout(p.Timeout), - } - - if len(p.TLSCertFilepath) != 0 && len(p.TLSAuthPrivateKeyFilePath) != 0 { - ncopts = append(ncopts, nats.ClientCert(p.TLSCertFilepath, p.TLSAuthPrivateKeyFilePath)) - } - if len(p.RootCAFiles) != 0 { - ncopts = append(ncopts, nats.RootCAs(p.RootCAFiles...)) - } - - nc, err := nats.Connect(p.URL, ncopts...) - if err != nil { - return nil, fmt.Errorf("connect to nats: %w", err) - } - - js, err := nc.JetStream() - if err != nil { - return nil, fmt.Errorf("get jet stream: %w", err) - } - - return &Controller{ - logger: l, - taskQueueConnection: nc, - jsClient: js, - handlers: make(map[string]Stream), - }, nil -} - -func (c *Controller) Subscribe(_ context.Context, topic string, handler layer.MsgHandler) error { - ch := make(chan *nats.Msg, 1) - - c.mu.RLock() - _, ok := c.handlers[topic] - c.mu.RUnlock() - if ok { - return fmt.Errorf("already subscribed to topic '%s'", topic) - } - - if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil { - return fmt.Errorf("add stream: %w", err) - } - - if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil { - return fmt.Errorf("could not subscribe: %w", err) - } - - c.mu.Lock() - c.handlers[topic] = Stream{ - h: handler, - ch: ch, - } - c.mu.Unlock() - - return nil -} - -func (c *Controller) Listen(ctx context.Context) { - c.mu.RLock() - defer c.mu.RUnlock() - - for _, stream := range c.handlers { - go func(stream Stream) { - for { - select { - case msg := <-stream.ch: - if err := stream.h.HandleMessage(ctx, msg); err != nil { - c.logger.Error(logs.CouldNotHandleMessage, zap.Error(err)) - } else if err = msg.Ack(); err != nil { - c.logger.Error(logs.CouldNotACKMessage, zap.Error(err)) - } - case <-ctx.Done(): - return - } - } - }(stream) - } -} - -func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error { - event := prepareEvent(p) - - for id, topic := range topics { - event.Records[0].S3.ConfigurationID = id - msg, err := json.Marshal(event) - if err != nil { - c.logger.Error(logs.CouldntMarshalAnEvent, zap.String("subject", topic), zap.Error(err)) - } - if err = c.publish(topic, msg); err != nil { - c.logger.Error(logs.CouldntSendAnEventToTopic, zap.String("subject", topic), zap.Error(err)) - } - } - - return nil -} - -func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error { - event := &TestEvent{ - Service: "FrostFS S3", - Event: "s3:TestEvent", - Time: now, - Bucket: bucketName, - RequestID: requestID, - HostID: HostID, - } - - msg, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("couldn't marshal test event: %w", err) - } - - return c.publish(topic, msg) -} - -func prepareEvent(p *handler.SendNotificationParams) *Event { - return &Event{ - Records: []EventRecord{ - { - EventVersion: EventVersion21, - EventSource: "frostfs:s3", - AWSRegion: "", - EventTime: p.Time, - EventName: p.Event, - UserIdentity: UserIdentity{ - PrincipalID: p.User, - }, - RequestParameters: RequestParameters{ - SourceIPAddress: p.ReqInfo.RemoteHost, - }, - ResponseElements: nil, - S3: S3Entity{ - SchemaVersion: "1.0", - // ConfigurationID is skipped and will be placed later - Bucket: Bucket{ - Name: p.BktInfo.Name, - OwnerIdentity: UserIdentity{PrincipalID: p.BktInfo.Owner.String()}, - Arn: p.BktInfo.Name, - }, - Object: Object{ - Key: p.NotificationInfo.Name, - Size: p.NotificationInfo.Size, - VersionID: p.NotificationInfo.Version, - ETag: p.NotificationInfo.HashSum, - Sequencer: "", - }, - }, - }, - }, - } -} - -func (c *Controller) publish(topic string, msg []byte) error { - if _, err := c.jsClient.Publish(topic, msg); err != nil { - return fmt.Errorf("couldn't send event: %w", err) - } - - return nil -} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 3f72695..b022b50 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -24,7 +24,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs" @@ -62,7 +61,6 @@ type ( pool *pool.Pool treePool *treepool.Pool key *keys.PrivateKey - nc *notifications.Controller obj layer.Client api api.Handler @@ -88,7 +86,6 @@ type ( maxClient maxClientsConfig defaultMaxAge int reconnectInterval time.Duration - notificatorEnabled bool resolveZoneList []string isResolveListAllow bool // True if ResolveZoneList contains allowed zones frostfsidValidation bool @@ -157,13 +154,13 @@ func (a *App) init(ctx context.Context) { a.setRuntimeParameters() a.initFrostfsID(ctx) a.initPolicyStorage(ctx) - a.initAPI(ctx) + a.initAPI() a.initMetrics() a.initServers(ctx) a.initTracing(ctx) } -func (a *App) initLayer(ctx context.Context) { +func (a *App) initLayer() { a.initResolver() // prepare random key for anonymous requests @@ -188,18 +185,6 @@ func (a *App) initLayer(ctx context.Context) { // prepare object layer a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) - - if a.cfg.GetBool(cfgEnableNATS) { - nopts := getNotificationsOptions(a.cfg, a.log) - a.nc, err = notifications.NewController(nopts, a.log) - if err != nil { - a.log.Fatal(logs.FailedToEnableNotifications, zap.Error(err)) - } - - if err = a.obj.Initialize(ctx, a.nc); err != nil { - a.log.Fatal(logs.CouldntInitializeLayer, zap.Error(err)) - } - } } func newAppSettings(log *Logger, v *viper.Viper) *appSettings { @@ -208,7 +193,6 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings { maxClient: newMaxClients(v), defaultMaxAge: fetchDefaultMaxAge(v, log.logger), reconnectInterval: fetchReconnectInterval(v), - notificatorEnabled: v.GetBool(cfgEnableNATS), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), } @@ -343,10 +327,6 @@ func (s *appSettings) DefaultMaxAge() int { return s.defaultMaxAge } -func (s *appSettings) NotificatorEnabled() bool { - return s.notificatorEnabled -} - func (s *appSettings) ResolveZoneList() []string { return s.resolveZoneList } @@ -468,8 +448,8 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy { return s.retryStrategy } -func (a *App) initAPI(ctx context.Context) { - a.initLayer(ctx) +func (a *App) initAPI() { + a.initLayer() a.initHandler() } @@ -908,17 +888,6 @@ func (a *App) stopServices() { } } -func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options { - cfg := notifications.Options{} - cfg.URL = v.GetString(cfgNATSEndpoint) - cfg.Timeout = fetchNATSTimeout(v, l) - cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile) - cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile) - cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles) - - return &cfg -} - func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig { cacheCfg := layer.DefaultCachesConfigs(l) @@ -976,7 +945,7 @@ func getFrostfsIDCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config { func (a *App) initHandler() { var err error - a.api, err = handler.New(a.log, a.obj, a.nc, a.settings, a.policyStorage, a.frostfsid) + a.api, err = handler.New(a.log, a.obj, a.settings, a.policyStorage, a.frostfsid) if err != nil { a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err)) } diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index dc2a336..ca20cc7 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -14,7 +14,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version" @@ -120,14 +119,6 @@ const ( // Settings. cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval" - // NATS. - cfgEnableNATS = "nats.enabled" - cfgNATSEndpoint = "nats.endpoint" - cfgNATSTimeout = "nats.timeout" - cfgNATSTLSCertFile = "nats.cert_file" - cfgNATSAuthPrivateKeyFile = "nats.key_file" - cfgNATSRootCAFiles = "nats.root_ca" - // Policy. cfgPolicyDefault = "placement_policy.default" cfgPolicyRegionMapFile = "placement_policy.region_mapping" @@ -376,19 +367,6 @@ func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy return policy } -func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration { - timeout := cfg.GetDuration(cfgNATSTimeout) - if timeout <= 0 { - l.Error(logs.InvalidLifetimeUsingDefaultValue, - zap.String("parameter", cfgNATSTimeout), - zap.Duration("value in config", timeout), - zap.Duration("default", notifications.DefaultTimeout)) - timeout = notifications.DefaultTimeout - } - - return timeout -} - func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration { if v.IsSet(cfgEntry) { lifetime := v.GetDuration(cfgEntry) diff --git a/config/config.env b/config/config.env index 82c0a09..a16ecff 100644 --- a/config/config.env +++ b/config/config.env @@ -88,7 +88,7 @@ S3_GW_CACHE_BUCKETS_SIZE=1000 # Cache which contains mapping of nice name to object addresses S3_GW_CACHE_NAMES_LIFETIME=1m S3_GW_CACHE_NAMES_SIZE=10000 - # Cache for system objects in a bucket: bucket settings, notification configuration etc +# Cache for system objects in a bucket: bucket settings etc S3_GW_CACHE_SYSTEM_LIFETIME=5m S3_GW_CACHE_SYSTEM_SIZE=100000 # Cache which stores access box with tokens by its address @@ -105,14 +105,6 @@ S3_GW_CACHE_MORPH_POLICY_SIZE=10000 S3_GW_CACHE_FROSTFSID_LIFETIME=1m S3_GW_CACHE_FROSTFSID_SIZE=10000 -# NATS -S3_GW_NATS_ENABLED=true -S3_GW_NATS_ENDPOINT=nats://nats.frostfs.devenv:4222 -S3_GW_NATS_TIMEOUT=30s -S3_GW_NATS_CERT_FILE=/path/to/cert -S3_GW_NATS_KEY_FILE=/path/to/key -S3_GW_NATS_ROOT_CA=/path/to/ca - # Default policy of placing containers in FrostFS # If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway # will put the container with default policy. It can be specified via environment variable, e.g.: diff --git a/config/config.yaml b/config/config.yaml index a492f7a..d9cc827 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -105,7 +105,7 @@ cache: buckets: lifetime: 1m size: 500 - # Cache for system objects in a bucket: bucket settings, notification configuration etc + # Cache for system objects in a bucket: bucket settings etc system: lifetime: 2m size: 1000 @@ -127,14 +127,6 @@ cache: lifetime: 1m size: 10000 -nats: - enabled: true - endpoint: nats://localhost:4222 - timeout: 30s - cert_file: /path/to/cert - key_file: /path/to/key - root_ca: /path/to/ca - # Parameters of FrostFS container placement policy placement_policy: # Default policy of placing containers in FrostFS diff --git a/docs/configuration.md b/docs/configuration.md index 8633908..d25837a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -177,7 +177,6 @@ There are some custom types used for brevity: | `server` | [Server configuration](#server-section) | | `logger` | [Logger configuration](#logger-section) | | `cache` | [Cache configuration](#cache-section) | -| `nats` | [NATS configuration](#nats-section) | | `cors` | [CORS configuration](#cors-section) | | `pprof` | [Pprof configuration](#pprof-section) | | `prometheus` | [Prometheus configuration](#prometheus-section) | @@ -411,18 +410,18 @@ cache: size: 10000 ``` -| Parameter | Type | Default value | Description | -|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------------------------------| -| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). | -| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. | -| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. | -| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. | -| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. | -| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. | -| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. | -| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. | -| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. | -| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. | +| Parameter | Type | Default value | Description | +|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------| +| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). | +| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. | +| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. | +| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. | +| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. | +| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings etc. | +| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. | +| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. | +| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. | +| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. | #### `cache` subsection @@ -450,36 +449,6 @@ size: 100 | `lifetime` | `duration` | '10m' | Lifetime of entries in cache. | | `size` | `int` | '100 | LRU cache size. | -### `nats` section - -This is an advanced section, use with caution. -You can turn on notifications about successful completions of basic operations, and the gateway will send notifications -via NATS JetStream. - -1. to configure the NATS server with JetStream -2. to specify NATS parameters for the S3 GW. It's ***necessary*** to define a values of `nats.enable` or - `S3_GW_NATS_ENABLED` as `True` -3. to configure notifications in a bucket - -```yaml -nats: - enabled: true - endpoint: nats://localhost:4222 - timeout: 30s - cert_file: /path/to/cert - key_file: /path/to/key - root_ca: /path/to/ca -``` - -| Parameter | Type | Default value | Description | -|---------------|------------|---------------|------------------------------------------------------| -| `enabled` | `bool` | `false` | Flag to enable the service. | -| `endpoint` | `string` | | NATS endpoint to connect to. | -| `timeout` | `duration` | `30s` | Timeout for the object notification operation. | -| `certificate` | `string` | | Path to the client certificate. | -| `key` | `string` | | Path to the client key. | -| `ca` | `string` | | Override root CA used to verify server certificates. | - ### `cors` section ```yaml diff --git a/docs/tree_service.md b/docs/tree_service.md index d9b119c..81553b5 100644 --- a/docs/tree_service.md +++ b/docs/tree_service.md @@ -13,6 +13,5 @@ Each node keeps one of the types of data as a set of **key-value pairs**: Some data takes up a lot of memory, so we store it in FrostFS nodes as an object with payload. But we keep these objects' metadata in the Tree service too: -* Notification configuration * CORS * Metadata of parts of active multipart uploads diff --git a/go.mod b/go.mod index a3e74b6..21f0633 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/go-chi/chi/v5 v5.0.8 github.com/google/uuid v1.3.1 github.com/minio/sio v0.3.0 - github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d github.com/nspcc-dev/neo-go v0.105.0 github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.15.1 @@ -68,9 +67,6 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect - github.com/nats-io/nats-server/v2 v2.7.1 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect diff --git a/go.sum b/go.sum index 674bf90..e2bd872 100644 --- a/go.sum +++ b/go.sum @@ -223,7 +223,6 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -236,7 +235,6 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/sio v0.3.0 h1:syEFBewzOMOYVzSTFpp1MqpSZk8rUNbz8VIIc+PNzus= github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= @@ -244,15 +242,6 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= -github.com/nats-io/nats-server/v2 v2.7.1 h1:SDj8R0PJPVekw3EgHxGtTfJUuMbsuaul1nwWFI3xTyk= -github.com/nats-io/nats-server/v2 v2.7.1/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA= -github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c h1:OOQeE613BH93ICPq3eke5N78gWNeMjcBWkmD2NKyXVg= github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= github.com/nspcc-dev/neo-go v0.105.0 h1:vtNZYFEFySK8zRDhLzQYha849VzWrcKezlnq/oNQg/w= @@ -375,7 +364,6 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= @@ -541,7 +529,6 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/logs/logs.go b/internal/logs/logs.go index da9a0d4..e3502ee 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -54,16 +54,12 @@ const ( InvalidCacheEntryType = "invalid cache entry type" // Warn in ../../api/cache/* InvalidCacheKeyType = "invalid cache key type" // Warn in ../../api/cache/objectslist.go ObjectIsCopied = "object is copied" // Info in ../../api/handler/copy.go - CouldntSendNotification = "couldn't send notification: %w" // Error in ../../api/handler/* - FailedToSendTestEventBecauseNotificationsIsDisabled = "failed to send test event because notifications is disabled" // Warn in ../../api/handler/notifications.go RequestFailed = "request failed" // Error in ../../api/handler/util.go GetBucketInfo = "get bucket info" // Warn in ../../api/handler/cors.go GetBucketCors = "get bucket cors" // Warn in ../../api/handler/cors.go SomeACLNotFullyMapped = "some acl not fully mapped" // Warn in ../../api/handler/acl.go CouldntDeleteObject = "couldn't delete object" // Error in ../../api/layer/layer.go - NotificatorIsDisabledS3WontProduceNotificationEvents = "notificator is disabled, s3 won't produce notification events" // Warn in ../../api/handler/api.go BucketIsCreated = "bucket is created" // Info in ../../api/handler/put.go - CouldntDeleteNotificationConfigurationObject = "couldn't delete notification configuration object" // Error in ../../api/layer/notifications.go CouldNotParseContainerObjectLockEnabledAttribute = "could not parse container object lock enabled attribute" // Error in ../../api/layer/container.go CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go @@ -93,7 +89,6 @@ const ( CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go CouldntCacheCors = "couldn't cache cors" // Warn in ../../api/layer/cache.go - CouldntCacheNotificationConfiguration = "couldn't cache notification configuration" // Warn in ../../api/layer/cache.go CouldntCacheListPolicyChains = "couldn't cache list policy chains" // Warn in ../../api/layer/cache.go RequestEnd = "request end" // Info in ../../api/middleware/response.go CouldntReceiveAccessBoxForGateKeyRandomKeyWillBeUsed = "couldn't receive access box for gate key, random key will be used" // Debug in ../../api/middleware/auth.go @@ -101,15 +96,9 @@ const ( FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go - CouldNotHandleMessage = "could not handle message" // Error in ../../api/notifications/controller.go - CouldNotACKMessage = "could not ACK message" // Error in ../../api/notifications/controller.go - CouldntMarshalAnEvent = "couldn't marshal an event" // Error in ../../api/notifications/controller.go - CouldntSendAnEventToTopic = "couldn't send an event to topic" // Error in ../../api/notifications/controller.go InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go CouldntGenerateRandomKey = "couldn't generate random key" // Fatal in ../../cmd/s3-gw/app.go - FailedToEnableNotifications = "failed to enable notifications" // Fatal in ../../cmd/s3-gw/app.go - CouldntInitializeLayer = "couldn't initialize layer" // Fatal in ../../cmd/s3-gw/app.go FailedToCreateResolver = "failed to create resolver" // Fatal in ../../cmd/s3-gw/app.go CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../cmd/s3-gw/app.go FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../cmd/s3-gw/app.go diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 003da90..963cccd 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -108,7 +108,6 @@ const ( createdKV = "Created" settingsFileName = "bucket-settings" - notifConfFileName = "bucket-notifications" corsFilename = "bucket-cors" bucketTaggingFilename = "bucket-tagging" @@ -116,7 +115,7 @@ const ( versionTree = "version" // systemTree -- ID of a tree with system objects - // i.e. bucket settings with versioning and lock configuration, cors, notifications. + // i.e. bucket settings with versioning and lock configuration, cors. systemTree = "system" separator = "/" @@ -400,36 +399,6 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta) } -func (c *Tree) GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV}) - if err != nil { - return oid.ID{}, err - } - - return node.ObjID, nil -} - -func (c *Tree) PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV}) - isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) - if err != nil && !isErrNotFound { - return oid.ID{}, fmt.Errorf("couldn't get node: %w", err) - } - - meta := make(map[string]string) - meta[FileNameKey] = notifConfFileName - meta[oidKV] = objID.EncodeToString() - - if isErrNotFound { - if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil { - return oid.ID{}, err - } - return oid.ID{}, layer.ErrNoNodeToRemove - } - - return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta) -} - func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV}) if err != nil {