[#248] Correct NextVersionIDMarker in listing versions #248

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:bugfix/correct_handling_version_id_marker into master 2023-11-01 06:53:55 +00:00
6 changed files with 217 additions and 9 deletions

View file

@ -18,6 +18,7 @@ This document outlines major changes between releases.
- Fix parsing `key-marker` for object list versions (#243)
- Fix marshaling errors in `DeleteObjects` method (#222)
- Fix status code in GET/HEAD delete marker (#226)
- Fix `NextVersionIDMarker` in `list-object-versions` (#248)
### Added
- Add `trace_id` value into log record when tracing is enabled (#142)

View file

@ -183,6 +183,7 @@ const (
ErrInvalidRequest
ErrInvalidRequestLargeCopy
ErrInvalidStorageClass
VersionIDMarkerWithoutKeyMarker
ErrMalformedJSON
ErrInsecureClientRequest
@ -314,6 +315,12 @@ var errorCodes = errorCodeMap{
Description: "Invalid storage class.",
HTTPStatusCode: http.StatusBadRequest,
},
VersionIDMarkerWithoutKeyMarker: {
ErrCode: VersionIDMarkerWithoutKeyMarker,
Code: "VersionIDMarkerWithoutKeyMarker",
Description: "A version-id marker cannot be specified without a key marker.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidRequestBody: {
ErrCode: ErrInvalidRequestBody,
Code: "InvalidArgument",

View file

@ -260,6 +260,10 @@ func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObj
res.Encode = queryValues.Get("encoding-type")
res.VersionIDMarker = queryValues.Get("version-id-marker")
if res.VersionIDMarker != "" && res.KeyMarker == "" {
return nil, errors.GetAPIError(errors.VersionIDMarkerWithoutKeyMarker)
}
return &res, nil
}

View file

@ -210,6 +210,41 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
validateListV2(t, tc, bktName, prefix, delim, "", 2, false, true, []string{"boo/bar"}, []string{"boo/baz/"})
}
func TestMintVersioningListObjectVersionsVersionIDContinuation(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "mint-bucket-for-listing-versions", "objName"
createTestBucket(hc, bktName)
putBucketVersioning(t, hc, bktName, true)
length := 10
objects := make([]string, length)
for i := 0; i < length; i++ {
objects[i] = objName
putObject(hc, bktName, objName)
}
maxKeys := 5
page1 := listObjectsVersions(hc, bktName, "", "", "", "", maxKeys)
require.Len(t, page1.Version, maxKeys)
checkVersionsNames(t, page1, objects)
require.Equal(t, page1.Version[maxKeys-1].VersionID, page1.NextVersionIDMarker)
require.True(t, page1.IsTruncated)
page2 := listObjectsVersions(hc, bktName, "", "", page1.NextKeyMarker, page1.NextVersionIDMarker, maxKeys)
require.Len(t, page2.Version, maxKeys)
checkVersionsNames(t, page1, objects)
require.Empty(t, page2.NextVersionIDMarker)
require.False(t, page2.IsTruncated)
}
func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, names []string) {
for i, v := range versions.Version {
require.Equal(t, names[i], v.Key)
}
}
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
if len(startAfter) != 0 {

View file

@ -5,6 +5,7 @@ import (
"sort"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
)
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
@ -36,29 +37,58 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
}
}
for i, obj := range allObjects {
if obj.ObjectInfo.Name >= p.KeyMarker && obj.ObjectInfo.VersionID() >= p.VersionIDMarker {
allObjects = allObjects[i:]
break
}
if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
return nil, err
}
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys].ObjectInfo.Name
res.NextVersionIDMarker = allObjects[p.MaxKeys].ObjectInfo.VersionID()
res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID()
allObjects = allObjects[:p.MaxKeys]
res.KeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
res.VersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID()
res.KeyMarker = p.KeyMarker
res.VersionIDMarker = p.VersionIDMarker
}
res.Version, res.DeleteMarker = triageVersions(allObjects)
return res, nil
}
func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.ObjectInfo.Name == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.ObjectInfo.Name > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedObjectInfo{}, nil
}
func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) {
if len(objVersions) == 0 {
return nil, nil

View file

@ -12,6 +12,7 @@ import (
bearertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
@ -311,3 +312,133 @@ func TestNoVersioningDeleteObject(t *testing.T) {
tc.getObject(tc.obj, "", true)
tc.checkListObjects()
}
func TestFilterVersionsByMarker(t *testing.T) {
n := 10
testOIDs := make([]oid.ID, n)
for i := 0; i < n; i++ {
testOIDs[i] = oidtest.ID()
}
for _, tc := range []struct {
name string
objects []*data.ExtendedObjectInfo
params *ListObjectVersionsParams
expected []*data.ExtendedObjectInfo
error bool
}{
{
name: "missed key marker",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "", VersionIDMarker: "dummy"},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
},
},
{
name: "last version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[1].EncodeToString()},
expected: []*data.ExtendedObjectInfo{},
},
{
name: "same name, different versions",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
},
},
{
name: "different name, different versions",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
},
},
{
name: "not matched name alphabetically less",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: ""},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
},
},
{
name: "not matched name alphabetically less with dummy version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not matched name alphabetically greater",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj2", VersionIDMarker: testOIDs[2].EncodeToString()},
expected: []*data.ExtendedObjectInfo{},
},
{
name: "not found version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not found version id, obj last",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not found version id, obj last",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: ""},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
actual, err := filterVersionsByMarker(tc.objects, tc.params)
if tc.error {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
}
})
}
}