[#319] Update CRDT headers

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-01-18 18:28:46 +03:00 committed by Angira Kekteeva
parent 58df410111
commit f5d365af1d
5 changed files with 111 additions and 46 deletions

View file

@ -68,6 +68,15 @@ func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject
// Version returns object version from ObjectInfo. // Version returns object version from ObjectInfo.
func (o *ObjectInfo) Version() string { return o.ID.String() } func (o *ObjectInfo) Version() string { return o.ID.String() }
// NullableVersion returns object version from ObjectInfo.
// Return "null" if "S3-Versions-unversioned" header present.
func (o *ObjectInfo) NullableVersion() string {
if _, ok := o.Headers["S3-Versions-unversioned"]; ok {
return "null"
}
return o.Version()
}
// NiceName returns object name for cache. // NiceName returns object name for cache.
func (o *ObjectInfo) NiceName() string { return o.Bucket + "/" + o.Name } func (o *ObjectInfo) NiceName() string { return o.Bucket + "/" + o.Name }

View file

@ -287,7 +287,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
DisplayName: ver.Object.Owner.String(), DisplayName: ver.Object.Owner.String(),
}, },
Size: ver.Object.Size, Size: ver.Object.Size,
VersionID: ver.Object.Version(), VersionID: ver.Object.NullableVersion(),
ETag: ver.Object.HashSum, ETag: ver.Object.HashSum,
}) })
} }

View file

