forked from TrueCloudLab/frostfs-s3-gw
[#569] Refactor versioning in tree service
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
e46763e429
commit
72013e08ec
10 changed files with 48 additions and 44 deletions
|
@ -13,6 +13,10 @@ const (
|
||||||
bktSettingsObject = ".s3-settings"
|
bktSettingsObject = ".s3-settings"
|
||||||
bktCORSConfigurationObject = ".s3-cors"
|
bktCORSConfigurationObject = ".s3-cors"
|
||||||
bktNotificationConfigurationObject = ".s3-notifications"
|
bktNotificationConfigurationObject = ".s3-notifications"
|
||||||
|
|
||||||
|
VerUnversioned = "Unversioned"
|
||||||
|
VerEnabled = "Enabled"
|
||||||
|
VerSuspended = "Suspended"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -45,8 +49,7 @@ type (
|
||||||
|
|
||||||
// BucketSettings stores settings such as versioning.
|
// BucketSettings stores settings such as versioning.
|
||||||
BucketSettings struct {
|
BucketSettings struct {
|
||||||
IsNoneStatus bool
|
Versioning string `json:"versioning"`
|
||||||
VersioningEnabled bool `json:"versioning_enabled"`
|
|
||||||
LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"`
|
LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,3 +94,15 @@ func (o *ObjectInfo) Address() oid.Address {
|
||||||
|
|
||||||
return addr
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b BucketSettings) Unversioned() bool {
|
||||||
|
return b.Versioning == VerUnversioned
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b BucketSettings) VersioningEnabled() bool {
|
||||||
|
return b.Versioning == VerEnabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b BucketSettings) VersioningSuspended() bool {
|
||||||
|
return b.Versioning == VerSuspended
|
||||||
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
var m *SendNotificationParams
|
var m *SendNotificationParams
|
||||||
|
|
||||||
if bktSettings.VersioningEnabled && len(versionID) == 0 {
|
if bktSettings.VersioningEnabled() && len(versionID) == 0 {
|
||||||
m = &SendNotificationParams{
|
m = &SendNotificationParams{
|
||||||
Event: EventObjectRemovedDeleteMarkerCreated,
|
Event: EventObjectRemovedDeleteMarkerCreated,
|
||||||
ObjInfo: &data.ObjectInfo{
|
ObjInfo: &data.ObjectInfo{
|
||||||
|
|
|
@ -418,7 +418,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
Key: objInfo.Name,
|
Key: objInfo.Name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if bktSettings.VersioningEnabled {
|
if bktSettings.VersioningEnabled() {
|
||||||
w.Header().Set(api.AmzVersionID, objInfo.Version())
|
w.Header().Set(api.AmzVersionID, objInfo.Version())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -277,7 +277,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if settings.VersioningEnabled {
|
if settings.VersioningEnabled() {
|
||||||
w.Header().Set(api.AmzVersionID, info.Version())
|
w.Header().Set(api.AmzVersionID, info.Version())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,7 +407,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||||
} else if settings.VersioningEnabled {
|
} else if settings.VersioningEnabled() {
|
||||||
w.Header().Set(api.AmzVersionID, info.Version())
|
w.Header().Set(api.AmzVersionID, info.Version())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -659,7 +659,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if p.ObjectLockEnabled {
|
if p.ObjectLockEnabled {
|
||||||
sp := &layer.PutSettingsParams{
|
sp := &layer.PutSettingsParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
Settings: &data.BucketSettings{Versioning: data.VerEnabled},
|
||||||
}
|
}
|
||||||
if err = h.obj.PutBucketSettings(r.Context(), sp); err != nil {
|
if err = h.obj.PutBucketSettings(r.Context(), sp); err != nil {
|
||||||
h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err,
|
h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err,
|
||||||
|
|
|
@ -91,7 +91,7 @@ func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if settings.VersioningEnabled {
|
if settings.VersioningEnabled() {
|
||||||
w.Header().Set(api.AmzVersionID, versionID)
|
w.Header().Set(api.AmzVersionID, versionID)
|
||||||
}
|
}
|
||||||
if err = api.EncodeToResponse(w, encodeTagging(tagSet)); err != nil {
|
if err = api.EncodeToResponse(w, encodeTagging(tagSet)); err != nil {
|
||||||
|
|
|
@ -33,15 +33,18 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
settings.VersioningEnabled = configuration.Status == "Enabled"
|
if configuration.Status != data.VerEnabled && configuration.Status != data.VerSuspended {
|
||||||
settings.IsNoneStatus = false
|
h.logAndSendError(w, "invalid versioning configuration", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
settings.Versioning = configuration.Status
|
||||||
|
|
||||||
p := &layer.PutSettingsParams{
|
p := &layer.PutSettingsParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
Settings: settings,
|
Settings: settings,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !p.Settings.VersioningEnabled && bktInfo.ObjectLockEnabled {
|
if p.Settings.VersioningSuspended() && bktInfo.ObjectLockEnabled {
|
||||||
h.logAndSendError(w, "couldn't suspend bucket versioning", reqInfo, fmt.Errorf("object lock is enabled"))
|
h.logAndSendError(w, "couldn't suspend bucket versioning", reqInfo, fmt.Errorf("object lock is enabled"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -77,13 +80,9 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
||||||
|
|
||||||
func formVersioningConfiguration(settings *data.BucketSettings) *VersioningConfiguration {
|
func formVersioningConfiguration(settings *data.BucketSettings) *VersioningConfiguration {
|
||||||
res := &VersioningConfiguration{}
|
res := &VersioningConfiguration{}
|
||||||
if settings.IsNoneStatus {
|
if !settings.Unversioned() {
|
||||||
return res
|
res.Status = settings.Versioning
|
||||||
}
|
}
|
||||||
if settings.VersioningEnabled {
|
|
||||||
res.Status = "Enabled"
|
|
||||||
} else {
|
|
||||||
res.Status = "Suspended"
|
|
||||||
}
|
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
|
@ -475,7 +475,7 @@ func getRandomOID() (oid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
||||||
if len(obj.VersionID) != 0 || settings.IsNoneStatus {
|
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
||||||
var nodeVersion *data.NodeVersion
|
var nodeVersion *data.NodeVersion
|
||||||
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||||
return obj
|
return obj
|
||||||
|
@ -492,7 +492,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
|
|
||||||
var newVersion *data.NodeVersion
|
var newVersion *data.NodeVersion
|
||||||
|
|
||||||
if !settings.VersioningEnabled {
|
if settings.VersioningSuspended() {
|
||||||
obj.VersionID = UnversionedObjectVersionID
|
obj.VersionID = UnversionedObjectVersionID
|
||||||
|
|
||||||
var nodeVersion *data.NodeVersion
|
var nodeVersion *data.NodeVersion
|
||||||
|
@ -526,7 +526,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
Created: time.Now(),
|
Created: time.Now(),
|
||||||
Owner: n.Owner(ctx),
|
Owner: n.Owner(ctx),
|
||||||
},
|
},
|
||||||
IsUnversioned: !settings.VersioningEnabled,
|
IsUnversioned: settings.VersioningSuspended(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.Error = n.treeService.AddVersion(ctx, bkt.CID, newVersion); obj.Error != nil {
|
if obj.Error = n.treeService.AddVersion(ctx, bkt.CID, newVersion); obj.Error != nil {
|
||||||
|
|
|
@ -149,10 +149,14 @@ func MimeByFileName(name string) string {
|
||||||
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error) {
|
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error) {
|
||||||
own := n.Owner(ctx)
|
own := n.Owner(ctx)
|
||||||
|
|
||||||
versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo)
|
bktSettings, err := n.GetBucketSettings(ctx, p.BktInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't get versioning settings object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
newVersion := &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{FilePath: p.Object},
|
BaseNodeVersion: data.BaseNodeVersion{FilePath: p.Object},
|
||||||
IsUnversioned: !versioningEnabled,
|
IsUnversioned: !bktSettings.VersioningEnabled(),
|
||||||
}
|
}
|
||||||
|
|
||||||
r := p.Reader
|
r := p.Reader
|
||||||
|
@ -653,16 +657,6 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *data.BucketInfo) bool {
|
|
||||||
settings, err := n.GetBucketSettings(ctx, bktInfo)
|
|
||||||
if err != nil {
|
|
||||||
n.log.Warn("couldn't get versioning settings object", zap.Error(err))
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return settings.VersioningEnabled
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID, prefix, delimiter string) *data.ObjectInfo {
|
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID, prefix, delimiter string) *data.ObjectInfo {
|
||||||
if objInfo := n.objCache.GetObject(newAddress(bktInfo.CID, obj)); objInfo != nil {
|
if objInfo := n.objCache.GetObject(newAddress(bktInfo.CID, obj)); objInfo != nil {
|
||||||
return objInfo
|
return objInfo
|
||||||
|
|
|
@ -189,8 +189,7 @@ func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo)
|
||||||
if !errorsStd.Is(err, ErrNodeNotFound) {
|
if !errorsStd.Is(err, ErrNodeNotFound) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
settings = &data.BucketSettings{}
|
settings = &data.BucketSettings{Versioning: data.VerUnversioned}
|
||||||
settings.IsNoneStatus = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = n.systemCache.PutSettings(systemKey, settings); err != nil {
|
if err = n.systemCache.PutSettings(systemKey, settings); err != nil {
|
||||||
|
|
|
@ -47,7 +47,7 @@ type (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
versioningEnabledKV = "VersioningEnabled"
|
versioningKV = "Versioning"
|
||||||
lockConfigurationKV = "LockConfiguration"
|
lockConfigurationKV = "LockConfiguration"
|
||||||
oidKV = "OID"
|
oidKV = "OID"
|
||||||
fileNameKV = "FileName"
|
fileNameKV = "FileName"
|
||||||
|
@ -260,18 +260,15 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID cid.ID) (*data.BucketSettings, error) {
|
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID cid.ID) (*data.BucketSettings, error) {
|
||||||
keysToReturn := []string{versioningEnabledKV, lockConfigurationKV}
|
keysToReturn := []string{versioningKV, lockConfigurationKV}
|
||||||
node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn)
|
node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't get node: %w", err)
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
settings := &data.BucketSettings{}
|
settings := &data.BucketSettings{Versioning: data.VerUnversioned}
|
||||||
|
if versioningValue, ok := node.Get(versioningKV); ok {
|
||||||
if versioningEnabledValue, ok := node.Get(versioningEnabledKV); ok {
|
settings.Versioning = versioningValue
|
||||||
if settings.VersioningEnabled, err = strconv.ParseBool(versioningEnabledValue); err != nil {
|
|
||||||
return nil, fmt.Errorf("settings node: invalid versioning: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if lockConfigurationValue, ok := node.Get(lockConfigurationKV); ok {
|
if lockConfigurationValue, ok := node.Get(lockConfigurationKV); ok {
|
||||||
|
@ -1158,7 +1155,7 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string {
|
||||||
results := make(map[string]string, 3)
|
results := make(map[string]string, 3)
|
||||||
|
|
||||||
results[fileNameKV] = settingsFileName
|
results[fileNameKV] = settingsFileName
|
||||||
results[versioningEnabledKV] = strconv.FormatBool(settings.VersioningEnabled)
|
results[versioningKV] = settings.Versioning
|
||||||
results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration)
|
results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
Loading…
Reference in a new issue