[#498] tree: Add spans to detail the trace

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
This commit is contained in:
Roman Loginov 2024-11-25 16:05:58 +03:00
parent 8a1f4f8eab
commit 993e2bb114

View file

@ -11,6 +11,7 @@ import (
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
@ -493,6 +494,9 @@ func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
}
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetSettingsNode")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err)
@ -523,6 +527,9 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
}
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.PutSettingsNode")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
if err != nil && !isErrNotFound {
@ -552,6 +559,9 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
}
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetBucketCORS")
defer span.End()
node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
if err != nil {
return oid.Address{}, err
@ -561,6 +571,9 @@ func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid
}
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.PutBucketCORS")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
if err != nil && !isErrNotFound {
@ -601,6 +614,9 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr
}
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.DeleteBucketCORS")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
if err != nil && !isErrNotFound {
@ -658,6 +674,9 @@ func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *da
}
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetObjectTagging")
defer span.End()
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil {
return nil, err
@ -683,6 +702,9 @@ func getObjectTagging(tagNode *treeNode) map[string]string {
}
func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.PutObjectTagging")
defer span.End()
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil {
return err
@ -709,10 +731,16 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
}
func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.DeleteObjectTagging")
defer span.End()
return c.PutObjectTagging(ctx, bktInfo, objVersion, nil)
}
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetBucketTagging")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
if err != nil {
return nil, err
@ -730,6 +758,9 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
}
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.PutBucketTagging")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
if err != nil && !isErrNotFound {
@ -764,6 +795,9 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
}
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.DeleteBucketTagging")
defer span.End()
return c.PutBucketTagging(ctx, bktInfo, nil)
}
@ -807,10 +841,16 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI
}
func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetVersions")
defer span.End()
return c.getVersions(ctx, bktInfo, versionTree, filepath, false)
}
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetLatestVersion")
defer span.End()
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
path := pathFromName(objectName)
@ -1080,6 +1120,9 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
}
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.InitVersionsByPrefixStream")
defer span.End()
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
if err != nil {
if errors.Is(err, io.EOF) {
@ -1272,6 +1315,9 @@ func formLatestNodeKey(parentID uint64, fileName string) string {
}
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
defer span.End()
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}
@ -1298,14 +1344,23 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
}
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
defer span.End()
return c.addVersion(ctx, bktInfo, versionTree, version)
}
func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.RemoveVersion")
defer span.End()
return c.service.RemoveNode(ctx, bktInfo, versionTree, id)
}
func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.CreateMultipartUpload")
defer span.End()
path := pathFromName(info.Key)
meta := metaFromMultipart(info, path[len(path)-1])
_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta)
@ -1314,6 +1369,9 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
}
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetMultipartUploadsByPrefix")
defer span.End()
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
if err != nil {
return nil, err
@ -1394,6 +1452,9 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
}
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetMultipartUpload")
defer span.End()
path := pathFromName(objectName)
p := &GetNodesParams{
BktInfo: bktInfo,
@ -1425,6 +1486,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
}
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddPart")
defer span.End()
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return nil, err
@ -1503,6 +1567,9 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
}
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetParts")
defer span.End()
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return nil, err
@ -1535,6 +1602,9 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
}
func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.PutBucketLifecycleConfiguration")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
if err != nil && !isErrNotFound {
@ -1575,6 +1645,9 @@ func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *dat
}
func (c *Tree) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetBucketLifecycleConfiguration")
defer span.End()
node, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
if err != nil {
return oid.Address{}, fmt.Errorf("get lifecycle node: %w", err)
@ -1584,6 +1657,9 @@ func (c *Tree) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *dat
}
func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.DeleteBucketLifecycleConfiguration")
defer span.End()
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
if err != nil && !isErrNotFound {
@ -1603,6 +1679,9 @@ func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *
}
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.DeleteMultipartUpload")
defer span.End()
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
if err != nil {
return err
@ -1614,6 +1693,9 @@ func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
}
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.PutLock")
defer span.End()
meta := map[string]string{isLockKV: "true"}
if lock.IsLegalHoldSet() {
@ -1636,6 +1718,9 @@ func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uin
}
func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetLock")
defer span.End()
lockNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
if err != nil {
return nil, err
@ -1675,6 +1760,9 @@ func getLock(lockNode *treeNode) (*data.LockInfo, error) {
}
func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetObjectTaggingAndLock")
defer span.End()
nodes, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
if err != nil {
return nil, nil, err