[#165] Support streaming listing

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-10-09 09:57:33 +03:00
parent 84af85ed67
commit 29ac91dfd5
12 changed files with 938 additions and 39 deletions

View file

@ -17,12 +17,14 @@ import (
"strconv"
"strings"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -572,7 +574,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
ContinuationToken: p.ContinuationToken,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
objects, next, err := n.getLatestObjectsVersionsV2(ctx, prm)
if err != nil {
return nil, err
}
@ -645,6 +647,75 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
return
}
func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
testKey := p.Prefix + p.Delimiter + p.ContinuationToken
nodeVersionsStreamValue, ok := n.cache.testCache[testKey]
if ok {
delete(n.cache.testCache, testKey)
} else {
ctx2, cancel2 := context.WithCancel(context.Background())
go func() {
<-time.After(10 * time.Second)
cancel2()
}()
if bd, err := middleware.GetBoxData(ctx); err == nil {
ctx2 = middleware.SetBoxData(ctx2, bd)
}
nodeVersionsStreamValue.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(ctx2, p.Bucket, p.Prefix)
if err != nil {
return nil, nil, err
}
nodeVersionsStreamValue.NamesMap = map[string]struct{}{}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
generator, errorCh := nodesGeneratorStream(poolCtx, p, nodeVersionsStreamValue)
objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator)
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1)
if nodeVersionsStreamValue.Next != nil {
objects = append(objects, nodeVersionsStreamValue.Next)
}
for obj := range objOutCh {
objects = append(objects, obj)
}
if err = <-errorCh; err != nil {
fmt.Println(len(objects))
fmt.Println(objects[len(objects)-1].Name)
return nil, nil, fmt.Errorf("failed to get object from tree: %w", err)
}
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
objects = objects[:p.MaxKeys]
}
if next != nil {
nodeVersionsStreamValue.Next = next
n.cache.testCache[p.Prefix+p.Delimiter+next.VersionID()] = nodeVersionsStreamValue
}
return
}
func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
nodeCh := make(chan *data.NodeVersion)
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
@ -673,6 +744,86 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data
return nodeCh
}
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
nodeCh := make(chan *data.NodeVersion)
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
go func() {
var generated int
LOOP:
for _, node := range nodeVersions {
if shouldSkipVersions(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
}()
return nodeCh
}
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream TestCacheValue) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
if stream.Next != nil {
existed[continuationToken] = struct{}{}
}
limit := p.MaxKeys
if stream.Next == nil {
limit++
}
go func() {
var generated int
var err error
LOOP:
for err == nil {
node, err := stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
fmt.Println(ctx.Err())
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
if shouldSkip(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
@ -725,6 +876,126 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
return objCh, nil
}
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedObjectInfo, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ExtendedObjectInfo)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := &data.ObjectInfo{}
if node.IsDeleteMarker() { // delete marker does not match any object in FrostFS
oi.ID = node.OID
oi.Name = node.FilePath
oi.Owner = node.DeleteMarker.Owner
oi.Created = node.DeleteMarker.Created
oi.IsDeleteMarker = true
} else {
oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
}
eoi := &data.ExtendedObjectInfo{
ObjectInfo: oi,
NodeVersion: node,
}
select {
case <-ctx.Done():
case objCh <- eoi:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ObjectInfo)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
select {
case <-ctx.Done():
case objCh <- oi:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
var err error
@ -744,41 +1015,43 @@ func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, pr
return nodeVersions, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix)
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedObjectInfo, error) {
nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix)
if err != nil {
return nil, err
}
versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))
for _, nodeVersion := range nodeVersions {
oi := &data.ObjectInfo{}
sort.Slice(nodeVersions, func(i, j int) bool {
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
})
if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in FrostFS
oi.ID = nodeVersion.OID
oi.Name = nodeVersion.FilePath
oi.Owner = nodeVersion.DeleteMarker.Owner
oi.Created = nodeVersion.DeleteMarker.Created
oi.IsDeleteMarker = true
} else {
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil {
continue
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
eoi := &data.ExtendedObjectInfo{
ObjectInfo: oi,
NodeVersion: nodeVersion,
}
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
objVersions, ok := versions[oi.Name]
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions))
if err != nil {
return nil, err
}
for eoi := range objOutCh {
objVersions, ok := versions[eoi.ObjectInfo.Name]
if !ok {
objVersions = []*data.ExtendedObjectInfo{eoi}
} else if !oi.IsDir {
} else if !eoi.ObjectInfo.IsDir {
objVersions = append(objVersions, eoi)
}
versions[oi.Name] = objVersions
versions[eoi.ObjectInfo.Name] = objVersions
}
return versions, nil
@ -819,6 +1092,32 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st
return false
}
func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
if _, ok := existed[filePath]; ok {
return true
}
}
if filePath < p.Marker {
return true
}
if p.ContinuationToken != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
for _, ov := range allObjects {
if ov.IsDir {