[#401] Drop notifications #409

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:feature/401-drop_notifications into master 2024-06-26 11:28:17 +00:00
33 changed files with 66 additions and 1282 deletions

View file

@ -24,6 +24,7 @@ This document outlines major changes between releases.
### Removed ### Removed
- Remove control api (#406) - Remove control api (#406)
- Remove notifications (#401)
## [0.29.0] - Zemu - 2024-05-27 ## [0.29.0] - Zemu - 2024-05-27

View file

@ -182,24 +182,6 @@ func TestSettingsCacheType(t *testing.T) {
assertInvalidCacheEntry(t, cache.GetSettings(key), observedLog) assertInvalidCacheEntry(t, cache.GetSettings(key), observedLog)
} }
func TestNotificationConfigurationCacheType(t *testing.T) {
logger, observedLog := getObservedLogger()
cache := NewSystemCache(DefaultSystemConfig(logger))
key := "key"
notificationConfig := &data.NotificationConfiguration{}
err := cache.PutNotificationConfiguration(key, notificationConfig)
require.NoError(t, err)
val := cache.GetNotificationConfiguration(key)
require.Equal(t, notificationConfig, val)
require.Equal(t, 0, observedLog.Len())
err = cache.cache.Set(key, "tmp")
require.NoError(t, err)
assertInvalidCacheEntry(t, cache.GetNotificationConfiguration(key), observedLog)
}
func TestFrostFSIDSubjectCacheType(t *testing.T) { func TestFrostFSIDSubjectCacheType(t *testing.T) {
logger, observedLog := getObservedLogger() logger, observedLog := getObservedLogger()
cache := NewFrostfsIDCache(DefaultFrostfsIDConfig(logger)) cache := NewFrostfsIDCache(DefaultFrostfsIDConfig(logger))

20
api/cache/system.go vendored
View file

@ -104,22 +104,6 @@ func (o *SystemCache) GetSettings(key string) *data.BucketSettings {
return result return result
} }
func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*data.NotificationConfiguration)
if !ok {
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
return nil
}
return result
}
// GetTagging returns tags of a bucket or an object. // GetTagging returns tags of a bucket or an object.
func (o *SystemCache) GetTagging(key string) map[string]string { func (o *SystemCache) GetTagging(key string) map[string]string {
entry, err := o.cache.Get(key) entry, err := o.cache.Get(key)
@ -153,10 +137,6 @@ func (o *SystemCache) PutSettings(key string, settings *data.BucketSettings) err
return o.cache.Set(key, settings) return o.cache.Set(key, settings)
} }
func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error {
return o.cache.Set(key, obj)
}
// PutTagging puts tags of a bucket or an object. // PutTagging puts tags of a bucket or an object.
func (o *SystemCache) PutTagging(key string, tagSet map[string]string) error { func (o *SystemCache) PutTagging(key string, tagSet map[string]string) error {
return o.cache.Set(key, tagSet) return o.cache.Set(key, tagSet)

View file

@ -12,9 +12,8 @@ import (
) )
const ( const (
bktSettingsObject = ".s3-settings" bktSettingsObject = ".s3-settings"
bktCORSConfigurationObject = ".s3-cors" bktCORSConfigurationObject = ".s3-cors"
bktNotificationConfigurationObject = ".s3-notifications"
VersioningUnversioned = "Unversioned" VersioningUnversioned = "Unversioned"
VersioningEnabled = "Enabled" VersioningEnabled = "Enabled"
@ -52,14 +51,6 @@ type (
Headers map[string]string Headers map[string]string
} }
// NotificationInfo store info to send s3 notification.
NotificationInfo struct {
Name string
Version string
Size uint64
HashSum string
}
// BucketSettings stores settings such as versioning. // BucketSettings stores settings such as versioning.
BucketSettings struct { BucketSettings struct {
Versioning string Versioning string
@ -93,26 +84,12 @@ type (
} }
) )
// NotificationInfoFromObject creates new NotificationInfo from ObjectInfo.
func NotificationInfoFromObject(objInfo *ObjectInfo, md5Enabled bool) *NotificationInfo {
return &NotificationInfo{
Name: objInfo.Name,
Version: objInfo.VersionID(),
Size: objInfo.Size,
HashSum: Quote(objInfo.ETag(md5Enabled)),
}
}
// SettingsObjectName is a system name for a bucket settings file. // SettingsObjectName is a system name for a bucket settings file.
func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject } func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject }
// CORSObjectName returns a system name for a bucket CORS configuration file. // CORSObjectName returns a system name for a bucket CORS configuration file.
func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject } func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject }
func (b *BucketInfo) NotificationConfigurationObjectName() string {
return bktNotificationConfigurationObject
}
// VersionID returns object version from ObjectInfo. // VersionID returns object version from ObjectInfo.
func (o *ObjectInfo) VersionID() string { return o.ID.EncodeToString() } func (o *ObjectInfo) VersionID() string { return o.ID.EncodeToString() }

View file

@ -1,42 +0,0 @@
package data
import "encoding/xml"
type (
NotificationConfiguration struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguration" json:"-"`
QueueConfigurations []QueueConfiguration `xml:"QueueConfiguration" json:"QueueConfigurations"`
// Not supported topics
TopicConfigurations []TopicConfiguration `xml:"TopicConfiguration" json:"TopicConfigurations"`
LambdaFunctionConfigurations []LambdaFunctionConfiguration `xml:"CloudFunctionConfiguration" json:"CloudFunctionConfigurations"`
}
QueueConfiguration struct {
ID string `xml:"Id" json:"Id"`
QueueArn string `xml:"Queue" json:"Queue"`
Events []string `xml:"Event" json:"Events"`
Filter Filter `xml:"Filter" json:"Filter"`
}
Filter struct {
Key Key `xml:"S3Key" json:"S3Key"`
}
Key struct {
FilterRules []FilterRule `xml:"FilterRule" json:"FilterRules"`
}
FilterRule struct {
Name string `xml:"Name" json:"Name"`
Value string `xml:"Value" json:"Value"`
}
// TopicConfiguration and LambdaFunctionConfiguration -- we don't support these configurations,
// but we need them to detect in notification configurations in requests.
TopicConfiguration struct{}
LambdaFunctionConfiguration struct{}
)
func (n NotificationConfiguration) IsEmpty() bool {
return len(n.QueueConfigurations) == 0 && len(n.TopicConfigurations) == 0 && len(n.LambdaFunctionConfigurations) == 0
}

View file

@ -616,22 +616,11 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
updated, err := h.updateBucketACL(r, astObject, bktInfo, token) if _, err = h.updateBucketACL(r, astObject, bktInfo, token); err != nil {
if err != nil {
h.logAndSendError(w, "could not update bucket acl", reqInfo, err) h.logAndSendError(w, "could not update bucket acl", reqInfo, err)
return return
} }
if updated {
s := &SendNotificationParams{
Event: EventObjectACLPut,
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
}
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }

View file

