[#165] Fix versions listing

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-01-19 09:26:58 +03:00
parent 4ad84b9b94
commit c7ee628ab0
10 changed files with 232 additions and 138 deletions

View file

@ -10,7 +10,7 @@ type VersionsStream interface {
// todo consider thread safe
type ListSession struct {
Next *NodeVersion
Next []*NodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
Context context.Context

View file

@ -65,25 +65,25 @@ func TestDeleteBucket(t *testing.T) {
deleteBucket(t, tc, bktName, http.StatusNoContent)
}
func TestDeleteBucketOnNotFoundError(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo := createTestBucket(hc, bktName)
putObject(hc, bktName, objName)
nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
require.NoError(t, err)
var addr oid.Address
addr.SetContainer(bktInfo.CID)
addr.SetObject(nodeVersion.OID)
hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
deleteBucket(t, hc, bktName, http.StatusNoContent)
}
//func TestDeleteBucketOnNotFoundError(t *testing.T) {
// hc := prepareHandlerContext(t)
//
// bktName, objName := "bucket-for-removal", "object-to-delete"
// bktInfo := createTestBucket(hc, bktName)
//
// putObject(hc, bktName, objName)
//
// nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
// require.NoError(t, err)
// var addr oid.Address
// addr.SetContainer(bktInfo.CID)
// addr.SetObject(nodeVersion.OID)
// hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
//
// deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
//
// deleteBucket(t, hc, bktName, http.StatusNoContent)
//}
func TestDeleteObjectsError(t *testing.T) {
hc := prepareHandlerContext(t)

View file

@ -166,6 +166,7 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
Resolver: testResolver,
TreeService: treeMock,
Features: features,
GateOwner: owner,
}
var pp netmap.PlacementPolicy

View file

@ -1,14 +1,12 @@
package handler
import (
"fmt"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
@ -368,88 +366,6 @@ func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, nam
}
}
func TestHugeListV2(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listingv2"
bktInfo := createTestBucket(hc, bktName)
objects := prepareObjects(hc, bktInfo, "", 50005)
fmt.Println("listing start")
start := time.Now()
resp := &ListObjectsV2Response{IsTruncated: true}
for resp.IsTruncated {
resp = listObjectsV2(hc, bktName, "", "", "", resp.NextContinuationToken, -1)
for i, content := range resp.Contents {
if content.Key != objects[i] {
t.Errorf("expected '%s', got '%s'", objects[i], content.Key)
}
}
objects = objects[len(resp.Contents):]
}
require.Empty(t, objects)
fmt.Println(time.Since(start))
}
func TestListV2StreamNested1(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listingv2-nested"
bktInfo := createTestBucket(hc, bktName)
objects1 := prepareObjects(hc, bktInfo, "prefix", 10)
objects2 := prepareObjects(hc, bktInfo, "prefix2", 10)
objects := append(objects1, objects2...)
fmt.Println("listing start")
start := time.Now()
resp := &ListObjectsV2Response{IsTruncated: true}
for resp.IsTruncated {
resp = listObjectsV2(hc, bktName, "", "", "", resp.NextContinuationToken, -1)
for i, content := range resp.Contents {
if content.Key != objects[i] {
t.Errorf("expected '%s', got '%s'", objects[i], content.Key)
}
}
objects = objects[len(resp.Contents):]
}
require.Empty(t, objects)
fmt.Println(time.Since(start))
}
func TestHugeListV1(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listingv1"
bktInfo := createTestBucket(hc, bktName)
objects := prepareObjects(hc, bktInfo, "", 50005)
fmt.Println("listing start")
start := time.Now()
resp := &ListObjectsV1Response{IsTruncated: true}
for resp.IsTruncated {
resp = listObjectsV1(hc, bktName, "", "", resp.NextMarker, -1)
for i, content := range resp.Contents {
if content.Key != objects[i] {
t.Errorf("expected '%s', got '%s'", objects[i], content.Key)
}
}
objects = objects[len(resp.Contents):]
}
require.Empty(t, objects)
fmt.Println(time.Since(start))
}
func prepareObjects(hc *handlerContext, bktInfo *data.BucketInfo, prefix string, size int) []string {
treeID := "version"
parentID := uint64(0)

View file

@ -809,7 +809,7 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}
func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
nodeVersions, err := n.getAllObjectsVersions(ctx, &ListObjectVersionsParams{
res, err := n.ListObjectVersions(ctx, &ListObjectVersionsParams{
BktInfo: p.BktInfo,
MaxKeys: 1,
})
@ -818,7 +818,7 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
if err != nil {
return err
}
if len(nodeVersions) != 0 {
if len(res.DeleteMarker) != 0 || len(res.Version) != 0 {
return errors.GetAPIError(errors.ErrBucketNotEmpty)
}

View file

@ -139,10 +139,70 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
versions, err := n.getAllObjectsVersions(ctx, p)
var err error
owner := n.BearerOwner(ctx)
versions := make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
if err != nil {
return nil, err
}
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
objVersions, ok := versions[name]
if !ok {
objVersions = []*data.ExtendedNodeVersion{eoi}
} else if dirName == "" {
objVersions = append(objVersions, eoi)
}
versions[name] = objVersions
}
if err = <-errorCh; err != nil {
return nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
sortedNames := make([]string, 0, len(versions))
for k := range versions {
@ -164,9 +224,9 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
}
}
if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
return nil, err
}
//if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
// return nil, err
//}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
@ -180,6 +240,9 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
res.NextKeyMarker = allObjects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
session.Next = []*data.NodeVersion{allObjects[p.MaxKeys-1].NodeVersion, allObjects[p.MaxKeys].NodeVersion}
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, res.NextVersionIDMarker), session)
allObjects = allObjects[:p.MaxKeys]
}
@ -193,7 +256,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
}
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken)
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) // todo redo for listv1
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
@ -223,9 +286,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
}
objects = make([]*data.NodeVersion, 0, p.MaxKeys+1)
if session.Next != nil {
objects = append(objects, session.Next)
}
objects = append(objects, session.Next...)
for obj := range objOutCh {
objects = append(objects, obj)
@ -246,24 +307,37 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
}
if next != nil {
session.Next = next
session.Next = []*data.NodeVersion{next}
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.OID.EncodeToString()), session)
}
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedNodeVersion, error) {
nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix)
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (versions map[string][]*data.ExtendedNodeVersion, err error) {
owner := n.BearerOwner(ctx)
versions = make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, err
}
versions := make(map[string][]*data.ExtendedNodeVersion, len(nodeVersions))
sort.Slice(nodeVersions, func(i, j int) bool {
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
})
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
@ -277,20 +351,42 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions
MaxKeys: p.MaxKeys,
}
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions))
generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
if err != nil {
return nil, err
}
//if session.Next != nil {
// name := session.Next.FilePath
// dirName := tryDirectoryName(session.Next, p.Prefix, p.Delimiter)
// if dirName != "" {
// name = dirName
// }
//
// versions[name] = []*data.ExtendedNodeVersion{{NodeVersion: session.Next}}
//}
for eoi := range objOutCh {
objVersions, ok := versions[eoi.NodeVersion.FilePath]
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
objVersions, ok := versions[name]
if !ok {
objVersions = []*data.ExtendedNodeVersion{eoi}
} else if dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter); dirName == "" {
} else if dirName == "" {
objVersions = append(objVersions, eoi)
} else {
versions[dirName] = objVersions
}
versions[name] = objVersions
}
if err = <-errorCh; err != nil {
return nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
return versions, nil
@ -302,12 +398,12 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
if stream.Next != nil {
if len(stream.Next) != 0 {
existed[continuationToken] = struct{}{}
}
limit := p.MaxKeys
if stream.Next == nil {
if len(stream.Next) == 0 {
limit++
}
@ -346,14 +442,46 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L
return nodeCh, errCh
}
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
nodeCh := make(chan *data.NodeVersion)
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
delete(existed, continuationToken)
//if len(stream.Next) != 0 {
// existed[continuationToken] = struct{}{}
//}
limit := p.MaxKeys + 1
//if len(stream.Next) == 0 {
// limit++
//}
go func() {
var generated int
var err error
ind := 0
LOOP:
for _, node := range nodeVersions {
for err == nil {
var node *data.NodeVersion
if ind < len(stream.Next) {
node = stream.Next[ind]
ind++
} else {
node, err = stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
}
if shouldSkipVersions(node, p, existed) {
continue
}
@ -363,15 +491,17 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions
break LOOP
case nodeCh <- node:
generated++
if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh
return nodeCh, errCh
}
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
@ -567,6 +697,7 @@ func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[s
return true
}
existed[continuationToken] = struct{}{}
return true
}
}

View file

@ -239,6 +239,26 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, b
}, nil
}
func (t *TreeServiceMock) GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok {
return nil, ErrNodeNotFound
}
var result []*data.NodeVersion
for key, versions := range cnrVersionsMap {
if !strings.HasPrefix(key, prefix) {
continue
}
result = append(result, versions...)
}
return &LatestVersionsByPrefixStreamMock{
result: result,
}, nil
}
func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok {

View file

@ -57,6 +57,7 @@ type TreeService interface {
GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error)
GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error)
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error

View file

@ -172,6 +172,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
AnonKey: AnonymousKey{Key: key},
TreeService: NewTreeService(),
Features: &FeatureSettingsMock{},
GateOwner: owner,
}
return &testContext{

View file

@ -998,6 +998,30 @@ func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketI
return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false)
}
func (c *Tree) GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) {
//return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false)
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
if err != nil {
if errors.Is(err, io.EOF) {
return &LatestVersionsByPrefixStreamImpl{ended: true}, nil
}
return nil, err
}
return &LatestVersionsByPrefixStreamImpl{
ctx: ctx,
namesMap: map[uint64]string{},
rootID: rootID,
service: c.service,
bktInfo: bktInfo,
mainStream: mainStream,
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
tailPrefix: tailPrefix,
}, nil
}
func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) {
prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly)
if err != nil {