Set IO tags #1608
31 changed files with 625 additions and 30 deletions
|
@ -9,7 +9,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
@ -34,11 +33,9 @@ func _client() (tree.TreeServiceClient, error) {
|
||||||
|
|
||||||
opts := []grpc.DialOption{
|
opts := []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
metrics.NewUnaryClientInterceptor(),
|
|
||||||
tracing.NewUnaryClientInteceptor(),
|
tracing.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
metrics.NewStreamClientInterceptor(),
|
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
|
|
|
@ -493,6 +493,7 @@ type cfg struct {
|
||||||
cfgNetmap cfgNetmap
|
cfgNetmap cfgNetmap
|
||||||
cfgControlService cfgControlService
|
cfgControlService cfgControlService
|
||||||
cfgObject cfgObject
|
cfgObject cfgObject
|
||||||
|
cfgQoSService cfgQoSService
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadCurrentNetMap reads network map which has been cached at the
|
// ReadCurrentNetMap reads network map which has been cached at the
|
||||||
|
|
46
cmd/frostfs-node/config/qos/config.go
Normal file
46
cmd/frostfs-node/config/qos/config.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package qos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
subsection = "qos"
|
||||||
|
criticalSubSection = "critical"
|
||||||
|
internalSubSection = "internal"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CriticalAuthorizedKeys parses and returns an array of "critical.authorized_keys" config
|
||||||
|
// parameter from "qos" section.
|
||||||
|
//
|
||||||
|
// Returns an empty list if not set.
|
||||||
|
func CriticalAuthorizedKeys(c *config.Config) keys.PublicKeys {
|
||||||
|
return authorizedKeys(c, criticalSubSection)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InternalAuthorizedKeys parses and returns an array of "internal.authorized_keys" config
|
||||||
|
// parameter from "qos" section.
|
||||||
|
//
|
||||||
|
// Returns an empty list if not set.
|
||||||
|
func InternalAuthorizedKeys(c *config.Config) keys.PublicKeys {
|
||||||
|
return authorizedKeys(c, internalSubSection)
|
||||||
|
}
|
||||||
|
|
||||||
|
func authorizedKeys(c *config.Config, sub string) keys.PublicKeys {
|
||||||
|
strKeys := config.StringSliceSafe(c.Sub(subsection).Sub(sub), "authorized_keys")
|
||||||
|
pubs := make(keys.PublicKeys, 0, len(strKeys))
|
||||||
|
|
||||||
|
for i := range strKeys {
|
||||||
|
pub, err := keys.NewPublicKeyFromString(strKeys[i])
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("invalid authorized key %s for qos.%s: %w", strKeys[i], sub, err))
|
||||||
|
}
|
||||||
|
|
||||||
|
pubs = append(pubs, pub)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pubs
|
||||||
|
}
|
40
cmd/frostfs-node/config/qos/config_test.go
Normal file
40
cmd/frostfs-node/config/qos/config_test.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
package qos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestQoSSection(t *testing.T) {
|
||||||
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
empty := configtest.EmptyConfig()
|
||||||
|
|
||||||
|
require.Empty(t, CriticalAuthorizedKeys(empty))
|
||||||
|
require.Empty(t, InternalAuthorizedKeys(empty))
|
||||||
|
})
|
||||||
|
|
||||||
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
|
criticalPubs := make(keys.PublicKeys, 2)
|
||||||
|
criticalPubs[0], _ = keys.NewPublicKeyFromString("035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11")
|
||||||
|
criticalPubs[1], _ = keys.NewPublicKeyFromString("028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6")
|
||||||
|
|
||||||
|
internalPubs := make(keys.PublicKeys, 2)
|
||||||
|
internalPubs[0], _ = keys.NewPublicKeyFromString("02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2")
|
||||||
|
internalPubs[1], _ = keys.NewPublicKeyFromString("031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a")
|
||||||
|
|
||||||
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
require.Equal(t, criticalPubs, CriticalAuthorizedKeys(c))
|
||||||
|
require.Equal(t, internalPubs, InternalAuthorizedKeys(c))
|
||||||
|
}
|
||||||
|
|
||||||
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
||||||
|
t.Run("ENV", func(t *testing.T) {
|
||||||
|
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||||
|
})
|
||||||
|
}
|
|
@ -7,9 +7,12 @@ import (
|
||||||
|
|
||||||
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||||
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
@ -50,7 +53,14 @@ func initControlService(ctx context.Context, c *cfg) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cfgControlService.server = grpc.NewServer()
|
c.cfgControlService.server = grpc.NewServer(
|
||||||
|
grpc.ChainUnaryInterceptor(
|
||||||
|
qos.NewSetCriticalIOTagUnaryServerInterceptor(),
|
||||||
|
metrics.NewUnaryServerInterceptor(),
|
||||||
|
tracing.NewUnaryServerInterceptor(),
|
||||||
|
),
|
||||||
|
// control service has no stream methods, so no stream interceptors added
|
||||||
|
)
|
||||||
|
|
||||||
c.onShutdown(func() {
|
c.onShutdown(func() {
|
||||||
stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log)
|
stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log)
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
@ -130,10 +131,12 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
|
||||||
serverOpts := []grpc.ServerOption{
|
serverOpts := []grpc.ServerOption{
|
||||||
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
||||||
grpc.ChainUnaryInterceptor(
|
grpc.ChainUnaryInterceptor(
|
||||||
|
qos.NewUnaryServerInterceptor(),
|
||||||
metrics.NewUnaryServerInterceptor(),
|
metrics.NewUnaryServerInterceptor(),
|
||||||
tracing.NewUnaryServerInterceptor(),
|
tracing.NewUnaryServerInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.ChainStreamInterceptor(
|
grpc.ChainStreamInterceptor(
|
||||||
|
qos.NewStreamServerInterceptor(),
|
||||||
metrics.NewStreamServerInterceptor(),
|
metrics.NewStreamServerInterceptor(),
|
||||||
tracing.NewStreamServerInterceptor(),
|
tracing.NewStreamServerInterceptor(),
|
||||||
),
|
),
|
||||||
|
|
|
@ -101,6 +101,7 @@ func initApp(ctx context.Context, c *cfg) {
|
||||||
|
|
||||||
initAndLog(ctx, c, "gRPC", func(c *cfg) { initGRPC(ctx, c) })
|
initAndLog(ctx, c, "gRPC", func(c *cfg) { initGRPC(ctx, c) })
|
||||||
initAndLog(ctx, c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
initAndLog(ctx, c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
||||||
|
initAndLog(ctx, c, "qos", func(c *cfg) { initQoSService(c) })
|
||||||
|
|
||||||
initAccessPolicyEngine(ctx, c)
|
initAccessPolicyEngine(ctx, c)
|
||||||
initAndLog(ctx, c, "access policy engine", func(c *cfg) {
|
initAndLog(ctx, c, "access policy engine", func(c *cfg) {
|
||||||
|
|
|
@ -168,7 +168,7 @@ func initObjectService(c *cfg) {
|
||||||
sPatch := createPatchSvc(sGet, sPut)
|
sPatch := createPatchSvc(sGet, sPut)
|
||||||
|
|
||||||
// build service pipeline
|
// build service pipeline
|
||||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
// grpc | audit | qos | <metrics> | signature | response | acl | ape | split
|
||||||
|
|
||||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||||
|
|
||||||
|
@ -191,7 +191,8 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
c.shared.metricsSvc = objectService.NewMetricCollector(
|
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||||
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
||||||
auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit)
|
qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService)
|
||||||
|
auditSvc := objectService.NewAuditService(qosService, c.log, c.audit)
|
||||||
server := objectTransportGRPC.New(auditSvc)
|
server := objectTransportGRPC.New(auditSvc)
|
||||||
|
|
||||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||||
|
|
95
cmd/frostfs-node/qos.go
Normal file
95
cmd/frostfs-node/qos.go
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
|
||||||
|
qosconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/qos"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
qosTagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type cfgQoSService struct {
|
||||||
|
netmapSource netmap.Source
|
||||||
|
logger *logger.Logger
|
||||||
|
allowedCriticalPubs [][]byte
|
||||||
|
allowedInternalPubs [][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func initQoSService(c *cfg) {
|
||||||
|
criticalPubs := qosconfig.CriticalAuthorizedKeys(c.appCfg)
|
||||||
|
internalPubs := qosconfig.InternalAuthorizedKeys(c.appCfg)
|
||||||
|
rawCriticalPubs := make([][]byte, 0, len(criticalPubs))
|
||||||
|
rawInternalPubs := make([][]byte, 0, len(internalPubs))
|
||||||
|
for i := range criticalPubs {
|
||||||
|
rawCriticalPubs = append(rawCriticalPubs, criticalPubs[i].Bytes())
|
||||||
|
}
|
||||||
|
for i := range internalPubs {
|
||||||
|
rawInternalPubs = append(rawInternalPubs, internalPubs[i].Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
c.cfgQoSService = cfgQoSService{
|
||||||
|
netmapSource: c.netMapSource,
|
||||||
|
logger: c.log,
|
||||||
|
allowedCriticalPubs: rawCriticalPubs,
|
||||||
|
allowedInternalPubs: rawInternalPubs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
|
||||||
|
rawTag, defined := qosTagging.IOTagFromContext(ctx)
|
||||||
|
if !defined {
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
}
|
||||||
|
ioTag, err := qos.FromRawString(rawTag)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
switch ioTag {
|
||||||
|
case qos.IOTagClient:
|
||||||
|
return ctx
|
||||||
|
case qos.IOTagCritical:
|
||||||
|
for _, pk := range s.allowedCriticalPubs {
|
||||||
|
if bytes.Equal(pk, requestSignPublicKey) {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
}
|
||||||
|
for _, node := range nm.Nodes() {
|
||||||
|
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
case qos.IOTagInternal:
|
||||||
|
for _, pk := range s.allowedInternalPubs {
|
||||||
|
if bytes.Equal(pk, requestSignPublicKey) {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
}
|
||||||
|
for _, node := range nm.Nodes() {
|
||||||
|
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
default:
|
||||||
|
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
}
|
||||||
|
}
|
|
@ -72,7 +72,7 @@ func initTreeService(c *cfg) {
|
||||||
)
|
)
|
||||||
|
|
||||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||||
tree.RegisterTreeServiceServer(s, c.treeService)
|
tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService))
|
||||||
})
|
})
|
||||||
|
|
||||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||||
|
|
|
@ -225,3 +225,6 @@ FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
||||||
FROSTFS_MULTINET_BALANCER=roundrobin
|
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||||
FROSTFS_MULTINET_RESTRICT=false
|
FROSTFS_MULTINET_RESTRICT=false
|
||||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||||
|
|
||||||
|
FROSTFS_QOS_CRITICAL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||||
|
FROSTFS_QOS_INTERNAL_AUTHORIZED_KEYS="02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
|
||||||
|
|
|
@ -305,5 +305,19 @@
|
||||||
"balancer": "roundrobin",
|
"balancer": "roundrobin",
|
||||||
"restrict": false,
|
"restrict": false,
|
||||||
"fallback_delay": "350ms"
|
"fallback_delay": "350ms"
|
||||||
|
},
|
||||||
|
"qos": {
|
||||||
|
"critical": {
|
||||||
|
"authorized_keys": [
|
||||||
|
"035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11",
|
||||||
|
"028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"internal": {
|
||||||
|
"authorized_keys": [
|
||||||
|
"02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2",
|
||||||
|
"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,3 +270,13 @@ multinet:
|
||||||
balancer: roundrobin
|
balancer: roundrobin
|
||||||
restrict: false
|
restrict: false
|
||||||
fallback_delay: 350ms
|
fallback_delay: 350ms
|
||||||
|
|
||||||
|
qos:
|
||||||
|
critical:
|
||||||
|
authorized_keys: # list of hex-encoded public keys that have rights to use `critical` IO tag
|
||||||
|
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||||
|
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||||
|
internal:
|
||||||
|
authorized_keys: # list of hex-encoded public keys that have rights to use `internal` IO tag
|
||||||
|
- 02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2
|
||||||
|
- 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a
|
||||||
|
|
|
@ -26,7 +26,8 @@ There are some custom types used for brevity:
|
||||||
| `storage` | [Storage engine configuration](#storage-section) |
|
| `storage` | [Storage engine configuration](#storage-section) |
|
||||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||||
| `audit` | [Audit configuration](#audit-section) |
|
| `audit` | [Audit configuration](#audit-section) |
|
||||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||||
|
| `qos` | [QoS configuration](#qos-section) |
|
||||||
|
|
||||||
# `control` section
|
# `control` section
|
||||||
```yaml
|
```yaml
|
||||||
|
@ -471,3 +472,20 @@ multinet:
|
||||||
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
|
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
|
||||||
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
|
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
|
||||||
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
|
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
|
||||||
|
|
||||||
|
# `qos` section
|
||||||
|
```yaml
|
||||||
|
qos:
|
||||||
|
critical:
|
||||||
|
authorized_keys:
|
||||||
|
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||||
|
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||||
|
internal:
|
||||||
|
authorized_keys:
|
||||||
|
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||||
|
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||||
|
```
|
||||||
|
| Parameter | Type | Default value | Description |
|
||||||
|
| -------------------------- | -------------- | ------------- | --------------------------------------------------------------------------- |
|
||||||
|
| `critical.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `critical` are allowed. |
|
||||||
|
| `internal.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `internal` are allowed. |
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -8,6 +8,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||||
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -8,6 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||||
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
|
||||||
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||||
|
|
|
@ -510,4 +510,7 @@ const (
|
||||||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||||
WritecacheCantGetObject = "can't get an object from fstree"
|
WritecacheCantGetObject = "can't get an object from fstree"
|
||||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||||
|
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
||||||
|
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||||
|
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||||
)
|
)
|
||||||
|
|
51
internal/qos/grpc.go
Normal file
51
internal/qos/grpc.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package qos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSetCriticalIOTagUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, IOTagCritical.String())
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAdjustOutgoingIOTagUnaryClientInterceptor() grpc.UnaryClientInterceptor {
|
||||||
|
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
|
rawTag, ok := tagging.IOTagFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return invoker(ctx, method, req, reply, cc, opts...)
|
||||||
|
}
|
||||||
|
tag, err := FromRawString(rawTag)
|
||||||
|
if err != nil {
|
||||||
|
tag = IOTagClient
|
||||||
|
}
|
||||||
|
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
||||||
|
tag = IOTagInternal
|
||||||
|
}
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||||
|
return invoker(ctx, method, req, reply, cc, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||||
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
|
rawTag, ok := tagging.IOTagFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return streamer(ctx, desc, cc, method, opts...)
|
||||||
|
}
|
||||||
|
tag, err := FromRawString(rawTag)
|
||||||
|
if err != nil {
|
||||||
|
tag = IOTagClient
|
||||||
|
}
|
||||||
|
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
||||||
|
tag = IOTagInternal
|
||||||
|
}
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||||
|
return streamer(ctx, desc, cc, method, opts...)
|
||||||
|
}
|
||||||
|
}
|
39
internal/qos/tags.go
Normal file
39
internal/qos/tags.go
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
package qos
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
type IOTag string
|
||||||
|
|
||||||
|
const (
|
||||||
|
IOTagClient IOTag = "client"
|
||||||
|
IOTagInternal IOTag = "internal"
|
||||||
|
IOTagBackground IOTag = "background"
|
||||||
|
IOTagWritecache IOTag = "writecache"
|
||||||
|
IOTagPolicer IOTag = "policer"
|
||||||
|
IOTagCritical IOTag = "critical"
|
||||||
|
|
||||||
|
ioTagUnknown IOTag = ""
|
||||||
|
)
|
||||||
|
|
||||||
|
func FromRawString(s string) (IOTag, error) {
|
||||||
|
switch s {
|
||||||
|
case string(IOTagCritical):
|
||||||
|
return IOTagCritical, nil
|
||||||
|
case string(IOTagClient):
|
||||||
|
return IOTagClient, nil
|
||||||
|
case string(IOTagInternal):
|
||||||
|
return IOTagInternal, nil
|
||||||
|
case string(IOTagBackground):
|
||||||
|
return IOTagBackground, nil
|
||||||
|
case string(IOTagWritecache):
|
||||||
|
return IOTagWritecache, nil
|
||||||
|
case string(IOTagPolicer):
|
||||||
|
return IOTagPolicer, nil
|
||||||
|
default:
|
||||||
|
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t IOTag) String() string {
|
||||||
|
return string(t)
|
||||||
|
}
|
|
@ -45,7 +45,7 @@ func TestUpgradeV2ToV3(t *testing.T) {
|
||||||
|
|
||||||
type testContainerInfoProvider struct{}
|
type testContainerInfoProvider struct{}
|
||||||
|
|
||||||
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
|
func (p *testContainerInfoProvider) Info(ctx context.Context, id cid.ID) (container.Info, error) {
|
||||||
return container.Info{}, nil
|
return container.Info{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,11 +6,13 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -149,7 +151,7 @@ func (gc *gc) init(ctx context.Context) {
|
||||||
if sz > 0 {
|
if sz > 0 {
|
||||||
gc.workerPool = gc.workerPoolInit(sz)
|
gc.workerPool = gc.workerPoolInit(sz)
|
||||||
}
|
}
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||||
gc.wg.Add(2)
|
gc.wg.Add(2)
|
||||||
go gc.tickRemover(ctx)
|
go gc.tickRemover(ctx)
|
||||||
go gc.listenEvents(ctx)
|
go gc.listenEvents(ctx)
|
||||||
|
|
|
@ -6,10 +6,12 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
@ -103,6 +105,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
log.Info(ctx, logs.BlobstoreRebuildStarted)
|
log.Info(ctx, logs.BlobstoreRebuildStarted)
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
|
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
|
||||||
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
|
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -14,6 +15,7 @@ import (
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -35,6 +37,7 @@ func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||
if c.disableBackgroundFlush {
|
if c.disableBackgroundFlush {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagWritecache.String())
|
||||||
fl := newFlushLimiter(c.flushSizeLimit)
|
fl := newFlushLimiter(c.flushSizeLimit)
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
6
pkg/network/cache/multi.go
vendored
6
pkg/network/cache/multi.go
vendored
|
@ -7,10 +7,12 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -62,12 +64,16 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
||||||
|
|
||||||
grpcOpts := []grpc.DialOption{
|
grpcOpts := []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
|
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||||
metrics.NewUnaryClientInterceptor(),
|
metrics.NewUnaryClientInterceptor(),
|
||||||
tracing.NewUnaryClientInteceptor(),
|
tracing.NewUnaryClientInteceptor(),
|
||||||
|
tagging.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
|
tagging.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
|
|
132
pkg/services/object/qos.go
Normal file
132
pkg/services/object/qos.go
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ ServiceServer = (*qosObjectService)(nil)
|
||||||
|
|
||||||
|
type AdjustIOTag interface {
|
||||||
|
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosObjectService struct {
|
||||||
|
next ServiceServer
|
||||||
|
adj AdjustIOTag
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQoSObjectService(next ServiceServer, adjIOTag AdjustIOTag) ServiceServer {
|
||||||
|
return &qosObjectService{
|
||||||
|
next: next,
|
||||||
|
adj: adjIOTag,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||||
|
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.Delete(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) Get(req *object.GetRequest, s GetObjectStream) error {
|
||||||
|
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.Get(req, &qosReadStream[*object.GetResponse]{
|
||||||
|
ctxF: func() context.Context { return ctx },
|
||||||
|
sender: s,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) GetRange(req *object.GetRangeRequest, s GetObjectRangeStream) error {
|
||||||
|
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.GetRange(req, &qosReadStream[*object.GetRangeResponse]{
|
||||||
|
ctxF: func() context.Context { return ctx },
|
||||||
|
sender: s,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||||
|
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.GetRangeHash(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
|
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.Head(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) Patch(ctx context.Context) (PatchObjectStream, error) {
|
||||||
|
s, err := q.next.Patch(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &qosWriteStream[*object.PatchRequest, *object.PatchResponse]{
|
||||||
|
s: s,
|
||||||
|
adj: q.adj,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
|
s, err := q.next.Put(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &qosWriteStream[*object.PutRequest, *object.PutResponse]{
|
||||||
|
s: s,
|
||||||
|
adj: q.adj,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
|
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.PutSingle(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosObjectService) Search(req *object.SearchRequest, s SearchStream) error {
|
||||||
|
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.next.Search(req, &qosReadStream[*object.SearchResponse]{
|
||||||
|
ctxF: func() context.Context { return ctx },
|
||||||
|
sender: s,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosSend[T any] interface {
|
||||||
|
Send(T) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosReadStream[T any] struct {
|
||||||
|
sender qosSend[T]
|
||||||
|
ctxF func() context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *qosReadStream[T]) Context() context.Context {
|
||||||
|
return g.ctxF()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *qosReadStream[T]) Send(resp T) error {
|
||||||
|
return g.sender.Send(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosVerificationHeader interface {
|
||||||
|
GetVerificationHeader() *session.RequestVerificationHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
|
||||||
|
Send(context.Context, TReq) error
|
||||||
|
CloseAndRecv(context.Context) (TResp, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
|
||||||
|
s qosSendRecv[TReq, TResp]
|
||||||
|
adj AdjustIOTag
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
|
||||||
|
return q.s.CloseAndRecv(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
|
||||||
|
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||||
|
return q.s.Send(ctx, req)
|
||||||
|
}
|
|
@ -7,7 +7,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -18,6 +20,7 @@ func (p *Policer) Run(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagPolicer.String())
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -9,9 +9,11 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
|
@ -95,12 +97,16 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
||||||
|
|
||||||
opts := []grpc.DialOption{
|
opts := []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
|
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||||
metrics.NewUnaryClientInterceptor(),
|
metrics.NewUnaryClientInterceptor(),
|
||||||
tracing.NewUnaryClientInteceptor(),
|
tracing.NewUnaryClientInteceptor(),
|
||||||
|
tagging.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
|
tagging.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
|
|
101
pkg/services/tree/qos.go
Normal file
101
pkg/services/tree/qos.go
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ TreeServiceServer = (*ioTagAdjust)(nil)
|
||||||
|
|
||||||
|
type AdjustIOTag interface {
|
||||||
|
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
type ioTagAdjust struct {
|
||||||
|
s TreeServiceServer
|
||||||
|
a AdjustIOTag
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer {
|
||||||
|
return &ioTagAdjust{
|
||||||
|
s: s,
|
||||||
|
a: a,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.Add(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.AddByPath(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.Apply(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.GetNodeByPath(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||||
|
ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
|
||||||
|
return i.s.GetOpLog(req, &qosServerWrapper[*GetOpLogResponse]{
|
||||||
|
sender: srv,
|
||||||
|
ServerStream: srv,
|
||||||
|
ctxF: func() context.Context { return ctx },
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||||
|
ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
|
||||||
|
return i.s.GetSubTree(req, &qosServerWrapper[*GetSubTreeResponse]{
|
||||||
|
sender: srv,
|
||||||
|
ServerStream: srv,
|
||||||
|
ctxF: func() context.Context { return ctx },
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.Healthcheck(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.Move(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.Remove(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||||
|
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||||
|
return i.s.TreeList(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosSend[T any] interface {
|
||||||
|
Send(T) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type qosServerWrapper[T any] struct {
|
||||||
|
grpc.ServerStream
|
||||||
|
sender qosSend[T]
|
||||||
|
ctxF func() context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *qosServerWrapper[T]) Send(resp T) error {
|
||||||
|
return w.sender.Send(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *qosServerWrapper[T]) Context() context.Context {
|
||||||
|
return w.ctxF()
|
||||||
|
}
|
|
@ -9,9 +9,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
|
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -83,6 +85,7 @@ func New(opts ...Option) *Service {
|
||||||
|
|
||||||
// Start starts the service.
|
// Start starts the service.
|
||||||
func (s *Service) Start(ctx context.Context) {
|
func (s *Service) Start(ctx context.Context) {
|
||||||
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||||
go s.replicateLoop(ctx)
|
go s.replicateLoop(ctx)
|
||||||
go s.syncLoop(ctx)
|
go s.syncLoop(ctx)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
@ -20,6 +21,7 @@ import (
|
||||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
@ -340,12 +342,16 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||||
return grpc.NewClient(a.URIAddr(),
|
return grpc.NewClient(a.URIAddr(),
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
|
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||||
metrics.NewUnaryClientInterceptor(),
|
metrics.NewUnaryClientInterceptor(),
|
||||||
tracing_grpc.NewUnaryClientInteceptor(),
|
tracing_grpc.NewUnaryClientInteceptor(),
|
||||||
|
tagging.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing_grpc.NewStreamClientInterceptor(),
|
tracing_grpc.NewStreamClientInterceptor(),
|
||||||
|
tagging.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
|
|
|
@ -4,37 +4,32 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
|
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (l *Logger) Debug(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *Logger) Debug(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
l.z.Debug(msg, appendContext(ctx, fields...)...)
|
||||||
l.z.Debug(msg, append(fields, zap.String("trace_id", traceID))...)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
l.z.Debug(msg, fields...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) Info(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *Logger) Info(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
l.z.Info(msg, appendContext(ctx, fields...)...)
|
||||||
l.z.Info(msg, append(fields, zap.String("trace_id", traceID))...)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
l.z.Info(msg, fields...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) Warn(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *Logger) Warn(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
l.z.Warn(msg, appendContext(ctx, fields...)...)
|
||||||
l.z.Warn(msg, append(fields, zap.String("trace_id", traceID))...)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
l.z.Warn(msg, fields...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) Error(ctx context.Context, msg string, fields ...zap.Field) {
|
func (l *Logger) Error(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
l.z.Error(msg, appendContext(ctx, fields...)...)
|
||||||
l.z.Error(msg, append(fields, zap.String("trace_id", traceID))...)
|
}
|
||||||
return
|
|
||||||
}
|
func appendContext(ctx context.Context, fields ...zap.Field) []zap.Field {
|
||||||
l.z.Error(msg, fields...)
|
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||||
|
fields = append(fields, zap.String("trace_id", traceID))
|
||||||
|
}
|
||||||
|
if ioTag, ioTagDefined := qos.IOTagFromContext(ctx); ioTagDefined {
|
||||||
|
fields = append(fields, zap.String("io_tag", ioTag))
|
||||||
|
}
|
||||||
|
return fields
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue