Set IO tags #1608
31 changed files with 625 additions and 30 deletions
|
@ -9,7 +9,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -34,11 +33,9 @@ func _client() (tree.TreeServiceClient, error) {
|
|||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
|
@ -493,6 +493,7 @@ type cfg struct {
|
|||
cfgNetmap cfgNetmap
|
||||
cfgControlService cfgControlService
|
||||
cfgObject cfgObject
|
||||
cfgQoSService cfgQoSService
|
||||
}
|
||||
|
||||
// ReadCurrentNetMap reads network map which has been cached at the
|
||||
|
|
46
cmd/frostfs-node/config/qos/config.go
Normal file
46
cmd/frostfs-node/config/qos/config.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
const (
|
||||
subsection = "qos"
|
||||
criticalSubSection = "critical"
|
||||
internalSubSection = "internal"
|
||||
)
|
||||
|
||||
// CriticalAuthorizedKeys parses and returns an array of "critical.authorized_keys" config
|
||||
// parameter from "qos" section.
|
||||
//
|
||||
// Returns an empty list if not set.
|
||||
func CriticalAuthorizedKeys(c *config.Config) keys.PublicKeys {
|
||||
return authorizedKeys(c, criticalSubSection)
|
||||
}
|
||||
|
||||
// InternalAuthorizedKeys parses and returns an array of "internal.authorized_keys" config
|
||||
// parameter from "qos" section.
|
||||
//
|
||||
// Returns an empty list if not set.
|
||||
func InternalAuthorizedKeys(c *config.Config) keys.PublicKeys {
|
||||
return authorizedKeys(c, internalSubSection)
|
||||
}
|
||||
|
||||
func authorizedKeys(c *config.Config, sub string) keys.PublicKeys {
|
||||
strKeys := config.StringSliceSafe(c.Sub(subsection).Sub(sub), "authorized_keys")
|
||||
pubs := make(keys.PublicKeys, 0, len(strKeys))
|
||||
|
||||
for i := range strKeys {
|
||||
pub, err := keys.NewPublicKeyFromString(strKeys[i])
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("invalid authorized key %s for qos.%s: %w", strKeys[i], sub, err))
|
||||
}
|
||||
|
||||
pubs = append(pubs, pub)
|
||||
}
|
||||
|
||||
return pubs
|
||||
}
|
40
cmd/frostfs-node/config/qos/config_test.go
Normal file
40
cmd/frostfs-node/config/qos/config_test.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestQoSSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
empty := configtest.EmptyConfig()
|
||||
|
||||
require.Empty(t, CriticalAuthorizedKeys(empty))
|
||||
require.Empty(t, InternalAuthorizedKeys(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
criticalPubs := make(keys.PublicKeys, 2)
|
||||
criticalPubs[0], _ = keys.NewPublicKeyFromString("035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11")
|
||||
criticalPubs[1], _ = keys.NewPublicKeyFromString("028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6")
|
||||
|
||||
internalPubs := make(keys.PublicKeys, 2)
|
||||
internalPubs[0], _ = keys.NewPublicKeyFromString("02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2")
|
||||
internalPubs[1], _ = keys.NewPublicKeyFromString("031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a")
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
require.Equal(t, criticalPubs, CriticalAuthorizedKeys(c))
|
||||
require.Equal(t, internalPubs, InternalAuthorizedKeys(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
|
@ -7,9 +7,12 @@ import (
|
|||
|
||||
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
@ -50,7 +53,14 @@ func initControlService(ctx context.Context, c *cfg) {
|
|||
return
|
||||
}
|
||||
|
||||
c.cfgControlService.server = grpc.NewServer()
|
||||
c.cfgControlService.server = grpc.NewServer(
|
||||
grpc.ChainUnaryInterceptor(
|
||||
qos.NewSetCriticalIOTagUnaryServerInterceptor(),
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
),
|
||||
// control service has no stream methods, so no stream interceptors added
|
||||
)
|
||||
|
||||
c.onShutdown(func() {
|
||||
stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log)
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
@ -130,10 +131,12 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
|
|||
serverOpts := []grpc.ServerOption{
|
||||
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.ChainUnaryInterceptor(
|
||||
qos.NewUnaryServerInterceptor(),
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
qos.NewStreamServerInterceptor(),
|
||||
metrics.NewStreamServerInterceptor(),
|
||||
tracing.NewStreamServerInterceptor(),
|
||||
),
|
||||
|
|
|
@ -101,6 +101,7 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
|
||||
initAndLog(ctx, c, "gRPC", func(c *cfg) { initGRPC(ctx, c) })
|
||||
initAndLog(ctx, c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
||||
initAndLog(ctx, c, "qos", func(c *cfg) { initQoSService(c) })
|
||||
|
||||
initAccessPolicyEngine(ctx, c)
|
||||
initAndLog(ctx, c, "access policy engine", func(c *cfg) {
|
||||
|
|
|
@ -168,7 +168,7 @@ func initObjectService(c *cfg) {
|
|||
sPatch := createPatchSvc(sGet, sPut)
|
||||
|
||||
// build service pipeline
|
||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||
// grpc | audit | qos | <metrics> | signature | response | acl | ape | split
|
||||
|
||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||
|
||||
|
@ -191,7 +191,8 @@ func initObjectService(c *cfg) {
|
|||
|
||||
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
||||
auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit)
|
||||
qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService)
|
||||
auditSvc := objectService.NewAuditService(qosService, c.log, c.audit)
|
||||
server := objectTransportGRPC.New(auditSvc)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
|
|
95
cmd/frostfs-node/qos.go
Normal file
95
cmd/frostfs-node/qos.go
Normal file
|
@ -0,0 +1,95 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
qosconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
qosTagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cfgQoSService struct {
|
||||
netmapSource netmap.Source
|
||||
logger *logger.Logger
|
||||
allowedCriticalPubs [][]byte
|
||||
allowedInternalPubs [][]byte
|
||||
}
|
||||
|
||||
func initQoSService(c *cfg) {
|
||||
criticalPubs := qosconfig.CriticalAuthorizedKeys(c.appCfg)
|
||||
internalPubs := qosconfig.InternalAuthorizedKeys(c.appCfg)
|
||||
rawCriticalPubs := make([][]byte, 0, len(criticalPubs))
|
||||
rawInternalPubs := make([][]byte, 0, len(internalPubs))
|
||||
for i := range criticalPubs {
|
||||
rawCriticalPubs = append(rawCriticalPubs, criticalPubs[i].Bytes())
|
||||
}
|
||||
for i := range internalPubs {
|
||||
rawInternalPubs = append(rawInternalPubs, internalPubs[i].Bytes())
|
||||
}
|
||||
|
||||
c.cfgQoSService = cfgQoSService{
|
||||
netmapSource: c.netMapSource,
|
||||
logger: c.log,
|
||||
allowedCriticalPubs: rawCriticalPubs,
|
||||
allowedInternalPubs: rawInternalPubs,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
|
||||
rawTag, defined := qosTagging.IOTagFromContext(ctx)
|
||||
if !defined {
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
ioTag, err := qos.FromRawString(rawTag)
|
||||
if err != nil {
|
||||
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
|
||||
switch ioTag {
|
||||
case qos.IOTagClient:
|
||||
return ctx
|
||||
case qos.IOTagCritical:
|
||||
for _, pk := range s.allowedCriticalPubs {
|
||||
if bytes.Equal(pk, requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
for _, node := range nm.Nodes() {
|
||||
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
case qos.IOTagInternal:
|
||||
for _, pk := range s.allowedInternalPubs {
|
||||
if bytes.Equal(pk, requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
for _, node := range nm.Nodes() {
|
||||
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
default:
|
||||
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
}
|
|
@ -72,7 +72,7 @@ func initTreeService(c *cfg) {
|
|||
)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
tree.RegisterTreeServiceServer(s, c.treeService)
|
||||
tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService))
|
||||
})
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||
|
|
|
@ -225,3 +225,6 @@ FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
|||
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||
FROSTFS_MULTINET_RESTRICT=false
|
||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||
|
||||
FROSTFS_QOS_CRITICAL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||
FROSTFS_QOS_INTERNAL_AUTHORIZED_KEYS="02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
|
||||
|
|
|
@ -305,5 +305,19 @@
|
|||
"balancer": "roundrobin",
|
||||
"restrict": false,
|
||||
"fallback_delay": "350ms"
|
||||
},
|
||||
"qos": {
|
||||
"critical": {
|
||||
"authorized_keys": [
|
||||
"035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11",
|
||||
"028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||
]
|
||||
},
|
||||
"internal": {
|
||||
"authorized_keys": [
|
||||
"02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2",
|
||||
"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -270,3 +270,13 @@ multinet:
|
|||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
|
||||
qos:
|
||||
critical:
|
||||
authorized_keys: # list of hex-encoded public keys that have rights to use `critical` IO tag
|
||||
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||
internal:
|
||||
authorized_keys: # list of hex-encoded public keys that have rights to use `internal` IO tag
|
||||
- 02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2
|
||||
- 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a
|
||||
|
|
|
@ -26,7 +26,8 @@ There are some custom types used for brevity:
|
|||
| `storage` | [Storage engine configuration](#storage-section) |
|
||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||
| `audit` | [Audit configuration](#audit-section) |
|
||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||
| `qos` | [QoS configuration](#qos-section) |
|
||||
|
||||
# `control` section
|
||||
```yaml
|
||||
|
@ -471,3 +472,20 @@ multinet:
|
|||
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
|
||||
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
|
||||
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
|
||||
|
||||
# `qos` section
|
||||
```yaml
|
||||
qos:
|
||||
critical:
|
||||
authorized_keys:
|
||||
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||
internal:
|
||||
authorized_keys:
|
||||
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||
```
|
||||
| Parameter | Type | Default value | Description |
|
||||
| -------------------------- | -------------- | ------------- | --------------------------------------------------------------------------- |
|
||||
| `critical.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `critical` are allowed. |
|
||||
| `internal.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `internal` are allowed. |
|
||||
|
|
1
go.mod
1
go.mod
|
@ -8,6 +8,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
|
|
2
go.sum
2
go.sum
|
@ -8,6 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
|||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
|
|
|
@ -510,4 +510,7 @@ const (
|
|||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||
WritecacheCantGetObject = "can't get an object from fstree"
|
||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
||||
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||
)
|
||||
|
|
51
internal/qos/grpc.go
Normal file
51
internal/qos/grpc.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func NewSetCriticalIOTagUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, IOTagCritical.String())
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAdjustOutgoingIOTagUnaryClientInterceptor() grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
rawTag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
tag, err := FromRawString(rawTag)
|
||||
if err != nil {
|
||||
tag = IOTagClient
|
||||
}
|
||||
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
||||
tag = IOTagInternal
|
||||
}
|
||||
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
rawTag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
tag, err := FromRawString(rawTag)
|
||||
if err != nil {
|
||||
tag = IOTagClient
|
||||
}
|
||||
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
||||
tag = IOTagInternal
|
||||
}
|
||||
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
}
|
39
internal/qos/tags.go
Normal file
39
internal/qos/tags.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package qos
|
||||
|
||||
import "fmt"
|
||||
|
||||
type IOTag string
|
||||
|
||||
const (
|
||||
IOTagClient IOTag = "client"
|
||||
IOTagInternal IOTag = "internal"
|
||||
IOTagBackground IOTag = "background"
|
||||
IOTagWritecache IOTag = "writecache"
|
||||
IOTagPolicer IOTag = "policer"
|
||||
IOTagCritical IOTag = "critical"
|
||||
|
||||
ioTagUnknown IOTag = ""
|
||||
)
|
||||
|
||||
func FromRawString(s string) (IOTag, error) {
|
||||
switch s {
|
||||
case string(IOTagCritical):
|
||||
return IOTagCritical, nil
|
||||
case string(IOTagClient):
|
||||
return IOTagClient, nil
|
||||
case string(IOTagInternal):
|
||||
return IOTagInternal, nil
|
||||
case string(IOTagBackground):
|
||||
return IOTagBackground, nil
|
||||
case string(IOTagWritecache):
|
||||
return IOTagWritecache, nil
|
||||
case string(IOTagPolicer):
|
||||
return IOTagPolicer, nil
|
||||
default:
|
||||
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
func (t IOTag) String() string {
|
||||
return string(t)
|
||||
}
|
|
@ -45,7 +45,7 @@ func TestUpgradeV2ToV3(t *testing.T) {
|
|||
|
||||
type testContainerInfoProvider struct{}
|
||||
|
||||
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
|
||||
func (p *testContainerInfoProvider) Info(ctx context.Context, id cid.ID) (container.Info, error) {
|
||||
return container.Info{}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,13 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
|
@ -149,7 +151,7 @@ func (gc *gc) init(ctx context.Context) {
|
|||
if sz > 0 {
|
||||
gc.workerPool = gc.workerPoolInit(sz)
|
||||
}
|
||||
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
gc.wg.Add(2)
|
||||
go gc.tickRemover(ctx)
|
||||
go gc.listenEvents(ctx)
|
||||
|
|
|
@ -6,10 +6,12 @@ import (
|
|||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -103,6 +105,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
|
|||
default:
|
||||
}
|
||||
log.Info(ctx, logs.BlobstoreRebuildStarted)
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
|
||||
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||
} else {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -35,6 +37,7 @@ func (c *cache) runFlushLoop(ctx context.Context) {
|
|||
if c.disableBackgroundFlush {
|
||||
return
|
||||
}
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagWritecache.String())
|
||||
fl := newFlushLimiter(c.flushSizeLimit)
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
|
|
6
pkg/network/cache/multi.go
vendored
6
pkg/network/cache/multi.go
vendored
|
@ -7,10 +7,12 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -62,12 +64,16 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
|
||||
grpcOpts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
tagging.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
tagging.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
132
pkg/services/object/qos.go
Normal file
132
pkg/services/object/qos.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
||||
)
|
||||
|
||||
var _ ServiceServer = (*qosObjectService)(nil)
|
||||
|
||||
type AdjustIOTag interface {
|
||||
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||
}
|
||||
|
||||
type qosObjectService struct {
|
||||
next ServiceServer
|
||||
adj AdjustIOTag
|
||||
}
|
||||
|
||||
func NewQoSObjectService(next ServiceServer, adjIOTag AdjustIOTag) ServiceServer {
|
||||
return &qosObjectService{
|
||||
next: next,
|
||||
adj: adjIOTag,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Delete(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Get(req *object.GetRequest, s GetObjectStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Get(req, &qosReadStream[*object.GetResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *qosObjectService) GetRange(req *object.GetRangeRequest, s GetObjectRangeStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.GetRange(req, &qosReadStream[*object.GetRangeResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *qosObjectService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.GetRangeHash(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Head(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Patch(ctx context.Context) (PatchObjectStream, error) {
|
||||
s, err := q.next.Patch(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qosWriteStream[*object.PatchRequest, *object.PatchResponse]{
|
||||
s: s,
|
||||
adj: q.adj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||
s, err := q.next.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qosWriteStream[*object.PutRequest, *object.PutResponse]{
|
||||
s: s,
|
||||
adj: q.adj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *qosObjectService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.PutSingle(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Search(req *object.SearchRequest, s SearchStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Search(req, &qosReadStream[*object.SearchResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
type qosSend[T any] interface {
|
||||
Send(T) error
|
||||
}
|
||||
|
||||
type qosReadStream[T any] struct {
|
||||
sender qosSend[T]
|
||||
ctxF func() context.Context
|
||||
}
|
||||
|
||||
func (g *qosReadStream[T]) Context() context.Context {
|
||||
return g.ctxF()
|
||||
}
|
||||
|
||||
func (g *qosReadStream[T]) Send(resp T) error {
|
||||
return g.sender.Send(resp)
|
||||
}
|
||||
|
||||
type qosVerificationHeader interface {
|
||||
GetVerificationHeader() *session.RequestVerificationHeader
|
||||
}
|
||||
|
||||
type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
|
||||
Send(context.Context, TReq) error
|
||||
CloseAndRecv(context.Context) (TResp, error)
|
||||
}
|
||||
|
||||
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
|
||||
s qosSendRecv[TReq, TResp]
|
||||
adj AdjustIOTag
|
||||
}
|
||||
|
||||
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
|
||||
return q.s.CloseAndRecv(ctx)
|
||||
}
|
||||
|
||||
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.s.Send(ctx, req)
|
||||
}
|
|
@ -7,7 +7,9 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -18,6 +20,7 @@ func (p *Policer) Run(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagPolicer.String())
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -9,9 +9,11 @@ import (
|
|||
"time"
|
||||
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
|
@ -95,12 +97,16 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
tagging.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
tagging.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
101
pkg/services/tree/qos.go
Normal file
101
pkg/services/tree/qos.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var _ TreeServiceServer = (*ioTagAdjust)(nil)
|
||||
|
||||
type AdjustIOTag interface {
|
||||
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||
}
|
||||
|
||||
type ioTagAdjust struct {
|
||||
s TreeServiceServer
|
||||
a AdjustIOTag
|
||||
}
|
||||
|
||||
func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer {
|
||||
return &ioTagAdjust{
|
||||
s: s,
|
||||
a: a,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Add(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.AddByPath(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Apply(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.GetNodeByPath(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||
ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
|
||||
return i.s.GetOpLog(req, &qosServerWrapper[*GetOpLogResponse]{
|
||||
sender: srv,
|
||||
ServerStream: srv,
|
||||
ctxF: func() context.Context { return ctx },
|
||||
})
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||
ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
|
||||
return i.s.GetSubTree(req, &qosServerWrapper[*GetSubTreeResponse]{
|
||||
sender: srv,
|
||||
ServerStream: srv,
|
||||
ctxF: func() context.Context { return ctx },
|
||||
})
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Healthcheck(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Move(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Remove(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.TreeList(ctx, req)
|
||||
}
|
||||
|
||||
type qosSend[T any] interface {
|
||||
Send(T) error
|
||||
}
|
||||
|
||||
type qosServerWrapper[T any] struct {
|
||||
grpc.ServerStream
|
||||
sender qosSend[T]
|
||||
ctxF func() context.Context
|
||||
}
|
||||
|
||||
func (w *qosServerWrapper[T]) Send(resp T) error {
|
||||
return w.sender.Send(resp)
|
||||
}
|
||||
|
||||
func (w *qosServerWrapper[T]) Context() context.Context {
|
||||
return w.ctxF()
|
||||
}
|
|
@ -9,9 +9,11 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -83,6 +85,7 @@ func New(opts ...Option) *Service {
|
|||
|
||||
// Start starts the service.
|
||||
func (s *Service) Start(ctx context.Context) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
go s.replicateLoop(ctx)
|
||||
go s.syncLoop(ctx)
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
|
@ -20,6 +21,7 @@ import (
|
|||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -340,12 +342,16 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||
return grpc.NewClient(a.URIAddr(),
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing_grpc.NewUnaryClientInteceptor(),
|
||||
tagging.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing_grpc.NewStreamClientInterceptor(),
|
||||
tagging.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
|
@ -4,37 +4,32 @@ import (
|
|||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (l *Logger) Debug(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Debug(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Debug(msg, fields...)
|
||||
l.z.Debug(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func (l *Logger) Info(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Info(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Info(msg, fields...)
|
||||
l.z.Info(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func (l *Logger) Warn(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Warn(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Warn(msg, fields...)
|
||||
l.z.Warn(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func (l *Logger) Error(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Error(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Error(msg, fields...)
|
||||
l.z.Error(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func appendContext(ctx context.Context, fields ...zap.Field) []zap.Field {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
fields = append(fields, zap.String("trace_id", traceID))
|
||||
}
|
||||
if ioTag, ioTagDefined := qos.IOTagFromContext(ctx); ioTagDefined {
|
||||
fields = append(fields, zap.String("io_tag", ioTag))
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue