[#165] Add stream listing tests

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-01-19 12:53:53 +03:00
parent cf4fc3b602
commit 093de13f54
6 changed files with 189 additions and 186 deletions

View file

@ -2,17 +2,18 @@ package data
import (
"context"
"sync/atomic"
)
type VersionsStream interface {
Next(ctx context.Context) (*NodeVersion, error)
}
// todo consider thread safe
type ListSession struct {
Next []*NodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
Context context.Context
Cancel context.CancelFunc
Acquired atomic.Bool
}

View file

@ -435,16 +435,19 @@ func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.B
}
func createVersionedBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
createTestBucket(tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
require.NoError(t, err)
putBucketVersioning(t, tc, bktName, true)
bktInfo := createVersionedBucket(tc, bktName)
objInfo := createTestObject(tc, bktInfo, objName, encryption.Params{})
return bktInfo, objInfo
}
func createVersionedBucket(hc *handlerContext, bktName string) *data.BucketInfo {
bktInfo := createTestBucket(hc, bktName)
putBucketVersioning(hc.t, hc, bktName, true)
return bktInfo
}
func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabled bool) {
cfg := &VersioningConfiguration{Status: "Suspended"}
if enabled {

View file

@ -98,7 +98,7 @@ func TestListObjectsLatestVersions(t *testing.T) {
})
}
func TestListObjectsPaging(t *testing.T) {
func TestListObjectsVersionsPaging(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
@ -128,6 +128,65 @@ func TestListObjectsPaging(t *testing.T) {
require.Empty(t, objects)
}
func TestListObjectsVersionsCorrectIsLatestFlag(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createVersionedBucket(hc, bktName)
objName1, objName2 := "obj1", "obj2"
n := 9
listSize := 3
headers := make([]http.Header, n)
// objects uploaded: ["obj1"-v1, "obj1"-v2, "obj1"-v3, "obj2"-v1, "obj2"-v2, "obj2"-v3, "obj2"-v4, "obj2"-v5, "obj2"-v6]
for i := 0; i < n; i++ {
objName := objName1
if i >= listSize {
objName = objName2
}
headers[i] = putObjectContent(hc, bktName, objName, fmt.Sprintf("content/%d", i))
}
versions := listObjectsVersions(hc, bktName, "", "", "", "", listSize)
// expected objects: ["obj1"-v3, "obj1"-v2, "obj1"-v1]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName1, headers[:listSize], true))
versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize)
// expected objects: ["obj2"-v6, "obj2"-v5, "obj2"-v4]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[2*listSize:], true))
versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize)
// expected objects: ["obj2"-v3, "obj2"-v2, "obj2"-v1]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[listSize:2*listSize], false))
}
func formReverseVersionResponse(objName string, headers []http.Header, isLatest bool) []ObjectVersionResponse {
res := make([]ObjectVersionResponse, len(headers))
for i, h := range headers {
ind := len(headers) - 1 - i
res[ind] = ObjectVersionResponse{
ETag: h.Get(api.ETag),
IsLatest: isLatest && ind == 0,
Key: objName,
VersionID: h.Get(api.AmzVersionID),
}
}
return res
}
func checkListVersionsParts(t *testing.T, versions *ListObjectsVersionsResponse, expected []ObjectVersionResponse) {
require.Len(t, versions.Version, len(expected))
for i, res := range versions.Version {
require.Equal(t, expected[i].Key, res.Key)
require.Equal(t, expected[i].ETag, res.ETag)
require.Equal(t, expected[i].VersionID, res.VersionID)
require.Equal(t, expected[i].IsLatest, res.IsLatest)
}
}
func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T) {
tc := prepareHandlerContext(t)
@ -202,6 +261,18 @@ func TestS3BucketListDelimiterBasic(t *testing.T) {
require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix)
}
func TestS3BucketListEmpty(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
versions := listObjectsVersions(hc, bktName, "", "", "", "", -1)
require.Empty(t, versions.Version)
require.Empty(t, versions.DeleteMarker)
require.Empty(t, versions.CommonPrefixes)
}
func TestS3BucketListV2PrefixAlt(t *testing.T) {
hc := prepareHandlerContext(t)

View file

@ -139,114 +139,24 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var err error
owner := n.BearerOwner(ctx)
versions := make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
objects, isTruncated, err := n.getAllObjectsVersions(ctx, p)
if err != nil {
return nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
if err != nil {
return nil, err
}
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
objVersions, ok := versions[name]
if !ok {
objVersions = []*data.ExtendedNodeVersion{eoi}
} else if dirName == "" {
objVersions = append(objVersions, eoi)
}
versions[name] = objVersions
}
if err = <-errorCh; err != nil {
return nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for _, name := range sortedNames {
sortedVersions := versions[name]
sort.Slice(sortedVersions, func(i, j int) bool {
return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order
})
for i, version := range sortedVersions {
version.IsLatest = i == 0
allObjects = append(allObjects, version)
}
}
//if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
// return nil, err
//}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
IsTruncated: isTruncated,
}
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects, p.Prefix, p.Delimiter)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
session.Next = []*data.NodeVersion{allObjects[p.MaxKeys-1].NodeVersion, allObjects[p.MaxKeys].NodeVersion}
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, res.NextVersionIDMarker), session)
allObjects = allObjects[:p.MaxKeys]
if res.IsTruncated {
res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
}
res.Version, res.DeleteMarker = triageVersions(allObjects)
res.CommonPrefixes, objects = triageExtendedObjects(objects, p.Prefix, p.Delimiter)
res.Version, res.DeleteMarker = triageVersions(objects)
return res, nil
}
@ -309,18 +219,107 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (versions map[string][]*data.ExtendedNodeVersion, err error) {
owner := n.BearerOwner(ctx)
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, bool, error) {
if p.MaxKeys == 0 {
return nil, false, nil
}
versions = make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
session, err := n.getListVersionsSession(ctx, p)
if err != nil {
return nil, false, err
}
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(ctx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(ctx, 2, pp, generator)
if err != nil {
return nil, false, err
}
var lastName string
groupedVersions := make([][]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
if lastName != name {
groupedVersions = append(groupedVersions, []*data.ExtendedNodeVersion{eoi})
} else if dirName == "" {
groupedVersions[len(groupedVersions)-1] = append(groupedVersions[len(groupedVersions)-1], eoi)
}
lastName = name
}
if err = <-errorCh; err != nil {
return nil, false, fmt.Errorf("failed to get next object from stream: %w", err)
}
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for _, versions := range groupedVersions {
sort.Slice(versions, func(i, j int) bool {
return versions[j].NodeVersion.Timestamp < versions[i].NodeVersion.Timestamp // sort in reverse order
})
for i, version := range versions {
version.IsLatest = i == 0 && (session.Next == nil || session.Next[0].FilePath != versions[0].NodeVersion.FilePath)
allObjects = append(allObjects, version)
}
}
var isTruncated bool
if len(allObjects) > p.MaxKeys {
isTruncated = true
session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1)
session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion
for i, node := range allObjects[p.MaxKeys:] {
session.Next[i+1] = node.NodeVersion
}
session.Acquired.Store(false)
n.cache.PutListSession(n.BearerOwner(ctx), cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()), session)
allObjects = allObjects[:p.MaxKeys]
}
return allObjects, isTruncated, nil
}
func (n *layer) getListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (*data.ListSession, error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
if session == nil {
return n.initNewListVersionsSession(ctx, p)
}
if session.Acquired.Swap(true) {
return n.initNewListVersionsSession(ctx, p)
}
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
return session, nil
}
func (n *layer) initNewListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (session *data.ListSession, err error) {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
@ -332,59 +331,8 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions
if err != nil {
return nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
if err != nil {
return nil, err
}
//if session.Next != nil {
// name := session.Next.FilePath
// dirName := tryDirectoryName(session.Next, p.Prefix, p.Delimiter)
// if dirName != "" {
// name = dirName
// }
//
// versions[name] = []*data.ExtendedNodeVersion{{NodeVersion: session.Next}}
//}
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
objVersions, ok := versions[name]
if !ok {
objVersions = []*data.ExtendedNodeVersion{eoi}
} else if dirName == "" {
objVersions = append(objVersions, eoi)
}
versions[name] = objVersions
}
if err = <-errorCh; err != nil {
return nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
return versions, nil
return session, nil
}
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
@ -441,25 +389,17 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
delete(existed, continuationToken)
//if len(stream.Next) != 0 {
// existed[continuationToken] = struct{}{}
//}
limit := p.MaxKeys + 1
//if len(stream.Next) == 0 {
// limit++
//}
go func() {
var generated int
var err error
ind := 0
var (
generated int
ind int
err error
lastName string
)
LOOP:
for err == nil {
@ -486,10 +426,10 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data
break LOOP
case nodeCh <- node:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
if generated > p.MaxKeys && node.FilePath != lastName {
break LOOP
}
lastName = node.FilePath
}
}
close(nodeCh)
@ -627,25 +567,6 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec
return objCh, nil
}
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
var err error
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false)
nodeVersions := n.cache.GetList(owner, cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix)
if err != nil {
return nil, fmt.Errorf("get all versions from tree service: %w", err)
}
n.cache.PutList(owner, cacheKey, nodeVersions)
}
return nodeVersions, nil
}
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
if node.IsDeleteMarker {
return true

View file

@ -680,6 +680,9 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node
if s.innerStream == nil {
node, err := s.mainStream.Next()
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, io.EOF
}
if errors.Is(err, io.EOF) {
s.ended = true
if s.latestOnly && s.currentLatest != nil {

View file

@ -236,9 +236,13 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket
type SubTreeStreamImpl struct {
res []NodeResponse
offset int
err error
}
func (s *SubTreeStreamImpl) Next() (NodeResponse, error) {
if s.err != nil {
return nil, s.err
}
if s.offset > len(s.res)-1 {
return nil, io.EOF
}
@ -249,7 +253,7 @@ func (s *SubTreeStreamImpl) Next() (NodeResponse, error) {
func (c *ServiceClientMemory) GetSubTreeStream(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return nil, nil
return &SubTreeStreamImpl{err: ErrNodeNotFound}, nil
}
tr, ok := cnr.trees[treeID]