From b52552e8c2237a68d9e2bbccd68439a9d31ed420 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 17 Oct 2023 10:43:58 +0300 Subject: [PATCH] [#165] Add batching in streamin listing Signed-off-by: Denis Kirillov --- api/cache/listsession.go | 2 +- api/layer/object.go | 47 ++++++++++++++++++++++- internal/frostfs/services/pool_wrapper.go | 38 +++++++++++++++--- pkg/service/tree/tree.go | 1 + 4 files changed, 80 insertions(+), 8 deletions(-) diff --git a/api/cache/listsession.go b/api/cache/listsession.go index bbe4a80..a8405aa 100644 --- a/api/cache/listsession.go +++ b/api/cache/listsession.go @@ -55,7 +55,7 @@ func NewListSessionCache(config *Config) *ListSessionCache { zap.String("expected", fmt.Sprintf("%T", session))) } - session.Cancel() + //session.Cancel() }).Build() return &ListSessionCache{cache: gc, logger: config.Logger} } diff --git a/api/layer/object.go b/api/layer/object.go index 86c4851..670a696 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -13,6 +13,7 @@ import ( "io" "mime" "path/filepath" + "runtime" "sort" "strconv" "strings" @@ -596,6 +597,20 @@ func (l *logWrapper) Printf(format string, args ...interface{}) { l.log.Info(fmt.Sprintf(format, args...)) } +func PrintMemUsage() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v\n", m.NumGC) +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { if p.MaxKeys == 0 { return nil, nil, nil @@ -634,6 +649,20 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) objects = append(objects, obj) } + //for node := range nodesGenerator(poolCtx, p, nodeVersions) { + // objects = append(objects, &data.ObjectInfo{ + // ID: node.OID, + // IsDir: false, + // IsDeleteMarker: node.IsDeleteMarker(), + // Name: node.FilePath, + // Size: node.Size, + // Created: time.Time{}, + // HashSum: node.ETag, + // Owner: user.ID{}, + // Headers: nil, + // }) + //} + sort.Slice(objects, func(i, j int) bool { return objects[i].Name < objects[j].Name }) @@ -652,7 +681,7 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam } owner := n.BearerOwner(ctx) - cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.Delimiter) + cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) session := n.cache.GetListSession(owner, cacheKey) if session != nil { // after reading next object from stream in session @@ -690,6 +719,20 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam objects = append(objects, obj) } + //for node := range generator { + // objects = append(objects, &data.ObjectInfo{ + // ID: node.OID, + // IsDir: false, + // IsDeleteMarker: node.IsDeleteMarker(), + // Name: node.FilePath, + // Size: node.Size, + // Created: time.Time{}, + // HashSum: node.ETag, + // Owner: user.ID{}, + // Headers: nil, + // }) + //} + if err = <-errorCh; err != nil { return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err) } @@ -768,7 +811,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions } func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { - nodeCh := make(chan *data.NodeVersion) + nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories existed := stream.NamesMap diff --git a/internal/frostfs/services/pool_wrapper.go b/internal/frostfs/services/pool_wrapper.go index 67f2b7a..480d753 100644 --- a/internal/frostfs/services/pool_wrapper.go +++ b/internal/frostfs/services/pool_wrapper.go @@ -125,19 +125,43 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, } type SubTreeStreamImpl struct { - r *treepool.SubTreeReader + r *treepool.SubTreeReader + buffer []*grpcService.GetSubTreeResponse_Body + eof bool + index int + ln int } +const bufSize = 1000 + func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) { - node, err := s.r.Next() + if s.index != -1 { + node := s.buffer[s.index] + s.index++ + if s.index >= s.ln { + s.index = -1 + } + return GetSubTreeResponseBodyWrapper{response: node}, nil + } + if s.eof { + return nil, io.EOF + } + + var err error + s.ln, err = s.r.Read(s.buffer) if err != nil { if err != io.EOF { err = handleError(err) + return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", err) + } else { + s.eof = true } - return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", err) + } + if s.ln > 0 { + s.index = 0 } - return GetSubTreeResponseBodyWrapper{node}, nil + return s.Next() } func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) { @@ -154,7 +178,11 @@ func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.Bucket return nil, handleError(err) } - return &SubTreeStreamImpl{r: subTreeReader}, nil + return &SubTreeStreamImpl{ + r: subTreeReader, + buffer: make([]*grpcService.GetSubTreeResponse_Body, bufSize), + index: -1, + }, nil } func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) { diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index dba9bf6..b0c25f8 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -708,6 +708,7 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node if err != nil { if errors.Is(err, io.EOF) { s.innerStream = nil + s.namesMap = map[uint64]string{} return s.Next(ctx) } return nil, fmt.Errorf("inner stream: %w", err)