Set IO tags #1608

Merged
fyrchik merged 13 commits from dstepanov-yadro/frostfs-node:feat/tagging into master 2025-02-07 14:05:59 +00:00
31 changed files with 625 additions and 30 deletions

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -34,11 +33,9 @@ func _client() (tree.TreeServiceClient, error) {
opts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),

No need for metric in frostfs-cli

No need for metric in frostfs-cli
tracing.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),

View file

@ -493,6 +493,7 @@ type cfg struct {
cfgNetmap cfgNetmap
cfgControlService cfgControlService
cfgObject cfgObject
cfgQoSService cfgQoSService
}
// ReadCurrentNetMap reads network map which has been cached at the

View file

@ -0,0 +1,46 @@
package qos
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
const (
subsection = "qos"
criticalSubSection = "critical"
internalSubSection = "internal"
)
// CriticalAuthorizedKeys parses and returns an array of "critical.authorized_keys" config
// parameter from "qos" section.
//
// Returns an empty list if not set.
//
// Panics if any configured key is not a valid hex-encoded public key
// (see authorizedKeys).
func CriticalAuthorizedKeys(c *config.Config) keys.PublicKeys {
	return authorizedKeys(c, criticalSubSection)
}
// InternalAuthorizedKeys parses and returns an array of "internal.authorized_keys" config
// parameter from "qos" section.
//
// Returns an empty list if not set.
//
// Panics if any configured key is not a valid hex-encoded public key
// (see authorizedKeys).
func InternalAuthorizedKeys(c *config.Config) keys.PublicKeys {
	return authorizedKeys(c, internalSubSection)
}
// authorizedKeys reads the "authorized_keys" string slice from the given
// subsection of the "qos" section and parses every entry as a public key.
// It panics on the first entry that does not parse, since an invalid
// configured key is a startup-time configuration error.
func authorizedKeys(c *config.Config, sub string) keys.PublicKeys {
	rawKeys := config.StringSliceSafe(c.Sub(subsection).Sub(sub), "authorized_keys")

	parsed := make(keys.PublicKeys, 0, len(rawKeys))
	for _, raw := range rawKeys {
		pub, err := keys.NewPublicKeyFromString(raw)
		if err != nil {
			panic(fmt.Errorf("invalid authorized key %s for qos.%s: %w", raw, sub, err))
		}
		parsed = append(parsed, pub)
	}
	return parsed
}

View file

@ -0,0 +1,40 @@
package qos
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
// TestQoSSection verifies parsing of the "qos" config section: an empty
// config yields empty key lists, and file/ENV configs yield the expected
// critical/internal authorized keys.
func TestQoSSection(t *testing.T) {
	t.Run("defaults", func(t *testing.T) {
		empty := configtest.EmptyConfig()
		require.Empty(t, CriticalAuthorizedKeys(empty))
		require.Empty(t, InternalAuthorizedKeys(empty))
	})

	const path = "../../../../config/example/node"

	// mustKey fails the test instead of silently yielding a nil key
	// if a fixture key string is malformed (the original code discarded
	// the parse error).
	mustKey := func(s string) *keys.PublicKey {
		pub, err := keys.NewPublicKeyFromString(s)
		require.NoError(t, err)
		return pub
	}

	criticalPubs := keys.PublicKeys{
		mustKey("035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11"),
		mustKey("028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"),
	}
	internalPubs := keys.PublicKeys{
		mustKey("02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2"),
		mustKey("031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"),
	}

	fileConfigTest := func(c *config.Config) {
		require.Equal(t, criticalPubs, CriticalAuthorizedKeys(c))
		require.Equal(t, internalPubs, InternalAuthorizedKeys(c))
	}

	configtest.ForEachFileType(path, fileConfigTest)

	t.Run("ENV", func(t *testing.T) {
		configtest.ForEnvFileType(t, path, fileConfigTest)
	})
}

View file

@ -7,9 +7,12 @@ import (
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"go.uber.org/zap"
"google.golang.org/grpc"
)
@ -50,7 +53,14 @@ func initControlService(ctx context.Context, c *cfg) {
return
}
c.cfgControlService.server = grpc.NewServer()
c.cfgControlService.server = grpc.NewServer(
grpc.ChainUnaryInterceptor(
qos.NewSetCriticalIOTagUnaryServerInterceptor(),

First interceptor is the outermost, so it will be incoming RPC -> set IO tag critical -> metrics -> tracing

First interceptor is the outermost, so it will be incoming RPC -> set IO tag critical -> metrics -> tracing
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
),
// control service has no stream methods, so no stream interceptors added

When it will have stream methods, we won't remember this line.
Is there any problem with adding them?

When it will have stream methods, we won't remember this line. Is there any problem with adding them?

Just I don't want to add something I can't test.

Just I don't want to add something I can't test.
)
c.onShutdown(func() {
stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log)

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -130,10 +131,12 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ChainUnaryInterceptor(
qos.NewUnaryServerInterceptor(),

This is the outermost interceptor, so it will be incoming RPC -> parse and set IO tag -> metrics -> tracing

This is the outermost interceptor, so it will be incoming RPC -> parse and set IO tag -> metrics -> tracing
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
qos.NewStreamServerInterceptor(),
metrics.NewStreamServerInterceptor(),
tracing.NewStreamServerInterceptor(),
),

View file

@ -101,6 +101,7 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(ctx, c, "gRPC", func(c *cfg) { initGRPC(ctx, c) })
initAndLog(ctx, c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
initAndLog(ctx, c, "qos", func(c *cfg) { initQoSService(c) })

Could you explain, why do we have both interceptor and service?

Could you explain, why do we have both interceptor and service?

See comment about tree service: #1608 (comment)

See comment about tree service: https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1608#issuecomment-66636
initAccessPolicyEngine(ctx, c)
initAndLog(ctx, c, "access policy engine", func(c *cfg) {

View file

@ -168,7 +168,7 @@ func initObjectService(c *cfg) {
sPatch := createPatchSvc(sGet, sPut)
// build service pipeline
// grpc | audit | <metrics> | signature | response | acl | ape | split
// grpc | audit | qos | <metrics> | signature | response | acl | ape | split
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
@ -191,7 +191,8 @@ func initObjectService(c *cfg) {
c.shared.metricsSvc = objectService.NewMetricCollector(
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit)
qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService)
auditSvc := objectService.NewAuditService(qosService, c.log, c.audit)
server := objectTransportGRPC.New(auditSvc)
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {

95
cmd/frostfs-node/qos.go Normal file
View file

@ -0,0 +1,95 @@
package main
import (
"bytes"
"context"
qosconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
qosTagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"go.uber.org/zap"
)
// cfgQoSService holds the state used to validate and adjust incoming IO
// tags: a netmap source (to recognize requests signed by storage nodes)
// and the explicitly authorized public keys per privileged tag.
type cfgQoSService struct {
	netmapSource netmap.Source
	logger       *logger.Logger
	// allowedCriticalPubs lists raw public keys permitted to use the `critical` IO tag.
	allowedCriticalPubs [][]byte
	// allowedInternalPubs lists raw public keys permitted to use the `internal` IO tag.
	allowedInternalPubs [][]byte
}
// initQoSService fills c.cfgQoSService from the application config: the
// configured critical/internal authorized keys are converted to their raw
// byte form, and the netmap source and logger are wired in for later
// incoming-tag adjustment.
func initQoSService(c *cfg) {
	criticalKeys := qosconfig.CriticalAuthorizedKeys(c.appCfg)
	internalKeys := qosconfig.InternalAuthorizedKeys(c.appCfg)

	rawCritical := make([][]byte, 0, len(criticalKeys))
	for _, pub := range criticalKeys {
		rawCritical = append(rawCritical, pub.Bytes())
	}

	rawInternal := make([][]byte, 0, len(internalKeys))
	for _, pub := range internalKeys {
		rawInternal = append(rawInternal, pub.Bytes())
	}

	c.cfgQoSService = cfgQoSService{
		netmapSource:        c.netMapSource,
		logger:              c.log,
		allowedCriticalPubs: rawCritical,
		allowedInternalPubs: rawInternal,
	}
}
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
achuprov marked this conversation as resolved Outdated

Typo AdjustIncommingTag -> AdjustIncomingTag

Typo `AdjustIncommingTag` -> `AdjustIncomingTag`

fixed

fixed

This typo still appears in many places, including commit messages. Please make sure to fix it everywhere

This typo still appears in many places, including commit messages. Please make sure to fix it everywhere

fixed

fixed
rawTag, defined := qosTagging.IOTagFromContext(ctx)
if !defined {
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
ioTag, err := qos.FromRawString(rawTag)
if err != nil {
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
switch ioTag {
case qos.IOTagClient:
return ctx
case qos.IOTagCritical:
for _, pk := range s.allowedCriticalPubs {
if bytes.Equal(pk, requestSignPublicKey) {
return ctx
}
}
nm, err := s.netmapSource.GetNetMap(ctx, 0)
if err != nil {
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
fyrchik marked this conversation as resolved Outdated

I won't be so sure about Error -- it could be done on every request.
How about debug?
Not insisting.

I won't be so sure about `Error` -- it could be done on every request. How about `debug`? Not insisting.

Ok, fixed

Ok, fixed
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
for _, node := range nm.Nodes() {
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
return ctx
}
}
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
fyrchik marked this conversation as resolved Outdated

Does qos.IOTagClient.String() allocate?
It seems constant, so better be it.

Does `qos.IOTagClient.String()` allocate? It seems constant, so better be it.
package qos

import (
	"testing"

	"github.com/stretchr/testify/require"
)

var ioTagBackgroundString = "background"

func (t IOTag) String2() string {
	switch t {
	case IOTagBackground:
		return ioTagBackgroundString
	default:
		return ""
	}
}

func BenchmarkTag(b *testing.B) {
	tag := IOTagBackground
	b.Run("cast", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()
		require.Equal(b, ioTagBackgroundString, string(tag))
	})
	b.Run("String()", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()
		require.Equal(b, ioTagBackgroundString, tag.String())
	})
	b.Run("String2()", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()
		require.Equal(b, ioTagBackgroundString, tag.String2())
	})
}


BenchmarkTag/cast-8         	1000000000	         0.0000052 ns/op	       0 B/op	       0 allocs/op
BenchmarkTag/String()-8     	1000000000	         0.0000024 ns/op	       0 B/op	       0 allocs/op
BenchmarkTag/String2()-8    	1000000000	         0.0000028 ns/op	       0 B/op	       0 allocs/op
PASS
``` package qos import ( "testing" "github.com/stretchr/testify/require" ) var ioTagBackgroundString = "background" func (t IOTag) String2() string { switch t { case IOTagBackground: return ioTagBackgroundString default: return "" } } func BenchmarkTag(b *testing.B) { tag := IOTagBackground b.Run("cast", func(b *testing.B) { b.ReportAllocs() b.ResetTimer() require.Equal(b, ioTagBackgroundString, string(tag)) }) b.Run("String()", func(b *testing.B) { b.ReportAllocs() b.ResetTimer() require.Equal(b, ioTagBackgroundString, tag.String()) }) b.Run("String2()", func(b *testing.B) { b.ReportAllocs() b.ResetTimer() require.Equal(b, ioTagBackgroundString, tag.String2()) }) } BenchmarkTag/cast-8 1000000000 0.0000052 ns/op 0 B/op 0 allocs/op BenchmarkTag/String()-8 1000000000 0.0000024 ns/op 0 B/op 0 allocs/op BenchmarkTag/String2()-8 1000000000 0.0000028 ns/op 0 B/op 0 allocs/op PASS ```
case qos.IOTagInternal:
for _, pk := range s.allowedInternalPubs {
if bytes.Equal(pk, requestSignPublicKey) {
return ctx
}
}
nm, err := s.netmapSource.GetNetMap(ctx, 0)
if err != nil {
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
for _, node := range nm.Nodes() {
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
return ctx
}
}
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
default:
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
}

View file

@ -72,7 +72,7 @@ func initTreeService(c *cfg) {
)
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
tree.RegisterTreeServiceServer(s, c.treeService)
tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService))
})
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {

View file

@ -225,3 +225,6 @@ FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
FROSTFS_MULTINET_BALANCER=roundrobin
FROSTFS_MULTINET_RESTRICT=false
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
FROSTFS_QOS_CRITICAL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
fyrchik marked this conversation as resolved Outdated

Could you provide a different pair of keys for different section?
Testing same keys easily misses copy-paste errors.

Could you provide a different pair of keys for different section? Testing same keys easily misses copy-paste errors.

done

done
FROSTFS_QOS_INTERNAL_AUTHORIZED_KEYS="02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"

View file

@ -305,5 +305,19 @@
"balancer": "roundrobin",
"restrict": false,
"fallback_delay": "350ms"
},
"qos": {
"critical": {
"authorized_keys": [
"035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11",
"028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
]
},
"internal": {
"authorized_keys": [
"02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2",
"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
]
}
}
}

