[#319] Head and delete null versions

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-01-18 12:40:41 +03:00 committed by Angira Kekteeva
parent a46726a545
commit 58df410111
3 changed files with 59 additions and 11 deletions

View file

@ -414,7 +414,7 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O
return n.headLastVersionIfNotDeleted(ctx, bkt, p.Object) return n.headLastVersionIfNotDeleted(ctx, bkt, p.Object)
} }
return n.headVersion(ctx, bkt, p.VersionID) return n.headVersion(ctx, bkt, p)
} }
// PutObject into storage. // PutObject into storage.

View file

@ -158,6 +158,9 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
return nil, err return nil, err
} }
idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled) idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled)
if !versioningEnabled {
p.Header[versionsUnversionedAttr] = "true"
}
r := p.Reader r := p.Reader
if r != nil { if r != nil {
@ -364,12 +367,25 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa
return versions, nil return versions, nil
} }
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID string) (*data.ObjectInfo, error) { func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ObjectInfo, error) {
oid := object.NewID() if p.VersionID == unversionedObjectVersionID {
if err := oid.Parse(versionID); err != nil { versions, err := n.headVersions(ctx, bkt, p.Object)
if err != nil {
return nil, err return nil, err
} }
objInfo := versions.getLast(FromUnversioned())
if objInfo == nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
return objInfo, nil
}
oid := object.NewID()
if err := oid.Parse(p.VersionID); err != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidVersion)
}
if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil { if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil return objInfoFromMeta(bkt, headInfo), nil
} }

View file

@ -20,6 +20,26 @@ type objectVersions struct {
isSorted bool isSorted bool
} }
func FromUnversioned() VersionOption {
return func(options *versionOptions) {
options.unversioned = true
}
}
type VersionOption func(*versionOptions)
type versionOptions struct {
unversioned bool
}
func formVersionOptions(opts ...VersionOption) *versionOptions {
options := &versionOptions{}
for _, opt := range opts {
opt(options)
}
return options
}
const ( const (
VersionsDeleteMarkAttr = "S3-Versions-delete-mark" VersionsDeleteMarkAttr = "S3-Versions-delete-mark"
DelMarkFullObject = "*" DelMarkFullObject = "*"
@ -30,6 +50,7 @@ const (
attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled" attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled"
versionsDelAttr = "S3-Versions-del" versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add" versionsAddAttr = "S3-Versions-add"
versionsUnversionedAttr = "S3-Versions-unversioned"
) )
func newObjectVersions(name string) *objectVersions { func newObjectVersions(name string) *objectVersions {
@ -161,17 +182,22 @@ func (v *objectVersions) isEmpty() bool {
return v == nil || len(v.objects) == 0 return v == nil || len(v.objects) == 0
} }
func (v *objectVersions) getLast() *data.ObjectInfo { func (v *objectVersions) getLast(opts ...VersionOption) *data.ObjectInfo {
if v.isEmpty() { if v.isEmpty() {
return nil return nil
} }
options := formVersionOptions(opts...)
v.sort() v.sort()
existedVersions := v.existedVersions() existedVersions := v.existedVersions()
for i := len(v.objects) - 1; i >= 0; i-- { for i := len(v.objects) - 1; i >= 0; i-- {
if contains(existedVersions, v.objects[i].Version()) { if contains(existedVersions, v.objects[i].Version()) {
delMarkHeader := v.objects[i].Headers[VersionsDeleteMarkAttr] delMarkHeader := v.objects[i].Headers[VersionsDeleteMarkAttr]
if delMarkHeader == "" { if delMarkHeader == "" {
if options.unversioned && v.objects[i].Headers[versionsUnversionedAttr] != "true" {
continue
}
return v.objects[i] return v.objects[i]
} }
if delMarkHeader == DelMarkFullObject { if delMarkHeader == DelMarkFullObject {
@ -377,16 +403,22 @@ func objectInfoToBucketSettings(info *data.ObjectInfo) *BucketSettings {
} }
func (n *layer) checkVersionsExist(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.ObjectInfo, error) { func (n *layer) checkVersionsExist(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.ObjectInfo, error) {
id := object.NewID()
if err := id.Parse(obj.VersionID); err != nil {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
}
versions, err := n.headVersions(ctx, bkt, obj.Name) versions, err := n.headVersions(ctx, bkt, obj.Name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
version := versions.getVersion(id)
var version *data.ObjectInfo
if obj.VersionID == unversionedObjectVersionID {
version = versions.getLast(FromUnversioned())
} else {
id := object.NewID()
if err := id.Parse(obj.VersionID); err != nil {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
}
version = versions.getVersion(id)
}
if version == nil { if version == nil {
return nil, errors.GetAPIError(errors.ErrInvalidVersion) return nil, errors.GetAPIError(errors.ErrInvalidVersion)
} }