[#437] tree: Add cleanOldNodes method

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-23 16:47:05 +03:00
parent 689f7ee818
commit 2bae704d3e
2 changed files with 26 additions and 46 deletions

View file

@ -146,10 +146,8 @@ const (
BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids" BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids"
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids" BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids" BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
FailedToRemoveOldBucketSettingsNode = "failed to remove old bucket settings node" SystemNodeHasMultipleIDs = "system node has multiple ids"
FailedToRemoveOldBucketTaggingNode = "failed to remove old bucket tagging node" FailedToRemoveOldSystemNode = "failed to remove old system node"
FailedToRemoveOldBucketCORSNode = "failed to remove old bucket cors node"
FailedToRemoveOldPartNode = "failed to remove old part node"
UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts" UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
FoundSeveralSystemNodes = "found several system nodes" FoundSeveralSystemNodes = "found several system nodes"
FailedToParsePartInfo = "failed to parse part info" FailedToParsePartInfo = "failed to parse part info"

View file

@ -491,15 +491,7 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
return fmt.Errorf("move settings node: %w", err) return fmt.Errorf("move settings node: %w", err)
} }
for _, node := range multiNode.Old() { c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
ind = node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldBucketSettingsNode, zap.Uint64("id", node.ID[ind]))
}
}
return nil return nil
} }
@ -544,17 +536,7 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objI
objToDelete := make([]oid.ID, 1, len(multiNode.nodes)) objToDelete := make([]oid.ID, 1, len(multiNode.nodes))
objToDelete[0] = latest.ObjID objToDelete[0] = latest.ObjID
for _, node := range multiNode.Old() { objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
ind = node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldBucketCORSNode, zap.Uint64("id", node.ID[ind]))
} else {
objToDelete = append(objToDelete, node.ObjID)
}
}
return objToDelete, nil return objToDelete, nil
} }
@ -570,24 +552,32 @@ func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (
return nil, layer.ErrNoNodeToRemove return nil, layer.ErrNoNodeToRemove
} }
objToDelete := make([]oid.ID, len(multiNode.nodes)) objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
if len(objToDelete) != len(multiNode.nodes) {
for i, node := range multiNode.nodes { return nil, fmt.Errorf("clean old cors nodes: %w", err)
ind := node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
return nil, fmt.Errorf("delete cors node '%d': %w", node.ID[ind], err)
}
objToDelete[i] = node.ObjID
} }
return objToDelete, nil return objToDelete, nil
} }
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.ID {
res := make([]oid.ID, 0, len(nodes))
for _, node := range nodes {
ind := node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
}
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
} else {
res = append(res, node.ObjID)
}
}
return res
}
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) { func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil { if err != nil {
@ -689,15 +679,7 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
return fmt.Errorf("move bucket tagging node: %w", err) return fmt.Errorf("move bucket tagging node: %w", err)
} }
for _, node := range multiNode.Old() { c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
ind = node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldBucketTaggingNode, zap.Uint64("id", node.ID[ind]))
}
}
return nil return nil
} }