[#357] Add events and sending of events

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2022-03-31 10:13:21 +04:00 committed by Alex Vanin
parent 40e7dbf768
commit 371eb2feda
3 changed files with 184 additions and 0 deletions

View file

@ -32,6 +32,7 @@ type (
Notificator interface { Notificator interface {
Subscribe(context.Context, string, MsgHandler) error Subscribe(context.Context, string, MsgHandler) error
Listen(context.Context) Listen(context.Context)
SendNotifications(topics map[string]string, p *SendNotificationParams) error
SendTestNotification(topic, bucketName, requestID, HostID string) error SendTestNotification(topic, bucketName, requestID, HostID string) error
} }
@ -234,6 +235,8 @@ type (
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
SendNotifications(ctx context.Context, p *SendNotificationParams) error
} }
) )

View file

@ -20,6 +20,14 @@ type (
BktInfo *data.BucketInfo BktInfo *data.BucketInfo
Reader io.Reader Reader io.Reader
} }
SendNotificationParams struct {
Event string
ObjInfo *data.ObjectInfo
BktInfo *data.BucketInfo
ReqInfo *api.ReqInfo
User string
}
) )
const ( const (
@ -179,6 +187,29 @@ func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, s
return conf, nil return conf, nil
} }
func (n *layer) SendNotifications(ctx context.Context, p *SendNotificationParams) error {
if !n.IsNotificationEnabled() {
return nil
}
conf, err := n.getNotificationConf(ctx, p.BktInfo, p.BktInfo.NotificationConfigurationObjectName())
if err != nil {
return err
}
if conf.IsEmpty() {
return nil
}
box, err := GetBoxData(ctx)
if err == nil {
p.User = box.Gate.BearerToken.OwnerID().String()
}
topics := filterSubjects(conf, p.Event, p.ObjInfo.Name)
return n.ncontroller.SendNotifications(topics, p)
}
// checkBucketConfiguration checks notification configuration and generates ID for configurations with empty ids. // checkBucketConfiguration checks notification configuration and generates ID for configurations with empty ids.
func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) { func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r *api.ReqInfo) (completed bool, err error) {
if conf == nil { if conf == nil {
@ -211,6 +242,40 @@ func (n *layer) checkBucketConfiguration(conf *data.NotificationConfiguration, r
return return
} }
func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string {
topics := make(map[string]string)
for _, t := range conf.QueueConfigurations {
event := false
for _, e := range t.Events {
// the second condition is comparison with events ending with *:
// s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char
if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) {
event = true
break
}
}
if !event {
continue
}
filter := true
for _, f := range t.Filter.Key.FilterRules {
if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) ||
f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) {
filter = false
break
}
}
if filter {
topics[t.ID] = t.QueueArn
}
}
return topics
}
func checkRules(rules []data.FilterRule) error { func checkRules(rules []data.FilterRule) error {
names := make(map[string]struct{}) names := make(map[string]struct{})

View file

@ -8,12 +8,21 @@ import (
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap" "go.uber.org/zap"
) )
const ( const (
DefaultTimeout = 30 * time.Second DefaultTimeout = 30 * time.Second
// EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications.
EventVersion23 = "2.3"
// EventVersion22 is used for replication notifications.
EventVersion22 = "2.2"
// EventVersion21 is used for all other notification types.
EventVersion21 = "2.1"
) )
type ( type (
@ -46,6 +55,51 @@ type (
RequestID string RequestID string
HostID string HostID string
} }
Event struct {
Records []EventRecord `json:"Records"`
}
EventRecord struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"` // neofs:s3
AWSRegion string `json:"awsRegion,omitempty"` // empty
EventTime time.Time `json:"eventTime"`
EventName string `json:"eventName"`
UserIdentity UserIdentity `json:"userIdentity"`
RequestParameters RequestParameters `json:"requestParameters"`
ResponseElements map[string]string `json:"responseElements"`
S3 S3Entity `json:"s3"`
}
UserIdentity struct {
PrincipalID string `json:"principalId"`
}
RequestParameters struct {
SourceIPAddress string `json:"sourceIPAddress"`
}
S3Entity struct {
SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationID string `json:"configurationId,omitempty"`
Bucket Bucket `json:"bucket"`
Object Object `json:"object"`
}
Bucket struct {
Name string `json:"name"`
OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"`
Arn string `json:"arn,omitempty"`
}
Object struct {
Key string `json:"key"`
Size int64 `json:"size,omitempty"`
VersionID string `json:"versionId,omitempty"`
ETag string `json:"eTag,omitempty"`
Sequencer string `json:"sequencer,omitempty"`
}
) )
func NewController(p *Options, l *zap.Logger) (*Controller, error) { func NewController(p *Options, l *zap.Logger) (*Controller, error) {
@ -127,6 +181,23 @@ func (c *Controller) Listen(ctx context.Context) {
} }
} }
func (c *Controller) SendNotifications(topics map[string]string, p *layer.SendNotificationParams) error {
event := prepareEvent(p.Event, p.User, p.BktInfo, p.ObjInfo, p.ReqInfo)
for id, topic := range topics {
event.Records[0].S3.ConfigurationID = id
msg, err := json.Marshal(event)
if err != nil {
c.logger.Error("couldn't marshal an event", zap.String("subject", topic), zap.Error(err))
}
if err = c.publish(topic, msg); err != nil {
c.logger.Error("couldn't send an event to topic", zap.String("subject", topic), zap.Error(err))
}
}
return nil
}
func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string) error { func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string) error {
event := &TestEvent{ event := &TestEvent{
Service: "NeoFS S3", Service: "NeoFS S3",
@ -144,3 +215,48 @@ func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID s
return c.publish(topic, msg) return c.publish(topic, msg)
} }
func prepareEvent(eventName string, initiatorID string, bkt *data.BucketInfo, obj *data.ObjectInfo, reqInfo *api.ReqInfo) *Event {
return &Event{
Records: []EventRecord{
{
EventVersion: EventVersion21,
EventSource: "neofs:s3",
AWSRegion: "",
EventTime: time.Now(),
EventName: eventName,
UserIdentity: UserIdentity{
PrincipalID: initiatorID,
},
RequestParameters: RequestParameters{
SourceIPAddress: reqInfo.RemoteHost,
},
ResponseElements: nil,
S3: S3Entity{
SchemaVersion: "1.0",
// ConfigurationID is skipped and will be placed later
Bucket: Bucket{
Name: bkt.Name,
OwnerIdentity: UserIdentity{PrincipalID: bkt.Owner.String()},
Arn: bkt.Name,
},
Object: Object{
Key: obj.Name,
Size: obj.Size,
VersionID: obj.Version(),
ETag: obj.HashSum,
Sequencer: "",
},
},
},
},
}
}
func (c *Controller) publish(topic string, msg []byte) error {
if _, err := c.jsClient.Publish(topic, msg); err != nil {
return fmt.Errorf("couldn't send event: %w", err)
}
return nil
}