Forked from TrueCloudLab/frostfs-node

Compare commits: feat/add-g ... master (31 commits)
Commits (SHA1):
e142d25fac, bd1c18e117, b27f7d1d17, 3cd8080232, a11b54ca15, b112a92408, 19ca907223, f62d81e26a,
27899598dc, bc6cc9ae2a, 6e1576cfdb, a5bae6c5af, 5a13830a94, dcb2b23a7d, 115aae7c34, 12a0537a7a,
30d4692c3e, 2254c8aff5, d432bebef4, d144abc977, a2053870e2, d00c606fee, 60446bb668, bd8ab2d84a,
bce2f7bef0, c2c05e2228, 0a38571a10, 632bd8e38d, 3bbee1b554, 9358938222, 5470b205fd
31 changed files with 582 additions and 312 deletions
8   .ci/Jenkinsfile (vendored)

@@ -78,6 +78,10 @@ async {
             }
         }
     }

-    // TODO: dco check
+    task('dco') {
+        container('git.frostfs.info/truecloudlab/commit-check:master') {
+            sh 'FROM=pull_request_target commit-check'
+        }
+    }
 }
@@ -6,6 +6,7 @@ import (
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
+	nns2 "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
@@ -13,9 +14,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/encoding/address"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
-	nns2 "github.com/nspcc-dev/neo-go/pkg/rpcclient/nns"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
 	"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
 	"github.com/nspcc-dev/neo-go/pkg/util"
@@ -187,19 +186,9 @@ func NNSResolveKey(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (*
 }

 func NNSIsAvailable(c Client, nnsHash util.Uint160, name string) (bool, error) {
-	switch c.(type) {
-	case *rpcclient.Client:
-		inv := invoker.New(c, nil)
-		reader := nns2.NewReader(inv, nnsHash)
-		return reader.IsAvailable(name)
-	default:
-		b, err := unwrap.Bool(InvokeFunction(c, nnsHash, "isAvailable", []any{name}, nil))
-		if err != nil {
-			return false, fmt.Errorf("`isAvailable`: invalid response: %w", err)
-		}
-
-		return b, nil
-	}
+	inv := invoker.New(c, nil)
+	reader := nns2.NewReader(inv, nnsHash)
+	return reader.IsAvailable(name)
 }

 func CheckNotaryEnabled(c Client) error {
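NNSIsAvailable now takes the single contract-reader path for every Client. A minimal usage sketch (the client value, resolved NNS hash and domain name are placeholders, not part of the change):

    // Hypothetical caller inside the frostfs-adm morph helpers.
    avail, err := NNSIsAvailable(c, nnsHash, "mydomain.frostfs")
    if err != nil {
        return fmt.Errorf("check NNS availability: %w", err)
    }
    if !avail {
        return errors.New("domain is already registered")
    }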
@@ -40,6 +40,8 @@ type ClientContext struct {
 	CommitteeAct    *actor.Actor     // committee actor with the Global witness scope
 	ReadOnlyInvoker *invoker.Invoker // R/O contract invoker, does not contain any signer
 	SentTxs         []HashVUBPair
+
+	AwaitDisabled bool
 }

 func NewRemoteClient(v *viper.Viper) (Client, error) {
@@ -120,7 +122,7 @@ func (c *ClientContext) SendTx(tx *transaction.Transaction, cmd *cobra.Command,
 }

 func (c *ClientContext) AwaitTx(cmd *cobra.Command) error {
-	if len(c.SentTxs) == 0 {
+	if len(c.SentTxs) == 0 || c.AwaitDisabled {
 		return nil
 	}

@@ -39,6 +39,7 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
 		return err
 	}

+	initCtx.AwaitDisabled = true
 	cmd.Println("Stage 4.1: Transfer GAS to proxy contract.")
 	if err := transferGASToProxy(initCtx); err != nil {
 		return err
@@ -55,5 +56,10 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
 	}

 	cmd.Println("Stage 7: set addresses in NNS.")
-	return setNNS(initCtx)
+	if err := setNNS(initCtx); err != nil {
+		return err
+	}
+
+	initCtx.AwaitDisabled = false
+	return initCtx.AwaitTx()
 }
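The AwaitDisabled flag lets the init command batch transaction awaiting: while it is set, helpers that end in AwaitTx presumably just keep accumulating SentTxs, and the single AwaitTx at the end waits for every collected (hash, VUB) pair. Condensed sketch of the pattern used above:

    initCtx.AwaitDisabled = true  // AwaitTx returns immediately while the stages run
    // ... stages 4.1 through 7 send their transactions via SendTx ...
    initCtx.AwaitDisabled = false
    return initCtx.AwaitTx()      // one await for everything that was sent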
@@ -1,7 +1,6 @@
 package initialize

 import (
-	"errors"
 	"fmt"
 	"math/big"

@@ -11,11 +10,8 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/core/state"
 	"github.com/nspcc-dev/neo-go/pkg/core/transaction"
 	"github.com/nspcc-dev/neo-go/pkg/io"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/neo"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
 	"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
 	"github.com/nspcc-dev/neo-go/pkg/util"
@@ -30,7 +26,8 @@ const (
 )

 func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
-	regPrice, err := getCandidateRegisterPrice(c)
+	reader := neo.NewReader(c.ReadOnlyInvoker)
+	regPrice, err := reader.GetRegisterPrice()
 	if err != nil {
 		return fmt.Errorf("can't fetch registration price: %w", err)
 	}
@@ -116,7 +113,7 @@ func registerCandidates(c *helper.InitializeContext) error {
 func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
 	neoHash := neo.Hash

-	ok, err := transferNEOFinished(c, neoHash)
+	ok, err := transferNEOFinished(c)
 	if ok || err != nil {
 		return err
 	}
@@ -139,33 +136,8 @@ func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
 	return c.AwaitTx()
 }

-func transferNEOFinished(c *helper.InitializeContext, neoHash util.Uint160) (bool, error) {
-	r := nep17.NewReader(c.ReadOnlyInvoker, neoHash)
+func transferNEOFinished(c *helper.InitializeContext) (bool, error) {
+	r := neo.NewReader(c.ReadOnlyInvoker)
 	bal, err := r.BalanceOf(c.CommitteeAcc.Contract.ScriptHash())
 	return bal.Cmp(big.NewInt(native.NEOTotalSupply)) == -1, err
 }
-
-var errGetPriceInvalid = errors.New("`getRegisterPrice`: invalid response")
-
-func getCandidateRegisterPrice(c *helper.InitializeContext) (int64, error) {
-	switch c.Client.(type) {
-	case *rpcclient.Client:
-		inv := invoker.New(c.Client, nil)
-		reader := neo.NewReader(inv)
-		return reader.GetRegisterPrice()
-	default:
-		neoHash := neo.Hash
-		res, err := helper.InvokeFunction(c.Client, neoHash, "getRegisterPrice", nil, nil)
-		if err != nil {
-			return 0, err
-		}
-		if len(res.Stack) == 0 {
-			return 0, errGetPriceInvalid
-		}
-		bi, err := res.Stack[0].TryInteger()
-		if err != nil || !bi.IsInt64() {
-			return 0, errGetPriceInvalid
-		}
-		return bi.Int64(), nil
-	}
-}
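getCandidateRegisterPrice existed only to special-case *rpcclient.Client and decode the `getRegisterPrice` stack item by hand for other clients; since the initialize context already carries a ready read-only invoker, the neo reader covers both paths. Sketch of the replacement call (names as in the diff):

    reader := neo.NewReader(c.ReadOnlyInvoker) // no type switch on the underlying client
    regPrice, err := reader.GetRegisterPrice()
    if err != nil {
        return fmt.Errorf("can't fetch registration price: %w", err)
    }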
@@ -2,17 +2,19 @@ package tree

 import (
 	"context"
+	"crypto/tls"
 	"fmt"
-	"strings"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
 	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 )

@@ -31,6 +33,16 @@ func _client() (tree.TreeServiceClient, error) {
 		return nil, err
 	}

+	host, isTLS, err := client.ParseURI(netAddr.URIAddr())
+	if err != nil {
+		return nil, err
+	}
+
+	creds := insecure.NewCredentials()
+	if isTLS {
+		creds = credentials.NewTLS(&tls.Config{})
+	}
+
 	opts := []grpc.DialOption{
 		grpc.WithChainUnaryInterceptor(
 			tracing.NewUnaryClientInterceptor(),
@@ -40,13 +52,10 @@ func _client() (tree.TreeServiceClient, error) {
 		),
 		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
 		grpc.WithDisableServiceConfig(),
+		grpc.WithTransportCredentials(creds),
 	}

-	if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
-		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	}
-
-	cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
+	cc, err := grpc.NewClient(host, opts...)
 	return tree.NewTreeServiceClient(cc), err
 }

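The scheme-prefix check is replaced by the SDK's URI parser, which also yields the host to dial. Assumed behavior of client.ParseURI, inferred from how it is used above rather than from its documentation:

    host, isTLS, err := client.ParseURI("grpcs://node.example:8082")
    // expected: host == "node.example:8082", isTLS == true; a "grpc://" URI would give isTLS == false
    creds := insecure.NewCredentials()
    if isTLS {
        creds = credentials.NewTLS(&tls.Config{})
    }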
@@ -108,6 +108,7 @@ type applicationConfiguration struct {
 		level       string
 		destination string
 		timestamp   bool
+		options     []zap.Option
 	}

 	ObjectCfg struct {
@@ -232,6 +233,14 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	a.LoggerCfg.level = loggerconfig.Level(c)
 	a.LoggerCfg.destination = loggerconfig.Destination(c)
 	a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
+	var opts []zap.Option
+	if loggerconfig.ToLokiConfig(c).Enabled {
+		opts = []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
+			lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(c))
+			return lokiCore
+		})}
+	}
+	a.LoggerCfg.options = opts

 	// Object

@@ -718,12 +727,6 @@ func initCfg(appCfg *config.Config) *cfg {
 	logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
 	log, err := logger.NewLogger(logPrm)
 	fatalOnErr(err)
-	if loggerconfig.ToLokiConfig(appCfg).Enabled {
-		log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
-			lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
-			return lokiCore
-		}))
-	}

 	c.internals = initInternals(appCfg, log)

@@ -1090,6 +1093,7 @@ func (c *cfg) loggerPrm() (logger.Prm, error) {
 		return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
 	}
 	prm.PrependTimestamp = c.LoggerCfg.timestamp
+	prm.Options = c.LoggerCfg.options

 	return prm, nil
 }
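For context on the zap mechanics this refactor leans on: WrapCore swaps the logger's core, and WithOptions returns a new *zap.Logger instead of mutating the receiver, so attaching the Loki core through logger.Prm at construction time is the more direct route. Standalone illustration of the zap calls (lokicore stands in for any core wrapper):

    base, _ := zap.NewProduction()
    wrapped := base.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
        return c // e.g. lokicore.New(c, lokiCfg) in the node
    }))
    _ = wrapped // WithOptions returns a copy; base itself keeps its original core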
@@ -168,9 +168,10 @@ func TestEngineSection(t *testing.T) {
 					LimitOps: toPtr(25000),
 				},
 				{
 					Tag:        "policer",
 					Weight:     toPtr(5),
 					LimitOps:   toPtr(25000),
+					Prohibited: true,
 				},
 			})
 			require.ElementsMatch(t, writeLimits.Tags,
@@ -84,6 +84,7 @@ type IOTagConfig struct {
 	Weight      *float64
 	LimitOps    *float64
 	ReservedOps *float64
+	Prohibited  bool
 }

 func tags(c *config.Config) []IOTagConfig {
@@ -119,6 +120,13 @@ func tags(c *config.Config) []IOTagConfig {
 			tagConfig.ReservedOps = &r
 		}

+		v = c.Value(strconv.Itoa(i) + ".prohibited")
+		if v != nil {
+			r, err := cast.ToBoolE(v)
+			panicOnErr(err)
+			tagConfig.Prohibited = r
+		}
+
 		result = append(result, tagConfig)
 	}
 }
@@ -43,6 +43,9 @@ func initQoSService(c *cfg) {
 func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
 	rawTag, defined := qosTagging.IOTagFromContext(ctx)
 	if !defined {
+		if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
+			return qosTagging.ContextWithIOTag(ctx, qos.IOTagInternal.String())
+		}
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 	ioTag, err := qos.FromRawString(rawTag)
@@ -73,20 +76,8 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	case qos.IOTagInternal:
-		for _, pk := range s.allowedInternalPubs {
-			if bytes.Equal(pk, requestSignPublicKey) {
-				return ctx
-			}
-		}
-		nm, err := s.netmapSource.GetNetMap(ctx, 0)
-		if err != nil {
-			s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
-			return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
-		}
-		for _, node := range nm.Nodes() {
-			if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
-				return ctx
-			}
+		if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
+			return ctx
 		}
 		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
@@ -95,3 +86,23 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 }
+
+func (s *cfgQoSService) isInternalIOTagPublicKey(ctx context.Context, publicKey []byte) bool {
+	for _, pk := range s.allowedInternalPubs {
+		if bytes.Equal(pk, publicKey) {
+			return true
+		}
+	}
+	nm, err := s.netmapSource.GetNetMap(ctx, 0)
+	if err != nil {
+		s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
+		return false
+	}
+	for _, node := range nm.Nodes() {
+		if bytes.Equal(node.PublicKey(), publicKey) {
+			return true
+		}
+	}
+
+	return false
+}
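Resulting AdjustIncomingTag behavior, restated from the change above and the tests added below:

    // tag absent       + allowed-internal or netmap-node key -> internal
    // tag absent       + any other key                       -> client
    // unparseable tag  + any key                             -> client
    // tag "internal"   + allowed-internal or netmap-node key -> kept
    // tag "internal"   + any other key                       -> client (logged)
    // tag "critical"   + allowed-critical or netmap-node key -> kept
    // tag "critical"   + any other key                       -> client (logged)
    // netmap lookup error during the key check               -> client (logged)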
226   cmd/frostfs-node/qos_test.go (new file)

@@ -0,0 +1,226 @@ (entire file added)
package main

import (
	"context"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
	utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/stretchr/testify/require"
)

func TestQoSService_Client(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	t.Run("IO tag client defined", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagClient.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("no IO tag defined, signed with unknown key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("no IO tag defined, signed with allowed critical key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with unknown key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with allowed internal key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with allowed critical key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag internal defined, signed with unknown key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag internal defined, signed with allowed critical key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag critical defined, signed with unknown key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag critical defined, signed with allowed internal key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
}

func TestQoSService_Internal(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
	t.Run("IO tag internal defined, signed with allowed internal key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
	t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
	t.Run("no IO tag defined, signed with allowed internal key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
}

func TestQoSService_Critical(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagCritical.String(), tag)
	})
	t.Run("IO tag critical defined, signed with allowed critical key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagCritical.String(), tag)
	})
}

func TestQoSService_NetmapGetError(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	s.netmapSource = &utilTesting.TestNetmapSource{}
	t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
}

func testQoSServicePrepare(t *testing.T) (*cfgQoSService, *testQoSServicePublicKeys) {
	nmSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	reqSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	allowedCritSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	allowedIntSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	var node netmap.NodeInfo
	node.SetPublicKey(nmSigner.PublicKey().Bytes())
	nm := &netmap.NetMap{}
	nm.SetEpoch(100)
	nm.SetNodes([]netmap.NodeInfo{node})

	return &cfgQoSService{
			logger: test.NewLogger(t),
			netmapSource: &utilTesting.TestNetmapSource{
				Netmaps: map[uint64]*netmap.NetMap{
					100: nm,
				},
				CurrentEpoch: 100,
			},
			allowedCriticalPubs: [][]byte{
				allowedCritSigner.PublicKey().Bytes(),
			},
			allowedInternalPubs: [][]byte{
				allowedIntSigner.PublicKey().Bytes(),
			},
		},
		&testQoSServicePublicKeys{
			NetmapNode: nmSigner.PublicKey().Bytes(),
			Request:    reqSigner.PublicKey().Bytes(),
			Internal:   allowedIntSigner.PublicKey().Bytes(),
			Critical:   allowedCritSigner.PublicKey().Bytes(),
		}
}

type testQoSServicePublicKeys struct {
	NetmapNode []byte
	Request    []byte
	Internal   []byte
	Critical   []byte
}
@@ -180,6 +180,7 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
 FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
 FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
 FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
+FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_PROHIBITED=true
 FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
 FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
 FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
@@ -252,7 +252,8 @@
           {
             "tag": "policer",
             "weight": 5,
-            "limit_ops": 25000
+            "limit_ops": 25000,
+            "prohibited": true
           }
         ]
       },
@@ -249,6 +249,7 @@ storage:
           - tag: policer
             weight: 5
             limit_ops: 25000
+            prohibited: true
         write:
           max_running_ops: 1000
           max_waiting_ops: 100
@@ -359,6 +359,7 @@ limits:
 | `tag.weight`       | `float` | 0 (no weight)  | Weight for queries with the specified tag. Weights must be specified for all tags or not specified for any one. |
 | `tag.limit_ops`    | `float` | 0 (no limit)   | Operations per second rate limit for queries with the specified tag. |
 | `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
+| `tag.prohibited`   | `bool`  | false          | If true, operations with this specified tag will be prohibited. |

 # `node` section

6   go.mod

@@ -6,10 +6,10 @@ require (
 	code.gitea.io/sdk/gitea v0.17.1
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
-	git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
+	git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
-	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275
+	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47
-	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
+	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250326101739-4d36a49d3945
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
 	git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
12   go.sum

@@ -4,14 +4,14 @@ git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1 h1:k1Qw8dWUQczfo0eVXlhrq9
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1/go.mod h1:5fSm/l5xSjGWqsPUffSdboiGFUHa7y/1S0fvxzQowN8=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
-git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d h1:uJ/wvuMdepbkaV8XMS5uN9B0FQWMep0CttSuDZiDhq0=
-git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
+git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2 h1:AovQs7bea0fLnYfldCZB88FkUgRj0QaHkJEbcWfgzvY=
+git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275 h1:WqWxCnCl2ekfjWja/CpGeF2rf4h0x199xhdnsm/j+E8=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47 h1:O2c3VOlaGZ862hf2ZPLBMdTG6vGJzhIgDvFEFGfntzU=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250326101739-4d36a49d3945 h1:zM2l316J55h9p30snl6vHBI/h0xmnuqZjnxIjRDtJZw=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250326101739-4d36a49d3945/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
@@ -512,7 +512,7 @@ const (
 	FailedToUpdateMultinetConfiguration         = "failed to update multinet configuration"
 	FailedToParseIncomingIOTag                  = "failed to parse incoming IO tag"
 	NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
-	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag, replaced with `client`"
+	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag"
 	FailedToValidateIncomingIOTag               = "failed to validate incoming IO tag, replaced with `client`"
 	WriteCacheFailedToAcquireRPSQuota           = "writecache failed to acquire RPS quota to flush object"
 )
@@ -22,7 +22,7 @@ var (
 	errTest         = errors.New("mock")
 	errWrongTag     = errors.New("wrong tag")
 	errNoTag        = errors.New("failed to get tag from context")
-	errResExhausted = new(apistatus.ResourceExhausted)
+	errResExhausted *apistatus.ResourceExhausted
 	tags            = []qos.IOTag{qos.IOTagBackground, qos.IOTagWritecache, qos.IOTagPolicer, qos.IOTagTreeSync}
 )

@@ -37,38 +37,36 @@ func (m *mockGRPCServerStream) Context() context.Context {
 }

 type limiter struct {
+	acquired bool
 	released bool
 }

 func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
+	l.acquired = true
 	if key != okKey {
 		return nil, false
 	}
 	return func() { l.released = true }, true
 }

-func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
+func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) error {
 	interceptor := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return lim })
-	called := false
 	handler := func(ctx context.Context, req any) (any, error) {
-		called = true
 		return nil, errTest
 	}
 	_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{FullMethod: methodName}, handler)
-	return called, err
+	return err
 }

-func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
+func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) error {
 	interceptor := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return lim })
-	called := false
 	handler := func(srv any, stream grpc.ServerStream) error {
-		called = true
 		return errTest
 	}
 	err := interceptor(nil, &mockGRPCServerStream{ctx: ctx}, &grpc.StreamServerInfo{
 		FullMethod: methodName,
 	}, handler)
-	return called, err
+	return err
 }

 func Test_MaxActiveRPCLimiter(t *testing.T) {
@@ -76,55 +74,61 @@ func Test_MaxActiveRPCLimiter(t *testing.T) {
 	t.Run("unary fail", func(t *testing.T) {
 		var lim limiter

-		called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
-		require.EqualError(t, err, errResExhausted.Error())
-		require.False(t, called)
+		err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
+		require.ErrorAs(t, err, &errResExhausted)
+		require.True(t, lim.acquired)
+		require.False(t, lim.released)
 	})
 	t.Run("unary pass critical", func(t *testing.T) {
 		var lim limiter
 		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())

-		called, err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
-		require.EqualError(t, err, errTest.Error())
-		require.True(t, called)
+		err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
+		require.ErrorIs(t, err, errTest)
+		require.False(t, lim.acquired)
 		require.False(t, lim.released)
 	})
 	t.Run("unary pass", func(t *testing.T) {
 		var lim limiter

-		called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
-		require.EqualError(t, err, errTest.Error())
-		require.True(t, called && lim.released)
+		err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
+		require.ErrorIs(t, err, errTest)
+		require.True(t, lim.acquired)
+		require.True(t, lim.released)
 	})
 	// StreamServerInterceptor
 	t.Run("stream fail", func(t *testing.T) {
 		var lim limiter

-		called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
-		require.EqualError(t, err, errResExhausted.Error())
-		require.False(t, called)
+		err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
+		require.ErrorAs(t, err, &errResExhausted)
+		require.True(t, lim.acquired)
+		require.False(t, lim.released)
 	})
 	t.Run("stream pass critical", func(t *testing.T) {
 		var lim limiter
 		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())

-		called, err := streamMaxActiveRPCLimiter(ctx, &lim, "")
-		require.EqualError(t, err, errTest.Error())
-		require.True(t, called)
+		err := streamMaxActiveRPCLimiter(ctx, &lim, "")
+		require.ErrorIs(t, err, errTest)
+		require.False(t, lim.acquired)
 		require.False(t, lim.released)
 	})
 	t.Run("stream pass", func(t *testing.T) {
 		var lim limiter

-		called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
-		require.EqualError(t, err, errTest.Error())
-		require.True(t, called && lim.released)
+		err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
+		require.ErrorIs(t, err, errTest)
+		require.True(t, lim.acquired)
+		require.True(t, lim.released)
 	})
 }

 func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
 	interceptor := qos.NewSetCriticalIOTagUnaryServerInterceptor()
+	called := false
 	handler := func(ctx context.Context, req any) (any, error) {
+		called = true
 		if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == qos.IOTagCritical.String() {
 			return nil, nil
 		}
@@ -132,6 +136,7 @@ func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
 	}
 	_, err := interceptor(context.Background(), nil, nil, handler)
 	require.NoError(t, err)
+	require.True(t, called)
 }

 func TestAdjustOutgoingIOTagUnaryClientInterceptor(t *testing.T) {
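The fail paths now assert limiter state (acquired but not released) instead of a handler-side flag, and the error check is structural: errResExhausted is a typed nil *apistatus.ResourceExhausted used as an errors.As target, so any wrapped ResourceExhausted matches regardless of its message. Equivalent standalone check:

    var target *apistatus.ResourceExhausted
    require.ErrorAs(t, err, &target) // passes if err, or anything it wraps, is a *apistatus.ResourceExhausted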
@@ -90,6 +90,7 @@ func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.T
 		if l.ReservedOps != nil && *l.ReservedOps != 0 {
 			v.ReservedIOPS = l.ReservedOps
 		}
+		v.Prohibited = l.Prohibited
 		result[l.Tag] = v
 	}
 	return result
@@ -164,8 +165,7 @@ func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (R
 	rel, err := s.RequestArrival(ctx, tag)
 	stat.inProgress.Add(1)
 	if err != nil {
-		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
-			errors.Is(err, errSemaphoreLimitExceeded) {
+		if isResourceExhaustedErr(err) {
 			stat.resourceExhausted.Add(1)
 			return nil, &apistatus.ResourceExhausted{}
 		}
@@ -234,3 +234,9 @@ func exportMetrics(metrics Metrics, stats map[string]*stat, shardID, operation s
 		metrics.SetOperationTagCounters(shardID, operation, tag, pending, inProgress, completed, resExh)
 	}
 }
+
+func isResourceExhaustedErr(err error) bool {
+	return errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
+		errors.Is(err, errSemaphoreLimitExceeded) ||
+		errors.Is(err, scheduling.ErrTagRequestsProhibited)
+}
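End to end, a request arriving with a prohibited tag is rejected the same way as one that exceeded its limit. Sketch of the flow in requestArrival terms (names from the diff; surrounding code elided):

    rel, err := s.RequestArrival(ctx, tag)
    if err != nil && isResourceExhaustedErr(err) {
        // limit exceeded, semaphore full, or tag prohibited all map to the same API status
        stat.resourceExhausted.Add(1)
        return nil, &apistatus.ResourceExhausted{}
    }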
@@ -9,6 +9,7 @@ import (

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
+	utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
 	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@@ -410,11 +411,11 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 				},
 			),
 			WithNetmapSource(
-				&testNetmapSource{
-					netmaps: map[uint64]*netmap.NetMap{
+				&utilTesting.TestNetmapSource{
+					Netmaps: map[uint64]*netmap.NetMap{
 						curEpoch: currentEpochNM,
 					},
-					currentEpoch: curEpoch,
+					CurrentEpoch: curEpoch,
 				},
 			),
 			WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -483,12 +484,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 				},
 			),
 			WithNetmapSource(
-				&testNetmapSource{
-					netmaps: map[uint64]*netmap.NetMap{
+				&utilTesting.TestNetmapSource{
+					Netmaps: map[uint64]*netmap.NetMap{
 						curEpoch:     currentEpochNM,
 						curEpoch - 1: previousEpochNM,
 					},
-					currentEpoch: curEpoch,
+					CurrentEpoch: curEpoch,
 				},
 			),
 			WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -559,12 +560,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 				},
 			),
 			WithNetmapSource(
-				&testNetmapSource{
-					netmaps: map[uint64]*netmap.NetMap{
+				&utilTesting.TestNetmapSource{
+					Netmaps: map[uint64]*netmap.NetMap{
 						curEpoch:     currentEpochNM,
 						curEpoch - 1: previousEpochNM,
 					},
-					currentEpoch: curEpoch,
+					CurrentEpoch: curEpoch,
 				},
 			),
 			WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -596,26 +597,3 @@ func (s *testContainerSource) Get(ctx context.Context, cnrID cid.ID) (*container
 func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
 	return nil, nil
 }
-
-type testNetmapSource struct {
-	netmaps      map[uint64]*netmap.NetMap
-	currentEpoch uint64
-}
-
-func (s *testNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
-	if diff >= s.currentEpoch {
-		return nil, fmt.Errorf("invalid diff")
-	}
-	return s.GetNetMapByEpoch(ctx, s.currentEpoch-diff)
-}
-
-func (s *testNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error) {
-	if nm, found := s.netmaps[epoch]; found {
-		return nm, nil
-	}
-	return nil, fmt.Errorf("netmap not found")
-}
-
-func (s *testNetmapSource) Epoch(ctx context.Context) (uint64, error) {
-	return s.currentEpoch, nil
-}
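The local testNetmapSource moves to a shared helper; judging by the exported fields used here and by the removed local implementation, the shared type presumably looks like this (pkg/util/testing itself is not shown in this view):

    type TestNetmapSource struct {
        Netmaps      map[uint64]*netmap.NetMap
        CurrentEpoch uint64
    }

    func (s *TestNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
        if diff >= s.CurrentEpoch {
            return nil, fmt.Errorf("invalid diff")
        }
        return s.GetNetMapByEpoch(ctx, s.CurrentEpoch-diff)
    }

    func (s *TestNetmapSource) GetNetMapByEpoch(_ context.Context, epoch uint64) (*netmap.NetMap, error) {
        if nm, found := s.Netmaps[epoch]; found {
            return nm, nil
        }
        return nil, fmt.Errorf("netmap not found")
    }

    func (s *TestNetmapSource) Epoch(context.Context) (uint64, error) {
        return s.CurrentEpoch, nil
    }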
@@ -2,8 +2,11 @@ package engine

 import (
 	"context"
+	"fmt"
 	"path/filepath"
-	"sync/atomic"
+	"runtime/debug"
+	"strings"
+	"sync"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
@@ -157,26 +160,74 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
 var _ qos.Limiter = (*testQoSLimiter)(nil)

 type testQoSLimiter struct {
 	t           testing.TB
-	read  atomic.Int64
-	write atomic.Int64
+	quard       sync.Mutex
+	id          int64
+	readStacks  map[int64][]byte
+	writeStacks map[int64][]byte
 }

 func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}

 func (t *testQoSLimiter) Close() {
-	require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
-	require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
+	t.quard.Lock()
+	defer t.quard.Unlock()
+
+	var sb strings.Builder
+	var seqN int
+	for _, stack := range t.readStacks {
+		seqN++
+		sb.WriteString(fmt.Sprintf("%d\n read request stack after limiter close: %s\n", seqN, string(stack)))
+	}
+	for _, stack := range t.writeStacks {
+		seqN++
+		sb.WriteString(fmt.Sprintf("%d\n write request stack after limiter close: %s\n", seqN, string(stack)))
+	}
+	require.True(t.t, seqN == 0, sb.String())
 }

 func (t *testQoSLimiter) ReadRequest(context.Context) (qos.ReleaseFunc, error) {
-	t.read.Add(1)
-	return func() { t.read.Add(-1) }, nil
+	t.quard.Lock()
+	defer t.quard.Unlock()
+
+	stack := debug.Stack()
+
+	t.id++
+	id := t.id
+
+	if t.readStacks == nil {
+		t.readStacks = make(map[int64][]byte)
+	}
+	t.readStacks[id] = stack
+
+	return func() {
+		t.quard.Lock()
+		defer t.quard.Unlock()
+
+		delete(t.readStacks, id)
+	}, nil
 }

 func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error) {
-	t.write.Add(1)
-	return func() { t.write.Add(-1) }, nil
+	t.quard.Lock()
+	defer t.quard.Unlock()
+
+	stack := debug.Stack()
+
+	t.id++
+	id := t.id
+
+	if t.writeStacks == nil {
+		t.writeStacks = make(map[int64][]byte)
+	}
+	t.writeStacks[id] = stack
+
+	return func() {
+		t.quard.Lock()
+		defer t.quard.Unlock()
+
+		delete(t.writeStacks, id)
+	}, nil
 }

 func (t *testQoSLimiter) SetParentID(string) {}
@ -318,8 +318,6 @@ func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.M
|
||||||
|
|
||||||
// HandleNewEpoch notifies every shard about NewEpoch event.
|
// HandleNewEpoch notifies every shard about NewEpoch event.
|
||||||
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
||||||
ev := shard.EventNewEpoch(epoch)
|
|
||||||
|
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.RUnlock()
|
||||||
|
|
||||||
|
@ -327,7 +325,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case sh.NotificationChannel() <- ev:
|
case sh.NotificationChannel() <- epoch:
|
||||||
default:
|
default:
|
||||||
e.log.Debug(ctx, logs.ShardEventProcessingInProgress,
|
e.log.Debug(ctx, logs.ShardEventProcessingInProgress,
|
||||||
zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID()))
|
zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID()))
|
||||||
|
|
|
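Note: with the Event wrapper gone, the engine pushes the raw epoch number into each shard's notification channel and skips shards that are still busy. A standalone sketch of that non-blocking select/default hand-off (channel and function names here are illustrative, not taken from the code base):

	package main

	import "fmt"

	// notifyEpoch sends epoch into ch without blocking; if the consumer is
	// still processing the previous epoch, the notification is dropped.
	func notifyEpoch(ch chan<- uint64, epoch uint64) {
		select {
		case ch <- epoch:
		default:
			fmt.Printf("epoch %d dropped: previous event still in progress\n", epoch)
		}
	}

	func main() {
		ch := make(chan uint64) // unbuffered, like the shard's newEpochChan
		go func() {
			for e := range ch {
				fmt.Println("handling epoch", e)
			}
		}()
		notifyEpoch(ch, 42) // may be handled or dropped depending on scheduling
	}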
@@ -139,8 +139,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
 	var containerID cid.ID
 	var offset []byte
-	graveyardBkt := tx.Bucket(graveyardBucketName)
-	garbageBkt := tx.Bucket(garbageBucketName)
+	bc := newBucketCache()

 	rawAddr := make([]byte, cidSize, addressKeySize)

@@ -169,7 +168,7 @@ loop:
 		bkt := tx.Bucket(name)
 		if bkt != nil {
 			copy(rawAddr, cidRaw)
-			result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
+			result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
 				result, count, cursor, threshold, currEpoch)
 			if err != nil {
 				return nil, nil, err

@@ -204,9 +203,10 @@ loop:
 // selectNFromBucket similar to selectAllFromBucket but uses cursor to find
 // object to start selecting from. Ignores inhumed objects.
-func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
+func selectNFromBucket(
+	bc *bucketCache,
+	bkt *bbolt.Bucket, // main bucket
 	objType objectSDK.Type, // type of the objects stored in the main bucket
-	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
 	cidRaw []byte, // container ID prefix, optimization
 	cnt cid.ID, // container ID
 	to []objectcore.Info, // listing result

@@ -219,7 +219,6 @@ func selectNFromBucket(
 		cursor = new(Cursor)
 	}

-	count := len(to)
 	c := bkt.Cursor()
 	k, v := c.First()

@@ -231,7 +230,7 @@ func selectNFromBucket(
 	}

 	for ; k != nil; k, v = c.Next() {
-		if count >= limit {
+		if len(to) >= limit {
 			break
 		}

@@ -241,6 +240,8 @@ func selectNFromBucket(
 		}

 		offset = k
+		graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
+		garbageBkt := getGarbageBucket(bc, bkt.Tx())
 		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
 			continue
 		}

@@ -251,7 +252,7 @@ func selectNFromBucket(
 		}

 		expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
-		if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) {
+		if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
 			continue
 		}

@@ -273,7 +274,6 @@ func selectNFromBucket(
 		a.SetContainer(cnt)
 		a.SetObject(obj)
 		to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
-		count++
 	}

 	return to, offset, cursor, nil
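Note: the listing path now threads a bucketCache through selectNFromBucket instead of pre-resolving the graveyard and garbage buckets up front. The cache implementation itself is not part of this diff; a minimal sketch of how such a lazy, per-transaction bucket cache could look, assuming go.etcd.io/bbolt imported as bbolt and the package's graveyardBucketName/garbageBucketName constants (fields and helper bodies below are assumptions for illustration, not the real code):

	// Hypothetical sketch only: the real newBucketCache/getGraveyardBucket
	// live elsewhere in the metabase package and may differ.
	type bucketCache struct {
		graveyard *bbolt.Bucket
		garbage   *bbolt.Bucket
	}

	func newBucketCache() *bucketCache { return &bucketCache{} }

	// getGraveyardBucket resolves the bucket once per transaction and reuses it.
	func getGraveyardBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
		if bc.graveyard == nil {
			bc.graveyard = tx.Bucket(graveyardBucketName)
		}
		return bc.graveyard
	}

	func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
		if bc.garbage == nil {
			bc.garbage = tx.Bucket(garbageBucketName)
		}
		return bc.garbage
	}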
@@ -108,19 +108,17 @@ func (s *Shard) Init(ctx context.Context) error {
 	s.updateMetrics(ctx)

 	s.gc = &gc{
 		gcCfg:       &s.gcCfg,
 		remover:     s.removeGarbage,
 		stopChannel: make(chan struct{}),
-		eventChan:   make(chan Event),
-		mEventHandler: map[eventType]*eventHandlers{
-			eventNewEpoch: {
-				cancelFunc: func() {},
-				handlers: []eventHandler{
-					s.collectExpiredLocks,
-					s.collectExpiredObjects,
-					s.collectExpiredTombstones,
-					s.collectExpiredMetrics,
-				},
-			},
-		},
+		newEpochChan: make(chan uint64),
+		newEpochHandlers: &newEpochHandlers{
+			cancelFunc: func() {},
+			handlers: []newEpochHandler{
+				s.collectExpiredLocks,
+				s.collectExpiredObjects,
+				s.collectExpiredTombstones,
+				s.collectExpiredMetrics,
+			},
+		},
 	}
@@ -33,41 +33,14 @@ type TombstoneSource interface {
 	IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool
 }

-// Event represents class of external events.
-type Event interface {
-	typ() eventType
-}
-
-type eventType int
-
-const (
-	_ eventType = iota
-	eventNewEpoch
-)
-
-type newEpoch struct {
-	epoch uint64
-}
-
-func (e newEpoch) typ() eventType {
-	return eventNewEpoch
-}
-
-// EventNewEpoch returns new epoch event.
-func EventNewEpoch(e uint64) Event {
-	return newEpoch{
-		epoch: e,
-	}
-}
-
-type eventHandler func(context.Context, Event)
-
-type eventHandlers struct {
+type newEpochHandler func(context.Context, uint64)
+
+type newEpochHandlers struct {
 	prevGroup sync.WaitGroup

 	cancelFunc context.CancelFunc

-	handlers []eventHandler
+	handlers []newEpochHandler
 }

 type gcRunResult struct {

@@ -109,10 +82,10 @@ type gc struct {
 	remover func(context.Context) gcRunResult

-	// eventChan is used only for listening for the new epoch event.
+	// newEpochChan is used only for listening for the new epoch event.
 	// It is ok to keep opened, we are listening for context done when writing in it.
-	eventChan     chan Event
-	mEventHandler map[eventType]*eventHandlers
+	newEpochChan     chan uint64
+	newEpochHandlers *newEpochHandlers
 }

 type gcCfg struct {
@@ -142,15 +115,7 @@ func defaultGCCfg() gcCfg {
 }

 func (gc *gc) init(ctx context.Context) {
-	sz := 0
-
-	for _, v := range gc.mEventHandler {
-		sz += len(v.handlers)
-	}
-
-	if sz > 0 {
-		gc.workerPool = gc.workerPoolInit(sz)
-	}
+	gc.workerPool = gc.workerPoolInit(len(gc.newEpochHandlers.handlers))
 	ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
 	gc.wg.Add(2)
 	go gc.tickRemover(ctx)
@@ -168,7 +133,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
 		case <-ctx.Done():
 			gc.log.Warn(ctx, logs.ShardStopEventListenerByContext)
 			return
-		case event, ok := <-gc.eventChan:
+		case event, ok := <-gc.newEpochChan:
 			if !ok {
 				gc.log.Warn(ctx, logs.ShardStopEventListenerByClosedEventChannel)
 				return
@@ -179,38 +144,33 @@ func (gc *gc) listenEvents(ctx context.Context) {
 	}
 }

-func (gc *gc) handleEvent(ctx context.Context, event Event) {
-	v, ok := gc.mEventHandler[event.typ()]
-	if !ok {
-		return
-	}
-
-	v.cancelFunc()
-	v.prevGroup.Wait()
+func (gc *gc) handleEvent(ctx context.Context, epoch uint64) {
+	gc.newEpochHandlers.cancelFunc()
+	gc.newEpochHandlers.prevGroup.Wait()

 	var runCtx context.Context
-	runCtx, v.cancelFunc = context.WithCancel(ctx)
+	runCtx, gc.newEpochHandlers.cancelFunc = context.WithCancel(ctx)

-	v.prevGroup.Add(len(v.handlers))
+	gc.newEpochHandlers.prevGroup.Add(len(gc.newEpochHandlers.handlers))

-	for i := range v.handlers {
+	for i := range gc.newEpochHandlers.handlers {
 		select {
 		case <-ctx.Done():
 			return
 		default:
 		}
-		h := v.handlers[i]
+		h := gc.newEpochHandlers.handlers[i]

 		err := gc.workerPool.Submit(func() {
-			defer v.prevGroup.Done()
-			h(runCtx, event)
+			defer gc.newEpochHandlers.prevGroup.Done()
+			h(runCtx, epoch)
 		})
 		if err != nil {
 			gc.log.Warn(ctx, logs.ShardCouldNotSubmitGCJobToWorkerPool,
 				zap.Error(err),
 			)

-			v.prevGroup.Done()
+			gc.newEpochHandlers.prevGroup.Done()
 		}
 	}
 }
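Note: handleEvent keeps the earlier semantics in a simpler form: cancel whatever the previous epoch started, wait for it to finish, then fan the new epoch out to the handlers. A generic, standalone sketch of that cancel-then-rerun pattern (this is not the shard code; goroutines stand in for the worker pool):

	package main

	import (
		"context"
		"fmt"
		"sync"
		"time"
	)

	type rerunner struct {
		wg     sync.WaitGroup
		cancel context.CancelFunc
	}

	// run cancels the previous batch of handlers, waits for them to finish,
	// then starts the handlers again with a fresh context and the new epoch.
	func (r *rerunner) run(ctx context.Context, epoch uint64, handlers []func(context.Context, uint64)) {
		if r.cancel != nil {
			r.cancel()
		}
		r.wg.Wait()

		runCtx, cancel := context.WithCancel(ctx)
		r.cancel = cancel

		r.wg.Add(len(handlers))
		for _, h := range handlers {
			h := h
			go func() {
				defer r.wg.Done()
				h(runCtx, epoch)
			}()
		}
	}

	func main() {
		r := &rerunner{}
		h := func(ctx context.Context, epoch uint64) {
			select {
			case <-ctx.Done():
				fmt.Println("epoch", epoch, "cancelled")
			case <-time.After(10 * time.Millisecond):
				fmt.Println("epoch", epoch, "done")
			}
		}
		r.run(context.Background(), 1, []func(context.Context, uint64){h})
		r.run(context.Background(), 2, []func(context.Context, uint64){h}) // cancels epoch 1 if still running
		r.wg.Wait()
	}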
@@ -267,6 +227,9 @@ func (gc *gc) stop(ctx context.Context) {
 	gc.log.Info(ctx, logs.ShardWaitingForGCWorkersToStop)
 	gc.wg.Wait()
+
+	gc.newEpochHandlers.cancelFunc()
+	gc.newEpochHandlers.prevGroup.Wait()
 }

 // iterates over metabase and deletes objects
@@ -362,7 +325,7 @@ func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
 	return
 }

-func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) {
 	var err error
 	startedAt := time.Now()

@@ -370,8 +333,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) {
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular)
 	}()

-	s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
+	s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
+	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))

 	workersCount, batchSize := s.getExpiredObjectsParameters()

@@ -380,7 +343,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) {
 	errGroup.Go(func() error {
 		batch := make([]oid.Address, 0, batchSize)
-		expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+		expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
 			if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
 				batch = append(batch, o.Address())
@@ -486,7 +449,7 @@ func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeR
 	return s.metaBase.Inhume(ctx, inhumePrm)
 }

-func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
 	var err error
 	startedAt := time.Now()

@@ -494,7 +457,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone)
 	}()

-	epoch := e.(newEpoch).epoch
 	log := s.log.With(zap.Uint64("epoch", epoch))

 	log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)

@@ -527,7 +489,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
 			return
 		}

-		release, err := s.opsLimiter.ReadRequest(ctx)
+		var release qos.ReleaseFunc
+		release, err = s.opsLimiter.ReadRequest(ctx)
 		if err != nil {
 			log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
 			s.m.RUnlock()
@@ -565,7 +528,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
 	}
 }

-func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
 	var err error
 	startedAt := time.Now()

@@ -573,8 +536,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock)
 	}()

-	s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
+	s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", epoch))
+	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", epoch))

 	workersCount, batchSize := s.getExpiredObjectsParameters()

@@ -584,14 +547,14 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
 	errGroup.Go(func() error {
 		batch := make([]oid.Address, 0, batchSize)

-		expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+		expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
 			if o.Type() == objectSDK.TypeLock {
 				batch = append(batch, o.Address())

 				if len(batch) == batchSize {
 					expired := batch
 					errGroup.Go(func() error {
-						s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
+						s.expiredLocksCallback(egCtx, epoch, expired)
 						return egCtx.Err()
 					})
 					batch = make([]oid.Address, 0, batchSize)

@@ -605,7 +568,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
 	if len(batch) > 0 {
 		expired := batch
 		errGroup.Go(func() error {
-			s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
+			s.expiredLocksCallback(egCtx, epoch, expired)
 			return egCtx.Err()
 		})
 	}
@@ -704,7 +667,10 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
 // HandleExpiredLocks unlocks all objects which were locked by lockers.
 // If successful, marks lockers themselves as garbage.
 func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
-	if s.GetMode().NoMetabase() {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
 		return
 	}

@@ -767,7 +733,10 @@ func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unloc
 // HandleDeletedLocks unlocks all objects which were locked by lockers.
 func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
-	if s.GetMode().NoMetabase() {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
 		return
 	}
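Note: both lock handlers now take the shard's mode mutex themselves and read s.info.Mode directly instead of calling GetMode(), so the mode they checked cannot change underneath the rest of the handler. A generic sketch of that guard pattern (the struct below is illustrative only; the shard's actual locking details are not shown in this diff):

	package example

	import "sync"

	// guarded holds a mode field protected the same way.
	type guarded struct {
		mu   sync.RWMutex
		mode string
	}

	// Handle holds the read lock for the whole operation, so the mode it
	// checked stays valid for the work that follows.
	func (g *guarded) Handle(work func()) {
		g.mu.RLock()
		defer g.mu.RUnlock()

		if g.mode == "no-metabase" {
			return
		}
		work()
	}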
@@ -784,17 +753,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
 	}
 }

-// NotificationChannel returns channel for shard events.
-func (s *Shard) NotificationChannel() chan<- Event {
-	return s.gc.eventChan
+// NotificationChannel returns channel for new epoch events.
+func (s *Shard) NotificationChannel() chan<- uint64 {
+	return s.gc.newEpochChan
 }

-func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredMetrics(ctx context.Context, epoch uint64) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
 	defer span.End()

-	epoch := e.(newEpoch).epoch
-
 	s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
 	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))
@@ -69,7 +69,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
 	require.NoError(t, err)

 	epoch.Value = 105
-	sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
+	sh.gc.handleEvent(context.Background(), epoch.Value)

 	var getPrm GetPrm
 	getPrm.SetAddress(objectCore.AddressOf(obj))

@@ -165,7 +165,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
 	require.True(t, errors.As(err, &splitInfoError), "split info must be provided")

 	epoch.Value = 105
-	sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
+	sh.gc.handleEvent(context.Background(), epoch.Value)

 	_, err = sh.Get(context.Background(), getPrm)
 	require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")
@@ -9,15 +9,10 @@ import (
 	"time"

 	internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
-	metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
-	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
-	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
 	"github.com/hashicorp/golang-lru/v2/simplelru"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
-	"google.golang.org/grpc/credentials/insecure"
 )

 type clientCache struct {

@@ -95,27 +90,13 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
 		return nil, err
 	}

-	opts := []grpc.DialOption{
-		grpc.WithChainUnaryInterceptor(
-			qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
-			metrics.NewUnaryClientInterceptor(),
-			tracing.NewUnaryClientInterceptor(),
-			tagging.NewUnaryClientInterceptor(),
-		),
-		grpc.WithChainStreamInterceptor(
-			qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
-			metrics.NewStreamClientInterceptor(),
-			tracing.NewStreamClientInterceptor(),
-			tagging.NewStreamClientInterceptor(),
-		),
-		grpc.WithContextDialer(c.ds.GrpcContextDialer()),
-		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
-		grpc.WithDisableServiceConfig(),
+	cc, err := createConnection(netAddr, grpc.WithContextDialer(c.ds.GrpcContextDialer()))
+	if err != nil {
+		return nil, err
 	}

-	if !netAddr.IsTLSEnabled() {
-		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	}
+	ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
+	defer cancel()

 	req := &HealthcheckRequest{
 		Body: &HealthcheckRequest_Body{},

@@ -124,13 +105,6 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
 		return nil, err
 	}

-	cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
-	if err != nil {
-		return nil, err
-	}
-
-	ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
-	defer cancel()
-
 	// perform some request to check connection
 	if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil {
 		_ = cc.Close()
@@ -3,6 +3,7 @@ package tree
 import (
 	"context"
 	"crypto/sha256"
+	"crypto/tls"
 	"errors"
 	"fmt"
 	"io"

@@ -22,12 +23,14 @@ import (
 	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 )

@@ -301,7 +304,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
 			return false
 		}

-		cc, err := s.createConnection(a)
+		cc, err := createConnection(a, grpc.WithContextDialer(s.ds.GrpcContextDialer()))
 		if err != nil {
 			s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
 			return false

@@ -339,8 +342,18 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
 	return from
 }

-func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
-	return grpc.NewClient(a.URIAddr(),
+func createConnection(a network.Address, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+	host, isTLS, err := client.ParseURI(a.URIAddr())
+	if err != nil {
+		return nil, err
+	}
+
+	creds := insecure.NewCredentials()
+	if isTLS {
+		creds = credentials.NewTLS(&tls.Config{})
+	}
+
+	defaultOpts := []grpc.DialOption{
 		grpc.WithChainUnaryInterceptor(
 			qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
 			metrics.NewUnaryClientInterceptor(),

@@ -353,10 +366,12 @@ func createConnection(a network.Address, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
 			tracing_grpc.NewStreamClientInterceptor(),
 			tagging.NewStreamClientInterceptor(),
 		),
-		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithTransportCredentials(creds),
 		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
 		grpc.WithDisableServiceConfig(),
-	)
+	}
+
+	return grpc.NewClient(host, append(defaultOpts, opts...)...)
 }

 // ErrAlreadySyncing is returned when a service synchronization has already
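Note: createConnection is now a package-level helper: it derives the transport credentials from the address scheme via client.ParseURI and lets callers append their own dial options on top of the defaults. A short usage sketch within the tree package (the extra option below is a made-up example; the actual callers pass grpc.WithContextDialer as shown in the hunks above):

	// Sketch: dial a tree-service peer with one extra, caller-supplied option.
	func dialPeer(a network.Address) (*grpc.ClientConn, error) {
		return createConnection(a,
			grpc.WithUserAgent("tree-sync-example"), // hypothetical extra option
		)
	}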
@@ -36,6 +36,9 @@ type Prm struct {
 	// PrependTimestamp specifies whether to prepend a timestamp in the log
 	PrependTimestamp bool
+
+	// Options for zap.Logger
+	Options []zap.Option
 }

 const (

@@ -103,10 +106,12 @@ func newConsoleLogger(prm Prm) (*Logger, error) {
 		c.EncoderConfig.TimeKey = ""
 	}

-	lZap, err := c.Build(
+	opts := []zap.Option{
 		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
 		zap.AddCallerSkip(1),
-	)
+	}
+	opts = append(opts, prm.Options...)
+	lZap, err := c.Build(opts...)
 	if err != nil {
 		return nil, err
 	}

@@ -150,7 +155,12 @@ func newJournaldLogger(prm Prm) (*Logger, error) {
 		c.Sampling.Thereafter,
 		samplerOpts...,
 	)
-	lZap := zap.New(samplingCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.AddCallerSkip(1))
+	opts := []zap.Option{
+		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
+		zap.AddCallerSkip(1),
+	}
+	opts = append(opts, prm.Options...)
+	lZap := zap.New(samplingCore, opts...)

 	l := &Logger{z: lZap, lvl: lvl}

@@ -161,10 +171,6 @@ func (l *Logger) Reload(prm Prm) {
 	l.lvl.SetLevel(prm.level)
 }

-func (l *Logger) WithOptions(options ...zap.Option) {
-	l.z = l.z.WithOptions(options...)
-}
-
 func (l *Logger) With(fields ...zap.Field) *Logger {
 	return &Logger{z: l.z.With(fields...)}
 }
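Note: with the mutating WithOptions method removed, extra zap options are now supplied up front through Prm.Options and applied when the logger is built. A small sketch of how a caller might assemble those options before constructing the logger (the hook is a hypothetical example; the constructor itself is not part of this diff):

	package example

	import (
		"go.uber.org/zap"
		"go.uber.org/zap/zapcore"
	)

	// buildPrmOptions shows the new shape: collect zap options ahead of time
	// and assign them to Prm.Options instead of calling WithOptions afterwards.
	func buildPrmOptions() []zap.Option {
		return []zap.Option{
			zap.AddCaller(),
			zap.Hooks(func(zapcore.Entry) error { return nil }), // hypothetical hook
		}
	}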
pkg/util/testing/netmap_source.go (new file, 36 lines)
@@ -0,0 +1,36 @@
+package testing
+
+import (
+	"context"
+	"errors"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+)
+
+var (
+	errInvalidDiff    = errors.New("invalid diff")
+	errNetmapNotFound = errors.New("netmap not found")
+)
+
+type TestNetmapSource struct {
+	Netmaps      map[uint64]*netmap.NetMap
+	CurrentEpoch uint64
+}
+
+func (s *TestNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
+	if diff >= s.CurrentEpoch {
+		return nil, errInvalidDiff
+	}
+	return s.GetNetMapByEpoch(ctx, s.CurrentEpoch-diff)
+}
+
+func (s *TestNetmapSource) GetNetMapByEpoch(_ context.Context, epoch uint64) (*netmap.NetMap, error) {
+	if nm, found := s.Netmaps[epoch]; found {
+		return nm, nil
+	}
+	return nil, errNetmapNotFound
+}
+
+func (s *TestNetmapSource) Epoch(context.Context) (uint64, error) {
+	return s.CurrentEpoch, nil
+}
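Note: the new TestNetmapSource gives tests a canned netmap source: GetNetMap translates a relative diff into an absolute epoch and GetNetMapByEpoch serves pre-registered maps. A usage sketch for a unit test (the import alias utilTesting and the test itself are illustrative assumptions; the package path follows the new file's location in this repository):

	package example_test

	import (
		"context"
		"testing"

		utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
		"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	)

	func TestUsesCannedNetmap(t *testing.T) {
		src := &utilTesting.TestNetmapSource{
			Netmaps:      map[uint64]*netmap.NetMap{5: new(netmap.NetMap)},
			CurrentEpoch: 5,
		}

		nm, err := src.GetNetMap(context.Background(), 0) // diff 0 resolves to epoch 5
		if err != nil || nm == nil {
			t.Fatalf("unexpected result: %v", err)
		}
	}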