@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@ -20,17 +19,11 @@ import (
type ( type (
handler struct { handler struct {
log *zap.Logger log *zap.Logger
obj layer.Client obj layer.Client
notificator Notificator cfg Config
cfg Config ape APE
ape APE frostfsid FrostFSID
frostfsid FrostFSID
}
Notificator interface {
SendNotifications(topics map[string]string, p *SendNotificationParams) error
SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error
} }
// Config contains data which handler needs to keep. // Config contains data which handler needs to keep.
@ -41,7 +34,6 @@ type (
DefaultCopiesNumbers(namespace string) []uint32 DefaultCopiesNumbers(namespace string) []uint32
NewXMLDecoder(io.Reader) *xml.Decoder NewXMLDecoder(io.Reader) *xml.Decoder
DefaultMaxAge() int DefaultMaxAge() int
NotificatorEnabled() bool
ResolveZoneList() []string ResolveZoneList() []string
IsResolveListAllow() bool IsResolveListAllow() bool
BypassContentEncodingInChunks() bool BypassContentEncodingInChunks() bool
@ -76,7 +68,7 @@ const (
var _ api.Handler = (*handler)(nil) var _ api.Handler = (*handler)(nil)
// New creates new api.Handler using given logger and client. // New creates new api.Handler using given logger and client.
func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) { func New(log *zap.Logger, obj layer.Client, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
switch { switch {
case obj == nil: case obj == nil:
return nil, errors.New("empty FrostFS Object Layer") return nil, errors.New("empty FrostFS Object Layer")
@ -88,19 +80,12 @@ func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config,
return nil, errors.New("empty frostfsid") return nil, errors.New("empty frostfsid")
} }
if !cfg.NotificatorEnabled() {
log.Warn(logs.NotificatorIsDisabledS3WontProduceNotificationEvents)
} else if notificator == nil {
return nil, errors.New("empty notificator")
}
return &handler{ return &handler{
log: log, log: log,
obj: obj, obj: obj,
cfg: cfg, cfg: cfg,
ape: storage, ape: storage,
notificator: notificator, frostfsid: ffsid,
frostfsid: ffsid,
}, nil }, nil
} }

View file

@ -268,7 +268,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
TagSet: tagSet, TagSet: tagSet,
NodeVersion: extendedDstObjInfo.NodeVersion, NodeVersion: extendedDstObjInfo.NodeVersion,
} }
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err) h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return return
} }
@ -276,16 +276,6 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
h.reqLogger(ctx).Info(logs.ObjectIsCopied, zap.Stringer("object_id", dstObjInfo.ID)) h.reqLogger(ctx).Info(logs.ObjectIsCopied, zap.Stringer("object_id", dstObjInfo.ID))
s := &SendNotificationParams{
Event: EventObjectCreatedCopy,
NotificationInfo: data.NotificationInfoFromObject(dstObjInfo, h.cfg.MD5Enabled()),
BktInfo: dstBktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
if dstEncryptionParams.Enabled() { if dstEncryptionParams.Enabled() {
addSSECHeaders(w.Header(), r.Header) addSSECHeaders(w.Header(), r.Header)
} }

View file

@ -8,16 +8,12 @@ import (
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"go.uber.org/zap"
) )
// limitation of AWS https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html // limitation of AWS https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@ -101,41 +97,6 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
var m *SendNotificationParams
if bktSettings.VersioningEnabled() && len(versionID) == 0 {
m = &SendNotificationParams{
Event: EventObjectRemovedDeleteMarkerCreated,
NotificationInfo: &data.NotificationInfo{
Name: reqInfo.ObjectName,
HashSum: deletedObject.DeleteMarkerEtag,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
} else {
var objID oid.ID
if len(versionID) != 0 {
if err = objID.DecodeString(versionID); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
}
m = &SendNotificationParams{
Event: EventObjectRemovedDelete,
NotificationInfo: &data.NotificationInfo{
Name: reqInfo.ObjectName,
Version: objID.EncodeToString(),
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
}
if err = h.sendNotifications(ctx, m); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
if deletedObject.VersionID != "" { if deletedObject.VersionID != "" {
w.Header().Set(api.AmzVersionID, deletedObject.VersionID) w.Header().Set(api.AmzVersionID, deletedObject.VersionID)
} }

View file

@ -104,10 +104,6 @@ func (c *configMock) DefaultMaxAge() int {
return 0 return 0
} }
func (c *configMock) NotificatorEnabled() bool {
return false
}
func (c *configMock) ResolveZoneList() []string { func (c *configMock) ResolveZoneList() []string {
return []string{} return []string{}
} }

View file

@ -13,7 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/google/uuid" "github.com/google/uuid"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -456,7 +455,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
// Start complete multipart upload which may take some time to fetch object // Start complete multipart upload which may take some time to fetch object
// and re-upload it part by part. // and re-upload it part by part.
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo) objInfo, err := h.completeMultipartUpload(r, c, bktInfo)
if err != nil { if err != nil {
h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...) h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...)
@ -478,7 +477,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
} }
} }
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *middleware.ReqInfo) (*data.ObjectInfo, error) { func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
ctx := r.Context() ctx := r.Context()
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c) uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
if err != nil { if err != nil {
@ -496,7 +495,7 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
TagSet: uploadData.TagSet, TagSet: uploadData.TagSet,
NodeVersion: extendedObjInfo.NodeVersion, NodeVersion: extendedObjInfo.NodeVersion,
} }
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err) return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
} }
} }
@ -528,16 +527,6 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
} }
} }
s := &SendNotificationParams{
Event: EventObjectCreatedCompleteMultipartUpload,
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
return objInfo, nil return objInfo, nil
} }

View file

