Compare commits


31 commits

e142d25fac
[#1700] gc: Wait for handlers on GC stopping
First, wait for the goroutine that handles epoch events, so as not to get a
data race on `gc.newEpochHandlers.cancelFunc`.

Then cancel handlers and wait for them.

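A minimal sketch of the ordering this describes (illustrative names, not the actual frostfs-node types): join the goroutine that writes `cancelFunc` before cancelling and waiting for the handlers.

```go
package gc

import (
	"context"
	"sync"
)

// gc sketches the shutdown ordering described above; the fields are
// illustrative, not the real frostfs-node implementation.
type gc struct {
	stopChannel chan struct{}
	wg          sync.WaitGroup // the goroutine that dispatches epoch events

	newEpochHandlers struct {
		cancelFunc context.CancelFunc // written by the dispatcher goroutine
		wg         sync.WaitGroup     // in-flight handlers
	}
}

func (g *gc) stop() {
	close(g.stopChannel)
	g.wg.Wait() // 1. join the dispatcher first: it writes cancelFunc

	if g.newEpochHandlers.cancelFunc != nil {
		g.newEpochHandlers.cancelFunc() // 2. then cancel the handlers...
	}
	g.newEpochHandlers.wg.Wait() // 3. ...and wait for them to finish
}
```
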
Change-Id: I71f11f8526961f8356f582a95b10eb8340c0aedd
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 16:00:41 +03:00
bd1c18e117
[#1689] cli/tree: Copy dial options from the service code
There should be no `grpcs://` prefix in the address, and the proper
credentials should be picked.

Change-Id: I58cdc98b079eac2c7db7dc088f4f131794a91b9f
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
b27f7d1d17
[#1689] treesvc: Use context dialer in synchronizeTree()
This dialer supports source-based routing and is already used in cache.

Change-Id: Ic7852edd2faea4e5d8667221e6f681cc82bb143a
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
3cd8080232
[#1689] treesvc: Fix dial options for TLS connections
There are two problems with the current approach:
1. For TLS connections we need different transport credentials.
2. grpc.NewClient() considers the scheme from `URIAddr()` as the scheme for a
   resolver. The `grpcs://` scheme doesn't exist, though, so the default one
   is picked. The default resolver (`dns://`) is in turn unable to parse the
   address (see 5edab9e554/internal/resolver/dns/dns_resolver.go (L405)).
   The error is `grpcs://192.168.198.248:8081:443: too many colons in address`.

Both problems are already solved in the SDK code, so take it from there.

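A hedged sketch of the SDK-style dialing both problems point to: parse the URI once, keep the scheme away from the resolver, and express TLS via transport credentials. Here `parseURI` stands in for the SDK helper; this is not the verbatim frostfs code.

```go
package treesvc

import (
	"crypto/tls"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
)

// dial hands grpc.NewClient a bare host, so the default DNS resolver never
// sees the non-existent grpcs:// scheme; TLS is chosen via credentials.
func dial(rawURI string, parseURI func(string) (host string, isTLS bool, err error)) (*grpc.ClientConn, error) {
	host, isTLS, err := parseURI(rawURI)
	if err != nil {
		return nil, err
	}

	creds := insecure.NewCredentials()
	if isTLS {
		creds = credentials.NewTLS(&tls.Config{})
	}

	return grpc.NewClient(host, grpc.WithTransportCredentials(creds))
}
```
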
Change-Id: Ia1212050f539162a560796685efdc3f9cfbf80a0
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
a11b54ca15
[#1689] treesvc: Unify gRPC client creation for cache and sync
They connect to the same endpoints, the only difference is that
connection for synchronization is limited in lifetime and is closed
after the sync is finished. This is probably not intentional, as
synchronization was implemented before cache was introduced.
However, reusing dialTreeService() in sync.go has possible performance
implications, so it is avoided for now.

Change-Id: I2e37befd783b4d873ff833969f932deded1195be
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
b112a92408
[#1689] treesvc: Create request after client is initialized
Make it easier to follow.

Change-Id: I40c4db77f015bb45cb25f16ce24e68188fc14380
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
19ca907223
[#1689] treesvc: Untie createConnection() from Service struct
Change-Id: I6212de4b81afe8c2516981a7bb2fea099c7df773
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
f62d81e26a
[#1700] gc: Take mode mutex in locks handlers
Change-Id: I4408eae3aed936f85427b6246dcf727bd6813a0d
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 14:23:03 +03:00
27899598dc
[#1700] gc: Drop Event interface
There is only one event: new epoch.

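Since the interface had a single implementation, the channel can carry the epoch number directly. A sketch of the simplification, mirroring the shard changes later in this diff:

```go
package shard

// Before: a one-implementation interface forced boxing and type assertions.
//
//	type Event interface{ typ() eventType }
//	eventChan chan Event
//
// After: the new epoch is the only event, so its number is the whole payload.
type gcCfg struct {
	newEpochChan chan uint64
}
```
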
Change-Id: I982f3650f7bc753ff2782393625452f0f8cdcc35
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 14:23:02 +03:00
bc6cc9ae2a
[#1700] engine: Print stacks on test request limiter
Change-Id: I4952769ca431d1049955823b41b99b0984b385fc
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 14:23:02 +03:00
6e1576cfdb [#1656] qos: Add tests for AdjustOutgoingIOTag Interceptors
Change-Id: If534e756b26cf7f202039d48ecdf554b4283728b
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-01 11:55:15 +03:00
a5bae6c5af
[#1699] qos: Allow to prohibit operations for IO tag
Change-Id: I2bee26885244e241d224860978b6de3526527e96
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 10:08:03 +03:00
5a13830a94
[#1699] mod: Bump frostfs-qos version
Change-Id: Ie5e708c0ca653596c6e3346aa286618868a5aee8
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 10:08:03 +03:00
dcb2b23a7d [#1656] qos: Add test for SetCriticalIOTag Interceptor
Change-Id: I4a55fcb84e6f65408a1c0120ac917e49e23354a1
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-31 18:21:48 +03:00
115aae7c34 [#1656] qos: Add tests for MaxActiveRPCLimiter Interceptors
Change-Id: Ib65890ae5aec34c34e15d4ec1f05952f74f1ad26
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-31 18:21:46 +03:00
12a0537a7a [#1689] ci: Add commit checker to Jenkinsfile
- Commit checker image is built from dco-go:
  TrueCloudLab/dco-go#14
- 'pull_request_target' branch is defined in Jenkins job:
  TrueCloudLab/jenkins#10
  TrueCloudLab/jenkins#11

Change-Id: Ib86c5749f9e084d736b868240c4b47014b02ba8d
Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2025-03-31 15:08:59 +00:00
30d4692c3e [#1640] go.mod: Bump version for frostfs-locode-db
Change-Id: Ic45ae77d6209c0097575fc8f89b076b22d50d149
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-31 10:29:41 +00:00
2254c8aff5 [#1689] go.mod: Bump SDK version
Change-Id: Ic946aa68c3d6da9e7d54363f8e9141c6547707d6
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-03-31 11:55:29 +03:00
d432bebef4 [#1689] go.mod: Bump frostfs-qos version
Change-Id: Iaa28da1a1e7b2f4ab7fd8ed661939eb38f4c7782
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-28 12:40:05 +00:00
d144abc977 [#1692] metabase: Remove useless count variable
It is always equal to `len(to)`.

Change-Id: Id7a4c26e9711216b78f96e6b2511efa0773e3471
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:16:01 +00:00
a2053870e2 [#1692] metabase: Use bucket cache in ListWithCursor()
No change in speed, but a unified approach:
```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
                           │    master    │                 new                 │
                           │    sec/op    │    sec/op     vs base               │
ListWithCursor/1_item-8      6.067µ ±  8%   5.731µ ± 10%       ~ (p=0.052 n=10)
ListWithCursor/10_items-8    25.40µ ± 11%   26.12µ ± 13%       ~ (p=0.971 n=10)
ListWithCursor/100_items-8   210.7µ ±  9%   203.2µ ±  6%       ~ (p=0.280 n=10)
geomean                      31.90µ         31.22µ        -2.16%

                           │    master    │                  new                  │
                           │     B/op     │     B/op      vs base                 │
ListWithCursor/1_item-8      3.287Ki ± 0%   3.287Ki ± 0%       ~ (p=1.000 n=10) ¹
ListWithCursor/10_items-8    15.63Ki ± 0%   15.62Ki ± 0%       ~ (p=0.328 n=10)
ListWithCursor/100_items-8   138.1Ki ± 0%   138.1Ki ± 0%       ~ (p=0.340 n=10)
geomean                      19.21Ki        19.21Ki       -0.00%
¹ all samples are equal

                           │   master    │                 new                  │
                           │  allocs/op  │  allocs/op   vs base                 │
ListWithCursor/1_item-8       109.0 ± 0%    109.0 ± 0%       ~ (p=1.000 n=10) ¹
ListWithCursor/10_items-8     380.0 ± 0%    380.0 ± 0%       ~ (p=1.000 n=10) ¹
ListWithCursor/100_items-8   3.082k ± 0%   3.082k ± 0%       ~ (p=1.000 n=10) ¹
geomean                       503.5         503.5       +0.00%
¹ all samples are equal
```

Change-Id: Ic11673427615053656b2a60068a6d4dbd27af2cb
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:16:01 +00:00
d00c606fee [#652] adm: Group independent stages in batches
Each stage waits until the transaction persists. This is needed to ensure
that the next stage will see the result of the previous one. However, some of
the stages do not depend on one another, so we may execute them in
parallel.

The `AwaitDisabled` flag is used to localize this batching on the code
level. We could've removed `AwaitTx()` from the respective stages, but that
seems more error-prone.

Close #652.

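A sketch of the pattern (simplified, not the actual frostfs-adm types): `AwaitDisabled` turns the per-stage await calls into no-ops for the duration of the batch, and one final await flushes everything.

```go
package adm

// initCtx is a stand-in for frostfs-adm's InitializeContext.
type initCtx struct {
	AwaitDisabled bool
	sentTxs       []string // hashes of transactions sent but not yet persisted
}

// awaitTx mirrors ClientContext.AwaitTx: it is a no-op while batching.
func (c *initCtx) awaitTx() error {
	if len(c.sentTxs) == 0 || c.AwaitDisabled {
		return nil
	}
	// ...poll the chain until every sent transaction persists...
	c.sentTxs = nil
	return nil
}

// runBatch executes independent stages without waiting in between, then
// awaits all of their transactions at once.
func runBatch(c *initCtx, stages []func(*initCtx) error) error {
	c.AwaitDisabled = true
	for _, stage := range stages {
		if err := stage(c); err != nil {
			return err
		}
	}
	c.AwaitDisabled = false
	return c.awaitTx()
}
```
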
Change-Id: Ib9c6f6cd5e0db0f31aa1cda8e127b1fad5166336
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:15:21 +00:00
60446bb668 [#1689] adm/helper: Use proper nns bindings import
The one in `neo-go` is for another contract.

Change-Id: Ia1ac2da5e419b48801afdb26df72892d77344e0d
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:15:21 +00:00
bd8ab2d84a [#1689] adm: Remove useless switch in NNSIsAvailable()
After all the refactorings, there is no need to have custom behaviour
for the local client.

Change-Id: I99e297cdeffff979524b3f89d3580ab5780e7681
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:15:21 +00:00
bce2f7bef0 [#1689] adm: Reuse neo.NewReader helper in transferNEOFinished()
Change-Id: I27980ed87436958cb4d27278e30e05da021d1506
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:10:37 +00:00
c2c05e2228 [#1689] adm: Reuse ReadOnlyInvoker in registerCandidateRange()
Change-Id: I544d10340825494b45a62700fa247404c18f746a
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:10:37 +00:00
0a38571a10 [#1689] adm: Simplify getCandidateRegisterPrice()
After all the refactoring, there is no longer any need for a custom branch
for the local client.

Change-Id: I274305b0c390578fb4583759135d3e7ce58873dc
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:10:31 +00:00
632bd8e38d [#1696] qos: Fix internal tag adjust
If a request has no tag, but the request's public key is a netmap node's key
or one of the allowed internal tag keys from the config, then the request
must use the internal IO tag.

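In sketch form (a simplified statement of the rule, not the patched function itself, which also handles logging and the netmap fetch error):

```go
package qos

import "bytes"

// isInternalKey captures the rule above: a request may use the internal IO
// tag if its public key is one of the configured internal keys or belongs to
// a node of the current netmap.
func isInternalKey(requestKey []byte, allowedInternal, netmapNodeKeys [][]byte) bool {
	for _, pk := range allowedInternal {
		if bytes.Equal(pk, requestKey) {
			return true
		}
	}
	for _, pk := range netmapNodeKeys {
		if bytes.Equal(pk, requestKey) {
			return true
		}
	}
	return false
}
```
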
Change-Id: Iff93b626941a81b088d8999b3f2947f9501dcdf8
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-28 07:47:12 +00:00
3bbee1b554 [#1619] logger: Allow to set options for zap.Logger via logger.Prm
Change-Id: I8eed951c25d1ecf18b0aea62c6825be65a450085
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-27 15:41:37 +03:00
9358938222 [#1633] go.mod: Bump frostfs-sdk-go
Change-Id: I50c1a0d5b88e307402a5b1b2883bb9b9a357a2c7
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-26 15:03:17 +00:00
5470b205fd [#1619] gc: Fix metric frostfs_node.garbage_collector.marking_duration_seconds
Change-Id: I957f930d1babf179d0fb6de624a90f4fe9977862
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-26 14:14:26 +03:00
31 changed files with 582 additions and 312 deletions

.ci/Jenkinsfile (vendored)

```diff
@@ -78,6 +78,10 @@ async {
             }
         }
     }
-    // TODO: dco check
+    task('dco') {
+        container('git.frostfs.info/truecloudlab/commit-check:master') {
+            sh 'FROM=pull_request_target commit-check'
+        }
+    }
 }
```

```diff
@@ -6,6 +6,7 @@ import (
     "time"

     "git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
+    nns2 "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
@@ -13,9 +14,7 @@ import (
     "github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
     "github.com/nspcc-dev/neo-go/pkg/encoding/address"
-    "github.com/nspcc-dev/neo-go/pkg/rpcclient"
     "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
-    nns2 "github.com/nspcc-dev/neo-go/pkg/rpcclient/nns"
     "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
     "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
     "github.com/nspcc-dev/neo-go/pkg/util"
@@ -187,19 +186,9 @@ func NNSResolveKey(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (*
 }

 func NNSIsAvailable(c Client, nnsHash util.Uint160, name string) (bool, error) {
-    switch c.(type) {
-    case *rpcclient.Client:
-        inv := invoker.New(c, nil)
-        reader := nns2.NewReader(inv, nnsHash)
-        return reader.IsAvailable(name)
-    default:
-        b, err := unwrap.Bool(InvokeFunction(c, nnsHash, "isAvailable", []any{name}, nil))
-        if err != nil {
-            return false, fmt.Errorf("`isAvailable`: invalid response: %w", err)
-        }
-        return b, nil
-    }
+    inv := invoker.New(c, nil)
+    reader := nns2.NewReader(inv, nnsHash)
+    return reader.IsAvailable(name)
 }

 func CheckNotaryEnabled(c Client) error {
```

```diff
@@ -40,6 +40,8 @@ type ClientContext struct {
     CommitteeAct    *actor.Actor     // committee actor with the Global witness scope
     ReadOnlyInvoker *invoker.Invoker // R/O contract invoker, does not contain any signer
     SentTxs         []HashVUBPair
+
+    AwaitDisabled bool
 }

 func NewRemoteClient(v *viper.Viper) (Client, error) {
@@ -120,7 +122,7 @@ func (c *ClientContext) SendTx(tx *transaction.Transaction, cmd *cobra.Command,
 }

 func (c *ClientContext) AwaitTx(cmd *cobra.Command) error {
-    if len(c.SentTxs) == 0 {
+    if len(c.SentTxs) == 0 || c.AwaitDisabled {
         return nil
     }
```

```diff
@@ -39,6 +39,7 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
         return err
     }

+    initCtx.AwaitDisabled = true
     cmd.Println("Stage 4.1: Transfer GAS to proxy contract.")
     if err := transferGASToProxy(initCtx); err != nil {
         return err
@@ -55,5 +56,10 @@ func initializeSideChainCmd(cmd *cobra.Command, _ []string) error {
     }

     cmd.Println("Stage 7: set addresses in NNS.")
-    return setNNS(initCtx)
+    if err := setNNS(initCtx); err != nil {
+        return err
+    }
+
+    initCtx.AwaitDisabled = false
+    return initCtx.AwaitTx()
 }
```

```diff
@@ -1,7 +1,6 @@
 package initialize

 import (
-    "errors"
     "fmt"
     "math/big"

@@ -11,11 +10,8 @@ import (
     "github.com/nspcc-dev/neo-go/pkg/core/state"
     "github.com/nspcc-dev/neo-go/pkg/core/transaction"
     "github.com/nspcc-dev/neo-go/pkg/io"
-    "github.com/nspcc-dev/neo-go/pkg/rpcclient"
     "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
-    "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
     "github.com/nspcc-dev/neo-go/pkg/rpcclient/neo"
-    "github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
     "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
     "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
     "github.com/nspcc-dev/neo-go/pkg/util"
@@ -30,7 +26,8 @@ const (
 )

 func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
-    regPrice, err := getCandidateRegisterPrice(c)
+    reader := neo.NewReader(c.ReadOnlyInvoker)
+    regPrice, err := reader.GetRegisterPrice()
     if err != nil {
         return fmt.Errorf("can't fetch registration price: %w", err)
     }
@@ -116,7 +113,7 @@ func registerCandidates(c *helper.InitializeContext) error {
 func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
     neoHash := neo.Hash

-    ok, err := transferNEOFinished(c, neoHash)
+    ok, err := transferNEOFinished(c)
     if ok || err != nil {
         return err
     }
@@ -139,33 +136,8 @@ func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
     return c.AwaitTx()
 }

-func transferNEOFinished(c *helper.InitializeContext, neoHash util.Uint160) (bool, error) {
-    r := nep17.NewReader(c.ReadOnlyInvoker, neoHash)
+func transferNEOFinished(c *helper.InitializeContext) (bool, error) {
+    r := neo.NewReader(c.ReadOnlyInvoker)
     bal, err := r.BalanceOf(c.CommitteeAcc.Contract.ScriptHash())
     return bal.Cmp(big.NewInt(native.NEOTotalSupply)) == -1, err
 }
-
-var errGetPriceInvalid = errors.New("`getRegisterPrice`: invalid response")
-
-func getCandidateRegisterPrice(c *helper.InitializeContext) (int64, error) {
-    switch c.Client.(type) {
-    case *rpcclient.Client:
-        inv := invoker.New(c.Client, nil)
-        reader := neo.NewReader(inv)
-        return reader.GetRegisterPrice()
-    default:
-        neoHash := neo.Hash
-        res, err := helper.InvokeFunction(c.Client, neoHash, "getRegisterPrice", nil, nil)
-        if err != nil {
-            return 0, err
-        }
-        if len(res.Stack) == 0 {
-            return 0, errGetPriceInvalid
-        }
-        bi, err := res.Stack[0].TryInteger()
-        if err != nil || !bi.IsInt64() {
-            return 0, errGetPriceInvalid
-        }
-        return bi.Int64(), nil
-    }
-}
```

```diff
@@ -2,17 +2,19 @@ package tree

 import (
     "context"
+    "crypto/tls"
     "fmt"
-    "strings"

     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
     tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
     "github.com/spf13/cobra"
     "github.com/spf13/viper"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/credentials/insecure"
 )

@@ -31,6 +33,16 @@ func _client() (tree.TreeServiceClient, error) {
         return nil, err
     }

+    host, isTLS, err := client.ParseURI(netAddr.URIAddr())
+    if err != nil {
+        return nil, err
+    }
+
+    creds := insecure.NewCredentials()
+    if isTLS {
+        creds = credentials.NewTLS(&tls.Config{})
+    }
+
     opts := []grpc.DialOption{
         grpc.WithChainUnaryInterceptor(
             tracing.NewUnaryClientInterceptor(),
@@ -40,13 +52,10 @@ func _client() (tree.TreeServiceClient, error) {
         ),
         grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
         grpc.WithDisableServiceConfig(),
+        grpc.WithTransportCredentials(creds),
     }

-    if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
-        opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
-    }
-
-    cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
+    cc, err := grpc.NewClient(host, opts...)
     return tree.NewTreeServiceClient(cc), err
 }
```

```diff
@@ -108,6 +108,7 @@ type applicationConfiguration struct {
         level       string
         destination string
         timestamp   bool
+        options     []zap.Option
     }

     ObjectCfg struct {
@@ -232,6 +233,14 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
     a.LoggerCfg.level = loggerconfig.Level(c)
     a.LoggerCfg.destination = loggerconfig.Destination(c)
     a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
+    var opts []zap.Option
+    if loggerconfig.ToLokiConfig(c).Enabled {
+        opts = []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
+            lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(c))
+            return lokiCore
+        })}
+    }
+    a.LoggerCfg.options = opts

     // Object
@@ -718,12 +727,6 @@ func initCfg(appCfg *config.Config) *cfg {
     logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
     log, err := logger.NewLogger(logPrm)
     fatalOnErr(err)
-    if loggerconfig.ToLokiConfig(appCfg).Enabled {
-        log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
-            lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
-            return lokiCore
-        }))
-    }

     c.internals = initInternals(appCfg, log)

@@ -1090,6 +1093,7 @@ func (c *cfg) loggerPrm() (logger.Prm, error) {
         return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
     }
     prm.PrependTimestamp = c.LoggerCfg.timestamp
+    prm.Options = c.LoggerCfg.options

     return prm, nil
 }
```

```diff
@@ -171,6 +171,7 @@ func TestEngineSection(t *testing.T) {
                     Tag:      "policer",
                     Weight:   toPtr(5),
                     LimitOps: toPtr(25000),
+                    Prohibited: true,
                 },
             })
             require.ElementsMatch(t, writeLimits.Tags,
```

```diff
@@ -84,6 +84,7 @@ type IOTagConfig struct {
     Weight      *float64
     LimitOps    *float64
     ReservedOps *float64
+    Prohibited  bool
 }

 func tags(c *config.Config) []IOTagConfig {
@@ -119,6 +120,13 @@ func tags(c *config.Config) []IOTagConfig {
             tagConfig.ReservedOps = &r
         }

+        v = c.Value(strconv.Itoa(i) + ".prohibited")
+        if v != nil {
+            r, err := cast.ToBoolE(v)
+            panicOnErr(err)
+            tagConfig.Prohibited = r
+        }
+
         result = append(result, tagConfig)
     }
 }
```

```diff
@@ -43,6 +43,9 @@ func initQoSService(c *cfg) {
 func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
     rawTag, defined := qosTagging.IOTagFromContext(ctx)
     if !defined {
+        if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
+            return qosTagging.ContextWithIOTag(ctx, qos.IOTagInternal.String())
+        }
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     }
     ioTag, err := qos.FromRawString(rawTag)
@@ -73,21 +76,9 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
         s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     case qos.IOTagInternal:
-        for _, pk := range s.allowedInternalPubs {
-            if bytes.Equal(pk, requestSignPublicKey) {
-                return ctx
-            }
-        }
-        nm, err := s.netmapSource.GetNetMap(ctx, 0)
-        if err != nil {
-            s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
-            return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
-        }
-        for _, node := range nm.Nodes() {
-            if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
-                return ctx
-            }
-        }
+        if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
+            return ctx
+        }
         s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     default:
@@ -95,3 +86,23 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     }
 }
+
+func (s *cfgQoSService) isInternalIOTagPublicKey(ctx context.Context, publicKey []byte) bool {
+    for _, pk := range s.allowedInternalPubs {
+        if bytes.Equal(pk, publicKey) {
+            return true
+        }
+    }
+
+    nm, err := s.netmapSource.GetNetMap(ctx, 0)
+    if err != nil {
+        s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
+        return false
+    }
+    for _, node := range nm.Nodes() {
+        if bytes.Equal(node.PublicKey(), publicKey) {
+            return true
+        }
+    }
+    return false
+}
```

```diff
@@ -0,0 +1,226 @@
+package main
+
+import (
+    "context"
+    "testing"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
+    utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
+    "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+    "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+    "github.com/stretchr/testify/require"
+)
+
+func TestQoSService_Client(t *testing.T) {
+    t.Parallel()
+    s, pk := testQoSServicePrepare(t)
+    t.Run("IO tag client defined", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagClient.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Request)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("no IO tag defined, signed with unknown key", func(t *testing.T) {
+        ctx := s.AdjustIncomingTag(context.Background(), pk.Request)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("no IO tag defined, signed with allowed critical key", func(t *testing.T) {
+        ctx := s.AdjustIncomingTag(context.Background(), pk.Critical)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("unknown IO tag, signed with unknown key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
+        ctx = s.AdjustIncomingTag(ctx, pk.Request)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
+        ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("unknown IO tag, signed with allowed internal key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
+        ctx = s.AdjustIncomingTag(ctx, pk.Internal)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("unknown IO tag, signed with allowed critical key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
+        ctx = s.AdjustIncomingTag(ctx, pk.Critical)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("IO tag internal defined, signed with unknown key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Request)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("IO tag internal defined, signed with allowed critical key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Critical)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("IO tag critical defined, signed with unknown key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Request)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("IO tag critical defined, signed with allowed internal key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Internal)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+}
+
+func TestQoSService_Internal(t *testing.T) {
+    t.Parallel()
+    s, pk := testQoSServicePrepare(t)
+    t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagInternal.String(), tag)
+    })
+    t.Run("IO tag internal defined, signed with allowed internal key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Internal)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagInternal.String(), tag)
+    })
+    t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
+        ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagInternal.String(), tag)
+    })
+    t.Run("no IO tag defined, signed with allowed internal key", func(t *testing.T) {
+        ctx := s.AdjustIncomingTag(context.Background(), pk.Internal)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagInternal.String(), tag)
+    })
+}
+
+func TestQoSService_Critical(t *testing.T) {
+    t.Parallel()
+    s, pk := testQoSServicePrepare(t)
+    t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagCritical.String(), tag)
+    })
+    t.Run("IO tag critical defined, signed with allowed critical key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.Critical)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagCritical.String(), tag)
+    })
+}
+
+func TestQoSService_NetmapGetError(t *testing.T) {
+    t.Parallel()
+    s, pk := testQoSServicePrepare(t)
+    s.netmapSource = &utilTesting.TestNetmapSource{}
+    t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
+        ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
+        ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+    t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
+        ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
+        ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
+        tag, ok := tagging.IOTagFromContext(ctx)
+        require.True(t, ok)
+        require.Equal(t, qos.IOTagClient.String(), tag)
+    })
+}
+
+func testQoSServicePrepare(t *testing.T) (*cfgQoSService, *testQoSServicePublicKeys) {
+    nmSigner, err := keys.NewPrivateKey()
+    require.NoError(t, err)
+    reqSigner, err := keys.NewPrivateKey()
+    require.NoError(t, err)
+    allowedCritSigner, err := keys.NewPrivateKey()
+    require.NoError(t, err)
+    allowedIntSigner, err := keys.NewPrivateKey()
+    require.NoError(t, err)
+
+    var node netmap.NodeInfo
+    node.SetPublicKey(nmSigner.PublicKey().Bytes())
+    nm := &netmap.NetMap{}
+    nm.SetEpoch(100)
+    nm.SetNodes([]netmap.NodeInfo{node})
+
+    return &cfgQoSService{
+            logger: test.NewLogger(t),
+            netmapSource: &utilTesting.TestNetmapSource{
+                Netmaps: map[uint64]*netmap.NetMap{
+                    100: nm,
+                },
+                CurrentEpoch: 100,
+            },
+            allowedCriticalPubs: [][]byte{
+                allowedCritSigner.PublicKey().Bytes(),
+            },
+            allowedInternalPubs: [][]byte{
+                allowedIntSigner.PublicKey().Bytes(),
+            },
+        },
+        &testQoSServicePublicKeys{
+            NetmapNode: nmSigner.PublicKey().Bytes(),
+            Request:    reqSigner.PublicKey().Bytes(),
+            Internal:   allowedIntSigner.PublicKey().Bytes(),
+            Critical:   allowedCritSigner.PublicKey().Bytes(),
+        }
+}
+
+type testQoSServicePublicKeys struct {
+    NetmapNode []byte
+    Request    []byte
+    Internal   []byte
+    Critical   []byte
+}
```

```diff
@@ -180,6 +180,7 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
 FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
 FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
 FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
+FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_PROHIBITED=true
 FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
 FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
 FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
```

```diff
@@ -252,7 +252,8 @@
                 {
                     "tag": "policer",
                     "weight": 5,
-                    "limit_ops": 25000
+                    "limit_ops": 25000,
+                    "prohibited": true
                 }
             ]
         },
```

```diff
@@ -249,6 +249,7 @@ storage:
           - tag: policer
             weight: 5
             limit_ops: 25000
+            prohibited: true
       write:
         max_running_ops: 1000
         max_waiting_ops: 100
```

```diff
@@ -359,6 +359,7 @@ limits:
 | `tag.weight`       | `float` | 0 (no weight)  | Weight for queries with the specified tag. Weights must be specified for all tags or not specified for any one. |
 | `tag.limit_ops`    | `float` | 0 (no limit)   | Operations per second rate limit for queries with the specified tag. |
 | `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
+| `tag.prohibited`   | `bool`  | false          | If true, operations with this specified tag will be prohibited. |

 # `node` section
```

go.mod

```diff
@@ -6,10 +6,10 @@ require (
     code.gitea.io/sdk/gitea v0.17.1
     git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1
     git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
-    git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
+    git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2
     git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
-    git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275
-    git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
+    git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47
+    git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250326101739-4d36a49d3945
     git.frostfs.info/TrueCloudLab/hrw v1.2.1
     git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
     git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
```

go.sum

```diff
@@ -4,14 +4,14 @@ git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1 h1:k1Qw8dWUQczfo0eVXlhrq9
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1/go.mod h1:5fSm/l5xSjGWqsPUffSdboiGFUHa7y/1S0fvxzQowN8=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
-git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d h1:uJ/wvuMdepbkaV8XMS5uN9B0FQWMep0CttSuDZiDhq0=
-git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
+git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2 h1:AovQs7bea0fLnYfldCZB88FkUgRj0QaHkJEbcWfgzvY=
+git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275 h1:WqWxCnCl2ekfjWja/CpGeF2rf4h0x199xhdnsm/j+E8=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47 h1:O2c3VOlaGZ862hf2ZPLBMdTG6vGJzhIgDvFEFGfntzU=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250326101739-4d36a49d3945 h1:zM2l316J55h9p30snl6vHBI/h0xmnuqZjnxIjRDtJZw=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250326101739-4d36a49d3945/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
```

```diff
@@ -512,7 +512,7 @@ const (
     FailedToUpdateMultinetConfiguration         = "failed to update multinet configuration"
     FailedToParseIncomingIOTag                  = "failed to parse incoming IO tag"
     NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
-    FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag, replaced with `client`"
+    FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag"
     FailedToValidateIncomingIOTag               = "failed to validate incoming IO tag, replaced with `client`"
     WriteCacheFailedToAcquireRPSQuota           = "writecache failed to acquire RPS quota to flush object"
 )
```

```diff
@@ -22,7 +22,7 @@ var (
     errTest         = errors.New("mock")
     errWrongTag     = errors.New("wrong tag")
     errNoTag        = errors.New("failed to get tag from context")
-    errResExhausted = new(apistatus.ResourceExhausted)
+    errResExhausted *apistatus.ResourceExhausted
     tags            = []qos.IOTag{qos.IOTagBackground, qos.IOTagWritecache, qos.IOTagPolicer, qos.IOTagTreeSync}
 )
@@ -37,38 +37,36 @@ func (m *mockGRPCServerStream) Context() context.Context {
 }

 type limiter struct {
+    acquired bool
     released bool
 }

 func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
+    l.acquired = true
     if key != okKey {
         return nil, false
     }
     return func() { l.released = true }, true
 }

-func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
+func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) error {
     interceptor := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return lim })
-    called := false
     handler := func(ctx context.Context, req any) (any, error) {
-        called = true
         return nil, errTest
     }
     _, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{FullMethod: methodName}, handler)
-    return called, err
+    return err
 }

-func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) (bool, error) {
+func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) error {
     interceptor := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return lim })
-    called := false
     handler := func(srv any, stream grpc.ServerStream) error {
-        called = true
         return errTest
     }
     err := interceptor(nil, &mockGRPCServerStream{ctx: ctx}, &grpc.StreamServerInfo{
         FullMethod: methodName,
     }, handler)
-    return called, err
+    return err
 }
@@ -76,55 +74,61 @@ func Test_MaxActiveRPCLimiter(t *testing.T) {
     t.Run("unary fail", func(t *testing.T) {
         var lim limiter

-        called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
-        require.EqualError(t, err, errResExhausted.Error())
-        require.False(t, called)
+        err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
+        require.ErrorAs(t, err, &errResExhausted)
+        require.True(t, lim.acquired)
+        require.False(t, lim.released)
     })
     t.Run("unary pass critical", func(t *testing.T) {
         var lim limiter
         ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())

-        called, err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
-        require.EqualError(t, err, errTest.Error())
-        require.True(t, called)
+        err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
+        require.ErrorIs(t, err, errTest)
+        require.False(t, lim.acquired)
         require.False(t, lim.released)
     })
     t.Run("unary pass", func(t *testing.T) {
         var lim limiter

-        called, err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
-        require.EqualError(t, err, errTest.Error())
-        require.True(t, called && lim.released)
+        err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
+        require.ErrorIs(t, err, errTest)
+        require.True(t, lim.acquired)
+        require.True(t, lim.released)
     })
     // StreamServerInterceptor
     t.Run("stream fail", func(t *testing.T) {
         var lim limiter

-        called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
-        require.EqualError(t, err, errResExhausted.Error())
-        require.False(t, called)
+        err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
+        require.ErrorAs(t, err, &errResExhausted)
+        require.True(t, lim.acquired)
+        require.False(t, lim.released)
     })
     t.Run("stream pass critical", func(t *testing.T) {
         var lim limiter
         ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())

-        called, err := streamMaxActiveRPCLimiter(ctx, &lim, "")
-        require.EqualError(t, err, errTest.Error())
-        require.True(t, called)
+        err := streamMaxActiveRPCLimiter(ctx, &lim, "")
+        require.ErrorIs(t, err, errTest)
+        require.False(t, lim.acquired)
         require.False(t, lim.released)
     })
     t.Run("stream pass", func(t *testing.T) {
         var lim limiter

-        called, err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
-        require.EqualError(t, err, errTest.Error())
-        require.True(t, called && lim.released)
+        err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
+        require.ErrorIs(t, err, errTest)
+        require.True(t, lim.acquired)
+        require.True(t, lim.released)
     })
 }

 func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
     interceptor := qos.NewSetCriticalIOTagUnaryServerInterceptor()
+    called := false
     handler := func(ctx context.Context, req any) (any, error) {
+        called = true
         if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == qos.IOTagCritical.String() {
             return nil, nil
         }
@@ -132,6 +136,7 @@ func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
     }
     _, err := interceptor(context.Background(), nil, nil, handler)
     require.NoError(t, err)
+    require.True(t, called)
 }

 func TestAdjustOutgoingIOTagUnaryClientInterceptor(t *testing.T) {
```

```diff
@@ -90,6 +90,7 @@ func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.T
         if l.ReservedOps != nil && *l.ReservedOps != 0 {
             v.ReservedIOPS = l.ReservedOps
         }
+        v.Prohibited = l.Prohibited
         result[l.Tag] = v
     }
     return result
@@ -164,8 +165,7 @@ func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (R
     rel, err := s.RequestArrival(ctx, tag)
     stat.inProgress.Add(1)
     if err != nil {
-        if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
-            errors.Is(err, errSemaphoreLimitExceeded) {
+        if isResourceExhaustedErr(err) {
             stat.resourceExhausted.Add(1)
             return nil, &apistatus.ResourceExhausted{}
         }
@@ -234,3 +234,9 @@ func exportMetrics(metrics Metrics, stats map[string]*stat, shardID, operation s
         metrics.SetOperationTagCounters(shardID, operation, tag, pending, inProgress, completed, resExh)
     }
 }
+
+func isResourceExhaustedErr(err error) bool {
+    return errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
+        errors.Is(err, errSemaphoreLimitExceeded) ||
+        errors.Is(err, scheduling.ErrTagRequestsProhibited)
+}
```

```diff
@@ -9,6 +9,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
+    utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
     objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
     containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
     cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@@ -410,11 +411,11 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
                 },
             ),
             WithNetmapSource(
-                &testNetmapSource{
-                    netmaps: map[uint64]*netmap.NetMap{
+                &utilTesting.TestNetmapSource{
+                    Netmaps: map[uint64]*netmap.NetMap{
                         curEpoch: currentEpochNM,
                     },
-                    currentEpoch: curEpoch,
+                    CurrentEpoch: curEpoch,
                 },
             ),
             WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -483,12 +484,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
                 },
             ),
             WithNetmapSource(
-                &testNetmapSource{
-                    netmaps: map[uint64]*netmap.NetMap{
+                &utilTesting.TestNetmapSource{
+                    Netmaps: map[uint64]*netmap.NetMap{
                         curEpoch:     currentEpochNM,
                         curEpoch - 1: previousEpochNM,
                     },
-                    currentEpoch: curEpoch,
+                    CurrentEpoch: curEpoch,
                 },
             ),
             WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -559,12 +560,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
                 },
             ),
             WithNetmapSource(
-                &testNetmapSource{
-                    netmaps: map[uint64]*netmap.NetMap{
+                &utilTesting.TestNetmapSource{
+                    Netmaps: map[uint64]*netmap.NetMap{
                         curEpoch:     currentEpochNM,
                         curEpoch - 1: previousEpochNM,
                     },
-                    currentEpoch: curEpoch,
+                    CurrentEpoch: curEpoch,
                 },
             ),
             WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@@ -596,26 +597,3 @@ func (s *testContainerSource) Get(ctx context.Context, cnrID cid.ID) (*container
 func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
     return nil, nil
 }
-
-type testNetmapSource struct {
-    netmaps      map[uint64]*netmap.NetMap
-    currentEpoch uint64
-}
-
-func (s *testNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
-    if diff >= s.currentEpoch {
-        return nil, fmt.Errorf("invalid diff")
-    }
-    return s.GetNetMapByEpoch(ctx, s.currentEpoch-diff)
-}
-
-func (s *testNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error) {
-    if nm, found := s.netmaps[epoch]; found {
-        return nm, nil
-    }
-    return nil, fmt.Errorf("netmap not found")
-}
-
-func (s *testNetmapSource) Epoch(ctx context.Context) (uint64, error) {
-    return s.currentEpoch, nil
-}
```

```diff
@@ -2,8 +2,11 @@ package engine

 import (
     "context"
+    "fmt"
     "path/filepath"
-    "sync/atomic"
+    "runtime/debug"
+    "strings"
+    "sync"
     "testing"

     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
@@ -158,25 +161,73 @@ var _ qos.Limiter = (*testQoSLimiter)(nil)

 type testQoSLimiter struct {
     t     testing.TB
-    read  atomic.Int64
-    write atomic.Int64
+    quard sync.Mutex
+    id    int64
+    readStacks  map[int64][]byte
+    writeStacks map[int64][]byte
 }

 func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}

 func (t *testQoSLimiter) Close() {
-    require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
-    require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
+    t.quard.Lock()
+    defer t.quard.Unlock()
+
+    var sb strings.Builder
+    var seqN int
+    for _, stack := range t.readStacks {
+        seqN++
+        sb.WriteString(fmt.Sprintf("%d\n read request stack after limiter close: %s\n", seqN, string(stack)))
+    }
+    for _, stack := range t.writeStacks {
+        seqN++
+        sb.WriteString(fmt.Sprintf("%d\n write request stack after limiter close: %s\n", seqN, string(stack)))
+    }
+
+    require.True(t.t, seqN == 0, sb.String())
 }

 func (t *testQoSLimiter) ReadRequest(context.Context) (qos.ReleaseFunc, error) {
-    t.read.Add(1)
-    return func() { t.read.Add(-1) }, nil
+    t.quard.Lock()
+    defer t.quard.Unlock()
+
+    stack := debug.Stack()
+
+    t.id++
+    id := t.id
+
+    if t.readStacks == nil {
+        t.readStacks = make(map[int64][]byte)
+    }
+    t.readStacks[id] = stack
+
+    return func() {
+        t.quard.Lock()
+        defer t.quard.Unlock()
+
+        delete(t.readStacks, id)
+    }, nil
 }

 func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error) {
-    t.write.Add(1)
-    return func() { t.write.Add(-1) }, nil
+    t.quard.Lock()
+    defer t.quard.Unlock()
+
+    stack := debug.Stack()
+
+    t.id++
+    id := t.id
+
+    if t.writeStacks == nil {
+        t.writeStacks = make(map[int64][]byte)
+    }
+    t.writeStacks[id] = stack
+
+    return func() {
+        t.quard.Lock()
+        defer t.quard.Unlock()
+
+        delete(t.writeStacks, id)
+    }, nil
 }

 func (t *testQoSLimiter) SetParentID(string) {}
```

```diff
@@ -318,8 +318,6 @@ func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.M

 // HandleNewEpoch notifies every shard about NewEpoch event.
 func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
-    ev := shard.EventNewEpoch(epoch)
-
     e.mtx.RLock()
     defer e.mtx.RUnlock()

@@ -327,7 +325,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
         select {
         case <-ctx.Done():
             return
-        case sh.NotificationChannel() <- ev:
+        case sh.NotificationChannel() <- epoch:
         default:
             e.log.Debug(ctx, logs.ShardEventProcessingInProgress,
                 zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID()))
```

View file

@@ -139,8 +139,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
 	var containerID cid.ID
 	var offset []byte
-	graveyardBkt := tx.Bucket(graveyardBucketName)
-	garbageBkt := tx.Bucket(garbageBucketName)
+	bc := newBucketCache()

 	rawAddr := make([]byte, cidSize, addressKeySize)
@@ -169,7 +168,7 @@ loop:
 		bkt := tx.Bucket(name)
 		if bkt != nil {
 			copy(rawAddr, cidRaw)
-			result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
+			result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
 				result, count, cursor, threshold, currEpoch)
 			if err != nil {
 				return nil, nil, err
@@ -204,9 +203,10 @@ loop:
 // selectNFromBucket similar to selectAllFromBucket but uses cursor to find
 // object to start selecting from. Ignores inhumed objects.
-func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
+func selectNFromBucket(
+	bc *bucketCache,
+	bkt *bbolt.Bucket, // main bucket
 	objType objectSDK.Type, // type of the objects stored in the main bucket
-	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
 	cidRaw []byte, // container ID prefix, optimization
 	cnt cid.ID, // container ID
 	to []objectcore.Info, // listing result
@@ -219,7 +219,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		cursor = new(Cursor)
 	}

-	count := len(to)
 	c := bkt.Cursor()
 	k, v := c.First()
@@ -231,7 +230,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 	}

 	for ; k != nil; k, v = c.Next() {
-		if count >= limit {
+		if len(to) >= limit {
 			break
 		}
@@ -241,6 +240,8 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		}
 		offset = k

+		graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
+		garbageBkt := getGarbageBucket(bc, bkt.Tx())
 		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
 			continue
 		}
@@ -251,7 +252,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		}

 		expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
-		if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) {
+		if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
 			continue
 		}
@@ -273,7 +274,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 		a.SetContainer(cnt)
 		a.SetObject(obj)
 		to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
-		count++
 	}

 	return to, offset, cursor, nil
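
The bucketCache type itself is not shown in this diff; judging by newBucketCache(), getGraveyardBucket() and getGarbageBucket(), it memoizes bucket handles within a transaction so the listing loop does not re-resolve them on every key. A rough sketch under that assumption (the bucket name values below are placeholders; the real names are defined elsewhere in the package):

package meta

import "go.etcd.io/bbolt"

// Placeholder values, for the sketch only.
var (
	graveyardBucketName = []byte("graveyard")
	garbageBucketName   = []byte("garbage")
)

// bucketCache resolves each well-known bucket at most once per transaction
// and hands back the cached handle afterwards.
type bucketCache struct {
	graveyard *bbolt.Bucket
	garbage   *bbolt.Bucket
}

func newBucketCache() *bucketCache { return &bucketCache{} }

func getGraveyardBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc.graveyard == nil {
		bc.graveyard = tx.Bucket(graveyardBucketName)
	}
	return bc.graveyard
}

func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc.garbage == nil {
		bc.garbage = tx.Bucket(garbageBucketName)
	}
	return bc.garbage
}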


@@ -111,18 +111,16 @@ func (s *Shard) Init(ctx context.Context) error {
 		gcCfg:       &s.gcCfg,
 		remover:     s.removeGarbage,
 		stopChannel: make(chan struct{}),
-		eventChan:   make(chan Event),
-		mEventHandler: map[eventType]*eventHandlers{
-			eventNewEpoch: {
-				cancelFunc: func() {},
-				handlers: []eventHandler{
-					s.collectExpiredLocks,
-					s.collectExpiredObjects,
-					s.collectExpiredTombstones,
-					s.collectExpiredMetrics,
-				},
-			},
+		newEpochChan: make(chan uint64),
+		newEpochHandlers: &newEpochHandlers{
+			cancelFunc: func() {},
+			handlers: []newEpochHandler{
+				s.collectExpiredLocks,
+				s.collectExpiredObjects,
+				s.collectExpiredTombstones,
+				s.collectExpiredMetrics,
+			},
 		},
 	}

 	if s.gc.metrics != nil {
 		s.gc.metrics.SetShardID(s.info.ID.String())


@@ -33,41 +33,14 @@ type TombstoneSource interface {
 	IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool
 }

-// Event represents class of external events.
-type Event interface {
-	typ() eventType
-}
-
-type eventType int
-
-const (
-	_ eventType = iota
-	eventNewEpoch
-)
-
-type newEpoch struct {
-	epoch uint64
-}
-
-func (e newEpoch) typ() eventType {
-	return eventNewEpoch
-}
-
-// EventNewEpoch returns new epoch event.
-func EventNewEpoch(e uint64) Event {
-	return newEpoch{
-		epoch: e,
-	}
-}
-
-type eventHandler func(context.Context, Event)
+type newEpochHandler func(context.Context, uint64)

-type eventHandlers struct {
+type newEpochHandlers struct {
 	prevGroup sync.WaitGroup

 	cancelFunc context.CancelFunc

-	handlers []eventHandler
+	handlers []newEpochHandler
 }

 type gcRunResult struct {
@@ -109,10 +82,10 @@ type gc struct {
 	remover func(context.Context) gcRunResult

-	// eventChan is used only for listening for the new epoch event.
+	// newEpochChan is used only for listening for the new epoch event.
 	// It is ok to keep opened, we are listening for context done when writing in it.
-	eventChan     chan Event
-	mEventHandler map[eventType]*eventHandlers
+	newEpochChan     chan uint64
+	newEpochHandlers *newEpochHandlers
 }

 type gcCfg struct {
@@ -142,15 +115,7 @@ func defaultGCCfg() gcCfg {
 }

 func (gc *gc) init(ctx context.Context) {
-	sz := 0
-
-	for _, v := range gc.mEventHandler {
-		sz += len(v.handlers)
-	}
-
-	if sz > 0 {
-		gc.workerPool = gc.workerPoolInit(sz)
-	}
+	gc.workerPool = gc.workerPoolInit(len(gc.newEpochHandlers.handlers))

 	ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
 	gc.wg.Add(2)
 	go gc.tickRemover(ctx)
@@ -168,7 +133,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
 		case <-ctx.Done():
 			gc.log.Warn(ctx, logs.ShardStopEventListenerByContext)
 			return
-		case event, ok := <-gc.eventChan:
+		case event, ok := <-gc.newEpochChan:
 			if !ok {
 				gc.log.Warn(ctx, logs.ShardStopEventListenerByClosedEventChannel)
 				return
@@ -179,38 +144,33 @@
 	}
 }

-func (gc *gc) handleEvent(ctx context.Context, event Event) {
-	v, ok := gc.mEventHandler[event.typ()]
-	if !ok {
-		return
-	}
-
-	v.cancelFunc()
-	v.prevGroup.Wait()
+func (gc *gc) handleEvent(ctx context.Context, epoch uint64) {
+	gc.newEpochHandlers.cancelFunc()
+	gc.newEpochHandlers.prevGroup.Wait()

 	var runCtx context.Context
-	runCtx, v.cancelFunc = context.WithCancel(ctx)
+	runCtx, gc.newEpochHandlers.cancelFunc = context.WithCancel(ctx)

-	v.prevGroup.Add(len(v.handlers))
+	gc.newEpochHandlers.prevGroup.Add(len(gc.newEpochHandlers.handlers))

-	for i := range v.handlers {
+	for i := range gc.newEpochHandlers.handlers {
 		select {
 		case <-ctx.Done():
 			return
 		default:
 		}
-		h := v.handlers[i]
+		h := gc.newEpochHandlers.handlers[i]

 		err := gc.workerPool.Submit(func() {
-			defer v.prevGroup.Done()
-			h(runCtx, event)
+			defer gc.newEpochHandlers.prevGroup.Done()
+			h(runCtx, epoch)
 		})
 		if err != nil {
 			gc.log.Warn(ctx, logs.ShardCouldNotSubmitGCJobToWorkerPool,
 				zap.Error(err),
 			)
-			v.prevGroup.Done()
+			gc.newEpochHandlers.prevGroup.Done()
 		}
 	}
 }
@@ -267,6 +227,9 @@ func (gc *gc) stop(ctx context.Context) {
 	gc.log.Info(ctx, logs.ShardWaitingForGCWorkersToStop)
 	gc.wg.Wait()
+
+	gc.newEpochHandlers.cancelFunc()
+	gc.newEpochHandlers.prevGroup.Wait()
 }

 // iterates over metabase and deletes objects
@@ -362,7 +325,7 @@ func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
 	return
 }

-func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) {
 	var err error
 	startedAt := time.Now()
@@ -370,8 +333,8 @@
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular)
 	}()

-	s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
+	s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
+	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))

 	workersCount, batchSize := s.getExpiredObjectsParameters()
@@ -380,7 +343,7 @@
 	errGroup.Go(func() error {
 		batch := make([]oid.Address, 0, batchSize)

-		expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+		expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
 			if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
 				batch = append(batch, o.Address())
@@ -486,7 +449,7 @@
 	return s.metaBase.Inhume(ctx, inhumePrm)
 }

-func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
 	var err error
 	startedAt := time.Now()
@@ -494,7 +457,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone)
 	}()

-	epoch := e.(newEpoch).epoch
 	log := s.log.With(zap.Uint64("epoch", epoch))

 	log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)
@@ -527,7 +489,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 			return
 		}

-		release, err := s.opsLimiter.ReadRequest(ctx)
+		var release qos.ReleaseFunc
+		release, err = s.opsLimiter.ReadRequest(ctx)
 		if err != nil {
 			log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
 			s.m.RUnlock()
@@ -565,7 +528,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 	}
 }

-func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
 	var err error
 	startedAt := time.Now()
@@ -573,8 +536,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock)
 	}()

-	s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
+	s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", epoch))
+	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", epoch))

 	workersCount, batchSize := s.getExpiredObjectsParameters()
@@ -584,14 +547,14 @@
 	errGroup.Go(func() error {
 		batch := make([]oid.Address, 0, batchSize)

-		expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+		expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
 			if o.Type() == objectSDK.TypeLock {
 				batch = append(batch, o.Address())

 				if len(batch) == batchSize {
 					expired := batch
 					errGroup.Go(func() error {
-						s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
+						s.expiredLocksCallback(egCtx, epoch, expired)
 						return egCtx.Err()
 					})
 					batch = make([]oid.Address, 0, batchSize)
@@ -605,7 +568,7 @@
 	if len(batch) > 0 {
 		expired := batch
 		errGroup.Go(func() error {
-			s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
+			s.expiredLocksCallback(egCtx, epoch, expired)
 			return egCtx.Err()
 		})
 	}
@@ -704,7 +667,10 @@
 // HandleExpiredLocks unlocks all objects which were locked by lockers.
 // If successful, marks lockers themselves as garbage.
 func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
-	if s.GetMode().NoMetabase() {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
 		return
 	}
@@ -767,7 +733,10 @@ func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unloc
 // HandleDeletedLocks unlocks all objects which were locked by lockers.
 func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
-	if s.GetMode().NoMetabase() {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
 		return
 	}
@@ -784,17 +753,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
 	}
 }

-// NotificationChannel returns channel for shard events.
-func (s *Shard) NotificationChannel() chan<- Event {
-	return s.gc.eventChan
+// NotificationChannel returns channel for new epoch events.
+func (s *Shard) NotificationChannel() chan<- uint64 {
+	return s.gc.newEpochChan
 }

-func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
+func (s *Shard) collectExpiredMetrics(ctx context.Context, epoch uint64) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
 	defer span.End()

-	epoch := e.(newEpoch).epoch
-
 	s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
 	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))
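
handleEvent() and stop() share one discipline: cancel the previous handler run, then wait on prevGroup before touching cancelFunc again, so runs never overlap. A self-contained sketch of that restartable handler-group pattern, with hypothetical names (goroutines stand in for the worker pool):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// handlerGroup runs a set of epoch handlers; each new epoch first cancels
// and drains the previous run before starting the next one.
type handlerGroup struct {
	prevGroup  sync.WaitGroup
	cancelFunc context.CancelFunc
	handlers   []func(context.Context, uint64)
}

func (g *handlerGroup) handle(ctx context.Context, epoch uint64) {
	g.cancelFunc() // stop the previous run, if any
	g.prevGroup.Wait()

	var runCtx context.Context
	runCtx, g.cancelFunc = context.WithCancel(ctx)

	g.prevGroup.Add(len(g.handlers))
	for _, h := range g.handlers {
		h := h
		go func() {
			defer g.prevGroup.Done()
			h(runCtx, epoch)
		}()
	}
}

func main() {
	g := &handlerGroup{
		cancelFunc: func() {}, // non-nil no-op, so the first handle() is safe
		handlers: []func(context.Context, uint64){
			func(ctx context.Context, epoch uint64) {
				select {
				case <-ctx.Done():
					fmt.Println("canceled at epoch", epoch)
				case <-time.After(50 * time.Millisecond):
					fmt.Println("finished epoch", epoch)
				}
			},
		},
	}
	g.handle(context.Background(), 1)
	g.handle(context.Background(), 2) // cancels and drains epoch 1 first
	g.prevGroup.Wait()
}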


@@ -69,7 +69,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
 	require.NoError(t, err)

 	epoch.Value = 105
-	sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
+	sh.gc.handleEvent(context.Background(), epoch.Value)

 	var getPrm GetPrm
 	getPrm.SetAddress(objectCore.AddressOf(obj))
@@ -165,7 +165,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
 	require.True(t, errors.As(err, &splitInfoError), "split info must be provided")

 	epoch.Value = 105
-	sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
+	sh.gc.handleEvent(context.Background(), epoch.Value)

 	_, err = sh.Get(context.Background(), getPrm)
 	require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")


@@ -9,15 +9,10 @@ import (
 	"time"

 	internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
-	metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
-	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
-	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
 	"github.com/hashicorp/golang-lru/v2/simplelru"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
-	"google.golang.org/grpc/credentials/insecure"
 )

 type clientCache struct {
@@ -95,27 +90,13 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
 		return nil, err
 	}

-	opts := []grpc.DialOption{
-		grpc.WithChainUnaryInterceptor(
-			qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
-			metrics.NewUnaryClientInterceptor(),
-			tracing.NewUnaryClientInterceptor(),
-			tagging.NewUnaryClientInterceptor(),
-		),
-		grpc.WithChainStreamInterceptor(
-			qos.NewAdjustOutgoingIOTagStreamClientInterceptor(),
-			metrics.NewStreamClientInterceptor(),
-			tracing.NewStreamClientInterceptor(),
-			tagging.NewStreamClientInterceptor(),
-		),
-		grpc.WithContextDialer(c.ds.GrpcContextDialer()),
-		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
-		grpc.WithDisableServiceConfig(),
-	}
-
-	if !netAddr.IsTLSEnabled() {
-		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	}
+	cc, err := createConnection(netAddr, grpc.WithContextDialer(c.ds.GrpcContextDialer()))
+	if err != nil {
+		return nil, err
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
+	defer cancel()

 	req := &HealthcheckRequest{
 		Body: &HealthcheckRequest_Body{},
@@ -124,13 +105,6 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
 		return nil, err
 	}

-	cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
-	if err != nil {
-		return nil, err
-	}
-
-	ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
-	defer cancel()
-
 	// perform some request to check connection
 	if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil {
 		_ = cc.Close()


@@ -3,6 +3,7 @@ package tree
 import (
 	"context"
 	"crypto/sha256"
+	"crypto/tls"
 	"errors"
 	"fmt"
 	"io"
@@ -22,12 +23,14 @@ import (
 	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 )
@@ -301,7 +304,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
 			return false
 		}

-		cc, err := s.createConnection(a)
+		cc, err := createConnection(a, grpc.WithContextDialer(s.ds.GrpcContextDialer()))
 		if err != nil {
 			s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
 			return false
@@ -339,8 +342,18 @@
 	return from
 }

-func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
-	return grpc.NewClient(a.URIAddr(),
+func createConnection(a network.Address, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+	host, isTLS, err := client.ParseURI(a.URIAddr())
+	if err != nil {
+		return nil, err
+	}
+
+	creds := insecure.NewCredentials()
+	if isTLS {
+		creds = credentials.NewTLS(&tls.Config{})
+	}
+
+	defaultOpts := []grpc.DialOption{
 		grpc.WithChainUnaryInterceptor(
 			qos.NewAdjustOutgoingIOTagUnaryClientInterceptor(),
 			metrics.NewUnaryClientInterceptor(),
@@ -353,10 +366,12 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
 			tracing_grpc.NewStreamClientInterceptor(),
 			tagging.NewStreamClientInterceptor(),
 		),
-		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithTransportCredentials(creds),
 		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
 		grpc.WithDisableServiceConfig(),
-	)
+	}
+
+	return grpc.NewClient(host, append(defaultOpts, opts...)...)
 }

 // ErrAlreadySyncing is returned when a service synchronization has already
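
createConnection() now derives transport credentials from the address and hands grpc.NewClient a host with no scheme, because the client treats a URI scheme as a resolver name. client.ParseURI from the SDK is the real implementation; the stand-in below is a rough, simplified sketch of such scheme handling and assumes an explicit grpc/grpcs scheme is present:

package main

import (
	"fmt"
	"net/url"
)

// parseURI is a hypothetical stand-in: it strips the scheme so it cannot be
// mistaken for a gRPC resolver name and reports whether TLS is required.
func parseURI(uri string) (host string, isTLS bool, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", false, err
	}
	switch u.Scheme {
	case "grpcs":
		return u.Host, true, nil
	case "grpc":
		return u.Host, false, nil
	default:
		return "", false, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	host, isTLS, err := parseURI("grpcs://192.168.198.248:8081")
	fmt.Println(host, isTLS, err) // 192.168.198.248:8081 true <nil>
}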


@@ -36,6 +36,9 @@ type Prm struct {

 	// PrependTimestamp specifies whether to prepend a timestamp in the log
 	PrependTimestamp bool
+
+	// Options for zap.Logger
+	Options []zap.Option
 }

 const (
@@ -103,10 +106,12 @@ func newConsoleLogger(prm Prm) (*Logger, error) {
 		c.EncoderConfig.TimeKey = ""
 	}

-	lZap, err := c.Build(
+	opts := []zap.Option{
 		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
 		zap.AddCallerSkip(1),
-	)
+	}
+	opts = append(opts, prm.Options...)
+	lZap, err := c.Build(opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -150,7 +155,12 @@ func newJournaldLogger(prm Prm) (*Logger, error) {
 		c.Sampling.Thereafter,
 		samplerOpts...,
 	)
-	lZap := zap.New(samplingCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.AddCallerSkip(1))
+	opts := []zap.Option{
+		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
+		zap.AddCallerSkip(1),
+	}
+	opts = append(opts, prm.Options...)
+	lZap := zap.New(samplingCore, opts...)

 	l := &Logger{z: lZap, lvl: lvl}
@@ -161,10 +171,6 @@ func (l *Logger) Reload(prm Prm) {
 	l.lvl.SetLevel(prm.level)
 }

-func (l *Logger) WithOptions(options ...zap.Option) {
-	l.z = l.z.WithOptions(options...)
-}
-
 func (l *Logger) With(fields ...zap.Field) *Logger {
 	return &Logger{z: l.z.With(fields...)}
 }
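
With Logger.WithOptions() gone, callers supply zap options up front via Prm.Options, and both constructors append them after the fixed defaults. A self-contained sketch of the same defaults-then-caller append pattern in plain zap (buildLogger is a hypothetical name):

package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// buildLogger mirrors the pattern above: fixed default options first, then
// whatever the caller supplied.
func buildLogger(extra ...zap.Option) (*zap.Logger, error) {
	c := zap.NewProductionConfig()
	opts := []zap.Option{
		zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
		zap.AddCallerSkip(1),
	}
	opts = append(opts, extra...)
	return c.Build(opts...)
}

func main() {
	l, err := buildLogger(zap.Hooks(func(zapcore.Entry) error { return nil }))
	if err != nil {
		panic(err)
	}
	defer l.Sync()
	l.Info("logger built with caller-supplied options")
}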


@@ -0,0 +1,36 @@
+package testing
+
+import (
+	"context"
+	"errors"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+)
+
+var (
+	errInvalidDiff    = errors.New("invalid diff")
+	errNetmapNotFound = errors.New("netmap not found")
+)
+
+type TestNetmapSource struct {
+	Netmaps      map[uint64]*netmap.NetMap
+	CurrentEpoch uint64
+}
+
+func (s *TestNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
+	if diff >= s.CurrentEpoch {
+		return nil, errInvalidDiff
+	}
+	return s.GetNetMapByEpoch(ctx, s.CurrentEpoch-diff)
+}
+
+func (s *TestNetmapSource) GetNetMapByEpoch(_ context.Context, epoch uint64) (*netmap.NetMap, error) {
+	if nm, found := s.Netmaps[epoch]; found {
+		return nm, nil
+	}
+	return nil, errNetmapNotFound
+}
+
+func (s *TestNetmapSource) Epoch(context.Context) (uint64, error) {
+	return s.CurrentEpoch, nil
+}
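
A hypothetical test using the new helper: epochs map to netmaps, GetNetMap(diff) resolves relative to CurrentEpoch, and a diff at or beyond the current epoch is rejected. The helper package's import path is not visible in this view, so the alias below is a placeholder:

package example_test

import (
	"context"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	// Placeholder path: substitute the actual location of the new package.
	netmaptest "example.invalid/internal/netmap/testing"
)

func TestNetmapSourceLookup(t *testing.T) {
	nm := new(netmap.NetMap)
	src := &netmaptest.TestNetmapSource{
		Netmaps:      map[uint64]*netmap.NetMap{4: nm},
		CurrentEpoch: 5,
	}

	got, err := src.GetNetMap(context.Background(), 1) // resolves epoch 5-1 = 4
	if err != nil || got != nm {
		t.Fatalf("unexpected result: %v, %v", got, err)
	}

	if _, err := src.GetNetMap(context.Background(), 5); err == nil {
		t.Fatal("diff >= CurrentEpoch must fail")
	}
}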