[#391] Refactor notifications

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-04-29 16:08:22 +03:00 committed by Alex Vanin
parent ea8e1b3b19
commit 94caa2247e
11 changed files with 279 additions and 278 deletions

View file

@ -11,15 +11,22 @@ import (
type ( type (
handler struct { handler struct {
log *zap.Logger log *zap.Logger
obj layer.Client obj layer.Client
cfg *Config notificator Notificator
cfg *Config
}
Notificator interface {
SendNotifications(topics map[string]string, p *SendNotificationParams) error
SendTestNotification(topic, bucketName, requestID, HostID string) error
} }
// Config contains data which handler needs to keep. // Config contains data which handler needs to keep.
Config struct { Config struct {
DefaultPolicy *netmap.PlacementPolicy DefaultPolicy *netmap.PlacementPolicy
DefaultMaxAge int DefaultMaxAge int
NotificatorEnabled bool
} }
) )
@ -29,7 +36,7 @@ const DefaultPolicy = "REP 3"
var _ api.Handler = (*handler)(nil) var _ api.Handler = (*handler)(nil)
// New creates new api.Handler using given logger and client. // New creates new api.Handler using given logger and client.
func New(log *zap.Logger, obj layer.Client, cfg *Config) (api.Handler, error) { func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg *Config) (api.Handler, error) {
switch { switch {
case obj == nil: case obj == nil:
return nil, errors.New("empty NeoFS Object Layer") return nil, errors.New("empty NeoFS Object Layer")
@ -37,9 +44,14 @@ func New(log *zap.Logger, obj layer.Client, cfg *Config) (api.Handler, error) {
return nil, errors.New("empty logger") return nil, errors.New("empty logger")
} }
if cfg.NotificatorEnabled && notificator == nil {
return nil, errors.New("empty notificator")
}
return &handler{ return &handler{
log: log, log: log,
obj: obj, obj: obj,
cfg: cfg, cfg: cfg,
notificator: notificator,
}, nil }, nil
} }

View file

@ -134,13 +134,13 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
zap.String("object", info.Name), zap.String("object", info.Name),
zap.Stringer("object_id", info.ID)) zap.Stringer("object_id", info.ID))
s := &layer.SendNotificationParams{ s := &SendNotificationParams{
Event: layer.EventObjectCreatedCopy, Event: EventObjectCreatedCopy,
ObjInfo: info, ObjInfo: info,
BktInfo: dstBktInfo, BktInfo: dstBktInfo,
ReqInfo: reqInfo, ReqInfo: reqInfo,
} }
if err := h.obj.SendNotifications(r.Context(), s); err != nil { if err = h.sendNotifications(r.Context(), s); err != nil {
h.log.Error("couldn't send notification: %w", zap.Error(err)) h.log.Error("couldn't send notification: %w", zap.Error(err))
} }
} }

View file