@ -1,274 +0,0 @@
package handler
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"github.com/google/uuid"
)
type (
SendNotificationParams struct {
Event string
NotificationInfo *data.NotificationInfo
BktInfo *data.BucketInfo
ReqInfo *middleware.ReqInfo
User string
Time time.Time
}
)
const (
filterRuleSuffixName = "suffix"
filterRulePrefixName = "prefix"
EventObjectCreated = "s3:ObjectCreated:*"
EventObjectCreatedPut = "s3:ObjectCreated:Put"
EventObjectCreatedPost = "s3:ObjectCreated:Post"
EventObjectCreatedCopy = "s3:ObjectCreated:Copy"
EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject"
EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectRemoved = "s3:ObjectRemoved:*"
EventObjectRemovedDelete = "s3:ObjectRemoved:Delete"
EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated"
EventObjectRestore = "s3:ObjectRestore:*"
EventObjectRestorePost = "s3:ObjectRestore:Post"
EventObjectRestoreCompleted = "s3:ObjectRestore:Completed"
EventReplication = "s3:Replication:*"
EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication"
EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked"
EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold"
EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold"
EventObjectRestoreDelete = "s3:ObjectRestore:Delete"
EventLifecycleTransition = "s3:LifecycleTransition"
EventIntelligentTiering = "s3:IntelligentTiering"
EventObjectACLPut = "s3:ObjectAcl:Put"
EventLifecycleExpiration = "s3:LifecycleExpiration:*"
EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete"
EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated"
EventObjectTagging = "s3:ObjectTagging:*"
EventObjectTaggingPut = "s3:ObjectTagging:Put"
EventObjectTaggingDelete = "s3:ObjectTagging:Delete"
)
var validEvents = map[string]struct{}{
EventReducedRedundancyLostObject: {},
EventObjectCreated: {},
EventObjectCreatedPut: {},
EventObjectCreatedPost: {},
EventObjectCreatedCopy: {},
EventObjectCreatedCompleteMultipartUpload: {},
EventObjectRemoved: {},
EventObjectRemovedDelete: {},
EventObjectRemovedDeleteMarkerCreated: {},
EventObjectRestore: {},
EventObjectRestorePost: {},
EventObjectRestoreCompleted: {},
EventReplication: {},
EventReplicationOperationFailedReplication: {},
EventReplicationOperationNotTracked: {},
EventReplicationOperationMissedThreshold: {},
EventReplicationOperationReplicatedAfterThreshold: {},
EventObjectRestoreDelete: {},
EventLifecycleTransition: {},
EventIntelligentTiering: {},
EventObjectACLPut: {},
EventLifecycleExpiration: {},
EventLifecycleExpirationDelete: {},
EventLifecycleExpirationDeleteMarkerCreated: {},
EventObjectTagging: {},
EventObjectTaggingPut: {},
EventObjectTaggingDelete: {},
}
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
conf := &data.NotificationConfiguration{}
if err = h.cfg.NewXMLDecoder(r.Body).Decode(conf); err != nil {
h.logAndSendError(w, "couldn't decode notification configuration", reqInfo, errors.GetAPIError(errors.ErrMalformedXML))
return
}
if _, err = h.checkBucketConfiguration(r.Context(), conf, reqInfo); err != nil {
h.logAndSendError(w, "couldn't check bucket configuration", reqInfo, err)
return
}
p := &layer.PutBucketNotificationConfigurationParams{
RequestInfo: reqInfo,
BktInfo: bktInfo,
Configuration: conf,
}
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(w, "invalid copies number", reqInfo, err)
return
}
if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil {
h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err)
return
}
}
func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
conf, err := h.obj.GetBucketNotificationConfiguration(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket notification configuration", reqInfo, err)
return
}
if err = middleware.EncodeToResponse(w, conf); err != nil {
h.logAndSendError(w, "could not encode bucket notification configuration to response", reqInfo, err)
return
}
}
func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationParams) error {
if !h.cfg.NotificatorEnabled() {
return nil
}
conf, err := h.obj.GetBucketNotificationConfiguration(ctx, p.BktInfo)
if err != nil {
return fmt.Errorf("failed to get notification configuration: %w", err)
}
if conf.IsEmpty() {
return nil
}
box, err := middleware.GetBoxData(ctx)
if err == nil && box.Gate.BearerToken != nil {
p.User = bearer.ResolveIssuer(*box.Gate.BearerToken).EncodeToString()
}
p.Time = layer.TimeNow(ctx)
topics := filterSubjects(conf, p.Event, p.NotificationInfo.Name)
return h.notificator.SendNotifications(topics, p)
}
// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids.
func (h *handler) checkBucketConfiguration(ctx context.Context, conf *data.NotificationConfiguration, r *middleware.ReqInfo) (completed bool, err error) {
if conf == nil {
return
}
if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil {
return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported)
}
for i, q := range conf.QueueConfigurations {
if err = checkEvents(q.Events); err != nil {
return
}
if err = checkRules(q.Filter.Key.FilterRules); err != nil {
return
}
if h.cfg.NotificatorEnabled() {
if err = h.notificator.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host, layer.TimeNow(ctx)); err != nil {
return
}
} else {
h.reqLogger(ctx).Warn(logs.FailedToSendTestEventBecauseNotificationsIsDisabled)
}
if q.ID == "" {
completed = true
conf.QueueConfigurations[i].ID = uuid.NewString()
}
}
return
}
func checkRules(rules []data.FilterRule) error {
names := make(map[string]struct{})
for _, r := range rules {
if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName {
return errors.GetAPIError(errors.ErrFilterNameInvalid)
}
if _, ok := names[r.Name]; ok {
if r.Name == filterRuleSuffixName {
return errors.GetAPIError(errors.ErrFilterNameSuffix)
}
return errors.GetAPIError(errors.ErrFilterNamePrefix)
}
names[r.Name] = struct{}{}
}
return nil
}
func checkEvents(events []string) error {
for _, e := range events {
if _, ok := validEvents[e]; !ok {
return errors.GetAPIError(errors.ErrEventNotification)
}
}
return nil
}
func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string {
topics := make(map[string]string)
for _, t := range conf.QueueConfigurations {
event := false
for _, e := range t.Events {
// the second condition is comparison with the events ending with *:
// s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char
if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) {
event = true
break
}
}
if !event {
continue
}
filter := true
for _, f := range t.Filter.Key.FilterRules {
if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) ||
f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) {
filter = false
break
}
}
if filter {
topics[t.ID] = t.QueueArn
}
}
return topics
}

View file

@ -1,115 +0,0 @@
package handler
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"github.com/stretchr/testify/require"
)
func TestFilterSubjects(t *testing.T) {
config := &data.NotificationConfiguration{
QueueConfigurations: []data.QueueConfiguration{
{
ID: "test1",
QueueArn: "test1",
Events: []string{EventObjectCreated, EventObjectRemovedDelete},
},
{
ID: "test2",
QueueArn: "test2",
Events: []string{EventObjectTagging},
Filter: data.Filter{Key: data.Key{FilterRules: []data.FilterRule{
{Name: "prefix", Value: "dir/"},
{Name: "suffix", Value: ".png"},
}}},
},
},
}
t.Run("no topics because suitable events not found", func(t *testing.T) {
topics := filterSubjects(config, EventObjectACLPut, "dir/a.png")
require.Empty(t, topics)
})
t.Run("no topics because of not suitable prefix", func(t *testing.T) {
topics := filterSubjects(config, EventObjectTaggingPut, "dirw/cat.png")
require.Empty(t, topics)
})
t.Run("no topics because of not suitable suffix", func(t *testing.T) {
topics := filterSubjects(config, EventObjectTaggingPut, "a.jpg")
require.Empty(t, topics)
})
t.Run("filter topics from queue configs without prefix suffix filter and exact event", func(t *testing.T) {
topics := filterSubjects(config, EventObjectCreatedPut, "dir/a.png")
require.Contains(t, topics, "test1")
require.Len(t, topics, 1)
require.Equal(t, topics["test1"], "test1")
})
t.Run("filter topics from queue configs with prefix suffix filter and '*' ending event", func(t *testing.T) {
topics := filterSubjects(config, EventObjectTaggingPut, "dir/a.png")
require.Contains(t, topics, "test2")
require.Len(t, topics, 1)
require.Equal(t, topics["test2"], "test2")
})
}
func TestCheckRules(t *testing.T) {
t.Run("correct rules with prefix and suffix", func(t *testing.T) {
rules := []data.FilterRule{
{Name: "prefix", Value: "asd"},
{Name: "suffix", Value: "asd"},
}
err := checkRules(rules)
require.NoError(t, err)
})
t.Run("correct rules with prefix", func(t *testing.T) {
rules := []data.FilterRule{
{Name: "prefix", Value: "asd"},
}
err := checkRules(rules)
require.NoError(t, err)
})
t.Run("correct rules with suffix", func(t *testing.T) {
rules := []data.FilterRule{
{Name: "suffix", Value: "asd"},
}
err := checkRules(rules)
require.NoError(t, err)
})
t.Run("incorrect rules with wrong name", func(t *testing.T) {
rules := []data.FilterRule{
{Name: "prefix", Value: "sdf"},
{Name: "sfx", Value: "asd"},
}
err := checkRules(rules)
require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameInvalid))
})
t.Run("incorrect rules with repeating suffix", func(t *testing.T) {
rules := []data.FilterRule{
{Name: "suffix", Value: "asd"},
{Name: "suffix", Value: "asdf"},
{Name: "prefix", Value: "jk"},
}
err := checkRules(rules)
require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameSuffix))
})
t.Run("incorrect rules with repeating prefix", func(t *testing.T) {
rules := []data.FilterRule{
{Name: "suffix", Value: "ds"},
{Name: "prefix", Value: "asd"},
{Name: "prefix", Value: "asdf"},
}
err := checkRules(rules)
require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNamePrefix))
})
}

View file

@ -292,16 +292,6 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
} }
objInfo := extendedObjInfo.ObjectInfo objInfo := extendedObjInfo.ObjectInfo
s := &SendNotificationParams{
Event: EventObjectCreatedPut,
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
if needUpdateEACLTable { if needUpdateEACLTable {
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil { if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil {
h.logAndSendError(w, "could not get new eacl table", reqInfo, err) h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
@ -319,7 +309,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
TagSet: tagSet, TagSet: tagSet,
NodeVersion: extendedObjInfo.NodeVersion, NodeVersion: extendedObjInfo.NodeVersion,
} }
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err) h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return return
} }
@ -560,16 +550,6 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
} }
objInfo := extendedObjInfo.ObjectInfo objInfo := extendedObjInfo.ObjectInfo
s := &SendNotificationParams{
Event: EventObjectCreatedPost,
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
if acl := auth.MultipartFormValue(r, "acl"); acl != "" { if acl := auth.MultipartFormValue(r, "acl"); acl != "" {
r.Header.Set(api.AmzACL, acl) r.Header.Set(api.AmzACL, acl)
r.Header.Set(api.AmzGrantFullControl, "") r.Header.Set(api.AmzGrantFullControl, "")
@ -592,7 +572,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
NodeVersion: extendedObjInfo.NodeVersion, NodeVersion: extendedObjInfo.NodeVersion,
} }
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err) h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return return
} }

View file

@ -10,8 +10,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"go.uber.org/zap"
) )
const ( const (
@ -46,27 +44,12 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
}, },
TagSet: tagSet, TagSet: tagSet,
} }
nodeVersion, err := h.obj.PutObjectTagging(ctx, tagPrm)
if err != nil { if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
h.logAndSendError(w, "could not put object tagging", reqInfo, err) h.logAndSendError(w, "could not put object tagging", reqInfo, err)
return return
} }
s := &SendNotificationParams{
Event: EventObjectTaggingPut,
NotificationInfo: &data.NotificationInfo{
Name: nodeVersion.FilePath,
Size: nodeVersion.Size,
Version: nodeVersion.OID.EncodeToString(),
HashSum: nodeVersion.ETag,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
@ -123,27 +106,11 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
} }
nodeVersion, err := h.obj.DeleteObjectTagging(ctx, p) if err = h.obj.DeleteObjectTagging(ctx, p); err != nil {
if err != nil {
h.logAndSendError(w, "could not delete object tagging", reqInfo, err) h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
return return
} }
s := &SendNotificationParams{
Event: EventObjectTaggingDelete,
NotificationInfo: &data.NotificationInfo{
Name: nodeVersion.FilePath,
Size: nodeVersion.Size,
Version: nodeVersion.OID.EncodeToString(),
HashSum: nodeVersion.ETag,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
if err = h.sendNotifications(ctx, s); err != nil {
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
}
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }

View file

@ -58,3 +58,11 @@ func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Reque
func (h *handler) PutBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) { func (h *handler) PutBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented)) h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
} }
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
}
func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
}

View file

@ -257,24 +257,3 @@ func (c *Cache) PutCORS(owner user.ID, bkt *data.BucketInfo, cors *data.CORSConf
func (c *Cache) DeleteCORS(bktInfo *data.BucketInfo) { func (c *Cache) DeleteCORS(bktInfo *data.BucketInfo) {
c.systemCache.Delete(bktInfo.Name + bktInfo.CORSObjectName()) c.systemCache.Delete(bktInfo.Name + bktInfo.CORSObjectName())
} }
func (c *Cache) GetNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo) *data.NotificationConfiguration {
key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName()
if !c.accessCache.Get(owner, key) {
return nil
}
return c.systemCache.GetNotificationConfiguration(key)
}
func (c *Cache) PutNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo, configuration *data.NotificationConfiguration) {
key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName()
if err := c.systemCache.PutNotificationConfiguration(key, configuration); err != nil {
c.logger.Warn(logs.CouldntCacheNotificationConfiguration, zap.String("bucket", bktInfo.Name), zap.Error(err))
}
if err := c.accessCache.Put(owner, key); err != nil {
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
}
}

View file

@ -27,23 +27,11 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nats-io/nats.go"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap" "go.uber.org/zap"
) )
type ( type (
EventListener interface {
Subscribe(context.Context, string, MsgHandler) error
Listen(context.Context)
}
MsgHandler interface {
HandleMessage(context.Context, *nats.Msg) error
}
MsgHandlerFunc func(context.Context, *nats.Msg) error
BucketResolver interface { BucketResolver interface {
Resolve(ctx context.Context, name string) (cid.ID, error) Resolve(ctx context.Context, name string) (cid.ID, error)
} }
@ -61,7 +49,6 @@ type (
log *zap.Logger log *zap.Logger
anonKey AnonymousKey anonKey AnonymousKey
resolver BucketResolver resolver BucketResolver
ncontroller EventListener
cache *Cache cache *Cache
treeService TreeService treeService TreeService
features FeatureSettings features FeatureSettings
@ -215,7 +202,6 @@ type (
// Client provides S3 API client interface. // Client provides S3 API client interface.
Client interface { Client interface {
Initialize(ctx context.Context, c EventListener) error
EphemeralKey() *keys.PublicKey EphemeralKey() *keys.PublicKey
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
@ -245,8 +231,8 @@ type (
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error)
PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (*data.NodeVersion, error) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) error
DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error
PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error) PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)
@ -266,9 +252,6 @@ type (
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
// Compound methods for optimizations // Compound methods for optimizations
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation. // GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
@ -300,10 +283,6 @@ func (t *VersionedObject) String() string {
return t.Name + ":" + t.VersionID return t.Name + ":" + t.VersionID
} }
func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error {
return f(ctx, msg)
}
func (p HeadObjectParams) Versioned() bool { func (p HeadObjectParams) Versioned() bool {
return len(p.VersionID) > 0 return len(p.VersionID) > 0
} }
@ -327,23 +306,6 @@ func (n *layer) EphemeralKey() *keys.PublicKey {
return n.anonKey.Key.PublicKey() return n.anonKey.Key.PublicKey()
} }
func (n *layer) Initialize(ctx context.Context, c EventListener) error {
if n.IsNotificationEnabled() {
return fmt.Errorf("already initialized")
}
// todo add notification handlers (e.g. for lifecycles)
c.Listen(ctx)
n.ncontroller = c
return nil
}
func (n *layer) IsNotificationEnabled() bool {
return n.ncontroller != nil
}
// IsAuthenticatedRequest checks if access box exists in the current request. // IsAuthenticatedRequest checks if access box exists in the current request.
func IsAuthenticatedRequest(ctx context.Context) bool { func IsAuthenticatedRequest(ctx context.Context) bool {
_, err := middleware.GetBoxData(ctx) _, err := middleware.GetBoxData(ctx)

View file

@ -1,89 +0,0 @@
package layer
import (
"bytes"
"context"
"encoding/xml"
errorsStd "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"go.uber.org/zap"
)
type PutBucketNotificationConfigurationParams struct {
RequestInfo *middleware.ReqInfo
BktInfo *data.BucketInfo
Configuration *data.NotificationConfiguration
CopiesNumbers []uint32
}
func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error {
confXML, err := xml.Marshal(p.Configuration)
if err != nil {
return fmt.Errorf("marshal notify configuration: %w", err)
}
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Payload: bytes.NewReader(confXML),
Filepath: p.BktInfo.NotificationConfigurationObjectName(),
CreationTime: TimeNow(ctx),
CopiesNumber: p.CopiesNumbers,
}
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return err
}
objIDToDelete, err := n.treeService.PutNotificationConfigurationNode(ctx, p.BktInfo, objID)
objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
if err != nil && !objIDToDeleteNotFound {
return err
}
if !objIDToDeleteNotFound {
if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteNotificationConfigurationObject, zap.Error(err),
zap.String("cid", p.BktInfo.CID.EncodeToString()),
zap.String("oid", objIDToDelete.EncodeToString()))
}
}
n.cache.PutNotificationConfiguration(n.BearerOwner(ctx), p.BktInfo, p.Configuration)
return nil
}
func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) {
owner := n.BearerOwner(ctx)
if conf := n.cache.GetNotificationConfiguration(owner, bktInfo); conf != nil {
return conf, nil
}
objID, err := n.treeService.GetNotificationConfigurationNode(ctx, bktInfo)
objIDNotFound := errorsStd.Is(err, ErrNodeNotFound)
if err != nil && !objIDNotFound {
return nil, err
}
conf := &data.NotificationConfiguration{}
if !objIDNotFound {
obj, err := n.objectGet(ctx, bktInfo, objID)
if err != nil {
return nil, err
}
if err = xml.Unmarshal(obj.Payload(), &conf); err != nil {
return nil, fmt.Errorf("unmarshal notify configuration: %w", err)
}
}
n.cache.PutNotificationConfiguration(owner, bktInfo, conf)
return conf, nil
}

