[#474] Use appropriate null version during listing

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-31 18:03:58 +03:00 committed by Alex Vanin
parent c8e8ba9f6a
commit a02900a4f7
9 changed files with 81 additions and 22 deletions

View file

@ -27,6 +27,7 @@ type DeleteMarkerInfo struct {
type ExtendedObjectInfo struct { type ExtendedObjectInfo struct {
ObjectInfo *ObjectInfo ObjectInfo *ObjectInfo
NodeVersion *NodeVersion NodeVersion *NodeVersion
IsLatest bool
} }
// BaseNodeVersion is minimal node info from tree service. // BaseNodeVersion is minimal node info from tree service.

View file

@ -272,6 +272,10 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
} }
for _, ver := range info.Version { for _, ver := range info.Version {
versionID := ver.Object.Version()
if ver.IsUnversioned {
versionID = layer.UnversionedObjectVersionID
}
res.Version = append(res.Version, ObjectVersionResponse{ res.Version = append(res.Version, ObjectVersionResponse{
IsLatest: ver.IsLatest, IsLatest: ver.IsLatest,
Key: ver.Object.Name, Key: ver.Object.Name,
@ -281,7 +285,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
DisplayName: ver.Object.Owner.String(), DisplayName: ver.Object.Owner.String(),
}, },
Size: ver.Object.Size, Size: ver.Object.Size,
VersionID: ver.Object.Version(), // todo return "null" version for unversioned https://github.com/nspcc-dev/neofs-s3-gw/issues/474 VersionID: versionID,
ETag: ver.Object.HashSum, ETag: ver.Object.HashSum,
}) })
} }

View file

@ -1,8 +1,12 @@
package handler package handler
import ( import (
"bytes"
"context"
"net/http"
"testing" "testing"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -35,3 +39,37 @@ func TestParseContinuationToken(t *testing.T) {
require.Equal(t, tokenStr, token) require.Equal(t, tokenStr, token)
}) })
} }
func TestListObjectNullVersions(t *testing.T) {
ctx := context.Background()
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createTestBucket(ctx, t, hc, bktName)
objName := "object"
body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(bktName, objName, body)
hc.Handler().PutObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
versioning := &VersioningConfiguration{Status: "Enabled"}
w, r = prepareTestRequest(t, bktName, objName, versioning)
hc.Handler().PutBucketVersioningHandler(w, r)
assertStatus(t, w, http.StatusOK)
body2 := bytes.NewReader([]byte("content2"))
w, r = prepareTestPayloadRequest(bktName, objName, body2)
hc.Handler().PutObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(t, bktName, objName, nil)
hc.Handler().ListBucketObjectVersionsHandler(w, r)
result := &ListObjectsVersionsResponse{}
parseTestResponse(t, w, result)
require.Len(t, result.Version, 2)
require.Equal(t, layer.UnversionedObjectVersionID, result.Version[1].VersionID)
}

View file

