[#606] Support log tagging

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2025-02-06 13:18:16 +03:00
parent ffac62e8b4
commit e7f620f137
40 changed files with 850 additions and 497 deletions

View file

@ -257,7 +257,7 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
if createdStr, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
created := time.UnixMilli(utcMilli)
version.Created = &created
@ -267,7 +267,7 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
if ownerStr, ok := treeNode.Get(ownerKV); ok {
var owner user.ID
if err := owner.DecodeString(ownerStr); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
version.Owner = &owner
}
@ -275,7 +275,7 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
version.CreationEpoch = epoch
}
@ -342,13 +342,13 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
if ownerID, ok := treeNode.Get(ownerKV); ok {
if err := multipartInfo.Owner.DecodeString(ownerID); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
}
}
if created, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
@ -356,7 +356,7 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
if finished, ok := treeNode.Get(finishedKV); ok {
if flag, err := strconv.ParseBool(finished); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
multipartInfo.Finished = flag
}
@ -364,7 +364,7 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
multipartInfo.CreationEpoch = epoch
}
@ -395,23 +395,23 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
multipartInfo.Key = string(kv.GetValue())
case createdKV:
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
case ownerKV:
if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
}
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
multipartInfo.Finished = isFinished
}
case creationEpochKV:
if epoch, err := strconv.ParseUint(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, string(kv.GetValue())), zap.Error(err))
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, string(kv.GetValue())), zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
} else {
multipartInfo.CreationEpoch = epoch
}
@ -515,7 +515,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
if ownerKeyHex, ok := node.Get(ownerKeyKV); ok {
if settings.OwnerKey, err = keys.NewPublicKeyFromString(ownerKeyHex); err != nil {
c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err))
c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err), logs.TagField(logs.TagDatapath))
}
}
@ -539,7 +539,7 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID), logs.TagField(logs.TagExternalStorageTree))
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
@ -582,7 +582,7 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs, logs.TagField(logs.TagExternalStorageTree))
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
@ -640,14 +640,14 @@ func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *da
for _, node := range nodes {
ind := node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID), logs.TagField(logs.TagExternalStorageTree))
}
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]), logs.TagField(logs.TagExternalStorageTree))
} else {
addr, err := getTreeNodeAddress(node)
if err != nil {
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]), logs.TagField(logs.TagDatapath))
continue
}
res = append(res, addr)
@ -702,7 +702,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
ind := tagNode.GetLatestNodeIndex()
if tagNode.IsSplit() {
c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs, logs.TagField(logs.TagExternalStorageTree))
}
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
@ -751,7 +751,7 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID), logs.TagField(logs.TagExternalStorageTree))
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
@ -1049,7 +1049,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
trNode, fileName, err := parseTreeNode(node)
if err != nil {
if !errors.Is(err, errNodeDoesntContainFileName) {
s.log.Debug(logs.ParseTreeNode, zap.Error(err))
s.log.Debug(logs.ParseTreeNode, zap.Error(err), logs.TagField(logs.TagDatapath))
}
return nil, true, nil
}
@ -1287,7 +1287,9 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
if len(nodes) > 1 {
c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode,
zap.String("treeID", treeID), zap.String("filepath", filepath))
zap.String("treeID", treeID),
zap.String("filepath", filepath),
logs.TagField(logs.TagExternalStorageTree))
}
sort.Slice(nodes, func(i, j int) bool {
@ -1460,7 +1462,8 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("id", part.GetNodeID()),
zap.Error(err))
zap.Error(err),
logs.TagField(logs.TagDatapath))
continue
}
if partInfo.Number == info.Number {
@ -1488,7 +1491,8 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("id", nodeID))
zap.Uint64("id", nodeID),
logs.TagField(logs.TagExternalStorageTree))
}
}
@ -1514,7 +1518,8 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
// multipart parts nodeID shouldn't have multiple values
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()))
zap.Uint64s("node ids", part.GetNodeID()),
logs.TagField(logs.TagDatapath))
continue
}
if part.GetNodeID()[0] == multipartNodeID {
@ -1525,7 +1530,8 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()),
zap.Error(err))
zap.Error(err),
logs.TagField(logs.TagDatapath))
continue
}
result = append(result, partInfo)
@ -1556,7 +1562,7 @@ func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *dat
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketLifecycleNodeHasMultipleIDs)
c.reqLogger(ctx).Error(logs.BucketLifecycleNodeHasMultipleIDs, logs.TagField(logs.TagDatapath))
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
@ -1831,7 +1837,7 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name
return nil, tree.ErrNodeNotFound
}
if len(nodes) != 1 {
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name), logs.TagField(logs.TagDatapath))
}
return newMultiNode(nodes)