View file

@ -50,12 +50,12 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingPa
return p.ObjectVersion.VersionID, tags, nil return p.ObjectVersion.VersionID, tags, nil
} }
func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (nodeVersion *data.NodeVersion, err error) { func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (err error) {
nodeVersion = p.NodeVersion nodeVersion := p.NodeVersion
if nodeVersion == nil { if nodeVersion == nil {
nodeVersion, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjectVersion) nodeVersion, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjectVersion)
if err != nil { if err != nil {
return nil, err return err
} }
} }
p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString() p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString()
@ -63,35 +63,35 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingPa
err = n.treeService.PutObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion, p.TagSet) err = n.treeService.PutObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion, p.TagSet)
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error()) return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
} }
return nil, err return err
} }
n.cache.PutTagging(n.BearerOwner(ctx), objectTaggingCacheKey(p.ObjectVersion), p.TagSet) n.cache.PutTagging(n.BearerOwner(ctx), objectTaggingCacheKey(p.ObjectVersion), p.TagSet)
return nodeVersion, nil return nil
} }
func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error) { func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error {
version, err := n.getNodeVersion(ctx, p) version, err := n.getNodeVersion(ctx, p)
if err != nil { if err != nil {
return nil, err return err
} }
err = n.treeService.DeleteObjectTagging(ctx, p.BktInfo, version) err = n.treeService.DeleteObjectTagging(ctx, p.BktInfo, version)
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error()) return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
} }
return nil, err return err
} }
p.VersionID = version.OID.EncodeToString() p.VersionID = version.OID.EncodeToString()
n.cache.DeleteTagging(objectTaggingCacheKey(p)) n.cache.DeleteTagging(objectTaggingCacheKey(p))
return version, nil return nil
} }
func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {

View file

@ -110,14 +110,6 @@ func (t *TreeServiceMock) GetSettingsNode(_ context.Context, bktInfo *data.Bucke
return settings, nil return settings, nil
} }
func (t *TreeServiceMock) GetNotificationConfigurationNode(context.Context, *data.BucketInfo) (oid.ID, error) {
panic("implement me")
}
func (t *TreeServiceMock) PutNotificationConfigurationNode(context.Context, *data.BucketInfo, oid.ID) (oid.ID, error) {
panic("implement me")
}
func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()] systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok { if !ok {

View file

@ -18,17 +18,6 @@ type TreeService interface {
// If tree node is not found returns ErrNodeNotFound error. // If tree node is not found returns ErrNodeNotFound error.
GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
// GetNotificationConfigurationNode gets an object id that corresponds to object with bucket CORS.
//
// If tree node is not found returns ErrNodeNotFound error.
GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
// PutNotificationConfigurationNode puts a node to a system tree
// and returns objectID of a previous notif config which must be deleted in FrostFS.
//
// If object id to remove is not found returns ErrNoNodeToRemove error.
PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error)
// GetBucketCORS gets an object id that corresponds to object with bucket CORS. // GetBucketCORS gets an object id that corresponds to object with bucket CORS.
// //
// If object id is not found returns ErrNodeNotFound error. // If object id is not found returns ErrNodeNotFound error.

View file

@ -1,263 +0,0 @@
package notifications
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
)
const (
DefaultTimeout = 30 * time.Second
// EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications.
EventVersion23 = "2.3"
// EventVersion22 is used for replication notifications.
EventVersion22 = "2.2"
// EventVersion21 is used for all other notification types.
EventVersion21 = "2.1"
)
type (
Options struct {
URL string
TLSCertFilepath string
TLSAuthPrivateKeyFilePath string
Timeout time.Duration
RootCAFiles []string
}
Controller struct {
logger *zap.Logger
taskQueueConnection *nats.Conn
jsClient nats.JetStreamContext
handlers map[string]Stream
mu sync.RWMutex
}
Stream struct {
h layer.MsgHandler
ch chan *nats.Msg
}
TestEvent struct {
Service string
Event string
Time time.Time
Bucket string
RequestID string
HostID string
}
Event struct {
Records []EventRecord `json:"Records"`
}
EventRecord struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"` // frostfs:s3
AWSRegion string `json:"awsRegion,omitempty"` // empty
EventTime time.Time `json:"eventTime"`
EventName string `json:"eventName"`
UserIdentity UserIdentity `json:"userIdentity"`
RequestParameters RequestParameters `json:"requestParameters"`
ResponseElements map[string]string `json:"responseElements"`
S3 S3Entity `json:"s3"`
}
UserIdentity struct {
PrincipalID string `json:"principalId"`
}
RequestParameters struct {
SourceIPAddress string `json:"sourceIPAddress"`
}
S3Entity struct {
SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationID string `json:"configurationId,omitempty"`
Bucket Bucket `json:"bucket"`
Object Object `json:"object"`
}
Bucket struct {
Name string `json:"name"`
OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"`
Arn string `json:"arn,omitempty"`
}
Object struct {
Key string `json:"key"`
Size uint64 `json:"size,omitempty"`
VersionID string `json:"versionId,omitempty"`
ETag string `json:"eTag,omitempty"`
Sequencer string `json:"sequencer,omitempty"`
}
)
func NewController(p *Options, l *zap.Logger) (*Controller, error) {
ncopts := []nats.Option{
nats.Timeout(p.Timeout),
}
if len(p.TLSCertFilepath) != 0 && len(p.TLSAuthPrivateKeyFilePath) != 0 {
ncopts = append(ncopts, nats.ClientCert(p.TLSCertFilepath, p.TLSAuthPrivateKeyFilePath))
}
if len(p.RootCAFiles) != 0 {
ncopts = append(ncopts, nats.RootCAs(p.RootCAFiles...))
}
nc, err := nats.Connect(p.URL, ncopts...)
if err != nil {
return nil, fmt.Errorf("connect to nats: %w", err)
}
js, err := nc.JetStream()
if err != nil {
return nil, fmt.Errorf("get jet stream: %w", err)
}
return &Controller{
logger: l,
taskQueueConnection: nc,
jsClient: js,
handlers: make(map[string]Stream),
}, nil
}
func (c *Controller) Subscribe(_ context.Context, topic string, handler layer.MsgHandler) error {
ch := make(chan *nats.Msg, 1)
c.mu.RLock()
_, ok := c.handlers[topic]
c.mu.RUnlock()
if ok {
return fmt.Errorf("already subscribed to topic '%s'", topic)
}
if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil {
return fmt.Errorf("add stream: %w", err)
}
if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil {
return fmt.Errorf("could not subscribe: %w", err)
}
c.mu.Lock()
c.handlers[topic] = Stream{
h: handler,
ch: ch,
}
c.mu.Unlock()
return nil
}
func (c *Controller) Listen(ctx context.Context) {
c.mu.RLock()
defer c.mu.RUnlock()
for _, stream := range c.handlers {
go func(stream Stream) {
for {
select {
case msg := <-stream.ch:
if err := stream.h.HandleMessage(ctx, msg); err != nil {
c.logger.Error(logs.CouldNotHandleMessage, zap.Error(err))
} else if err = msg.Ack(); err != nil {
c.logger.Error(logs.CouldNotACKMessage, zap.Error(err))
}
case <-ctx.Done():
return
}
}
}(stream)
}
}
func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error {
event := prepareEvent(p)
for id, topic := range topics {
event.Records[0].S3.ConfigurationID = id
msg, err := json.Marshal(event)
if err != nil {
c.logger.Error(logs.CouldntMarshalAnEvent, zap.String("subject", topic), zap.Error(err))
}
if err = c.publish(topic, msg); err != nil {
c.logger.Error(logs.CouldntSendAnEventToTopic, zap.String("subject", topic), zap.Error(err))
}
}
return nil
}
func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error {
event := &TestEvent{
Service: "FrostFS S3",
Event: "s3:TestEvent",
Time: now,
Bucket: bucketName,
RequestID: requestID,
HostID: HostID,
}
msg, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("couldn't marshal test event: %w", err)
}
return c.publish(topic, msg)
}
func prepareEvent(p *handler.SendNotificationParams) *Event {
return &Event{
Records: []EventRecord{
{
EventVersion: EventVersion21,
EventSource: "frostfs:s3",
AWSRegion: "",
EventTime: p.Time,
EventName: p.Event,
UserIdentity: UserIdentity{
PrincipalID: p.User,
},
RequestParameters: RequestParameters{
SourceIPAddress: p.ReqInfo.RemoteHost,
},
ResponseElements: nil,
S3: S3Entity{
SchemaVersion: "1.0",
// ConfigurationID is skipped and will be placed later
Bucket: Bucket{
Name: p.BktInfo.Name,
OwnerIdentity: UserIdentity{PrincipalID: p.BktInfo.Owner.String()},
Arn: p.BktInfo.Name,
},
Object: Object{
Key: p.NotificationInfo.Name,
Size: p.NotificationInfo.Size,
VersionID: p.NotificationInfo.Version,
ETag: p.NotificationInfo.HashSum,
Sequencer: "",
},
},
},
},
}
}
func (c *Controller) publish(topic string, msg []byte) error {
if _, err := c.jsClient.Publish(topic, msg); err != nil {
return fmt.Errorf("couldn't send event: %w", err)
}
return nil
}

