[#328] Log invalid tree service KVs

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
pull/330/head
Denis Kirillov 2024-03-04 15:35:23 +03:00
parent 4ee3648183
commit 6788306998
2 changed files with 48 additions and 26 deletions

View File

@ -148,4 +148,5 @@ const (
PolicyRequest = "policy request"
FailedToGenerateRequestID = "failed to generate request id"
InvalidBucketObjectLockEnabledHeader = "invalid X-Amz-Bucket-Object-Lock-Enabled header"
InvalidTreeKV = "invalid tree service meta KV"
)

View File

@ -184,16 +184,16 @@ func (n *treeNode) FileName() (string, bool) {
return value, ok
}
func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, error) {
func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) {
treeNode, err := newTreeNode(node)
if err != nil {
return nil, fmt.Errorf("invalid tree node: %w", err)
}
return newNodeVersionFromTreeNode(filePath, treeNode), nil
return newNodeVersionFromTreeNode(log, filePath, treeNode), nil
}
func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeVersion {
func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) *data.NodeVersion {
_, isUnversioned := treeNode.Get(isUnversionedKV)
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
_, isCombined := treeNode.Get(isCombinedKV)
@ -217,7 +217,9 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
}
if createdStr, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err))
} else {
created := time.UnixMilli(utcMilli)
version.Created = &created
}
@ -225,7 +227,9 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
if ownerStr, ok := treeNode.Get(ownerKV); ok {
var owner user.ID
if err := owner.DecodeString(ownerStr); err == nil {
if err := owner.DecodeString(ownerStr); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err))
} else {
version.Owner = &owner
}
}
@ -233,10 +237,10 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
return version
}
func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
uploadID, _ := treeNode.Get(uploadIDKV)
if uploadID == "" {
return nil, fmt.Errorf("it's not a multipart node")
return nil, fmt.Errorf("it's not a multipart node: missing UploadId")
}
multipartInfo := &data.MultipartInfo{
@ -246,23 +250,32 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
Meta: treeNode.Meta,
}
ownerID, _ := treeNode.Get(ownerKV)
_ = multipartInfo.Owner.DecodeString(ownerID)
created, _ := treeNode.Get(createdKV)
if utcMilli, err := strconv.ParseInt(created, 10, 64); err == nil {
multipartInfo.Created = time.UnixMilli(utcMilli)
if ownerID, ok := treeNode.Get(ownerKV); ok {
if err := multipartInfo.Owner.DecodeString(ownerID); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err))
}
}
finished, _ := treeNode.Get(finishedKV)
if flag, err := strconv.ParseBool(finished); err == nil {
multipartInfo.Finished = flag
if created, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err))
} else {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
}
if finished, ok := treeNode.Get(finishedKV); ok {
if flag, err := strconv.ParseBool(finished); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err))
} else {
multipartInfo.Finished = flag
}
}
return multipartInfo, nil
}
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
multipartInfo := &data.MultipartInfo{
ID: node.GetNodeID(),
Meta: make(map[string]string, len(node.GetMeta())),
@ -275,13 +288,19 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
case FileNameKey:
multipartInfo.Key = string(kv.GetValue())
case createdKV:
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err == nil {
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err))
} else {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
case ownerKV:
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err))
}
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err))
} else {
multipartInfo.Finished = isFinished
}
default:
@ -612,7 +631,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, o
return nil, err
}
return newNodeVersion(objectName, latestNode)
return newNodeVersion(c.reqLogger(ctx), objectName, latestNode)
}
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
@ -818,7 +837,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
return nil, true, nil
}
return newNodeVersionFromTreeNode(filepath, trNode), false, nil
return newNodeVersionFromTreeNode(s.log, filepath, trNode), false, nil
}
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
@ -840,7 +859,7 @@ func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.Buc
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
tailPrefix: tailPrefix,
latestOnly: latestOnly,
log: c.log,
log: c.reqLogger(ctx),
}, nil
}
@ -1102,7 +1121,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
namesMap[treeNode.ID] = filepath
}
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, treeNode)
if err != nil || multipartInfo.Finished {
continue
}
@ -1140,8 +1159,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
return nil, err
}
log := c.reqLogger(ctx)
for _, node := range nodes {
info, err := newMultipartInfo(node)
info, err := newMultipartInfo(log, node)
if err != nil {
continue
}
@ -1373,9 +1393,10 @@ func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID
return nil, err
}
log := c.reqLogger(ctx)
result := make([]*data.NodeVersion, 0, len(nodes))
for _, node := range nodes {
nodeVersion, err := newNodeVersion(filepath, node)
nodeVersion, err := newNodeVersion(log, filepath, node)
if err != nil {
return nil, err
}