@ -97,11 +97,11 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
zap.Error(err)) zap.Error(err))
} }
var m *layer.SendNotificationParams var m *SendNotificationParams
if bktSettings.VersioningEnabled && len(versionID) == 0 { if bktSettings.VersioningEnabled && len(versionID) == 0 {
m = &layer.SendNotificationParams{ m = &SendNotificationParams{
Event: layer.EventObjectRemovedDeleteMarkerCreated, Event: EventObjectRemovedDeleteMarkerCreated,
ObjInfo: &data.ObjectInfo{ ObjInfo: &data.ObjectInfo{
Name: reqInfo.ObjectName, Name: reqInfo.ObjectName,
HashSum: deletedObject.DeleteMarkerEtag, HashSum: deletedObject.DeleteMarkerEtag,
@ -117,8 +117,8 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
m = &layer.SendNotificationParams{ m = &SendNotificationParams{
Event: layer.EventObjectRemovedDelete, Event: EventObjectRemovedDelete,
ObjInfo: &data.ObjectInfo{ ObjInfo: &data.ObjectInfo{
Name: reqInfo.ObjectName, Name: reqInfo.ObjectName,
ID: &objID, ID: &objID,
@ -128,7 +128,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
if err := h.obj.SendNotifications(r.Context(), m); err != nil { if err = h.sendNotifications(r.Context(), m); err != nil {
h.log.Error("couldn't send notification: %w", zap.Error(err)) h.log.Error("couldn't send notification: %w", zap.Error(err))
} }

View file

@ -436,13 +436,13 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
} }
} }
s := &layer.SendNotificationParams{ s := &SendNotificationParams{
Event: layer.EventObjectCreatedCompleteMultipartUpload, Event: EventObjectCreatedCompleteMultipartUpload,
ObjInfo: objInfo, ObjInfo: objInfo,
BktInfo: bktInfo, BktInfo: bktInfo,
ReqInfo: reqInfo, ReqInfo: reqInfo,
} }
if err := h.obj.SendNotifications(r.Context(), s); err != nil { if err = h.sendNotifications(r.Context(), s); err != nil {
h.log.Error("couldn't send notification: %w", zap.Error(err)) h.log.Error("couldn't send notification: %w", zap.Error(err))
} }

View file

@ -1,17 +1,95 @@
package handler package handler
import ( import (
"context"
"encoding/xml" "encoding/xml"
"net/http" "net/http"
"strings"
"github.com/google/uuid"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap"
) )
type NotificationConfiguration struct { type (
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguation"` SendNotificationParams struct {
NotificationConfiguration data.NotificationConfiguration Event string
ObjInfo *data.ObjectInfo
BktInfo *data.BucketInfo
ReqInfo *api.ReqInfo
User string
}
NotificationConfiguration struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguation"`
NotificationConfiguration data.NotificationConfiguration
}
)
const (
filterRuleSuffixName = "suffix"
filterRulePrefixName = "prefix"
EventObjectCreated = "s3:ObjectCreated:*"
EventObjectCreatedPut = "s3:ObjectCreated:Put"
EventObjectCreatedPost = "s3:ObjectCreated:Post"
EventObjectCreatedCopy = "s3:ObjectCreated:Copy"
EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject"
EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectRemoved = "s3:ObjectRemoved:*"
EventObjectRemovedDelete = "s3:ObjectRemoved:Delete"
EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated"
EventObjectRestore = "s3:ObjectRestore:*"
EventObjectRestorePost = "s3:ObjectRestore:Post"
EventObjectRestoreCompleted = "s3:ObjectRestore:Completed"
EventReplication = "s3:Replication:*"
EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication"
EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked"
EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold"
EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold"
EventObjectRestoreDelete = "s3:ObjectRestore:Delete"
EventLifecycleTransition = "s3:LifecycleTransition"
EventIntelligentTiering = "s3:IntelligentTiering"
EventObjectACLPut = "s3:ObjectAcl:Put"
EventLifecycleExpiration = "s3:LifecycleExpiration:*"
EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete"
EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated"
EventObjectTagging = "s3:ObjectTagging:*"
EventObjectTaggingPut = "s3:ObjectTagging:Put"
EventObjectTaggingDelete = "s3:ObjectTagging:Delete"
)
var validEvents = map[string]struct{}{
EventReducedRedundancyLostObject: {},
EventObjectCreated: {},
EventObjectCreatedPut: {},
EventObjectCreatedPost: {},
EventObjectCreatedCopy: {},
EventObjectCreatedCompleteMultipartUpload: {},
EventObjectRemoved: {},
EventObjectRemovedDelete: {},
EventObjectRemovedDeleteMarkerCreated: {},
EventObjectRestore: {},
EventObjectRestorePost: {},
EventObjectRestoreCompleted: {},
EventReplication: {},
EventReplicationOperationFailedReplication: {},
EventReplicationOperationNotTracked: {},
EventReplicationOperationMissedThreshold: {},
EventReplicationOperationReplicatedAfterThreshold: {},
EventObjectRestoreDelete: {},
EventLifecycleTransition: {},
EventIntelligentTiering: {},
EventObjectACLPut: {},
EventLifecycleExpiration: {},
EventLifecycleExpirationDelete: {},
EventLifecycleExpirationDeleteMarkerCreated: {},
EventObjectTagging: {},
EventObjectTaggingPut: {},
EventObjectTaggingDelete: {},
} }
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
@ -22,13 +100,24 @@ func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Re
return return
} }
p := &layer.PutBucketNotificationConfigurationParams{ conf := &data.NotificationConfiguration{}
RequestInfo: reqInfo, if err = xml.NewDecoder(r.Body).Decode(conf); err != nil {
BktInfo: bktInfo, h.logAndSendError(w, "couldn't decode notification configuration", reqInfo, errors.GetAPIError(errors.ErrMalformedXML))
Reader: r.Body, return
} }
if err := h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { if _, err = h.checkBucketConfiguration(conf, reqInfo); err != nil {
h.logAndSendError(w, "couldn't check bucket configuration", reqInfo, err)
return
}
p := &layer.PutBucketNotificationConfigurationParams{
RequestInfo: reqInfo,
BktInfo: bktInfo,
Configuration: conf,
}
if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil {
h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err) h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err)
return return
} }
@ -54,3 +143,127 @@ func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Re
return return
} }
} }
func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationParams) error {
if !h.cfg.NotificatorEnabled {
h.log.Debug("could not send notification because notificator is disabled", zap.String("event", p.Event))
return nil
}
conf, err := h.obj.GetBucketNotificationConfiguration(ctx, p.BktInfo)
if err != nil {
return err
}
if conf.IsEmpty() {
return nil
}
box, err := layer.GetBoxData(ctx)
if err == nil {
p.User = box.Gate.BearerToken.OwnerID().String()
}
topics := filterSubjects(conf, p.Event, p.ObjInfo.Name)
return h.notificator.SendNotifications(topics, p)
}
// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids.
func (h *handler) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) {
if conf == nil {
return
}
if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil {
return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported)
}
for i, q := range conf.QueueConfigurations {
if err = checkEvents(q.Events); err != nil {
return
}
if err = checkRules(q.Filter.Key.FilterRules); err != nil {
return
}
if h.cfg.NotificatorEnabled {
if err = h.notificator.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host); err != nil {
return
}
} else {
h.log.Warn("failed to send test event because notifications is disabled")
}
if q.ID == "" {
completed = true
conf.QueueConfigurations[i].ID = uuid.NewString()
}
}
return
}
func checkRules(rules []data.FilterRule) error {
names := make(map[string]struct{})
for _, r := range rules {
if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName {
return errors.GetAPIError(errors.ErrFilterNameInvalid)
}
if _, ok := names[r.Name]; ok {
if r.Name == filterRuleSuffixName {
return errors.GetAPIError(errors.ErrFilterNameSuffix)
}
return errors.GetAPIError(errors.ErrFilterNamePrefix)
}
names[r.Name] = struct{}{}
}
return nil
}
func checkEvents(events []string) error {
for _, e := range events {
if _, ok := validEvents[e]; !ok {
return errors.GetAPIError(errors.ErrEventNotification)
}
}
return nil
}
func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string {
topics := make(map[string]string)
for _, t := range conf.QueueConfigurations {
event := false
for _, e := range t.Events {
// the second condition is comparison with the events ending with *:
// s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char
if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) {
event = true
break
}
}
if !event {
continue
}
filter := true
for _, f := range t.Filter.Key.FilterRules {
if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) ||
f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) {
filter = false
break
}
}
if filter {
topics[t.ID] = t.QueueArn
}
}
return topics
}

View file

@ -1,4 +1,4 @@
package layer package handler
import ( import (
"testing" "testing"

View file

@ -237,13 +237,13 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
s := &layer.SendNotificationParams{ s := &SendNotificationParams{
Event: layer.EventObjectCreatedPut, Event: EventObjectCreatedPut,
ObjInfo: info, ObjInfo: info,
BktInfo: bktInfo, BktInfo: bktInfo,
ReqInfo: reqInfo, ReqInfo: reqInfo,
} }
if err := h.obj.SendNotifications(r.Context(), s); err != nil { if err = h.sendNotifications(r.Context(), s); err != nil {
h.log.Error("couldn't send notification: %w", zap.Error(err)) h.log.Error("couldn't send notification: %w", zap.Error(err))
} }
@ -354,13 +354,13 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
return return
} }
s := &layer.SendNotificationParams{ s := &SendNotificationParams{
Event: layer.EventObjectCreatedPost, Event: EventObjectCreatedPost,
ObjInfo: info, ObjInfo: info,
BktInfo: bktInfo, BktInfo: bktInfo,
ReqInfo: reqInfo, ReqInfo: reqInfo,
} }
if err := h.obj.SendNotifications(r.Context(), s); err != nil { if err = h.sendNotifications(r.Context(), s); err != nil {
h.log.Error("couldn't send notification: %w", zap.Error(err)) h.log.Error("couldn't send notification: %w", zap.Error(err))
} }

View file

@ -28,11 +28,9 @@ import (
) )
type ( type (
Notificator interface { EventListener interface {
Subscribe(context.Context, string, MsgHandler) error Subscribe(context.Context, string, MsgHandler) error
Listen(context.Context) Listen(context.Context)
SendNotifications(topics map[string]string, p *SendNotificationParams) error
SendTestNotification(topic, bucketName, requestID, HostID string) error
} }
MsgHandler interface { MsgHandler interface {
@ -46,7 +44,7 @@ type (
log *zap.Logger log *zap.Logger
anonKey AnonymousKey anonKey AnonymousKey
resolver *resolver.BucketResolver resolver *resolver.BucketResolver
ncontroller Notificator ncontroller EventListener
listsCache *cache.ObjectsListCache listsCache *cache.ObjectsListCache
objCache *cache.ObjectsCache objCache *cache.ObjectsCache
namesCache *cache.ObjectsNameCache namesCache *cache.ObjectsNameCache
@ -192,7 +190,7 @@ type (
// Client provides S3 API client interface. // Client provides S3 API client interface.
Client interface { Client interface {
Initialize(ctx context.Context, c Notificator) error Initialize(ctx context.Context, c EventListener) error
EphemeralKey() *keys.PublicKey EphemeralKey() *keys.PublicKey
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
@ -241,8 +239,6 @@ type (
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
SendNotifications(ctx context.Context, p *SendNotificationParams) error
} }
) )
@ -290,7 +286,7 @@ func (n *layer) EphemeralKey() *keys.PublicKey {
return n.anonKey.Key.PublicKey() return n.anonKey.Key.PublicKey()
} }
func (n *layer) Initialize(ctx context.Context, c Notificator) error { func (n *layer) Initialize(ctx context.Context, c EventListener) error {
if n.IsNotificationEnabled() { if n.IsNotificationEnabled() {
return fmt.Errorf("already initialized") return fmt.Errorf("already initialized")
} }

View file

@ -4,130 +4,30 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/xml" "encoding/xml"
"io"
"strings"
"github.com/google/uuid"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"go.uber.org/zap" "go.uber.org/zap"
) )
type ( type PutBucketNotificationConfigurationParams struct {
PutBucketNotificationConfigurationParams struct { RequestInfo *api.ReqInfo
RequestInfo *api.ReqInfo BktInfo *data.BucketInfo
BktInfo *data.BucketInfo Configuration *data.NotificationConfiguration
Reader io.Reader
}
SendNotificationParams struct {
Event string
ObjInfo *data.ObjectInfo
BktInfo *data.BucketInfo
ReqInfo *api.ReqInfo
User string
}
)
const (
filterRuleSuffixName = "suffix"
filterRulePrefixName = "prefix"
EventObjectCreated = "s3:ObjectCreated:*"
EventObjectCreatedPut = "s3:ObjectCreated:Put"
EventObjectCreatedPost = "s3:ObjectCreated:Post"
EventObjectCreatedCopy = "s3:ObjectCreated:Copy"
EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject"
EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectRemoved = "s3:ObjectRemoved:*"
EventObjectRemovedDelete = "s3:ObjectRemoved:Delete"
EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated"
EventObjectRestore = "s3:ObjectRestore:*"
EventObjectRestorePost = "s3:ObjectRestore:Post"
EventObjectRestoreCompleted = "s3:ObjectRestore:Completed"
EventReplication = "s3:Replication:*"
EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication"
EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked"
EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold"
EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold"
EventObjectRestoreDelete = "s3:ObjectRestore:Delete"
EventLifecycleTransition = "s3:LifecycleTransition"
EventIntelligentTiering = "s3:IntelligentTiering"
EventObjectACLPut = "s3:ObjectAcl:Put"
EventLifecycleExpiration = "s3:LifecycleExpiration:*"
EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete"
EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated"
EventObjectTagging = "s3:ObjectTagging:*"
EventObjectTaggingPut = "s3:ObjectTagging:Put"
EventObjectTaggingDelete = "s3:ObjectTagging:Delete"
)
var validEvents = map[string]struct{}{
EventReducedRedundancyLostObject: {},
EventObjectCreated: {},
EventObjectCreatedPut: {},
EventObjectCreatedPost: {},
EventObjectCreatedCopy: {},
EventObjectCreatedCompleteMultipartUpload: {},
EventObjectRemoved: {},
EventObjectRemovedDelete: {},
EventObjectRemovedDeleteMarkerCreated: {},
EventObjectRestore: {},
EventObjectRestorePost: {},
EventObjectRestoreCompleted: {},
EventReplication: {},
EventReplicationOperationFailedReplication: {},
EventReplicationOperationNotTracked: {},
EventReplicationOperationMissedThreshold: {},
EventReplicationOperationReplicatedAfterThreshold: {},
EventObjectRestoreDelete: {},
EventLifecycleTransition: {},
EventIntelligentTiering: {},
EventObjectACLPut: {},
EventLifecycleExpiration: {},
EventLifecycleExpirationDelete: {},
EventLifecycleExpirationDeleteMarkerCreated: {},
EventObjectTagging: {},
EventObjectTaggingPut: {},
EventObjectTaggingDelete: {},
} }
func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error { func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error {
if !n.IsNotificationEnabled() { confXML, err := xml.Marshal(p.Configuration)
return errors.GetAPIError(errors.ErrNotificationNotEnabled) if err != nil {
}
var (
buf bytes.Buffer
tee = io.TeeReader(p.Reader, &buf)
conf = &data.NotificationConfiguration{}
completed bool
err error
)
if err = xml.NewDecoder(tee).Decode(conf); err != nil {
return errors.GetAPIError(errors.ErrMalformedXML)
}
if completed, err = n.checkBucketConfiguration(conf, p.RequestInfo); err != nil {
return err return err
} }
if completed {
confXML, err := xml.Marshal(conf)
if err != nil {
return err
}
buf.Reset()
buf.Write(confXML)
}
s := &PutSystemObjectParams{ s := &PutSystemObjectParams{
BktInfo: p.BktInfo, BktInfo: p.BktInfo,
ObjName: p.BktInfo.NotificationConfigurationObjectName(), ObjName: p.BktInfo.NotificationConfigurationObjectName(),
Metadata: map[string]string{}, Metadata: map[string]string{},
Prefix: "", Reader: bytes.NewReader(confXML),
Reader: &buf,
} }
obj, err := n.putSystemObjectIntoNeoFS(ctx, s) obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
@ -135,11 +35,11 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
return err return err
} }
if obj.Size == 0 && !conf.IsEmpty() { if obj.Size == 0 && !p.Configuration.IsEmpty() {
return errors.GetAPIError(errors.ErrInternalError) return errors.GetAPIError(errors.ErrInternalError)
} }
if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), conf); err != nil { if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err)) n.log.Error("couldn't cache system object", zap.Error(err))
} }
@ -147,9 +47,6 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
} }
func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) { func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) {
if !n.IsNotificationEnabled() {
return nil, errors.GetAPIError(errors.ErrNotificationNotEnabled)
}
conf, err := n.getNotificationConf(ctx, bktInfo, bktInfo.NotificationConfigurationObjectName()) conf, err := n.getNotificationConf(ctx, bktInfo, bktInfo.NotificationConfigurationObjectName())
if err != nil { if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) { if errors.IsS3Error(err, errors.ErrNoSuchKey) {
@ -187,122 +84,3 @@ func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, s
return conf, nil return conf, nil
} }
func (n *layer) SendNotifications(ctx context.Context, p *SendNotificationParams) error {
if !n.IsNotificationEnabled() {
return nil
}
conf, err := n.getNotificationConf(ctx, p.BktInfo, p.BktInfo.NotificationConfigurationObjectName())
if err != nil {
return err
}
if conf.IsEmpty() {
return nil
}
box, err := GetBoxData(ctx)
if err == nil {
p.User = box.Gate.BearerToken.OwnerID().String()
}
topics := filterSubjects(conf, p.Event, p.ObjInfo.Name)
return n.ncontroller.SendNotifications(topics, p)
}
// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids.
func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) {
if conf == nil {
return
}
if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil {
return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported)
}
for i, q := range conf.QueueConfigurations {
if err = checkEvents(q.Events); err != nil {
return
}
if err = checkRules(q.Filter.Key.FilterRules); err != nil {
return
}
if err = n.ncontroller.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host); err != nil {
return
}
if q.ID == "" {
completed = true
conf.QueueConfigurations[i].ID = uuid.NewString()
}
}
return
}
func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string {
topics := make(map[string]string)
for _, t := range conf.QueueConfigurations {
event := false
for _, e := range t.Events {
// the second condition is comparison with the events ending with *:
// s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char
if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) {
event = true
break
}
}
if !event {
continue
}
filter := true
for _, f := range t.Filter.Key.FilterRules {
if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) ||
f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) {
filter = false
break
}
}
if filter {
topics[t.ID] = t.QueueArn
}
}
return topics
}
func checkRules(rules []data.FilterRule) error {
names := make(map[string]struct{})
for _, r := range rules {
if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName {
return errors.GetAPIError(errors.ErrFilterNameInvalid)
}
if _, ok := names[r.Name]; ok {
if r.Name == filterRuleSuffixName {
return errors.GetAPIError(errors.ErrFilterNameSuffix)
}
return errors.GetAPIError(errors.ErrFilterNamePrefix)
}
names[r.Name] = struct{}{}
}
return nil
}
func checkEvents(events []string) error {
for _, e := range events {
if _, ok := validEvents[e]; !ok {
return errors.GetAPIError(errors.ErrEventNotification)
}
}
return nil
}

