Compare commits

...

25 commits

Author SHA1 Message Date
7edec9193c [#1451] placement: Return copy of slice from container nodes cache
Nodes from cache could be changed by traverser, if no objectID specified.
So it is required to return copy of cache's slice.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
3cf6ea745d [#1451] ec: Check all parts are saved
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
9a77527f46 [#1451] ape: Drop unused
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
5b1ba8e23d [#1451] ape: Perform strict APE checks for EC parts
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
9902965ff4 [#1451] writer: Sign EC parts with node's private key
As EC put request may be processed only by container node, so sign requests
with current node private to not to perform APE checks.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
33ad753302 [#1473] policer: Add tracing span
To filter HEAD requests from policer.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:15:11 +00:00
15102e6dfd [#1471] Replace sort.Slice in some places
`slices.SortFunc` doesn't use reflection and is a bit faster.
I have done some micro-benchmarks for `[]NodeInfo`:
```
$ benchstat -col "/func" out
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
       │ sort.Slice  │           slices.SortFunc           │
       │   sec/op    │   sec/op     vs base                │
Sort-8   2.130µ ± 2%   1.253µ ± 2%  -41.20% (p=0.000 n=10)
```

Haven't included them, though, as they I don't see them being used a
lot.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-06 08:07:32 +00:00
17ec84151b
[#532] cli: Respect XDG base directory spec
XDG base directory specification defines where various files
should be looked by an application. Hopefully, this makes `frostfs-cli`
more predictable and pleasant to work with. Luckily for us, golang already
has everything we need in the stdlib. This commit also gets rid of
`github.com/mitchellh/go-homedir` dependency.

Close #532
Refs #1455

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-05 16:27:18 +03:00
6c45a17af6
[#1467] node: Break notary deposit wait after VUB
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-01 16:42:02 +03:00
d19ab43500
[#1462] node: Add off-cpu profiler
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-31 11:32:13 +03:00
5bcf81d1cc
[#1466] Remove woodpecker CI
We use forgejo actions now.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-31 10:07:33 +03:00
c2effcc61c [#1465] Makefile: Update golangci-lint, fix warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-31 06:39:59 +00:00
2285cfc36f
[#1464] frostfsid: Cache subject not found error
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-30 18:27:33 +03:00
e74d05c03f
[#1464] frostfsid: Add cache metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-30 18:27:32 +03:00
48862e0e63 [#1459] .golanci.yml: Add tenv linter, fix issues
Refs #1309

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
89892d9754 [#1459] cli: Simplify slice append
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
7ac0852364 [#1459] .golangci.yml: Add intrange linter, fix issues
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
d28a5d2d7a
[#1448] container/ape: Ignore an error when getting a role
When getting a role in the APE checker for the container services,
an error may be returned if network maps of the previous two epochs
don't have enough nodes to fulfil a container placement policy.
It's a logical error, so we should ignore it.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-30 12:51:57 +03:00
87ac3c5279 [#1458] object: Make patch not set key before target construction
* `SignRequestPrivateKey` field should be initialized either within
  `newUntrustedTarget` or within `newTrustedTarget`. Otherwise, all
  requests are signed by local node key that makes impossible to perform
  patch on non-container node.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 15:20:28 +00:00
d5ee6d3039
[#1456] morph: Use DialerSource interface instead of internal struct
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-29 17:34:14 +03:00
433aab12bb
[#1455] cli: Handle missing home directory
go-homedir library incorrectly handles some of the errors
that could occur. It is archived, so no PR, but let's fix it on our
side. The scenario in case: executing command in an empty environment.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-29 16:37:55 +03:00
81f4cdbb91 [#1439] object: Sort nodes by priority metrics to compute GET request
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-29 08:05:09 +00:00
3cd7d23f10 [#1439] node: Reduce usage of netmapAPI.NodeInfo
Remove outdated code from `netmap` service.

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-29 08:05:09 +00:00
012af5cc38 [#1406] tree: Add unit-tests for ape check
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 08:04:23 +00:00
eb5336d5ff [#1406] tree: Use delete verb instead put for Remove
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 08:04:23 +00:00
62 changed files with 1012 additions and 256 deletions

View file

@ -87,5 +87,7 @@ linters:
- perfsprint
- testifylint
- protogetter
- intrange
- tenv
disable-all: true
fast: false

View file

@ -1,11 +0,0 @@
pipeline:
# Kludge for non-root containers under WoodPecker
fix-ownership:
image: alpine:latest
commands: chown -R 1234:1234 .
pre-commit:
image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
commands:
- export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
- pre-commit run --hook-stage manual

View file

@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.22
LINT_VERSION ?= 1.60.3
LINT_VERSION ?= 1.61.0
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
PROTOC_VERSION ?= 25.0
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)

View file

@ -128,7 +128,7 @@ func generateConfigExample(appDir string, credSize int) (string, error) {
tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets")
var i innerring.GlagoliticLetter
for i = 0; i < innerring.GlagoliticLetter(credSize); i++ {
for i = range innerring.GlagoliticLetter(credSize) {
tmpl.Glagolitics = append(tmpl.Glagolitics, i.String())
}

View file

@ -63,7 +63,7 @@ func TestGenerateAlphabet(t *testing.T) {
buf.Reset()
v.Set(commonflags.AlphabetWalletsFlag, walletDir)
require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10)))
for i := uint64(0); i < size; i++ {
for i := range uint64(size) {
buf.WriteString(strconv.FormatUint(i, 10) + "\r")
}

View file

@ -659,9 +659,7 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
for {
n, ok = rdr.Read(buf)
for i := range n {
list = append(list, buf[i])
}
list = append(list, buf[:n]...)
if !ok {
break
}
@ -672,9 +670,8 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
return nil, fmt.Errorf("read object list: %w", err)
}
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
slices.SortFunc(list, func(a, b oid.ID) int {
return strings.Compare(a.EncodeToString(), b.EncodeToString())
})
return &SearchObjectsRes{

View file

@ -2,7 +2,7 @@ package common
import (
"context"
"sort"
"slices"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@ -45,15 +45,11 @@ func StartClientCommandSpan(cmd *cobra.Command) {
})
commonCmd.ExitOnErr(cmd, "init tracing: %w", err)
var components sort.StringSlice
var components []string
for c := cmd; c != nil; c = c.Parent() {
components = append(components, c.Name())
}
for i, j := 0, len(components)-1; i < j; {
components.Swap(i, j)
i++
j--
}
slices.Reverse(components)
operation := strings.Join(components, ".")
ctx, span := tracing.StartSpanFromContext(cmd.Context(), operation)

View file

@ -195,7 +195,7 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
prmHead.SetRawFlag(true) // to get an error instead of whole object
eg, egCtx := errgroup.WithContext(cmd.Context())
for idx := range len(members) {
for idx := range members {
partObjID := members[idx]
eg.Go(func() error {

View file

@ -21,7 +21,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/gendoc"
"github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@ -112,14 +111,16 @@ func initConfig() {
// Use config file from the flag.
viper.SetConfigFile(cfgFile)
} else {
// Find home directory.
home, err := homedir.Dir()
commonCmd.ExitOnErr(rootCmd, "", err)
// Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml"
viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli"))
viper.SetConfigName("config")
viper.SetConfigType("yaml")
// Find config directory.
configDir, err := os.UserConfigDir()
if err != nil {
common.PrintVerbose(rootCmd, "Get config dir: %s", err)
} else {
// Search config in `$XDG_CONFIG_HOME/frostfs-cli/` with name "config.yaml"
viper.AddConfigPath(filepath.Join(configDir, "frostfs-cli"))
viper.SetConfigName("config")
viper.SetConfigType("yaml")
}
}
viper.SetEnvPrefix(envPrefix)

View file

@ -11,7 +11,7 @@ func DecodeOIDs(data []byte) ([]oid.ID, error) {
size := r.ReadVarUint()
oids := make([]oid.ID, size)
for i := uint64(0); i < size; i++ {
for i := range size {
if err := oids[i].Decode(r.ReadVarBytes()); err != nil {
return nil, err
}

View file

@ -58,6 +58,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@ -109,6 +110,7 @@ type applicationConfiguration struct {
ObjectCfg struct {
tombstoneLifetime uint64
priorityMetrics []placement.Metric
}
EngineCfg struct {
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
// Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
var pm []placement.Metric
for _, raw := range objectconfig.Get(c).Priority() {
m, err := placement.ParseMetric(raw)
if err != nil {
return err
}
pm = append(pm, m)
}
a.ObjectCfg.priorityMetrics = pm
// Storage Engine
@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
return pool
}
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
var res netmapV2.NodeInfo
func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
var res netmap.NodeInfo
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
ni.WriteToV2(&res)
res = ni
} else {
c.cfgNodeInfo.localInfo.WriteToV2(&res)
res = c.cfgNodeInfo.localInfo
}
return &res, nil
return &res
}
// setContractNodeInfo rewrites local node info from the FrostFS network map.

View file

@ -1,7 +1,6 @@
package config_test
import (
"os"
"strings"
"testing"
@ -38,8 +37,7 @@ func TestConfigEnv(t *testing.T) {
envName := strings.ToUpper(
strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator))
err := os.Setenv(envName, value)
require.NoError(t, err)
t.Setenv(envName, value)
c := configtest.EmptyConfig()

View file

@ -10,10 +10,17 @@ type PutConfig struct {
cfg *config.Config
}
// GetConfig is a wrapper over "get" config section which provides access
// to object get pipeline configuration of object service.
type GetConfig struct {
cfg *config.Config
}
const (
subsection = "object"
putSubsection = "put"
getSubsection = "get"
// PutPoolSizeDefault is a default value of routine pool size to
// process object.Put requests in object service.
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
}
// Get returns structure that provides access to "get" subsection of
// "object" section.
func Get(c *config.Config) GetConfig {
return GetConfig{
c.Sub(subsection).Sub(getSubsection),
}
}
// Priority returns the value of "priority" config parameter.
func (g GetConfig) Priority() []string {
return config.StringSliceSafe(g.cfg, "priority")
}

View file

@ -11,8 +11,6 @@ import (
)
func fromFile(path string) *config.Config {
os.Clearenv() // ENVs have priority over config files, so we do this in tests
return config.New(path, "", "")
}
@ -40,15 +38,6 @@ func ForEachFileType(pref string, f func(*config.Config)) {
// ForEnvFileType creates config from `<pref>.env` file.
func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) {
envs := os.Environ()
t.Cleanup(func() {
os.Clearenv()
for _, env := range envs {
keyValue := strings.Split(env, "=")
os.Setenv(keyValue[0], keyValue[1])
}
})
f(fromEnvFile(t, pref+".env"))
}
@ -73,7 +62,6 @@ func loadEnv(t testing.TB, path string) {
v = strings.Trim(v, `"`)
err = os.Setenv(k, v)
require.NoError(t, err, "can't set environment variable")
t.Setenv(k, v)
}
}

View file

@ -8,6 +8,7 @@ import (
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
@ -42,7 +43,7 @@ func initContainerService(_ context.Context, c *cfg) {
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
if cacheSize > 0 {
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL)
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
}
c.shared.frostfsidClient = frostfsIDSubjectProvider

View file

@ -1,6 +1,7 @@
package main
import (
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
@ -9,57 +10,101 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
)
type subjectWithError struct {
subject *client.Subject
err error
}
type subjectExtWithError struct {
subject *client.SubjectExtended
err error
}
type morphFrostfsIDCache struct {
subjProvider frostfsidcore.SubjectProvider
subjCache *expirable.LRU[util.Uint160, *client.Subject]
subjCache *expirable.LRU[util.Uint160, subjectWithError]
subjExtCache *expirable.LRU[util.Uint160, *client.SubjectExtended]
subjExtCache *expirable.LRU[util.Uint160, subjectExtWithError]
metrics cacheMetrics
}
func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration) frostfsidcore.SubjectProvider {
func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration, metrics cacheMetrics) frostfsidcore.SubjectProvider {
return &morphFrostfsIDCache{
subjProvider: subjProvider,
subjCache: expirable.NewLRU(size, func(util.Uint160, *client.Subject) {}, ttl),
subjCache: expirable.NewLRU(size, func(util.Uint160, subjectWithError) {}, ttl),
subjExtCache: expirable.NewLRU(size, func(util.Uint160, *client.SubjectExtended) {}, ttl),
subjExtCache: expirable.NewLRU(size, func(util.Uint160, subjectExtWithError) {}, ttl),
metrics: metrics,
}
}
func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
hit := false
startedAt := time.Now()
defer func() {
m.metrics.AddMethodDuration("GetSubject", time.Since(startedAt), hit)
}()
result, found := m.subjCache.Get(addr)
if found {
return result, nil
hit = true
return result.subject, result.err
}
result, err := m.subjProvider.GetSubject(addr)
subj, err := m.subjProvider.GetSubject(addr)
if err != nil {
if m.isCacheableError(err) {
m.subjCache.Add(addr, subjectWithError{
err: err,
})
}
return nil, err
}
m.subjCache.Add(addr, result)
return result, nil
m.subjCache.Add(addr, subjectWithError{subject: subj})
return subj, nil
}
func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
subjExt, found := m.subjExtCache.Get(addr)
hit := false
startedAt := time.Now()
defer func() {
m.metrics.AddMethodDuration("GetSubjectExtended", time.Since(startedAt), hit)
}()
result, found := m.subjExtCache.Get(addr)
if found {
return subjExt, nil
hit = true
return result.subject, result.err
}
var err error
subjExt, err = m.subjProvider.GetSubjectExtended(addr)
subjExt, err := m.subjProvider.GetSubjectExtended(addr)
if err != nil {
if m.isCacheableError(err) {
m.subjExtCache.Add(addr, subjectExtWithError{
err: err,
})
m.subjCache.Add(addr, subjectWithError{
err: err,
})
}
return nil, err
}
m.subjExtCache.Add(addr, subjExt)
m.subjCache.Add(addr, subjectFromSubjectExtended(subjExt))
m.subjExtCache.Add(addr, subjectExtWithError{subject: subjExt})
m.subjCache.Add(addr, subjectWithError{subject: subjectFromSubjectExtended(subjExt)})
return subjExt, nil
}
func (m *morphFrostfsIDCache) isCacheableError(err error) bool {
return strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage)
}
func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject {
return &client.Subject{
PrimaryKey: subjExt.PrimaryKey,

View file

@ -17,15 +17,16 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"go.uber.org/zap"
)
const (
newEpochNotification = "NewEpoch"
// amount of tries(blocks) before notary deposit timeout.
notaryDepositRetriesAmount = 300
)
func (c *cfg) initMorphComponents(ctx context.Context) {
@ -128,7 +129,7 @@ func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
return
}
tx, err := makeNotaryDeposit(c)
tx, vub, err := makeNotaryDeposit(c)
fatalOnErr(err)
if tx.Equals(util.Uint256{}) {
@ -139,11 +140,11 @@ func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
return
}
err = waitNotaryDeposit(ctx, c, tx)
err = waitNotaryDeposit(ctx, c, tx, vub)
fatalOnErr(err)
}
func makeNotaryDeposit(c *cfg) (util.Uint256, error) {
func makeNotaryDeposit(c *cfg) (util.Uint256, uint32, error) {
const (
// gasMultiplier defines how many times more the notary
// balance must be compared to the GAS balance of the node:
@ -157,7 +158,7 @@ func makeNotaryDeposit(c *cfg) (util.Uint256, error) {
depositAmount, err := client.CalculateNotaryDepositAmount(c.cfgMorph.client, gasMultiplier, gasDivisor)
if err != nil {
return util.Uint256{}, fmt.Errorf("could not calculate notary deposit: %w", err)
return util.Uint256{}, 0, fmt.Errorf("could not calculate notary deposit: %w", err)
}
return c.cfgMorph.client.DepositEndlessNotary(depositAmount)
@ -168,32 +169,43 @@ var (
errNotaryDepositTimeout = errors.New("notary deposit tx has not appeared in the network")
)
func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
for range notaryDepositRetriesAmount {
c.log.Debug(logs.ClientAttemptToWaitForNotaryDepositTransactionToGetPersisted)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
type waiterClient struct {
c *client.Client
}
ok, err := c.cfgMorph.client.TxHalt(tx)
if err == nil {
if ok {
c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted)
return nil
}
func (w *waiterClient) Context() context.Context {
return context.Background()
}
return errNotaryDepositFail
}
func (w *waiterClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
return w.c.GetApplicationLog(hash, trig)
}
err = c.cfgMorph.client.Wait(ctx, 1)
if err != nil {
return fmt.Errorf("could not wait for one block in chain: %w", err)
}
func (w *waiterClient) GetBlockCount() (uint32, error) {
return w.c.BlockCount()
}
func (w *waiterClient) GetVersion() (*result.Version, error) {
return w.c.GetVersion()
}
func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error {
w, err := waiter.NewPollingBased(&waiterClient{c: c.cfgMorph.client})
if err != nil {
return fmt.Errorf("could not create notary deposit waiter: %w", err)
}
return errNotaryDepositTimeout
res, err := w.WaitAny(ctx, vub, tx)
if err != nil {
if errors.Is(err, waiter.ErrTxNotAccepted) {
return errNotaryDepositTimeout
}
return fmt.Errorf("could not wait for notary deposit persists in chain: %w", err)
}
if res.Execution.VMState.HasFlag(vmstate.Halt) {
c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted)
return nil
}
return errNotaryDepositFail
}
func listenMorphNotifications(ctx context.Context, c *cfg) {

View file

@ -192,7 +192,7 @@ func addNewEpochNotificationHandlers(c *cfg) {
if c.cfgMorph.notaryEnabled {
addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
_, err := makeNotaryDeposit(c)
_, _, err := makeNotaryDeposit(c)
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotMakeNotaryDeposit,
zap.String("error", err.Error()),

View file

@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
*c.cfgObject.getSvc = *sGet // need smth better
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache,
containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
ls,
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
),
coreConstructor,
containerSource,

View file

@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
# Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15

View file

@ -131,6 +131,9 @@
"remote_pool_size": 100,
"local_pool_size": 200,
"skip_session_token_issuer_verification": true
},
"get": {
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
}
},
"storage": {

View file

@ -114,6 +114,10 @@ object:
remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
get:
priority: # list of metrics of nodes for prioritization
- $attribute:ClusterName
- $attribute:UN-LOCODE
storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@ -407,13 +407,17 @@ Contains object-service related parameters.
object:
put:
remote_pool_size: 100
get:
priority:
- $attribute:ClusterName
```
| Parameter | Type | Default value | Description |
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| Parameter | Type | Default value | Description |
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
# `runtime` section
Contains runtime parameters.

3
go.mod
View file

@ -19,6 +19,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/felixge/fgprof v0.9.5
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/gdamore/tcell/v2 v2.7.4
github.com/go-pkgz/expirable-cache/v3 v3.0.0
@ -26,7 +27,6 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/compress v1.17.4
github.com/mailru/easyjson v0.7.7
github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/nspcc-dev/neo-go v0.106.3
@ -77,6 +77,7 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -142,7 +142,6 @@ const (
ClientNotaryRequestWithPreparedMainTXInvoked = "notary request with prepared main TX invoked"
ClientNotaryRequestInvoked = "notary request invoked"
ClientNotaryDepositTransactionWasSuccessfullyPersisted = "notary deposit transaction was successfully persisted"
ClientAttemptToWaitForNotaryDepositTransactionToGetPersisted = "attempt to wait for notary deposit transaction to get persisted"
ClientNeoClientInvoke = "neo client invoke"
ClientNativeGasTransferInvoke = "native gas transfer invoke"
ClientBatchGasTransferInvoke = "batch gas transfer invoke"

View file

@ -40,13 +40,14 @@ func (s *Server) depositMainNotary() (tx util.Uint256, err error) {
)
}
func (s *Server) depositSideNotary() (tx util.Uint256, err error) {
func (s *Server) depositSideNotary() (util.Uint256, error) {
depositAmount, err := client.CalculateNotaryDepositAmount(s.morphClient, gasMultiplier, gasDivisor)
if err != nil {
return util.Uint256{}, fmt.Errorf("could not calculate side notary deposit amount: %w", err)
}
return s.morphClient.DepositEndlessNotary(depositAmount)
tx, _, err := s.morphClient.DepositEndlessNotary(depositAmount)
return tx, err
}
func (s *Server) notaryHandler(_ event.Event) {

View file

@ -61,7 +61,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ {
for range 100 {
obj := blobstortest.NewObject(64 * 1024) // 64KB object
data, err := obj.Marshal()
require.NoError(t, err)
@ -168,7 +168,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs := make(map[oid.Address][]byte)
toDelete := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
for i := range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
obj := blobstortest.NewObject(64 * 1024)
data, err := obj.Marshal()
require.NoError(t, err)
@ -236,7 +236,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
for range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
obj := blobstortest.NewObject(64 * 1024)
data, err := obj.Marshal()
require.NoError(t, err)

View file

@ -47,7 +47,7 @@ func TestIterateObjects(t *testing.T) {
mObjs := make(map[string]addrData)
for i := uint64(0); i < objNum; i++ {
for i := range uint64(objNum) {
sz := smalSz
big := i < objNum/2

View file

@ -151,7 +151,7 @@ func TestErrorReporting(t *testing.T) {
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
for i := uint32(0); i < 2; i++ {
for i := range uint32(2) {
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)

View file

@ -1,9 +1,9 @@
package pilorama
import (
"cmp"
"encoding/binary"
"slices"
"sort"
"sync"
"time"
@ -48,8 +48,8 @@ func (b *batch) run() {
// Sorting without a mutex is ok, because we append to this slice only if timer is non-nil.
// See (*boltForest).addBatch for details.
sort.Slice(b.operations, func(i, j int) bool {
return b.operations[i].Time < b.operations[j].Time
slices.SortFunc(b.operations, func(mi, mj *Move) int {
return cmp.Compare(mi.Time, mj.Time)
})
b.operations = slices.CompactFunc(b.operations, func(x, y *Move) bool { return x.Time == y.Time })

View file

@ -10,7 +10,6 @@ import (
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"sync"
"time"
@ -705,7 +704,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
key, value = c.Prev()
}
for i := range len(ms) {
for i := range ms {
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation.
@ -1093,14 +1092,19 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
return res, last, metaerr.Wrap(err)
}
func sortByFilename(nodes []NodeInfo) {
slices.SortFunc(nodes, func(a, b NodeInfo) int {
return bytes.Compare(a.Meta.GetAttr(AttributeFilename), b.Meta.GetAttr(AttributeFilename))
})
}
func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
var lastBytes []byte
if last != nil {
lastBytes = []byte(*last)
}
sort.Slice(result, func(i, j int) bool {
return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1
})
sortByFilename(result)
for i := range result {
if lastBytes == nil || bytes.Compare(lastBytes, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
return result[i:]

View file

@ -1,7 +1,6 @@
package pilorama
import (
"bytes"
"context"
"errors"
"fmt"
@ -192,9 +191,7 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
return nil, start, nil
}
sort.Slice(res, func(i, j int) bool {
return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
})
sortByFilename(res)
r := mergeNodeInfos(res)
for i := range r {

View file

@ -1081,7 +1081,7 @@ func prepareRandomTree(nodeCount, opCount int) []Move {
}
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
for i := uint64(0); i < uint64(nodeCount); i++ {
for i := range uint64(nodeCount) {
expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i)
require.NoError(t, err)
actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i)

View file

@ -1,6 +1,9 @@
package pilorama
import "sort"
import (
"cmp"
"slices"
)
// nodeInfo couples parent and metadata.
type nodeInfo struct {
@ -131,10 +134,10 @@ func (t tree) getChildren(parent Node) []Node {
}
}
sort.Slice(children, func(i, j int) bool {
a := t.infoMap[children[i]]
b := t.infoMap[children[j]]
return a.Meta.Time < b.Meta.Time
slices.SortFunc(children, func(ci, cj uint64) int {
a := t.infoMap[ci]
b := t.infoMap[cj]
return cmp.Compare(a.Meta.Time, b.Meta.Time)
})
return children
}

View file

@ -216,7 +216,7 @@ func TestRefillMetabase(t *testing.T) {
locked := make([]oid.ID, 1, 2)
locked[0] = oidtest.ID()
cnrLocked := cidtest.ID()
for i := uint64(0); i < objNum; i++ {
for range objNum {
obj := objecttest.Object()
obj.SetType(objectSDK.TypeRegular)

View file

@ -14,7 +14,7 @@ func TestLimiter(t *testing.T) {
l := newFlushLimiter(uint64(maxSize))
var currSize atomic.Int64
var eg errgroup.Group
for i := 0; i < 10_000; i++ {
for range 10_000 {
eg.Go(func() error {
defer l.release(single)
defer currSize.Add(-1)

View file

@ -19,6 +19,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
@ -461,6 +462,28 @@ func (c *Client) TxHalt(h util.Uint256) (res bool, err error) {
return len(aer.Executions) > 0 && aer.Executions[0].VMState.HasFlag(vmstate.Halt), nil
}
func (c *Client) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
return c.client.GetApplicationLog(hash, trig)
}
func (c *Client) GetVersion() (*result.Version, error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
return c.client.GetVersion()
}
// TxHeight returns true if transaction has been successfully executed and persisted.
func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) {
c.switchLock.RLock()

View file

@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"net"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
@ -48,7 +48,7 @@ type cfg struct {
morphCacheMetrics metrics.MorphCacheMetrics
dialerSource *internalNet.DialerSource
dialerSource DialerSource
}
const (
@ -68,6 +68,7 @@ func defaultConfig() *cfg {
Scopes: transaction.Global,
},
morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{},
dialerSource: &noopDialerSource{},
}
}
@ -296,7 +297,17 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
}
}
func WithDialerSource(ds *internalNet.DialerSource) Option {
type DialerSource interface {
NetContextDialer() func(context.Context, string, string) (net.Conn, error)
}
type noopDialerSource struct{}
func (ds *noopDialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
return nil
}
func WithDialerSource(ds DialerSource) Option {
return func(c *cfg) {
c.dialerSource = ds
}

View file

@ -140,7 +140,7 @@ func (c *Client) ProbeNotary() (res bool) {
// use this function.
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uint256, err error) {
func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256, error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
@ -163,7 +163,8 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uin
}
till := max(int64(bc+delta), currentTill)
return c.depositNotary(amount, till)
res, _, err := c.depositNotary(amount, till)
return res, err
}
// DepositEndlessNotary calls notary deposit method. Unlike `DepositNotary`,
@ -171,12 +172,12 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uin
// This allows to avoid ValidAfterDeposit failures.
//
// This function must be invoked with notary enabled otherwise it throws panic.
func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (res util.Uint256, err error) {
func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (util.Uint256, uint32, error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return util.Uint256{}, ErrConnectionLost
return util.Uint256{}, 0, ErrConnectionLost
}
if c.notary == nil {
@ -187,7 +188,7 @@ func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (res util.Uint256, e
return c.depositNotary(amount, math.MaxUint32)
}
func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint256, err error) {
func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (util.Uint256, uint32, error) {
txHash, vub, err := c.gasToken.Transfer(
c.accAddr,
c.notary.notary,
@ -195,7 +196,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
[]any{c.acc.PrivateKey().GetScriptHash(), till})
if err != nil {
if !errors.Is(err, neorpc.ErrAlreadyExists) {
return util.Uint256{}, fmt.Errorf("can't make notary deposit: %w", err)
return util.Uint256{}, 0, fmt.Errorf("can't make notary deposit: %w", err)
}
// Transaction is already in mempool waiting to be processed.
@ -205,7 +206,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
zap.Int64("expire_at", till),
zap.Uint32("vub", vub),
zap.Error(err))
return util.Uint256{}, nil
return util.Uint256{}, 0, nil
}
c.logger.Info(logs.ClientNotaryDepositInvoke,
@ -214,7 +215,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
zap.Uint32("vub", vub),
zap.Stringer("tx_hash", txHash.Reverse()))
return txHash, nil
return txHash, vub, nil
}
// GetNotaryDeposit returns deposit of client's account in notary contract.

View file

@ -8,7 +8,7 @@ import (
)
func tickN(t *timer.BlockTimer, n uint32) {
for i := uint32(0); i < n; i++ {
for range n {
t.Tick(0)
}
}

View file

@ -537,10 +537,7 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
return false, err
}
in, err := isContainerNode(nm, pk, binCnrID, cont)
if err != nil {
return false, err
} else if in {
if isContainerNode(nm, pk, binCnrID, cont) {
return true, nil
}
@ -551,24 +548,24 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
return false, err
}
return isContainerNode(nm, pk, binCnrID, cont)
return isContainerNode(nm, pk, binCnrID, cont), nil
}
func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) (bool, error) {
cnrVectors, err := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
if err != nil {
return false, err
}
func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) bool {
// It could an error only if the network map doesn't have enough nodes to
// fulfil the policy. It's a logical error that doesn't affect an actor role
// determining, so we ignore it
cnrVectors, _ := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
for i := range cnrVectors {
for j := range cnrVectors[i] {
if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) {
return true, nil
return true
}
}
}
return false, nil
return false
}
func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {

View file

@ -28,7 +28,7 @@ type executorSvc struct {
type NodeState interface {
// LocalNodeInfo must return current node state
// in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() (*netmap.NodeInfo, error)
LocalNodeInfo() *netmapSDK.NodeInfo
// ReadCurrentNetMap reads current local network map of the storage node
// into the given parameter. Returns any error encountered which prevented
@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
func (s *executorSvc) LocalNodeInfo(
_ context.Context,
req *netmap.LocalNodeInfoRequest,
_ *netmap.LocalNodeInfoRequest,
) (*netmap.LocalNodeInfoResponse, error) {
verV2 := req.GetMetaHeader().GetVersion()
if verV2 == nil {
return nil, errors.New("missing version")
}
var ver versionsdk.Version
if err := ver.ReadFromV2(*verV2); err != nil {
return nil, fmt.Errorf("can't read version: %w", err)
}
ni, err := s.state.LocalNodeInfo()
if err != nil {
return nil, err
}
if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
ni2 := new(netmap.NodeInfo)
ni2.SetPublicKey(ni.GetPublicKey())
ni2.SetState(ni.GetState())
ni2.SetAttributes(ni.GetAttributes())
ni.IterateAddresses(func(s string) bool {
ni2.SetAddresses(s)
return true
})
ni = ni2
}
ni := s.state.LocalNodeInfo()
var nodeInfo netmap.NodeInfo
ni.WriteToV2(&nodeInfo)
body := new(netmap.LocalNodeInfoResponseBody)
body.SetVersion(&s.version)
body.SetNodeInfo(ni)
body.SetNodeInfo(&nodeInfo)
resp := new(netmap.LocalNodeInfoResponse)
resp.SetBody(body)

View file

@ -50,7 +50,7 @@ func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token)
metaHeader.SetBearerToken(b)
metaHeader.SetSessionToken(s)
for i := uint32(0); i < depth; i++ {
for range depth {
link := metaHeader
metaHeader = new(session.RequestMetaHeader)
metaHeader.SetOrigin(link)

View file

@ -67,9 +67,6 @@ type Prm struct {
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
SoftAPECheck bool
// If true, object headers will not retrieved from storage engine.
WithoutHeaderRequest bool
// The request's bearer token. It is used in order to check APE overrides with the token.
BearerToken *bearer.Token

View file

@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
nm := &netmapStub{
currentEpoch: 100,
netmaps: map[uint64]*netmapSDK.NetMap{
99: netmap,
100: netmap,
},
}

View file

@ -3,6 +3,7 @@ package ape
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"net"
"strconv"
@ -11,6 +12,7 @@ import (
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -24,6 +26,8 @@ import (
var defaultRequest = aperequest.Request{}
var errECMissingParentObjectID = errors.New("missing EC parent object ID")
func nativeSchemaRole(role acl.Role) string {
switch role {
case acl.RoleOwner:
@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
var header *objectV2.Header
if prm.Header != nil {
header = prm.Header
} else if prm.Object != nil && !prm.WithoutHeaderRequest {
} else if prm.Object != nil {
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
if err == nil {
header = headerObjSDK.ToV2().GetHeader()
}
}
header = c.fillHeaderWithECParent(ctx, prm, header)
header, err := c.fillHeaderWithECParent(ctx, prm, header)
if err != nil {
return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
nativeschema.PropertyKeyActorRole: prm.Role,
@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
reqProps[xheadKey] = xhead.GetValue()
}
var err error
reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
if err != nil {
return defaultRequest, err
@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
), nil
}
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header {
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
if header == nil {
return header
return header, nil
}
if header.GetEC() == nil {
return header
}
if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
prm.Role == nativeschema.PropertyValueContainerRoleIR {
return header
return header, nil
}
parentObjRefID := header.GetEC().Parent
if parentObjRefID == nil {
return header
return nil, errECMissingParentObjectID
}
var parentObjID oid.ID
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
return header
return nil, fmt.Errorf("EC parent object ID format error: %w", err)
}
// only container node have access to collect parent object
contNode, err := c.currentNodeIsContainerNode(prm.Container)
if err != nil || !contNode {
return header
if err != nil {
return nil, fmt.Errorf("check container node status: %w", err)
}
if !contNode {
return header, nil
}
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
if err != nil {
return header
if isLogicalError(err) {
return header, nil
}
return nil, fmt.Errorf("EC parent header request: %w", err)
}
return parentObj.ToV2().GetHeader()
return parentObj.ToV2().GetHeader(), nil
}
func isLogicalError(err error) bool {
var errObjRemoved *apistatus.ObjectAlreadyRemoved
var errObjNotFound *apistatus.ObjectNotFound
return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
}
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {

View file

@ -25,7 +25,10 @@ import (
var _ transformer.ObjectWriter = (*ECWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
var (
errUnsupportedECObject = errors.New("object is not supported for erasure coding")
errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
)
type ECWriter struct {
Config *Config
@ -37,10 +40,12 @@ type ECWriter struct {
ObjectMeta object.ContentMeta
ObjectMetaValid bool
remoteRequestSignKey *ecdsa.PrivateKey
}
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj)
relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil {
return err
}
@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
e.ObjectMetaValid = true
}
if isContainerNode {
restoreTokens := e.CommonPrm.ForgetTokens()
defer restoreTokens()
// As request executed on container node, so sign request with container key.
e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
if err != nil {
return err
}
} else {
e.remoteRequestSignKey = e.Key
}
if obj.ECHeader() != nil {
return e.writeECPart(ctx, obj)
}
return e.writeRawObject(ctx, obj)
}
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.Relay == nil {
return false, nil
}
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
if err != nil {
return false, err
return false, false, err
}
if currentNodeIsContainerNode {
// object can be splitted or saved local
return false, nil
return false, true, nil
}
if e.Relay == nil {
return false, currentNodeIsContainerNode, nil
}
objID := object.AddressOf(obj).Object()
var index uint32
@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
index = obj.ECHeader().Index()
}
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err
return false, false, err
}
return true, nil
return true, currentNodeIsContainerNode, nil
}
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
singleErr: err,
}
}
for idx := range partsProcessed {
if !partsProcessed[idx].Load() {
return errIncompletePut{
singleErr: errFailedToSaveAllECParts,
}
}
}
return nil
}
@ -284,7 +308,7 @@ func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
}
// try to save to any node not visited by current part
for i := range len(nodes) {
for i := range nodes {
select {
case <-ctx.Done():
return ctx.Err()
@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteWriter{
privateKey: e.Key,
privateKey: e.remoteRequestSignKey,
clientConstructor: e.Config.ClientConstructor,
commonPrm: e.CommonPrm,
nodeInfo: clientNodeInfo,

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {
ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err)
nodeKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err)
@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
RemotePool: pool,
Logger: log,
ClientConstructor: clientConstructor{vectors: ns},
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
},
PlacementOpts: append(
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},

View file

@ -113,10 +113,9 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
oV2.GetHeader().SetOwnerID(ownerID)
target, err := target.New(objectwriter.Params{
Config: s.Config,
Common: commonPrm,
Header: objectSDK.NewFromV2(oV2),
SignRequestPrivateKey: s.localNodeKey,
Config: s.Config,
Common: commonPrm,
Header: objectSDK.NewFromV2(oV2),
})
if err != nil {
return fmt.Errorf("target creation: %w", err)

View file

@ -100,11 +100,18 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
// ForgetTokens forgets all the tokens read from the request's
// meta information before.
func (p *CommonPrm) ForgetTokens() {
func (p *CommonPrm) ForgetTokens() func() {
if p != nil {
tk := p.token
br := p.bearer
p.token = nil
p.bearer = nil
return func() {
p.token = tk
p.bearer = br
}
}
return func() {}
}
func CommonPrmFromV2(req interface {

View file

@ -3,6 +3,7 @@ package placement
import (
"crypto/sha256"
"fmt"
"slices"
"sync"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
raw, ok := c.containerCache.Get(cnr)
c.mtx.Unlock()
if ok {
return raw, nil
return c.cloneResult(raw), nil
}
} else {
c.lastEpoch = nm.Epoch()
@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
c.containerCache.Add(cnr, cn)
}
c.mtx.Unlock()
return cn, nil
return c.cloneResult(cn), nil
}
func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
result := make([][]netmapSDK.NodeInfo, len(nodes))
for repIdx := range nodes {
result[repIdx] = slices.Clone(nodes[repIdx])
}
return result
}

View file

@ -0,0 +1,43 @@
package placement
import (
"errors"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
const (
attrPrefix = "$attribute:"
)
type Metric interface {
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
}
func ParseMetric(raw string) (Metric, error) {
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
return NewAttributeMetric(attr), nil
}
return nil, errors.New("unsupported priority metric")
}
// attributeMetric describes priority metric based on attribute.
type attributeMetric struct {
attribute string
}
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
// the value of attribute is the same. In other case return [1].
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
fromAttr := from.Attribute(am.attribute)
toAttr := to.Attribute(am.attribute)
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
return 0
}
return 1
}
func NewAttributeMetric(attr string) Metric {
return &attributeMetric{attribute: attr}
}

View file

@ -3,6 +3,7 @@ package placement
import (
"errors"
"fmt"
"slices"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -23,6 +24,11 @@ type Builder interface {
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
}
type NodeState interface {
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() *netmap.NodeInfo
}
// Option represents placement traverser option.
type Option func(*cfg)
@ -50,6 +56,10 @@ type cfg struct {
policy netmap.PlacementPolicy
builder Builder
metrics []Metric
nodeState NodeState
}
const invalidOptsMsg = "invalid traverser options"
@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
}
var rem []int
if cfg.flatSuccess != nil {
if len(cfg.metrics) > 0 && cfg.nodeState != nil {
rem = defaultCopiesVector(cfg.policy)
var unsortedVector []netmap.NodeInfo
var regularVector []netmap.NodeInfo
for i := range rem {
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
regularVector = append(regularVector, ns[i][rem[i]:]...)
}
rem = []int{-1, -1}
sortedVector, err := sortVector(cfg, unsortedVector)
if err != nil {
return nil, err
}
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
} else if cfg.flatSuccess != nil {
ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)}
} else {
@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return [][]netmap.NodeInfo{flat}
}
type nodeMetrics struct {
index int
metrics []int
}
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
nm := make([]nodeMetrics, len(unsortedVector))
node := cfg.nodeState.LocalNodeInfo()
for i := range unsortedVector {
m := make([]int, len(cfg.metrics))
for j, pm := range cfg.metrics {
m[j] = pm.CalculateValue(node, &unsortedVector[i])
}
nm[i] = nodeMetrics{
index: i,
metrics: m,
}
}
slices.SortFunc(nm, func(a, b nodeMetrics) int {
return slices.Compare(a.metrics, b.metrics)
})
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
for i := range unsortedVector {
sortedVector[i] = unsortedVector[nm[i].index]
}
return sortedVector, nil
}
// Node is a descriptor of storage node with information required for intra-container communication.
type Node struct {
addresses network.AddressGroup
@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
c.copyNumbers = v
}
}
// WithPriorityMetrics use provided priority metrics to sort nodes.
func WithPriorityMetrics(m []Metric) Option {
return func(c *cfg) {
c.metrics = m
}
}
// WithNodeState provide state of the current node.
func WithNodeState(s NodeState) Option {
return func(c *cfg) {
c.nodeState = s
}
}

View file

@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
}
func testNode(v uint32) (n netmap.NodeInfo) {
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
n.SetNetworkEndpoints(ip)
n.SetPublicKey([]byte(ip))
return n
}
@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return vc
}
func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
return placement(ss, rs, nil)
}
func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
return placement(ss, nil, ec)
}
func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
nodes := make([][]netmap.NodeInfo, 0, len(rs))
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
num := uint32(0)
@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
nodes = append(nodes, ns)
var rd netmap.ReplicaDescriptor
rd.SetNumberOfObjects(uint32(rs[i]))
if len(rs) > 0 {
rd.SetNumberOfObjects(uint32(rs[i]))
} else {
rd.SetECDataCount(uint32(ec[i][0]))
rd.SetECParityCount(uint32(ec[i][1]))
}
replicas = append(replicas, rd)
}
@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
require.NoError(t, err)
require.Equal(t, []Node{{addresses: n}}, tr.Next())
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
})
t.Run("put scenario", func(t *testing.T) {
@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
})
}
}
type nodeState struct {
node *netmap.NodeInfo
}
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
return n.node
}
func TestTraverserPriorityMetrics(t *testing.T) {
t.Run("one rep one metric", func(t *testing.T) {
selectors := []int{4}
replicas := []int{3}
nodes, cnr := testPlacement(selectors, replicas)
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "B")
// Node_3, PK - ip4/0.0.0.0/tcp/3
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Without priority metric `ClusterName` the order will be:
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
// With priority metric `ClusterName` and current node in cluster B
// the order should be:
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
// The last node is
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("two reps two metrics", func(t *testing.T) {
selectors := []int{3, 3}
replicas := []int{2, 2}
nodes, cnr := testPlacement(selectors, replicas)
// REPLICA #1
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
// REPLICA #2
// Node_3 ip4/0.0.0.0/tcp/3
nodes[1][0].SetAttribute("ClusterName", "B")
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
// Node_4, PK - ip4/0.0.0.0/tcp/4
nodes[1][1].SetAttribute("ClusterName", "B")
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
// Node_5, PK - ip4/0.0.0.0/tcp/5
nodes[1][2].SetAttribute("ClusterName", "B")
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
sdkNode := testNode(9)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
nodesCopy := copyVectors(nodes)
m := []Metric{
NewAttributeMetric("ClusterName"),
NewAttributeMetric("UN-LOCODE"),
}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Check that nodes in the same cluster and
// in the same location should be the first in slice.
// Nodes which are follow criteria but stay outside the replica
// should be in the next slice.
next := tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "A")
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("ec container", func(t *testing.T) {
selectors := []int{4}
ec := [][]int{{2, 1}}
nodes, cnr := testECPlacement(selectors, ec)
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "B")
// Node_3, PK - ip4/0.0.0.0/tcp/3
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Without priority metric `ClusterName` the order will be:
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
// With priority metric `ClusterName` and current node in cluster B
// the order should be:
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
// The last node is
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
}

View file

@ -9,14 +9,25 @@ import (
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Policer.ProcessObject", trace.WithAttributes(
attribute.String("address", objInfo.Address.String()),
attribute.Bool("is_linking_object", objInfo.IsLinkingObject),
attribute.Bool("is_ec_part", objInfo.ECInfo != nil),
attribute.String("type", objInfo.Type.String()),
))
defer span.End()
cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
if err != nil {
if client.IsErrContainerNotFound(err) {

View file

@ -0,0 +1,207 @@
package tree
import (
"context"
"encoding/hex"
"fmt"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
var (
containerID = "73tQMTYyUkTgmvPR1HWib6pndbhSoBovbnMF7Pws8Rcy"
senderPrivateKey, _ = keys.NewPrivateKey()
senderKey = hex.EncodeToString(senderPrivateKey.PublicKey().Bytes())
rootCnr = &core.Container{Value: containerSDK.Container{}}
)
type frostfsIDProviderMock struct {
subjects map[util.Uint160]*client.Subject
subjectsExtended map[util.Uint160]*client.SubjectExtended
}
func (f *frostfsIDProviderMock) GetSubject(key util.Uint160) (*client.Subject, error) {
v, ok := f.subjects[key]
if !ok {
return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
}
return v, nil
}
func (f *frostfsIDProviderMock) GetSubjectExtended(key util.Uint160) (*client.SubjectExtended, error) {
v, ok := f.subjectsExtended[key]
if !ok {
return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
}
return v, nil
}
var _ frostfsidcore.SubjectProvider = (*frostfsIDProviderMock)(nil)
func newFrostfsIDProviderMock(t *testing.T) *frostfsIDProviderMock {
return &frostfsIDProviderMock{
subjects: map[util.Uint160]*client.Subject{
scriptHashFromSenderKey(t, senderKey): {
Namespace: "testnamespace",
Name: "test",
KV: map[string]string{
"tag-attr1": "value1",
"tag-attr2": "value2",
},
},
},
subjectsExtended: map[util.Uint160]*client.SubjectExtended{
scriptHashFromSenderKey(t, senderKey): {
Namespace: "testnamespace",
Name: "test",
KV: map[string]string{
"tag-attr1": "value1",
"tag-attr2": "value2",
},
Groups: []*client.Group{
{
ID: 1,
Name: "test",
Namespace: "testnamespace",
KV: map[string]string{
"attr1": "value1",
"attr2": "value2",
},
},
},
},
},
}
}
func scriptHashFromSenderKey(t *testing.T, senderKey string) util.Uint160 {
pk, err := keys.NewPublicKeyFromString(senderKey)
require.NoError(t, err)
return pk.GetScriptHash()
}
type stMock struct{}
func (m *stMock) CurrentEpoch() uint64 {
return 8
}
func TestCheckAPE(t *testing.T) {
cid := cid.ID{}
_ = cid.DecodeString(containerID)
t.Run("put non-tombstone rule won't affect tree remove", func(t *testing.T) {
los := inmemory.NewInmemoryLocalStorage()
mcs := inmemory.NewInmemoryMorphRuleChainStorage()
fid := newFrostfsIDProviderMock(t)
s := Service{
cfg: cfg{
frostfsidSubjectProvider: fid,
},
apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
}
los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
Condition: []chain.Condition{
{
Op: chain.CondStringNotEquals,
Kind: chain.KindResource,
Key: nativeschema.PropertyKeyObjectType,
Value: "TOMBSTONE",
},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectDelete, acl.RoleOwner, senderPrivateKey.PublicKey())
require.NoError(t, err)
})
t.Run("delete rule won't affect tree add", func(t *testing.T) {
los := inmemory.NewInmemoryLocalStorage()
mcs := inmemory.NewInmemoryMorphRuleChainStorage()
fid := newFrostfsIDProviderMock(t)
s := Service{
cfg: cfg{
frostfsidSubjectProvider: fid,
},
apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
}
los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
Condition: []chain.Condition{
{
Op: chain.CondStringNotEquals,
Kind: chain.KindResource,
Key: nativeschema.PropertyKeyObjectType,
Value: "TOMBSTONE",
},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectPut, acl.RoleOwner, senderPrivateKey.PublicKey())
require.NoError(t, err)
})
}

View file

@ -5,7 +5,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"slices"
"sync"
"sync/atomic"
@ -210,7 +210,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err
}
err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectPut)
err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectDelete)
if err != nil {
return nil, err
}
@ -575,10 +575,9 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
if len(nodes) == 0 {
return nodes, nil
}
less := func(i, j int) bool {
return bytes.Compare(nodes[i].Meta.GetAttr(pilorama.AttributeFilename), nodes[j].Meta.GetAttr(pilorama.AttributeFilename)) < 0
}
sort.Slice(nodes, less)
slices.SortFunc(nodes, func(a, b pilorama.NodeInfo) int {
return bytes.Compare(a.Meta.GetAttr(pilorama.AttributeFilename), b.Meta.GetAttr(pilorama.AttributeFilename))
})
return nodes, nil
default:
return nil, fmt.Errorf("unsupported order direction: %s", d.String())

View file

@ -3,8 +3,14 @@ package httputil
import (
"net/http"
"net/http/pprof"
"github.com/felixge/fgprof"
)
func init() {
http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
}
// initializes pprof package in order to
// register Prometheus handlers on http.DefaultServeMux.
var _ = pprof.Handler("")

View file

@ -19,7 +19,7 @@ import (
func GeneratePayloadPool(count uint, size uint) [][]byte {
var pool [][]byte
for i := uint(0); i < count; i++ {
for range count {
payload := make([]byte, size)
_, _ = rand.Read(payload)
@ -30,8 +30,8 @@ func GeneratePayloadPool(count uint, size uint) [][]byte {
func GenerateAttributePool(count uint) []objectSDK.Attribute {
var pool []objectSDK.Attribute
for i := uint(0); i < count; i++ {
for j := uint(0); j < count; j++ {
for i := range count {
for j := range count {
attr := *objectSDK.NewAttribute()
attr.SetKey(fmt.Sprintf("key%d", i))
attr.SetValue(fmt.Sprintf("value%d", j))
@ -43,7 +43,7 @@ func GenerateAttributePool(count uint) []objectSDK.Attribute {
func GenerateOwnerPool(count uint) []user.ID {
var pool []user.ID
for i := uint(0); i < count; i++ {
for range count {
pool = append(pool, usertest.ID())
}
return pool
@ -118,7 +118,7 @@ func WithPayloadFromPool(pool [][]byte) ObjectOption {
func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
return func(obj *objectSDK.Object) {
var attrs []objectSDK.Attribute
for i := uint(0); i < count; i++ {
for range count {
attrs = append(attrs, pool[rand.Intn(len(pool))])
}
obj.SetAttributes(attrs...)

View file

@ -29,7 +29,7 @@ func PopulateWithObjects(
) {
digits := "0123456789"
for i := uint(0); i < count; i++ {
for range count {
obj := factory()
id := []byte(fmt.Sprintf(
@ -59,7 +59,7 @@ func PopulateWithBigObjects(
count uint,
factory func() *objectSDK.Object,
) {
for i := uint(0); i < count; i++ {
for range count {
group.Go(func() error {
if err := populateWithBigObject(ctx, db, factory); err != nil {
return fmt.Errorf("couldn't put a big object: %w", err)
@ -154,7 +154,7 @@ func PopulateGraveyard(
wg := &sync.WaitGroup{}
wg.Add(int(count))
for i := uint(0); i < count; i++ {
for range count {
obj := factory()
prm := meta.PutPrm{}
@ -226,7 +226,7 @@ func PopulateLocked(
wg := &sync.WaitGroup{}
wg.Add(int(count))
for i := uint(0); i < count; i++ {
for range count {
defer wg.Done()
obj := factory()

View file

@ -116,7 +116,7 @@ func populate() (err error) {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(int(jobs))
for i := uint(0); i < numContainers; i++ {
for range numContainers {
cid := cidtest.ID()
for _, typ := range types {