@ -470,14 +470,14 @@ func getRandomOID() (*oid.ID, error) {
// DeleteObject removes all objects with the passed nice name. // DeleteObject removes all objects with the passed nice name.
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject { func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) == 0 || obj.VersionID == unversionedObjectVersionID { if len(obj.VersionID) == 0 || obj.VersionID == UnversionedObjectVersionID {
randOID, err := getRandomOID() randOID, err := getRandomOID()
if err != nil { if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err) obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
return obj return obj
} }
obj.DeleteMarkVersion = unversionedObjectVersionID obj.DeleteMarkVersion = UnversionedObjectVersionID
newVersion := &data.NodeVersion{ newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{ BaseNodeVersion: data.BaseNodeVersion{
OID: *randOID, OID: *randOID,

View file

@ -277,7 +277,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ObjectInfo, error) { func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ObjectInfo, error) {
var err error var err error
var foundVersion *data.NodeVersion var foundVersion *data.NodeVersion
if p.VersionID == unversionedObjectVersionID { if p.VersionID == UnversionedObjectVersionID {
foundVersion, err = n.treeService.GetUnversioned(ctx, &bkt.CID, p.Object) foundVersion, err = n.treeService.GetUnversioned(ctx, &bkt.CID, p.Object)
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
@ -572,6 +572,18 @@ func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []
return return
} }
func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) {
for _, ov := range allObjects {
if ov.ObjectInfo.IsDir {
prefixes = append(prefixes, ov.ObjectInfo.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*data.ObjectInfo, error) { func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*data.ObjectInfo, error) {
var ( var (
err error err error

View file

@ -131,7 +131,7 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *ObjectVersion) (
var err error var err error
var version *data.NodeVersion var version *data.NodeVersion
if objVersion.VersionID == unversionedObjectVersionID { if objVersion.VersionID == UnversionedObjectVersionID {
version, err = n.treeService.GetUnversioned(ctx, &objVersion.BktInfo.CID, objVersion.ObjectName) version, err = n.treeService.GetUnversioned(ctx, &objVersion.BktInfo.CID, objVersion.ObjectName)
} else if len(objVersion.VersionID) == 0 { } else if len(objVersion.VersionID) == 0 {
version, err = n.treeService.GetLatestVersion(ctx, &objVersion.BktInfo.CID, objVersion.ObjectName) version, err = n.treeService.GetLatestVersion(ctx, &objVersion.BktInfo.CID, objVersion.ObjectName)

View file

@ -39,6 +39,7 @@ type (
ObjectVersionInfo struct { ObjectVersionInfo struct {
Object *data.ObjectInfo Object *data.ObjectInfo
IsLatest bool IsLatest bool
IsUnversioned bool
} }
// ListObjectVersionsInfo stores info and list of objects versions. // ListObjectVersionsInfo stores info and list of objects versions.

View file

@ -8,12 +8,12 @@ import (
) )
const ( const (
unversionedObjectVersionID = "null" UnversionedObjectVersionID = "null"
) )
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var ( var (
allObjects = make([]*data.ObjectInfo, 0, p.MaxKeys) allObjects = make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)
res = &ListObjectVersionsInfo{} res = &ListObjectVersionsInfo{}
) )
@ -34,35 +34,37 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order
}) })
for _, version := range sortedVersions { for i, version := range sortedVersions {
allObjects = append(allObjects, version.ObjectInfo) version.IsLatest = i == 0
allObjects = append(allObjects, version)
} }
} }
for i, obj := range allObjects { for i, obj := range allObjects {
if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker { if obj.ObjectInfo.Name >= p.KeyMarker && obj.ObjectInfo.Version() >= p.VersionIDMarker {
allObjects = allObjects[i:] allObjects = allObjects[i:]
break break
} }
} }
res.CommonPrefixes, allObjects = triageObjects(allObjects) res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects)
if len(allObjects) > p.MaxKeys { if len(allObjects) > p.MaxKeys {
res.IsTruncated = true res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys].Name res.NextKeyMarker = allObjects[p.MaxKeys].ObjectInfo.Name
res.NextVersionIDMarker = allObjects[p.MaxKeys].Version() res.NextVersionIDMarker = allObjects[p.MaxKeys].ObjectInfo.Version()
allObjects = allObjects[:p.MaxKeys] allObjects = allObjects[:p.MaxKeys]
res.KeyMarker = allObjects[p.MaxKeys-1].Name res.KeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
res.VersionIDMarker = allObjects[p.MaxKeys-1].Version() res.VersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.Version()
} }
objects := make([]*ObjectVersionInfo, len(allObjects)) objects := make([]*ObjectVersionInfo, len(allObjects))
for i, obj := range allObjects { for i, obj := range allObjects {
objects[i] = &ObjectVersionInfo{Object: obj} objects[i] = &ObjectVersionInfo{
if i == 0 || allObjects[i-1].Name != obj.Name { Object: obj.ObjectInfo,
objects[i].IsLatest = true IsUnversioned: obj.NodeVersion.IsUnversioned,
IsLatest: obj.IsLatest,
} }
} }

View file

@ -250,7 +250,7 @@ func TestGetUnversioned(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
resInfo, buffer := tc.getObject(tc.obj, unversionedObjectVersionID, false) resInfo, buffer := tc.getObject(tc.obj, UnversionedObjectVersionID, false)
require.Equal(t, objContent, buffer) require.Equal(t, objContent, buffer)
require.Equal(t, objInfo.Version(), resInfo.Version()) require.Equal(t, objInfo.Version(), resInfo.Version())
} }
@ -278,7 +278,8 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
tc.deleteObject(tc.obj, "", settings) tc.deleteObject(tc.obj, "", settings)
tc.getObject(tc.obj, "", true) tc.getObject(tc.obj, "", true)
for _, ver := range tc.listVersions().DeleteMarker { versions := tc.listVersions()
for _, ver := range versions.DeleteMarker {
if ver.IsLatest { if ver.IsLatest {
tc.deleteObject(tc.obj, ver.Object.Version(), settings) tc.deleteObject(tc.obj, ver.Object.Version(), settings)
} }