[#1608] qos: Add qos service to adjust incoming IO tags
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
9729f31e5c
commit
f6b3f79e89
4 changed files with 83 additions and 4 deletions
|
@ -101,6 +101,7 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
|
||||
initAndLog(ctx, c, "gRPC", func(c *cfg) { initGRPC(ctx, c) })
|
||||
initAndLog(ctx, c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
||||
initAndLog(ctx, c, "qos", func(c *cfg) { initQoSService(c) })
|
||||
|
||||
initAccessPolicyEngine(ctx, c)
|
||||
initAndLog(ctx, c, "access policy engine", func(c *cfg) {
|
||||
|
|
|
@ -1,8 +1,24 @@
|
|||
package main
|
||||
|
||||
import qosconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/qos"
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
type cfgQoSService struct{}
|
||||
qosconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
qosTagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cfgQoSService struct {
|
||||
netmapSource netmap.Source
|
||||
logger *logger.Logger
|
||||
allowedCriticalPubs [][]byte
|
||||
allowedInternalPubs [][]byte
|
||||
}
|
||||
|
||||
func initQoSService(c *cfg) {
|
||||
criticalPubs := qosconfig.CriticalAuthorizedKeys(c.appCfg)
|
||||
|
@ -16,5 +32,64 @@ func initQoSService(c *cfg) {
|
|||
rawInternalPubs = append(rawInternalPubs, internalPubs[i].Bytes())
|
||||
}
|
||||
|
||||
c.cfgQoSService = cfgQoSService{}
|
||||
c.cfgQoSService = cfgQoSService{
|
||||
netmapSource: c.netMapSource,
|
||||
logger: c.log,
|
||||
allowedCriticalPubs: rawCriticalPubs,
|
||||
allowedInternalPubs: rawInternalPubs,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
|
||||
rawTag, defined := qosTagging.IOTagFromContext(ctx)
|
||||
if !defined {
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
ioTag, err := qos.FromRawString(rawTag)
|
||||
if err != nil {
|
||||
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
|
||||
switch ioTag {
|
||||
case qos.IOTagClient:
|
||||
return ctx
|
||||
case qos.IOTagCritical:
|
||||
for _, pk := range s.allowedCriticalPubs {
|
||||
if bytes.Equal(pk, requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
for _, node := range nm.Nodes() {
|
||||
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
case qos.IOTagInternal:
|
||||
for _, pk := range s.allowedInternalPubs {
|
||||
if bytes.Equal(pk, requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
for _, node := range nm.Nodes() {
|
||||
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
default:
|
||||
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -510,4 +510,7 @@ const (
|
|||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||
WritecacheCantGetObject = "can't get an object from fstree"
|
||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
||||
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||
)
|
||||
|
|
|
@ -45,7 +45,7 @@ func TestUpgradeV2ToV3(t *testing.T) {
|
|||
|
||||
type testContainerInfoProvider struct{}
|
||||
|
||||
func (p *testContainerInfoProvider) Info(id cid.ID) (container.Info, error) {
|
||||
func (p *testContainerInfoProvider) Info(ctx context.Context, id cid.ID) (container.Info, error) {
|
||||
return container.Info{}, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue