forked from TrueCloudLab/frostfs-s3-gw
[#165] Add batching in streamin listing
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
6e8960b2ab
commit
b52552e8c2
4 changed files with 80 additions and 8 deletions
2
api/cache/listsession.go
vendored
2
api/cache/listsession.go
vendored
|
@ -55,7 +55,7 @@ func NewListSessionCache(config *Config) *ListSessionCache {
|
||||||
zap.String("expected", fmt.Sprintf("%T", session)))
|
zap.String("expected", fmt.Sprintf("%T", session)))
|
||||||
}
|
}
|
||||||
|
|
||||||
session.Cancel()
|
//session.Cancel()
|
||||||
}).Build()
|
}).Build()
|
||||||
return &ListSessionCache{cache: gc, logger: config.Logger}
|
return &ListSessionCache{cache: gc, logger: config.Logger}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"mime"
|
"mime"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -596,6 +597,20 @@ func (l *logWrapper) Printf(format string, args ...interface{}) {
|
||||||
l.log.Info(fmt.Sprintf(format, args...))
|
l.log.Info(fmt.Sprintf(format, args...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func PrintMemUsage() {
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
// For info on each, see: https://golang.org/pkg/runtime/#MemStats
|
||||||
|
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
|
||||||
|
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
|
||||||
|
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
|
||||||
|
fmt.Printf("\tNumGC = %v\n", m.NumGC)
|
||||||
|
}
|
||||||
|
|
||||||
|
func bToMb(b uint64) uint64 {
|
||||||
|
return b / 1024 / 1024
|
||||||
|
}
|
||||||
|
|
||||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
|
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
|
||||||
if p.MaxKeys == 0 {
|
if p.MaxKeys == 0 {
|
||||||
return nil, nil, nil
|
return nil, nil, nil
|
||||||
|
@ -634,6 +649,20 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
|
||||||
objects = append(objects, obj)
|
objects = append(objects, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//for node := range nodesGenerator(poolCtx, p, nodeVersions) {
|
||||||
|
// objects = append(objects, &data.ObjectInfo{
|
||||||
|
// ID: node.OID,
|
||||||
|
// IsDir: false,
|
||||||
|
// IsDeleteMarker: node.IsDeleteMarker(),
|
||||||
|
// Name: node.FilePath,
|
||||||
|
// Size: node.Size,
|
||||||
|
// Created: time.Time{},
|
||||||
|
// HashSum: node.ETag,
|
||||||
|
// Owner: user.ID{},
|
||||||
|
// Headers: nil,
|
||||||
|
// })
|
||||||
|
//}
|
||||||
|
|
||||||
sort.Slice(objects, func(i, j int) bool {
|
sort.Slice(objects, func(i, j int) bool {
|
||||||
return objects[i].Name < objects[j].Name
|
return objects[i].Name < objects[j].Name
|
||||||
})
|
})
|
||||||
|
@ -652,7 +681,7 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
||||||
}
|
}
|
||||||
|
|
||||||
owner := n.BearerOwner(ctx)
|
owner := n.BearerOwner(ctx)
|
||||||
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.Delimiter)
|
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken)
|
||||||
session := n.cache.GetListSession(owner, cacheKey)
|
session := n.cache.GetListSession(owner, cacheKey)
|
||||||
if session != nil {
|
if session != nil {
|
||||||
// after reading next object from stream in session
|
// after reading next object from stream in session
|
||||||
|
@ -690,6 +719,20 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
||||||
objects = append(objects, obj)
|
objects = append(objects, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//for node := range generator {
|
||||||
|
// objects = append(objects, &data.ObjectInfo{
|
||||||
|
// ID: node.OID,
|
||||||
|
// IsDir: false,
|
||||||
|
// IsDeleteMarker: node.IsDeleteMarker(),
|
||||||
|
// Name: node.FilePath,
|
||||||
|
// Size: node.Size,
|
||||||
|
// Created: time.Time{},
|
||||||
|
// HashSum: node.ETag,
|
||||||
|
// Owner: user.ID{},
|
||||||
|
// Headers: nil,
|
||||||
|
// })
|
||||||
|
//}
|
||||||
|
|
||||||
if err = <-errorCh; err != nil {
|
if err = <-errorCh; err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
|
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -768,7 +811,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions
|
||||||
}
|
}
|
||||||
|
|
||||||
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
|
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
|
||||||
nodeCh := make(chan *data.NodeVersion)
|
nodeCh := make(chan *data.NodeVersion, 1000)
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
|
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
|
||||||
existed := stream.NamesMap
|
existed := stream.NamesMap
|
||||||
|
|
|
@ -125,19 +125,43 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
type SubTreeStreamImpl struct {
|
type SubTreeStreamImpl struct {
|
||||||
r *treepool.SubTreeReader
|
r *treepool.SubTreeReader
|
||||||
|
buffer []*grpcService.GetSubTreeResponse_Body
|
||||||
|
eof bool
|
||||||
|
index int
|
||||||
|
ln int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const bufSize = 1000
|
||||||
|
|
||||||
func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
|
func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
|
||||||
node, err := s.r.Next()
|
if s.index != -1 {
|
||||||
|
node := s.buffer[s.index]
|
||||||
|
s.index++
|
||||||
|
if s.index >= s.ln {
|
||||||
|
s.index = -1
|
||||||
|
}
|
||||||
|
return GetSubTreeResponseBodyWrapper{response: node}, nil
|
||||||
|
}
|
||||||
|
if s.eof {
|
||||||
|
return nil, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
s.ln, err = s.r.Read(s.buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
err = handleError(err)
|
err = handleError(err)
|
||||||
|
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", err)
|
||||||
|
} else {
|
||||||
|
s.eof = true
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", err)
|
}
|
||||||
|
if s.ln > 0 {
|
||||||
|
s.index = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
return GetSubTreeResponseBodyWrapper{node}, nil
|
return s.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) {
|
func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) {
|
||||||
|
@ -154,7 +178,11 @@ func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.Bucket
|
||||||
return nil, handleError(err)
|
return nil, handleError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &SubTreeStreamImpl{r: subTreeReader}, nil
|
return &SubTreeStreamImpl{
|
||||||
|
r: subTreeReader,
|
||||||
|
buffer: make([]*grpcService.GetSubTreeResponse_Body, bufSize),
|
||||||
|
index: -1,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
||||||
|
|
|
@ -708,6 +708,7 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
s.innerStream = nil
|
s.innerStream = nil
|
||||||
|
s.namesMap = map[uint64]string{}
|
||||||
return s.Next(ctx)
|
return s.Next(ctx)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("inner stream: %w", err)
|
return nil, fmt.Errorf("inner stream: %w", err)
|
||||||
|
|
Loading…
Reference in a new issue