View file

@ -24,7 +24,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
@ -62,7 +61,6 @@ type (
pool *pool.Pool pool *pool.Pool
treePool *treepool.Pool treePool *treepool.Pool
key *keys.PrivateKey key *keys.PrivateKey
nc *notifications.Controller
obj layer.Client obj layer.Client
api api.Handler api api.Handler
@ -88,7 +86,6 @@ type (
maxClient maxClientsConfig maxClient maxClientsConfig
defaultMaxAge int defaultMaxAge int
reconnectInterval time.Duration reconnectInterval time.Duration
notificatorEnabled bool
resolveZoneList []string resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones isResolveListAllow bool // True if ResolveZoneList contains allowed zones
frostfsidValidation bool frostfsidValidation bool
@ -157,13 +154,13 @@ func (a *App) init(ctx context.Context) {
a.setRuntimeParameters() a.setRuntimeParameters()
a.initFrostfsID(ctx) a.initFrostfsID(ctx)
a.initPolicyStorage(ctx) a.initPolicyStorage(ctx)
a.initAPI(ctx) a.initAPI()
a.initMetrics() a.initMetrics()
a.initServers(ctx) a.initServers(ctx)
a.initTracing(ctx) a.initTracing(ctx)
} }
func (a *App) initLayer(ctx context.Context) { func (a *App) initLayer() {
a.initResolver() a.initResolver()
// prepare random key for anonymous requests // prepare random key for anonymous requests
@ -188,18 +185,6 @@ func (a *App) initLayer(ctx context.Context) {
// prepare object layer // prepare object layer
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
if a.cfg.GetBool(cfgEnableNATS) {
nopts := getNotificationsOptions(a.cfg, a.log)
a.nc, err = notifications.NewController(nopts, a.log)
if err != nil {
a.log.Fatal(logs.FailedToEnableNotifications, zap.Error(err))
}
if err = a.obj.Initialize(ctx, a.nc); err != nil {
a.log.Fatal(logs.CouldntInitializeLayer, zap.Error(err))
}
}
} }
func newAppSettings(log *Logger, v *viper.Viper) *appSettings { func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
@ -208,7 +193,6 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
maxClient: newMaxClients(v), maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger), defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v), reconnectInterval: fetchReconnectInterval(v),
notificatorEnabled: v.GetBool(cfgEnableNATS),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
} }
@ -343,10 +327,6 @@ func (s *appSettings) DefaultMaxAge() int {
return s.defaultMaxAge return s.defaultMaxAge
} }
func (s *appSettings) NotificatorEnabled() bool {
return s.notificatorEnabled
}
func (s *appSettings) ResolveZoneList() []string { func (s *appSettings) ResolveZoneList() []string {
return s.resolveZoneList return s.resolveZoneList
} }
@ -468,8 +448,8 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
return s.retryStrategy return s.retryStrategy
} }
func (a *App) initAPI(ctx context.Context) { func (a *App) initAPI() {
a.initLayer(ctx) a.initLayer()
a.initHandler() a.initHandler()
} }
@ -908,17 +888,6 @@ func (a *App) stopServices() {
} }
} }
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
cfg := notifications.Options{}
cfg.URL = v.GetString(cfgNATSEndpoint)
cfg.Timeout = fetchNATSTimeout(v, l)
cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
return &cfg
}
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig { func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg := layer.DefaultCachesConfigs(l) cacheCfg := layer.DefaultCachesConfigs(l)
@ -976,7 +945,7 @@ func getFrostfsIDCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
func (a *App) initHandler() { func (a *App) initHandler() {
var err error var err error
a.api, err = handler.New(a.log, a.obj, a.nc, a.settings, a.policyStorage, a.frostfsid) a.api, err = handler.New(a.log, a.obj, a.settings, a.policyStorage, a.frostfsid)
if err != nil { if err != nil {
a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err)) a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err))
} }

View file

