Set IO tags #1608
|
@ -9,7 +9,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -34,11 +33,9 @@ func _client() (tree.TreeServiceClient, error) {
|
|||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
|
@ -493,6 +493,7 @@ type cfg struct {
|
|||
cfgNetmap cfgNetmap
|
||||
cfgControlService cfgControlService
|
||||
cfgObject cfgObject
|
||||
cfgQoSService cfgQoSService
|
||||
}
|
||||
|
||||
// ReadCurrentNetMap reads network map which has been cached at the
|
||||
|
|
46
cmd/frostfs-node/config/qos/config.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
const (
|
||||
subsection = "qos"
|
||||
criticalSubSection = "critical"
|
||||
internalSubSection = "internal"
|
||||
)
|
||||
|
||||
// CriticalAuthorizedKeys parses and returns an array of "critical.authorized_keys" config
|
||||
// parameter from "qos" section.
|
||||
//
|
||||
// Returns an empty list if not set.
|
||||
func CriticalAuthorizedKeys(c *config.Config) keys.PublicKeys {
|
||||
return authorizedKeys(c, criticalSubSection)
|
||||
}
|
||||
|
||||
// InternalAuthorizedKeys parses and returns an array of "internal.authorized_keys" config
|
||||
// parameter from "qos" section.
|
||||
//
|
||||
// Returns an empty list if not set.
|
||||
func InternalAuthorizedKeys(c *config.Config) keys.PublicKeys {
|
||||
return authorizedKeys(c, internalSubSection)
|
||||
}
|
||||
|
||||
func authorizedKeys(c *config.Config, sub string) keys.PublicKeys {
|
||||
strKeys := config.StringSliceSafe(c.Sub(subsection).Sub(sub), "authorized_keys")
|
||||
pubs := make(keys.PublicKeys, 0, len(strKeys))
|
||||
|
||||
for i := range strKeys {
|
||||
pub, err := keys.NewPublicKeyFromString(strKeys[i])
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("invalid authorized key %s for qos.%s: %w", strKeys[i], sub, err))
|
||||
}
|
||||
|
||||
pubs = append(pubs, pub)
|
||||
}
|
||||
|
||||
return pubs
|
||||
}
|
40
cmd/frostfs-node/config/qos/config_test.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestQoSSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
empty := configtest.EmptyConfig()
|
||||
|
||||
require.Empty(t, CriticalAuthorizedKeys(empty))
|
||||
require.Empty(t, InternalAuthorizedKeys(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
criticalPubs := make(keys.PublicKeys, 2)
|
||||
criticalPubs[0], _ = keys.NewPublicKeyFromString("035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11")
|
||||
criticalPubs[1], _ = keys.NewPublicKeyFromString("028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6")
|
||||
|
||||
internalPubs := make(keys.PublicKeys, 2)
|
||||
internalPubs[0], _ = keys.NewPublicKeyFromString("02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2")
|
||||
internalPubs[1], _ = keys.NewPublicKeyFromString("031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a")
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
require.Equal(t, criticalPubs, CriticalAuthorizedKeys(c))
|
||||
require.Equal(t, internalPubs, InternalAuthorizedKeys(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
|
@ -7,9 +7,12 @@ import (
|
|||
|
||||
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
@ -50,7 +53,14 @@ func initControlService(ctx context.Context, c *cfg) {
|
|||
return
|
||||
}
|
||||
|
||||
c.cfgControlService.server = grpc.NewServer()
|
||||
c.cfgControlService.server = grpc.NewServer(
|
||||
grpc.ChainUnaryInterceptor(
|
||||
qos.NewSetCriticalIOTagUnaryServerInterceptor(),
|
||||
dstepanov-yadro
commented
First interceptor is the outer most, so it will be incoming RPC -> set IO tag critical -> metrics -> tracing First interceptor is the outer most, so it will be incoming RPC -> set IO tag critical -> metrics -> tracing
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
),
|
||||
// control service has no stream methods, so no stream interceptors added
|
||||
fyrchik
commented
When it will have stream methods, we won't remember this line. When it will have stream methods, we won't remember this line.
Is there any problem with adding them?
dstepanov-yadro
commented
Just I don't want to add something I can't test. Just I don't want to add something I can't test.
|
||||
)
|
||||
|
||||
c.onShutdown(func() {
|
||||
stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log)
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
@ -130,10 +131,12 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
|
|||
serverOpts := []grpc.ServerOption{
|
||||
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.ChainUnaryInterceptor(
|
||||
qos.NewUnaryServerInterceptor(),
|
||||
dstepanov-yadro
commented
This is the outer most interceptor, so so it will be incoming RPC -> parse and set IO tag -> metrics -> tracing This is the outer most interceptor, so so it will be incoming RPC -> parse and set IO tag -> metrics -> tracing
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
qos.NewStreamServerInterceptor(),
|
||||
metrics.NewStreamServerInterceptor(),
|
||||
tracing.NewStreamServerInterceptor(),
|
||||
),
|
||||
|
|
|
@ -101,6 +101,7 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
|
||||
initAndLog(ctx, c, "gRPC", func(c *cfg) { initGRPC(ctx, c) })
|
||||
initAndLog(ctx, c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
||||
initAndLog(ctx, c, "qos", func(c *cfg) { initQoSService(c) })
|
||||
fyrchik
commented
Could you explain, why do we have both interceptor and service? Could you explain, why do we have both interceptor and service?
dstepanov-yadro
commented
See comment about tree service: #1608 (comment) See comment about tree service: https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1608#issuecomment-66636
|
||||
|
||||
initAccessPolicyEngine(ctx, c)
|
||||
initAndLog(ctx, c, "access policy engine", func(c *cfg) {
|
||||
|
|
|
@ -168,7 +168,7 @@ func initObjectService(c *cfg) {
|
|||
sPatch := createPatchSvc(sGet, sPut)
|
||||
|
||||
// build service pipeline
|
||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||
// grpc | audit | qos | <metrics> | signature | response | acl | ape | split
|
||||
|
||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||
|
||||
|
@ -191,7 +191,8 @@ func initObjectService(c *cfg) {
|
|||
|
||||
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
||||
auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit)
|
||||
qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService)
|
||||
auditSvc := objectService.NewAuditService(qosService, c.log, c.audit)
|
||||
server := objectTransportGRPC.New(auditSvc)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
|
|
95
cmd/frostfs-node/qos.go
Normal file
|
@ -0,0 +1,95 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
qosconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
qosTagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cfgQoSService struct {
|
||||
netmapSource netmap.Source
|
||||
logger *logger.Logger
|
||||
allowedCriticalPubs [][]byte
|
||||
allowedInternalPubs [][]byte
|
||||
}
|
||||
|
||||
func initQoSService(c *cfg) {
|
||||
criticalPubs := qosconfig.CriticalAuthorizedKeys(c.appCfg)
|
||||
internalPubs := qosconfig.InternalAuthorizedKeys(c.appCfg)
|
||||
rawCriticalPubs := make([][]byte, 0, len(criticalPubs))
|
||||
rawInternalPubs := make([][]byte, 0, len(internalPubs))
|
||||
for i := range criticalPubs {
|
||||
rawCriticalPubs = append(rawCriticalPubs, criticalPubs[i].Bytes())
|
||||
}
|
||||
for i := range internalPubs {
|
||||
rawInternalPubs = append(rawInternalPubs, internalPubs[i].Bytes())
|
||||
}
|
||||
|
||||
c.cfgQoSService = cfgQoSService{
|
||||
netmapSource: c.netMapSource,
|
||||
logger: c.log,
|
||||
allowedCriticalPubs: rawCriticalPubs,
|
||||
allowedInternalPubs: rawInternalPubs,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
|
||||
achuprov marked this conversation as resolved
Outdated
achuprov
commented
Typo Typo `AdjustIncommingTag` -> `AdjustIncomingTag`
dstepanov-yadro
commented
fixed fixed
a-savchuk
commented
This typo still appears in many places, including commit messages. Please make sure to fix it everywhere This typo still appears in many places, including commit messages. Please make sure to fix it everywhere
dstepanov-yadro
commented
fixed fixed
|
||||
rawTag, defined := qosTagging.IOTagFromContext(ctx)
|
||||
if !defined {
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
ioTag, err := qos.FromRawString(rawTag)
|
||||
if err != nil {
|
||||
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
|
||||
switch ioTag {
|
||||
case qos.IOTagClient:
|
||||
return ctx
|
||||
case qos.IOTagCritical:
|
||||
for _, pk := range s.allowedCriticalPubs {
|
||||
if bytes.Equal(pk, requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
I won't be so sure about I won't be so sure about `Error` -- it could be done on every request.
How about `debug`?
Not insisting.
dstepanov-yadro
commented
Ok, fixed Ok, fixed
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
for _, node := range nm.Nodes() {
|
||||
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Does Does `qos.IOTagClient.String()` allocate?
It seems constant, so better be it.
dstepanov-yadro
commented
```
package qos
import (
"testing"
"github.com/stretchr/testify/require"
)
var ioTagBackgroundString = "background"
func (t IOTag) String2() string {
switch t {
case IOTagBackground:
return ioTagBackgroundString
default:
return ""
}
}
func BenchmarkTag(b *testing.B) {
tag := IOTagBackground
b.Run("cast", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
require.Equal(b, ioTagBackgroundString, string(tag))
})
b.Run("String()", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
require.Equal(b, ioTagBackgroundString, tag.String())
})
b.Run("String2()", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
require.Equal(b, ioTagBackgroundString, tag.String2())
})
}
BenchmarkTag/cast-8 1000000000 0.0000052 ns/op 0 B/op 0 allocs/op
BenchmarkTag/String()-8 1000000000 0.0000024 ns/op 0 B/op 0 allocs/op
BenchmarkTag/String2()-8 1000000000 0.0000028 ns/op 0 B/op 0 allocs/op
PASS
```
|
||||
case qos.IOTagInternal:
|
||||
for _, pk := range s.allowedInternalPubs {
|
||||
if bytes.Equal(pk, requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
for _, node := range nm.Nodes() {
|
||||
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
default:
|
||||
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
}
|
|
@ -72,7 +72,7 @@ func initTreeService(c *cfg) {
|
|||
)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
tree.RegisterTreeServiceServer(s, c.treeService)
|
||||
tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService))
|
||||
})
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||
|
|
|
@ -225,3 +225,6 @@ FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
|||
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||
FROSTFS_MULTINET_RESTRICT=false
|
||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||
|
||||
FROSTFS_QOS_CRITICAL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Could you provide a different pair of keys for different section? Could you provide a different pair of keys for different section?
Testing same keys easily misses copy-paste errors.
dstepanov-yadro
commented
done done
|
||||
FROSTFS_QOS_INTERNAL_AUTHORIZED_KEYS="02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
|
||||
|
|
|
@ -305,5 +305,19 @@
|
|||
"balancer": "roundrobin",
|
||||
"restrict": false,
|
||||
"fallback_delay": "350ms"
|
||||
},
|
||||
"qos": {
|
||||
"critical": {
|
||||
"authorized_keys": [
|
||||
"035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11",
|
||||
"028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||
]
|
||||
},
|
||||
"internal": {
|
||||
"authorized_keys": [
|
||||
"02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2",
|
||||
"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -270,3 +270,13 @@ multinet:
|
|||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
|
||||
qos:
|
||||
critical:
|
||||
authorized_keys: # list of hex-encoded public keys that have rights to use `critical` IO tag
|
||||
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||
internal:
|
||||
authorized_keys: # list of hex-encoded public keys that have rights to use `internal` IO tag
|
||||
- 02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2
|
||||
- 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a
|
||||
|
|
|
@ -27,6 +27,7 @@ There are some custom types used for brevity:
|
|||
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||
| `audit` | [Audit configuration](#audit-section) |
|
||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||
| `qos` | [QoS configuration](#qos-section) |
|
||||
|
||||
# `control` section
|
||||
```yaml
|
||||
|
@ -471,3 +472,20 @@ multinet:
|
|||
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
|
||||
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
|
||||
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
|
||||
|
||||
# `qos` section
|
||||
```yaml
|
||||
qos:
|
||||
critical:
|
||||
authorized_keys:
|
||||
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||
internal:
|
||||
authorized_keys:
|
||||
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
|
||||
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
|
||||
```
|
||||
| Parameter | Type | Default value | Description |
|
||||
| -------------------------- | -------------- | ------------- | --------------------------------------------------------------------------- |
|
||||
| `critical.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `critical` are allowed. |
|
||||
| `internal.authorized_keys` | `[]public key` | empty | List of public keys for which requests with the tag `internal` are allowed. |
|
||||
|
|
1
go.mod
|
@ -8,6 +8,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
|
|
2
go.sum
|
@ -8,6 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
|||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
|
|
|
@ -510,4 +510,7 @@ const (
|
|||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||
WritecacheCantGetObject = "can't get an object from fstree"
|
||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
||||
acid-ant
commented
FailerToParseIncommingIOTag -> FailedToParseIncommingIOTag FailerToParseIncommingIOTag -> FailedToParseIncommingIOTag
dstepanov-yadro
commented
fixed fixed
|
||||
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||
)
|
||||
|
|
51
internal/qos/grpc.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func NewSetCriticalIOTagUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, IOTagCritical.String())
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAdjustOutgoingIOTagUnaryClientInterceptor() grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
rawTag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
tag, err := FromRawString(rawTag)
|
||||
if err != nil {
|
||||
tag = IOTagClient
|
||||
}
|
||||
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
||||
tag = IOTagInternal
|
||||
}
|
||||
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
rawTag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
tag, err := FromRawString(rawTag)
|
||||
if err != nil {
|
||||
tag = IOTagClient
|
||||
}
|
||||
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
||||
tag = IOTagInternal
|
||||
}
|
||||
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
}
|
39
internal/qos/tags.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package qos
|
||||
|
||||
import "fmt"
|
||||
|
||||
type IOTag string
|
||||
|
||||
const (
|
||||
IOTagClient IOTag = "client"
|
||||
IOTagInternal IOTag = "internal"
|
||||
IOTagBackground IOTag = "background"
|
||||
IOTagWritecache IOTag = "writecache"
|
||||
IOTagPolicer IOTag = "policer"
|
||||
IOTagCritical IOTag = "critical"
|
||||
|
||||
ioTagUnknown IOTag = ""
|
||||
)
|
||||
|
||||
func FromRawString(s string) (IOTag, error) {
|
||||
switch s {
|
||||
case string(IOTagCritical):
|
||||
return IOTagCritical, nil
|
||||
case string(IOTagClient):
|
||||
return IOTagClient, nil
|
||||
case string(IOTagInternal):
|
||||
return IOTagInternal, nil
|
||||
case string(IOTagBackground):
|
||||
return IOTagBackground, nil
|
||||
case string(IOTagWritecache):
|
||||
return IOTagWritecache, nil
|
||||
case string(IOTagPolicer):
|
||||
return IOTagPolicer, nil
|
||||
default:
|
||||
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
func (t IOTag) String() string {
|
||||
return string(t)
|
||||
}
|
|
@ -45,7 +45,7 @@ func TestUpgradeV2ToV3(t *testing.T) {
|
|||
|
||||
type testContainerInfoProvider struct{}
|
||||
|
||||
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
|
||||
func (p *testContainerInfoProvider) Info(ctx context.Context, id cid.ID) (container.Info, error) {
|
||||
return container.Info{}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,13 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
|
@ -149,7 +151,7 @@ func (gc *gc) init(ctx context.Context) {
|
|||
if sz > 0 {
|
||||
gc.workerPool = gc.workerPoolInit(sz)
|
||||
}
|
||||
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
gc.wg.Add(2)
|
||||
go gc.tickRemover(ctx)
|
||||
go gc.listenEvents(ctx)
|
||||
|
|
|
@ -6,10 +6,12 @@ import (
|
|||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -103,6 +105,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
|
|||
default:
|
||||
}
|
||||
log.Info(ctx, logs.BlobstoreRebuildStarted)
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
|
||||
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||
} else {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -35,6 +37,7 @@ func (c *cache) runFlushLoop(ctx context.Context) {
|
|||
if c.disableBackgroundFlush {
|
||||
return
|
||||
}
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagWritecache.String())
|
||||
fl := newFlushLimiter(c.flushSizeLimit)
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
|
|
6
pkg/network/cache/multi.go
vendored
|
@ -7,10 +7,12 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -62,12 +64,16 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
|
||||
grpcOpts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
tagging.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
tagging.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
132
pkg/services/object/qos.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
||||
)
|
||||
|
||||
var _ ServiceServer = (*qosObjectService)(nil)
|
||||
|
||||
type AdjustIOTag interface {
|
||||
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||
}
|
||||
|
||||
type qosObjectService struct {
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
`qoSObjectService` -> `qosObjectService`? :)
dstepanov-yadro
commented
fixed fixed
|
||||
next ServiceServer
|
||||
adj AdjustIOTag
|
||||
}
|
||||
|
||||
func NewQoSObjectService(next ServiceServer, adjIOTag AdjustIOTag) ServiceServer {
|
||||
return &qosObjectService{
|
||||
next: next,
|
||||
adj: adjIOTag,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Delete(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Get(req *object.GetRequest, s GetObjectStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Get(req, &qosReadStream[*object.GetResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *qosObjectService) GetRange(req *object.GetRangeRequest, s GetObjectRangeStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.GetRange(req, &qosReadStream[*object.GetRangeResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *qosObjectService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.GetRangeHash(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Head(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Patch(ctx context.Context) (PatchObjectStream, error) {
|
||||
s, err := q.next.Patch(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qosWriteStream[*object.PatchRequest, *object.PatchResponse]{
|
||||
s: s,
|
||||
adj: q.adj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||
s, err := q.next.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qosWriteStream[*object.PutRequest, *object.PutResponse]{
|
||||
s: s,
|
||||
adj: q.adj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *qosObjectService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.PutSingle(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Search(req *object.SearchRequest, s SearchStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Search(req, &qosReadStream[*object.SearchResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
type qosSend[T any] interface {
|
||||
Send(T) error
|
||||
}
|
||||
|
||||
type qosReadStream[T any] struct {
|
||||
sender qosSend[T]
|
||||
ctxF func() context.Context
|
||||
}
|
||||
|
||||
func (g *qosReadStream[T]) Context() context.Context {
|
||||
return g.ctxF()
|
||||
}
|
||||
|
||||
func (g *qosReadStream[T]) Send(resp T) error {
|
||||
return g.sender.Send(resp)
|
||||
}
|
||||
|
||||
type qosVerificationHeader interface {
|
||||
GetVerificationHeader() *session.RequestVerificationHeader
|
||||
}
|
||||
|
||||
type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
|
||||
Send(context.Context, TReq) error
|
||||
CloseAndRecv(context.Context) (TResp, error)
|
||||
}
|
||||
|
||||
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
|
||||
s qosSendRecv[TReq, TResp]
|
||||
adj AdjustIOTag
|
||||
}
|
||||
|
||||
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
|
||||
return q.s.CloseAndRecv(ctx)
|
||||
}
|
||||
|
||||
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.s.Send(ctx, req)
|
||||
}
|
|
@ -7,7 +7,9 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -18,6 +20,7 @@ func (p *Policer) Run(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagPolicer.String())
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -9,9 +9,11 @@ import (
|
|||
"time"
|
||||
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
|
@ -95,12 +97,16 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
tagging.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
tagging.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
101
pkg/services/tree/qos.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var _ TreeServiceServer = (*ioTagAdjust)(nil)
|
||||
|
||||
type AdjustIOTag interface {
|
||||
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||
}
|
||||
|
||||
type ioTagAdjust struct {
|
||||
s TreeServiceServer
|
||||
a AdjustIOTag
|
||||
}
|
||||
|
||||
func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer {
|
||||
return &ioTagAdjust{
|
||||
s: s,
|
||||
a: a,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
I don't understand all these different places where we work with tags. I don't understand all these different places where we work with tags.
Consider tree service: we have 2 interceptors and also `NewIOTagAdjustServer`.
Why are all those different?
dstepanov-yadro
commented
1. `tagging.NewUnaryServerInterceptor()` and `tagging.NewStreamServerInterceptor()` extract an IO tag from the metadata of incommin grpc request and store it in the context.Context. They don't know anything about frostfs-node allowed tags.
2. `tagging.NewUnaryClientInteceptor()` and `tagging.NewStreamClientInterceptor()` extract an IO tag from context.Context and store it in the metadata of outcomming grpc request. They also don't know anything about frostfs-node allowed tags.
3. `AdjustIncommingTag` checks if incomming request's signer has rights to use `internal` or `critical` IO tag and also that tag has allowed value.
4. `qos.NewAdjustOutgoingIOTagUnaryClientInterceptor()` and `qos.NewAdjustOutgoingIOTagStreamClientInterceptor()` replace `background`, `policer` and `writecache` tag values with `internal` IO tag value for outcomming requests.
fyrchik
commented
Regarding (4), I think there is a slight confusion: Do you think introducing interceptor is a better solution, than explicit Regarding (4), I think there is a slight confusion:
Any thread, initiating network request could set its meta explicitly (policer, replicator).
And writecache doesn't create network requests, for example.
Do you think introducing interceptor is a better solution, than explicit `internal` on all callsites?
fyrchik
commented
Thanks, it became clearer know. Thanks, it became clearer know.
`Adjust` doesn't scream VALIDATION, thats where my confusion came from.
dstepanov-yadro
commented
I tried, but it looked very scary (especially in policer/replicator). > Do you think introducing interceptor is a better solution, than explicit internal on all callsites?
I tried, but it looked very scary (especially in policer/replicator).
|
||||
return i.s.Add(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.AddByPath(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Apply(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.GetNodeByPath(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||
ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
|
||||
return i.s.GetOpLog(req, &qosServerWrapper[*GetOpLogResponse]{
|
||||
sender: srv,
|
||||
ServerStream: srv,
|
||||
ctxF: func() context.Context { return ctx },
|
||||
})
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||
ctx := i.a.AdjustIncomingTag(srv.Context(), req.GetSignature().GetKey())
|
||||
return i.s.GetSubTree(req, &qosServerWrapper[*GetSubTreeResponse]{
|
||||
sender: srv,
|
||||
ServerStream: srv,
|
||||
ctxF: func() context.Context { return ctx },
|
||||
})
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Healthcheck(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Move(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Remove(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||
ctx = i.a.AdjustIncomingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.TreeList(ctx, req)
|
||||
}
|
||||
|
||||
type qosSend[T any] interface {
|
||||
Send(T) error
|
||||
}
|
||||
|
||||
type qosServerWrapper[T any] struct {
|
||||
grpc.ServerStream
|
||||
sender qosSend[T]
|
||||
ctxF func() context.Context
|
||||
}
|
||||
|
||||
func (w *qosServerWrapper[T]) Send(resp T) error {
|
||||
return w.sender.Send(resp)
|
||||
}
|
||||
|
||||
func (w *qosServerWrapper[T]) Context() context.Context {
|
||||
return w.ctxF()
|
||||
}
|
|
@ -9,9 +9,11 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -83,6 +85,7 @@ func New(opts ...Option) *Service {
|
|||
|
||||
// Start starts the service.
|
||||
func (s *Service) Start(ctx context.Context) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
go s.replicateLoop(ctx)
|
||||
go s.syncLoop(ctx)
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
|
@ -20,6 +21,7 @@ import (
|
|||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -340,12 +342,16 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||
return grpc.NewClient(a.URIAddr(),
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing_grpc.NewUnaryClientInteceptor(),
|
||||
tagging.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing_grpc.NewStreamClientInterceptor(),
|
||||
tagging.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
|
|
|
@ -4,37 +4,32 @@ import (
|
|||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (l *Logger) Debug(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Debug(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Debug(msg, fields...)
|
||||
l.z.Debug(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func (l *Logger) Info(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Info(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Info(msg, fields...)
|
||||
l.z.Info(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func (l *Logger) Warn(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Warn(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Warn(msg, fields...)
|
||||
l.z.Warn(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func (l *Logger) Error(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
l.z.Error(msg, append(fields, zap.String("trace_id", traceID))...)
|
||||
return
|
||||
}
|
||||
l.z.Error(msg, fields...)
|
||||
l.z.Error(msg, appendContext(ctx, fields...)...)
|
||||
}
|
||||
|
||||
func appendContext(ctx context.Context, fields ...zap.Field) []zap.Field {
|
||||
fyrchik
commented
@acid-ant please, look @acid-ant please, look
The PR with logging tags will be adapted, depending on the merge order.
acid-ant
commented
Yeah, saw this, merge will be easy. Yeah, saw this, merge will be easy.
|
||||
if traceID := tracing.GetTraceID(ctx); traceID != "" {
|
||||
fields = append(fields, zap.String("trace_id", traceID))
|
||||
}
|
||||
if ioTag, ioTagDefined := qos.IOTagFromContext(ctx); ioTagDefined {
|
||||
fields = append(fields, zap.String("io_tag", ioTag))
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
|
No need for metric in frostfs-cli