[#437] tree: Support removing old split system nodes
All checks were successful
/ DCO (pull_request) Successful in 58s
/ Vulncheck (pull_request) Successful in 1m12s
/ Builds (1.21) (pull_request) Successful in 3m17s
/ Builds (1.22) (pull_request) Successful in 53s
/ Lint (pull_request) Successful in 4m46s
/ Tests (1.21) (pull_request) Successful in 1m34s
/ Tests (1.22) (pull_request) Successful in 1m27s

It's need to fit user expectation on deleting CORS for example.
Previously after removing cors (that was uploaded in split manner)
we can still get some data (from other node)
because deletion worked only for latest node version.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-17 12:44:38 +03:00 committed by Alex Vanin
parent c506620199
commit 41eda7aa35
5 changed files with 159 additions and 92 deletions

View file

@ -60,14 +60,16 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
return fmt.Errorf("put cors object: %w", err) return fmt.Errorf("put cors object: %w", err)
} }
objToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID))
objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
if err != nil && !objToDeleteNotFound { if err != nil && !objToDeleteNotFound {
return err return err
} }
if !objToDeleteNotFound { if !objToDeleteNotFound {
n.deleteCORSObject(ctx, p.BktInfo, objToDelete) for _, addr := range objsToDelete {
n.deleteCORSObject(ctx, p.BktInfo, addr)
}
} }
n.cache.PutCORS(n.BearerOwner(ctx), p.BktInfo, cors) n.cache.PutCORS(n.BearerOwner(ctx), p.BktInfo, cors)
@ -101,22 +103,15 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
} }
func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error { func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
obj, err := n.treeService.DeleteBucketCORS(ctx, bktInfo) objs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
objNotFound := errorsStd.Is(err, ErrNoNodeToRemove) objNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
if err != nil && !objNotFound { if err != nil && !objNotFound {
return err return err
} }
if !objNotFound { if !objNotFound {
var prmAuth PrmAuth for _, addr := range objs {
corsBkt := bktInfo n.deleteCORSObject(ctx, bktInfo, addr)
if !obj.Container().Equals(bktInfo.CID) && !obj.Container().Equals(cid.ID{}) {
corsBkt = &data.BucketInfo{CID: obj.Container()}
prmAuth.PrivateKey = &n.gateKey.PrivateKey
}
if err = n.objectDeleteWithAuth(ctx, corsBkt, obj.Object(), prmAuth); err != nil {
return fmt.Errorf("delete cors object: %w", err)
} }
} }

View file

@ -127,7 +127,7 @@ func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketI
return addr, nil return addr, nil
} }
func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()] systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok { if !ok {
systemMap = make(map[string]*data.BaseNodeVersion) systemMap = make(map[string]*data.BaseNodeVersion)
@ -139,10 +139,10 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI
t.system[bktInfo.CID.EncodeToString()] = systemMap t.system[bktInfo.CID.EncodeToString()] = systemMap
return oid.Address{}, ErrNoNodeToRemove return nil, ErrNoNodeToRemove
} }
func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) (oid.Address, error) { func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.Address, error) {
panic("implement me") panic("implement me")
} }

View file

@ -25,13 +25,13 @@ type TreeService interface {
// PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS. // PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS. // DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error)
GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error)
PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error

View file