@ -158,9 +158,6 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
return nil, err return nil, err
} }
idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled) idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled)
if !versioningEnabled {
p.Header[versionsUnversionedAttr] = "true"
}
r := p.Reader r := p.Reader
if r != nil { if r != nil {
@ -259,44 +256,52 @@ func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string)
} }
func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*object.ID { func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*object.ID {
if !versioningEnabled {
header[versionsUnversionedAttr] = "true"
}
var idsToDeleteArr []*object.ID var idsToDeleteArr []*object.ID
if versions.isEmpty() { if versions.isEmpty() {
return idsToDeleteArr return idsToDeleteArr
} }
if versioningEnabled { if !versions.isAddListEmpty() {
if !versions.isAddListEmpty() { header[versionsAddAttr] = versions.getAddHeader()
header[versionsAddAttr] = versions.getAddHeader() }
}
deleted := versions.getDelHeader() if versioningEnabled {
versionsDeletedStr := versions.getDelHeader()
// header[versionsDelAttr] can be not empty when deleting specific version // header[versionsDelAttr] can be not empty when deleting specific version
if delAttr := header[versionsDelAttr]; len(delAttr) != 0 { if delAttr := header[versionsDelAttr]; len(delAttr) != 0 {
if len(deleted) != 0 { if len(versionsDeletedStr) != 0 {
header[versionsDelAttr] = deleted + "," + delAttr header[versionsDelAttr] = versionsDeletedStr + "," + delAttr
} else { } else {
header[versionsDelAttr] = delAttr header[versionsDelAttr] = delAttr
} }
} else if len(deleted) != 0 {
header[versionsDelAttr] = deleted
}
} else {
versionsDeletedStr := versions.getDelHeader()
if len(versionsDeletedStr) != 0 {
versionsDeletedStr += ","
}
if lastVersion := versions.getLast(); lastVersion != nil {
header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version()
idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID)
} else if len(versionsDeletedStr) != 0 { } else if len(versionsDeletedStr) != 0 {
header[versionsDelAttr] = versionsDeletedStr header[versionsDelAttr] = versionsDeletedStr
} }
} else {
versionsDeletedStr := versions.getDelHeader()
for _, version := range versions.objects { var additionalDel string
if contains(versions.delList, version.Version()) { for i, del := range versions.unversioned() {
idsToDeleteArr = append(idsToDeleteArr, version.ID) if i != 0 {
additionalDel += ","
} }
additionalDel += del.Version()
idsToDeleteArr = append(idsToDeleteArr, del.ID)
}
if len(additionalDel) != 0 {
if len(versionsDeletedStr) != 0 {
versionsDeletedStr += ","
}
versionsDeletedStr += additionalDel
}
if len(versionsDeletedStr) != 0 {
header[versionsDelAttr] = versionsDeletedStr
} }
} }

View file

@ -182,6 +182,23 @@ func (v *objectVersions) isEmpty() bool {
return v == nil || len(v.objects) == 0 return v == nil || len(v.objects) == 0
} }
func (v *objectVersions) unversioned() []*data.ObjectInfo {
if len(v.objects) == 0 {
return nil
}
existedVersions := v.existedVersions()
res := make([]*data.ObjectInfo, 0, len(v.objects))
for _, version := range v.objects {
if contains(existedVersions, version.Version()) && version.Headers[versionsUnversionedAttr] == "true" {
res = append(res, version)
}
}
return res
}
func (v *objectVersions) getLast(opts ...VersionOption) *data.ObjectInfo { func (v *objectVersions) getLast(opts ...VersionOption) *data.ObjectInfo {
if v.isEmpty() { if v.isEmpty() {
return nil return nil

View file

@ -687,6 +687,12 @@ func getTestObjectInfo(id byte, addAttr, delAttr, delMarkAttr string) *data.Obje
} }
} }
func getTestUnversionedObjectInfo(id byte, addAttr, delAttr, delMarkAttr string) *data.ObjectInfo {
objInfo := getTestObjectInfo(id, addAttr, delAttr, delMarkAttr)
objInfo.Headers[versionsUnversionedAttr] = "true"
return objInfo
}
func getTestObjectInfoEpoch(epoch uint64, id byte, addAttr, delAttr, delMarkAttr string) *data.ObjectInfo { func getTestObjectInfoEpoch(epoch uint64, id byte, addAttr, delAttr, delMarkAttr string) *data.ObjectInfo {
obj := getTestObjectInfo(id, addAttr, delAttr, delMarkAttr) obj := getTestObjectInfo(id, addAttr, delAttr, delMarkAttr)
obj.CreationEpoch = epoch obj.CreationEpoch = epoch
@ -694,10 +700,13 @@ func getTestObjectInfoEpoch(epoch uint64, id byte, addAttr, delAttr, delMarkAttr
} }
func TestUpdateCRDT2PSetHeaders(t *testing.T) { func TestUpdateCRDT2PSetHeaders(t *testing.T) {
obj1 := getTestObjectInfo(1, "", "", "") obj1 := getTestUnversionedObjectInfo(1, "", "", "")
obj2 := getTestObjectInfo(2, "", "", "") obj2 := getTestUnversionedObjectInfo(2, "", "", "")
obj3 := getTestObjectInfo(3, "", "", "")
obj4 := getTestObjectInfo(4, "", "", "")
for _, tc := range []struct { for _, tc := range []struct {
name string
header map[string]string header map[string]string
versions *objectVersions versions *objectVersions
versioningEnabled bool versioningEnabled bool
@ -705,53 +714,78 @@ func TestUpdateCRDT2PSetHeaders(t *testing.T) {
expectedIdsToDelete []*object.ID expectedIdsToDelete []*object.ID
}{ }{
{ {
header: map[string]string{"someKey": "someValue"}, name: "unversioned save headers",
expectedHeader: map[string]string{"someKey": "someValue"}, header: map[string]string{"someKey": "someValue"},
expectedIdsToDelete: nil, expectedHeader: map[string]string{"someKey": "someValue", versionsUnversionedAttr: "true"},
}, },
{ {
name: "unversioned put",
header: map[string]string{}, header: map[string]string{},
versions: &objectVersions{ versions: &objectVersions{
objects: []*data.ObjectInfo{obj1}, objects: []*data.ObjectInfo{obj1},
}, },
expectedHeader: map[string]string{versionsDelAttr: obj1.Version()}, expectedHeader: map[string]string{
versionsAddAttr: obj1.Version(),
versionsDelAttr: obj1.Version(),
versionsUnversionedAttr: "true",
},
expectedIdsToDelete: []*object.ID{obj1.ID}, expectedIdsToDelete: []*object.ID{obj1.ID},
}, },
{ {
name: "unversioned del header",
header: map[string]string{}, header: map[string]string{},
versions: &objectVersions{ versions: &objectVersions{
objects: []*data.ObjectInfo{obj2}, objects: []*data.ObjectInfo{obj2},
delList: []string{obj1.Version()}, delList: []string{obj1.Version()},
}, },
expectedHeader: map[string]string{versionsDelAttr: joinVers(obj1, obj2)}, expectedHeader: map[string]string{
versionsAddAttr: obj2.Version(),
versionsDelAttr: joinVers(obj1, obj2),
versionsUnversionedAttr: "true",
},
expectedIdsToDelete: []*object.ID{obj2.ID}, expectedIdsToDelete: []*object.ID{obj2.ID},
}, },
{ {
name: "versioned put",
header: map[string]string{}, header: map[string]string{},
versions: &objectVersions{ versions: &objectVersions{
objects: []*data.ObjectInfo{obj1}, objects: []*data.ObjectInfo{obj3},
}, },
versioningEnabled: true, versioningEnabled: true,
expectedHeader: map[string]string{versionsAddAttr: obj1.Version()}, expectedHeader: map[string]string{versionsAddAttr: obj3.Version()},
expectedIdsToDelete: nil,
}, },
{ {
header: map[string]string{versionsDelAttr: obj2.Version()}, name: "versioned del header",
header: map[string]string{versionsDelAttr: obj4.Version()},
versions: &objectVersions{ versions: &objectVersions{
objects: []*data.ObjectInfo{obj2}, objects: []*data.ObjectInfo{obj4},
delList: []string{obj1.Version()}, delList: []string{obj3.Version()},
}, },
versioningEnabled: true, versioningEnabled: true,
expectedHeader: map[string]string{ expectedHeader: map[string]string{
versionsAddAttr: obj2.Version(), versionsAddAttr: obj4.Version(),
versionsDelAttr: joinVers(obj1, obj2), versionsDelAttr: joinVers(obj3, obj4),
}, },
expectedIdsToDelete: nil, },
{
name: "unversioned put after some version",
header: map[string]string{},
versions: &objectVersions{
objects: []*data.ObjectInfo{obj1, obj3},
},
expectedHeader: map[string]string{
versionsAddAttr: joinVers(obj1, obj3),
versionsDelAttr: obj1.Version(),
versionsUnversionedAttr: "true",
},
expectedIdsToDelete: []*object.ID{obj1.ID},
}, },
} { } {
idsToDelete := updateCRDT2PSetHeaders(tc.header, tc.versions, tc.versioningEnabled) t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedHeader, tc.header) idsToDelete := updateCRDT2PSetHeaders(tc.header, tc.versions, tc.versioningEnabled)
require.Equal(t, tc.expectedIdsToDelete, idsToDelete) require.Equal(t, tc.expectedHeader, tc.header)
require.Equal(t, tc.expectedIdsToDelete, idsToDelete)
})
} }
} }