forked from TrueCloudLab/frostfs-node
Compare commits: 13 commits, feat/add-g...master
Commits (SHA1):
d432bebef4
d144abc977
a2053870e2
d00c606fee
60446bb668
bd8ab2d84a
bce2f7bef0
c2c05e2228
0a38571a10
632bd8e38d
3bbee1b554
9358938222
5470b205fd
16 changed files with 357 additions and 340 deletions
@@ -6,6 +6,7 @@ import (
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
+	nns2 "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
@@ -13,9 +14,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/encoding/address"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
-	nns2 "github.com/nspcc-dev/neo-go/pkg/rpcclient/nns"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
 	"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
 	"github.com/nspcc-dev/neo-go/pkg/util"
@@ -187,19 +186,9 @@ func NNSResolveKey(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (*
 }

 func NNSIsAvailable(c Client, nnsHash util.Uint160, name string) (bool, error) {
-	switch c.(type) {
-	case *rpcclient.Client:
-		inv := invoker.New(c, nil)
-		reader := nns2.NewReader(inv, nnsHash)
-		return reader.IsAvailable(name)
-	default:
-		b, err := unwrap.Bool(InvokeFunction(c, nnsHash, "isAvailable", []any{name}, nil))
-		if err != nil {
-			return false, fmt.Errorf("`isAvailable`: invalid response: %w", err)
-		}
-
-		return b, nil
-	}
+	inv := invoker.New(c, nil)
+	reader := nns2.NewReader(inv, nnsHash)
+	return reader.IsAvailable(name)
 }

 func CheckNotaryEnabled(c Client) error {
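After this change NNSIsAvailable no longer branches on the concrete client type: every Client goes through an invoker-backed NNS reader. A minimal usage sketch, assuming (from the import hunks) that nns2 now aliases the frostfs-contract rpcclient/nns package; the endpoint and contract hash below are placeholders:

package main

import (
	"context"
	"fmt"

	nns2 "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
	"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
	"github.com/nspcc-dev/neo-go/pkg/util"
)

func main() {
	// Placeholder endpoint: any morph-chain RPC node.
	c, err := rpcclient.New(context.Background(), "http://localhost:30333", rpcclient.Options{})
	if err != nil {
		panic(err)
	}
	// Placeholder NNS contract hash.
	nnsHash, _ := util.Uint160DecodeStringLE("1b4f1efc50eb6f2ad5f0d14b653e062bebd1f3e2")
	reader := nns2.NewReader(invoker.New(c, nil), nnsHash)
	avail, err := reader.IsAvailable("mydomain.frostfs")
	fmt.Println(avail, err)
}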
@@ -40,6 +40,8 @@ type ClientContext struct {
 	CommitteeAct    *actor.Actor     // committee actor with the Global witness scope
 	ReadOnlyInvoker *invoker.Invoker // R/O contract invoker, does not contain any signer
 	SentTxs         []HashVUBPair
+
+	AwaitDisabled bool
 }

 func NewRemoteClient(v *viper.Viper) (Client, error) {
@@ -120,7 +122,7 @@ func (c *ClientContext) SendTx(tx *transaction.Transaction, cmd *cobra.Command,
 }

 func (c *ClientContext) AwaitTx(cmd *cobra.Command) error {
-	if len(c.SentTxs) == 0 {
+	if len(c.SentTxs) == 0 || c.AwaitDisabled {
 		return nil
 	}
@@ -39,6 +39,7 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
 		return err
 	}

+	initCtx.AwaitDisabled = true
 	cmd.Println("Stage 4.1: Transfer GAS to proxy contract.")
 	if err := transferGASToProxy(initCtx); err != nil {
 		return err
@@ -55,5 +56,10 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
 	}

 	cmd.Println("Stage 7: set addresses in NNS.")
-	return setNNS(initCtx)
+	if err := setNNS(initCtx); err != nil {
+		return err
+	}
+
+	initCtx.AwaitDisabled = false
+	return initCtx.AwaitTx()
 }
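The new AwaitDisabled flag turns the per-stage AwaitTx calls into no-ops, so initialization can fire transactions across several stages and wait for all of them once at the end. A sketch of the resulting pattern; stageA and stageB are hypothetical stand-ins for the real stages, while AwaitDisabled, SentTxs and AwaitTx come from the diff:

// Each stage sends transactions that SendTx records in SentTxs.
func runBatchedStages(initCtx *helper.InitializeContext) error {
	initCtx.AwaitDisabled = true // AwaitTx returns nil immediately inside stages
	for _, stage := range []func(*helper.InitializeContext) error{stageA, stageB} {
		if err := stage(initCtx); err != nil {
			return err
		}
	}
	initCtx.AwaitDisabled = false
	return initCtx.AwaitTx() // one wait for everything recorded in SentTxs
}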
@@ -1,7 +1,6 @@
 package initialize

 import (
-	"errors"
 	"fmt"
 	"math/big"

@@ -11,11 +10,8 @@
 	"github.com/nspcc-dev/neo-go/pkg/core/state"
 	"github.com/nspcc-dev/neo-go/pkg/core/transaction"
 	"github.com/nspcc-dev/neo-go/pkg/io"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/neo"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
 	"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
 	"github.com/nspcc-dev/neo-go/pkg/util"
@@ -30,7 +26,8 @@
 )

 func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
-	regPrice, err := getCandidateRegisterPrice(c)
+	reader := neo.NewReader(c.ReadOnlyInvoker)
+	regPrice, err := reader.GetRegisterPrice()
 	if err != nil {
 		return fmt.Errorf("can't fetch registration price: %w", err)
 	}
@@ -116,7 +113,7 @@ func registerCandidates(c *helper.InitializeContext) error {
 func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
-	neoHash := neo.Hash
-
-	ok, err := transferNEOFinished(c, neoHash)
+	ok, err := transferNEOFinished(c)
 	if ok || err != nil {
 		return err
 	}
@@ -139,33 +136,8 @@ func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
 	return c.AwaitTx()
 }

-func transferNEOFinished(c *helper.InitializeContext, neoHash util.Uint160) (bool, error) {
-	r := nep17.NewReader(c.ReadOnlyInvoker, neoHash)
+func transferNEOFinished(c *helper.InitializeContext) (bool, error) {
+	r := neo.NewReader(c.ReadOnlyInvoker)
 	bal, err := r.BalanceOf(c.CommitteeAcc.Contract.ScriptHash())
 	return bal.Cmp(big.NewInt(native.NEOTotalSupply)) == -1, err
 }
-
-var errGetPriceInvalid = errors.New("`getRegisterPrice`: invalid response")
-
-func getCandidateRegisterPrice(c *helper.InitializeContext) (int64, error) {
-	switch c.Client.(type) {
-	case *rpcclient.Client:
-		inv := invoker.New(c.Client, nil)
-		reader := neo.NewReader(inv)
-		return reader.GetRegisterPrice()
-	default:
-		neoHash := neo.Hash
-		res, err := helper.InvokeFunction(c.Client, neoHash, "getRegisterPrice", nil, nil)
-		if err != nil {
-			return 0, err
-		}
-		if len(res.Stack) == 0 {
-			return 0, errGetPriceInvalid
-		}
-		bi, err := res.Stack[0].TryInteger()
-		if err != nil || !bi.IsInt64() {
-			return 0, errGetPriceInvalid
-		}
-		return bi.Int64(), nil
-	}
-}
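With getCandidateRegisterPrice removed, the registration price comes from neo-go's typed NEO contract reader, which performs the invocation and unwraps the integer result that the deleted code parsed from the stack by hand. A standalone sketch with a placeholder endpoint:

// Query the candidate registration price via the typed reader.
c, err := rpcclient.New(context.Background(), "http://localhost:30333", rpcclient.Options{})
if err != nil {
	return err
}
regPrice, err := neo.NewReader(invoker.New(c, nil)).GetRegisterPrice()
if err != nil {
	return fmt.Errorf("can't fetch registration price: %w", err)
}
fmt.Println("register price:", regPrice)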
@@ -108,6 +108,7 @@ type applicationConfiguration struct {
 		level       string
 		destination string
 		timestamp   bool
+		options     []zap.Option
 	}

 	ObjectCfg struct {
@@ -232,6 +233,14 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	a.LoggerCfg.level = loggerconfig.Level(c)
 	a.LoggerCfg.destination = loggerconfig.Destination(c)
 	a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
+	var opts []zap.Option
+	if loggerconfig.ToLokiConfig(c).Enabled {
+		opts = []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
+			lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(c))
+			return lokiCore
+		})}
+	}
+	a.LoggerCfg.options = opts

 	// Object
@@ -718,12 +727,6 @@ func initCfg(appCfg *config.Config) *cfg {
 	logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
 	log, err := logger.NewLogger(logPrm)
 	fatalOnErr(err)
-	if loggerconfig.ToLokiConfig(appCfg).Enabled {
-		log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
-			lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
-			return lokiCore
-		}))
-	}

 	c.internals = initInternals(appCfg, log)
@@ -1090,6 +1093,7 @@ func (c *cfg) loggerPrm() (logger.Prm, error) {
 		return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
 	}
 	prm.PrependTimestamp = c.LoggerCfg.timestamp
+	prm.Options = c.LoggerCfg.options

 	return prm, nil
 }
@@ -43,6 +43,9 @@ func initQoSService(c *cfg) {
 func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
 	rawTag, defined := qosTagging.IOTagFromContext(ctx)
 	if !defined {
+		if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
+			return qosTagging.ContextWithIOTag(ctx, qos.IOTagInternal.String())
+		}
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 	ioTag, err := qos.FromRawString(rawTag)
@@ -73,21 +76,9 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	case qos.IOTagInternal:
-		for _, pk := range s.allowedInternalPubs {
-			if bytes.Equal(pk, requestSignPublicKey) {
-				return ctx
-			}
-		}
-		nm, err := s.netmapSource.GetNetMap(ctx, 0)
-		if err != nil {
-			s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
-			return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
-		}
-		for _, node := range nm.Nodes() {
-			if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
-				return ctx
-			}
-		}
+		if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
+			return ctx
+		}
 		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	default:
@@ -95,3 +86,23 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 }
+
+func (s *cfgQoSService) isInternalIOTagPublicKey(ctx context.Context, publicKey []byte) bool {
+	for _, pk := range s.allowedInternalPubs {
+		if bytes.Equal(pk, publicKey) {
+			return true
+		}
+	}
+	nm, err := s.netmapSource.GetNetMap(ctx, 0)
+	if err != nil {
+		s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
+		return false
+	}
+	for _, node := range nm.Nodes() {
+		if bytes.Equal(node.PublicKey(), publicKey) {
+			return true
+		}
+	}
+
+	return false
+}
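A hypothetical wiring sketch for context: how a unary gRPC interceptor might apply AdjustIncomingTag before handling a request. publicKeyFromRequest stands in for however the server extracts the request signer's key; only AdjustIncomingTag itself comes from the diff:

func (s *cfgQoSService) unaryQoSInterceptor(
	ctx context.Context, req any,
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (any, error) {
	// Re-tag the context: verifiable internal keys keep (or get) the
	// internal tag, everything unverifiable degrades to the client tag.
	ctx = s.AdjustIncomingTag(ctx, publicKeyFromRequest(req))
	return handler(ctx, req)
}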
cmd/frostfs-node/qos_test.go (new file, 226 lines)
@@ -0,0 +1,226 @@
package main

import (
	"context"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
	utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/stretchr/testify/require"
)

func TestQoSService_Client(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	t.Run("IO tag client defined", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagClient.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("no IO tag defined, signed with unknown key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("no IO tag defined, signed with allowed critical key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with unknown key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with allowed internal key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with allowed critical key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag internal defined, signed with unknown key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag internal defined, signed with allowed critical key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag critical defined, signed with unknown key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Request)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag critical defined, signed with allowed internal key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
}

func TestQoSService_Internal(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
	t.Run("IO tag internal defined, signed with allowed internal key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
	t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
	t.Run("no IO tag defined, signed with allowed internal key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.Internal)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagInternal.String(), tag)
	})
}

func TestQoSService_Critical(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagCritical.String(), tag)
	})
	t.Run("IO tag critical defined, signed with allowed critical key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.Critical)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagCritical.String(), tag)
	})
}

func TestQoSService_NetmapGetError(t *testing.T) {
	t.Parallel()
	s, pk := testQoSServicePrepare(t)
	s.netmapSource = &utilTesting.TestNetmapSource{}
	t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
		ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
	t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
		ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
		ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
		tag, ok := tagging.IOTagFromContext(ctx)
		require.True(t, ok)
		require.Equal(t, qos.IOTagClient.String(), tag)
	})
}

func testQoSServicePrepare(t *testing.T) (*cfgQoSService, *testQoSServicePublicKeys) {
	nmSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	reqSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	allowedCritSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	allowedIntSigner, err := keys.NewPrivateKey()
	require.NoError(t, err)

	var node netmap.NodeInfo
	node.SetPublicKey(nmSigner.PublicKey().Bytes())
	nm := &netmap.NetMap{}
	nm.SetEpoch(100)
	nm.SetNodes([]netmap.NodeInfo{node})

	return &cfgQoSService{
			logger: test.NewLogger(t),
			netmapSource: &utilTesting.TestNetmapSource{
				Netmaps: map[uint64]*netmap.NetMap{
					100: nm,
				},
				CurrentEpoch: 100,
			},
			allowedCriticalPubs: [][]byte{
				allowedCritSigner.PublicKey().Bytes(),
			},
			allowedInternalPubs: [][]byte{
				allowedIntSigner.PublicKey().Bytes(),
			},
		},
		&testQoSServicePublicKeys{
			NetmapNode: nmSigner.PublicKey().Bytes(),
			Request:    reqSigner.PublicKey().Bytes(),
			Internal:   allowedIntSigner.PublicKey().Bytes(),
			Critical:   allowedCritSigner.PublicKey().Bytes(),
		}
}

type testQoSServicePublicKeys struct {
	NetmapNode []byte
	Request    []byte
	Internal   []byte
	Critical   []byte
}
go.mod (4 lines changed)
@@ -8,8 +8,8 @@ require (
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
 	git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
-	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275
-	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
+	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250324133647-57d895c32167
+	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250307150202-749b4e9ab592
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
 	git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
go.sum (8 lines changed)
@@ -8,10 +8,10 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275 h1:WqWxCnCl2ekfjWja/CpGeF2rf4h0x199xhdnsm/j+E8=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250324133647-57d895c32167 h1:NhqfqNcATndYwx413BaaYXxVJbkeu2vQOtVyxXw5xCQ=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250324133647-57d895c32167/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250307150202-749b4e9ab592 h1:n7Pl8V7O1yS07J/fqdbzZjVe/mQW42a7eS0QHfgrzJw=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250307150202-749b4e9ab592/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
@@ -512,7 +512,7 @@ const (
 	FailedToUpdateMultinetConfiguration         = "failed to update multinet configuration"
 	FailedToParseIncomingIOTag                  = "failed to parse incoming IO tag"
 	NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
-	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag, replaced with `client`"
+	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag"
 	FailedToValidateIncomingIOTag               = "failed to validate incoming IO tag, replaced with `client`"
 	WriteCacheFailedToAcquireRPSQuota           = "writecache failed to acquire RPS quota to flush object"
 )
(deleted file, 214 lines)
@@ -1,214 +0,0 @@
package qos_test

import (
	"context"
	"errors"
	"fmt"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
)

const (
	okKey = "ok"
)

var (
	errTest         = errors.New("mock")
	errWrongTag     = errors.New("wrong tag")
	errNoTag        = errors.New("failed to get tag from context")
	errResExhausted = new(apistatus.ResourceExhausted)
	tags            = []qos.IOTag{qos.IOTagBackground, qos.IOTagWritecache, qos.IOTagPolicer, qos.IOTagTreeSync}
)

type mockGRPCServerStream struct {
	grpc.ServerStream

	ctx context.Context
}

func (m *mockGRPCServerStream) Context() context.Context {
	return m.ctx
}

type limiter struct {
	released bool
}

func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
	if key != okKey {
		return nil, false
	}
	return func() { l.released = true }, true
}

func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
	interceptor := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return lim })
	called := false
	handler := func(ctx context.Context, req any) (any, error) {
		called = true
		return nil, errTest
	}
	_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{FullMethod: methodName}, handler)
	return called, err
}

func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
	interceptor := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return lim })
	called := false
	handler := func(srv any, stream grpc.ServerStream) error {
		called = true
		return errTest
	}
	err := interceptor(nil, &mockGRPCServerStream{ctx: ctx}, &grpc.StreamServerInfo{
		FullMethod: methodName,
	}, handler)
	return called, err
}

func Test_MaxActiveRPCLimiter(t *testing.T) {
	// UnaryServerInterceptor
	t.Run("unary fail", func(t *testing.T) {
		var lim limiter

		called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
		require.EqualError(t, err, errResExhausted.Error())
		require.False(t, called)
	})
	t.Run("unary pass critical", func(t *testing.T) {
		var lim limiter
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())

		called, err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
		require.EqualError(t, err, errTest.Error())
		require.True(t, called)
		require.False(t, lim.released)
	})
	t.Run("unary pass", func(t *testing.T) {
		var lim limiter

		called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
		require.EqualError(t, err, errTest.Error())
		require.True(t, called && lim.released)
	})
	// StreamServerInterceptor
	t.Run("stream fail", func(t *testing.T) {
		var lim limiter

		called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
		require.EqualError(t, err, errResExhausted.Error())
		require.False(t, called)
	})
	t.Run("stream pass critical", func(t *testing.T) {
		var lim limiter
		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())

		called, err := streamMaxActiveRPCLimiter(ctx, &lim, "")
		require.EqualError(t, err, errTest.Error())
		require.True(t, called)
		require.False(t, lim.released)
	})
	t.Run("stream pass", func(t *testing.T) {
		var lim limiter

		called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
		require.EqualError(t, err, errTest.Error())
		require.True(t, called && lim.released)
	})
}

func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
	interceptor := qos.NewSetCriticalIOTagUnaryServerInterceptor()
	handler := func(ctx context.Context, req any) (any, error) {
		if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == qos.IOTagCritical.String() {
			return nil, nil
		}
		return nil, errWrongTag
	}
	_, err := interceptor(context.Background(), nil, nil, handler)
	require.NoError(t, err)
}

func TestAdjustOutgoingIOTagUnaryClientInterceptor(t *testing.T) {
	interceptor := qos.NewAdjustOutgoingIOTagUnaryClientInterceptor()

	// check context with no value
	called := false
	invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
		called = true
		if _, ok := tagging.IOTagFromContext(ctx); ok {
			return fmt.Errorf("%v: expected no IO tags", errWrongTag)
		}
		return nil
	}
	require.NoError(t, interceptor(context.Background(), "", nil, nil, nil, invoker, nil))
	require.True(t, called)

	// check context for internal tag
	targetTag := qos.IOTagInternal.String()
	invoker = func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
		raw, ok := tagging.IOTagFromContext(ctx)
		if !ok {
			return errNoTag
		}
		if raw != targetTag {
			return errWrongTag
		}
		return nil
	}
	for _, tag := range tags {
		ctx := tagging.ContextWithIOTag(context.Background(), tag.String())
		require.NoError(t, interceptor(ctx, "", nil, nil, nil, invoker, nil))
	}

	// check context for client tag
	ctx := tagging.ContextWithIOTag(context.Background(), "")
	targetTag = qos.IOTagClient.String()
	require.NoError(t, interceptor(ctx, "", nil, nil, nil, invoker, nil))
}

func TestAdjustOutgoingIOTagStreamClientInterceptor(t *testing.T) {
	interceptor := qos.NewAdjustOutgoingIOTagStreamClientInterceptor()

	// check context with no value
	called := false
	streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		called = true
		if _, ok := tagging.IOTagFromContext(ctx); ok {
			return nil, fmt.Errorf("%v: expected no IO tags", errWrongTag)
		}
		return nil, nil
	}
	_, err := interceptor(context.Background(), nil, nil, "", streamer, nil)
	require.True(t, called)
	require.NoError(t, err)

	// check context for internal tag
	targetTag := qos.IOTagInternal.String()
	streamer = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		raw, ok := tagging.IOTagFromContext(ctx)
		if !ok {
			return nil, errNoTag
		}
		if raw != targetTag {
			return nil, errWrongTag
		}
		return nil, nil
	}
	for _, tag := range tags {
		ctx := tagging.ContextWithIOTag(context.Background(), tag.String())
		_, err := interceptor(ctx, nil, nil, "", streamer, nil)
		require.NoError(t, err)
	}

	// check context for client tag
	ctx := tagging.ContextWithIOTag(context.Background(), "")
	targetTag = qos.IOTagClient.String()
	_, err = interceptor(ctx, nil, nil, "", streamer, nil)
	require.NoError(t, err)
}
@@ -9,6 +9,7 @@ import (

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
+	utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
 	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@@ -410,11 +411,11 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 			},
 		),
 		WithNetmapSource(
-			&testNetmapSource{
-				netmaps: map[uint64]*netmap.NetMap{
+			&utilTesting.TestNetmapSource{
+				Netmaps: map[uint64]*netmap.NetMap{
 					curEpoch: currentEpochNM,
 				},
-				currentEpoch: curEpoch,
+				CurrentEpoch: curEpoch,
 			},
 		),
 		WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -483,12 +484,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 			},
 		),
 		WithNetmapSource(
-			&testNetmapSource{
-				netmaps: map[uint64]*netmap.NetMap{
+			&utilTesting.TestNetmapSource{
+				Netmaps: map[uint64]*netmap.NetMap{
 					curEpoch:     currentEpochNM,
 					curEpoch - 1: previousEpochNM,
 				},
-				currentEpoch: curEpoch,
+				CurrentEpoch: curEpoch,
 			},
 		),
 		WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -559,12 +560,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 			},
 		),
 		WithNetmapSource(
-			&testNetmapSource{
-				netmaps: map[uint64]*netmap.NetMap{
+			&utilTesting.TestNetmapSource{
+				Netmaps: map[uint64]*netmap.NetMap{
 					curEpoch:     currentEpochNM,
 					curEpoch - 1: previousEpochNM,
 				},
-				currentEpoch: curEpoch,
+				CurrentEpoch: curEpoch,
 			},
 		),
 		WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -596,26 +597,3 @@ func (s *testContainerSource) Get(ctx context.Context, cnrID cid.ID) (*container
 func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
 	return nil, nil
 }
-
-type testNetmapSource struct {
-	netmaps      map[uint64]*netmap.NetMap
-	currentEpoch uint64
-}
-
-func (s *testNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
-	if diff >= s.currentEpoch {
-		return nil, fmt.Errorf("invalid diff")
-	}
-	return s.GetNetMapByEpoch(ctx, s.currentEpoch-diff)
-}
-
-func (s *testNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error) {
-	if nm, found := s.netmaps[epoch]; found {
-		return nm, nil
-	}
-	return nil, fmt.Errorf("netmap not found")
-}
-
-func (s *testNetmapSource) Epoch(ctx context.Context) (uint64, error) {
-	return s.currentEpoch, nil
-}
@@ -139,8 +139,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,

 	var containerID cid.ID
 	var offset []byte
-	graveyardBkt := tx.Bucket(graveyardBucketName)
-	garbageBkt := tx.Bucket(garbageBucketName)
+	bc := newBucketCache()

 	rawAddr := make([]byte, cidSize, addressKeySize)

@@ -169,7 +168,7 @@
 		bkt := tx.Bucket(name)
 		if bkt != nil {
 			copy(rawAddr, cidRaw)
-			result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
+			result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
 				result, count, cursor, threshold, currEpoch)
 			if err != nil {
 				return nil, nil, err
@@ -204,9 +203,10 @@

 // selectNFromBucket similar to selectAllFromBucket but uses cursor to find
 // object to start selecting from. Ignores inhumed objects.
-func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
+func selectNFromBucket(
+	bc *bucketCache,
+	bkt *bbolt.Bucket, // main bucket
 	objType objectSDK.Type, // type of the objects stored in the main bucket
-	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
 	cidRaw []byte, // container ID prefix, optimization
 	cnt cid.ID, // container ID
 	to []objectcore.Info, // listing result
@@ -219,7 +219,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		cursor = new(Cursor)
 	}

-	count := len(to)
 	c := bkt.Cursor()
 	k, v := c.First()

@@ -231,7 +230,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 	}

 	for ; k != nil; k, v = c.Next() {
-		if count >= limit {
+		if len(to) >= limit {
 			break
 		}

@@ -241,6 +240,8 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		}

 		offset = k
+		graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
+		garbageBkt := getGarbageBucket(bc, bkt.Tx())
 		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
 			continue
 		}

@@ -251,7 +252,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		}

 		expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
-		if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) {
+		if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
 			continue
 		}

@@ -273,7 +274,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		a.SetContainer(cnt)
 		a.SetObject(obj)
 		to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
-		count++
 	}

 	return to, offset, cursor, nil
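selectNFromBucket now resolves the graveyard and garbage buckets through a per-transaction cache instead of receiving them as parameters, and the lock check goes through a cache-aware variant. The cache itself is not part of this diff; a hypothetical sketch of the memoization it implies (newBucketCache, getGraveyardBucket and getGarbageBucket are referenced above but their bodies are not shown):

type bucketCache struct {
	graveyard *bbolt.Bucket
	garbage   *bbolt.Bucket
}

func newBucketCache() *bucketCache { return &bucketCache{} }

func getGraveyardBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc.graveyard == nil {
		bc.graveyard = tx.Bucket(graveyardBucketName) // look up once, reuse after
	}
	return bc.graveyard
}

func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc.garbage == nil {
		bc.garbage = tx.Bucket(garbageBucketName)
	}
	return bc.garbage
}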
@@ -527,7 +527,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 			return
 		}

-		release, err := s.opsLimiter.ReadRequest(ctx)
+		var release qos.ReleaseFunc
+		release, err = s.opsLimiter.ReadRequest(ctx)
 		if err != nil {
 			log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
 			s.m.RUnlock()
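The `:=` becomes `=` here because err already exists in the enclosing scope; release is declared separately so the assignment does not shadow it. The underlying pattern, sketched under the assumption that the internal qos.Limiter interface returns a qos.ReleaseFunc from ReadRequest, as the diff shows:

func readWithQuota(ctx context.Context, limiter qos.Limiter) error {
	release, err := limiter.ReadRequest(ctx) // block until read quota is granted
	if err != nil {
		return fmt.Errorf("acquire read quota: %w", err)
	}
	defer release() // return the quota even if the read below fails
	// ... perform the graveyard/metabase read here ...
	return nil
}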
@@ -36,6 +36,9 @@ type Prm struct {

 	// PrependTimestamp specifies whether to prepend a timestamp in the log
 	PrependTimestamp bool
+
+	// Options for zap.Logger
+	Options []zap.Option
 }

 const (
@@ -103,10 +106,12 @@ func newConsoleLogger(prm Prm) (*Logger, error) {
 		c.EncoderConfig.TimeKey = ""
 	}

-	lZap, err := c.Build(
+	opts := []zap.Option{
 		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
 		zap.AddCallerSkip(1),
-	)
+	}
+	opts = append(opts, prm.Options...)
+	lZap, err := c.Build(opts...)
 	if err != nil {
 		return nil, err
 	}

@@ -150,7 +155,12 @@ func newJournaldLogger(prm Prm) (*Logger, error) {
 		c.Sampling.Thereafter,
 		samplerOpts...,
 	)
-	lZap := zap.New(samplingCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.AddCallerSkip(1))
+	opts := []zap.Option{
+		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
+		zap.AddCallerSkip(1),
+	}
+	opts = append(opts, prm.Options...)
+	lZap := zap.New(samplingCore, opts...)

 	l := &Logger{z: lZap, lvl: lvl}

@@ -161,10 +171,6 @@ func (l *Logger) Reload(prm Prm) {
 	l.lvl.SetLevel(prm.level)
 }

-func (l *Logger) WithOptions(options ...zap.Option) {
-	l.z = l.z.WithOptions(options...)
-}
-
 func (l *Logger) With(fields ...zap.Field) *Logger {
 	return &Logger{z: l.z.With(fields...)}
 }
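With Logger.WithOptions gone, zap options are injected at construction time through Prm.Options; this is how the Loki core from the config.go hunks above reaches the logger, and a logger rebuilt from the same Prm keeps it. A sketch, assuming lokiCfg is an already-loaded Loki configuration:

var prm logger.Prm
prm.Options = []zap.Option{
	zap.WrapCore(func(core zapcore.Core) zapcore.Core {
		return lokicore.New(core, lokiCfg) // tee records to Loki
	}),
}
log, err := logger.NewLogger(prm)
if err != nil {
	return err
}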
pkg/util/testing/netmap_source.go (new file, 36 lines)
@@ -0,0 +1,36 @@
package testing

import (
	"context"
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

var (
	errInvalidDiff    = errors.New("invalid diff")
	errNetmapNotFound = errors.New("netmap not found")
)

type TestNetmapSource struct {
	Netmaps      map[uint64]*netmap.NetMap
	CurrentEpoch uint64
}

func (s *TestNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
	if diff >= s.CurrentEpoch {
		return nil, errInvalidDiff
	}
	return s.GetNetMapByEpoch(ctx, s.CurrentEpoch-diff)
}

func (s *TestNetmapSource) GetNetMapByEpoch(_ context.Context, epoch uint64) (*netmap.NetMap, error) {
	if nm, found := s.Netmaps[epoch]; found {
		return nm, nil
	}
	return nil, errNetmapNotFound
}

func (s *TestNetmapSource) Epoch(context.Context) (uint64, error) {
	return s.CurrentEpoch, nil
}
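Usage sketch for the extracted double, mirroring how testQoSServicePrepare above seeds it; GetNetMap(ctx, 0) resolves to the current epoch's map, and unknown epochs yield errNetmapNotFound:

nm := &netmap.NetMap{}
nm.SetEpoch(100)
src := &utilTesting.TestNetmapSource{
	Netmaps:      map[uint64]*netmap.NetMap{100: nm},
	CurrentEpoch: 100,
}
got, err := src.GetNetMap(context.Background(), 0) // returns Netmaps[100]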