From 8a30f18ff6ec4ccd867064989f37792ec203c619 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Sun, 21 Jan 2024 14:13:37 +0300 Subject: [PATCH] [#165] Don't use recursion in list streaming Signed-off-by: Denis Kirillov --- pkg/service/tree/tree.go | 176 +++++++++++++++++++++++++-------------- 1 file changed, 112 insertions(+), 64 deletions(-) diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 8436fc3..e36d9ef 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -17,6 +17,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "go.uber.org/zap" + "golang.org/x/exp/maps" ) type ( @@ -666,63 +667,130 @@ type VersionsByPrefixStreamImpl struct { ended bool latestOnly bool currentLatest *data.NodeVersion + log *zap.Logger } -// Next todo remove recursion -func (s *VersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersion, error) { +func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) { if s.ended { return nil, io.EOF } - if s.innerStream == nil { + for true { + if s.innerStream == nil { + node, err := s.getNodeFromMainStream() + if err != nil { + if errors.Is(err, io.EOF) { + s.ended = true + if s.currentLatest != nil { + return s.currentLatest, nil + } + } + return nil, fmt.Errorf("get node from main stream: %w", err) + } + + if err = s.initInnerStream(node); err != nil { + return nil, fmt.Errorf("init inner stream: %w", err) + } + } + + nodeVersion, err := s.getNodeVersionFromInnerStream() + if err != nil { + if errors.Is(err, io.EOF) { + s.innerStream = nil + maps.Clear(s.namesMap) + if s.currentLatest != nil && s.currentLatest.ID != s.intermediateRootID { + return s.currentLatest, nil + } + continue + } + return nil, fmt.Errorf("inner stream: %w", err) + } + return nodeVersion, nil + } + + panic("unreachable code") +} + +func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) { + for true { node, err := s.mainStream.Next() if err != nil { if errors.Is(err, ErrNodeNotFound) { return nil, io.EOF } - if errors.Is(err, io.EOF) { - s.ended = true - if s.latestOnly && s.currentLatest != nil { - return s.currentLatest, nil - } - } return nil, fmt.Errorf("main stream next: %w", err) } - if node.GetNodeID() == s.rootID || !strings.HasPrefix(getFilename(node), s.tailPrefix) { - return s.Next(ctx) - } - - if node.GetParentID() == s.rootID { - s.intermediateRootID = node.GetNodeID() - } - - if isIntermediate(node) { - s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth) - if err != nil { - return nil, fmt.Errorf("get sub tree node from main stream: %w", err) - } - } else { - s.innerStream = &DummySubTreeStream{data: node} + if node.GetNodeID() != s.rootID && strings.HasPrefix(getFilename(node), s.tailPrefix) { + return node, nil } } - node, err := s.innerStream.Next() - if err != nil { - if errors.Is(err, io.EOF) { - s.innerStream = nil - s.namesMap = map[uint64]string{} - if s.latestOnly && s.currentLatest != nil && s.currentLatest.ID != s.intermediateRootID { - return s.currentLatest, nil - } - return s.Next(ctx) - } - return nil, fmt.Errorf("inner stream: %w", err) + panic("unreachable code") +} + +func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) { + if node.GetParentID() == s.rootID { + s.intermediateRootID = node.GetNodeID() } - treeNode, fileName, err := parseTreeNode(node) + if isIntermediate(node) { + s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth) + if err != nil { + return fmt.Errorf("get sub tree node from main stream: %w", err) + } + } else { + s.innerStream = &DummySubTreeStream{data: node} + } + + return nil +} + +func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) { + for true { + node, err := s.innerStream.Next() + if err != nil { + return nil, fmt.Errorf("inner stream: %w", err) + } + + nodeVersion, skip, err := s.parseNodeResponse(node) + if err != nil { + return nil, err + } + if skip { + continue + } + + if s.latestOnly { + if s.currentLatest == nil { + s.currentLatest = nodeVersion + continue + } + + if s.currentLatest.FilePath != nodeVersion.FilePath { + res := s.currentLatest + s.currentLatest = nodeVersion + return res, nil + } + + if s.currentLatest.Timestamp < nodeVersion.Timestamp { + s.currentLatest = nodeVersion + } + + continue + } + + return nodeVersion, nil + } + + panic("unreachable code") +} + +func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) { + trNode, fileName, err := parseTreeNode(node) if err != nil { - return s.Next(ctx) + s.log.Debug("parse tree node", zap.Error(err)) + return nil, true, nil } var parentPrefix string @@ -731,41 +799,20 @@ func (s *VersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersio } var filepath string - if treeNode.ID != s.intermediateRootID { + if trNode.ID != s.intermediateRootID { if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil { - return nil, fmt.Errorf("invalid node order: %w", err) + return nil, false, fmt.Errorf("invalid node order: %w", err) } } else { filepath = parentPrefix + fileName - s.namesMap[treeNode.ID] = filepath + s.namesMap[trNode.ID] = filepath } - if treeNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap - return s.Next(ctx) + if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap + return nil, true, nil } - nodeVersion := newNodeVersionFromTreeNode(filepath, treeNode) - - if s.latestOnly { - if s.currentLatest == nil { - s.currentLatest = nodeVersion - return s.Next(ctx) - } - - if s.currentLatest.FilePath != nodeVersion.FilePath { - res := s.currentLatest - s.currentLatest = nodeVersion - return res, nil - } - - if s.currentLatest.Timestamp < nodeVersion.Timestamp { - s.currentLatest = nodeVersion - } - - return s.Next(ctx) - } - - return nodeVersion, nil + return newNodeVersionFromTreeNode(filepath, trNode), false, nil } func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) { @@ -787,6 +834,7 @@ func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.Buc headPrefix: strings.TrimSuffix(prefix, tailPrefix), tailPrefix: tailPrefix, latestOnly: latestOnly, + log: c.log, }, nil }