port: tree: Support removing old split system nodes #444

Merged
alexvanin merged 2 commits from alexvanin/frostfs-s3-gw:port/settings-split into master 2024-07-26 06:54:10 +00:00
5 changed files with 160 additions and 93 deletions

View file

@ -60,14 +60,16 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
return fmt.Errorf("put cors object: %w", err) return fmt.Errorf("put cors object: %w", err)
} }
objToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID))
objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
if err != nil && !objToDeleteNotFound { if err != nil && !objToDeleteNotFound {
return err return err
} }
if !objToDeleteNotFound { if !objToDeleteNotFound {
n.deleteCORSObject(ctx, p.BktInfo, objToDelete) for _, addr := range objsToDelete {
n.deleteCORSObject(ctx, p.BktInfo, addr)
}
} }
n.cache.PutCORS(n.BearerOwner(ctx), p.BktInfo, cors) n.cache.PutCORS(n.BearerOwner(ctx), p.BktInfo, cors)
@ -101,22 +103,15 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
} }
func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error { func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
obj, err := n.treeService.DeleteBucketCORS(ctx, bktInfo) objs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
objNotFound := errorsStd.Is(err, ErrNoNodeToRemove) objNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
if err != nil && !objNotFound { if err != nil && !objNotFound {
return err return err
} }
if !objNotFound { if !objNotFound {
var prmAuth PrmAuth for _, addr := range objs {
corsBkt := bktInfo n.deleteCORSObject(ctx, bktInfo, addr)
alexvanin marked this conversation as resolved Outdated

@mbiryukova is this change legal? We've used deleteCORSObject in PutBucketCORS so I've decided to reuse it in DeleteBucketCORS. Can I do that?

@mbiryukova is this change legal? We've used `deleteCORSObject` in `PutBucketCORS` so I've decided to reuse it in `DeleteBucketCORS`. Can I do that?

The only difference is that error was returned. With multiple deletions I guess it's ok to just log them

The only difference is that error was returned. With multiple deletions I guess it's ok to just log them
if !obj.Container().Equals(bktInfo.CID) && !obj.Container().Equals(cid.ID{}) {
corsBkt = &data.BucketInfo{CID: obj.Container()}
prmAuth.PrivateKey = &n.gateKey.PrivateKey
}
if err = n.objectDeleteWithAuth(ctx, corsBkt, obj.Object(), prmAuth); err != nil {
return fmt.Errorf("delete cors object: %w", err)
} }
} }

View file

@ -127,7 +127,7 @@ func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketI
return addr, nil return addr, nil
} }
func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()] systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok { if !ok {
systemMap = make(map[string]*data.BaseNodeVersion) systemMap = make(map[string]*data.BaseNodeVersion)
@ -139,10 +139,10 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI
t.system[bktInfo.CID.EncodeToString()] = systemMap t.system[bktInfo.CID.EncodeToString()] = systemMap
return oid.Address{}, ErrNoNodeToRemove return nil, ErrNoNodeToRemove
} }
func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) (oid.Address, error) { func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.Address, error) {
panic("implement me") panic("implement me")
} }

View file

@ -25,13 +25,13 @@ type TreeService interface {
// PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS. // PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS. // DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error)
GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error)
PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error

View file

@ -142,12 +142,15 @@ const (
CouldntCacheSubject = "couldn't cache subject info" CouldntCacheSubject = "couldn't cache subject info"
UserGroupsListIsEmpty = "user groups list is empty, subject not found" UserGroupsListIsEmpty = "user groups list is empty, subject not found"
CouldntCacheUserKey = "couldn't cache user key" CouldntCacheUserKey = "couldn't cache user key"
FoundSeveralBucketCorsNodes = "found several bucket cors nodes, latest be used" ObjectTaggingNodeHasMultipleIDs = "object tagging node has multiple ids"
FoundSeveralObjectTaggingNodes = "found several object tagging nodes, latest be used" BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids"
FoundSeveralBucketTaggingNodes = "found several bucket tagging nodes, latest be used" BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
FoundSeveralBucketSettingsNodes = "found several bucket settings nodes, latest be used" BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
SystemNodeHasMultipleIDs = "system node has multiple ids"
FailedToRemoveOldSystemNode = "failed to remove old system node"
FailedToParseAddressInTreeNode = "failed to parse object addr in tree node"
UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts" UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
FoundSeveralSystemNodes = "found several system nodes, latest be used" FoundSeveralSystemNodes = "found several system nodes"
FailedToParsePartInfo = "failed to parse part info" FailedToParsePartInfo = "failed to parse part info"
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info" CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
CloseCredsObjectPayload = "close creds object payload" CloseCredsObjectPayload = "close creds object payload"

View file

@ -54,6 +54,11 @@ type (
Meta map[string]string Meta map[string]string
} }
multiSystemNode struct {
// the first element is latest
nodes []*treeNode
}
GetNodesParams struct { GetNodesParams struct {
BktInfo *data.BucketInfo BktInfo *data.BucketInfo
TreeID string TreeID string
@ -270,6 +275,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
return version, nil return version, nil
} }
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
var (
err error
index int
maxTimestamp uint64
)
if len(nodes) == 0 {
return nil, errors.New("multi node must have at least one node")
}
treeNodes := make([]*treeNode, len(nodes))
for i, node := range nodes {
if treeNodes[i], err = newTreeNode(node); err != nil {
return nil, fmt.Errorf("parse system node response: %w", err)
}
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
index = i
maxTimestamp = timestamp
}
}
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
return &multiSystemNode{
nodes: treeNodes,
}, nil
}
func (m *multiSystemNode) Latest() *treeNode {
return m.nodes[0]
}
func (m *multiSystemNode) Old() []*treeNode {
return m.nodes[1:]
}
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) { func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
uploadID, _ := treeNode.Get(uploadIDKV) uploadID, _ := treeNode.Get(uploadIDKV)
if uploadID == "" { if uploadID == "" {
@ -396,11 +440,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
} }
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err) return nil, fmt.Errorf("couldn't get node: %w", err)
} }
node := multiNode.Latest()
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned} settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
if versioningValue, ok := node.Get(versioningKV); ok { if versioningValue, ok := node.Get(versioningKV); ok {
settings.Versioning = versioningValue settings.Versioning = versioningValue
@ -424,7 +470,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
} }
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err) return fmt.Errorf("couldn't get node: %w", err)
@ -437,28 +483,35 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
return err return err
} }
ind := node.GetLatestNodeIndex() latest := multiNode.Latest()
if node.IsSplit() { ind := latest.GetLatestNodeIndex()
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes) if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
} }
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return fmt.Errorf("move settings node: %w", err)
}
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
return nil
} }
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
if err != nil { if err != nil {
return oid.Address{}, err return oid.Address{}, err
} }
return getCORSAddress(node) return getTreeNodeAddress(node.Latest())
} }
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return oid.Address{}, fmt.Errorf("couldn't get node: %w", err) return nil, fmt.Errorf("couldn't get node: %w", err)
} }
meta := make(map[string]string) meta := make(map[string]string)
@ -468,48 +521,52 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr
if isErrNotFound { if isErrNotFound {
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
return oid.Address{}, err return nil, err
} }
return oid.Address{}, layer.ErrNoNodeToRemove return nil, layer.ErrNoNodeToRemove
} }
ind := node.GetLatestNodeIndex() latest := multiNode.Latest()
if node.IsSplit() { ind := latest.GetLatestNodeIndex()
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
} }
prevAddr, err := getCORSAddress(node) if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return nil, fmt.Errorf("move cors node: %w", err)
}
objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
objToDelete[0], err = getTreeNodeAddress(latest)
if err != nil { if err != nil {
return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
} }
return prevAddr, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
return objToDelete, nil
} }
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
return oid.Address{}, err if err != nil && !isErrNotFound {
return nil, err
} }
if node != nil { if isErrNotFound {
ind := node.GetLatestNodeIndex() return nil, layer.ErrNoNodeToRemove
if node.IsSplit() {
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
} }
addr, err := getCORSAddress(node) objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
if err != nil { if len(objToDelete) != len(multiNode.nodes) {
return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) return nil, fmt.Errorf("clean old cors nodes: %w", err)
} }
return addr, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]) return objToDelete, nil
}
return oid.Address{}, layer.ErrNoNodeToRemove
} }
func getCORSAddress(node *treeNode) (oid.Address, error) { func getTreeNodeAddress(node *treeNode) (oid.Address, error) {

Let's rename this function to getTreeNodeAddress for example

Let's rename this function to `getTreeNodeAddress` for example
var addr oid.Address var addr oid.Address
addr.SetObject(node.ObjID) addr.SetObject(node.ObjID)
@ -524,6 +581,29 @@ func getCORSAddress(node *treeNode) (oid.Address, error) {
return addr, nil return addr, nil
} }
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
res := make([]oid.Address, 0, len(nodes))
for _, node := range nodes {
ind := node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
}
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
} else {
addr, err := getTreeNodeAddress(node)
if err != nil {
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
continue
}
res = append(res, addr)
}
}
return res
}
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) { func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil { if err != nil {
@ -569,7 +649,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
ind := tagNode.GetLatestNodeIndex() ind := tagNode.GetLatestNodeIndex()
if tagNode.IsSplit() { if tagNode.IsSplit() {
c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes) c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
} }
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet) return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
@ -580,14 +660,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo
} }
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tags := make(map[string]string) tags := make(map[string]string)
for key, val := range node.Meta { for key, val := range multiNode.Latest().Meta {
if strings.HasPrefix(key, userDefinedTagPrefix) { if strings.HasPrefix(key, userDefinedTagPrefix) {
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
} }
@ -597,7 +677,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
} }
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error { func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err) return fmt.Errorf("couldn't get node: %w", err)
@ -615,12 +695,19 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
return err return err
} }
ind := node.GetLatestNodeIndex() latest := multiNode.Latest()
if node.IsSplit() { ind := latest.GetLatestNodeIndex()
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes) if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
} }
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet) if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
return fmt.Errorf("move bucket tagging node: %w", err)
}
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
return nil
} }
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error { func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
@ -643,6 +730,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI
return nil, err return nil, err
} }
// consider using map[string][]*treeNode
// to be able to remove unused node, that can be added during split
treeNodes := make(map[string]*treeNode, len(keys)) treeNodes := make(map[string]*treeNode, len(keys))
for _, s := range subtree { for _, s := range subtree {
@ -717,26 +806,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
return nodes[targetIndexNode], nil return nodes[targetIndexNode], nil
} }
func getLatestNode(nodes []NodeResponse) NodeResponse {
if len(nodes) == 0 {
return nil
}
var (
index int
maxTimestamp uint64
)
for i, node := range nodes {
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
index = i
maxTimestamp = timestamp
}
}
return nodes[index]
}
func getMaxTimestamp(node NodeResponse) uint64 { func getMaxTimestamp(node NodeResponse) uint64 {
var maxTimestamp uint64 var maxTimestamp uint64
@ -1586,11 +1655,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
return info.Meta return info.Meta
} }
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) { func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
p := &GetNodesParams{ p := &GetNodesParams{
BktInfo: bktInfo, BktInfo: bktInfo,
TreeID: systemTree, TreeID: systemTree,
Path: path, Path: []string{name},
LatestOnly: false, LatestOnly: false,
AllAttrs: true, AllAttrs: true,
} }
@ -1605,10 +1674,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
if len(nodes) != 1 { if len(nodes) != 1 {
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/"))) c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
} }
return newTreeNode(getLatestNode(nodes)) return newMultiNode(nodes)
} }
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse { func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {