[#242] Fix versions sorting

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-09-02 11:40:41 +03:00 committed by Alex Vanin
parent 42ed6a16ea
commit 458f9cf17b
5 changed files with 231 additions and 76 deletions

View file

@ -419,7 +419,7 @@ func containsACLHeaders(r *http.Request) bool {
r.Header.Get(api.AmzGrantFullControl) != "" || r.Header.Get(api.AmzGrantWrite) != ""
}
func (h *handler) getNewEAclTable(r *http.Request, objInfo *layer.ObjectInfo) (*eacl.Table, error) {
func (h *handler) getNewEAclTable(r *http.Request, objInfo *api.ObjectInfo) (*eacl.Table, error) {
var newEaclTable *eacl.Table
objectACL, err := parseACLHeaders(r)
if err != nil {

View file

@ -249,7 +249,7 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio
}
if versioningEnabled {
if len(versions.addList) != 0 {
if !versions.isAddListEmpty() {
p.Header[versionsAddAttr] = versions.getAddHeader()
}
@ -319,11 +319,11 @@ func (n *layer) headVersions(ctx context.Context, bkt *api.BucketInfo, objectNam
return nil, err
}
versions := newObjectVersions(objectName)
if len(ids) == 0 {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
return versions, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
versions := newObjectVersions(objectName)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
@ -526,16 +526,6 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *api.BucketInfo,
return versions, nil
}
func getExistedVersions(versions *objectVersions) []string {
var res []string
for _, add := range versions.addList {
if !contains(versions.delList, add) {
res = append(res, add)
}
}
return res
}
func splitVersions(header string) []string {
if len(header) == 0 {
return nil

View file

@ -2,6 +2,7 @@ package layer
import (
"context"
"math"
"sort"
"strconv"
"strings"
@ -34,15 +35,15 @@ func newObjectVersions(name string) *objectVersions {
return &objectVersions{name: name}
}
func (v *objectVersions) isAddListEmpty() bool {
v.sort()
return len(v.addList) == 0
}
func (v *objectVersions) appendVersion(oi *api.ObjectInfo) {
addVers := append(splitVersions(oi.Headers[versionsAddAttr]), oi.Version())
delVers := splitVersions(oi.Headers[versionsDelAttr])
v.objects = append(v.objects, oi)
for _, add := range addVers {
if !contains(v.addList, add) {
v.addList = append(v.addList, add)
}
}
for _, del := range delVers {
if !contains(v.delList, del) {
v.delList = append(v.delList, del)
@ -54,19 +55,114 @@ func (v *objectVersions) appendVersion(oi *api.ObjectInfo) {
func (v *objectVersions) sort() {
if !v.isSorted {
sort.Slice(v.objects, func(i, j int) bool {
return less(v.objects[i], v.objects[j])
o1, o2 := v.objects[i], v.objects[j]
if o1.CreationEpoch == o2.CreationEpoch {
l1, l2 := o1.Headers[versionsAddAttr], o2.Headers[versionsAddAttr]
if len(l1) != len(l2) {
if strings.HasPrefix(l1, l2) {
return false
} else if strings.HasPrefix(l2, l1) {
return true
}
}
return o1.Version() < o2.Version()
}
return o1.CreationEpoch < o2.CreationEpoch
})
v.formAddList()
v.isSorted = true
}
}
func (v *objectVersions) formAddList() {
for i := 0; i < len(v.objects); i++ {
var conflicts [][]string
for { // forming conflicts set (objects with the same creation epoch)
addVers := append(splitVersions(v.objects[i].Headers[versionsAddAttr]), v.objects[i].Version())
conflicts = append(conflicts, addVers)
if i == len(v.objects)-1 || v.objects[i].CreationEpoch != v.objects[i+1].CreationEpoch ||
containsVersions(v.objects[i+1], addVers) {
break
}
i++
}
if len(conflicts) == 1 {
v.addList = addIfNotContains(v.addList, conflicts[0])
continue
}
commonVersions, prevConflictedVersions, conflictedVersions := mergeVersionsConflicts(conflicts)
v.addList = commonVersions
v.addList = addIfNotContains(v.addList, prevConflictedVersions)
v.addList = addIfNotContains(v.addList, conflictedVersions)
}
}
func containsVersions(obj *api.ObjectInfo, versions []string) bool {
header := obj.Headers[versionsAddAttr]
for _, version := range versions {
if !strings.Contains(header, version) {
return false
}
}
return true
}
func addIfNotContains(list1, list2 []string) []string {
for _, add := range list2 {
if !contains(list1, add) {
list1 = append(list1, add)
}
}
return list1
}
func mergeVersionsConflicts(conflicts [][]string) ([]string, []string, []string) {
var currentVersions []string
var prevVersions []string
minLength := math.MaxInt32
for _, conflicted := range conflicts {
if len(conflicted)-1 < minLength {
minLength = len(conflicted) - 1
}
//last := conflicted[len(conflicted)-1]
//conflicts[j] = conflicted[:len(conflicted)-1]
//currentVersions = append(currentVersions, last)
}
var commonAddedVersions []string
diffIndex := 0
LOOP:
for k := 0; k < minLength; k++ {
candidate := conflicts[0][k]
for _, conflicted := range conflicts {
if conflicted[k] != candidate {
diffIndex = k
break LOOP
}
}
commonAddedVersions = append(commonAddedVersions, candidate)
}
for _, conflicted := range conflicts {
for j := diffIndex; j < len(conflicted); j++ {
prevVersions = append(prevVersions, conflicted[j])
}
}
sort.Strings(prevVersions)
sort.Strings(currentVersions)
return commonAddedVersions, prevVersions, currentVersions
}
func (v *objectVersions) getLast() *api.ObjectInfo {
if len(v.objects) == 0 {
if v == nil || len(v.objects) == 0 {
return nil
}
v.sort()
existedVersions := getExistedVersions(v)
existedVersions := v.existedVersions()
for i := len(v.objects) - 1; i >= 0; i-- {
if contains(existedVersions, v.objects[i].Version()) {
delMarkHeader := v.objects[i].Headers[versionsDeleteMarkAttr]
@ -82,13 +178,24 @@ func (v *objectVersions) getLast() *api.ObjectInfo {
return nil
}
func (v *objectVersions) getFiltered() []*api.ObjectInfo {
func (v *objectVersions) existedVersions() []string {
v.sort()
var res []string
for _, add := range v.addList {
if !contains(v.delList, add) {
res = append(res, add)
}
}
return res
}
func (v *objectVersions) getFiltered(reverse bool) []*api.ObjectInfo {
if len(v.objects) == 0 {
return nil
}
v.sort()
existedVersions := getExistedVersions(v)
existedVersions := v.existedVersions()
res := make([]*api.ObjectInfo, 0, len(v.objects))
for _, version := range v.objects {
@ -98,10 +205,17 @@ func (v *objectVersions) getFiltered() []*api.ObjectInfo {
}
}
if reverse {
for i, j := 0, len(res)-1; i < j; i, j = i+1, j-1 {
res[i], res[j] = res[j], res[i]
}
}
return res
}
func (v *objectVersions) getAddHeader() string {
v.sort()
return strings.Join(v.addList, ",")
}
@ -149,6 +263,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
versions map[string]*objectVersions
allObjects = make([]*api.ObjectInfo, 0, p.MaxKeys)
res = &ListObjectVersionsInfo{}
reverse = true
)
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
@ -167,7 +282,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
sort.Strings(sortedNames)
for _, name := range sortedNames {
allObjects = append(allObjects, versions[name].getFiltered()...)
allObjects = append(allObjects, versions[name].getFiltered(reverse)...)
}
for i, obj := range allObjects {
@ -192,7 +307,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
objects := make([]*ObjectVersionInfo, len(allObjects))
for i, obj := range allObjects {
objects[i] = &ObjectVersionInfo{Object: obj}
if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name {
if i == 0 || allObjects[i-1].Name != obj.Name {
objects[i].IsLatest = true
}
}
@ -220,13 +335,6 @@ func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*
return resVersion, resDelMarkVersions
}
func less(ov1, ov2 *api.ObjectInfo) bool {
if ov1.CreationEpoch == ov2.CreationEpoch {
return ov1.Version() < ov2.Version()
}
return ov1.CreationEpoch < ov2.CreationEpoch
}
func contains(list []string, elem string) bool {
for _, item := range list {
if elem == item {
@ -267,7 +375,7 @@ func (n *layer) checkVersionsExist(ctx context.Context, bkt *api.BucketInfo, obj
if err != nil {
return nil, err
}
if !contains(getExistedVersions(versions), obj.VersionID) {
if !contains(versions.existedVersions(), obj.VersionID) {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
}

View file

@ -7,6 +7,7 @@ import (
"crypto/sha256"
"fmt"
"io"
"strconv"
"strings"
"testing"
@ -445,13 +446,13 @@ func TestNoVersioningDeleteObject(t *testing.T) {
}
func TestGetLastVersion(t *testing.T) {
obj1 := getTestObjectInfo(1, getOID(1), "", "", "")
obj1V2 := getTestObjectInfo(1, getOID(2), "", "", "")
obj2 := getTestObjectInfo(2, getOID(2), obj1.Version(), "", "")
obj3 := getTestObjectInfo(3, getOID(3), joinVers(obj1, obj2), "", "*")
obj4 := getTestObjectInfo(4, getOID(4), joinVers(obj1, obj2), obj2.Version(), obj2.Version())
obj5 := getTestObjectInfo(5, getOID(5), obj1.Version(), obj1.Version(), obj1.Version())
obj6 := getTestObjectInfo(6, getOID(6), joinVers(obj1, obj2, obj3), obj3.Version(), obj3.Version())
obj1 := getTestObjectInfo(1, "", "", "")
obj1V2 := getTestObjectInfo(2, "", "", "")
obj2 := getTestObjectInfoEpoch(1, 2, obj1.Version(), "", "")
obj3 := getTestObjectInfoEpoch(1, 3, joinVers(obj1, obj2), "", "*")
obj4 := getTestObjectInfoEpoch(1, 4, joinVers(obj1, obj2), obj2.Version(), obj2.Version())
obj5 := getTestObjectInfoEpoch(1, 5, obj1.Version(), obj1.Version(), obj1.Version())
obj6 := getTestObjectInfoEpoch(1, 6, joinVers(obj1, obj2, obj3), obj3.Version(), obj3.Version())
for _, tc := range []struct {
versions *objectVersions
@ -510,9 +511,7 @@ func TestGetLastVersion(t *testing.T) {
objects: []*api.ObjectInfo{obj1, obj1V2},
addList: []string{obj1.Version(), obj1V2.Version()},
},
// creation epochs are equal
// obj1 version/oid > obj1_1 version/oid
expected: obj1,
expected: obj1V2,
},
} {
actualObjInfo := tc.versions.getLast()
@ -521,10 +520,12 @@ func TestGetLastVersion(t *testing.T) {
}
func TestAppendVersions(t *testing.T) {
obj1 := getTestObjectInfo(1, getOID(1), "", "", "")
obj2 := getTestObjectInfo(2, getOID(2), obj1.Version(), "", "")
obj3 := getTestObjectInfo(3, getOID(3), joinVers(obj1, obj2), "", "*")
obj4 := getTestObjectInfo(4, getOID(4), joinVers(obj1, obj2), obj2.Version(), obj2.Version())
obj1 := getTestObjectInfo(1, "", "", "")
obj2 := getTestObjectInfo(2, obj1.Version(), "", "")
obj3 := getTestObjectInfo(3, joinVers(obj1, obj2), "", "*")
obj4 := getTestObjectInfo(4, joinVers(obj1, obj2), obj2.Version(), obj2.Version())
obj5 := getTestObjectInfo(5, joinVers(obj1, obj2), "", "")
obj6 := getTestObjectInfo(6, joinVers(obj1, obj3), "", "")
for _, tc := range []struct {
versions *objectVersions
@ -537,6 +538,7 @@ func TestAppendVersions(t *testing.T) {
expectedVersions: &objectVersions{
objects: []*api.ObjectInfo{obj1},
addList: []string{obj1.Version()},
isSorted: true,
},
},
{
@ -545,6 +547,7 @@ func TestAppendVersions(t *testing.T) {
expectedVersions: &objectVersions{
objects: []*api.ObjectInfo{obj1, obj2},
addList: []string{obj1.Version(), obj2.Version()},
isSorted: true,
},
},
{
@ -553,6 +556,7 @@ func TestAppendVersions(t *testing.T) {
expectedVersions: &objectVersions{
objects: []*api.ObjectInfo{obj1, obj2, obj3},
addList: []string{obj1.Version(), obj2.Version(), obj3.Version()},
isSorted: true,
},
},
{
@ -562,14 +566,61 @@ func TestAppendVersions(t *testing.T) {
objects: []*api.ObjectInfo{obj1, obj2, obj4},
addList: []string{obj1.Version(), obj2.Version(), obj4.Version()},
delList: []string{obj2.Version()},
isSorted: true,
},
},
{
versions: &objectVersions{objects: []*api.ObjectInfo{obj5}},
objectToAdd: obj6,
expectedVersions: &objectVersions{
objects: []*api.ObjectInfo{obj5, obj6},
addList: []string{obj1.Version(), obj2.Version(), obj3.Version(), obj5.Version(), obj6.Version()},
isSorted: true,
},
},
} {
tc.versions.appendVersion(tc.objectToAdd)
tc.versions.sort()
require.Equal(t, tc.expectedVersions, tc.versions)
}
}
func TestSortAddHeaders(t *testing.T) {
obj1 := getTestObjectInfo(1, "", "", "")
obj2 := getTestObjectInfo(2, "", "", "")
obj3 := getTestObjectInfo(3, "", "", "")
obj4 := getTestObjectInfo(4, "", "", "")
obj5 := getTestObjectInfo(5, "", "", "")
obj6 := getTestObjectInfoEpoch(1, 6, joinVers(obj1, obj2, obj3), "", "")
obj7 := getTestObjectInfoEpoch(1, 7, joinVers(obj1, obj4), "", "")
obj8 := getTestObjectInfoEpoch(1, 8, joinVers(obj5), "", "")
obj9 := getTestObjectInfoEpoch(1, 8, joinVers(obj1, obj5), "", "")
obj10 := getTestObjectInfo(11, "", "", "")
obj11 := getTestObjectInfo(10, joinVers(obj10), "", "")
obj12 := getTestObjectInfo(9, joinVers(obj10, obj11), "", "")
for _, tc := range []struct {
versions *objectVersions
expectedAddHeaders string
}{
{
versions: &objectVersions{objects: []*api.ObjectInfo{obj6, obj7, obj8}},
expectedAddHeaders: joinVers(obj1, obj2, obj3, obj4, obj5, obj6, obj7, obj8),
},
{
versions: &objectVersions{objects: []*api.ObjectInfo{obj7, obj9}},
expectedAddHeaders: joinVers(obj1, obj4, obj5, obj7, obj9),
},
{
versions: &objectVersions{objects: []*api.ObjectInfo{obj11, obj10, obj12}},
expectedAddHeaders: joinVers(obj10, obj11, obj12),
},
} {
require.Equal(t, tc.expectedAddHeaders, tc.versions.getAddHeader())
}
}
func joinVers(objs ...*api.ObjectInfo) string {
if len(objs) == 0 {
return ""
@ -584,14 +635,14 @@ func joinVers(objs ...*api.ObjectInfo) string {
}
func getOID(id byte) *object.ID {
b := make([]byte, 32)
b[0] = id
b := [32]byte{}
b[31] = id
oid := object.NewID()
oid.SetSHA256(sha256.Sum256(b))
oid.SetSHA256(b)
return oid
}
func getTestObjectInfo(epoch uint64, oid *object.ID, addAttr, delAttr, delMarkAttr string) *api.ObjectInfo {
func getTestObjectInfo(id byte, addAttr, delAttr, delMarkAttr string) *api.ObjectInfo {
headers := make(map[string]string)
if addAttr != "" {
headers[versionsAddAttr] = addAttr
@ -604,8 +655,14 @@ func getTestObjectInfo(epoch uint64, oid *object.ID, addAttr, delAttr, delMarkAt
}
return &api.ObjectInfo{
ID: oid,
CreationEpoch: epoch,
ID: getOID(id),
Name: strconv.Itoa(int(id)),
Headers: headers,
}
}
func getTestObjectInfoEpoch(epoch uint64, id byte, addAttr, delAttr, delMarkAttr string) *api.ObjectInfo {
obj := getTestObjectInfo(id, addAttr, delAttr, delMarkAttr)
obj.CreationEpoch = epoch
return obj
}

View file

@ -377,34 +377,34 @@ Compatibility: 9/8/11
## Versioning
Compatibility: 1/24/26
Compatibility: 11/24/26
| | Test | s3-gw | aws s3 |
|----|---------------------------------------------------------------------------------------------|-------|--------|
| 1 | s3tests_boto3.functional.test_s3.test_versioning_bucket_create_suspend | ok | ok |
| 2 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_read_remove | ERROR | ok |
| 2 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_read_remove | ok | ok |
| 3 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_read_remove_head | ERROR | ok |
| 4 | s3tests_boto3.functional.test_s3.test_versioning_obj_plain_null_version_removal | ERROR | ok |
| 5 | s3tests_boto3.functional.test_s3.test_versioning_obj_plain_null_version_overwrite | ERROR | ok |
| 6 | s3tests_boto3.functional.test_s3.test_versioning_obj_plain_null_version_overwrite_suspended | ERROR | ok |
| 7 | s3tests_boto3.functional.test_s3.test_versioning_obj_suspend_versions | ERROR | ok |
| 8 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_versions_remove_all | ERROR | ok |
| 9 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_versions_remove_special_names | ERROR | ok |
| 8 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_versions_remove_all | ok | ok |
| 9 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_versions_remove_special_names | ok | ok |
| 10 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_overwrite_multipart | ERROR | ok |
| 11 | s3tests_boto3.functional.test_s3.test_versioning_obj_list_marker | ERROR | ok |
| 12 | s3tests_boto3.functional.test_s3.test_versioning_copy_obj_version | ERROR | ok |
| 13 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete | ERROR | ok |
| 14 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete_with_marker | ERROR | ok |
| 11 | s3tests_boto3.functional.test_s3.test_versioning_obj_list_marker | ok | ok |
| 12 | s3tests_boto3.functional.test_s3.test_versioning_copy_obj_version | ok | ok |
| 13 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete | ok | ok |
| 14 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete_with_marker | ok | ok |
| 15 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete_with_marker_create | ERROR | ok |
| 16 | s3tests_boto3.functional.test_s3.test_versioned_object_acl | ERROR | FAIL |
| 17 | s3tests_boto3.functional.test_s3.test_versioned_object_acl_no_version_specified | ERROR | FAIL |
| 18 | s3tests_boto3.functional.test_s3.test_versioned_concurrent_object_create_concurrent_remove | ERROR | ok |
| 19 | s3tests_boto3.functional.test_s3.test_versioned_concurrent_object_create_and_remove | ERROR | ok |
| 20 | s3tests_boto3.functional.test_s3.test_versioning_bucket_atomic_upload_return_version_id | ERROR | ok |
| 20 | s3tests_boto3.functional.test_s3.test_versioning_bucket_atomic_upload_return_version_id | ok | ok |
| 21 | s3tests_boto3.functional.test_s3.test_versioning_bucket_multipart_upload_return_version_id | ERROR | ok |
| 22 | s3tests_boto3.functional.test_s3.test_bucket_list_return_data_versioning | ERROR | ok |
| 23 | s3tests_boto3.functional.test_s3.test_object_copy_versioned_bucket | ERROR | ok |
| 24 | s3tests_boto3.functional.test_s3.test_object_copy_versioned_url_encoding | ERROR | ok |
| 23 | s3tests_boto3.functional.test_s3.test_object_copy_versioned_bucket | ok | ok |
| 24 | s3tests_boto3.functional.test_s3.test_object_copy_versioned_url_encoding | ok | ok |
| 25 | s3tests_boto3.functional.test_s3.test_object_copy_versioning_multipart_upload | ERROR | ok |
| 26 | s3tests_boto3.functional.test_s3.test_multipart_copy_versioned | ERROR | ok |