@ -142,12 +142,15 @@ const (
CouldntCacheSubject = "couldn't cache subject info" CouldntCacheSubject = "couldn't cache subject info"
UserGroupsListIsEmpty = "user groups list is empty, subject not found" UserGroupsListIsEmpty = "user groups list is empty, subject not found"
CouldntCacheUserKey = "couldn't cache user key" CouldntCacheUserKey = "couldn't cache user key"
FoundSeveralBucketCorsNodes = "found several bucket cors nodes, latest be used" ObjectTaggingNodeHasMultipleIDs = "object tagging node has multiple ids"
FoundSeveralObjectTaggingNodes = "found several object tagging nodes, latest be used" BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids"
FoundSeveralBucketTaggingNodes = "found several bucket tagging nodes, latest be used" BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
FoundSeveralBucketSettingsNodes = "found several bucket settings nodes, latest be used" BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
SystemNodeHasMultipleIDs = "system node has multiple ids"
FailedToRemoveOldSystemNode = "failed to remove old system node"
FailedToParseAddressInTreeNode = "failed to parse object addr in tree node"
UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts" UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
FoundSeveralSystemNodes = "found several system nodes, latest be used" FoundSeveralSystemNodes = "found several system nodes"
FailedToParsePartInfo = "failed to parse part info" FailedToParsePartInfo = "failed to parse part info"
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info" CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
CloseCredsObjectPayload = "close creds object payload" CloseCredsObjectPayload = "close creds object payload"

View file

@ -54,6 +54,11 @@ type (
Meta map[string]string Meta map[string]string
} }
multiSystemNode struct {
// the first element is latest
nodes []*treeNode
}
GetNodesParams struct { GetNodesParams struct {
BktInfo *data.BucketInfo BktInfo *data.BucketInfo
TreeID string TreeID string
@ -270,6 +275,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
return version, nil return version, nil
} }
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
var (
err error
index int
maxTimestamp uint64
)
if len(nodes) == 0 {
return nil, errors.New("multi node must have at least one node")
}
treeNodes := make([]*treeNode, len(nodes))
for i, node := range nodes {
if treeNodes[i], err = newTreeNode(node); err != nil {
return nil, fmt.Errorf("parse system node response: %w", err)
}
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
index = i
maxTimestamp = timestamp
}
}
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
return &multiSystemNode{
nodes: treeNodes,
}, nil
}
func (m *multiSystemNode) Latest() *treeNode {
return m.nodes[0]
}
func (m *multiSystemNode) Old() []*treeNode {
return m.nodes[1:]
}
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) { func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
uploadID, _ := treeNode.Get(uploadIDKV) uploadID, _ := treeNode.Get(uploadIDKV)
if uploadID == "" { if uploadID == "" {
@ -396,11 +440,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
} }
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err) return nil, fmt.Errorf("couldn't get node: %w", err)
} }
node := multiNode.Latest()
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned} settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
if versioningValue, ok := node.Get(versioningKV); ok { if versioningValue, ok := node.Get(versioningKV); ok {
settings.Versioning = versioningValue settings.Versioning = versioningValue
@ -424,7 +470,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
} }
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err) return fmt.Errorf("couldn't get node: %w", err)
@ -437,28 +483,35 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
return err return err
} }
ind := node.GetLatestNodeIndex() latest := multiNode.Latest()
if node.IsSplit() { ind := latest.GetLatestNodeIndex()
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes) if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
} }
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return fmt.Errorf("move settings node: %w", err)
}
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
return nil
} }
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
if err != nil { if err != nil {
return oid.Address{}, err return oid.Address{}, err
} }
return getCORSAddress(node) return getCORSAddress(node.Latest())
} }
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return oid.Address{}, fmt.Errorf("couldn't get node: %w", err) return nil, fmt.Errorf("couldn't get node: %w", err)
} }
meta := make(map[string]string) meta := make(map[string]string)
@ -468,45 +521,49 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr
if isErrNotFound { if isErrNotFound {
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
return oid.Address{}, err return nil, err
} }
return oid.Address{}, layer.ErrNoNodeToRemove return nil, layer.ErrNoNodeToRemove
} }
ind := node.GetLatestNodeIndex() latest := multiNode.Latest()
if node.IsSplit() { ind := latest.GetLatestNodeIndex()
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
} }
prevAddr, err := getCORSAddress(node) if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return nil, fmt.Errorf("move cors node: %w", err)
}
objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
objToDelete[0], err = getCORSAddress(latest)
if err != nil { if err != nil {
return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
} }
return prevAddr, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
return objToDelete, nil
} }
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
return oid.Address{}, err if err != nil && !isErrNotFound {
return nil, err
} }
if node != nil { if isErrNotFound {
ind := node.GetLatestNodeIndex() return nil, layer.ErrNoNodeToRemove
if node.IsSplit() {
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
} }
addr, err := getCORSAddress(node) objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
if err != nil { if len(objToDelete) != len(multiNode.nodes) {
return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) return nil, fmt.Errorf("clean old cors nodes: %w", err)
} }
return addr, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]) return objToDelete, nil
}
return oid.Address{}, layer.ErrNoNodeToRemove
} }
func getCORSAddress(node *treeNode) (oid.Address, error) { func getCORSAddress(node *treeNode) (oid.Address, error) {
@ -524,6 +581,29 @@ func getCORSAddress(node *treeNode) (oid.Address, error) {
return addr, nil return addr, nil
} }
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
res := make([]oid.Address, 0, len(nodes))
for _, node := range nodes {
ind := node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
}
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
} else {
addr, err := getCORSAddress(node)
if err != nil {
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
continue
}
res = append(res, addr)
}
}
return res
}
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) { func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil { if err != nil {
@ -569,7 +649,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
ind := tagNode.GetLatestNodeIndex() ind := tagNode.GetLatestNodeIndex()
if tagNode.IsSplit() { if tagNode.IsSplit() {
c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes) c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
} }
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet) return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
@ -580,14 +660,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo
} }
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tags := make(map[string]string) tags := make(map[string]string)
for key, val := range node.Meta { for key, val := range multiNode.Latest().Meta {
if strings.HasPrefix(key, userDefinedTagPrefix) { if strings.HasPrefix(key, userDefinedTagPrefix) {
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
} }
@ -597,7 +677,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
} }
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error { func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err) return fmt.Errorf("couldn't get node: %w", err)
@ -615,12 +695,19 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
return err return err
} }
ind := node.GetLatestNodeIndex() latest := multiNode.Latest()
if node.IsSplit() { ind := latest.GetLatestNodeIndex()
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes) if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
} }
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet) if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
return fmt.Errorf("move bucket tagging node: %w", err)
}
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
return nil
} }
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error { func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
@ -643,6 +730,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI
return nil, err return nil, err
} }
// consider using map[string][]*treeNode
// to be able to remove unused node, that can be added during split
treeNodes := make(map[string]*treeNode, len(keys)) treeNodes := make(map[string]*treeNode, len(keys))
for _, s := range subtree { for _, s := range subtree {
@ -717,26 +806,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
return nodes[targetIndexNode], nil return nodes[targetIndexNode], nil
} }
func getLatestNode(nodes []NodeResponse) NodeResponse {
if len(nodes) == 0 {
return nil
}
var (
index int
maxTimestamp uint64
)
for i, node := range nodes {
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
index = i
maxTimestamp = timestamp
}
}
return nodes[index]
}
func getMaxTimestamp(node NodeResponse) uint64 { func getMaxTimestamp(node NodeResponse) uint64 {
var maxTimestamp uint64 var maxTimestamp uint64
@ -1586,11 +1655,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
return info.Meta return info.Meta
} }
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) { func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
p := &GetNodesParams{ p := &GetNodesParams{
BktInfo: bktInfo, BktInfo: bktInfo,
TreeID: systemTree, TreeID: systemTree,
Path: path, Path: []string{name},
LatestOnly: false, LatestOnly: false,
AllAttrs: true, AllAttrs: true,
} }
@ -1605,10 +1674,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
if len(nodes) != 1 { if len(nodes) != 1 {
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/"))) c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
} }
return newTreeNode(getLatestNode(nodes)) return newMultiNode(nodes)
} }
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse { func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {