[#195] Refactor
Using object settings to save bucket versioning Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
3046d80b37
commit
b96c3c5a33
9 changed files with 173 additions and 61 deletions
18
api/cache/system.go
vendored
18
api/cache/system.go
vendored
|
@ -61,6 +61,20 @@ func (o *SystemCache) GetCORS(key string) *data.CORSConfiguration {
|
|||
return result
|
||||
}
|
||||
|
||||
func (o *SystemCache) GetSettings(key string) *data.BucketSettings {
|
||||
entry, err := o.cache.Get(key)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result, ok := entry.(*data.BucketSettings)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (o *SystemCache) GetNotificationConfiguration(key string) *data.NotificationConfiguration {
|
||||
entry, err := o.cache.Get(key)
|
||||
if err != nil {
|
||||
|
@ -84,6 +98,10 @@ func (o *SystemCache) PutCORS(key string, obj *data.CORSConfiguration) error {
|
|||
return o.cache.Set(key, obj)
|
||||
}
|
||||
|
||||
func (o *SystemCache) PutSettings(key string, settings *data.BucketSettings) error {
|
||||
return o.cache.Set(key, settings)
|
||||
}
|
||||
|
||||
func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.NotificationConfiguration) error {
|
||||
return o.cache.Set(key, obj)
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
bktVersionSettingsObject = ".s3-versioning-settings"
|
||||
bktSettingsObject = ".s3-settings"
|
||||
bktCORSConfigurationObject = ".s3-cors"
|
||||
bktNotificationConfigurationObject = ".s3-notifications"
|
||||
)
|
||||
|
@ -45,6 +45,11 @@ type (
|
|||
Headers map[string]string
|
||||
}
|
||||
|
||||
// BucketSettings stores settings such as versioning.
|
||||
BucketSettings struct {
|
||||
VersioningEnabled bool `json:"versioning_enabled"`
|
||||
}
|
||||
|
||||
// CORSConfiguration stores CORS configuration of a request.
|
||||
CORSConfiguration struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CORSConfiguration" json:"-"`
|
||||
|
@ -63,7 +68,7 @@ type (
|
|||
)
|
||||
|
||||
// SettingsObjectName is system name for bucket settings file.
|
||||
func (b *BucketInfo) SettingsObjectName() string { return bktVersionSettingsObject }
|
||||
func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject }
|
||||
|
||||
// CORSObjectName returns system name for bucket CORS configuration file.
|
||||
func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject }
|
||||
|
|
|
@ -458,9 +458,9 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
Key: objInfo.Name,
|
||||
}
|
||||
|
||||
if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil {
|
||||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
} else if versioning.VersioningEnabled {
|
||||
} else if settings.VersioningEnabled {
|
||||
w.Header().Set(api.AmzVersionID, objInfo.Version())
|
||||
}
|
||||
|
||||
|
|
|
@ -241,9 +241,9 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil {
|
||||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
} else if versioning.VersioningEnabled {
|
||||
} else if settings.VersioningEnabled {
|
||||
w.Header().Set(api.AmzVersionID, info.Version())
|
||||
}
|
||||
|
||||
|
@ -339,9 +339,15 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil {
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
} else if versioning.VersioningEnabled {
|
||||
} else if settings.VersioningEnabled {
|
||||
w.Header().Set(api.AmzVersionID, info.Version())
|
||||
}
|
||||
|
||||
|
@ -574,12 +580,20 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if p.ObjectLockEnabled {
|
||||
vp := &layer.PutVersioningParams{
|
||||
Bucket: reqInfo.BucketName,
|
||||
Settings: &layer.BucketSettings{VersioningEnabled: true},
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
if _, err = h.obj.PutBucketVersioning(r.Context(), vp); err != nil {
|
||||
h.log.Error("couldn't enable bucket versioning", zap.Stringer("container_id", cid), zap.Error(err))
|
||||
|
||||
sp := &layer.PutSettingsParams{
|
||||
BktInfo: bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
}
|
||||
if err = h.obj.PutBucketSettings(r.Context(), sp); err != nil {
|
||||
h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err,
|
||||
zap.Stringer("container_id", cid))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"net/http"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
"go.uber.org/zap"
|
||||
|
@ -20,11 +21,6 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
|||
return
|
||||
}
|
||||
|
||||
p := &layer.PutVersioningParams{
|
||||
Bucket: reqInfo.BucketName,
|
||||
Settings: &layer.BucketSettings{VersioningEnabled: configuration.Status == "Enabled"},
|
||||
}
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
|
@ -35,12 +31,25 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
|||
return
|
||||
}
|
||||
|
||||
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't get bucket settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
settings.VersioningEnabled = configuration.Status == "Enabled"
|
||||
|
||||
p := &layer.PutSettingsParams{
|
||||
BktInfo: bktInfo,
|
||||
Settings: settings,
|
||||
}
|
||||
|
||||
if !p.Settings.VersioningEnabled && bktInfo.ObjectLockEnabled {
|
||||
h.logAndSendError(w, "couldn't suspend bucket versioning", reqInfo, fmt.Errorf("object lock is enabled"))
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := h.obj.PutBucketVersioning(r.Context(), p); err != nil {
|
||||
if err = h.obj.PutBucketSettings(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "couldn't put update versioning settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -61,7 +70,7 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
|||
return
|
||||
}
|
||||
|
||||
settings, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName)
|
||||
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
if errors.IsS3Error(err, errors.ErrNoSuchBucket) {
|
||||
h.logAndSendError(w, "couldn't get versioning settings", reqInfo, err)
|
||||
|
@ -79,7 +88,7 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
|||
}
|
||||
}
|
||||
|
||||
func formVersioningConfiguration(settings *layer.BucketSettings) *VersioningConfiguration {
|
||||
func formVersioningConfiguration(settings *data.BucketSettings) *VersioningConfiguration {
|
||||
res := &VersioningConfiguration{}
|
||||
if settings == nil {
|
||||
return res
|
||||
|
|
|
@ -302,10 +302,10 @@ type (
|
|||
Header map[string]string
|
||||
}
|
||||
|
||||
// PutVersioningParams stores object copy request parameters.
|
||||
PutVersioningParams struct {
|
||||
Bucket string
|
||||
Settings *BucketSettings
|
||||
// PutSettingsParams stores object copy request parameters.
|
||||
PutSettingsParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
Settings *data.BucketSettings
|
||||
}
|
||||
|
||||
// PutCORSParams stores PutCORS request parameters.
|
||||
|
@ -314,11 +314,6 @@ type (
|
|||
Reader io.Reader
|
||||
}
|
||||
|
||||
// BucketSettings stores settings such as versioning.
|
||||
BucketSettings struct {
|
||||
VersioningEnabled bool
|
||||
}
|
||||
|
||||
// CopyObjectParams stores object copy request parameters.
|
||||
CopyObjectParams struct {
|
||||
SrcObject *data.ObjectInfo
|
||||
|
@ -386,8 +381,8 @@ type (
|
|||
Client interface {
|
||||
EphemeralKey() *keys.PublicKey
|
||||
|
||||
PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*data.ObjectInfo, error)
|
||||
GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error)
|
||||
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
|
||||
PutBucketSettings(ctx context.Context, p *PutSettingsParams) error
|
||||
|
||||
PutBucketCORS(ctx context.Context, p *PutCORSParams) error
|
||||
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error)
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
|
@ -205,3 +208,57 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy
|
|||
func systemObjectKey(bktInfo *data.BucketInfo, obj string) string {
|
||||
return bktInfo.Name + obj
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
||||
if settings := n.systemCache.GetSettings(bktInfo.SettingsObjectName()); settings != nil {
|
||||
return settings, nil
|
||||
}
|
||||
|
||||
obj, err := n.getSystemObjectFromNeoFS(ctx, bktInfo, bktInfo.SettingsObjectName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
settings := &data.BucketSettings{}
|
||||
if err = json.Unmarshal(obj.Payload(), settings); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutSettings(bktInfo.SettingsObjectName(), settings); err != nil {
|
||||
n.log.Warn("couldn't put system meta to objects cache",
|
||||
zap.Stringer("object id", obj.ID()),
|
||||
zap.Stringer("bucket id", bktInfo.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return settings, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error {
|
||||
rawSettings, err := json.Marshal(p.Settings)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't marshal bucket settings")
|
||||
}
|
||||
|
||||
s := &PutSystemObjectParams{
|
||||
BktInfo: p.BktInfo,
|
||||
ObjName: p.BktInfo.SettingsObjectName(),
|
||||
Metadata: map[string]string{},
|
||||
Reader: bytes.NewReader(rawSettings),
|
||||
}
|
||||
|
||||
obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if obj.Size == 0 {
|
||||
return errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutSettings(p.BktInfo.SettingsObjectName(), p.Settings); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -282,8 +282,8 @@ func (v *objectVersions) getVersion(oid *oid.ID) *data.ObjectInfo {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*data.ObjectInfo, error) {
|
||||
bktInfo, err := n.GetBucketInfo(ctx, p.Bucket)
|
||||
func (n *layer) PutBucketVersioning(ctx context.Context, p *PutSettingsParams) (*data.ObjectInfo, error) {
|
||||
bktInfo, err := n.GetBucketInfo(ctx, p.BktInfo.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -303,7 +303,7 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams)
|
|||
return n.putSystemObject(ctx, s)
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) {
|
||||
func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*data.BucketSettings, error) {
|
||||
bktInfo, err := n.GetBucketInfo(ctx, bucketName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -398,7 +398,7 @@ func contains(list []string, elem string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*BucketSettings, error) {
|
||||
func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
||||
objInfo, err := n.headSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -407,8 +407,8 @@ func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo)
|
|||
return objectInfoToBucketSettings(objInfo), nil
|
||||
}
|
||||
|
||||
func objectInfoToBucketSettings(info *data.ObjectInfo) *BucketSettings {
|
||||
res := &BucketSettings{}
|
||||
func objectInfoToBucketSettings(info *data.ObjectInfo) *data.BucketSettings {
|
||||
res := &data.BucketSettings{}
|
||||
|
||||
enabled, ok := info.Headers[attrSettingsVersioningEnabled]
|
||||
if ok {
|
||||
|
|
|
@ -323,6 +323,7 @@ type testContext struct {
|
|||
layer Client
|
||||
bkt string
|
||||
bktID *cid.ID
|
||||
bktInfo *data.BucketInfo
|
||||
obj string
|
||||
testNeoFS *testNeoFS
|
||||
}
|
||||
|
@ -365,6 +366,10 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
|||
layer: NewLayer(l, tp, layerCfg),
|
||||
bkt: bktName,
|
||||
bktID: bktID,
|
||||
bktInfo: &data.BucketInfo{
|
||||
Name: bktName,
|
||||
CID: bktID,
|
||||
},
|
||||
obj: "obj1",
|
||||
t: t,
|
||||
testNeoFS: tp,
|
||||
|
@ -373,9 +378,9 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
|||
|
||||
func TestSimpleVersioning(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
_, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
|
||||
Bucket: tc.bktID.String(),
|
||||
Settings: &BucketSettings{VersioningEnabled: true},
|
||||
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -414,9 +419,9 @@ func TestSimpleNoVersioning(t *testing.T) {
|
|||
|
||||
func TestVersioningDeleteObject(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
_, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
|
||||
Bucket: tc.bktID.String(),
|
||||
Settings: &BucketSettings{VersioningEnabled: true},
|
||||
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -431,9 +436,9 @@ func TestVersioningDeleteObject(t *testing.T) {
|
|||
|
||||
func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
_, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
|
||||
Bucket: tc.bktID.String(),
|
||||
Settings: &BucketSettings{VersioningEnabled: true},
|
||||
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -796,25 +801,34 @@ func TestSystemObjectsVersioning(t *testing.T) {
|
|||
cacheConfig.System.Lifetime = 0
|
||||
|
||||
tc := prepareContext(t, cacheConfig)
|
||||
objInfo, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
|
||||
Bucket: tc.bktID.String(),
|
||||
Settings: &BucketSettings{VersioningEnabled: false},
|
||||
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: false},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
objMeta, ok := tc.testNeoFS.objects[objInfo.Address().String()]
|
||||
require.True(t, ok)
|
||||
objMeta := tc.getSystemObject(tc.bktInfo.SettingsObjectName())
|
||||
require.NotNil(t, objMeta)
|
||||
|
||||
_, err = tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
|
||||
Bucket: tc.bktID.String(),
|
||||
Settings: &BucketSettings{VersioningEnabled: true},
|
||||
err = tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetContainerID(objMeta.ContainerID())
|
||||
addr.SetObjectID(objMeta.ID())
|
||||
|
||||
// simulate failed deletion
|
||||
tc.testNeoFS.objects[objInfo.Address().String()] = objMeta
|
||||
tc.testNeoFS.objects[addr.String()] = objMeta
|
||||
|
||||
versioning, err := tc.layer.GetBucketVersioning(tc.ctx, tc.bkt)
|
||||
bktInfo := &data.BucketInfo{
|
||||
Name: tc.bkt,
|
||||
CID: tc.bktID,
|
||||
}
|
||||
|
||||
versioning, err := tc.layer.GetBucketSettings(tc.ctx, bktInfo)
|
||||
require.NoError(t, err)
|
||||
require.True(t, versioning.VersioningEnabled)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue