WIP: Add treesync tag to QoS #1695
13 changed files with 37 additions and 28 deletions
|
@ -56,7 +56,7 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
|
||||||
prmDial := client.PrmDial{
|
prmDial := client.PrmDial{
|
||||||
Endpoint: addr.URIAddr(),
|
Endpoint: addr.URIAddr(),
|
||||||
GRPCDialOptions: []grpc.DialOption{
|
GRPCDialOptions: []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInterceptor()),
|
||||||
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
},
|
},
|
||||||
|
|
|
@ -33,7 +33,7 @@ func _client() (tree.TreeServiceClient, error) {
|
||||||
|
|
||||||
opts := []grpc.DialOption{
|
opts := []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
tracing.NewUnaryClientInteceptor(),
|
tracing.NewUnaryClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -7,7 +7,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -6,8 +6,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d h1:uJ/wvuMdepbkaV8XMS5uN9B0FQWMep0CttSuDZiDhq0=
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d h1:uJ/wvuMdepbkaV8XMS5uN9B0FQWMep0CttSuDZiDhq0=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529 h1:CBreXSxGoYJAdZ1QdJPsDs1UCXGF5psinII0lxtohsc=
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529 h1:CBreXSxGoYJAdZ1QdJPsDs1UCXGF5psinII0lxtohsc=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
|
||||||
|
|
|
@ -26,7 +26,7 @@ func NewAdjustOutgoingIOTagUnaryClientInterceptor() grpc.UnaryClientInterceptor
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tag = IOTagClient
|
tag = IOTagClient
|
||||||
}
|
}
|
||||||
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
if tag.IsLocal() {
|
||||||
tag = IOTagInternal
|
tag = IOTagInternal
|
||||||
}
|
}
|
||||||
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||||
|
@ -44,7 +44,7 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tag = IOTagClient
|
tag = IOTagClient
|
||||||
}
|
}
|
||||||
if tag == IOTagBackground || tag == IOTagPolicer || tag == IOTagWritecache {
|
if tag.IsLocal() {
|
||||||
tag = IOTagInternal
|
tag = IOTagInternal
|
||||||
}
|
}
|
||||||
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
ctx = tagging.ContextWithIOTag(ctx, tag.String())
|
||||||
|
|
|
@ -74,7 +74,7 @@ func createScheduler(config limits.OpConfig) (scheduler, error) {
|
||||||
|
|
||||||
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
|
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
|
||||||
result := make(map[string]scheduling.TagInfo)
|
result := make(map[string]scheduling.TagInfo)
|
||||||
for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
|
for _, tag := range []IOTag{IOTagBackground, IOTagClient, IOTagInternal, IOTagPolicer, IOTagTreeSync, IOTagWritecache} {
|
||||||
result[tag.String()] = scheduling.TagInfo{
|
result[tag.String()] = scheduling.TagInfo{
|
||||||
Share: defaultShare,
|
Share: defaultShare,
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,13 @@ package qos
|
||||||
const unknownStatsTag = "unknown"
|
const unknownStatsTag = "unknown"
|
||||||
|
|
||||||
var statTags = map[string]struct{}{
|
var statTags = map[string]struct{}{
|
||||||
IOTagClient.String(): {},
|
|
||||||
IOTagBackground.String(): {},
|
IOTagBackground.String(): {},
|
||||||
|
IOTagClient.String(): {},
|
||||||
|
IOTagCritical.String(): {},
|
||||||
IOTagInternal.String(): {},
|
IOTagInternal.String(): {},
|
||||||
IOTagPolicer.String(): {},
|
IOTagPolicer.String(): {},
|
||||||
|
IOTagTreeSync.String(): {},
|
||||||
IOTagWritecache.String(): {},
|
IOTagWritecache.String(): {},
|
||||||
IOTagCritical.String(): {},
|
|
||||||
unknownStatsTag: {},
|
unknownStatsTag: {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,30 +10,33 @@ import (
|
||||||
type IOTag string
|
type IOTag string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
IOTagClient IOTag = "client"
|
|
||||||
IOTagInternal IOTag = "internal"
|
|
||||||
IOTagBackground IOTag = "background"
|
IOTagBackground IOTag = "background"
|
||||||
IOTagWritecache IOTag = "writecache"
|
IOTagClient IOTag = "client"
|
||||||
IOTagPolicer IOTag = "policer"
|
|
||||||
IOTagCritical IOTag = "critical"
|
IOTagCritical IOTag = "critical"
|
||||||
|
IOTagInternal IOTag = "internal"
|
||||||
|
IOTagPolicer IOTag = "policer"
|
||||||
|
IOTagTreeSync IOTag = "treesync"
|
||||||
|
IOTagWritecache IOTag = "writecache"
|
||||||
|
|
||||||
ioTagUnknown IOTag = ""
|
ioTagUnknown IOTag = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
func FromRawString(s string) (IOTag, error) {
|
func FromRawString(s string) (IOTag, error) {
|
||||||
switch s {
|
switch s {
|
||||||
case string(IOTagCritical):
|
|
||||||
return IOTagCritical, nil
|
|
||||||
case string(IOTagClient):
|
|
||||||
return IOTagClient, nil
|
|
||||||
case string(IOTagInternal):
|
|
||||||
return IOTagInternal, nil
|
|
||||||
case string(IOTagBackground):
|
case string(IOTagBackground):
|
||||||
return IOTagBackground, nil
|
return IOTagBackground, nil
|
||||||
case string(IOTagWritecache):
|
case string(IOTagClient):
|
||||||
return IOTagWritecache, nil
|
return IOTagClient, nil
|
||||||
|
case string(IOTagCritical):
|
||||||
|
return IOTagCritical, nil
|
||||||
|
case string(IOTagInternal):
|
||||||
|
return IOTagInternal, nil
|
||||||
case string(IOTagPolicer):
|
case string(IOTagPolicer):
|
||||||
return IOTagPolicer, nil
|
return IOTagPolicer, nil
|
||||||
|
case string(IOTagTreeSync):
|
||||||
|
return IOTagTreeSync, nil
|
||||||
|
case string(IOTagWritecache):
|
||||||
|
return IOTagWritecache, nil
|
||||||
default:
|
default:
|
||||||
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
|
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
|
||||||
}
|
}
|
||||||
|
@ -50,3 +53,7 @@ func IOTagFromContext(ctx context.Context) string {
|
||||||
}
|
}
|
||||||
return tag
|
return tag
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t IOTag) IsLocal() bool {
|
||||||
|
return t == IOTagBackground || t == IOTagPolicer || t == IOTagWritecache || t == IOTagTreeSync
|
||||||
|
}
|
||||||
|
|
|
@ -42,11 +42,12 @@ func validateOpConfig(c limits.OpConfig) error {
|
||||||
|
|
||||||
func validateTags(configTags []limits.IOTagConfig) error {
|
func validateTags(configTags []limits.IOTagConfig) error {
|
||||||
tags := map[IOTag]tagConfig{
|
tags := map[IOTag]tagConfig{
|
||||||
|
IOTagBackground: {},
|
||||||
IOTagClient: {},
|
IOTagClient: {},
|
||||||
IOTagInternal: {},
|
IOTagInternal: {},
|
||||||
IOTagBackground: {},
|
|
||||||
IOTagWritecache: {},
|
|
||||||
IOTagPolicer: {},
|
IOTagPolicer: {},
|
||||||
|
IOTagTreeSync: {},
|
||||||
|
IOTagWritecache: {},
|
||||||
}
|
}
|
||||||
for _, t := range configTags {
|
for _, t := range configTags {
|
||||||
tag, err := FromRawString(t.Tag)
|
tag, err := FromRawString(t.Tag)
|
||||||
|
|
2
pkg/network/cache/multi.go
vendored
2
pkg/network/cache/multi.go
vendored
|
@ -66,7 +66,7 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||||
metrics.NewUnaryClientInterceptor(),
|
metrics.NewUnaryClientInterceptor(),
|
||||||
tracing.NewUnaryClientInteceptor(),
|
tracing.NewUnaryClientInterceptor(),
|
||||||
tagging.NewUnaryClientInteceptor(),
|
tagging.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
|
|
@ -99,7 +99,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||||
metrics.NewUnaryClientInterceptor(),
|
metrics.NewUnaryClientInterceptor(),
|
||||||
tracing.NewUnaryClientInteceptor(),
|
tracing.NewUnaryClientInterceptor(),
|
||||||
tagging.NewUnaryClientInteceptor(),
|
tagging.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
|
|
@ -85,7 +85,7 @@ func New(opts ...Option) *Service {
|
||||||
|
|
||||||
// Start starts the service.
|
// Start starts the service.
|
||||||
func (s *Service) Start(ctx context.Context) {
|
func (s *Service) Start(ctx context.Context) {
|
||||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagTreeSync.String())
|
||||||
go s.replicateLoop(ctx)
|
go s.replicateLoop(ctx)
|
||||||
go s.syncLoop(ctx)
|
go s.syncLoop(ctx)
|
||||||
|
|
||||||
|
|
|
@ -344,7 +344,7 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
|
||||||
metrics.NewUnaryClientInterceptor(),
|
metrics.NewUnaryClientInterceptor(),
|
||||||
tracing_grpc.NewUnaryClientInteceptor(),
|
tracing_grpc.NewUnaryClientInterceptor(),
|
||||||
tagging.NewUnaryClientInteceptor(),
|
tagging.NewUnaryClientInteceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
|
Loading…
Add table
Reference in a new issue