Compare commits
3 commits
master
...
feat/add-g
Author | SHA1 | Date | |
---|---|---|---|
d42f67e053 | |||
99340b2717 | |||
c16788f9c6 |
16 changed files with 340 additions and 357 deletions
|
@ -6,7 +6,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
|
"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
|
||||||
nns2 "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
|
||||||
|
@ -14,7 +13,9 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
|
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
|
||||||
|
nns2 "github.com/nspcc-dev/neo-go/pkg/rpcclient/nns"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -186,9 +187,19 @@ func NNSResolveKey(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
func NNSIsAvailable(c Client, nnsHash util.Uint160, name string) (bool, error) {
|
func NNSIsAvailable(c Client, nnsHash util.Uint160, name string) (bool, error) {
|
||||||
inv := invoker.New(c, nil)
|
switch c.(type) {
|
||||||
reader := nns2.NewReader(inv, nnsHash)
|
case *rpcclient.Client:
|
||||||
return reader.IsAvailable(name)
|
inv := invoker.New(c, nil)
|
||||||
|
reader := nns2.NewReader(inv, nnsHash)
|
||||||
|
return reader.IsAvailable(name)
|
||||||
|
default:
|
||||||
|
b, err := unwrap.Bool(InvokeFunction(c, nnsHash, "isAvailable", []any{name}, nil))
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("`isAvailable`: invalid response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CheckNotaryEnabled(c Client) error {
|
func CheckNotaryEnabled(c Client) error {
|
||||||
|
|
|
@ -40,8 +40,6 @@ type ClientContext struct {
|
||||||
CommitteeAct *actor.Actor // committee actor with the Global witness scope
|
CommitteeAct *actor.Actor // committee actor with the Global witness scope
|
||||||
ReadOnlyInvoker *invoker.Invoker // R/O contract invoker, does not contain any signer
|
ReadOnlyInvoker *invoker.Invoker // R/O contract invoker, does not contain any signer
|
||||||
SentTxs []HashVUBPair
|
SentTxs []HashVUBPair
|
||||||
|
|
||||||
AwaitDisabled bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemoteClient(v *viper.Viper) (Client, error) {
|
func NewRemoteClient(v *viper.Viper) (Client, error) {
|
||||||
|
@ -122,7 +120,7 @@ func (c *ClientContext) SendTx(tx *transaction.Transaction, cmd *cobra.Command,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientContext) AwaitTx(cmd *cobra.Command) error {
|
func (c *ClientContext) AwaitTx(cmd *cobra.Command) error {
|
||||||
if len(c.SentTxs) == 0 || c.AwaitDisabled {
|
if len(c.SentTxs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,6 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
initCtx.AwaitDisabled = true
|
|
||||||
cmd.Println("Stage 4.1: Transfer GAS to proxy contract.")
|
cmd.Println("Stage 4.1: Transfer GAS to proxy contract.")
|
||||||
if err := transferGASToProxy(initCtx); err != nil {
|
if err := transferGASToProxy(initCtx); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -56,10 +55,5 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd.Println("Stage 7: set addresses in NNS.")
|
cmd.Println("Stage 7: set addresses in NNS.")
|
||||||
if err := setNNS(initCtx); err != nil {
|
return setNNS(initCtx)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
initCtx.AwaitDisabled = false
|
|
||||||
return initCtx.AwaitTx()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package initialize
|
package initialize
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
|
@ -10,8 +11,11 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/neo"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/neo"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -26,8 +30,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
|
func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
|
||||||
reader := neo.NewReader(c.ReadOnlyInvoker)
|
regPrice, err := getCandidateRegisterPrice(c)
|
||||||
regPrice, err := reader.GetRegisterPrice()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't fetch registration price: %w", err)
|
return fmt.Errorf("can't fetch registration price: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -113,7 +116,7 @@ func registerCandidates(c *helper.InitializeContext) error {
|
||||||
func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
|
func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
|
||||||
neoHash := neo.Hash
|
neoHash := neo.Hash
|
||||||
|
|
||||||
ok, err := transferNEOFinished(c)
|
ok, err := transferNEOFinished(c, neoHash)
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -136,8 +139,33 @@ func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
|
||||||
return c.AwaitTx()
|
return c.AwaitTx()
|
||||||
}
|
}
|
||||||
|
|
||||||
func transferNEOFinished(c *helper.InitializeContext) (bool, error) {
|
func transferNEOFinished(c *helper.InitializeContext, neoHash util.Uint160) (bool, error) {
|
||||||
r := neo.NewReader(c.ReadOnlyInvoker)
|
r := nep17.NewReader(c.ReadOnlyInvoker, neoHash)
|
||||||
bal, err := r.BalanceOf(c.CommitteeAcc.Contract.ScriptHash())
|
bal, err := r.BalanceOf(c.CommitteeAcc.Contract.ScriptHash())
|
||||||
return bal.Cmp(big.NewInt(native.NEOTotalSupply)) == -1, err
|
return bal.Cmp(big.NewInt(native.NEOTotalSupply)) == -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errGetPriceInvalid = errors.New("`getRegisterPrice`: invalid response")
|
||||||
|
|
||||||
|
func getCandidateRegisterPrice(c *helper.InitializeContext) (int64, error) {
|
||||||
|
switch c.Client.(type) {
|
||||||
|
case *rpcclient.Client:
|
||||||
|
inv := invoker.New(c.Client, nil)
|
||||||
|
reader := neo.NewReader(inv)
|
||||||
|
return reader.GetRegisterPrice()
|
||||||
|
default:
|
||||||
|
neoHash := neo.Hash
|
||||||
|
res, err := helper.InvokeFunction(c.Client, neoHash, "getRegisterPrice", nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if len(res.Stack) == 0 {
|
||||||
|
return 0, errGetPriceInvalid
|
||||||
|
}
|
||||||
|
bi, err := res.Stack[0].TryInteger()
|
||||||
|
if err != nil || !bi.IsInt64() {
|
||||||
|
return 0, errGetPriceInvalid
|
||||||
|
}
|
||||||
|
return bi.Int64(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -108,7 +108,6 @@ type applicationConfiguration struct {
|
||||||
level string
|
level string
|
||||||
destination string
|
destination string
|
||||||
timestamp bool
|
timestamp bool
|
||||||
options []zap.Option
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectCfg struct {
|
ObjectCfg struct {
|
||||||
|
@ -233,14 +232,6 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
a.LoggerCfg.level = loggerconfig.Level(c)
|
a.LoggerCfg.level = loggerconfig.Level(c)
|
||||||
a.LoggerCfg.destination = loggerconfig.Destination(c)
|
a.LoggerCfg.destination = loggerconfig.Destination(c)
|
||||||
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
|
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
|
||||||
var opts []zap.Option
|
|
||||||
if loggerconfig.ToLokiConfig(c).Enabled {
|
|
||||||
opts = []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
|
||||||
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(c))
|
|
||||||
return lokiCore
|
|
||||||
})}
|
|
||||||
}
|
|
||||||
a.LoggerCfg.options = opts
|
|
||||||
|
|
||||||
// Object
|
// Object
|
||||||
|
|
||||||
|
@ -727,6 +718,12 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
|
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
|
||||||
log, err := logger.NewLogger(logPrm)
|
log, err := logger.NewLogger(logPrm)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
if loggerconfig.ToLokiConfig(appCfg).Enabled {
|
||||||
|
log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
||||||
|
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
|
||||||
|
return lokiCore
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
c.internals = initInternals(appCfg, log)
|
c.internals = initInternals(appCfg, log)
|
||||||
|
|
||||||
|
@ -1093,7 +1090,6 @@ func (c *cfg) loggerPrm() (logger.Prm, error) {
|
||||||
return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
|
return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
|
||||||
}
|
}
|
||||||
prm.PrependTimestamp = c.LoggerCfg.timestamp
|
prm.PrependTimestamp = c.LoggerCfg.timestamp
|
||||||
prm.Options = c.LoggerCfg.options
|
|
||||||
|
|
||||||
return prm, nil
|
return prm, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,9 +43,6 @@ func initQoSService(c *cfg) {
|
||||||
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
|
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
|
||||||
rawTag, defined := qosTagging.IOTagFromContext(ctx)
|
rawTag, defined := qosTagging.IOTagFromContext(ctx)
|
||||||
if !defined {
|
if !defined {
|
||||||
if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
|
|
||||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagInternal.String())
|
|
||||||
}
|
|
||||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
}
|
}
|
||||||
ioTag, err := qos.FromRawString(rawTag)
|
ioTag, err := qos.FromRawString(rawTag)
|
||||||
|
@ -76,8 +73,20 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
|
||||||
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
|
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
|
||||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
case qos.IOTagInternal:
|
case qos.IOTagInternal:
|
||||||
if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
|
for _, pk := range s.allowedInternalPubs {
|
||||||
return ctx
|
if bytes.Equal(pk, requestSignPublicKey) {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
||||||
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
}
|
||||||
|
for _, node := range nm.Nodes() {
|
||||||
|
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
|
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
|
||||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
|
@ -86,23 +95,3 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
|
||||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *cfgQoSService) isInternalIOTagPublicKey(ctx context.Context, publicKey []byte) bool {
|
|
||||||
for _, pk := range s.allowedInternalPubs {
|
|
||||||
if bytes.Equal(pk, publicKey) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nm, err := s.netmapSource.GetNetMap(ctx, 0)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
for _, node := range nm.Nodes() {
|
|
||||||
if bytes.Equal(node.PublicKey(), publicKey) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,226 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
|
||||||
utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestQoSService_Client(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
s, pk := testQoSServicePrepare(t)
|
|
||||||
t.Run("IO tag client defined", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagClient.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Request)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("no IO tag defined, signed with unknown key", func(t *testing.T) {
|
|
||||||
ctx := s.AdjustIncomingTag(context.Background(), pk.Request)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("no IO tag defined, signed with allowed critical key", func(t *testing.T) {
|
|
||||||
ctx := s.AdjustIncomingTag(context.Background(), pk.Critical)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("unknown IO tag, signed with unknown key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Request)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("unknown IO tag, signed with allowed internal key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Internal)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("unknown IO tag, signed with allowed critical key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Critical)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag internal defined, signed with unknown key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Request)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag internal defined, signed with allowed critical key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Critical)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag critical defined, signed with unknown key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Request)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag critical defined, signed with allowed internal key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Internal)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestQoSService_Internal(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
s, pk := testQoSServicePrepare(t)
|
|
||||||
t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagInternal.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag internal defined, signed with allowed internal key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Internal)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagInternal.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagInternal.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("no IO tag defined, signed with allowed internal key", func(t *testing.T) {
|
|
||||||
ctx := s.AdjustIncomingTag(context.Background(), pk.Internal)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagInternal.String(), tag)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestQoSService_Critical(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
s, pk := testQoSServicePrepare(t)
|
|
||||||
t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagCritical.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag critical defined, signed with allowed critical key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.Critical)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagCritical.String(), tag)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestQoSService_NetmapGetError(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
s, pk := testQoSServicePrepare(t)
|
|
||||||
s.netmapSource = &utilTesting.TestNetmapSource{}
|
|
||||||
t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
|
|
||||||
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
|
|
||||||
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
|
|
||||||
tag, ok := tagging.IOTagFromContext(ctx)
|
|
||||||
require.True(t, ok)
|
|
||||||
require.Equal(t, qos.IOTagClient.String(), tag)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func testQoSServicePrepare(t *testing.T) (*cfgQoSService, *testQoSServicePublicKeys) {
|
|
||||||
nmSigner, err := keys.NewPrivateKey()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reqSigner, err := keys.NewPrivateKey()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
allowedCritSigner, err := keys.NewPrivateKey()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
allowedIntSigner, err := keys.NewPrivateKey()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var node netmap.NodeInfo
|
|
||||||
node.SetPublicKey(nmSigner.PublicKey().Bytes())
|
|
||||||
nm := &netmap.NetMap{}
|
|
||||||
nm.SetEpoch(100)
|
|
||||||
nm.SetNodes([]netmap.NodeInfo{node})
|
|
||||||
|
|
||||||
return &cfgQoSService{
|
|
||||||
logger: test.NewLogger(t),
|
|
||||||
netmapSource: &utilTesting.TestNetmapSource{
|
|
||||||
Netmaps: map[uint64]*netmap.NetMap{
|
|
||||||
100: nm,
|
|
||||||
},
|
|
||||||
CurrentEpoch: 100,
|
|
||||||
},
|
|
||||||
allowedCriticalPubs: [][]byte{
|
|
||||||
allowedCritSigner.PublicKey().Bytes(),
|
|
||||||
},
|
|
||||||
allowedInternalPubs: [][]byte{
|
|
||||||
allowedIntSigner.PublicKey().Bytes(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&testQoSServicePublicKeys{
|
|
||||||
NetmapNode: nmSigner.PublicKey().Bytes(),
|
|
||||||
Request: reqSigner.PublicKey().Bytes(),
|
|
||||||
Internal: allowedIntSigner.PublicKey().Bytes(),
|
|
||||||
Critical: allowedCritSigner.PublicKey().Bytes(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type testQoSServicePublicKeys struct {
|
|
||||||
NetmapNode []byte
|
|
||||||
Request []byte
|
|
||||||
Internal []byte
|
|
||||||
Critical []byte
|
|
||||||
}
|
|
4
go.mod
4
go.mod
|
@ -8,8 +8,8 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250324133647-57d895c32167
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250307150202-749b4e9ab592
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
|
||||||
|
|
8
go.sum
8
go.sum
|
@ -8,10 +8,10 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250324133647-57d895c32167 h1:NhqfqNcATndYwx413BaaYXxVJbkeu2vQOtVyxXw5xCQ=
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275 h1:WqWxCnCl2ekfjWja/CpGeF2rf4h0x199xhdnsm/j+E8=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250324133647-57d895c32167/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250307150202-749b4e9ab592 h1:n7Pl8V7O1yS07J/fqdbzZjVe/mQW42a7eS0QHfgrzJw=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250307150202-749b4e9ab592/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||||
|
|
|
@ -512,7 +512,7 @@ const (
|
||||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||||
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
||||||
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||||
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag"
|
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||||
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
|
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
|
||||||
WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object"
|
WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object"
|
||||||
)
|
)
|
||||||
|
|
214
internal/qos/grpc_test.go
Normal file
214
internal/qos/grpc_test.go
Normal file
|
@ -0,0 +1,214 @@
|
||||||
|
package qos_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
okKey = "ok"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errTest = errors.New("mock")
|
||||||
|
errWrongTag = errors.New("wrong tag")
|
||||||
|
errNoTag = errors.New("failed to get tag from context")
|
||||||
|
errResExhausted = new(apistatus.ResourceExhausted)
|
||||||
|
tags = []qos.IOTag{qos.IOTagBackground, qos.IOTagWritecache, qos.IOTagPolicer, qos.IOTagTreeSync}
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockGRPCServerStream struct {
|
||||||
|
grpc.ServerStream
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockGRPCServerStream) Context() context.Context {
|
||||||
|
return m.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
type limiter struct {
|
||||||
|
released bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
|
||||||
|
if key != okKey {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return func() { l.released = true }, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
|
||||||
|
interceptor := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return lim })
|
||||||
|
called := false
|
||||||
|
handler := func(ctx context.Context, req any) (any, error) {
|
||||||
|
called = true
|
||||||
|
return nil, errTest
|
||||||
|
}
|
||||||
|
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{FullMethod: methodName}, handler)
|
||||||
|
return called, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
|
||||||
|
interceptor := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return lim })
|
||||||
|
called := false
|
||||||
|
handler := func(srv any, stream grpc.ServerStream) error {
|
||||||
|
called = true
|
||||||
|
return errTest
|
||||||
|
}
|
||||||
|
err := interceptor(nil, &mockGRPCServerStream{ctx: ctx}, &grpc.StreamServerInfo{
|
||||||
|
FullMethod: methodName,
|
||||||
|
}, handler)
|
||||||
|
return called, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MaxActiveRPCLimiter(t *testing.T) {
|
||||||
|
// UnaryServerInterceptor
|
||||||
|
t.Run("unary fail", func(t *testing.T) {
|
||||||
|
var lim limiter
|
||||||
|
|
||||||
|
called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
|
||||||
|
require.EqualError(t, err, errResExhausted.Error())
|
||||||
|
require.False(t, called)
|
||||||
|
})
|
||||||
|
t.Run("unary pass critical", func(t *testing.T) {
|
||||||
|
var lim limiter
|
||||||
|
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
||||||
|
|
||||||
|
called, err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
|
||||||
|
require.EqualError(t, err, errTest.Error())
|
||||||
|
require.True(t, called)
|
||||||
|
require.False(t, lim.released)
|
||||||
|
})
|
||||||
|
t.Run("unary pass", func(t *testing.T) {
|
||||||
|
var lim limiter
|
||||||
|
|
||||||
|
called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
|
||||||
|
require.EqualError(t, err, errTest.Error())
|
||||||
|
require.True(t, called && lim.released)
|
||||||
|
})
|
||||||
|
// StreamServerInterceptor
|
||||||
|
t.Run("stream fail", func(t *testing.T) {
|
||||||
|
var lim limiter
|
||||||
|
|
||||||
|
called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
|
||||||
|
require.EqualError(t, err, errResExhausted.Error())
|
||||||
|
require.False(t, called)
|
||||||
|
})
|
||||||
|
t.Run("stream pass critical", func(t *testing.T) {
|
||||||
|
var lim limiter
|
||||||
|
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
|
||||||
|
|
||||||
|
called, err := streamMaxActiveRPCLimiter(ctx, &lim, "")
|
||||||
|
require.EqualError(t, err, errTest.Error())
|
||||||
|
require.True(t, called)
|
||||||
|
require.False(t, lim.released)
|
||||||
|
})
|
||||||
|
t.Run("stream pass", func(t *testing.T) {
|
||||||
|
var lim limiter
|
||||||
|
|
||||||
|
called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
|
||||||
|
require.EqualError(t, err, errTest.Error())
|
||||||
|
require.True(t, called && lim.released)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
|
||||||
|
interceptor := qos.NewSetCriticalIOTagUnaryServerInterceptor()
|
||||||
|
handler := func(ctx context.Context, req any) (any, error) {
|
||||||
|
if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == qos.IOTagCritical.String() {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, errWrongTag
|
||||||
|
}
|
||||||
|
_, err := interceptor(context.Background(), nil, nil, handler)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAdjustOutgoingIOTagUnaryClientInterceptor(t *testing.T) {
|
||||||
|
interceptor := qos.NewAdjustOutgoingIOTagUnaryClientInterceptor()
|
||||||
|
|
||||||
|
// check context with no value
|
||||||
|
called := false
|
||||||
|
invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
|
||||||
|
called = true
|
||||||
|
if _, ok := tagging.IOTagFromContext(ctx); ok {
|
||||||
|
return fmt.Errorf("%v: expected no IO tags", errWrongTag)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
require.NoError(t, interceptor(context.Background(), "", nil, nil, nil, invoker, nil))
|
||||||
|
require.True(t, called)
|
||||||
|
|
||||||
|
// check context for internal tag
|
||||||
|
targetTag := qos.IOTagInternal.String()
|
||||||
|
invoker = func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
|
||||||
|
raw, ok := tagging.IOTagFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return errNoTag
|
||||||
|
}
|
||||||
|
if raw != targetTag {
|
||||||
|
return errWrongTag
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, tag := range tags {
|
||||||
|
ctx := tagging.ContextWithIOTag(context.Background(), tag.String())
|
||||||
|
require.NoError(t, interceptor(ctx, "", nil, nil, nil, invoker, nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
// check context for client tag
|
||||||
|
ctx := tagging.ContextWithIOTag(context.Background(), "")
|
||||||
|
targetTag = qos.IOTagClient.String()
|
||||||
|
require.NoError(t, interceptor(ctx, "", nil, nil, nil, invoker, nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAdjustOutgoingIOTagStreamClientInterceptor(t *testing.T) {
|
||||||
|
interceptor := qos.NewAdjustOutgoingIOTagStreamClientInterceptor()
|
||||||
|
|
||||||
|
// check context with no value
|
||||||
|
called := false
|
||||||
|
streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
|
called = true
|
||||||
|
if _, ok := tagging.IOTagFromContext(ctx); ok {
|
||||||
|
return nil, fmt.Errorf("%v: expected no IO tags", errWrongTag)
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
_, err := interceptor(context.Background(), nil, nil, "", streamer, nil)
|
||||||
|
require.True(t, called)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// check context for internal tag
|
||||||
|
targetTag := qos.IOTagInternal.String()
|
||||||
|
streamer = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
|
raw, ok := tagging.IOTagFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return nil, errNoTag
|
||||||
|
}
|
||||||
|
if raw != targetTag {
|
||||||
|
return nil, errWrongTag
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
for _, tag := range tags {
|
||||||
|
ctx := tagging.ContextWithIOTag(context.Background(), tag.String())
|
||||||
|
_, err := interceptor(ctx, nil, nil, "", streamer, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check context for client tag
|
||||||
|
ctx := tagging.ContextWithIOTag(context.Background(), "")
|
||||||
|
targetTag = qos.IOTagClient.String()
|
||||||
|
_, err = interceptor(ctx, nil, nil, "", streamer, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
|
@ -9,7 +9,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
@ -411,11 +410,11 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
WithNetmapSource(
|
WithNetmapSource(
|
||||||
&utilTesting.TestNetmapSource{
|
&testNetmapSource{
|
||||||
Netmaps: map[uint64]*netmap.NetMap{
|
netmaps: map[uint64]*netmap.NetMap{
|
||||||
curEpoch: currentEpochNM,
|
curEpoch: currentEpochNM,
|
||||||
},
|
},
|
||||||
CurrentEpoch: curEpoch,
|
currentEpoch: curEpoch,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
|
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
|
||||||
|
@ -484,12 +483,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
WithNetmapSource(
|
WithNetmapSource(
|
||||||
&utilTesting.TestNetmapSource{
|
&testNetmapSource{
|
||||||
Netmaps: map[uint64]*netmap.NetMap{
|
netmaps: map[uint64]*netmap.NetMap{
|
||||||
curEpoch: currentEpochNM,
|
curEpoch: currentEpochNM,
|
||||||
curEpoch - 1: previousEpochNM,
|
curEpoch - 1: previousEpochNM,
|
||||||
},
|
},
|
||||||
CurrentEpoch: curEpoch,
|
currentEpoch: curEpoch,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
|
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
|
||||||
|
@ -560,12 +559,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
WithNetmapSource(
|
WithNetmapSource(
|
||||||
&utilTesting.TestNetmapSource{
|
&testNetmapSource{
|
||||||
Netmaps: map[uint64]*netmap.NetMap{
|
netmaps: map[uint64]*netmap.NetMap{
|
||||||
curEpoch: currentEpochNM,
|
curEpoch: currentEpochNM,
|
||||||
curEpoch - 1: previousEpochNM,
|
curEpoch - 1: previousEpochNM,
|
||||||
},
|
},
|
||||||
CurrentEpoch: curEpoch,
|
currentEpoch: curEpoch,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
|
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
|
||||||
|
@ -597,3 +596,26 @@ func (s *testContainerSource) Get(ctx context.Context, cnrID cid.ID) (*container
|
||||||
func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
|
func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testNetmapSource struct {
|
||||||
|
netmaps map[uint64]*netmap.NetMap
|
||||||
|
currentEpoch uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
|
||||||
|
if diff >= s.currentEpoch {
|
||||||
|
return nil, fmt.Errorf("invalid diff")
|
||||||
|
}
|
||||||
|
return s.GetNetMapByEpoch(ctx, s.currentEpoch-diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error) {
|
||||||
|
if nm, found := s.netmaps[epoch]; found {
|
||||||
|
return nm, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("netmap not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testNetmapSource) Epoch(ctx context.Context) (uint64, error) {
|
||||||
|
return s.currentEpoch, nil
|
||||||
|
}
|
||||||
|
|
|
@ -139,7 +139,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
|
||||||
|
|
||||||
var containerID cid.ID
|
var containerID cid.ID
|
||||||
var offset []byte
|
var offset []byte
|
||||||
bc := newBucketCache()
|
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||||
|
garbageBkt := tx.Bucket(garbageBucketName)
|
||||||
|
|
||||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
rawAddr := make([]byte, cidSize, addressKeySize)
|
||||||
|
|
||||||
|
@ -168,7 +169,7 @@ loop:
|
||||||
bkt := tx.Bucket(name)
|
bkt := tx.Bucket(name)
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
copy(rawAddr, cidRaw)
|
copy(rawAddr, cidRaw)
|
||||||
result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
|
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||||
result, count, cursor, threshold, currEpoch)
|
result, count, cursor, threshold, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -203,10 +204,9 @@ loop:
|
||||||
|
|
||||||
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
||||||
// object to start selecting from. Ignores inhumed objects.
|
// object to start selecting from. Ignores inhumed objects.
|
||||||
func selectNFromBucket(
|
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
bc *bucketCache,
|
|
||||||
bkt *bbolt.Bucket, // main bucket
|
|
||||||
objType objectSDK.Type, // type of the objects stored in the main bucket
|
objType objectSDK.Type, // type of the objects stored in the main bucket
|
||||||
|
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
||||||
cidRaw []byte, // container ID prefix, optimization
|
cidRaw []byte, // container ID prefix, optimization
|
||||||
cnt cid.ID, // container ID
|
cnt cid.ID, // container ID
|
||||||
to []objectcore.Info, // listing result
|
to []objectcore.Info, // listing result
|
||||||
|
@ -219,6 +219,7 @@ func selectNFromBucket(
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
count := len(to)
|
||||||
c := bkt.Cursor()
|
c := bkt.Cursor()
|
||||||
k, v := c.First()
|
k, v := c.First()
|
||||||
|
|
||||||
|
@ -230,7 +231,7 @@ func selectNFromBucket(
|
||||||
}
|
}
|
||||||
|
|
||||||
for ; k != nil; k, v = c.Next() {
|
for ; k != nil; k, v = c.Next() {
|
||||||
if len(to) >= limit {
|
if count >= limit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,8 +241,6 @@ func selectNFromBucket(
|
||||||
}
|
}
|
||||||
|
|
||||||
offset = k
|
offset = k
|
||||||
graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
|
|
||||||
garbageBkt := getGarbageBucket(bc, bkt.Tx())
|
|
||||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -252,7 +251,7 @@ func selectNFromBucket(
|
||||||
}
|
}
|
||||||
|
|
||||||
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
||||||
if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
|
if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,6 +273,7 @@ func selectNFromBucket(
|
||||||
a.SetContainer(cnt)
|
a.SetContainer(cnt)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
||||||
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
return to, offset, cursor, nil
|
return to, offset, cursor, nil
|
||||||
|
|
|
@ -527,8 +527,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var release qos.ReleaseFunc
|
release, err := s.opsLimiter.ReadRequest(ctx)
|
||||||
release, err = s.opsLimiter.ReadRequest(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
|
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
|
||||||
s.m.RUnlock()
|
s.m.RUnlock()
|
||||||
|
|
|
@ -36,9 +36,6 @@ type Prm struct {
|
||||||
|
|
||||||
// PrependTimestamp specifies whether to prepend a timestamp in the log
|
// PrependTimestamp specifies whether to prepend a timestamp in the log
|
||||||
PrependTimestamp bool
|
PrependTimestamp bool
|
||||||
|
|
||||||
// Options for zap.Logger
|
|
||||||
Options []zap.Option
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -106,12 +103,10 @@ func newConsoleLogger(prm Prm) (*Logger, error) {
|
||||||
c.EncoderConfig.TimeKey = ""
|
c.EncoderConfig.TimeKey = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := []zap.Option{
|
lZap, err := c.Build(
|
||||||
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
||||||
zap.AddCallerSkip(1),
|
zap.AddCallerSkip(1),
|
||||||
}
|
)
|
||||||
opts = append(opts, prm.Options...)
|
|
||||||
lZap, err := c.Build(opts...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -155,12 +150,7 @@ func newJournaldLogger(prm Prm) (*Logger, error) {
|
||||||
c.Sampling.Thereafter,
|
c.Sampling.Thereafter,
|
||||||
samplerOpts...,
|
samplerOpts...,
|
||||||
)
|
)
|
||||||
opts := []zap.Option{
|
lZap := zap.New(samplingCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.AddCallerSkip(1))
|
||||||
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
|
||||||
zap.AddCallerSkip(1),
|
|
||||||
}
|
|
||||||
opts = append(opts, prm.Options...)
|
|
||||||
lZap := zap.New(samplingCore, opts...)
|
|
||||||
|
|
||||||
l := &Logger{z: lZap, lvl: lvl}
|
l := &Logger{z: lZap, lvl: lvl}
|
||||||
|
|
||||||
|
@ -171,6 +161,10 @@ func (l *Logger) Reload(prm Prm) {
|
||||||
l.lvl.SetLevel(prm.level)
|
l.lvl.SetLevel(prm.level)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Logger) WithOptions(options ...zap.Option) {
|
||||||
|
l.z = l.z.WithOptions(options...)
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Logger) With(fields ...zap.Field) *Logger {
|
func (l *Logger) With(fields ...zap.Field) *Logger {
|
||||||
return &Logger{z: l.z.With(fields...)}
|
return &Logger{z: l.z.With(fields...)}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
package testing
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
errInvalidDiff = errors.New("invalid diff")
|
|
||||||
errNetmapNotFound = errors.New("netmap not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
type TestNetmapSource struct {
|
|
||||||
Netmaps map[uint64]*netmap.NetMap
|
|
||||||
CurrentEpoch uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TestNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
|
|
||||||
if diff >= s.CurrentEpoch {
|
|
||||||
return nil, errInvalidDiff
|
|
||||||
}
|
|
||||||
return s.GetNetMapByEpoch(ctx, s.CurrentEpoch-diff)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TestNetmapSource) GetNetMapByEpoch(_ context.Context, epoch uint64) (*netmap.NetMap, error) {
|
|
||||||
if nm, found := s.Netmaps[epoch]; found {
|
|
||||||
return nm, nil
|
|
||||||
}
|
|
||||||
return nil, errNetmapNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TestNetmapSource) Epoch(context.Context) (uint64, error) {
|
|
||||||
return s.CurrentEpoch, nil
|
|
||||||
}
|
|
Loading…
Add table
Reference in a new issue