From b96c3c5a332ad33c35338ae95c6a0e0bbcb13c62 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 28 Feb 2022 11:02:05 +0300 Subject: [PATCH] [#195] Refactor Using object settings to save bucket versioning Signed-off-by: Denis Kirillov --- api/cache/system.go | 18 ++++++++++ api/data/info.go | 9 +++-- api/handler/multipart_upload.go | 4 +-- api/handler/put.go | 32 +++++++++++++----- api/handler/versioning.go | 25 +++++++++----- api/layer/layer.go | 17 ++++------ api/layer/system_object.go | 57 +++++++++++++++++++++++++++++++ api/layer/versioning.go | 12 +++---- api/layer/versioning_test.go | 60 ++++++++++++++++++++------------- 9 files changed, 173 insertions(+), 61 deletions(-) diff --git a/api/cache/system.go b/api/cache/system.go index 5b9a5e61..b7d981f9 100644 --- a/api/cache/system.go +++ b/api/cache/system.go @@ -61,6 +61,20 @@ func (o *SystemCache) GetCORS(key string) *data.CORSConfiguration { return result } +func (o *SystemCache) GetSettings(key string) *data.BucketSettings { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*data.BucketSettings) + if !ok { + return nil + } + + return result +} + func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration { entry, err := o.cache.Get(key) if err != nil { @@ -84,6 +98,10 @@ func (o *SystemCache) PutCORS(key string, obj *data.CORSConfiguration) error { return o.cache.Set(key, obj) } +func (o *SystemCache) PutSettings(key string, settings *data.BucketSettings) error { + return o.cache.Set(key, settings) +} + func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error { return o.cache.Set(key, obj) } diff --git a/api/data/info.go b/api/data/info.go index f706ed2a..89823eac 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -11,7 +11,7 @@ import ( ) const ( - bktVersionSettingsObject = ".s3-versioning-settings" + bktSettingsObject = ".s3-settings" bktCORSConfigurationObject = ".s3-cors" bktNotificationConfigurationObject = ".s3-notifications" ) @@ -45,6 +45,11 @@ type ( Headers map[string]string } + // BucketSettings stores settings such as versioning. + BucketSettings struct { + VersioningEnabled bool `json:"versioning_enabled"` + } + // CORSConfiguration stores CORS configuration of a request. CORSConfiguration struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CORSConfiguration" json:"-"` @@ -63,7 +68,7 @@ type ( ) // SettingsObjectName is system name for bucket settings file. -func (b *BucketInfo) SettingsObjectName() string { return bktVersionSettingsObject } +func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject } // CORSObjectName returns system name for bucket CORS configuration file. func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index f96623b1..deec6f85 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -458,9 +458,9 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. Key: objInfo.Name, } - if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil { + if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil { h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err)) - } else if versioning.VersioningEnabled { + } else if settings.VersioningEnabled { w.Header().Set(api.AmzVersionID, objInfo.Version()) } diff --git a/api/handler/put.go b/api/handler/put.go index dab5c356..f7d27685 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -241,9 +241,9 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { } } - if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil { + if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil { h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err)) - } else if versioning.VersioningEnabled { + } else if settings.VersioningEnabled { w.Header().Set(api.AmzVersionID, info.Version()) } @@ -339,9 +339,15 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) { } } - if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil { + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + + if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil { h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err)) - } else if versioning.VersioningEnabled { + } else if settings.VersioningEnabled { w.Header().Set(api.AmzVersionID, info.Version()) } @@ -574,12 +580,20 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) { } if p.ObjectLockEnabled { - vp := &layer.PutVersioningParams{ - Bucket: reqInfo.BucketName, - Settings: &layer.BucketSettings{VersioningEnabled: true}, + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could get bucket info", reqInfo, err) + return } - if _, err = h.obj.PutBucketVersioning(r.Context(), vp); err != nil { - h.log.Error("couldn't enable bucket versioning", zap.Stringer("container_id", cid), zap.Error(err)) + + sp := &layer.PutSettingsParams{ + BktInfo: bktInfo, + Settings: &data.BucketSettings{VersioningEnabled: true}, + } + if err = h.obj.PutBucketSettings(r.Context(), sp); err != nil { + h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err, + zap.Stringer("container_id", cid)) + return } } diff --git a/api/handler/versioning.go b/api/handler/versioning.go index 6e3c899a..70bc16e3 100644 --- a/api/handler/versioning.go +++ b/api/handler/versioning.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "go.uber.org/zap" @@ -20,11 +21,6 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ return } - p := &layer.PutVersioningParams{ - Bucket: reqInfo.BucketName, - Settings: &layer.BucketSettings{VersioningEnabled: configuration.Status == "Enabled"}, - } - bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) @@ -35,12 +31,25 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ return } + settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "couldn't get bucket settings", reqInfo, err) + return + } + + settings.VersioningEnabled = configuration.Status == "Enabled" + + p := &layer.PutSettingsParams{ + BktInfo: bktInfo, + Settings: settings, + } + if !p.Settings.VersioningEnabled && bktInfo.ObjectLockEnabled { h.logAndSendError(w, "couldn't suspend bucket versioning", reqInfo, fmt.Errorf("object lock is enabled")) return } - if _, err := h.obj.PutBucketVersioning(r.Context(), p); err != nil { + if err = h.obj.PutBucketSettings(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't put update versioning settings", reqInfo, err) return } @@ -61,7 +70,7 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ return } - settings, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName) + settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchBucket) { h.logAndSendError(w, "couldn't get versioning settings", reqInfo, err) @@ -79,7 +88,7 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ } } -func formVersioningConfiguration(settings *layer.BucketSettings) *VersioningConfiguration { +func formVersioningConfiguration(settings *data.BucketSettings) *VersioningConfiguration { res := &VersioningConfiguration{} if settings == nil { return res diff --git a/api/layer/layer.go b/api/layer/layer.go index f93d6204..2ad50360 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -302,10 +302,10 @@ type ( Header map[string]string } - // PutVersioningParams stores object copy request parameters. - PutVersioningParams struct { - Bucket string - Settings *BucketSettings + // PutSettingsParams stores object copy request parameters. + PutSettingsParams struct { + BktInfo *data.BucketInfo + Settings *data.BucketSettings } // PutCORSParams stores PutCORS request parameters. @@ -314,11 +314,6 @@ type ( Reader io.Reader } - // BucketSettings stores settings such as versioning. - BucketSettings struct { - VersioningEnabled bool - } - // CopyObjectParams stores object copy request parameters. CopyObjectParams struct { SrcObject *data.ObjectInfo @@ -386,8 +381,8 @@ type ( Client interface { EphemeralKey() *keys.PublicKey - PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*data.ObjectInfo, error) - GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error) + GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) + PutBucketSettings(ctx context.Context, p *PutSettingsParams) error PutBucketCORS(ctx context.Context, p *PutCORSParams) error GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) diff --git a/api/layer/system_object.go b/api/layer/system_object.go index ff16a69c..7f275ae3 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -1,8 +1,11 @@ package layer import ( + "bytes" "context" + "encoding/json" "encoding/xml" + "fmt" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" @@ -205,3 +208,57 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy func systemObjectKey(bktInfo *data.BucketInfo, obj string) string { return bktInfo.Name + obj } + +func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { + if settings := n.systemCache.GetSettings(bktInfo.SettingsObjectName()); settings != nil { + return settings, nil + } + + obj, err := n.getSystemObjectFromNeoFS(ctx, bktInfo, bktInfo.SettingsObjectName()) + if err != nil { + return nil, err + } + + settings := &data.BucketSettings{} + if err = json.Unmarshal(obj.Payload(), settings); err != nil { + return nil, err + } + + if err = n.systemCache.PutSettings(bktInfo.SettingsObjectName(), settings); err != nil { + n.log.Warn("couldn't put system meta to objects cache", + zap.Stringer("object id", obj.ID()), + zap.Stringer("bucket id", bktInfo.CID), + zap.Error(err)) + } + + return settings, nil +} + +func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error { + rawSettings, err := json.Marshal(p.Settings) + if err != nil { + return fmt.Errorf("couldn't marshal bucket settings") + } + + s := &PutSystemObjectParams{ + BktInfo: p.BktInfo, + ObjName: p.BktInfo.SettingsObjectName(), + Metadata: map[string]string{}, + Reader: bytes.NewReader(rawSettings), + } + + obj, err := n.putSystemObjectIntoNeoFS(ctx, s) + if err != nil { + return err + } + + if obj.Size == 0 { + return errors.GetAPIError(errors.ErrInternalError) + } + + if err = n.systemCache.PutSettings(p.BktInfo.SettingsObjectName(), p.Settings); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return nil +} diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 5508165b..af7c2862 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -282,8 +282,8 @@ func (v *objectVersions) getVersion(oid *oid.ID) *data.ObjectInfo { } return nil } -func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*data.ObjectInfo, error) { - bktInfo, err := n.GetBucketInfo(ctx, p.Bucket) +func (n *layer) PutBucketVersioning(ctx context.Context, p *PutSettingsParams) (*data.ObjectInfo, error) { + bktInfo, err := n.GetBucketInfo(ctx, p.BktInfo.Name) if err != nil { return nil, err } @@ -303,7 +303,7 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) return n.putSystemObject(ctx, s) } -func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) { +func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*data.BucketSettings, error) { bktInfo, err := n.GetBucketInfo(ctx, bucketName) if err != nil { return nil, err @@ -398,7 +398,7 @@ func contains(list []string, elem string) bool { return false } -func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*BucketSettings, error) { +func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { objInfo, err := n.headSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName()) if err != nil { return nil, err @@ -407,8 +407,8 @@ func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) return objectInfoToBucketSettings(objInfo), nil } -func objectInfoToBucketSettings(info *data.ObjectInfo) *BucketSettings { - res := &BucketSettings{} +func objectInfoToBucketSettings(info *data.ObjectInfo) *data.BucketSettings { + res := &data.BucketSettings{} enabled, ok := info.Headers[attrSettingsVersioningEnabled] if ok { diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index f5419e01..8fffac77 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -323,6 +323,7 @@ type testContext struct { layer Client bkt string bktID *cid.ID + bktInfo *data.BucketInfo obj string testNeoFS *testNeoFS } @@ -361,10 +362,14 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { } return &testContext{ - ctx: ctx, - layer: NewLayer(l, tp, layerCfg), - bkt: bktName, - bktID: bktID, + ctx: ctx, + layer: NewLayer(l, tp, layerCfg), + bkt: bktName, + bktID: bktID, + bktInfo: &data.BucketInfo{ + Name: bktName, + CID: bktID, + }, obj: "obj1", t: t, testNeoFS: tp, @@ -373,9 +378,9 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { func TestSimpleVersioning(t *testing.T) { tc := prepareContext(t) - _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ - Bucket: tc.bktID.String(), - Settings: &BucketSettings{VersioningEnabled: true}, + err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{ + BktInfo: tc.bktInfo, + Settings: &data.BucketSettings{VersioningEnabled: true}, }) require.NoError(t, err) @@ -414,9 +419,9 @@ func TestSimpleNoVersioning(t *testing.T) { func TestVersioningDeleteObject(t *testing.T) { tc := prepareContext(t) - _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ - Bucket: tc.bktID.String(), - Settings: &BucketSettings{VersioningEnabled: true}, + err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{ + BktInfo: tc.bktInfo, + Settings: &data.BucketSettings{VersioningEnabled: true}, }) require.NoError(t, err) @@ -431,9 +436,9 @@ func TestVersioningDeleteObject(t *testing.T) { func TestVersioningDeleteSpecificObjectVersion(t *testing.T) { tc := prepareContext(t) - _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ - Bucket: tc.bktID.String(), - Settings: &BucketSettings{VersioningEnabled: true}, + err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{ + BktInfo: tc.bktInfo, + Settings: &data.BucketSettings{VersioningEnabled: true}, }) require.NoError(t, err) @@ -796,25 +801,34 @@ func TestSystemObjectsVersioning(t *testing.T) { cacheConfig.System.Lifetime = 0 tc := prepareContext(t, cacheConfig) - objInfo, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ - Bucket: tc.bktID.String(), - Settings: &BucketSettings{VersioningEnabled: false}, + err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{ + BktInfo: tc.bktInfo, + Settings: &data.BucketSettings{VersioningEnabled: false}, }) require.NoError(t, err) - objMeta, ok := tc.testNeoFS.objects[objInfo.Address().String()] - require.True(t, ok) + objMeta := tc.getSystemObject(tc.bktInfo.SettingsObjectName()) + require.NotNil(t, objMeta) - _, err = tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ - Bucket: tc.bktID.String(), - Settings: &BucketSettings{VersioningEnabled: true}, + err = tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{ + BktInfo: tc.bktInfo, + Settings: &data.BucketSettings{VersioningEnabled: true}, }) require.NoError(t, err) + addr := object.NewAddress() + addr.SetContainerID(objMeta.ContainerID()) + addr.SetObjectID(objMeta.ID()) + // simulate failed deletion - tc.testNeoFS.objects[objInfo.Address().String()] = objMeta + tc.testNeoFS.objects[addr.String()] = objMeta - versioning, err := tc.layer.GetBucketVersioning(tc.ctx, tc.bkt) + bktInfo := &data.BucketInfo{ + Name: tc.bkt, + CID: tc.bktID, + } + + versioning, err := tc.layer.GetBucketSettings(tc.ctx, bktInfo) require.NoError(t, err) require.True(t, versioning.VersioningEnabled) }