View file

@ -270,3 +270,13 @@ multinet:
balancer: roundrobin
restrict: false
fallback_delay: 350ms
qos:
critical:
authorized_keys: # list of hex-encoded public keys that have rights to use `critical` IO tag
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
internal:
authorized_keys: # list of hex-encoded public keys that have rights to use `internal` IO tag
- 02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2
- 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a

View file

@ -27,6 +27,7 @@ There are some custom types used for brevity:
| `runtime` | [Runtime configuration](#runtime-section) |
| `audit` | [Audit configuration](#audit-section) |
| `multinet` | [Multinet configuration](#multinet-section) |
| `qos` | [QoS configuration](#qos-section) |
# `control` section
```yaml
@ -471,3 +472,20 @@ multinet:
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
# `qos` section
```yaml
qos:
critical:
authorized_keys:
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
internal:
authorized_keys:
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
```
| Parameter | Type | Default value | Description |
| -------------------------- | -------------- | ------------- | --------------------------------------------------------------------------- |
| `critical.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `critical` are allowed. |
| `internal.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `internal` are allowed. |

1
go.mod
View file

@ -8,6 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

2
go.sum
View file

@ -8,6 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=

View file

@ -510,4 +510,7 @@ const (
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
WritecacheCantGetObject = "can't get an object from fstree"
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"

FailerToParseIncommingIOTag -> FailedToParseIncommingIOTag

FailerToParseIncommingIOTag -> FailedToParseIncommingIOTag

fixed

fixed
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
)

51
internal/qos/grpc.go Normal file
View file

@ -0,0 +1,51 @@
package qos
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"google.golang.org/grpc"
)
// NewSetCriticalIOTagUnaryServerInterceptor returns a unary server
// interceptor that unconditionally tags the request context with the
// `critical` IO tag before invoking the handler. It is used by servers
// (e.g. the control service) whose every RPC must run with critical
// priority.
func NewSetCriticalIOTagUnaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
		ctx = tagging.ContextWithIOTag(ctx, IOTagCritical.String())
		return handler(ctx, req)
	}
}
// NewAdjustOutgoingIOTagUnaryClientInterceptor returns a unary client
// interceptor that rewrites the IO tag of an outgoing call: node-local
// tags (background, policer, writecache) are downgraded to `internal`,
// an unparsable tag is replaced with `client`, and a context carrying no
// tag is passed through unchanged.
func NewAdjustOutgoingIOTagUnaryClientInterceptor() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		rawTag, ok := tagging.IOTagFromContext(ctx)
		if !ok {
			return invoker(ctx, method, req, reply, cc, opts...)
		}

		tag, err := FromRawString(rawTag)
		switch {
		case err != nil:
			tag = IOTagClient
		case tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache:
			tag = IOTagInternal
		}

		return invoker(tagging.ContextWithIOTag(ctx, tag.String()), method, req, reply, cc, opts...)
	}
}
// NewAdjustOutgoingIOTagStreamClientInterceptor returns a stream client
// interceptor that rewrites the IO tag of an outgoing stream: node-local
// tags (background, policer, writecache) are downgraded to `internal`,
// an unparsable tag is replaced with `client`, and a context carrying no
// tag is passed through unchanged.
func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		rawTag, ok := tagging.IOTagFromContext(ctx)
		if !ok {
			return streamer(ctx, desc, cc, method, opts...)
		}

		tag, err := FromRawString(rawTag)
		switch {
		case err != nil:
			tag = IOTagClient
		case tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache:
			tag = IOTagInternal
		}

		return streamer(tagging.ContextWithIOTag(ctx, tag.String()), desc, cc, method, opts...)
	}
}

39
internal/qos/tags.go Normal file
View file

@ -0,0 +1,39 @@
package qos
import "fmt"
// IOTag is a request IO priority tag carried in the request context.
type IOTag string

// Known IO tags. ioTagUnknown is only returned by FromRawString together
// with a non-nil error.
const (
	IOTagClient     IOTag = "client"
	IOTagInternal   IOTag = "internal"
	IOTagBackground IOTag = "background"
	IOTagWritecache IOTag = "writecache"
	IOTagPolicer    IOTag = "policer"
	IOTagCritical   IOTag = "critical"

	ioTagUnknown IOTag = ""
)

// FromRawString converts s to one of the known IO tags, or returns
// ioTagUnknown and an error when s names no supported tag.
func FromRawString(s string) (IOTag, error) {
	switch tag := IOTag(s); tag {
	case IOTagCritical, IOTagClient, IOTagInternal, IOTagBackground, IOTagWritecache, IOTagPolicer:
		return tag, nil
	default:
		return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
	}
}

// String returns the tag as a plain string.
func (t IOTag) String() string {
	return string(t)
}

View file

@ -45,7 +45,7 @@ func TestUpgradeV2ToV3(t *testing.T) {
type testContainerInfoProvider struct{}
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
func (p *testContainerInfoProvider) Info(ctx context.Context, id cid.ID) (container.Info, error) {
return container.Info{}, nil
}

View file

@ -6,11 +6,13 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@ -149,7 +151,7 @@ func (gc *gc) init(ctx context.Context) {
if sz > 0 {
gc.workerPool = gc.workerPoolInit(sz)
}
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
gc.wg.Add(2)
go gc.tickRemover(ctx)
go gc.listenEvents(ctx)

View file

@ -6,10 +6,12 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -103,6 +105,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
default:
}
log.Info(ctx, logs.BlobstoreRebuildStarted)
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else {

View file

@ -6,6 +6,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -14,6 +15,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
@ -35,6 +37,7 @@ func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush {
return
}
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagWritecache.String())
fl := newFlushLimiter(c.flushSizeLimit)
c.wg.Add(1)
go func() {

View file

@ -7,10 +7,12 @@ import (
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -62,12 +64,16 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
grpcOpts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
tagging.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
tagging.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),

132
pkg/services/object/qos.go Normal file
View file

@ -0,0 +1,132 @@
package object
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
)
// Compile-time check that qosObjectService implements ServiceServer.
var _ ServiceServer = (*qosObjectService)(nil)

// AdjustIOTag validates and, if necessary, replaces the IO tag in the
// request context based on the public key of the request signer.
type AdjustIOTag interface {
	AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
}
type qosObjectService struct {
aarifullin marked this conversation as resolved Outdated

qoSObjectService -> qosObjectService? :)

`qoSObjectService` -> `qosObjectService`? :)

fixed

fixed
next ServiceServer
adj AdjustIOTag
}
// NewQoSObjectService wraps next so that every request's context IO tag
// is adjusted via adjIOTag before being forwarded.
func NewQoSObjectService(next ServiceServer, adjIOTag AdjustIOTag) ServiceServer {
	return &qosObjectService{next: next, adj: adjIOTag}
}
// Delete adjusts the context IO tag by the request signer's key and delegates.
func (q *qosObjectService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
	signerKey := req.GetVerificationHeader().GetBodySignature().GetKey()
	return q.next.Delete(q.adj.AdjustIncomingTag(ctx, signerKey), req)
}
// Get adjusts the stream context IO tag by the request signer's key and
// serves the request through a wrapper exposing the adjusted context.
func (q *qosObjectService) Get(req *object.GetRequest, s GetObjectStream) error {
	adjusted := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
	wrapped := &qosReadStream[*object.GetResponse]{
		ctxF:   func() context.Context { return adjusted },
		sender: s,
	}
	return q.next.Get(req, wrapped)
}
// GetRange adjusts the stream context IO tag by the request signer's key
// and serves the request through a wrapper exposing the adjusted context.
func (q *qosObjectService) GetRange(req *object.GetRangeRequest, s GetObjectRangeStream) error {
	adjusted := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
	wrapped := &qosReadStream[*object.GetRangeResponse]{
		ctxF:   func() context.Context { return adjusted },
		sender: s,
	}
	return q.next.GetRange(req, wrapped)
}
// GetRangeHash adjusts the context IO tag by the request signer's key and delegates.
func (q *qosObjectService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
	signerKey := req.GetVerificationHeader().GetBodySignature().GetKey()
	return q.next.GetRangeHash(q.adj.AdjustIncomingTag(ctx, signerKey), req)
}
// Head adjusts the context IO tag by the request signer's key and delegates.
func (q *qosObjectService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
	signerKey := req.GetVerificationHeader().GetBodySignature().GetKey()
	return q.next.Head(q.adj.AdjustIncomingTag(ctx, signerKey), req)
}
// Patch opens a patch stream on the next service and wraps it so the
// context IO tag is adjusted for every request sent on the stream.
func (q *qosObjectService) Patch(ctx context.Context) (PatchObjectStream, error) {
	stream, err := q.next.Patch(ctx)
	if err != nil {
		return nil, err
	}
	return &qosWriteStream[*object.PatchRequest, *object.PatchResponse]{s: stream, adj: q.adj}, nil
}
// Put opens a put stream on the next service and wraps it so the context
// IO tag is adjusted for every request sent on the stream.
func (q *qosObjectService) Put(ctx context.Context) (PutObjectStream, error) {
	stream, err := q.next.Put(ctx)
	if err != nil {
		return nil, err
	}
	return &qosWriteStream[*object.PutRequest, *object.PutResponse]{s: stream, adj: q.adj}, nil
}
// PutSingle adjusts the context IO tag by the request signer's key and delegates.
func (q *qosObjectService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
	signerKey := req.GetVerificationHeader().GetBodySignature().GetKey()
	return q.next.PutSingle(q.adj.AdjustIncomingTag(ctx, signerKey), req)
}
// Search adjusts the stream context IO tag by the request signer's key
// and serves the request through a wrapper exposing the adjusted context.
func (q *qosObjectService) Search(req *object.SearchRequest, s SearchStream) error {
	adjusted := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
	wrapped := &qosReadStream[*object.SearchResponse]{
		ctxF:   func() context.Context { return adjusted },
		sender: s,
	}
	return q.next.Search(req, wrapped)
}
// qosSend abstracts the response-sending half of a server stream.
type qosSend[T any] interface {
	Send(T) error
}

// qosReadStream wraps a response stream so that Context() returns the
// IO-tag-adjusted context instead of the stream's original one.
type qosReadStream[T any] struct {
	sender qosSend[T]
	// ctxF returns the adjusted context captured by the wrapping service.
	ctxF func() context.Context
}

// Context returns the adjusted request context.
func (g *qosReadStream[T]) Context() context.Context {
	return g.ctxF()
}

// Send forwards resp to the underlying stream sender.
func (g *qosReadStream[T]) Send(resp T) error {
	return g.sender.Send(resp)
}
// qosVerificationHeader is implemented by requests carrying a verification
// header, from which the signer's public key is extracted.
type qosVerificationHeader interface {
	GetVerificationHeader() *session.RequestVerificationHeader
}

// qosSendRecv abstracts a client-to-server write stream.
type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
	Send(context.Context, TReq) error
	CloseAndRecv(context.Context) (TResp, error)
}

// qosWriteStream adjusts the context IO tag for every request sent on
// the wrapped stream.
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
	s   qosSendRecv[TReq, TResp]
	adj AdjustIOTag
}

// CloseAndRecv delegates to the underlying stream without tag adjustment.
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
	return q.s.CloseAndRecv(ctx)
}

// Send adjusts the context IO tag by the signer key of req and forwards it.
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
	ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
	return q.s.Send(ctx, req)
}

View file

@ -7,7 +7,9 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
@ -18,6 +20,7 @@ func (p *Policer) Run(ctx context.Context) {
}
func (p *Policer) shardPolicyWorker(ctx context.Context) {
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagPolicer.String())
for {
select {
case <-ctx.Done():

View file

@ -9,9 +9,11 @@ import (
"time"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"github.com/hashicorp/golang-lru/v2/simplelru"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
@ -95,12 +97,16 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
opts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
tagging.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
tagging.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),

101
pkg/services/tree/qos.go Normal file
View file

@ -0,0 +1,101 @@
package tree
import (
"context"
"google.golang.org/grpc"
)
// Compile-time check that ioTagAdjust implements TreeServiceServer.
var _ TreeServiceServer = (*ioTagAdjust)(nil)

// AdjustIOTag validates and, where necessary, rewrites the IO tag carried
// by an incoming request's context, based on the request signer's public
// key (per the PR discussion: it checks whether the signer may use the
// internal/critical tags and that the tag value is allowed).
type AdjustIOTag interface {
	AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
}

// ioTagAdjust decorates a TreeServiceServer so that every incoming
// request's IO tag is adjusted before the call is delegated.
type ioTagAdjust struct {
	s TreeServiceServer // wrapped tree service
	a AdjustIOTag       // IO tag validator/adjuster
}
// NewIOTagAdjustServer wraps s so that each incoming request's IO tag is
// adjusted by a before the call is forwarded to s.
func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer {
	return &ioTagAdjust{s: s, a: a}
}
func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
fyrchik marked this conversation as resolved Outdated

I don't understand all these different places where we work with tags.
Consider tree service: we have 2 interceptors and also NewIOTagAdjustServer.
Why are all those different?

I don't understand all these different places where we work with tags. Consider tree service: we have 2 interceptors and also `NewIOTagAdjustServer`. Why are all those different?
  1. tagging.NewUnaryServerInterceptor() and tagging.NewStreamServerInterceptor() extract an IO tag from the metadata of an incoming gRPC request and store it in the context.Context. They don't know anything about frostfs-node allowed tags.
  2. tagging.NewUnaryClientInteceptor() and tagging.NewStreamClientInterceptor() extract an IO tag from context.Context and store it in the metadata of the outgoing gRPC request. They also don't know anything about frostfs-node allowed tags.
  3. AdjustIncomingTag checks if the incoming request's signer has rights to use the internal or critical IO tag and also that the tag has an allowed value.
  4. qos.NewAdjustOutgoingIOTagUnaryClientInterceptor() and qos.NewAdjustOutgoingIOTagStreamClientInterceptor() replace background, policer and writecache tag values with the internal IO tag value for outgoing requests.
1. `tagging.NewUnaryServerInterceptor()` and `tagging.NewStreamServerInterceptor()` extract an IO tag from the metadata of an incoming gRPC request and store it in the context.Context. They don't know anything about frostfs-node allowed tags. 2. `tagging.NewUnaryClientInteceptor()` and `tagging.NewStreamClientInterceptor()` extract an IO tag from context.Context and store it in the metadata of the outgoing gRPC request. They also don't know anything about frostfs-node allowed tags. 3. `AdjustIncomingTag` checks if the incoming request's signer has rights to use the `internal` or `critical` IO tag and also that the tag has an allowed value. 4. `qos.NewAdjustOutgoingIOTagUnaryClientInterceptor()` and `qos.NewAdjustOutgoingIOTagStreamClientInterceptor()` replace `background`, `policer` and `writecache` tag values with the `internal` IO tag value for outgoing requests.

Regarding (4), I think there is a slight confusion:
Any thread, initiating network request could set its meta explicitly (policer, replicator).
And writecache doesn't create network requests, for example.

Do you think introducing interceptor is a better solution, than explicit internal on all callsites?

Regarding (4), I think there is a slight confusion: Any thread, initiating network request could set its meta explicitly (policer, replicator). And writecache doesn't create network requests, for example. Do you think introducing interceptor is a better solution, than explicit `internal` on all callsites?

Thanks, it became clearer now.
Adjust doesn't scream VALIDATION, that's where my confusion came from.

Thanks, it became clearer now. `Adjust` doesn't scream VALIDATION, that's where my confusion came from.

Do you think introducing interceptor is a better solution, than explicit internal on all callsites?

I tried, but it looked very scary (especially in policer/replicator).

> Do you think introducing interceptor is a better solution, than explicit internal on all callsites? I tried, but it looked very scary (especially in policer/replicator).
return i.s.Add(ctx, req)
}
// AddByPath adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
	return i.s.AddByPath(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}

// Apply adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
	return i.s.Apply(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}

// GetNodeByPath adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
	return i.s.GetNodeByPath(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}
// GetOpLog adjusts the IO tag in the stream context from the request's
// signature key, then delegates with a stream wrapper that reports the
// adjusted context.
func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
	taggedCtx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
	wrapped := &qosServerWrapper[*GetOpLogResponse]{
		ServerStream: srv,
		sender:       srv,
		ctxF:         func() context.Context { return taggedCtx },
	}
	return i.s.GetOpLog(req, wrapped)
}

// GetSubTree adjusts the IO tag in the stream context from the request's
// signature key, then delegates with a stream wrapper that reports the
// adjusted context.
func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
	taggedCtx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
	wrapped := &qosServerWrapper[*GetSubTreeResponse]{
		ServerStream: srv,
		sender:       srv,
		ctxF:         func() context.Context { return taggedCtx },
	}
	return i.s.GetSubTree(req, wrapped)
}
// Healthcheck adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) {
	return i.s.Healthcheck(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}

// Move adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
	return i.s.Move(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}

// Remove adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
	return i.s.Remove(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}

// TreeList adjusts the request's IO tag from its signature key and delegates.
func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
	return i.s.TreeList(i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey()), req)
}
// qosSend abstracts the Send half of a server stream so qosServerWrapper
// can wrap any concrete response type.
type qosSend[T any] interface {
	Send(T) error
}

// qosServerWrapper embeds the original grpc.ServerStream (inheriting its
// metadata/trailer behavior) but overrides Context to return the
// IO-tag-adjusted context supplied by ctxF, and routes Send through the
// typed sender.
type qosServerWrapper[T any] struct {
	grpc.ServerStream
	sender qosSend[T]            // typed Send of the wrapped stream
	ctxF   func() context.Context // supplies the IO-tag-adjusted context
}

// Send forwards resp to the wrapped stream unchanged.
func (w *qosServerWrapper[T]) Send(resp T) error {
	return w.sender.Send(resp)
}

// Context returns the adjusted context instead of the embedded stream's.
func (w *qosServerWrapper[T]) Context() context.Context {
	return w.ctxF()
}

View file

@ -9,9 +9,11 @@ import (
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -83,6 +85,7 @@ func New(opts ...Option) *Service {
// Start starts the service.
func (s *Service) Start(ctx context.Context) {
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
go s.replicateLoop(ctx)
go s.syncLoop(ctx)

View file

@ -13,6 +13,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -20,6 +21,7 @@ import (
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
@ -340,12 +342,16 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
return grpc.NewClient(a.URIAddr(),
grpc.WithChainUnaryInterceptor(
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
tagging.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
tagging.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),

View file

@ -4,37 +4,32 @@ import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"go.uber.org/zap"
)
func (l *Logger) Debug(ctx context.Context, msg string, fields ...zap.Field) {
if traceID := tracing.GetTraceID(ctx); traceID != "" {
l.z.Debug(msg, append(fields, zap.String("trace_id", traceID))...)
return
}
l.z.Debug(msg, fields...)
l.z.Debug(msg, appendContext(ctx, fields...)...)
}
func (l *Logger) Info(ctx context.Context, msg string, fields ...zap.Field) {
if traceID := tracing.GetTraceID(ctx); traceID != "" {
l.z.Info(msg, append(fields, zap.String("trace_id", traceID))...)
return
}
l.z.Info(msg, fields...)
l.z.Info(msg, appendContext(ctx, fields...)...)
}
func (l *Logger) Warn(ctx context.Context, msg string, fields ...zap.Field) {
if traceID := tracing.GetTraceID(ctx); traceID != "" {
l.z.Warn(msg, append(fields, zap.String("trace_id", traceID))...)
return
}
l.z.Warn(msg, fields...)
l.z.Warn(msg, appendContext(ctx, fields...)...)
}
func (l *Logger) Error(ctx context.Context, msg string, fields ...zap.Field) {
if traceID := tracing.GetTraceID(ctx); traceID != "" {
l.z.Error(msg, append(fields, zap.String("trace_id", traceID))...)
return
}
l.z.Error(msg, fields...)
l.z.Error(msg, appendContext(ctx, fields...)...)
}
func appendContext(ctx context.Context, fields ...zap.Field) []zap.Field {

@acid-ant please, look
The PR with logging tags will be adapted, depending on the merge order.

@acid-ant please, look The PR with logging tags will be adapted, depending on the merge order.

Yeah, saw this, merge will be easy.

Yeah, saw this, merge will be easy.
if traceID := tracing.GetTraceID(ctx); traceID != "" {
fields = append(fields, zap.String("trace_id", traceID))
}
if ioTag, ioTagDefined := qos.IOTagFromContext(ctx); ioTagDefined {
fields = append(fields, zap.String("io_tag", ioTag))
}
return fields
}