@ -14,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
@ -120,14 +119,6 @@ const ( // Settings.
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval" cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
// NATS.
cfgEnableNATS = "nats.enabled"
cfgNATSEndpoint = "nats.endpoint"
cfgNATSTimeout = "nats.timeout"
cfgNATSTLSCertFile = "nats.cert_file"
cfgNATSAuthPrivateKeyFile = "nats.key_file"
cfgNATSRootCAFiles = "nats.root_ca"
// Policy. // Policy.
cfgPolicyDefault = "placement_policy.default" cfgPolicyDefault = "placement_policy.default"
cfgPolicyRegionMapFile = "placement_policy.region_mapping" cfgPolicyRegionMapFile = "placement_policy.region_mapping"
@ -376,19 +367,6 @@ func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy
return policy return policy
} }
func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
timeout := cfg.GetDuration(cfgNATSTimeout)
if timeout <= 0 {
l.Error(logs.InvalidLifetimeUsingDefaultValue,
zap.String("parameter", cfgNATSTimeout),
zap.Duration("value in config", timeout),
zap.Duration("default", notifications.DefaultTimeout))
timeout = notifications.DefaultTimeout
}
return timeout
}
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration { func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
if v.IsSet(cfgEntry) { if v.IsSet(cfgEntry) {
lifetime := v.GetDuration(cfgEntry) lifetime := v.GetDuration(cfgEntry)

View file

@ -88,7 +88,7 @@ S3_GW_CACHE_BUCKETS_SIZE=1000
# Cache which contains mapping of nice name to object addresses # Cache which contains mapping of nice name to object addresses
S3_GW_CACHE_NAMES_LIFETIME=1m S3_GW_CACHE_NAMES_LIFETIME=1m
S3_GW_CACHE_NAMES_SIZE=10000 S3_GW_CACHE_NAMES_SIZE=10000
# Cache for system objects in a bucket: bucket settings, notification configuration etc # Cache for system objects in a bucket: bucket settings etc
S3_GW_CACHE_SYSTEM_LIFETIME=5m S3_GW_CACHE_SYSTEM_LIFETIME=5m
S3_GW_CACHE_SYSTEM_SIZE=100000 S3_GW_CACHE_SYSTEM_SIZE=100000
# Cache which stores access box with tokens by its address # Cache which stores access box with tokens by its address
@ -105,14 +105,6 @@ S3_GW_CACHE_MORPH_POLICY_SIZE=10000
S3_GW_CACHE_FROSTFSID_LIFETIME=1m S3_GW_CACHE_FROSTFSID_LIFETIME=1m
S3_GW_CACHE_FROSTFSID_SIZE=10000 S3_GW_CACHE_FROSTFSID_SIZE=10000
# NATS
S3_GW_NATS_ENABLED=true
S3_GW_NATS_ENDPOINT=nats://nats.frostfs.devenv:4222
S3_GW_NATS_TIMEOUT=30s
S3_GW_NATS_CERT_FILE=/path/to/cert
S3_GW_NATS_KEY_FILE=/path/to/key
S3_GW_NATS_ROOT_CA=/path/to/ca
# Default policy of placing containers in FrostFS # Default policy of placing containers in FrostFS
# If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway # If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway
# will put the container with default policy. It can be specified via environment variable, e.g.: # will put the container with default policy. It can be specified via environment variable, e.g.:

View file

@ -105,7 +105,7 @@ cache:
buckets: buckets:
lifetime: 1m lifetime: 1m
size: 500 size: 500
# Cache for system objects in a bucket: bucket settings, notification configuration etc # Cache for system objects in a bucket: bucket settings etc
system: system:
lifetime: 2m lifetime: 2m
size: 1000 size: 1000
@ -127,14 +127,6 @@ cache:
lifetime: 1m lifetime: 1m
size: 10000 size: 10000
nats:
enabled: true
endpoint: nats://localhost:4222
timeout: 30s
cert_file: /path/to/cert
key_file: /path/to/key
root_ca: /path/to/ca
# Parameters of FrostFS container placement policy # Parameters of FrostFS container placement policy
placement_policy: placement_policy:
# Default policy of placing containers in FrostFS # Default policy of placing containers in FrostFS

View file

@ -177,7 +177,6 @@ There are some custom types used for brevity:
| `server` | [Server configuration](#server-section) | | `server` | [Server configuration](#server-section) |
| `logger` | [Logger configuration](#logger-section) | | `logger` | [Logger configuration](#logger-section) |
| `cache` | [Cache configuration](#cache-section) | | `cache` | [Cache configuration](#cache-section) |
| `nats` | [NATS configuration](#nats-section) |
| `cors` | [CORS configuration](#cors-section) | | `cors` | [CORS configuration](#cors-section) |
| `pprof` | [Pprof configuration](#pprof-section) | | `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) | | `prometheus` | [Prometheus configuration](#prometheus-section) |
@ -411,18 +410,18 @@ cache:
size: 10000 size: 10000
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------------------------------| |-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). | | `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. | | `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. | | `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. | | `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. | | `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. | | `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. | | `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. |
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. | | `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. | | `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. | | `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. |
#### `cache` subsection #### `cache` subsection
@ -450,36 +449,6 @@ size: 100
| `lifetime` | `duration` | '10m' | Lifetime of entries in cache. | | `lifetime` | `duration` | '10m' | Lifetime of entries in cache. |
| `size` | `int` | '100 | LRU cache size. | | `size` | `int` | '100 | LRU cache size. |
### `nats` section
This is an advanced section, use with caution.
You can turn on notifications about successful completions of basic operations, and the gateway will send notifications
via NATS JetStream.
1. to configure the NATS server with JetStream
2. to specify NATS parameters for the S3 GW. It's ***necessary*** to define a values of `nats.enable` or
`S3_GW_NATS_ENABLED` as `True`
3. to configure notifications in a bucket
```yaml
nats:
enabled: true
endpoint: nats://localhost:4222
timeout: 30s
cert_file: /path/to/cert
key_file: /path/to/key
root_ca: /path/to/ca
```
| Parameter | Type | Default value | Description |
|---------------|------------|---------------|------------------------------------------------------|
| `enabled` | `bool` | `false` | Flag to enable the service. |
| `endpoint` | `string` | | NATS endpoint to connect to. |
| `timeout` | `duration` | `30s` | Timeout for the object notification operation. |
| `certificate` | `string` | | Path to the client certificate. |
| `key` | `string` | | Path to the client key. |
| `ca` | `string` | | Override root CA used to verify server certificates. |
### `cors` section ### `cors` section
```yaml ```yaml

View file

@ -13,6 +13,5 @@ Each node keeps one of the types of data as a set of **key-value pairs**:
Some data takes up a lot of memory, so we store it in FrostFS nodes as an object with payload. Some data takes up a lot of memory, so we store it in FrostFS nodes as an object with payload.
But we keep these objects' metadata in the Tree service too: But we keep these objects' metadata in the Tree service too:
* Notification configuration
* CORS * CORS
* Metadata of parts of active multipart uploads * Metadata of parts of active multipart uploads

4
go.mod
View file

@ -15,7 +15,6 @@ require (
github.com/go-chi/chi/v5 v5.0.8 github.com/go-chi/chi/v5 v5.0.8
github.com/google/uuid v1.3.1 github.com/google/uuid v1.3.1
github.com/minio/sio v0.3.0 github.com/minio/sio v0.3.0
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
github.com/nspcc-dev/neo-go v0.105.0 github.com/nspcc-dev/neo-go v0.105.0
github.com/panjf2000/ants/v2 v2.5.0 github.com/panjf2000/ants/v2 v2.5.0
github.com/prometheus/client_golang v1.15.1 github.com/prometheus/client_golang v1.15.1
@ -68,9 +67,6 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect
github.com/nats-io/nats-server/v2 v2.7.1 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect

13
go.sum
View file

@ -223,7 +223,6 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@ -236,7 +235,6 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/sio v0.3.0 h1:syEFBewzOMOYVzSTFpp1MqpSZk8rUNbz8VIIc+PNzus= github.com/minio/sio v0.3.0 h1:syEFBewzOMOYVzSTFpp1MqpSZk8rUNbz8VIIc+PNzus=
github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw= github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@ -244,15 +242,6 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/nats-server/v2 v2.7.1 h1:SDj8R0PJPVekw3EgHxGtTfJUuMbsuaul1nwWFI3xTyk=
github.com/nats-io/nats-server/v2 v2.7.1/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c h1:OOQeE613BH93ICPq3eke5N78gWNeMjcBWkmD2NKyXVg= github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c h1:OOQeE613BH93ICPq3eke5N78gWNeMjcBWkmD2NKyXVg=
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
github.com/nspcc-dev/neo-go v0.105.0 h1:vtNZYFEFySK8zRDhLzQYha849VzWrcKezlnq/oNQg/w= github.com/nspcc-dev/neo-go v0.105.0 h1:vtNZYFEFySK8zRDhLzQYha849VzWrcKezlnq/oNQg/w=
@ -375,7 +364,6 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
@ -541,7 +529,6 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

View file

@ -54,16 +54,12 @@ const (
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../../api/cache/* InvalidCacheEntryType = "invalid cache entry type" // Warn in ../../api/cache/*
InvalidCacheKeyType = "invalid cache key type" // Warn in ../../api/cache/objectslist.go InvalidCacheKeyType = "invalid cache key type" // Warn in ../../api/cache/objectslist.go
ObjectIsCopied = "object is copied" // Info in ../../api/handler/copy.go ObjectIsCopied = "object is copied" // Info in ../../api/handler/copy.go
CouldntSendNotification = "couldn't send notification: %w" // Error in ../../api/handler/*
FailedToSendTestEventBecauseNotificationsIsDisabled = "failed to send test event because notifications is disabled" // Warn in ../../api/handler/notifications.go
RequestFailed = "request failed" // Error in ../../api/handler/util.go RequestFailed = "request failed" // Error in ../../api/handler/util.go
GetBucketInfo = "get bucket info" // Warn in ../../api/handler/cors.go GetBucketInfo = "get bucket info" // Warn in ../../api/handler/cors.go
GetBucketCors = "get bucket cors" // Warn in ../../api/handler/cors.go GetBucketCors = "get bucket cors" // Warn in ../../api/handler/cors.go
SomeACLNotFullyMapped = "some acl not fully mapped" // Warn in ../../api/handler/acl.go SomeACLNotFullyMapped = "some acl not fully mapped" // Warn in ../../api/handler/acl.go
CouldntDeleteObject = "couldn't delete object" // Error in ../../api/layer/layer.go CouldntDeleteObject = "couldn't delete object" // Error in ../../api/layer/layer.go
NotificatorIsDisabledS3WontProduceNotificationEvents = "notificator is disabled, s3 won't produce notification events" // Warn in ../../api/handler/api.go
BucketIsCreated = "bucket is created" // Info in ../../api/handler/put.go BucketIsCreated = "bucket is created" // Info in ../../api/handler/put.go
CouldntDeleteNotificationConfigurationObject = "couldn't delete notification configuration object" // Error in ../../api/layer/notifications.go
CouldNotParseContainerObjectLockEnabledAttribute = "could not parse container object lock enabled attribute" // Error in ../../api/layer/container.go CouldNotParseContainerObjectLockEnabledAttribute = "could not parse container object lock enabled attribute" // Error in ../../api/layer/container.go
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
@ -93,7 +89,6 @@ const (
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
CouldntCacheCors = "couldn't cache cors" // Warn in ../../api/layer/cache.go CouldntCacheCors = "couldn't cache cors" // Warn in ../../api/layer/cache.go
CouldntCacheNotificationConfiguration = "couldn't cache notification configuration" // Warn in ../../api/layer/cache.go
CouldntCacheListPolicyChains = "couldn't cache list policy chains" // Warn in ../../api/layer/cache.go CouldntCacheListPolicyChains = "couldn't cache list policy chains" // Warn in ../../api/layer/cache.go
RequestEnd = "request end" // Info in ../../api/middleware/response.go RequestEnd = "request end" // Info in ../../api/middleware/response.go
CouldntReceiveAccessBoxForGateKeyRandomKeyWillBeUsed = "couldn't receive access box for gate key, random key will be used" // Debug in ../../api/middleware/auth.go CouldntReceiveAccessBoxForGateKeyRandomKeyWillBeUsed = "couldn't receive access box for gate key, random key will be used" // Debug in ../../api/middleware/auth.go
@ -101,15 +96,9 @@ const (
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
CouldNotHandleMessage = "could not handle message" // Error in ../../api/notifications/controller.go
CouldNotACKMessage = "could not ACK message" // Error in ../../api/notifications/controller.go
CouldntMarshalAnEvent = "couldn't marshal an event" // Error in ../../api/notifications/controller.go
CouldntSendAnEventToTopic = "couldn't send an event to topic" // Error in ../../api/notifications/controller.go
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go
CouldntGenerateRandomKey = "couldn't generate random key" // Fatal in ../../cmd/s3-gw/app.go CouldntGenerateRandomKey = "couldn't generate random key" // Fatal in ../../cmd/s3-gw/app.go
FailedToEnableNotifications = "failed to enable notifications" // Fatal in ../../cmd/s3-gw/app.go
CouldntInitializeLayer = "couldn't initialize layer" // Fatal in ../../cmd/s3-gw/app.go
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../cmd/s3-gw/app.go FailedToCreateResolver = "failed to create resolver" // Fatal in ../../cmd/s3-gw/app.go
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../cmd/s3-gw/app.go CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../cmd/s3-gw/app.go
FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../cmd/s3-gw/app.go FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../cmd/s3-gw/app.go

View file

@ -108,7 +108,6 @@ const (
createdKV = "Created" createdKV = "Created"
settingsFileName = "bucket-settings" settingsFileName = "bucket-settings"
notifConfFileName = "bucket-notifications"
corsFilename = "bucket-cors" corsFilename = "bucket-cors"
bucketTaggingFilename = "bucket-tagging" bucketTaggingFilename = "bucket-tagging"
@ -116,7 +115,7 @@ const (
versionTree = "version" versionTree = "version"
// systemTree -- ID of a tree with system objects // systemTree -- ID of a tree with system objects
// i.e. bucket settings with versioning and lock configuration, cors, notifications. // i.e. bucket settings with versioning and lock configuration, cors.
systemTree = "system" systemTree = "system"
separator = "/" separator = "/"
@ -400,36 +399,6 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta) return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
} }
func (c *Tree) GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
if err != nil {
return oid.ID{}, err
}
return node.ObjID, nil
}
func (c *Tree) PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
}
meta := make(map[string]string)
meta[FileNameKey] = notifConfFileName
meta[oidKV] = objID.EncodeToString()
if isErrNotFound {
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
return oid.ID{}, err
}
return oid.ID{}, layer.ErrNoNodeToRemove
}
return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
}
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV}) node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV})
if err != nil { if err != nil {