View file

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nspcc-dev/neofs-s3-gw/api/handler"
"github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -179,7 +180,7 @@ func (c *Controller) Listen(ctx context.Context) {
} }
} }
func (c *Controller) SendNotifications(topics map[string]string, p *layer.SendNotificationParams) error { func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error {
event := prepareEvent(p) event := prepareEvent(p)
for id, topic := range topics { for id, topic := range topics {
@ -214,7 +215,7 @@ func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID s
return c.publish(topic, msg) return c.publish(topic, msg)
} }
func prepareEvent(p *layer.SendNotificationParams) *Event { func prepareEvent(p *handler.SendNotificationParams) *Event {
return &Event{ return &Event{
Records: []EventRecord{ Records: []EventRecord{
{ {

View file

@ -170,7 +170,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
ctr = auth.New(neofs.NewAuthmateNeoFS(conns), key, getAccessBoxCacheConfig(v, l)) ctr = auth.New(neofs.NewAuthmateNeoFS(conns), key, getAccessBoxCacheConfig(v, l))
handlerOptions := getHandlerOptions(v, l) handlerOptions := getHandlerOptions(v, l)
if caller, err = handler.New(l, obj, handlerOptions); err != nil { if caller, err = handler.New(l, obj, nc, handlerOptions); err != nil {
l.Fatal("could not initialize API handler", zap.Error(err)) l.Fatal("could not initialize API handler", zap.Error(err))
} }
@ -386,6 +386,7 @@ func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config {
} }
cfg.DefaultMaxAge = defaultMaxAge cfg.DefaultMaxAge = defaultMaxAge
cfg.NotificatorEnabled = v.GetBool(cfgEnableNATS)
return &cfg return &cfg
} }