From 58df410111fc8d51769e8be370ae4c9a9ca37556 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 18 Jan 2022 12:40:41 +0300 Subject: [PATCH] [#319] Head and delete null versions Signed-off-by: Denis Kirillov --- api/layer/layer.go | 2 +- api/layer/object.go | 22 +++++++++++++++++--- api/layer/versioning.go | 46 ++++++++++++++++++++++++++++++++++------- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index ffc64db9d..7322fc651 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -414,7 +414,7 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O return n.headLastVersionIfNotDeleted(ctx, bkt, p.Object) } - return n.headVersion(ctx, bkt, p.VersionID) + return n.headVersion(ctx, bkt, p) } // PutObject into storage. diff --git a/api/layer/object.go b/api/layer/object.go index e2ad9a83b..1a78d02e5 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -158,6 +158,9 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec return nil, err } idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled) + if !versioningEnabled { + p.Header[versionsUnversionedAttr] = "true" + } r := p.Reader if r != nil { @@ -364,10 +367,23 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa return versions, nil } -func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID string) (*data.ObjectInfo, error) { +func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ObjectInfo, error) { + if p.VersionID == unversionedObjectVersionID { + versions, err := n.headVersions(ctx, bkt, p.Object) + if err != nil { + return nil, err + } + + objInfo := versions.getLast(FromUnversioned()) + if objInfo == nil { + return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion) + } + return objInfo, nil + } + oid := object.NewID() - if err := oid.Parse(versionID); err != nil { - return nil, err + if err := oid.Parse(p.VersionID); err != nil { + return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidVersion) } if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil { diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 49fda85a6..efe57edfc 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -20,6 +20,26 @@ type objectVersions struct { isSorted bool } +func FromUnversioned() VersionOption { + return func(options *versionOptions) { + options.unversioned = true + } +} + +type VersionOption func(*versionOptions) + +type versionOptions struct { + unversioned bool +} + +func formVersionOptions(opts ...VersionOption) *versionOptions { + options := &versionOptions{} + for _, opt := range opts { + opt(options) + } + return options +} + const ( VersionsDeleteMarkAttr = "S3-Versions-delete-mark" DelMarkFullObject = "*" @@ -30,6 +50,7 @@ const ( attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled" versionsDelAttr = "S3-Versions-del" versionsAddAttr = "S3-Versions-add" + versionsUnversionedAttr = "S3-Versions-unversioned" ) func newObjectVersions(name string) *objectVersions { @@ -161,17 +182,22 @@ func (v *objectVersions) isEmpty() bool { return v == nil || len(v.objects) == 0 } -func (v *objectVersions) getLast() *data.ObjectInfo { +func (v *objectVersions) getLast(opts ...VersionOption) *data.ObjectInfo { if v.isEmpty() { return nil } + options := formVersionOptions(opts...) + v.sort() existedVersions := v.existedVersions() for i := len(v.objects) - 1; i >= 0; i-- { if contains(existedVersions, v.objects[i].Version()) { delMarkHeader := v.objects[i].Headers[VersionsDeleteMarkAttr] if delMarkHeader == "" { + if options.unversioned && v.objects[i].Headers[versionsUnversionedAttr] != "true" { + continue + } return v.objects[i] } if delMarkHeader == DelMarkFullObject { @@ -377,16 +403,22 @@ func objectInfoToBucketSettings(info *data.ObjectInfo) *BucketSettings { } func (n *layer) checkVersionsExist(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.ObjectInfo, error) { - id := object.NewID() - if err := id.Parse(obj.VersionID); err != nil { - return nil, errors.GetAPIError(errors.ErrInvalidVersion) - } - versions, err := n.headVersions(ctx, bkt, obj.Name) if err != nil { return nil, err } - version := versions.getVersion(id) + + var version *data.ObjectInfo + if obj.VersionID == unversionedObjectVersionID { + version = versions.getLast(FromUnversioned()) + } else { + id := object.NewID() + if err := id.Parse(obj.VersionID); err != nil { + return nil, errors.GetAPIError(errors.ErrInvalidVersion) + } + version = versions.getVersion(id) + } + if version == nil { return nil, errors.GetAPIError(errors.ErrInvalidVersion) }