diff --git a/CHANGELOG.md b/CHANGELOG.md
index 487ba9f..2b87321 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ This document outlines major changes between releases.
### Removed
- Remove control api (#406)
+- Remove notifications (#401)
## [0.29.0] - Zemu - 2024-05-27
diff --git a/api/cache/cache_test.go b/api/cache/cache_test.go
index b4666c1..31aceba 100644
--- a/api/cache/cache_test.go
+++ b/api/cache/cache_test.go
@@ -182,24 +182,6 @@ func TestSettingsCacheType(t *testing.T) {
assertInvalidCacheEntry(t, cache.GetSettings(key), observedLog)
}
-func TestNotificationConfigurationCacheType(t *testing.T) {
- logger, observedLog := getObservedLogger()
- cache := NewSystemCache(DefaultSystemConfig(logger))
-
- key := "key"
- notificationConfig := &data.NotificationConfiguration{}
-
- err := cache.PutNotificationConfiguration(key, notificationConfig)
- require.NoError(t, err)
- val := cache.GetNotificationConfiguration(key)
- require.Equal(t, notificationConfig, val)
- require.Equal(t, 0, observedLog.Len())
-
- err = cache.cache.Set(key, "tmp")
- require.NoError(t, err)
- assertInvalidCacheEntry(t, cache.GetNotificationConfiguration(key), observedLog)
-}
-
func TestFrostFSIDSubjectCacheType(t *testing.T) {
logger, observedLog := getObservedLogger()
cache := NewFrostfsIDCache(DefaultFrostfsIDConfig(logger))
diff --git a/api/cache/system.go b/api/cache/system.go
index 292ae58..c0f51a5 100644
--- a/api/cache/system.go
+++ b/api/cache/system.go
@@ -104,22 +104,6 @@ func (o *SystemCache) GetSettings(key string) *data.BucketSettings {
return result
}
-func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration {
- entry, err := o.cache.Get(key)
- if err != nil {
- return nil
- }
-
- result, ok := entry.(*data.NotificationConfiguration)
- if !ok {
- o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
- zap.String("expected", fmt.Sprintf("%T", result)))
- return nil
- }
-
- return result
-}
-
// GetTagging returns tags of a bucket or an object.
func (o *SystemCache) GetTagging(key string) map[string]string {
entry, err := o.cache.Get(key)
@@ -153,10 +137,6 @@ func (o *SystemCache) PutSettings(key string, settings *data.BucketSettings) err
return o.cache.Set(key, settings)
}
-func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error {
- return o.cache.Set(key, obj)
-}
-
// PutTagging puts tags of a bucket or an object.
func (o *SystemCache) PutTagging(key string, tagSet map[string]string) error {
return o.cache.Set(key, tagSet)
diff --git a/api/data/info.go b/api/data/info.go
index 041e001..3d85788 100644
--- a/api/data/info.go
+++ b/api/data/info.go
@@ -12,9 +12,8 @@ import (
)
const (
- bktSettingsObject = ".s3-settings"
- bktCORSConfigurationObject = ".s3-cors"
- bktNotificationConfigurationObject = ".s3-notifications"
+ bktSettingsObject = ".s3-settings"
+ bktCORSConfigurationObject = ".s3-cors"
VersioningUnversioned = "Unversioned"
VersioningEnabled = "Enabled"
@@ -52,14 +51,6 @@ type (
Headers map[string]string
}
- // NotificationInfo store info to send s3 notification.
- NotificationInfo struct {
- Name string
- Version string
- Size uint64
- HashSum string
- }
-
// BucketSettings stores settings such as versioning.
BucketSettings struct {
Versioning string
@@ -93,26 +84,12 @@ type (
}
)
-// NotificationInfoFromObject creates new NotificationInfo from ObjectInfo.
-func NotificationInfoFromObject(objInfo *ObjectInfo, md5Enabled bool) *NotificationInfo {
- return &NotificationInfo{
- Name: objInfo.Name,
- Version: objInfo.VersionID(),
- Size: objInfo.Size,
- HashSum: Quote(objInfo.ETag(md5Enabled)),
- }
-}
-
// SettingsObjectName is a system name for a bucket settings file.
func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject }
// CORSObjectName returns a system name for a bucket CORS configuration file.
func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject }
-func (b *BucketInfo) NotificationConfigurationObjectName() string {
- return bktNotificationConfigurationObject
-}
-
// VersionID returns object version from ObjectInfo.
func (o *ObjectInfo) VersionID() string { return o.ID.EncodeToString() }
diff --git a/api/data/notifications.go b/api/data/notifications.go
deleted file mode 100644
index 0a894e5..0000000
--- a/api/data/notifications.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package data
-
-import "encoding/xml"
-
-type (
- NotificationConfiguration struct {
- XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguration" json:"-"`
- QueueConfigurations []QueueConfiguration `xml:"QueueConfiguration" json:"QueueConfigurations"`
- // Not supported topics
- TopicConfigurations []TopicConfiguration `xml:"TopicConfiguration" json:"TopicConfigurations"`
- LambdaFunctionConfigurations []LambdaFunctionConfiguration `xml:"CloudFunctionConfiguration" json:"CloudFunctionConfigurations"`
- }
-
- QueueConfiguration struct {
- ID string `xml:"Id" json:"Id"`
- QueueArn string `xml:"Queue" json:"Queue"`
- Events []string `xml:"Event" json:"Events"`
- Filter Filter `xml:"Filter" json:"Filter"`
- }
-
- Filter struct {
- Key Key `xml:"S3Key" json:"S3Key"`
- }
-
- Key struct {
- FilterRules []FilterRule `xml:"FilterRule" json:"FilterRules"`
- }
-
- FilterRule struct {
- Name string `xml:"Name" json:"Name"`
- Value string `xml:"Value" json:"Value"`
- }
-
- // TopicConfiguration and LambdaFunctionConfiguration -- we don't support these configurations,
- // but we need them to detect in notification configurations in requests.
- TopicConfiguration struct{}
- LambdaFunctionConfiguration struct{}
-)
-
-func (n NotificationConfiguration) IsEmpty() bool {
- return len(n.QueueConfigurations) == 0 && len(n.TopicConfigurations) == 0 && len(n.LambdaFunctionConfigurations) == 0
-}
diff --git a/api/handler/acl.go b/api/handler/acl.go
index 4219c27..7933340 100644
--- a/api/handler/acl.go
+++ b/api/handler/acl.go
@@ -616,22 +616,11 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
return
}
- updated, err := h.updateBucketACL(r, astObject, bktInfo, token)
- if err != nil {
+ if _, err = h.updateBucketACL(r, astObject, bktInfo, token); err != nil {
h.logAndSendError(w, "could not update bucket acl", reqInfo, err)
return
}
- if updated {
- s := &SendNotificationParams{
- Event: EventObjectACLPut,
- NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
- }
+
w.WriteHeader(http.StatusOK)
}
diff --git a/api/handler/api.go b/api/handler/api.go
index 7734d80..2d9e01d 100644
--- a/api/handler/api.go
+++ b/api/handler/api.go
@@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@@ -20,17 +19,11 @@ import (
type (
handler struct {
- log *zap.Logger
- obj layer.Client
- notificator Notificator
- cfg Config
- ape APE
- frostfsid FrostFSID
- }
-
- Notificator interface {
- SendNotifications(topics map[string]string, p *SendNotificationParams) error
- SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error
+ log *zap.Logger
+ obj layer.Client
+ cfg Config
+ ape APE
+ frostfsid FrostFSID
}
// Config contains data which handler needs to keep.
@@ -41,7 +34,6 @@ type (
DefaultCopiesNumbers(namespace string) []uint32
NewXMLDecoder(io.Reader) *xml.Decoder
DefaultMaxAge() int
- NotificatorEnabled() bool
ResolveZoneList() []string
IsResolveListAllow() bool
BypassContentEncodingInChunks() bool
@@ -76,7 +68,7 @@ const (
var _ api.Handler = (*handler)(nil)
// New creates new api.Handler using given logger and client.
-func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
+func New(log *zap.Logger, obj layer.Client, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
switch {
case obj == nil:
return nil, errors.New("empty FrostFS Object Layer")
@@ -88,19 +80,12 @@ func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config,
return nil, errors.New("empty frostfsid")
}
- if !cfg.NotificatorEnabled() {
- log.Warn(logs.NotificatorIsDisabledS3WontProduceNotificationEvents)
- } else if notificator == nil {
- return nil, errors.New("empty notificator")
- }
-
return &handler{
- log: log,
- obj: obj,
- cfg: cfg,
- ape: storage,
- notificator: notificator,
- frostfsid: ffsid,
+ log: log,
+ obj: obj,
+ cfg: cfg,
+ ape: storage,
+ frostfsid: ffsid,
}, nil
}
diff --git a/api/handler/copy.go b/api/handler/copy.go
index 5b1fea8..fbd931c 100644
--- a/api/handler/copy.go
+++ b/api/handler/copy.go
@@ -268,7 +268,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
TagSet: tagSet,
NodeVersion: extendedDstObjInfo.NodeVersion,
}
- if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
+ if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
@@ -276,16 +276,6 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
h.reqLogger(ctx).Info(logs.ObjectIsCopied, zap.Stringer("object_id", dstObjInfo.ID))
- s := &SendNotificationParams{
- Event: EventObjectCreatedCopy,
- NotificationInfo: data.NotificationInfoFromObject(dstObjInfo, h.cfg.MD5Enabled()),
- BktInfo: dstBktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
if dstEncryptionParams.Enabled() {
addSSECHeaders(w.Header(), r.Header)
}
diff --git a/api/handler/delete.go b/api/handler/delete.go
index f67d021..99ec696 100644
--- a/api/handler/delete.go
+++ b/api/handler/delete.go
@@ -8,16 +8,12 @@ import (
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
- oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
- "go.uber.org/zap"
)
// limitation of AWS https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -101,41 +97,6 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
- var m *SendNotificationParams
-
- if bktSettings.VersioningEnabled() && len(versionID) == 0 {
- m = &SendNotificationParams{
- Event: EventObjectRemovedDeleteMarkerCreated,
- NotificationInfo: &data.NotificationInfo{
- Name: reqInfo.ObjectName,
- HashSum: deletedObject.DeleteMarkerEtag,
- },
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- } else {
- var objID oid.ID
- if len(versionID) != 0 {
- if err = objID.DecodeString(versionID); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
- }
-
- m = &SendNotificationParams{
- Event: EventObjectRemovedDelete,
- NotificationInfo: &data.NotificationInfo{
- Name: reqInfo.ObjectName,
- Version: objID.EncodeToString(),
- },
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- }
-
- if err = h.sendNotifications(ctx, m); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
if deletedObject.VersionID != "" {
w.Header().Set(api.AmzVersionID, deletedObject.VersionID)
}
diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go
index bd02faa..7a1b6c4 100644
--- a/api/handler/handlers_test.go
+++ b/api/handler/handlers_test.go
@@ -104,10 +104,6 @@ func (c *configMock) DefaultMaxAge() int {
return 0
}
-func (c *configMock) NotificatorEnabled() bool {
- return false
-}
-
func (c *configMock) ResolveZoneList() []string {
return []string{}
}
diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go
index 76bf787..169f776 100644
--- a/api/handler/multipart_upload.go
+++ b/api/handler/multipart_upload.go
@@ -13,7 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/google/uuid"
"go.uber.org/zap"
)
@@ -456,7 +455,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
// Start complete multipart upload which may take some time to fetch object
// and re-upload it part by part.
- objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo)
+ objInfo, err := h.completeMultipartUpload(r, c, bktInfo)
if err != nil {
h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...)
@@ -478,7 +477,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
}
}
-func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *middleware.ReqInfo) (*data.ObjectInfo, error) {
+func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
ctx := r.Context()
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
if err != nil {
@@ -496,7 +495,7 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
TagSet: uploadData.TagSet,
NodeVersion: extendedObjInfo.NodeVersion,
}
- if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
+ if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
}
}
@@ -528,16 +527,6 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
}
}
- s := &SendNotificationParams{
- Event: EventObjectCreatedCompleteMultipartUpload,
- NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
return objInfo, nil
}
diff --git a/api/handler/notifications.go b/api/handler/notifications.go
deleted file mode 100644
index 8ef4c7a..0000000
--- a/api/handler/notifications.go
+++ /dev/null
@@ -1,274 +0,0 @@
-package handler
-
-import (
- "context"
- "fmt"
- "net/http"
- "strings"
- "time"
-
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
- "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
- "github.com/google/uuid"
-)
-
-type (
- SendNotificationParams struct {
- Event string
- NotificationInfo *data.NotificationInfo
- BktInfo *data.BucketInfo
- ReqInfo *middleware.ReqInfo
- User string
- Time time.Time
- }
-)
-
-const (
- filterRuleSuffixName = "suffix"
- filterRulePrefixName = "prefix"
-
- EventObjectCreated = "s3:ObjectCreated:*"
- EventObjectCreatedPut = "s3:ObjectCreated:Put"
- EventObjectCreatedPost = "s3:ObjectCreated:Post"
- EventObjectCreatedCopy = "s3:ObjectCreated:Copy"
- EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject"
- EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload"
- EventObjectRemoved = "s3:ObjectRemoved:*"
- EventObjectRemovedDelete = "s3:ObjectRemoved:Delete"
- EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated"
- EventObjectRestore = "s3:ObjectRestore:*"
- EventObjectRestorePost = "s3:ObjectRestore:Post"
- EventObjectRestoreCompleted = "s3:ObjectRestore:Completed"
- EventReplication = "s3:Replication:*"
- EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication"
- EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked"
- EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold"
- EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold"
- EventObjectRestoreDelete = "s3:ObjectRestore:Delete"
- EventLifecycleTransition = "s3:LifecycleTransition"
- EventIntelligentTiering = "s3:IntelligentTiering"
- EventObjectACLPut = "s3:ObjectAcl:Put"
- EventLifecycleExpiration = "s3:LifecycleExpiration:*"
- EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete"
- EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated"
- EventObjectTagging = "s3:ObjectTagging:*"
- EventObjectTaggingPut = "s3:ObjectTagging:Put"
- EventObjectTaggingDelete = "s3:ObjectTagging:Delete"
-)
-
-var validEvents = map[string]struct{}{
- EventReducedRedundancyLostObject: {},
- EventObjectCreated: {},
- EventObjectCreatedPut: {},
- EventObjectCreatedPost: {},
- EventObjectCreatedCopy: {},
- EventObjectCreatedCompleteMultipartUpload: {},
- EventObjectRemoved: {},
- EventObjectRemovedDelete: {},
- EventObjectRemovedDeleteMarkerCreated: {},
- EventObjectRestore: {},
- EventObjectRestorePost: {},
- EventObjectRestoreCompleted: {},
- EventReplication: {},
- EventReplicationOperationFailedReplication: {},
- EventReplicationOperationNotTracked: {},
- EventReplicationOperationMissedThreshold: {},
- EventReplicationOperationReplicatedAfterThreshold: {},
- EventObjectRestoreDelete: {},
- EventLifecycleTransition: {},
- EventIntelligentTiering: {},
- EventObjectACLPut: {},
- EventLifecycleExpiration: {},
- EventLifecycleExpirationDelete: {},
- EventLifecycleExpirationDeleteMarkerCreated: {},
- EventObjectTagging: {},
- EventObjectTaggingPut: {},
- EventObjectTaggingDelete: {},
-}
-
-func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
- reqInfo := middleware.GetReqInfo(r.Context())
- bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
- if err != nil {
- h.logAndSendError(w, "could not get bucket info", reqInfo, err)
- return
- }
-
- conf := &data.NotificationConfiguration{}
- if err = h.cfg.NewXMLDecoder(r.Body).Decode(conf); err != nil {
- h.logAndSendError(w, "couldn't decode notification configuration", reqInfo, errors.GetAPIError(errors.ErrMalformedXML))
- return
- }
-
- if _, err = h.checkBucketConfiguration(r.Context(), conf, reqInfo); err != nil {
- h.logAndSendError(w, "couldn't check bucket configuration", reqInfo, err)
- return
- }
-
- p := &layer.PutBucketNotificationConfigurationParams{
- RequestInfo: reqInfo,
- BktInfo: bktInfo,
- Configuration: conf,
- }
-
- p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
- if err != nil {
- h.logAndSendError(w, "invalid copies number", reqInfo, err)
- return
- }
-
- if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil {
- h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err)
- return
- }
-}
-
-func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
- reqInfo := middleware.GetReqInfo(r.Context())
-
- bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
- if err != nil {
- h.logAndSendError(w, "could not get bucket info", reqInfo, err)
- return
- }
-
- conf, err := h.obj.GetBucketNotificationConfiguration(r.Context(), bktInfo)
- if err != nil {
- h.logAndSendError(w, "could not get bucket notification configuration", reqInfo, err)
- return
- }
-
- if err = middleware.EncodeToResponse(w, conf); err != nil {
- h.logAndSendError(w, "could not encode bucket notification configuration to response", reqInfo, err)
- return
- }
-}
-
-func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationParams) error {
- if !h.cfg.NotificatorEnabled() {
- return nil
- }
-
- conf, err := h.obj.GetBucketNotificationConfiguration(ctx, p.BktInfo)
- if err != nil {
- return fmt.Errorf("failed to get notification configuration: %w", err)
- }
- if conf.IsEmpty() {
- return nil
- }
-
- box, err := middleware.GetBoxData(ctx)
- if err == nil && box.Gate.BearerToken != nil {
- p.User = bearer.ResolveIssuer(*box.Gate.BearerToken).EncodeToString()
- }
-
- p.Time = layer.TimeNow(ctx)
-
- topics := filterSubjects(conf, p.Event, p.NotificationInfo.Name)
-
- return h.notificator.SendNotifications(topics, p)
-}
-
-// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids.
-func (h *handler) checkBucketConfiguration(ctx context.Context, conf *data.NotificationConfiguration, r *middleware.ReqInfo) (completed bool, err error) {
- if conf == nil {
- return
- }
-
- if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil {
- return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported)
- }
-
- for i, q := range conf.QueueConfigurations {
- if err = checkEvents(q.Events); err != nil {
- return
- }
-
- if err = checkRules(q.Filter.Key.FilterRules); err != nil {
- return
- }
-
- if h.cfg.NotificatorEnabled() {
- if err = h.notificator.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host, layer.TimeNow(ctx)); err != nil {
- return
- }
- } else {
- h.reqLogger(ctx).Warn(logs.FailedToSendTestEventBecauseNotificationsIsDisabled)
- }
-
- if q.ID == "" {
- completed = true
- conf.QueueConfigurations[i].ID = uuid.NewString()
- }
- }
-
- return
-}
-
-func checkRules(rules []data.FilterRule) error {
- names := make(map[string]struct{})
-
- for _, r := range rules {
- if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName {
- return errors.GetAPIError(errors.ErrFilterNameInvalid)
- }
- if _, ok := names[r.Name]; ok {
- if r.Name == filterRuleSuffixName {
- return errors.GetAPIError(errors.ErrFilterNameSuffix)
- }
- return errors.GetAPIError(errors.ErrFilterNamePrefix)
- }
-
- names[r.Name] = struct{}{}
- }
-
- return nil
-}
-
-func checkEvents(events []string) error {
- for _, e := range events {
- if _, ok := validEvents[e]; !ok {
- return errors.GetAPIError(errors.ErrEventNotification)
- }
- }
-
- return nil
-}
-
-func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string {
- topics := make(map[string]string)
-
- for _, t := range conf.QueueConfigurations {
- event := false
- for _, e := range t.Events {
- // the second condition is comparison with the events ending with *:
- // s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char
- if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) {
- event = true
- break
- }
- }
-
- if !event {
- continue
- }
-
- filter := true
- for _, f := range t.Filter.Key.FilterRules {
- if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) ||
- f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) {
- filter = false
- break
- }
- }
- if filter {
- topics[t.ID] = t.QueueArn
- }
- }
-
- return topics
-}
diff --git a/api/handler/notifications_test.go b/api/handler/notifications_test.go
deleted file mode 100644
index 82aa5bf..0000000
--- a/api/handler/notifications_test.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package handler
-
-import (
- "testing"
-
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
- "github.com/stretchr/testify/require"
-)
-
-func TestFilterSubjects(t *testing.T) {
- config := &data.NotificationConfiguration{
- QueueConfigurations: []data.QueueConfiguration{
- {
- ID: "test1",
- QueueArn: "test1",
- Events: []string{EventObjectCreated, EventObjectRemovedDelete},
- },
- {
- ID: "test2",
- QueueArn: "test2",
- Events: []string{EventObjectTagging},
- Filter: data.Filter{Key: data.Key{FilterRules: []data.FilterRule{
- {Name: "prefix", Value: "dir/"},
- {Name: "suffix", Value: ".png"},
- }}},
- },
- },
- }
-
- t.Run("no topics because suitable events not found", func(t *testing.T) {
- topics := filterSubjects(config, EventObjectACLPut, "dir/a.png")
- require.Empty(t, topics)
- })
-
- t.Run("no topics because of not suitable prefix", func(t *testing.T) {
- topics := filterSubjects(config, EventObjectTaggingPut, "dirw/cat.png")
- require.Empty(t, topics)
- })
-
- t.Run("no topics because of not suitable suffix", func(t *testing.T) {
- topics := filterSubjects(config, EventObjectTaggingPut, "a.jpg")
- require.Empty(t, topics)
- })
-
- t.Run("filter topics from queue configs without prefix suffix filter and exact event", func(t *testing.T) {
- topics := filterSubjects(config, EventObjectCreatedPut, "dir/a.png")
- require.Contains(t, topics, "test1")
- require.Len(t, topics, 1)
- require.Equal(t, topics["test1"], "test1")
- })
-
- t.Run("filter topics from queue configs with prefix suffix filter and '*' ending event", func(t *testing.T) {
- topics := filterSubjects(config, EventObjectTaggingPut, "dir/a.png")
- require.Contains(t, topics, "test2")
- require.Len(t, topics, 1)
- require.Equal(t, topics["test2"], "test2")
- })
-}
-
-func TestCheckRules(t *testing.T) {
- t.Run("correct rules with prefix and suffix", func(t *testing.T) {
- rules := []data.FilterRule{
- {Name: "prefix", Value: "asd"},
- {Name: "suffix", Value: "asd"},
- }
- err := checkRules(rules)
- require.NoError(t, err)
- })
-
- t.Run("correct rules with prefix", func(t *testing.T) {
- rules := []data.FilterRule{
- {Name: "prefix", Value: "asd"},
- }
- err := checkRules(rules)
- require.NoError(t, err)
- })
-
- t.Run("correct rules with suffix", func(t *testing.T) {
- rules := []data.FilterRule{
- {Name: "suffix", Value: "asd"},
- }
- err := checkRules(rules)
- require.NoError(t, err)
- })
-
- t.Run("incorrect rules with wrong name", func(t *testing.T) {
- rules := []data.FilterRule{
- {Name: "prefix", Value: "sdf"},
- {Name: "sfx", Value: "asd"},
- }
- err := checkRules(rules)
- require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameInvalid))
- })
-
- t.Run("incorrect rules with repeating suffix", func(t *testing.T) {
- rules := []data.FilterRule{
- {Name: "suffix", Value: "asd"},
- {Name: "suffix", Value: "asdf"},
- {Name: "prefix", Value: "jk"},
- }
- err := checkRules(rules)
- require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameSuffix))
- })
-
- t.Run("incorrect rules with repeating prefix", func(t *testing.T) {
- rules := []data.FilterRule{
- {Name: "suffix", Value: "ds"},
- {Name: "prefix", Value: "asd"},
- {Name: "prefix", Value: "asdf"},
- }
- err := checkRules(rules)
- require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNamePrefix))
- })
-}
diff --git a/api/handler/put.go b/api/handler/put.go
index 9e9a4a0..2c180f9 100644
--- a/api/handler/put.go
+++ b/api/handler/put.go
@@ -292,16 +292,6 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}
objInfo := extendedObjInfo.ObjectInfo
- s := &SendNotificationParams{
- Event: EventObjectCreatedPut,
- NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
if needUpdateEACLTable {
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil {
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
@@ -319,7 +309,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
TagSet: tagSet,
NodeVersion: extendedObjInfo.NodeVersion,
}
- if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
+ if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
@@ -560,16 +550,6 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
}
objInfo := extendedObjInfo.ObjectInfo
- s := &SendNotificationParams{
- Event: EventObjectCreatedPost,
- NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
if acl := auth.MultipartFormValue(r, "acl"); acl != "" {
r.Header.Set(api.AmzACL, acl)
r.Header.Set(api.AmzGrantFullControl, "")
@@ -592,7 +572,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
NodeVersion: extendedObjInfo.NodeVersion,
}
- if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
+ if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
diff --git a/api/handler/tagging.go b/api/handler/tagging.go
index 3900e75..4018d88 100644
--- a/api/handler/tagging.go
+++ b/api/handler/tagging.go
@@ -10,8 +10,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
- "go.uber.org/zap"
)
const (
@@ -46,27 +44,12 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
},
TagSet: tagSet,
}
- nodeVersion, err := h.obj.PutObjectTagging(ctx, tagPrm)
- if err != nil {
+
+ if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
return
}
- s := &SendNotificationParams{
- Event: EventObjectTaggingPut,
- NotificationInfo: &data.NotificationInfo{
- Name: nodeVersion.FilePath,
- Size: nodeVersion.Size,
- Version: nodeVersion.OID.EncodeToString(),
- HashSum: nodeVersion.ETag,
- },
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
w.WriteHeader(http.StatusOK)
}
@@ -123,27 +106,11 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
- nodeVersion, err := h.obj.DeleteObjectTagging(ctx, p)
- if err != nil {
+ if err = h.obj.DeleteObjectTagging(ctx, p); err != nil {
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
return
}
- s := &SendNotificationParams{
- Event: EventObjectTaggingDelete,
- NotificationInfo: &data.NotificationInfo{
- Name: nodeVersion.FilePath,
- Size: nodeVersion.Size,
- Version: nodeVersion.OID.EncodeToString(),
- HashSum: nodeVersion.ETag,
- },
- BktInfo: bktInfo,
- ReqInfo: reqInfo,
- }
- if err = h.sendNotifications(ctx, s); err != nil {
- h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
- }
-
w.WriteHeader(http.StatusNoContent)
}
diff --git a/api/handler/unimplemented.go b/api/handler/unimplemented.go
index fccdfc3..0c90789 100644
--- a/api/handler/unimplemented.go
+++ b/api/handler/unimplemented.go
@@ -58,3 +58,11 @@ func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Reque
func (h *handler) PutBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
}
+
+func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
+ h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
+}
+
+func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
+ h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
+}
diff --git a/api/layer/cache.go b/api/layer/cache.go
index 23cbd9a..0618dee 100644
--- a/api/layer/cache.go
+++ b/api/layer/cache.go
@@ -257,24 +257,3 @@ func (c *Cache) PutCORS(owner user.ID, bkt *data.BucketInfo, cors *data.CORSConf
func (c *Cache) DeleteCORS(bktInfo *data.BucketInfo) {
c.systemCache.Delete(bktInfo.Name + bktInfo.CORSObjectName())
}
-
-func (c *Cache) GetNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo) *data.NotificationConfiguration {
- key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName()
-
- if !c.accessCache.Get(owner, key) {
- return nil
- }
-
- return c.systemCache.GetNotificationConfiguration(key)
-}
-
-func (c *Cache) PutNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo, configuration *data.NotificationConfiguration) {
- key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName()
- if err := c.systemCache.PutNotificationConfiguration(key, configuration); err != nil {
- c.logger.Warn(logs.CouldntCacheNotificationConfiguration, zap.String("bucket", bktInfo.Name), zap.Error(err))
- }
-
- if err := c.accessCache.Put(owner, key); err != nil {
- c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
- }
-}
diff --git a/api/layer/layer.go b/api/layer/layer.go
index eae130f..e81c3e4 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -27,23 +27,11 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
- "github.com/nats-io/nats.go"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
type (
- EventListener interface {
- Subscribe(context.Context, string, MsgHandler) error
- Listen(context.Context)
- }
-
- MsgHandler interface {
- HandleMessage(context.Context, *nats.Msg) error
- }
-
- MsgHandlerFunc func(context.Context, *nats.Msg) error
-
BucketResolver interface {
Resolve(ctx context.Context, name string) (cid.ID, error)
}
@@ -61,7 +49,6 @@ type (
log *zap.Logger
anonKey AnonymousKey
resolver BucketResolver
- ncontroller EventListener
cache *Cache
treeService TreeService
features FeatureSettings
@@ -215,7 +202,6 @@ type (
// Client provides S3 API client interface.
Client interface {
- Initialize(ctx context.Context, c EventListener) error
EphemeralKey() *keys.PublicKey
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
@@ -245,8 +231,8 @@ type (
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error)
- PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (*data.NodeVersion, error)
- DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error)
+ PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) error
+ DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error
PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)
@@ -266,9 +252,6 @@ type (
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
- PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
- GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
-
// Compound methods for optimizations
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
@@ -300,10 +283,6 @@ func (t *VersionedObject) String() string {
return t.Name + ":" + t.VersionID
}
-func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error {
- return f(ctx, msg)
-}
-
func (p HeadObjectParams) Versioned() bool {
return len(p.VersionID) > 0
}
@@ -327,23 +306,6 @@ func (n *layer) EphemeralKey() *keys.PublicKey {
return n.anonKey.Key.PublicKey()
}
-func (n *layer) Initialize(ctx context.Context, c EventListener) error {
- if n.IsNotificationEnabled() {
- return fmt.Errorf("already initialized")
- }
-
- // todo add notification handlers (e.g. for lifecycles)
-
- c.Listen(ctx)
-
- n.ncontroller = c
- return nil
-}
-
-func (n *layer) IsNotificationEnabled() bool {
- return n.ncontroller != nil
-}
-
// IsAuthenticatedRequest checks if access box exists in the current request.
func IsAuthenticatedRequest(ctx context.Context) bool {
_, err := middleware.GetBoxData(ctx)
diff --git a/api/layer/notifications.go b/api/layer/notifications.go
deleted file mode 100644
index ecd7276..0000000
--- a/api/layer/notifications.go
+++ /dev/null
@@ -1,89 +0,0 @@
-package layer
-
-import (
- "bytes"
- "context"
- "encoding/xml"
- errorsStd "errors"
- "fmt"
-
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
- "go.uber.org/zap"
-)
-
-type PutBucketNotificationConfigurationParams struct {
- RequestInfo *middleware.ReqInfo
- BktInfo *data.BucketInfo
- Configuration *data.NotificationConfiguration
- CopiesNumbers []uint32
-}
-
-func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error {
- confXML, err := xml.Marshal(p.Configuration)
- if err != nil {
- return fmt.Errorf("marshal notify configuration: %w", err)
- }
-
- prm := PrmObjectCreate{
- Container: p.BktInfo.CID,
- Payload: bytes.NewReader(confXML),
- Filepath: p.BktInfo.NotificationConfigurationObjectName(),
- CreationTime: TimeNow(ctx),
- CopiesNumber: p.CopiesNumbers,
- }
-
- _, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
- if err != nil {
- return err
- }
-
- objIDToDelete, err := n.treeService.PutNotificationConfigurationNode(ctx, p.BktInfo, objID)
- objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
- if err != nil && !objIDToDeleteNotFound {
- return err
- }
-
- if !objIDToDeleteNotFound {
- if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
- n.reqLogger(ctx).Error(logs.CouldntDeleteNotificationConfigurationObject, zap.Error(err),
- zap.String("cid", p.BktInfo.CID.EncodeToString()),
- zap.String("oid", objIDToDelete.EncodeToString()))
- }
- }
-
- n.cache.PutNotificationConfiguration(n.BearerOwner(ctx), p.BktInfo, p.Configuration)
-
- return nil
-}
-
-func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) {
- owner := n.BearerOwner(ctx)
- if conf := n.cache.GetNotificationConfiguration(owner, bktInfo); conf != nil {
- return conf, nil
- }
-
- objID, err := n.treeService.GetNotificationConfigurationNode(ctx, bktInfo)
- objIDNotFound := errorsStd.Is(err, ErrNodeNotFound)
- if err != nil && !objIDNotFound {
- return nil, err
- }
-
- conf := &data.NotificationConfiguration{}
-
- if !objIDNotFound {
- obj, err := n.objectGet(ctx, bktInfo, objID)
- if err != nil {
- return nil, err
- }
-
- if err = xml.Unmarshal(obj.Payload(), &conf); err != nil {
- return nil, fmt.Errorf("unmarshal notify configuration: %w", err)
- }
- }
-
- n.cache.PutNotificationConfiguration(owner, bktInfo, conf)
-
- return conf, nil
-}
diff --git a/api/layer/tagging.go b/api/layer/tagging.go
index ba10653..7ef7b3d 100644
--- a/api/layer/tagging.go
+++ b/api/layer/tagging.go
@@ -50,12 +50,12 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingPa
return p.ObjectVersion.VersionID, tags, nil
}
-func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (nodeVersion *data.NodeVersion, err error) {
- nodeVersion = p.NodeVersion
+func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (err error) {
+ nodeVersion := p.NodeVersion
if nodeVersion == nil {
nodeVersion, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjectVersion)
if err != nil {
- return nil, err
+ return err
}
}
p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString()
@@ -63,35 +63,35 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingPa
err = n.treeService.PutObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion, p.TagSet)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
- return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
+ return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
}
- return nil, err
+ return err
}
n.cache.PutTagging(n.BearerOwner(ctx), objectTaggingCacheKey(p.ObjectVersion), p.TagSet)
- return nodeVersion, nil
+ return nil
}
-func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error) {
+func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error {
version, err := n.getNodeVersion(ctx, p)
if err != nil {
- return nil, err
+ return err
}
err = n.treeService.DeleteObjectTagging(ctx, p.BktInfo, version)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
- return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
+ return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
}
- return nil, err
+ return err
}
p.VersionID = version.OID.EncodeToString()
n.cache.DeleteTagging(objectTaggingCacheKey(p))
- return version, nil
+ return nil
}
func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go
index 3a02e4f..4497fd0 100644
--- a/api/layer/tree_mock.go
+++ b/api/layer/tree_mock.go
@@ -110,14 +110,6 @@ func (t *TreeServiceMock) GetSettingsNode(_ context.Context, bktInfo *data.Bucke
return settings, nil
}
-func (t *TreeServiceMock) GetNotificationConfigurationNode(context.Context, *data.BucketInfo) (oid.ID, error) {
- panic("implement me")
-}
-
-func (t *TreeServiceMock) PutNotificationConfigurationNode(context.Context, *data.BucketInfo, oid.ID) (oid.ID, error) {
- panic("implement me")
-}
-
func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok {
diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go
index 1ec59c7..001bad4 100644
--- a/api/layer/tree_service.go
+++ b/api/layer/tree_service.go
@@ -18,17 +18,6 @@ type TreeService interface {
// If tree node is not found returns ErrNodeNotFound error.
GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
- // GetNotificationConfigurationNode gets an object id that corresponds to object with bucket CORS.
- //
- // If tree node is not found returns ErrNodeNotFound error.
- GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
-
- // PutNotificationConfigurationNode puts a node to a system tree
- // and returns objectID of a previous notif config which must be deleted in FrostFS.
- //
- // If object id to remove is not found returns ErrNoNodeToRemove error.
- PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error)
-
// GetBucketCORS gets an object id that corresponds to object with bucket CORS.
//
// If object id is not found returns ErrNodeNotFound error.
diff --git a/api/notifications/controller.go b/api/notifications/controller.go
deleted file mode 100644
index 1ce6919..0000000
--- a/api/notifications/controller.go
+++ /dev/null
@@ -1,263 +0,0 @@
-package notifications
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "time"
-
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
- "github.com/nats-io/nats.go"
- "go.uber.org/zap"
-)
-
-const (
- DefaultTimeout = 30 * time.Second
-
- // EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications.
- EventVersion23 = "2.3"
- // EventVersion22 is used for replication notifications.
- EventVersion22 = "2.2"
- // EventVersion21 is used for all other notification types.
- EventVersion21 = "2.1"
-)
-
-type (
- Options struct {
- URL string
- TLSCertFilepath string
- TLSAuthPrivateKeyFilePath string
- Timeout time.Duration
- RootCAFiles []string
- }
-
- Controller struct {
- logger *zap.Logger
- taskQueueConnection *nats.Conn
- jsClient nats.JetStreamContext
- handlers map[string]Stream
- mu sync.RWMutex
- }
-
- Stream struct {
- h layer.MsgHandler
- ch chan *nats.Msg
- }
-
- TestEvent struct {
- Service string
- Event string
- Time time.Time
- Bucket string
- RequestID string
- HostID string
- }
-
- Event struct {
- Records []EventRecord `json:"Records"`
- }
-
- EventRecord struct {
- EventVersion string `json:"eventVersion"`
- EventSource string `json:"eventSource"` // frostfs:s3
- AWSRegion string `json:"awsRegion,omitempty"` // empty
- EventTime time.Time `json:"eventTime"`
- EventName string `json:"eventName"`
- UserIdentity UserIdentity `json:"userIdentity"`
- RequestParameters RequestParameters `json:"requestParameters"`
- ResponseElements map[string]string `json:"responseElements"`
- S3 S3Entity `json:"s3"`
- }
-
- UserIdentity struct {
- PrincipalID string `json:"principalId"`
- }
-
- RequestParameters struct {
- SourceIPAddress string `json:"sourceIPAddress"`
- }
-
- S3Entity struct {
- SchemaVersion string `json:"s3SchemaVersion"`
- ConfigurationID string `json:"configurationId,omitempty"`
- Bucket Bucket `json:"bucket"`
- Object Object `json:"object"`
- }
-
- Bucket struct {
- Name string `json:"name"`
- OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"`
- Arn string `json:"arn,omitempty"`
- }
-
- Object struct {
- Key string `json:"key"`
- Size uint64 `json:"size,omitempty"`
- VersionID string `json:"versionId,omitempty"`
- ETag string `json:"eTag,omitempty"`
- Sequencer string `json:"sequencer,omitempty"`
- }
-)
-
-func NewController(p *Options, l *zap.Logger) (*Controller, error) {
- ncopts := []nats.Option{
- nats.Timeout(p.Timeout),
- }
-
- if len(p.TLSCertFilepath) != 0 && len(p.TLSAuthPrivateKeyFilePath) != 0 {
- ncopts = append(ncopts, nats.ClientCert(p.TLSCertFilepath, p.TLSAuthPrivateKeyFilePath))
- }
- if len(p.RootCAFiles) != 0 {
- ncopts = append(ncopts, nats.RootCAs(p.RootCAFiles...))
- }
-
- nc, err := nats.Connect(p.URL, ncopts...)
- if err != nil {
- return nil, fmt.Errorf("connect to nats: %w", err)
- }
-
- js, err := nc.JetStream()
- if err != nil {
- return nil, fmt.Errorf("get jet stream: %w", err)
- }
-
- return &Controller{
- logger: l,
- taskQueueConnection: nc,
- jsClient: js,
- handlers: make(map[string]Stream),
- }, nil
-}
-
-func (c *Controller) Subscribe(_ context.Context, topic string, handler layer.MsgHandler) error {
- ch := make(chan *nats.Msg, 1)
-
- c.mu.RLock()
- _, ok := c.handlers[topic]
- c.mu.RUnlock()
- if ok {
- return fmt.Errorf("already subscribed to topic '%s'", topic)
- }
-
- if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil {
- return fmt.Errorf("add stream: %w", err)
- }
-
- if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil {
- return fmt.Errorf("could not subscribe: %w", err)
- }
-
- c.mu.Lock()
- c.handlers[topic] = Stream{
- h: handler,
- ch: ch,
- }
- c.mu.Unlock()
-
- return nil
-}
-
-func (c *Controller) Listen(ctx context.Context) {
- c.mu.RLock()
- defer c.mu.RUnlock()
-
- for _, stream := range c.handlers {
- go func(stream Stream) {
- for {
- select {
- case msg := <-stream.ch:
- if err := stream.h.HandleMessage(ctx, msg); err != nil {
- c.logger.Error(logs.CouldNotHandleMessage, zap.Error(err))
- } else if err = msg.Ack(); err != nil {
- c.logger.Error(logs.CouldNotACKMessage, zap.Error(err))
- }
- case <-ctx.Done():
- return
- }
- }
- }(stream)
- }
-}
-
-func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error {
- event := prepareEvent(p)
-
- for id, topic := range topics {
- event.Records[0].S3.ConfigurationID = id
- msg, err := json.Marshal(event)
- if err != nil {
- c.logger.Error(logs.CouldntMarshalAnEvent, zap.String("subject", topic), zap.Error(err))
- }
- if err = c.publish(topic, msg); err != nil {
- c.logger.Error(logs.CouldntSendAnEventToTopic, zap.String("subject", topic), zap.Error(err))
- }
- }
-
- return nil
-}
-
-func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error {
- event := &TestEvent{
- Service: "FrostFS S3",
- Event: "s3:TestEvent",
- Time: now,
- Bucket: bucketName,
- RequestID: requestID,
- HostID: HostID,
- }
-
- msg, err := json.Marshal(event)
- if err != nil {
- return fmt.Errorf("couldn't marshal test event: %w", err)
- }
-
- return c.publish(topic, msg)
-}
-
-func prepareEvent(p *handler.SendNotificationParams) *Event {
- return &Event{
- Records: []EventRecord{
- {
- EventVersion: EventVersion21,
- EventSource: "frostfs:s3",
- AWSRegion: "",
- EventTime: p.Time,
- EventName: p.Event,
- UserIdentity: UserIdentity{
- PrincipalID: p.User,
- },
- RequestParameters: RequestParameters{
- SourceIPAddress: p.ReqInfo.RemoteHost,
- },
- ResponseElements: nil,
- S3: S3Entity{
- SchemaVersion: "1.0",
- // ConfigurationID is skipped and will be placed later
- Bucket: Bucket{
- Name: p.BktInfo.Name,
- OwnerIdentity: UserIdentity{PrincipalID: p.BktInfo.Owner.String()},
- Arn: p.BktInfo.Name,
- },
- Object: Object{
- Key: p.NotificationInfo.Name,
- Size: p.NotificationInfo.Size,
- VersionID: p.NotificationInfo.Version,
- ETag: p.NotificationInfo.HashSum,
- Sequencer: "",
- },
- },
- },
- },
- }
-}
-
-func (c *Controller) publish(topic string, msg []byte) error {
- if _, err := c.jsClient.Publish(topic, msg); err != nil {
- return fmt.Errorf("couldn't send event: %w", err)
- }
-
- return nil
-}
diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go
index 3f72695..b022b50 100644
--- a/cmd/s3-gw/app.go
+++ b/cmd/s3-gw/app.go
@@ -24,7 +24,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
@@ -62,7 +61,6 @@ type (
pool *pool.Pool
treePool *treepool.Pool
key *keys.PrivateKey
- nc *notifications.Controller
obj layer.Client
api api.Handler
@@ -88,7 +86,6 @@ type (
maxClient maxClientsConfig
defaultMaxAge int
reconnectInterval time.Duration
- notificatorEnabled bool
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
frostfsidValidation bool
@@ -157,13 +154,13 @@ func (a *App) init(ctx context.Context) {
a.setRuntimeParameters()
a.initFrostfsID(ctx)
a.initPolicyStorage(ctx)
- a.initAPI(ctx)
+ a.initAPI()
a.initMetrics()
a.initServers(ctx)
a.initTracing(ctx)
}
-func (a *App) initLayer(ctx context.Context) {
+func (a *App) initLayer() {
a.initResolver()
// prepare random key for anonymous requests
@@ -188,18 +185,6 @@ func (a *App) initLayer(ctx context.Context) {
// prepare object layer
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
-
- if a.cfg.GetBool(cfgEnableNATS) {
- nopts := getNotificationsOptions(a.cfg, a.log)
- a.nc, err = notifications.NewController(nopts, a.log)
- if err != nil {
- a.log.Fatal(logs.FailedToEnableNotifications, zap.Error(err))
- }
-
- if err = a.obj.Initialize(ctx, a.nc); err != nil {
- a.log.Fatal(logs.CouldntInitializeLayer, zap.Error(err))
- }
- }
}
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
@@ -208,7 +193,6 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v),
- notificatorEnabled: v.GetBool(cfgEnableNATS),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
}
@@ -343,10 +327,6 @@ func (s *appSettings) DefaultMaxAge() int {
return s.defaultMaxAge
}
-func (s *appSettings) NotificatorEnabled() bool {
- return s.notificatorEnabled
-}
-
func (s *appSettings) ResolveZoneList() []string {
return s.resolveZoneList
}
@@ -468,8 +448,8 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
return s.retryStrategy
}
-func (a *App) initAPI(ctx context.Context) {
- a.initLayer(ctx)
+func (a *App) initAPI() {
+ a.initLayer()
a.initHandler()
}
@@ -908,17 +888,6 @@ func (a *App) stopServices() {
}
}
-func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
- cfg := notifications.Options{}
- cfg.URL = v.GetString(cfgNATSEndpoint)
- cfg.Timeout = fetchNATSTimeout(v, l)
- cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
- cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
- cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
-
- return &cfg
-}
-
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg := layer.DefaultCachesConfigs(l)
@@ -976,7 +945,7 @@ func getFrostfsIDCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
func (a *App) initHandler() {
var err error
- a.api, err = handler.New(a.log, a.obj, a.nc, a.settings, a.policyStorage, a.frostfsid)
+ a.api, err = handler.New(a.log, a.obj, a.settings, a.policyStorage, a.frostfsid)
if err != nil {
a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err))
}
diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go
index dc2a336..ca20cc7 100644
--- a/cmd/s3-gw/app_settings.go
+++ b/cmd/s3-gw/app_settings.go
@@ -14,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
- "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
@@ -120,14 +119,6 @@ const ( // Settings.
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
- // NATS.
- cfgEnableNATS = "nats.enabled"
- cfgNATSEndpoint = "nats.endpoint"
- cfgNATSTimeout = "nats.timeout"
- cfgNATSTLSCertFile = "nats.cert_file"
- cfgNATSAuthPrivateKeyFile = "nats.key_file"
- cfgNATSRootCAFiles = "nats.root_ca"
-
// Policy.
cfgPolicyDefault = "placement_policy.default"
cfgPolicyRegionMapFile = "placement_policy.region_mapping"
@@ -376,19 +367,6 @@ func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy
return policy
}
-func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
- timeout := cfg.GetDuration(cfgNATSTimeout)
- if timeout <= 0 {
- l.Error(logs.InvalidLifetimeUsingDefaultValue,
- zap.String("parameter", cfgNATSTimeout),
- zap.Duration("value in config", timeout),
- zap.Duration("default", notifications.DefaultTimeout))
- timeout = notifications.DefaultTimeout
- }
-
- return timeout
-}
-
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
if v.IsSet(cfgEntry) {
lifetime := v.GetDuration(cfgEntry)
diff --git a/config/config.env b/config/config.env
index 82c0a09..a16ecff 100644
--- a/config/config.env
+++ b/config/config.env
@@ -88,7 +88,7 @@ S3_GW_CACHE_BUCKETS_SIZE=1000
# Cache which contains mapping of nice name to object addresses
S3_GW_CACHE_NAMES_LIFETIME=1m
S3_GW_CACHE_NAMES_SIZE=10000
- # Cache for system objects in a bucket: bucket settings, notification configuration etc
+# Cache for system objects in a bucket: bucket settings etc
S3_GW_CACHE_SYSTEM_LIFETIME=5m
S3_GW_CACHE_SYSTEM_SIZE=100000
# Cache which stores access box with tokens by its address
@@ -105,14 +105,6 @@ S3_GW_CACHE_MORPH_POLICY_SIZE=10000
S3_GW_CACHE_FROSTFSID_LIFETIME=1m
S3_GW_CACHE_FROSTFSID_SIZE=10000
-# NATS
-S3_GW_NATS_ENABLED=true
-S3_GW_NATS_ENDPOINT=nats://nats.frostfs.devenv:4222
-S3_GW_NATS_TIMEOUT=30s
-S3_GW_NATS_CERT_FILE=/path/to/cert
-S3_GW_NATS_KEY_FILE=/path/to/key
-S3_GW_NATS_ROOT_CA=/path/to/ca
-
# Default policy of placing containers in FrostFS
# If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway
# will put the container with default policy. It can be specified via environment variable, e.g.:
diff --git a/config/config.yaml b/config/config.yaml
index a492f7a..d9cc827 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -105,7 +105,7 @@ cache:
buckets:
lifetime: 1m
size: 500
- # Cache for system objects in a bucket: bucket settings, notification configuration etc
+ # Cache for system objects in a bucket: bucket settings etc
system:
lifetime: 2m
size: 1000
@@ -127,14 +127,6 @@ cache:
lifetime: 1m
size: 10000
-nats:
- enabled: true
- endpoint: nats://localhost:4222
- timeout: 30s
- cert_file: /path/to/cert
- key_file: /path/to/key
- root_ca: /path/to/ca
-
# Parameters of FrostFS container placement policy
placement_policy:
# Default policy of placing containers in FrostFS
diff --git a/docs/configuration.md b/docs/configuration.md
index 8633908..d25837a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -177,7 +177,6 @@ There are some custom types used for brevity:
| `server` | [Server configuration](#server-section) |
| `logger` | [Logger configuration](#logger-section) |
| `cache` | [Cache configuration](#cache-section) |
-| `nats` | [NATS configuration](#nats-section) |
| `cors` | [CORS configuration](#cors-section) |
| `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) |
@@ -411,18 +410,18 @@ cache:
size: 10000
```
-| Parameter | Type | Default value | Description |
-|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
-| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). |
-| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. |
-| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. |
-| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. |
-| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
-| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. |
-| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. |
-| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. |
-| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. |
-| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. |
+| Parameter | Type | Default value | Description |
+|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
+| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). |
+| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. |
+| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. |
+| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. |
+| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
+| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
+| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. |
+| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. |
+| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. |
+| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. |
#### `cache` subsection
@@ -450,36 +449,6 @@ size: 100
| `lifetime` | `duration` | '10m' | Lifetime of entries in cache. |
| `size` | `int` | '100 | LRU cache size. |
-### `nats` section
-
-This is an advanced section, use with caution.
-You can turn on notifications about successful completions of basic operations, and the gateway will send notifications
-via NATS JetStream.
-
-1. to configure the NATS server with JetStream
-2. to specify NATS parameters for the S3 GW. It's ***necessary*** to define a values of `nats.enable` or
- `S3_GW_NATS_ENABLED` as `True`
-3. to configure notifications in a bucket
-
-```yaml
-nats:
- enabled: true
- endpoint: nats://localhost:4222
- timeout: 30s
- cert_file: /path/to/cert
- key_file: /path/to/key
- root_ca: /path/to/ca
-```
-
-| Parameter | Type | Default value | Description |
-|---------------|------------|---------------|------------------------------------------------------|
-| `enabled` | `bool` | `false` | Flag to enable the service. |
-| `endpoint` | `string` | | NATS endpoint to connect to. |
-| `timeout` | `duration` | `30s` | Timeout for the object notification operation. |
-| `certificate` | `string` | | Path to the client certificate. |
-| `key` | `string` | | Path to the client key. |
-| `ca` | `string` | | Override root CA used to verify server certificates. |
-
### `cors` section
```yaml
diff --git a/docs/tree_service.md b/docs/tree_service.md
index d9b119c..81553b5 100644
--- a/docs/tree_service.md
+++ b/docs/tree_service.md
@@ -13,6 +13,5 @@ Each node keeps one of the types of data as a set of **key-value pairs**:
Some data takes up a lot of memory, so we store it in FrostFS nodes as an object with payload.
But we keep these objects' metadata in the Tree service too:
-* Notification configuration
* CORS
* Metadata of parts of active multipart uploads
diff --git a/go.mod b/go.mod
index a3e74b6..21f0633 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,6 @@ require (
github.com/go-chi/chi/v5 v5.0.8
github.com/google/uuid v1.3.1
github.com/minio/sio v0.3.0
- github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
github.com/nspcc-dev/neo-go v0.105.0
github.com/panjf2000/ants/v2 v2.5.0
github.com/prometheus/client_golang v1.15.1
@@ -68,9 +67,6 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
- github.com/nats-io/nats-server/v2 v2.7.1 // indirect
- github.com/nats-io/nkeys v0.3.0 // indirect
- github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
diff --git a/go.sum b/go.sum
index 674bf90..e2bd872 100644
--- a/go.sum
+++ b/go.sum
@@ -223,7 +223,6 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -236,7 +235,6 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
-github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/sio v0.3.0 h1:syEFBewzOMOYVzSTFpp1MqpSZk8rUNbz8VIIc+PNzus=
github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@@ -244,15 +242,6 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
-github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
-github.com/nats-io/nats-server/v2 v2.7.1 h1:SDj8R0PJPVekw3EgHxGtTfJUuMbsuaul1nwWFI3xTyk=
-github.com/nats-io/nats-server/v2 v2.7.1/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
-github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
-github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
-github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
-github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
-github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
-github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c h1:OOQeE613BH93ICPq3eke5N78gWNeMjcBWkmD2NKyXVg=
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
github.com/nspcc-dev/neo-go v0.105.0 h1:vtNZYFEFySK8zRDhLzQYha849VzWrcKezlnq/oNQg/w=
@@ -375,7 +364,6 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
@@ -541,7 +529,6 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index da9a0d4..e3502ee 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -54,16 +54,12 @@ const (
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../../api/cache/*
InvalidCacheKeyType = "invalid cache key type" // Warn in ../../api/cache/objectslist.go
ObjectIsCopied = "object is copied" // Info in ../../api/handler/copy.go
- CouldntSendNotification = "couldn't send notification: %w" // Error in ../../api/handler/*
- FailedToSendTestEventBecauseNotificationsIsDisabled = "failed to send test event because notifications is disabled" // Warn in ../../api/handler/notifications.go
RequestFailed = "request failed" // Error in ../../api/handler/util.go
GetBucketInfo = "get bucket info" // Warn in ../../api/handler/cors.go
GetBucketCors = "get bucket cors" // Warn in ../../api/handler/cors.go
SomeACLNotFullyMapped = "some acl not fully mapped" // Warn in ../../api/handler/acl.go
CouldntDeleteObject = "couldn't delete object" // Error in ../../api/layer/layer.go
- NotificatorIsDisabledS3WontProduceNotificationEvents = "notificator is disabled, s3 won't produce notification events" // Warn in ../../api/handler/api.go
BucketIsCreated = "bucket is created" // Info in ../../api/handler/put.go
- CouldntDeleteNotificationConfigurationObject = "couldn't delete notification configuration object" // Error in ../../api/layer/notifications.go
CouldNotParseContainerObjectLockEnabledAttribute = "could not parse container object lock enabled attribute" // Error in ../../api/layer/container.go
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
@@ -93,7 +89,6 @@ const (
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
CouldntCacheCors = "couldn't cache cors" // Warn in ../../api/layer/cache.go
- CouldntCacheNotificationConfiguration = "couldn't cache notification configuration" // Warn in ../../api/layer/cache.go
CouldntCacheListPolicyChains = "couldn't cache list policy chains" // Warn in ../../api/layer/cache.go
RequestEnd = "request end" // Info in ../../api/middleware/response.go
CouldntReceiveAccessBoxForGateKeyRandomKeyWillBeUsed = "couldn't receive access box for gate key, random key will be used" // Debug in ../../api/middleware/auth.go
@@ -101,15 +96,9 @@ const (
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
- CouldNotHandleMessage = "could not handle message" // Error in ../../api/notifications/controller.go
- CouldNotACKMessage = "could not ACK message" // Error in ../../api/notifications/controller.go
- CouldntMarshalAnEvent = "couldn't marshal an event" // Error in ../../api/notifications/controller.go
- CouldntSendAnEventToTopic = "couldn't send an event to topic" // Error in ../../api/notifications/controller.go
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go
CouldntGenerateRandomKey = "couldn't generate random key" // Fatal in ../../cmd/s3-gw/app.go
- FailedToEnableNotifications = "failed to enable notifications" // Fatal in ../../cmd/s3-gw/app.go
- CouldntInitializeLayer = "couldn't initialize layer" // Fatal in ../../cmd/s3-gw/app.go
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../cmd/s3-gw/app.go
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../cmd/s3-gw/app.go
FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../cmd/s3-gw/app.go
diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go
index 003da90..963cccd 100644
--- a/pkg/service/tree/tree.go
+++ b/pkg/service/tree/tree.go
@@ -108,7 +108,6 @@ const (
createdKV = "Created"
settingsFileName = "bucket-settings"
- notifConfFileName = "bucket-notifications"
corsFilename = "bucket-cors"
bucketTaggingFilename = "bucket-tagging"
@@ -116,7 +115,7 @@ const (
versionTree = "version"
// systemTree -- ID of a tree with system objects
- // i.e. bucket settings with versioning and lock configuration, cors, notifications.
+ // i.e. bucket settings with versioning and lock configuration, cors.
systemTree = "system"
separator = "/"
@@ -400,36 +399,6 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
}
-func (c *Tree) GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
- node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
- if err != nil {
- return oid.ID{}, err
- }
-
- return node.ObjID, nil
-}
-
-func (c *Tree) PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
- node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
- isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
- if err != nil && !isErrNotFound {
- return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
- }
-
- meta := make(map[string]string)
- meta[FileNameKey] = notifConfFileName
- meta[oidKV] = objID.EncodeToString()
-
- if isErrNotFound {
- if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
- return oid.ID{}, err
- }
- return oid.ID{}, layer.ErrNoNodeToRemove
- }
-
- return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
-}
-
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV})
if err != nil {