[#165] Don't use recursion in list streaming

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-01-21 14:13:37 +03:00
parent 2d7973b3f1
commit 8a30f18ff6

View file

@ -17,6 +17,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps"
) )
type ( type (
@ -666,63 +667,130 @@ type VersionsByPrefixStreamImpl struct {
ended bool ended bool
latestOnly bool latestOnly bool
currentLatest *data.NodeVersion currentLatest *data.NodeVersion
log *zap.Logger
} }
// Next todo remove recursion func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
func (s *VersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersion, error) {
if s.ended { if s.ended {
return nil, io.EOF return nil, io.EOF
} }
if s.innerStream == nil { for true {
if s.innerStream == nil {
node, err := s.getNodeFromMainStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.ended = true
if s.currentLatest != nil {
return s.currentLatest, nil
}
}
return nil, fmt.Errorf("get node from main stream: %w", err)
}
if err = s.initInnerStream(node); err != nil {
return nil, fmt.Errorf("init inner stream: %w", err)
}
}
nodeVersion, err := s.getNodeVersionFromInnerStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.innerStream = nil
maps.Clear(s.namesMap)
if s.currentLatest != nil && s.currentLatest.ID != s.intermediateRootID {
return s.currentLatest, nil
}
continue
}
return nil, fmt.Errorf("inner stream: %w", err)
}
return nodeVersion, nil
}
panic("unreachable code")
}
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
for true {
node, err := s.mainStream.Next() node, err := s.mainStream.Next()
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
return nil, io.EOF return nil, io.EOF
} }
if errors.Is(err, io.EOF) {
s.ended = true
if s.latestOnly && s.currentLatest != nil {
return s.currentLatest, nil
}
}
return nil, fmt.Errorf("main stream next: %w", err) return nil, fmt.Errorf("main stream next: %w", err)
} }
if node.GetNodeID() == s.rootID || !strings.HasPrefix(getFilename(node), s.tailPrefix) { if node.GetNodeID() != s.rootID && strings.HasPrefix(getFilename(node), s.tailPrefix) {
return s.Next(ctx) return node, nil
}
if node.GetParentID() == s.rootID {
s.intermediateRootID = node.GetNodeID()
}
if isIntermediate(node) {
s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
if err != nil {
return nil, fmt.Errorf("get sub tree node from main stream: %w", err)
}
} else {
s.innerStream = &DummySubTreeStream{data: node}
} }
} }
node, err := s.innerStream.Next() panic("unreachable code")
if err != nil { }
if errors.Is(err, io.EOF) {
s.innerStream = nil func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
s.namesMap = map[uint64]string{} if node.GetParentID() == s.rootID {
if s.latestOnly && s.currentLatest != nil && s.currentLatest.ID != s.intermediateRootID { s.intermediateRootID = node.GetNodeID()
return s.currentLatest, nil
}
return s.Next(ctx)
}
return nil, fmt.Errorf("inner stream: %w", err)
} }
treeNode, fileName, err := parseTreeNode(node) if isIntermediate(node) {
s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
if err != nil {
return fmt.Errorf("get sub tree node from main stream: %w", err)
}
} else {
s.innerStream = &DummySubTreeStream{data: node}
}
return nil
}
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
for true {
node, err := s.innerStream.Next()
if err != nil {
return nil, fmt.Errorf("inner stream: %w", err)
}
nodeVersion, skip, err := s.parseNodeResponse(node)
if err != nil {
return nil, err
}
if skip {
continue
}
if s.latestOnly {
if s.currentLatest == nil {
s.currentLatest = nodeVersion
continue
}
if s.currentLatest.FilePath != nodeVersion.FilePath {
res := s.currentLatest
s.currentLatest = nodeVersion
return res, nil
}
if s.currentLatest.Timestamp < nodeVersion.Timestamp {
s.currentLatest = nodeVersion
}
continue
}
return nodeVersion, nil
}
panic("unreachable code")
}
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
trNode, fileName, err := parseTreeNode(node)
if err != nil { if err != nil {
return s.Next(ctx) s.log.Debug("parse tree node", zap.Error(err))
return nil, true, nil
} }
var parentPrefix string var parentPrefix string
@ -731,41 +799,20 @@ func (s *VersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersio
} }
var filepath string var filepath string
if treeNode.ID != s.intermediateRootID { if trNode.ID != s.intermediateRootID {
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil { if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err) return nil, false, fmt.Errorf("invalid node order: %w", err)
} }
} else { } else {
filepath = parentPrefix + fileName filepath = parentPrefix + fileName
s.namesMap[treeNode.ID] = filepath s.namesMap[trNode.ID] = filepath
} }
if treeNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
return s.Next(ctx) return nil, true, nil
} }
nodeVersion := newNodeVersionFromTreeNode(filepath, treeNode) return newNodeVersionFromTreeNode(filepath, trNode), false, nil
if s.latestOnly {
if s.currentLatest == nil {
s.currentLatest = nodeVersion
return s.Next(ctx)
}
if s.currentLatest.FilePath != nodeVersion.FilePath {
res := s.currentLatest
s.currentLatest = nodeVersion
return res, nil
}
if s.currentLatest.Timestamp < nodeVersion.Timestamp {
s.currentLatest = nodeVersion
}
return s.Next(ctx)
}
return nodeVersion, nil
} }
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) { func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
@ -787,6 +834,7 @@ func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.Buc
headPrefix: strings.TrimSuffix(prefix, tailPrefix), headPrefix: strings.TrimSuffix(prefix, tailPrefix),
tailPrefix: tailPrefix, tailPrefix: tailPrefix,
latestOnly: latestOnly, latestOnly: latestOnly,
log: c.log,
}, nil }, nil
} }