forked from TrueCloudLab/frostfs-s3-gw
[#401] Drop notifications
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
2b04fcb5ec
commit
9432782ce6
33 changed files with 66 additions and 1282 deletions
|
@ -24,6 +24,7 @@ This document outlines major changes between releases.
|
|||
|
||||
### Removed
|
||||
- Remove control api (#406)
|
||||
- Remove notifications (#401)
|
||||
|
||||
## [0.29.0] - Zemu - 2024-05-27
|
||||
|
||||
|
|
18
api/cache/cache_test.go
vendored
18
api/cache/cache_test.go
vendored
|
@ -182,24 +182,6 @@ func TestSettingsCacheType(t *testing.T) {
|
|||
assertInvalidCacheEntry(t, cache.GetSettings(key), observedLog)
|
||||
}
|
||||
|
||||
func TestNotificationConfigurationCacheType(t *testing.T) {
|
||||
logger, observedLog := getObservedLogger()
|
||||
cache := NewSystemCache(DefaultSystemConfig(logger))
|
||||
|
||||
key := "key"
|
||||
notificationConfig := &data.NotificationConfiguration{}
|
||||
|
||||
err := cache.PutNotificationConfiguration(key, notificationConfig)
|
||||
require.NoError(t, err)
|
||||
val := cache.GetNotificationConfiguration(key)
|
||||
require.Equal(t, notificationConfig, val)
|
||||
require.Equal(t, 0, observedLog.Len())
|
||||
|
||||
err = cache.cache.Set(key, "tmp")
|
||||
require.NoError(t, err)
|
||||
assertInvalidCacheEntry(t, cache.GetNotificationConfiguration(key), observedLog)
|
||||
}
|
||||
|
||||
func TestFrostFSIDSubjectCacheType(t *testing.T) {
|
||||
logger, observedLog := getObservedLogger()
|
||||
cache := NewFrostfsIDCache(DefaultFrostfsIDConfig(logger))
|
||||
|
|
20
api/cache/system.go
vendored
20
api/cache/system.go
vendored
|
@ -104,22 +104,6 @@ func (o *SystemCache) GetSettings(key string) *data.BucketSettings {
|
|||
return result
|
||||
}
|
||||
|
||||
func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration {
|
||||
entry, err := o.cache.Get(key)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result, ok := entry.(*data.NotificationConfiguration)
|
||||
if !ok {
|
||||
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// GetTagging returns tags of a bucket or an object.
|
||||
func (o *SystemCache) GetTagging(key string) map[string]string {
|
||||
entry, err := o.cache.Get(key)
|
||||
|
@ -153,10 +137,6 @@ func (o *SystemCache) PutSettings(key string, settings *data.BucketSettings) err
|
|||
return o.cache.Set(key, settings)
|
||||
}
|
||||
|
||||
func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error {
|
||||
return o.cache.Set(key, obj)
|
||||
}
|
||||
|
||||
// PutTagging puts tags of a bucket or an object.
|
||||
func (o *SystemCache) PutTagging(key string, tagSet map[string]string) error {
|
||||
return o.cache.Set(key, tagSet)
|
||||
|
|
|
@ -12,9 +12,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
bktSettingsObject = ".s3-settings"
|
||||
bktCORSConfigurationObject = ".s3-cors"
|
||||
bktNotificationConfigurationObject = ".s3-notifications"
|
||||
bktSettingsObject = ".s3-settings"
|
||||
bktCORSConfigurationObject = ".s3-cors"
|
||||
|
||||
VersioningUnversioned = "Unversioned"
|
||||
VersioningEnabled = "Enabled"
|
||||
|
@ -52,14 +51,6 @@ type (
|
|||
Headers map[string]string
|
||||
}
|
||||
|
||||
// NotificationInfo store info to send s3 notification.
|
||||
NotificationInfo struct {
|
||||
Name string
|
||||
Version string
|
||||
Size uint64
|
||||
HashSum string
|
||||
}
|
||||
|
||||
// BucketSettings stores settings such as versioning.
|
||||
BucketSettings struct {
|
||||
Versioning string
|
||||
|
@ -93,26 +84,12 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
// NotificationInfoFromObject creates new NotificationInfo from ObjectInfo.
|
||||
func NotificationInfoFromObject(objInfo *ObjectInfo, md5Enabled bool) *NotificationInfo {
|
||||
return &NotificationInfo{
|
||||
Name: objInfo.Name,
|
||||
Version: objInfo.VersionID(),
|
||||
Size: objInfo.Size,
|
||||
HashSum: Quote(objInfo.ETag(md5Enabled)),
|
||||
}
|
||||
}
|
||||
|
||||
// SettingsObjectName is a system name for a bucket settings file.
|
||||
func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject }
|
||||
|
||||
// CORSObjectName returns a system name for a bucket CORS configuration file.
|
||||
func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject }
|
||||
|
||||
func (b *BucketInfo) NotificationConfigurationObjectName() string {
|
||||
return bktNotificationConfigurationObject
|
||||
}
|
||||
|
||||
// VersionID returns object version from ObjectInfo.
|
||||
func (o *ObjectInfo) VersionID() string { return o.ID.EncodeToString() }
|
||||
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
package data
|
||||
|
||||
import "encoding/xml"
|
||||
|
||||
type (
|
||||
NotificationConfiguration struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NotificationConfiguration" json:"-"`
|
||||
QueueConfigurations []QueueConfiguration `xml:"QueueConfiguration" json:"QueueConfigurations"`
|
||||
// Not supported topics
|
||||
TopicConfigurations []TopicConfiguration `xml:"TopicConfiguration" json:"TopicConfigurations"`
|
||||
LambdaFunctionConfigurations []LambdaFunctionConfiguration `xml:"CloudFunctionConfiguration" json:"CloudFunctionConfigurations"`
|
||||
}
|
||||
|
||||
QueueConfiguration struct {
|
||||
ID string `xml:"Id" json:"Id"`
|
||||
QueueArn string `xml:"Queue" json:"Queue"`
|
||||
Events []string `xml:"Event" json:"Events"`
|
||||
Filter Filter `xml:"Filter" json:"Filter"`
|
||||
}
|
||||
|
||||
Filter struct {
|
||||
Key Key `xml:"S3Key" json:"S3Key"`
|
||||
}
|
||||
|
||||
Key struct {
|
||||
FilterRules []FilterRule `xml:"FilterRule" json:"FilterRules"`
|
||||
}
|
||||
|
||||
FilterRule struct {
|
||||
Name string `xml:"Name" json:"Name"`
|
||||
Value string `xml:"Value" json:"Value"`
|
||||
}
|
||||
|
||||
// TopicConfiguration and LambdaFunctionConfiguration -- we don't support these configurations,
|
||||
// but we need them to detect in notification configurations in requests.
|
||||
TopicConfiguration struct{}
|
||||
LambdaFunctionConfiguration struct{}
|
||||
)
|
||||
|
||||
func (n NotificationConfiguration) IsEmpty() bool {
|
||||
return len(n.QueueConfigurations) == 0 && len(n.TopicConfigurations) == 0 && len(n.LambdaFunctionConfigurations) == 0
|
||||
}
|
|
@ -616,22 +616,11 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
updated, err := h.updateBucketACL(r, astObject, bktInfo, token)
|
||||
if err != nil {
|
||||
if _, err = h.updateBucketACL(r, astObject, bktInfo, token); err != nil {
|
||||
h.logAndSendError(w, "could not update bucket acl", reqInfo, err)
|
||||
return
|
||||
}
|
||||
if updated {
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectACLPut,
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
|
@ -20,17 +19,11 @@ import (
|
|||
|
||||
type (
|
||||
handler struct {
|
||||
log *zap.Logger
|
||||
obj layer.Client
|
||||
notificator Notificator
|
||||
cfg Config
|
||||
ape APE
|
||||
frostfsid FrostFSID
|
||||
}
|
||||
|
||||
Notificator interface {
|
||||
SendNotifications(topics map[string]string, p *SendNotificationParams) error
|
||||
SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error
|
||||
log *zap.Logger
|
||||
obj layer.Client
|
||||
cfg Config
|
||||
ape APE
|
||||
frostfsid FrostFSID
|
||||
}
|
||||
|
||||
// Config contains data which handler needs to keep.
|
||||
|
@ -41,7 +34,6 @@ type (
|
|||
DefaultCopiesNumbers(namespace string) []uint32
|
||||
NewXMLDecoder(io.Reader) *xml.Decoder
|
||||
DefaultMaxAge() int
|
||||
NotificatorEnabled() bool
|
||||
ResolveZoneList() []string
|
||||
IsResolveListAllow() bool
|
||||
BypassContentEncodingInChunks() bool
|
||||
|
@ -76,7 +68,7 @@ const (
|
|||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
// New creates new api.Handler using given logger and client.
|
||||
func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
|
||||
func New(log *zap.Logger, obj layer.Client, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
|
||||
switch {
|
||||
case obj == nil:
|
||||
return nil, errors.New("empty FrostFS Object Layer")
|
||||
|
@ -88,19 +80,12 @@ func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config,
|
|||
return nil, errors.New("empty frostfsid")
|
||||
}
|
||||
|
||||
if !cfg.NotificatorEnabled() {
|
||||
log.Warn(logs.NotificatorIsDisabledS3WontProduceNotificationEvents)
|
||||
} else if notificator == nil {
|
||||
return nil, errors.New("empty notificator")
|
||||
}
|
||||
|
||||
return &handler{
|
||||
log: log,
|
||||
obj: obj,
|
||||
cfg: cfg,
|
||||
ape: storage,
|
||||
notificator: notificator,
|
||||
frostfsid: ffsid,
|
||||
log: log,
|
||||
obj: obj,
|
||||
cfg: cfg,
|
||||
ape: storage,
|
||||
frostfsid: ffsid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -268,7 +268,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
TagSet: tagSet,
|
||||
NodeVersion: extendedDstObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -276,16 +276,6 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
h.reqLogger(ctx).Info(logs.ObjectIsCopied, zap.Stringer("object_id", dstObjInfo.ID))
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedCopy,
|
||||
NotificationInfo: data.NotificationInfoFromObject(dstObjInfo, h.cfg.MD5Enabled()),
|
||||
BktInfo: dstBktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
if dstEncryptionParams.Enabled() {
|
||||
addSSECHeaders(w.Header(), r.Header)
|
||||
}
|
||||
|
|
|
@ -8,16 +8,12 @@ import (
|
|||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// limitation of AWS https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
|
@ -101,41 +97,6 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
var m *SendNotificationParams
|
||||
|
||||
if bktSettings.VersioningEnabled() && len(versionID) == 0 {
|
||||
m = &SendNotificationParams{
|
||||
Event: EventObjectRemovedDeleteMarkerCreated,
|
||||
NotificationInfo: &data.NotificationInfo{
|
||||
Name: reqInfo.ObjectName,
|
||||
HashSum: deletedObject.DeleteMarkerEtag,
|
||||
},
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
} else {
|
||||
var objID oid.ID
|
||||
if len(versionID) != 0 {
|
||||
if err = objID.DecodeString(versionID); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
m = &SendNotificationParams{
|
||||
Event: EventObjectRemovedDelete,
|
||||
NotificationInfo: &data.NotificationInfo{
|
||||
Name: reqInfo.ObjectName,
|
||||
Version: objID.EncodeToString(),
|
||||
},
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
}
|
||||
|
||||
if err = h.sendNotifications(ctx, m); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
if deletedObject.VersionID != "" {
|
||||
w.Header().Set(api.AmzVersionID, deletedObject.VersionID)
|
||||
}
|
||||
|
|
|
@ -104,10 +104,6 @@ func (c *configMock) DefaultMaxAge() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (c *configMock) NotificatorEnabled() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) ResolveZoneList() []string {
|
||||
return []string{}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -456,7 +455,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
|
||||
// Start complete multipart upload which may take some time to fetch object
|
||||
// and re-upload it part by part.
|
||||
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo)
|
||||
objInfo, err := h.completeMultipartUpload(r, c, bktInfo)
|
||||
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...)
|
||||
|
@ -478,7 +477,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
}
|
||||
}
|
||||
|
||||
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *middleware.ReqInfo) (*data.ObjectInfo, error) {
|
||||
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
|
||||
ctx := r.Context()
|
||||
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
|
||||
if err != nil {
|
||||
|
@ -496,7 +495,7 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
|
|||
TagSet: uploadData.TagSet,
|
||||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -528,16 +527,6 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
|
|||
}
|
||||
}
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedCompleteMultipartUpload,
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,274 +0,0 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type (
|
||||
SendNotificationParams struct {
|
||||
Event string
|
||||
NotificationInfo *data.NotificationInfo
|
||||
BktInfo *data.BucketInfo
|
||||
ReqInfo *middleware.ReqInfo
|
||||
User string
|
||||
Time time.Time
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
filterRuleSuffixName = "suffix"
|
||||
filterRulePrefixName = "prefix"
|
||||
|
||||
EventObjectCreated = "s3:ObjectCreated:*"
|
||||
EventObjectCreatedPut = "s3:ObjectCreated:Put"
|
||||
EventObjectCreatedPost = "s3:ObjectCreated:Post"
|
||||
EventObjectCreatedCopy = "s3:ObjectCreated:Copy"
|
||||
EventReducedRedundancyLostObject = "s3:ReducedRedundancyLostObject"
|
||||
EventObjectCreatedCompleteMultipartUpload = "s3:ObjectCreated:CompleteMultipartUpload"
|
||||
EventObjectRemoved = "s3:ObjectRemoved:*"
|
||||
EventObjectRemovedDelete = "s3:ObjectRemoved:Delete"
|
||||
EventObjectRemovedDeleteMarkerCreated = "s3:ObjectRemoved:DeleteMarkerCreated"
|
||||
EventObjectRestore = "s3:ObjectRestore:*"
|
||||
EventObjectRestorePost = "s3:ObjectRestore:Post"
|
||||
EventObjectRestoreCompleted = "s3:ObjectRestore:Completed"
|
||||
EventReplication = "s3:Replication:*"
|
||||
EventReplicationOperationFailedReplication = "s3:Replication:OperationFailedReplication"
|
||||
EventReplicationOperationNotTracked = "s3:Replication:OperationNotTracked"
|
||||
EventReplicationOperationMissedThreshold = "s3:Replication:OperationMissedThreshold"
|
||||
EventReplicationOperationReplicatedAfterThreshold = "s3:Replication:OperationReplicatedAfterThreshold"
|
||||
EventObjectRestoreDelete = "s3:ObjectRestore:Delete"
|
||||
EventLifecycleTransition = "s3:LifecycleTransition"
|
||||
EventIntelligentTiering = "s3:IntelligentTiering"
|
||||
EventObjectACLPut = "s3:ObjectAcl:Put"
|
||||
EventLifecycleExpiration = "s3:LifecycleExpiration:*"
|
||||
EventLifecycleExpirationDelete = "s3:LifecycleExpiration:Delete"
|
||||
EventLifecycleExpirationDeleteMarkerCreated = "s3:LifecycleExpiration:DeleteMarkerCreated"
|
||||
EventObjectTagging = "s3:ObjectTagging:*"
|
||||
EventObjectTaggingPut = "s3:ObjectTagging:Put"
|
||||
EventObjectTaggingDelete = "s3:ObjectTagging:Delete"
|
||||
)
|
||||
|
||||
var validEvents = map[string]struct{}{
|
||||
EventReducedRedundancyLostObject: {},
|
||||
EventObjectCreated: {},
|
||||
EventObjectCreatedPut: {},
|
||||
EventObjectCreatedPost: {},
|
||||
EventObjectCreatedCopy: {},
|
||||
EventObjectCreatedCompleteMultipartUpload: {},
|
||||
EventObjectRemoved: {},
|
||||
EventObjectRemovedDelete: {},
|
||||
EventObjectRemovedDeleteMarkerCreated: {},
|
||||
EventObjectRestore: {},
|
||||
EventObjectRestorePost: {},
|
||||
EventObjectRestoreCompleted: {},
|
||||
EventReplication: {},
|
||||
EventReplicationOperationFailedReplication: {},
|
||||
EventReplicationOperationNotTracked: {},
|
||||
EventReplicationOperationMissedThreshold: {},
|
||||
EventReplicationOperationReplicatedAfterThreshold: {},
|
||||
EventObjectRestoreDelete: {},
|
||||
EventLifecycleTransition: {},
|
||||
EventIntelligentTiering: {},
|
||||
EventObjectACLPut: {},
|
||||
EventLifecycleExpiration: {},
|
||||
EventLifecycleExpirationDelete: {},
|
||||
EventLifecycleExpirationDeleteMarkerCreated: {},
|
||||
EventObjectTagging: {},
|
||||
EventObjectTaggingPut: {},
|
||||
EventObjectTaggingDelete: {},
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := middleware.GetReqInfo(r.Context())
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
conf := &data.NotificationConfiguration{}
|
||||
if err = h.cfg.NewXMLDecoder(r.Body).Decode(conf); err != nil {
|
||||
h.logAndSendError(w, "couldn't decode notification configuration", reqInfo, errors.GetAPIError(errors.ErrMalformedXML))
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = h.checkBucketConfiguration(r.Context(), conf, reqInfo); err != nil {
|
||||
h.logAndSendError(w, "couldn't check bucket configuration", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.PutBucketNotificationConfigurationParams{
|
||||
RequestInfo: reqInfo,
|
||||
BktInfo: bktInfo,
|
||||
Configuration: conf,
|
||||
}
|
||||
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "couldn't put bucket configuration", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := middleware.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
conf, err := h.obj.GetBucketNotificationConfiguration(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket notification configuration", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = middleware.EncodeToResponse(w, conf); err != nil {
|
||||
h.logAndSendError(w, "could not encode bucket notification configuration to response", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationParams) error {
|
||||
if !h.cfg.NotificatorEnabled() {
|
||||
return nil
|
||||
}
|
||||
|
||||
conf, err := h.obj.GetBucketNotificationConfiguration(ctx, p.BktInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get notification configuration: %w", err)
|
||||
}
|
||||
if conf.IsEmpty() {
|
||||
return nil
|
||||
}
|
||||
|
||||
box, err := middleware.GetBoxData(ctx)
|
||||
if err == nil && box.Gate.BearerToken != nil {
|
||||
p.User = bearer.ResolveIssuer(*box.Gate.BearerToken).EncodeToString()
|
||||
}
|
||||
|
||||
p.Time = layer.TimeNow(ctx)
|
||||
|
||||
topics := filterSubjects(conf, p.Event, p.NotificationInfo.Name)
|
||||
|
||||
return h.notificator.SendNotifications(topics, p)
|
||||
}
|
||||
|
||||
// checkBucketConfiguration checks notification configuration and generates an ID for configurations with empty ids.
|
||||
func (h *handler) checkBucketConfiguration(ctx context.Context, conf *data.NotificationConfiguration, r *middleware.ReqInfo) (completed bool, err error) {
|
||||
if conf == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if conf.TopicConfigurations != nil || conf.LambdaFunctionConfigurations != nil {
|
||||
return completed, errors.GetAPIError(errors.ErrNotificationTopicNotSupported)
|
||||
}
|
||||
|
||||
for i, q := range conf.QueueConfigurations {
|
||||
if err = checkEvents(q.Events); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = checkRules(q.Filter.Key.FilterRules); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if h.cfg.NotificatorEnabled() {
|
||||
if err = h.notificator.SendTestNotification(q.QueueArn, r.BucketName, r.RequestID, r.Host, layer.TimeNow(ctx)); err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
h.reqLogger(ctx).Warn(logs.FailedToSendTestEventBecauseNotificationsIsDisabled)
|
||||
}
|
||||
|
||||
if q.ID == "" {
|
||||
completed = true
|
||||
conf.QueueConfigurations[i].ID = uuid.NewString()
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func checkRules(rules []data.FilterRule) error {
|
||||
names := make(map[string]struct{})
|
||||
|
||||
for _, r := range rules {
|
||||
if r.Name != filterRuleSuffixName && r.Name != filterRulePrefixName {
|
||||
return errors.GetAPIError(errors.ErrFilterNameInvalid)
|
||||
}
|
||||
if _, ok := names[r.Name]; ok {
|
||||
if r.Name == filterRuleSuffixName {
|
||||
return errors.GetAPIError(errors.ErrFilterNameSuffix)
|
||||
}
|
||||
return errors.GetAPIError(errors.ErrFilterNamePrefix)
|
||||
}
|
||||
|
||||
names[r.Name] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkEvents(events []string) error {
|
||||
for _, e := range events {
|
||||
if _, ok := validEvents[e]; !ok {
|
||||
return errors.GetAPIError(errors.ErrEventNotification)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func filterSubjects(conf *data.NotificationConfiguration, eventType, objName string) map[string]string {
|
||||
topics := make(map[string]string)
|
||||
|
||||
for _, t := range conf.QueueConfigurations {
|
||||
event := false
|
||||
for _, e := range t.Events {
|
||||
// the second condition is comparison with the events ending with *:
|
||||
// s3:ObjectCreated:*, s3:ObjectRemoved:* etc without the last char
|
||||
if eventType == e || strings.HasPrefix(eventType, e[:len(e)-1]) {
|
||||
event = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !event {
|
||||
continue
|
||||
}
|
||||
|
||||
filter := true
|
||||
for _, f := range t.Filter.Key.FilterRules {
|
||||
if f.Name == filterRulePrefixName && !strings.HasPrefix(objName, f.Value) ||
|
||||
f.Name == filterRuleSuffixName && !strings.HasSuffix(objName, f.Value) {
|
||||
filter = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if filter {
|
||||
topics[t.ID] = t.QueueArn
|
||||
}
|
||||
}
|
||||
|
||||
return topics
|
||||
}
|
|
@ -1,115 +0,0 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFilterSubjects(t *testing.T) {
|
||||
config := &data.NotificationConfiguration{
|
||||
QueueConfigurations: []data.QueueConfiguration{
|
||||
{
|
||||
ID: "test1",
|
||||
QueueArn: "test1",
|
||||
Events: []string{EventObjectCreated, EventObjectRemovedDelete},
|
||||
},
|
||||
{
|
||||
ID: "test2",
|
||||
QueueArn: "test2",
|
||||
Events: []string{EventObjectTagging},
|
||||
Filter: data.Filter{Key: data.Key{FilterRules: []data.FilterRule{
|
||||
{Name: "prefix", Value: "dir/"},
|
||||
{Name: "suffix", Value: ".png"},
|
||||
}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("no topics because suitable events not found", func(t *testing.T) {
|
||||
topics := filterSubjects(config, EventObjectACLPut, "dir/a.png")
|
||||
require.Empty(t, topics)
|
||||
})
|
||||
|
||||
t.Run("no topics because of not suitable prefix", func(t *testing.T) {
|
||||
topics := filterSubjects(config, EventObjectTaggingPut, "dirw/cat.png")
|
||||
require.Empty(t, topics)
|
||||
})
|
||||
|
||||
t.Run("no topics because of not suitable suffix", func(t *testing.T) {
|
||||
topics := filterSubjects(config, EventObjectTaggingPut, "a.jpg")
|
||||
require.Empty(t, topics)
|
||||
})
|
||||
|
||||
t.Run("filter topics from queue configs without prefix suffix filter and exact event", func(t *testing.T) {
|
||||
topics := filterSubjects(config, EventObjectCreatedPut, "dir/a.png")
|
||||
require.Contains(t, topics, "test1")
|
||||
require.Len(t, topics, 1)
|
||||
require.Equal(t, topics["test1"], "test1")
|
||||
})
|
||||
|
||||
t.Run("filter topics from queue configs with prefix suffix filter and '*' ending event", func(t *testing.T) {
|
||||
topics := filterSubjects(config, EventObjectTaggingPut, "dir/a.png")
|
||||
require.Contains(t, topics, "test2")
|
||||
require.Len(t, topics, 1)
|
||||
require.Equal(t, topics["test2"], "test2")
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckRules(t *testing.T) {
|
||||
t.Run("correct rules with prefix and suffix", func(t *testing.T) {
|
||||
rules := []data.FilterRule{
|
||||
{Name: "prefix", Value: "asd"},
|
||||
{Name: "suffix", Value: "asd"},
|
||||
}
|
||||
err := checkRules(rules)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct rules with prefix", func(t *testing.T) {
|
||||
rules := []data.FilterRule{
|
||||
{Name: "prefix", Value: "asd"},
|
||||
}
|
||||
err := checkRules(rules)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct rules with suffix", func(t *testing.T) {
|
||||
rules := []data.FilterRule{
|
||||
{Name: "suffix", Value: "asd"},
|
||||
}
|
||||
err := checkRules(rules)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("incorrect rules with wrong name", func(t *testing.T) {
|
||||
rules := []data.FilterRule{
|
||||
{Name: "prefix", Value: "sdf"},
|
||||
{Name: "sfx", Value: "asd"},
|
||||
}
|
||||
err := checkRules(rules)
|
||||
require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameInvalid))
|
||||
})
|
||||
|
||||
t.Run("incorrect rules with repeating suffix", func(t *testing.T) {
|
||||
rules := []data.FilterRule{
|
||||
{Name: "suffix", Value: "asd"},
|
||||
{Name: "suffix", Value: "asdf"},
|
||||
{Name: "prefix", Value: "jk"},
|
||||
}
|
||||
err := checkRules(rules)
|
||||
require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNameSuffix))
|
||||
})
|
||||
|
||||
t.Run("incorrect rules with repeating prefix", func(t *testing.T) {
|
||||
rules := []data.FilterRule{
|
||||
{Name: "suffix", Value: "ds"},
|
||||
{Name: "prefix", Value: "asd"},
|
||||
{Name: "prefix", Value: "asdf"},
|
||||
}
|
||||
err := checkRules(rules)
|
||||
require.ErrorIs(t, err, errors.GetAPIError(errors.ErrFilterNamePrefix))
|
||||
})
|
||||
}
|
|
@ -292,16 +292,6 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
objInfo := extendedObjInfo.ObjectInfo
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedPut,
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
if needUpdateEACLTable {
|
||||
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil {
|
||||
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
|
||||
|
@ -319,7 +309,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
TagSet: tagSet,
|
||||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -560,16 +550,6 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
objInfo := extendedObjInfo.ObjectInfo
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedPost,
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()),
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
if acl := auth.MultipartFormValue(r, "acl"); acl != "" {
|
||||
r.Header.Set(api.AmzACL, acl)
|
||||
r.Header.Set(api.AmzGrantFullControl, "")
|
||||
|
@ -592,7 +572,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
|
||||
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -10,8 +10,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -46,27 +44,12 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
},
|
||||
TagSet: tagSet,
|
||||
}
|
||||
nodeVersion, err := h.obj.PutObjectTagging(ctx, tagPrm)
|
||||
if err != nil {
|
||||
|
||||
if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectTaggingPut,
|
||||
NotificationInfo: &data.NotificationInfo{
|
||||
Name: nodeVersion.FilePath,
|
||||
Size: nodeVersion.Size,
|
||||
Version: nodeVersion.OID.EncodeToString(),
|
||||
HashSum: nodeVersion.ETag,
|
||||
},
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
|
@ -123,27 +106,11 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
|
|||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
}
|
||||
|
||||
nodeVersion, err := h.obj.DeleteObjectTagging(ctx, p)
|
||||
if err != nil {
|
||||
if err = h.obj.DeleteObjectTagging(ctx, p); err != nil {
|
||||
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectTaggingDelete,
|
||||
NotificationInfo: &data.NotificationInfo{
|
||||
Name: nodeVersion.FilePath,
|
||||
Size: nodeVersion.Size,
|
||||
Version: nodeVersion.OID.EncodeToString(),
|
||||
HashSum: nodeVersion.ETag,
|
||||
},
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
|
|
|
@ -58,3 +58,11 @@ func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Reque
|
|||
func (h *handler) PutBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", middleware.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
|
|
@ -257,24 +257,3 @@ func (c *Cache) PutCORS(owner user.ID, bkt *data.BucketInfo, cors *data.CORSConf
|
|||
func (c *Cache) DeleteCORS(bktInfo *data.BucketInfo) {
|
||||
c.systemCache.Delete(bktInfo.Name + bktInfo.CORSObjectName())
|
||||
}
|
||||
|
||||
func (c *Cache) GetNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo) *data.NotificationConfiguration {
|
||||
key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName()
|
||||
|
||||
if !c.accessCache.Get(owner, key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.systemCache.GetNotificationConfiguration(key)
|
||||
}
|
||||
|
||||
func (c *Cache) PutNotificationConfiguration(owner user.ID, bktInfo *data.BucketInfo, configuration *data.NotificationConfiguration) {
|
||||
key := bktInfo.Name + bktInfo.NotificationConfigurationObjectName()
|
||||
if err := c.systemCache.PutNotificationConfiguration(key, configuration); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheNotificationConfiguration, zap.String("bucket", bktInfo.Name), zap.Error(err))
|
||||
}
|
||||
|
||||
if err := c.accessCache.Put(owner, key); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,23 +27,11 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
EventListener interface {
|
||||
Subscribe(context.Context, string, MsgHandler) error
|
||||
Listen(context.Context)
|
||||
}
|
||||
|
||||
MsgHandler interface {
|
||||
HandleMessage(context.Context, *nats.Msg) error
|
||||
}
|
||||
|
||||
MsgHandlerFunc func(context.Context, *nats.Msg) error
|
||||
|
||||
BucketResolver interface {
|
||||
Resolve(ctx context.Context, name string) (cid.ID, error)
|
||||
}
|
||||
|
@ -61,7 +49,6 @@ type (
|
|||
log *zap.Logger
|
||||
anonKey AnonymousKey
|
||||
resolver BucketResolver
|
||||
ncontroller EventListener
|
||||
cache *Cache
|
||||
treeService TreeService
|
||||
features FeatureSettings
|
||||
|
@ -215,7 +202,6 @@ type (
|
|||
|
||||
// Client provides S3 API client interface.
|
||||
Client interface {
|
||||
Initialize(ctx context.Context, c EventListener) error
|
||||
EphemeralKey() *keys.PublicKey
|
||||
|
||||
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
|
||||
|
@ -245,8 +231,8 @@ type (
|
|||
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||
|
||||
GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error)
|
||||
PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (*data.NodeVersion, error)
|
||||
DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error)
|
||||
PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) error
|
||||
DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error
|
||||
|
||||
PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
|
@ -266,9 +252,6 @@ type (
|
|||
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
|
||||
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
|
||||
|
||||
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
|
||||
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
|
||||
|
||||
// Compound methods for optimizations
|
||||
|
||||
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
|
||||
|
@ -300,10 +283,6 @@ func (t *VersionedObject) String() string {
|
|||
return t.Name + ":" + t.VersionID
|
||||
}
|
||||
|
||||
func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error {
|
||||
return f(ctx, msg)
|
||||
}
|
||||
|
||||
func (p HeadObjectParams) Versioned() bool {
|
||||
return len(p.VersionID) > 0
|
||||
}
|
||||
|
@ -327,23 +306,6 @@ func (n *layer) EphemeralKey() *keys.PublicKey {
|
|||
return n.anonKey.Key.PublicKey()
|
||||
}
|
||||
|
||||
func (n *layer) Initialize(ctx context.Context, c EventListener) error {
|
||||
if n.IsNotificationEnabled() {
|
||||
return fmt.Errorf("already initialized")
|
||||
}
|
||||
|
||||
// todo add notification handlers (e.g. for lifecycles)
|
||||
|
||||
c.Listen(ctx)
|
||||
|
||||
n.ncontroller = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) IsNotificationEnabled() bool {
|
||||
return n.ncontroller != nil
|
||||
}
|
||||
|
||||
// IsAuthenticatedRequest checks if access box exists in the current request.
|
||||
func IsAuthenticatedRequest(ctx context.Context) bool {
|
||||
_, err := middleware.GetBoxData(ctx)
|
||||
|
|
|
@ -1,89 +0,0 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/xml"
|
||||
errorsStd "errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type PutBucketNotificationConfigurationParams struct {
|
||||
RequestInfo *middleware.ReqInfo
|
||||
BktInfo *data.BucketInfo
|
||||
Configuration *data.NotificationConfiguration
|
||||
CopiesNumbers []uint32
|
||||
}
|
||||
|
||||
func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error {
|
||||
confXML, err := xml.Marshal(p.Configuration)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal notify configuration: %w", err)
|
||||
}
|
||||
|
||||
prm := PrmObjectCreate{
|
||||
Container: p.BktInfo.CID,
|
||||
Payload: bytes.NewReader(confXML),
|
||||
Filepath: p.BktInfo.NotificationConfigurationObjectName(),
|
||||
CreationTime: TimeNow(ctx),
|
||||
CopiesNumber: p.CopiesNumbers,
|
||||
}
|
||||
|
||||
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objIDToDelete, err := n.treeService.PutNotificationConfigurationNode(ctx, p.BktInfo, objID)
|
||||
objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
||||
if err != nil && !objIDToDeleteNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if !objIDToDeleteNotFound {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteNotificationConfigurationObject, zap.Error(err),
|
||||
zap.String("cid", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("oid", objIDToDelete.EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
||||
n.cache.PutNotificationConfiguration(n.BearerOwner(ctx), p.BktInfo, p.Configuration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if conf := n.cache.GetNotificationConfiguration(owner, bktInfo); conf != nil {
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
objID, err := n.treeService.GetNotificationConfigurationNode(ctx, bktInfo)
|
||||
objIDNotFound := errorsStd.Is(err, ErrNodeNotFound)
|
||||
if err != nil && !objIDNotFound {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conf := &data.NotificationConfiguration{}
|
||||
|
||||
if !objIDNotFound {
|
||||
obj, err := n.objectGet(ctx, bktInfo, objID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = xml.Unmarshal(obj.Payload(), &conf); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal notify configuration: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
n.cache.PutNotificationConfiguration(owner, bktInfo, conf)
|
||||
|
||||
return conf, nil
|
||||
}
|
|
@ -50,12 +50,12 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingPa
|
|||
return p.ObjectVersion.VersionID, tags, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (nodeVersion *data.NodeVersion, err error) {
|
||||
nodeVersion = p.NodeVersion
|
||||
func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (err error) {
|
||||
nodeVersion := p.NodeVersion
|
||||
if nodeVersion == nil {
|
||||
nodeVersion, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjectVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString()
|
||||
|
@ -63,35 +63,35 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingPa
|
|||
err = n.treeService.PutObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion, p.TagSet)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNodeNotFound) {
|
||||
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
|
||||
return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
|
||||
}
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
n.cache.PutTagging(n.BearerOwner(ctx), objectTaggingCacheKey(p.ObjectVersion), p.TagSet)
|
||||
|
||||
return nodeVersion, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) (*data.NodeVersion, error) {
|
||||
func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error {
|
||||
version, err := n.getNodeVersion(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = n.treeService.DeleteObjectTagging(ctx, p.BktInfo, version)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNodeNotFound) {
|
||||
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
|
||||
return fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
|
||||
}
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
p.VersionID = version.OID.EncodeToString()
|
||||
|
||||
n.cache.DeleteTagging(objectTaggingCacheKey(p))
|
||||
|
||||
return version, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
||||
|
|
|
@ -110,14 +110,6 @@ func (t *TreeServiceMock) GetSettingsNode(_ context.Context, bktInfo *data.Bucke
|
|||
return settings, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetNotificationConfigurationNode(context.Context, *data.BucketInfo) (oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) PutNotificationConfigurationNode(context.Context, *data.BucketInfo, oid.ID) (oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
||||
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
|
||||
if !ok {
|
||||
|
|
|
@ -18,17 +18,6 @@ type TreeService interface {
|
|||
// If tree node is not found returns ErrNodeNotFound error.
|
||||
GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
|
||||
|
||||
// GetNotificationConfigurationNode gets an object id that corresponds to object with bucket CORS.
|
||||
//
|
||||
// If tree node is not found returns ErrNodeNotFound error.
|
||||
GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
|
||||
|
||||
// PutNotificationConfigurationNode puts a node to a system tree
|
||||
// and returns objectID of a previous notif config which must be deleted in FrostFS.
|
||||
//
|
||||
// If object id to remove is not found returns ErrNoNodeToRemove error.
|
||||
PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error)
|
||||
|
||||
// GetBucketCORS gets an object id that corresponds to object with bucket CORS.
|
||||
//
|
||||
// If object id is not found returns ErrNodeNotFound error.
|
||||
|
|
|
@ -1,263 +0,0 @@
|
|||
package notifications
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultTimeout = 30 * time.Second
|
||||
|
||||
// EventVersion23 is used for lifecycle, tiering, objectACL, objectTagging, object restoration notifications.
|
||||
EventVersion23 = "2.3"
|
||||
// EventVersion22 is used for replication notifications.
|
||||
EventVersion22 = "2.2"
|
||||
// EventVersion21 is used for all other notification types.
|
||||
EventVersion21 = "2.1"
|
||||
)
|
||||
|
||||
type (
|
||||
Options struct {
|
||||
URL string
|
||||
TLSCertFilepath string
|
||||
TLSAuthPrivateKeyFilePath string
|
||||
Timeout time.Duration
|
||||
RootCAFiles []string
|
||||
}
|
||||
|
||||
Controller struct {
|
||||
logger *zap.Logger
|
||||
taskQueueConnection *nats.Conn
|
||||
jsClient nats.JetStreamContext
|
||||
handlers map[string]Stream
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
Stream struct {
|
||||
h layer.MsgHandler
|
||||
ch chan *nats.Msg
|
||||
}
|
||||
|
||||
TestEvent struct {
|
||||
Service string
|
||||
Event string
|
||||
Time time.Time
|
||||
Bucket string
|
||||
RequestID string
|
||||
HostID string
|
||||
}
|
||||
|
||||
Event struct {
|
||||
Records []EventRecord `json:"Records"`
|
||||
}
|
||||
|
||||
EventRecord struct {
|
||||
EventVersion string `json:"eventVersion"`
|
||||
EventSource string `json:"eventSource"` // frostfs:s3
|
||||
AWSRegion string `json:"awsRegion,omitempty"` // empty
|
||||
EventTime time.Time `json:"eventTime"`
|
||||
EventName string `json:"eventName"`
|
||||
UserIdentity UserIdentity `json:"userIdentity"`
|
||||
RequestParameters RequestParameters `json:"requestParameters"`
|
||||
ResponseElements map[string]string `json:"responseElements"`
|
||||
S3 S3Entity `json:"s3"`
|
||||
}
|
||||
|
||||
UserIdentity struct {
|
||||
PrincipalID string `json:"principalId"`
|
||||
}
|
||||
|
||||
RequestParameters struct {
|
||||
SourceIPAddress string `json:"sourceIPAddress"`
|
||||
}
|
||||
|
||||
S3Entity struct {
|
||||
SchemaVersion string `json:"s3SchemaVersion"`
|
||||
ConfigurationID string `json:"configurationId,omitempty"`
|
||||
Bucket Bucket `json:"bucket"`
|
||||
Object Object `json:"object"`
|
||||
}
|
||||
|
||||
Bucket struct {
|
||||
Name string `json:"name"`
|
||||
OwnerIdentity UserIdentity `json:"ownerIdentity,omitempty"`
|
||||
Arn string `json:"arn,omitempty"`
|
||||
}
|
||||
|
||||
Object struct {
|
||||
Key string `json:"key"`
|
||||
Size uint64 `json:"size,omitempty"`
|
||||
VersionID string `json:"versionId,omitempty"`
|
||||
ETag string `json:"eTag,omitempty"`
|
||||
Sequencer string `json:"sequencer,omitempty"`
|
||||
}
|
||||
)
|
||||
|
||||
func NewController(p *Options, l *zap.Logger) (*Controller, error) {
|
||||
ncopts := []nats.Option{
|
||||
nats.Timeout(p.Timeout),
|
||||
}
|
||||
|
||||
if len(p.TLSCertFilepath) != 0 && len(p.TLSAuthPrivateKeyFilePath) != 0 {
|
||||
ncopts = append(ncopts, nats.ClientCert(p.TLSCertFilepath, p.TLSAuthPrivateKeyFilePath))
|
||||
}
|
||||
if len(p.RootCAFiles) != 0 {
|
||||
ncopts = append(ncopts, nats.RootCAs(p.RootCAFiles...))
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(p.URL, ncopts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect to nats: %w", err)
|
||||
}
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get jet stream: %w", err)
|
||||
}
|
||||
|
||||
return &Controller{
|
||||
logger: l,
|
||||
taskQueueConnection: nc,
|
||||
jsClient: js,
|
||||
handlers: make(map[string]Stream),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Controller) Subscribe(_ context.Context, topic string, handler layer.MsgHandler) error {
|
||||
ch := make(chan *nats.Msg, 1)
|
||||
|
||||
c.mu.RLock()
|
||||
_, ok := c.handlers[topic]
|
||||
c.mu.RUnlock()
|
||||
if ok {
|
||||
return fmt.Errorf("already subscribed to topic '%s'", topic)
|
||||
}
|
||||
|
||||
if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil {
|
||||
return fmt.Errorf("add stream: %w", err)
|
||||
}
|
||||
|
||||
if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil {
|
||||
return fmt.Errorf("could not subscribe: %w", err)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.handlers[topic] = Stream{
|
||||
h: handler,
|
||||
ch: ch,
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) Listen(ctx context.Context) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
for _, stream := range c.handlers {
|
||||
go func(stream Stream) {
|
||||
for {
|
||||
select {
|
||||
case msg := <-stream.ch:
|
||||
if err := stream.h.HandleMessage(ctx, msg); err != nil {
|
||||
c.logger.Error(logs.CouldNotHandleMessage, zap.Error(err))
|
||||
} else if err = msg.Ack(); err != nil {
|
||||
c.logger.Error(logs.CouldNotACKMessage, zap.Error(err))
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}(stream)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) SendNotifications(topics map[string]string, p *handler.SendNotificationParams) error {
|
||||
event := prepareEvent(p)
|
||||
|
||||
for id, topic := range topics {
|
||||
event.Records[0].S3.ConfigurationID = id
|
||||
msg, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
c.logger.Error(logs.CouldntMarshalAnEvent, zap.String("subject", topic), zap.Error(err))
|
||||
}
|
||||
if err = c.publish(topic, msg); err != nil {
|
||||
c.logger.Error(logs.CouldntSendAnEventToTopic, zap.String("subject", topic), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) SendTestNotification(topic, bucketName, requestID, HostID string, now time.Time) error {
|
||||
event := &TestEvent{
|
||||
Service: "FrostFS S3",
|
||||
Event: "s3:TestEvent",
|
||||
Time: now,
|
||||
Bucket: bucketName,
|
||||
RequestID: requestID,
|
||||
HostID: HostID,
|
||||
}
|
||||
|
||||
msg, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't marshal test event: %w", err)
|
||||
}
|
||||
|
||||
return c.publish(topic, msg)
|
||||
}
|
||||
|
||||
func prepareEvent(p *handler.SendNotificationParams) *Event {
|
||||
return &Event{
|
||||
Records: []EventRecord{
|
||||
{
|
||||
EventVersion: EventVersion21,
|
||||
EventSource: "frostfs:s3",
|
||||
AWSRegion: "",
|
||||
EventTime: p.Time,
|
||||
EventName: p.Event,
|
||||
UserIdentity: UserIdentity{
|
||||
PrincipalID: p.User,
|
||||
},
|
||||
RequestParameters: RequestParameters{
|
||||
SourceIPAddress: p.ReqInfo.RemoteHost,
|
||||
},
|
||||
ResponseElements: nil,
|
||||
S3: S3Entity{
|
||||
SchemaVersion: "1.0",
|
||||
// ConfigurationID is skipped and will be placed later
|
||||
Bucket: Bucket{
|
||||
Name: p.BktInfo.Name,
|
||||
OwnerIdentity: UserIdentity{PrincipalID: p.BktInfo.Owner.String()},
|
||||
Arn: p.BktInfo.Name,
|
||||
},
|
||||
Object: Object{
|
||||
Key: p.NotificationInfo.Name,
|
||||
Size: p.NotificationInfo.Size,
|
||||
VersionID: p.NotificationInfo.Version,
|
||||
ETag: p.NotificationInfo.HashSum,
|
||||
Sequencer: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) publish(topic string, msg []byte) error {
|
||||
if _, err := c.jsClient.Publish(topic, msg); err != nil {
|
||||
return fmt.Errorf("couldn't send event: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -24,7 +24,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
|
||||
|
@ -62,7 +61,6 @@ type (
|
|||
pool *pool.Pool
|
||||
treePool *treepool.Pool
|
||||
key *keys.PrivateKey
|
||||
nc *notifications.Controller
|
||||
obj layer.Client
|
||||
api api.Handler
|
||||
|
||||
|
@ -88,7 +86,6 @@ type (
|
|||
maxClient maxClientsConfig
|
||||
defaultMaxAge int
|
||||
reconnectInterval time.Duration
|
||||
notificatorEnabled bool
|
||||
resolveZoneList []string
|
||||
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
||||
frostfsidValidation bool
|
||||
|
@ -157,13 +154,13 @@ func (a *App) init(ctx context.Context) {
|
|||
a.setRuntimeParameters()
|
||||
a.initFrostfsID(ctx)
|
||||
a.initPolicyStorage(ctx)
|
||||
a.initAPI(ctx)
|
||||
a.initAPI()
|
||||
a.initMetrics()
|
||||
a.initServers(ctx)
|
||||
a.initTracing(ctx)
|
||||
}
|
||||
|
||||
func (a *App) initLayer(ctx context.Context) {
|
||||
func (a *App) initLayer() {
|
||||
a.initResolver()
|
||||
|
||||
// prepare random key for anonymous requests
|
||||
|
@ -188,18 +185,6 @@ func (a *App) initLayer(ctx context.Context) {
|
|||
|
||||
// prepare object layer
|
||||
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
||||
|
||||
if a.cfg.GetBool(cfgEnableNATS) {
|
||||
nopts := getNotificationsOptions(a.cfg, a.log)
|
||||
a.nc, err = notifications.NewController(nopts, a.log)
|
||||
if err != nil {
|
||||
a.log.Fatal(logs.FailedToEnableNotifications, zap.Error(err))
|
||||
}
|
||||
|
||||
if err = a.obj.Initialize(ctx, a.nc); err != nil {
|
||||
a.log.Fatal(logs.CouldntInitializeLayer, zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||
|
@ -208,7 +193,6 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
|||
maxClient: newMaxClients(v),
|
||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||
reconnectInterval: fetchReconnectInterval(v),
|
||||
notificatorEnabled: v.GetBool(cfgEnableNATS),
|
||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||
}
|
||||
|
||||
|
@ -343,10 +327,6 @@ func (s *appSettings) DefaultMaxAge() int {
|
|||
return s.defaultMaxAge
|
||||
}
|
||||
|
||||
func (s *appSettings) NotificatorEnabled() bool {
|
||||
return s.notificatorEnabled
|
||||
}
|
||||
|
||||
func (s *appSettings) ResolveZoneList() []string {
|
||||
return s.resolveZoneList
|
||||
}
|
||||
|
@ -468,8 +448,8 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
|||
return s.retryStrategy
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
func (a *App) initAPI() {
|
||||
a.initLayer()
|
||||
a.initHandler()
|
||||
}
|
||||
|
||||
|
@ -908,17 +888,6 @@ func (a *App) stopServices() {
|
|||
}
|
||||
}
|
||||
|
||||
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
|
||||
cfg := notifications.Options{}
|
||||
cfg.URL = v.GetString(cfgNATSEndpoint)
|
||||
cfg.Timeout = fetchNATSTimeout(v, l)
|
||||
cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
|
||||
cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
|
||||
cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
|
||||
|
||||
return &cfg
|
||||
}
|
||||
|
||||
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
||||
cacheCfg := layer.DefaultCachesConfigs(l)
|
||||
|
||||
|
@ -976,7 +945,7 @@ func getFrostfsIDCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|||
func (a *App) initHandler() {
|
||||
var err error
|
||||
|
||||
a.api, err = handler.New(a.log, a.obj, a.nc, a.settings, a.policyStorage, a.frostfsid)
|
||||
a.api, err = handler.New(a.log, a.obj, a.settings, a.policyStorage, a.frostfsid)
|
||||
if err != nil {
|
||||
a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||
|
@ -120,14 +119,6 @@ const ( // Settings.
|
|||
|
||||
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
|
||||
|
||||
// NATS.
|
||||
cfgEnableNATS = "nats.enabled"
|
||||
cfgNATSEndpoint = "nats.endpoint"
|
||||
cfgNATSTimeout = "nats.timeout"
|
||||
cfgNATSTLSCertFile = "nats.cert_file"
|
||||
cfgNATSAuthPrivateKeyFile = "nats.key_file"
|
||||
cfgNATSRootCAFiles = "nats.root_ca"
|
||||
|
||||
// Policy.
|
||||
cfgPolicyDefault = "placement_policy.default"
|
||||
cfgPolicyRegionMapFile = "placement_policy.region_mapping"
|
||||
|
@ -376,19 +367,6 @@ func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy
|
|||
return policy
|
||||
}
|
||||
|
||||
func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
|
||||
timeout := cfg.GetDuration(cfgNATSTimeout)
|
||||
if timeout <= 0 {
|
||||
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
||||
zap.String("parameter", cfgNATSTimeout),
|
||||
zap.Duration("value in config", timeout),
|
||||
zap.Duration("default", notifications.DefaultTimeout))
|
||||
timeout = notifications.DefaultTimeout
|
||||
}
|
||||
|
||||
return timeout
|
||||
}
|
||||
|
||||
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
||||
if v.IsSet(cfgEntry) {
|
||||
lifetime := v.GetDuration(cfgEntry)
|
||||
|
|
|
@ -88,7 +88,7 @@ S3_GW_CACHE_BUCKETS_SIZE=1000
|
|||
# Cache which contains mapping of nice name to object addresses
|
||||
S3_GW_CACHE_NAMES_LIFETIME=1m
|
||||
S3_GW_CACHE_NAMES_SIZE=10000
|
||||
# Cache for system objects in a bucket: bucket settings, notification configuration etc
|
||||
# Cache for system objects in a bucket: bucket settings etc
|
||||
S3_GW_CACHE_SYSTEM_LIFETIME=5m
|
||||
S3_GW_CACHE_SYSTEM_SIZE=100000
|
||||
# Cache which stores access box with tokens by its address
|
||||
|
@ -105,14 +105,6 @@ S3_GW_CACHE_MORPH_POLICY_SIZE=10000
|
|||
S3_GW_CACHE_FROSTFSID_LIFETIME=1m
|
||||
S3_GW_CACHE_FROSTFSID_SIZE=10000
|
||||
|
||||
# NATS
|
||||
S3_GW_NATS_ENABLED=true
|
||||
S3_GW_NATS_ENDPOINT=nats://nats.frostfs.devenv:4222
|
||||
S3_GW_NATS_TIMEOUT=30s
|
||||
S3_GW_NATS_CERT_FILE=/path/to/cert
|
||||
S3_GW_NATS_KEY_FILE=/path/to/key
|
||||
S3_GW_NATS_ROOT_CA=/path/to/ca
|
||||
|
||||
# Default policy of placing containers in FrostFS
|
||||
# If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway
|
||||
# will put the container with default policy. It can be specified via environment variable, e.g.:
|
||||
|
|
|
@ -105,7 +105,7 @@ cache:
|
|||
buckets:
|
||||
lifetime: 1m
|
||||
size: 500
|
||||
# Cache for system objects in a bucket: bucket settings, notification configuration etc
|
||||
# Cache for system objects in a bucket: bucket settings etc
|
||||
system:
|
||||
lifetime: 2m
|
||||
size: 1000
|
||||
|
@ -127,14 +127,6 @@ cache:
|
|||
lifetime: 1m
|
||||
size: 10000
|
||||
|
||||
nats:
|
||||
enabled: true
|
||||
endpoint: nats://localhost:4222
|
||||
timeout: 30s
|
||||
cert_file: /path/to/cert
|
||||
key_file: /path/to/key
|
||||
root_ca: /path/to/ca
|
||||
|
||||
# Parameters of FrostFS container placement policy
|
||||
placement_policy:
|
||||
# Default policy of placing containers in FrostFS
|
||||
|
|
|
@ -177,7 +177,6 @@ There are some custom types used for brevity:
|
|||
| `server` | [Server configuration](#server-section) |
|
||||
| `logger` | [Logger configuration](#logger-section) |
|
||||
| `cache` | [Cache configuration](#cache-section) |
|
||||
| `nats` | [NATS configuration](#nats-section) |
|
||||
| `cors` | [CORS configuration](#cors-section) |
|
||||
| `pprof` | [Pprof configuration](#pprof-section) |
|
||||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||
|
@ -411,18 +410,18 @@ cache:
|
|||
size: 10000
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
|
||||
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
|
||||
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
|
||||
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
|
||||
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
|
||||
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
|
||||
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. |
|
||||
| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. |
|
||||
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
|
||||
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
|
||||
| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
|
||||
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
|
||||
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
|
||||
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
|
||||
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
|
||||
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
|
||||
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
|
||||
| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. |
|
||||
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
|
||||
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
|
||||
| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. |
|
||||
|
||||
#### `cache` subsection
|
||||
|
||||
|
@ -450,36 +449,6 @@ size: 100
|
|||
| `lifetime` | `duration` | '10m' | Lifetime of entries in cache. |
|
||||
| `size` | `int` | '100 | LRU cache size. |
|
||||
|
||||
### `nats` section
|
||||
|
||||
This is an advanced section, use with caution.
|
||||
You can turn on notifications about successful completions of basic operations, and the gateway will send notifications
|
||||
via NATS JetStream.
|
||||
|
||||
1. to configure the NATS server with JetStream
|
||||
2. to specify NATS parameters for the S3 GW. It's ***necessary*** to define a values of `nats.enable` or
|
||||
`S3_GW_NATS_ENABLED` as `True`
|
||||
3. to configure notifications in a bucket
|
||||
|
||||
```yaml
|
||||
nats:
|
||||
enabled: true
|
||||
endpoint: nats://localhost:4222
|
||||
timeout: 30s
|
||||
cert_file: /path/to/cert
|
||||
key_file: /path/to/key
|
||||
root_ca: /path/to/ca
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|---------------|------------|---------------|------------------------------------------------------|
|
||||
| `enabled` | `bool` | `false` | Flag to enable the service. |
|
||||
| `endpoint` | `string` | | NATS endpoint to connect to. |
|
||||
| `timeout` | `duration` | `30s` | Timeout for the object notification operation. |
|
||||
| `certificate` | `string` | | Path to the client certificate. |
|
||||
| `key` | `string` | | Path to the client key. |
|
||||
| `ca` | `string` | | Override root CA used to verify server certificates. |
|
||||
|
||||
### `cors` section
|
||||
|
||||
```yaml
|
||||
|
|
|
@ -13,6 +13,5 @@ Each node keeps one of the types of data as a set of **key-value pairs**:
|
|||
|
||||
Some data takes up a lot of memory, so we store it in FrostFS nodes as an object with payload.
|
||||
But we keep these objects' metadata in the Tree service too:
|
||||
* Notification configuration
|
||||
* CORS
|
||||
* Metadata of parts of active multipart uploads
|
||||
|
|
4
go.mod
4
go.mod
|
@ -15,7 +15,6 @@ require (
|
|||
github.com/go-chi/chi/v5 v5.0.8
|
||||
github.com/google/uuid v1.3.1
|
||||
github.com/minio/sio v0.3.0
|
||||
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
|
||||
github.com/nspcc-dev/neo-go v0.105.0
|
||||
github.com/panjf2000/ants/v2 v2.5.0
|
||||
github.com/prometheus/client_golang v1.15.1
|
||||
|
@ -68,9 +67,6 @@ require (
|
|||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mr-tron/base58 v1.2.0 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.7.1 // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c // indirect
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
|
||||
|
|
13
go.sum
13
go.sum
|
@ -223,7 +223,6 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
|
|||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
|
@ -236,7 +235,6 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
|
|||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/sio v0.3.0 h1:syEFBewzOMOYVzSTFpp1MqpSZk8rUNbz8VIIc+PNzus=
|
||||
github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw=
|
||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||
|
@ -244,15 +242,6 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
|
|||
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
|
||||
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
|
||||
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
|
||||
github.com/nats-io/nats-server/v2 v2.7.1 h1:SDj8R0PJPVekw3EgHxGtTfJUuMbsuaul1nwWFI3xTyk=
|
||||
github.com/nats-io/nats-server/v2 v2.7.1/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
|
||||
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
|
||||
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c h1:OOQeE613BH93ICPq3eke5N78gWNeMjcBWkmD2NKyXVg=
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20231123160306-3374ff1e7a3c/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
|
||||
github.com/nspcc-dev/neo-go v0.105.0 h1:vtNZYFEFySK8zRDhLzQYha849VzWrcKezlnq/oNQg/w=
|
||||
|
@ -375,7 +364,6 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
|
||||
|
@ -541,7 +529,6 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
|||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
|
|
|
@ -54,16 +54,12 @@ const (
|
|||
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../../api/cache/*
|
||||
InvalidCacheKeyType = "invalid cache key type" // Warn in ../../api/cache/objectslist.go
|
||||
ObjectIsCopied = "object is copied" // Info in ../../api/handler/copy.go
|
||||
CouldntSendNotification = "couldn't send notification: %w" // Error in ../../api/handler/*
|
||||
FailedToSendTestEventBecauseNotificationsIsDisabled = "failed to send test event because notifications is disabled" // Warn in ../../api/handler/notifications.go
|
||||
RequestFailed = "request failed" // Error in ../../api/handler/util.go
|
||||
GetBucketInfo = "get bucket info" // Warn in ../../api/handler/cors.go
|
||||
GetBucketCors = "get bucket cors" // Warn in ../../api/handler/cors.go
|
||||
SomeACLNotFullyMapped = "some acl not fully mapped" // Warn in ../../api/handler/acl.go
|
||||
CouldntDeleteObject = "couldn't delete object" // Error in ../../api/layer/layer.go
|
||||
NotificatorIsDisabledS3WontProduceNotificationEvents = "notificator is disabled, s3 won't produce notification events" // Warn in ../../api/handler/api.go
|
||||
BucketIsCreated = "bucket is created" // Info in ../../api/handler/put.go
|
||||
CouldntDeleteNotificationConfigurationObject = "couldn't delete notification configuration object" // Error in ../../api/layer/notifications.go
|
||||
CouldNotParseContainerObjectLockEnabledAttribute = "could not parse container object lock enabled attribute" // Error in ../../api/layer/container.go
|
||||
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
|
||||
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
|
||||
|
@ -93,7 +89,6 @@ const (
|
|||
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
|
||||
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheCors = "couldn't cache cors" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheNotificationConfiguration = "couldn't cache notification configuration" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheListPolicyChains = "couldn't cache list policy chains" // Warn in ../../api/layer/cache.go
|
||||
RequestEnd = "request end" // Info in ../../api/middleware/response.go
|
||||
CouldntReceiveAccessBoxForGateKeyRandomKeyWillBeUsed = "couldn't receive access box for gate key, random key will be used" // Debug in ../../api/middleware/auth.go
|
||||
|
@ -101,15 +96,9 @@ const (
|
|||
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
|
||||
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
|
||||
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
|
||||
CouldNotHandleMessage = "could not handle message" // Error in ../../api/notifications/controller.go
|
||||
CouldNotACKMessage = "could not ACK message" // Error in ../../api/notifications/controller.go
|
||||
CouldntMarshalAnEvent = "couldn't marshal an event" // Error in ../../api/notifications/controller.go
|
||||
CouldntSendAnEventToTopic = "couldn't send an event to topic" // Error in ../../api/notifications/controller.go
|
||||
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
|
||||
CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go
|
||||
CouldntGenerateRandomKey = "couldn't generate random key" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToEnableNotifications = "failed to enable notifications" // Fatal in ../../cmd/s3-gw/app.go
|
||||
CouldntInitializeLayer = "couldn't initialize layer" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../cmd/s3-gw/app.go
|
||||
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../cmd/s3-gw/app.go
|
||||
|
|
|
@ -108,7 +108,6 @@ const (
|
|||
createdKV = "Created"
|
||||
|
||||
settingsFileName = "bucket-settings"
|
||||
notifConfFileName = "bucket-notifications"
|
||||
corsFilename = "bucket-cors"
|
||||
bucketTaggingFilename = "bucket-tagging"
|
||||
|
||||
|
@ -116,7 +115,7 @@ const (
|
|||
versionTree = "version"
|
||||
|
||||
// systemTree -- ID of a tree with system objects
|
||||
// i.e. bucket settings with versioning and lock configuration, cors, notifications.
|
||||
// i.e. bucket settings with versioning and lock configuration, cors.
|
||||
systemTree = "system"
|
||||
|
||||
separator = "/"
|
||||
|
@ -400,36 +399,6 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
|
|||
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
|
||||
}
|
||||
|
||||
func (c *Tree) GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
}
|
||||
|
||||
return node.ObjID, nil
|
||||
}
|
||||
|
||||
func (c *Tree) PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
|
||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||
if err != nil && !isErrNotFound {
|
||||
return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
|
||||
}
|
||||
|
||||
meta := make(map[string]string)
|
||||
meta[FileNameKey] = notifConfFileName
|
||||
meta[oidKV] = objID.EncodeToString()
|
||||
|
||||
if isErrNotFound {
|
||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
|
||||
return oid.ID{}, err
|
||||
}
|
||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
||||
}
|
||||
|
||||
return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
|
||||
}
|
||||
|
||||
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV})
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue