Compare commits


25 commits

Author SHA1 Message Date
7edec9193c [#1451] placement: Return copy of slice from container nodes cache
Nodes from the cache could be changed by the traverser if no objectID is specified,
so a copy of the cached slice must be returned.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
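
The fix boils down to a plain defensive copy. A minimal sketch of the pattern, assuming a simplified `containerNodes` type (the real cache in frostfs-node differs):

```go
package main

import (
	"fmt"
	"slices"
)

// containerNodes is a hypothetical stand-in for the container nodes cache.
type containerNodes struct {
	nodes []string
}

// list returns a copy, so callers (e.g. a traverser reordering nodes)
// cannot mutate the cached slice in place.
func (c *containerNodes) list() []string {
	return slices.Clone(c.nodes)
}

func main() {
	c := &containerNodes{nodes: []string{"node1", "node2"}}
	got := c.list()
	got[0] = "mutated"      // affects only the copy
	fmt.Println(c.nodes[0]) // node1
}
```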
3cf6ea745d [#1451] ec: Check all parts are saved
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
9a77527f46 [#1451] ape: Drop unused
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
5b1ba8e23d [#1451] ape: Perform strict APE checks for EC parts
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
9902965ff4 [#1451] writer: Sign EC parts with node's private key
Since an EC put request may be processed only by a container node, sign such
requests with the current node's private key so that APE checks are not performed.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:18:10 +00:00
33ad753302 [#1473] policer: Add tracing span
To filter HEAD requests from policer.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 08:15:11 +00:00
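
frostfs-node has its own tracing wrapper (`tracing.StartSpanFromContext`, visible in a CLI hunk further down this compare). A minimal sketch of the same idea using plain OpenTelemetry; the span name "Policer.HEAD" is illustrative, not the actual name used by the policer:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
)

// headObject wraps a HEAD request in its own span, so traces originating
// from the policer can be filtered by span name.
func headObject(ctx context.Context) {
	ctx, span := otel.Tracer("policer").Start(ctx, "Policer.HEAD")
	defer span.End()
	_ = ctx // perform the HEAD request with the span-scoped context
}

func main() {
	headObject(context.Background())
}
```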
15102e6dfd [#1471] Replace sort.Slice in some places
`slices.SortFunc` doesn't use reflection and is a bit faster.
I have done some micro-benchmarks for `[]NodeInfo`:
```
$ benchstat -col "/func" out
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
       │ sort.Slice  │           slices.SortFunc           │
       │   sec/op    │   sec/op     vs base                │
Sort-8   2.130µ ± 2%   1.253µ ± 2%  -41.20% (p=0.000 n=10)
```

I haven't included the benchmarks themselves, though, as I don't see them being
used a lot.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-06 08:07:32 +00:00
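
For reference, a minimal before/after sketch of the rewrite this commit applies; the `NodeInfo` struct here is a simplified stand-in, not the pilorama type:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
	"sort"
)

type NodeInfo struct{ Time uint64 }

func main() {
	a := []NodeInfo{{3}, {1}, {2}}
	// Reflection-based variant being replaced:
	sort.Slice(a, func(i, j int) bool { return a[i].Time < a[j].Time })

	b := []NodeInfo{{3}, {1}, {2}}
	// Generic variant: no reflection, comparator returns an ordering.
	slices.SortFunc(b, func(x, y NodeInfo) int { return cmp.Compare(x.Time, y.Time) })

	fmt.Println(a, b)
}
```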
17ec84151b
[#532] cli: Respect XDG base directory spec
The XDG base directory specification defines where an application
should look for various files. Hopefully, this makes `frostfs-cli`
more predictable and pleasant to work with. Luckily for us, Go already
has everything we need in the stdlib. This commit also gets rid of the
`github.com/mitchellh/go-homedir` dependency.

Close #532
Refs #1455

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-05 16:27:18 +03:00
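
The stdlib piece doing the heavy lifting is `os.UserConfigDir`, which returns `$XDG_CONFIG_HOME` when set and falls back to `$HOME/.config` on Unix. A minimal sketch of the lookup:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// os.UserConfigDir honors $XDG_CONFIG_HOME on Unix and falls back to
	// $HOME/.config; it returns an error if neither variable is set.
	dir, err := os.UserConfigDir()
	if err != nil {
		fmt.Println("no config dir:", err)
		return
	}
	fmt.Println(filepath.Join(dir, "frostfs-cli", "config.yaml"))
}
```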
6c45a17af6
[#1467] node: Break notary deposit wait after VUB
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-01 16:42:02 +03:00
d19ab43500
[#1462] node: Add off-cpu profiler
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-31 11:32:13 +03:00
5bcf81d1cc
[#1466] Remove woodpecker CI
We use forgejo actions now.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-31 10:07:33 +03:00
c2effcc61c [#1465] Makefile: Update golangci-lint, fix warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-31 06:39:59 +00:00
2285cfc36f
[#1464] frostfsid: Cache subject not found error
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-30 18:27:33 +03:00
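
The negative-caching idea here is to store the error alongside the value, so repeated lookups of a missing subject are served from the cache instead of hitting the chain again. A minimal sketch of the pattern, assuming the same `expirable` LRU package; keys and the lookup body are illustrative:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

// entry pairs a value with the error produced when fetching it, so a
// "not found" answer is cached just like a successful one.
type entry struct {
	val string
	err error
}

var errNotFound = errors.New("subject not found")

func main() {
	cache := expirable.NewLRU[string, entry](128, nil, time.Minute)

	lookup := func(key string) (string, error) {
		if e, ok := cache.Get(key); ok {
			return e.val, e.err // hit: may replay a cached error
		}
		// Miss: pretend the chain lookup failed with "not found".
		err := errNotFound
		cache.Add(key, entry{err: err})
		return "", err
	}

	_, err1 := lookup("alice") // goes to the backend
	_, err2 := lookup("alice") // served from cache
	fmt.Println(err1, err2)
}
```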
e74d05c03f
[#1464] frostfsid: Add cache metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-30 18:27:32 +03:00
48862e0e63 [#1459] .golangci.yml: Add tenv linter, fix issues
Refs #1309

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
89892d9754 [#1459] cli: Simplify slice append
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
7ac0852364 [#1459] .golangci.yml: Add intrange linter, fix issues
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
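
The `intrange` linter flags counting loops that can use Go 1.22's range-over-int form; many hunks below apply exactly this rewrite. A minimal illustration:

```go
package main

import "fmt"

func main() {
	// Pre-Go 1.22 counting loop, flagged by the intrange linter:
	for i := 0; i < 3; i++ {
		fmt.Println("old", i)
	}

	// Equivalent Go 1.22+ form; the variable is dropped entirely
	// when the index is unused.
	for i := range 3 {
		fmt.Println("new", i)
	}
	for range 3 {
		fmt.Println("no index needed")
	}
}
```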
d28a5d2d7a
[#1448] container/ape: Ignore an error when getting a role
When getting a role in the APE checker for the container services,
an error may be returned if the network maps of the previous two epochs
don't have enough nodes to fulfil the container placement policy.
It's a logical error, so we should ignore it.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-30 12:51:57 +03:00
87ac3c5279 [#1458] object: Make patch not set key before target construction
* The `SignRequestPrivateKey` field should be initialized either within
  `newUntrustedTarget` or within `newTrustedTarget`. Otherwise, all
  requests are signed with the local node key, which makes it impossible
  to perform a patch on a non-container node.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 15:20:28 +00:00
d5ee6d3039
[#1456] morph: Use DialerSource interface instead of internal struct
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-29 17:34:14 +03:00
433aab12bb
[#1455] cli: Handle missing home directory
The go-homedir library incorrectly handles some of the errors that can
occur. It is archived, so no upstream PR is possible; let's fix it on our
side. The scenario in question: executing a command in an empty environment.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-29 16:37:55 +03:00
81f4cdbb91 [#1439] object: Sort nodes by priority metrics to compute GET request
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-29 08:05:09 +00:00
3cd7d23f10 [#1439] node: Reduce usage of netmapAPI.NodeInfo
Remove outdated code from `netmap` service.

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-29 08:05:09 +00:00
012af5cc38 [#1406] tree: Add unit-tests for ape check
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 08:04:23 +00:00
eb5336d5ff [#1406] tree: Use delete verb instead of put for Remove
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 08:04:23 +00:00
62 changed files with 1012 additions and 256 deletions

View file

@@ -87,5 +87,7 @@ linters:
   - perfsprint
   - testifylint
   - protogetter
+  - intrange
+  - tenv
 disable-all: true
 fast: false

View file

@ -1,11 +0,0 @@
pipeline:
# Kludge for non-root containers under WoodPecker
fix-ownership:
image: alpine:latest
commands: chown -R 1234:1234 .
pre-commit:
image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
commands:
- export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
- pre-commit run --hook-stage manual

View file

@@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
 HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
 GO_VERSION ?= 1.22
-LINT_VERSION ?= 1.60.3
+LINT_VERSION ?= 1.61.0
 TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
 PROTOC_VERSION ?= 25.0
 PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)

View file

@@ -128,7 +128,7 @@ func generateConfigExample(appDir string, credSize int) (string, error) {
 	tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets")
 	var i innerring.GlagoliticLetter
-	for i = 0; i < innerring.GlagoliticLetter(credSize); i++ {
+	for i = range innerring.GlagoliticLetter(credSize) {
 		tmpl.Glagolitics = append(tmpl.Glagolitics, i.String())
 	}

View file

@@ -63,7 +63,7 @@ func TestGenerateAlphabet(t *testing.T) {
 	buf.Reset()
 	v.Set(commonflags.AlphabetWalletsFlag, walletDir)
 	require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10)))
-	for i := uint64(0); i < size; i++ {
+	for i := range uint64(size) {
 		buf.WriteString(strconv.FormatUint(i, 10) + "\r")
 	}

View file

@@ -659,9 +659,7 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
 	for {
 		n, ok = rdr.Read(buf)
-		for i := range n {
-			list = append(list, buf[i])
-		}
+		list = append(list, buf[:n]...)
 		if !ok {
 			break
 		}
@@ -672,9 +670,8 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
 		return nil, fmt.Errorf("read object list: %w", err)
 	}
-	sort.Slice(list, func(i, j int) bool {
-		lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
-		return strings.Compare(lhs, rhs) < 0
+	slices.SortFunc(list, func(a, b oid.ID) int {
+		return strings.Compare(a.EncodeToString(), b.EncodeToString())
 	})
 	return &SearchObjectsRes{

View file

@@ -2,7 +2,7 @@ package common
 import (
 	"context"
-	"sort"
+	"slices"
 	"strings"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@@ -45,15 +45,11 @@ func StartClientCommandSpan(cmd *cobra.Command) {
 	})
 	commonCmd.ExitOnErr(cmd, "init tracing: %w", err)
-	var components sort.StringSlice
+	var components []string
 	for c := cmd; c != nil; c = c.Parent() {
 		components = append(components, c.Name())
 	}
-	for i, j := 0, len(components)-1; i < j; {
-		components.Swap(i, j)
-		i++
-		j--
-	}
+	slices.Reverse(components)
 	operation := strings.Join(components, ".")
 	ctx, span := tracing.StartSpanFromContext(cmd.Context(), operation)

View file

@@ -195,7 +195,7 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
 	prmHead.SetRawFlag(true) // to get an error instead of whole object
 	eg, egCtx := errgroup.WithContext(cmd.Context())
-	for idx := range len(members) {
+	for idx := range members {
 		partObjID := members[idx]
 		eg.Go(func() error {

View file

@@ -21,7 +21,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/gendoc"
-	"github.com/mitchellh/go-homedir"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 )
@@ -112,14 +111,16 @@ func initConfig() {
 		// Use config file from the flag.
 		viper.SetConfigFile(cfgFile)
 	} else {
-		// Find home directory.
-		home, err := homedir.Dir()
-		commonCmd.ExitOnErr(rootCmd, "", err)
-
-		// Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml"
-		viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli"))
-		viper.SetConfigName("config")
-		viper.SetConfigType("yaml")
+		// Find config directory.
+		configDir, err := os.UserConfigDir()
+		if err != nil {
+			common.PrintVerbose(rootCmd, "Get config dir: %s", err)
+		} else {
+			// Search config in `$XDG_CONFIG_HOME/frostfs-cli/` with name "config.yaml"
+			viper.AddConfigPath(filepath.Join(configDir, "frostfs-cli"))
+			viper.SetConfigName("config")
+			viper.SetConfigType("yaml")
+		}
 	}
 	viper.SetEnvPrefix(envPrefix)

View file

@@ -11,7 +11,7 @@ func DecodeOIDs(data []byte) ([]oid.ID, error) {
 	size := r.ReadVarUint()
 	oids := make([]oid.ID, size)
-	for i := uint64(0); i < size; i++ {
+	for i := range size {
 		if err := oids[i].Decode(r.ReadVarBytes()); err != nil {
 			return nil, err
 		}

View file

@@ -58,6 +58,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
 	objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
 	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
 	tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@@ -109,6 +110,7 @@ type applicationConfiguration struct {
 	ObjectCfg struct {
 		tombstoneLifetime uint64
+		priorityMetrics   []placement.Metric
 	}
 	EngineCfg struct {
@@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	// Object
 	a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
+	var pm []placement.Metric
+	for _, raw := range objectconfig.Get(c).Priority() {
+		m, err := placement.ParseMetric(raw)
+		if err != nil {
+			return err
+		}
+		pm = append(pm, m)
+	}
+	a.ObjectCfg.priorityMetrics = pm
 	// Storage Engine
@@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
 	return pool
 }
-func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
-	var res netmapV2.NodeInfo
+func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
+	var res netmap.NodeInfo
 	ni, ok := c.cfgNetmap.state.getNodeInfo()
 	if ok {
-		ni.WriteToV2(&res)
+		res = ni
 	} else {
-		c.cfgNodeInfo.localInfo.WriteToV2(&res)
+		res = c.cfgNodeInfo.localInfo
 	}
-
-	return &res, nil
+	return &res
 }
 // setContractNodeInfo rewrites local node info from the FrostFS network map.

View file

@@ -1,7 +1,6 @@
 package config_test
 import (
-	"os"
 	"strings"
 	"testing"
@@ -38,8 +37,7 @@ func TestConfigEnv(t *testing.T) {
 	envName := strings.ToUpper(
 		strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator))
-	err := os.Setenv(envName, value)
-	require.NoError(t, err)
+	t.Setenv(envName, value)
 	c := configtest.EmptyConfig()

View file

@@ -10,10 +10,17 @@ type PutConfig struct {
 	cfg *config.Config
 }
+// GetConfig is a wrapper over "get" config section which provides access
+// to object get pipeline configuration of object service.
+type GetConfig struct {
+	cfg *config.Config
+}
+
 const (
 	subsection    = "object"
 	putSubsection = "put"
+	getSubsection = "get"
 	// PutPoolSizeDefault is a default value of routine pool size to
 	// process object.Put requests in object service.
@@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
 func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
 	return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
 }
+
+// Get returns structure that provides access to "get" subsection of
+// "object" section.
+func Get(c *config.Config) GetConfig {
+	return GetConfig{
+		c.Sub(subsection).Sub(getSubsection),
+	}
+}
+
+// Priority returns the value of "priority" config parameter.
+func (g GetConfig) Priority() []string {
+	return config.StringSliceSafe(g.cfg, "priority")
+}

View file

@@ -11,8 +11,6 @@ import (
 )
 func fromFile(path string) *config.Config {
-	os.Clearenv() // ENVs have priority over config files, so we do this in tests
-
 	return config.New(path, "", "")
 }
@@ -40,15 +38,6 @@ func ForEachFileType(pref string, f func(*config.Config)) {
 // ForEnvFileType creates config from `<pref>.env` file.
 func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) {
-	envs := os.Environ()
-	t.Cleanup(func() {
-		os.Clearenv()
-		for _, env := range envs {
-			keyValue := strings.Split(env, "=")
-			os.Setenv(keyValue[0], keyValue[1])
-		}
-	})
 	f(fromEnvFile(t, pref+".env"))
 }
@@ -73,7 +62,6 @@ func loadEnv(t testing.TB, path string) {
 		v = strings.Trim(v, `"`)
-		err = os.Setenv(k, v)
-		require.NoError(t, err, "can't set environment variable")
+		t.Setenv(k, v)
 	}
 }

View file

@@ -8,6 +8,7 @@ import (
 	containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
 	morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
 	containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
 	cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
@@ -42,7 +43,7 @@ func initContainerService(_ context.Context, c *cfg) {
 	cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
 	if cacheSize > 0 {
-		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL)
+		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
 	}
 	c.shared.frostfsidClient = frostfsIDSubjectProvider

View file

@@ -1,6 +1,7 @@
 package main
 import (
+	"strings"
 	"time"
 	"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
@@ -9,57 +10,101 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/util"
 )
+type subjectWithError struct {
+	subject *client.Subject
+	err     error
+}
+
+type subjectExtWithError struct {
+	subject *client.SubjectExtended
+	err     error
+}
+
 type morphFrostfsIDCache struct {
 	subjProvider frostfsidcore.SubjectProvider
-	subjCache    *expirable.LRU[util.Uint160, *client.Subject]
-	subjExtCache *expirable.LRU[util.Uint160, *client.SubjectExtended]
+	subjCache    *expirable.LRU[util.Uint160, subjectWithError]
+	subjExtCache *expirable.LRU[util.Uint160, subjectExtWithError]
+
+	metrics cacheMetrics
 }
-func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration) frostfsidcore.SubjectProvider {
+func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration, metrics cacheMetrics) frostfsidcore.SubjectProvider {
 	return &morphFrostfsIDCache{
 		subjProvider: subjProvider,
-		subjCache:    expirable.NewLRU(size, func(util.Uint160, *client.Subject) {}, ttl),
-		subjExtCache: expirable.NewLRU(size, func(util.Uint160, *client.SubjectExtended) {}, ttl),
+		subjCache:    expirable.NewLRU(size, func(util.Uint160, subjectWithError) {}, ttl),
+		subjExtCache: expirable.NewLRU(size, func(util.Uint160, subjectExtWithError) {}, ttl),
+
+		metrics: metrics,
 	}
 }
 func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		m.metrics.AddMethodDuration("GetSubject", time.Since(startedAt), hit)
+	}()
+
 	result, found := m.subjCache.Get(addr)
 	if found {
-		return result, nil
+		hit = true
+		return result.subject, result.err
 	}
-	result, err := m.subjProvider.GetSubject(addr)
+	subj, err := m.subjProvider.GetSubject(addr)
 	if err != nil {
+		if m.isCacheableError(err) {
+			m.subjCache.Add(addr, subjectWithError{
+				err: err,
+			})
+		}
 		return nil, err
 	}
-	m.subjCache.Add(addr, result)
-	return result, nil
+	m.subjCache.Add(addr, subjectWithError{subject: subj})
+	return subj, nil
 }
 func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
-	subjExt, found := m.subjExtCache.Get(addr)
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		m.metrics.AddMethodDuration("GetSubjectExtended", time.Since(startedAt), hit)
+	}()
+
+	result, found := m.subjExtCache.Get(addr)
 	if found {
-		return subjExt, nil
+		hit = true
+		return result.subject, result.err
 	}
-	var err error
-	subjExt, err = m.subjProvider.GetSubjectExtended(addr)
+	subjExt, err := m.subjProvider.GetSubjectExtended(addr)
 	if err != nil {
+		if m.isCacheableError(err) {
+			m.subjExtCache.Add(addr, subjectExtWithError{
+				err: err,
+			})
+			m.subjCache.Add(addr, subjectWithError{
+				err: err,
+			})
+		}
 		return nil, err
 	}
-	m.subjExtCache.Add(addr, subjExt)
-	m.subjCache.Add(addr, subjectFromSubjectExtended(subjExt))
+	m.subjExtCache.Add(addr, subjectExtWithError{subject: subjExt})
+	m.subjCache.Add(addr, subjectWithError{subject: subjectFromSubjectExtended(subjExt)})
 	return subjExt, nil
 }
+
+func (m *morphFrostfsIDCache) isCacheableError(err error) bool {
+	return strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage)
+}
+
 func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject {
 	return &client.Subject{
 		PrimaryKey: subjExt.PrimaryKey,
View file

@@ -17,15 +17,16 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
 	"github.com/nspcc-dev/neo-go/pkg/core/state"
+	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
+	"github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter"
+	"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
 	"github.com/nspcc-dev/neo-go/pkg/util"
+	"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
 	"go.uber.org/zap"
 )
 const (
 	newEpochNotification = "NewEpoch"
-
-	// amount of tries(blocks) before notary deposit timeout.
-	notaryDepositRetriesAmount = 300
 )
 func (c *cfg) initMorphComponents(ctx context.Context) {
@@ -128,7 +129,7 @@ func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
 		return
 	}
-	tx, err := makeNotaryDeposit(c)
+	tx, vub, err := makeNotaryDeposit(c)
 	fatalOnErr(err)
 	if tx.Equals(util.Uint256{}) {
@@ -139,11 +140,11 @@
 		return
 	}
-	err = waitNotaryDeposit(ctx, c, tx)
+	err = waitNotaryDeposit(ctx, c, tx, vub)
 	fatalOnErr(err)
 }
-func makeNotaryDeposit(c *cfg) (util.Uint256, error) {
+func makeNotaryDeposit(c *cfg) (util.Uint256, uint32, error) {
 	const (
 		// gasMultiplier defines how many times more the notary
 		// balance must be compared to the GAS balance of the node:
@@ -157,7 +158,7 @@
 	depositAmount, err := client.CalculateNotaryDepositAmount(c.cfgMorph.client, gasMultiplier, gasDivisor)
 	if err != nil {
-		return util.Uint256{}, fmt.Errorf("could not calculate notary deposit: %w", err)
+		return util.Uint256{}, 0, fmt.Errorf("could not calculate notary deposit: %w", err)
 	}
 	return c.cfgMorph.client.DepositEndlessNotary(depositAmount)
@@ -168,32 +169,43 @@
 	errNotaryDepositTimeout = errors.New("notary deposit tx has not appeared in the network")
 )
-func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
-	for range notaryDepositRetriesAmount {
-		c.log.Debug(logs.ClientAttemptToWaitForNotaryDepositTransactionToGetPersisted)
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		ok, err := c.cfgMorph.client.TxHalt(tx)
-		if err == nil {
-			if ok {
-				c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted)
-				return nil
-			}
-			return errNotaryDepositFail
-		}
-
-		err = c.cfgMorph.client.Wait(ctx, 1)
-		if err != nil {
-			return fmt.Errorf("could not wait for one block in chain: %w", err)
-		}
-	}
-
-	return errNotaryDepositTimeout
+type waiterClient struct {
+	c *client.Client
+}
+
+func (w *waiterClient) Context() context.Context {
+	return context.Background()
+}
+
+func (w *waiterClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
+	return w.c.GetApplicationLog(hash, trig)
+}
+
+func (w *waiterClient) GetBlockCount() (uint32, error) {
+	return w.c.BlockCount()
+}
+
+func (w *waiterClient) GetVersion() (*result.Version, error) {
+	return w.c.GetVersion()
+}
+
+func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error {
+	w, err := waiter.NewPollingBased(&waiterClient{c: c.cfgMorph.client})
+	if err != nil {
+		return fmt.Errorf("could not create notary deposit waiter: %w", err)
+	}
+	res, err := w.WaitAny(ctx, vub, tx)
+	if err != nil {
+		if errors.Is(err, waiter.ErrTxNotAccepted) {
+			return errNotaryDepositTimeout
+		}
+		return fmt.Errorf("could not wait for notary deposit persists in chain: %w", err)
+	}
+	if res.Execution.VMState.HasFlag(vmstate.Halt) {
+		c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted)
+		return nil
+	}
+	return errNotaryDepositFail
 }
 func listenMorphNotifications(ctx context.Context, c *cfg) {
func listenMorphNotifications(ctx context.Context, c *cfg) { func listenMorphNotifications(ctx context.Context, c *cfg) {

View file

@@ -192,7 +192,7 @@ func addNewEpochNotificationHandlers(c *cfg) {
 	if c.cfgMorph.notaryEnabled {
 		addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
-			_, err := makeNotaryDeposit(c)
+			_, _, err := makeNotaryDeposit(c)
 			if err != nil {
 				c.log.Error(logs.FrostFSNodeCouldNotMakeNotaryDeposit,
 					zap.String("error", err.Error()),

View file

@@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
 	sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
-	sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
+	sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
+		c.ObjectCfg.priorityMetrics)
 	*c.cfgObject.getSvc = *sGet // need smth better
@@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
 func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
 	coreConstructor *cache.ClientCache,
 	containerSource containercore.Source,
+	priorityMetrics []placement.Metric,
 ) *getsvc.Service {
 	ls := c.cfgObject.cfgLocalStorage.localStorage
@@ -398,6 +400,8 @@
 		ls,
 		traverseGen.WithTraverseOptions(
 			placement.SuccessAfter(1),
+			placement.WithPriorityMetrics(priorityMetrics),
+			placement.WithNodeState(c),
 		),
 		coreConstructor,
 		containerSource,

View file

@@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
+FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15

View file

@@ -131,6 +131,9 @@
 			"remote_pool_size": 100,
 			"local_pool_size": 200,
 			"skip_session_token_issuer_verification": true
+		},
+		"get": {
+			"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
 		}
 	},
 	"storage": {

View file

@@ -114,6 +114,10 @@ object:
     remote_pool_size: 100 # number of async workers for remote PUT operations
     local_pool_size: 200 # number of async workers for local PUT operations
     skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
+  get:
+    priority: # list of metrics of nodes for prioritization
+      - $attribute:ClusterName
+      - $attribute:UN-LOCODE
 storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@@ -407,13 +407,17 @@ Contains object-service related parameters.
 object:
   put:
     remote_pool_size: 100
+  get:
+    priority:
+      - $attribute:ClusterName
 ```

 | Parameter                   | Type       | Default value | Description                                                                                           |
 |-----------------------------|------------|---------------|-------------------------------------------------------------------------------------------------------|
 | `delete.tombstone_lifetime` | `int`      | `5`           | Tombstone lifetime for removed objects in epochs.                                                     |
 | `put.remote_pool_size`      | `int`      | `10`          | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services.        |
 | `put.local_pool_size`       | `int`      | `10`          | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services.         |
+| `get.priority`              | `[]string` |               | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests.  |

 # `runtime` section

 Contains runtime parameters.

go.mod
View file

@@ -19,6 +19,7 @@ require (
 	github.com/cheggaaa/pb v1.0.29
 	github.com/chzyer/readline v1.5.1
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
+	github.com/felixge/fgprof v0.9.5
 	github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
 	github.com/gdamore/tcell/v2 v2.7.4
 	github.com/go-pkgz/expirable-cache/v3 v3.0.0
@@ -26,7 +27,6 @@ require (
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/klauspost/compress v1.17.4
 	github.com/mailru/easyjson v0.7.7
-	github.com/mitchellh/go-homedir v1.1.0
 	github.com/mr-tron/base58 v1.2.0
 	github.com/multiformats/go-multiaddr v0.12.1
 	github.com/nspcc-dev/neo-go v0.106.3
@@ -77,6 +77,7 @@ require (
 	github.com/go-logr/logr v1.4.2 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
 	github.com/gorilla/websocket v1.5.1 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect

go.sum

Binary file not shown.

View file

@@ -142,7 +142,6 @@ const (
 	ClientNotaryRequestWithPreparedMainTXInvoked = "notary request with prepared main TX invoked"
 	ClientNotaryRequestInvoked = "notary request invoked"
 	ClientNotaryDepositTransactionWasSuccessfullyPersisted = "notary deposit transaction was successfully persisted"
-	ClientAttemptToWaitForNotaryDepositTransactionToGetPersisted = "attempt to wait for notary deposit transaction to get persisted"
 	ClientNeoClientInvoke = "neo client invoke"
 	ClientNativeGasTransferInvoke = "native gas transfer invoke"
 	ClientBatchGasTransferInvoke = "batch gas transfer invoke"

View file

@@ -40,13 +40,14 @@ func (s *Server) depositMainNotary() (tx util.Uint256, err error) {
 	)
 }
-func (s *Server) depositSideNotary() (tx util.Uint256, err error) {
+func (s *Server) depositSideNotary() (util.Uint256, error) {
 	depositAmount, err := client.CalculateNotaryDepositAmount(s.morphClient, gasMultiplier, gasDivisor)
 	if err != nil {
 		return util.Uint256{}, fmt.Errorf("could not calculate side notary deposit amount: %w", err)
 	}
-	return s.morphClient.DepositEndlessNotary(depositAmount)
+	tx, _, err := s.morphClient.DepositEndlessNotary(depositAmount)
+	return tx, err
 }
 func (s *Server) notaryHandler(_ event.Event) {

View file

@@ -61,7 +61,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 	require.NoError(t, b.Init())
 	storageIDs := make(map[oid.Address][]byte)
-	for i := 0; i < 100; i++ {
+	for range 100 {
 		obj := blobstortest.NewObject(64 * 1024) // 64KB object
 		data, err := obj.Marshal()
 		require.NoError(t, err)
@@ -168,7 +168,7 @@
 	storageIDs := make(map[oid.Address][]byte)
 	toDelete := make(map[oid.Address][]byte)
-	for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
+	for i := range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
 		obj := blobstortest.NewObject(64 * 1024)
 		data, err := obj.Marshal()
 		require.NoError(t, err)
@@ -236,7 +236,7 @@
 	require.NoError(t, b.Init())
 	storageIDs := make(map[oid.Address][]byte)
-	for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
+	for range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
 		obj := blobstortest.NewObject(64 * 1024)
 		data, err := obj.Marshal()
 		require.NoError(t, err)

View file

@@ -47,7 +47,7 @@ func TestIterateObjects(t *testing.T) {
 	mObjs := make(map[string]addrData)
-	for i := uint64(0); i < objNum; i++ {
+	for i := range uint64(objNum) {
 		sz := smalSz
 		big := i < objNum/2

View file

@@ -151,7 +151,7 @@ func TestErrorReporting(t *testing.T) {
 		checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
 	}
-	for i := uint32(0); i < 2; i++ {
+	for i := range uint32(2) {
 		_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 		require.Error(t, err)
 		checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)

View file

@@ -1,9 +1,9 @@
 package pilorama
 import (
+	"cmp"
 	"encoding/binary"
 	"slices"
-	"sort"
 	"sync"
 	"time"
@@ -48,8 +48,8 @@ func (b *batch) run() {
 	// Sorting without a mutex is ok, because we append to this slice only if timer is non-nil.
 	// See (*boltForest).addBatch for details.
-	sort.Slice(b.operations, func(i, j int) bool {
-		return b.operations[i].Time < b.operations[j].Time
+	slices.SortFunc(b.operations, func(mi, mj *Move) int {
+		return cmp.Compare(mi.Time, mj.Time)
 	})
 	b.operations = slices.CompactFunc(b.operations, func(x, y *Move) bool { return x.Time == y.Time })

View file

@@ -10,7 +10,6 @@ import (
 	"os"
 	"path/filepath"
 	"slices"
-	"sort"
 	"strconv"
 	"sync"
 	"time"
@@ -705,7 +704,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
 		key, value = c.Prev()
 	}
-	for i := range len(ms) {
+	for i := range ms {
 		// Loop invariant: key represents the next stored timestamp after ms[i].Time.
 		// 2. Insert the operation.
@@ -1093,14 +1092,19 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
 	return res, last, metaerr.Wrap(err)
 }
+func sortByFilename(nodes []NodeInfo) {
+	slices.SortFunc(nodes, func(a, b NodeInfo) int {
+		return bytes.Compare(a.Meta.GetAttr(AttributeFilename), b.Meta.GetAttr(AttributeFilename))
+	})
+}
+
 func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
 	var lastBytes []byte
 	if last != nil {
 		lastBytes = []byte(*last)
 	}
-	sort.Slice(result, func(i, j int) bool {
-		return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1
-	})
+	sortByFilename(result)
 	for i := range result {
 		if lastBytes == nil || bytes.Compare(lastBytes, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
 			return result[i:]

View file

@@ -1,7 +1,6 @@
 package pilorama
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -192,9 +191,7 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
 		return nil, start, nil
 	}
-	sort.Slice(res, func(i, j int) bool {
-		return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
-	})
+	sortByFilename(res)
 	r := mergeNodeInfos(res)
 	for i := range r {

View file

@@ -1081,7 +1081,7 @@ func prepareRandomTree(nodeCount, opCount int) []Move {
 }
 func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
-	for i := uint64(0); i < uint64(nodeCount); i++ {
+	for i := range uint64(nodeCount) {
 		expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i)
 		require.NoError(t, err)
 		actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i)

View file

@@ -1,6 +1,9 @@
 package pilorama
-import "sort"
+import (
+	"cmp"
+	"slices"
+)
 // nodeInfo couples parent and metadata.
 type nodeInfo struct {
@@ -131,10 +134,10 @@ func (t tree) getChildren(parent Node) []Node {
 		}
 	}
-	sort.Slice(children, func(i, j int) bool {
-		a := t.infoMap[children[i]]
-		b := t.infoMap[children[j]]
-		return a.Meta.Time < b.Meta.Time
+	slices.SortFunc(children, func(ci, cj uint64) int {
+		a := t.infoMap[ci]
+		b := t.infoMap[cj]
+		return cmp.Compare(a.Meta.Time, b.Meta.Time)
 	})
 	return children
 }

View file

@@ -216,7 +216,7 @@ func TestRefillMetabase(t *testing.T) {
 	locked := make([]oid.ID, 1, 2)
 	locked[0] = oidtest.ID()
 	cnrLocked := cidtest.ID()
-	for i := uint64(0); i < objNum; i++ {
+	for range objNum {
 		obj := objecttest.Object()
 		obj.SetType(objectSDK.TypeRegular)

View file

@@ -14,7 +14,7 @@ func TestLimiter(t *testing.T) {
 	l := newFlushLimiter(uint64(maxSize))
 	var currSize atomic.Int64
 	var eg errgroup.Group
-	for i := 0; i < 10_000; i++ {
+	for range 10_000 {
 		eg.Go(func() error {
 			defer l.release(single)
 			defer currSize.Add(-1)

View file

@@ -19,6 +19,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/core/transaction"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
+	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
@@ -461,6 +462,28 @@ func (c *Client) TxHalt(h util.Uint256) (res bool, err error) {
 	return len(aer.Executions) > 0 && aer.Executions[0].VMState.HasFlag(vmstate.Halt), nil
 }
+func (c *Client) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
+	c.switchLock.RLock()
+	defer c.switchLock.RUnlock()
+
+	if c.inactive {
+		return nil, ErrConnectionLost
+	}
+
+	return c.client.GetApplicationLog(hash, trig)
+}
+
+func (c *Client) GetVersion() (*result.Version, error) {
+	c.switchLock.RLock()
+	defer c.switchLock.RUnlock()
+
+	if c.inactive {
+		return nil, ErrConnectionLost
+	}
+
+	return c.client.GetVersion()
+}
+
 // TxHeight returns true if transaction has been successfully executed and persisted.
 func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) {
 	c.switchLock.RLock()

View file

@@ -4,11 +4,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"time"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
-	internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
 	morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	lru "github.com/hashicorp/golang-lru/v2"
@@ -48,7 +48,7 @@ type cfg struct {
 	morphCacheMetrics metrics.MorphCacheMetrics
-	dialerSource *internalNet.DialerSource
+	dialerSource DialerSource
 }
 const (
@@ -68,6 +68,7 @@ func defaultConfig() *cfg {
 			Scopes: transaction.Global,
 		},
 		morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{},
+		dialerSource:      &noopDialerSource{},
 	}
 }
@@ -296,7 +297,17 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
 	}
 }
-func WithDialerSource(ds *internalNet.DialerSource) Option {
+type DialerSource interface {
+	NetContextDialer() func(context.Context, string, string) (net.Conn, error)
+}
+
+type noopDialerSource struct{}
+
+func (ds *noopDialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
+	return nil
+}
+
+func WithDialerSource(ds DialerSource) Option {
 	return func(c *cfg) {
 		c.dialerSource = ds
 	}

View file

@@ -140,7 +140,7 @@ func (c *Client) ProbeNotary() (res bool) {
 // use this function.
 //
 // This function must be invoked with notary enabled otherwise it throws panic.
-func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uint256, err error) {
+func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256, error) {
 	c.switchLock.RLock()
 	defer c.switchLock.RUnlock()
@@ -163,7 +163,8 @@
 	}
 	till := max(int64(bc+delta), currentTill)
-	return c.depositNotary(amount, till)
+	res, _, err := c.depositNotary(amount, till)
+	return res, err
 }
 // DepositEndlessNotary calls notary deposit method. Unlike `DepositNotary`,
@@ -171,12 +172,12 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uin
 // This allows to avoid ValidAfterDeposit failures.
 //
 // This function must be invoked with notary enabled otherwise it throws panic.
-func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (res util.Uint256, err error) {
+func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (util.Uint256, uint32, error) {
 	c.switchLock.RLock()
 	defer c.switchLock.RUnlock()
 	if c.inactive {
-		return util.Uint256{}, ErrConnectionLost
+		return util.Uint256{}, 0, ErrConnectionLost
 	}
 	if c.notary == nil {
@@ -187,7 +188,7 @@ func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (res util.Uint256, e
 	return c.depositNotary(amount, math.MaxUint32)
 }
-func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint256, err error) {
+func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (util.Uint256, uint32, error) {
 	txHash, vub, err := c.gasToken.Transfer(
 		c.accAddr,
 		c.notary.notary,
@@ -195,7 +196,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
 		[]any{c.acc.PrivateKey().GetScriptHash(), till})
 	if err != nil {
 		if !errors.Is(err, neorpc.ErrAlreadyExists) {
-			return util.Uint256{}, fmt.Errorf("can't make notary deposit: %w", err)
+			return util.Uint256{}, 0, fmt.Errorf("can't make notary deposit: %w", err)
 		}
 		// Transaction is already in mempool waiting to be processed.
@@ -205,7 +206,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
 			zap.Int64("expire_at", till),
 			zap.Uint32("vub", vub),
 			zap.Error(err))
-		return util.Uint256{}, nil
+		return util.Uint256{}, 0, nil
 	}
 	c.logger.Info(logs.ClientNotaryDepositInvoke,
@@ -214,7 +215,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
 		zap.Uint32("vub", vub),
 		zap.Stringer("tx_hash", txHash.Reverse()))
-	return txHash, nil
+	return txHash, vub, nil
 }
 // GetNotaryDeposit returns deposit of client's account in notary contract.

View file

@@ -8,7 +8,7 @@ import (
 )
 func tickN(t *timer.BlockTimer, n uint32) {
-	for i := uint32(0); i < n; i++ {
+	for range n {
 		t.Tick(0)
 	}
 }

View file

@@ -537,10 +537,7 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
 		return false, err
 	}
-	in, err := isContainerNode(nm, pk, binCnrID, cont)
-	if err != nil {
-		return false, err
-	} else if in {
+	if isContainerNode(nm, pk, binCnrID, cont) {
 		return true, nil
 	}
@@ -551,24 +548,24 @@
 		return false, err
 	}
-	return isContainerNode(nm, pk, binCnrID, cont)
+	return isContainerNode(nm, pk, binCnrID, cont), nil
 }
-func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) (bool, error) {
-	cnrVectors, err := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
-	if err != nil {
-		return false, err
-	}
+func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) bool {
+	// It could be an error only if the network map doesn't have enough nodes to
+	// fulfil the policy. It's a logical error that doesn't affect actor role
+	// determination, so we ignore it.
+	cnrVectors, _ := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
 	for i := range cnrVectors {
 		for j := range cnrVectors[i] {
 			if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) {
-				return true, nil
+				return true
 			}
 		}
 	}
-	return false, nil
+	return false
 }
 func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {

View file

@@ -28,7 +28,7 @@ type executorSvc struct {
 type NodeState interface {
 	// LocalNodeInfo must return current node state
 	// in FrostFS API v2 NodeInfo structure.
-	LocalNodeInfo() (*netmap.NodeInfo, error)
+	LocalNodeInfo() *netmapSDK.NodeInfo
 	// ReadCurrentNetMap reads current local network map of the storage node
 	// into the given parameter. Returns any error encountered which prevented
@@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
 func (s *executorSvc) LocalNodeInfo(
 	_ context.Context,
-	req *netmap.LocalNodeInfoRequest,
+	_ *netmap.LocalNodeInfoRequest,
 ) (*netmap.LocalNodeInfoResponse, error) {
-	verV2 := req.GetMetaHeader().GetVersion()
-	if verV2 == nil {
-		return nil, errors.New("missing version")
-	}
-
-	var ver versionsdk.Version
-	if err := ver.ReadFromV2(*verV2); err != nil {
-		return nil, fmt.Errorf("can't read version: %w", err)
-	}
-
-	ni, err := s.state.LocalNodeInfo()
-	if err != nil {
-		return nil, err
-	}
-
-	if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
-		ni2 := new(netmap.NodeInfo)
-		ni2.SetPublicKey(ni.GetPublicKey())
-		ni2.SetState(ni.GetState())
-		ni2.SetAttributes(ni.GetAttributes())
-		ni.IterateAddresses(func(s string) bool {
-			ni2.SetAddresses(s)
-			return true
-		})
-
-		ni = ni2
-	}
+	ni := s.state.LocalNodeInfo()
+	var nodeInfo netmap.NodeInfo
+	ni.WriteToV2(&nodeInfo)
 	body := new(netmap.LocalNodeInfoResponseBody)
 	body.SetVersion(&s.version)
-	body.SetNodeInfo(ni)
+	body.SetNodeInfo(&nodeInfo)
 	resp := new(netmap.LocalNodeInfoResponse)
 	resp.SetBody(body)
View file

@@ -50,7 +50,7 @@ func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token)
 	metaHeader.SetBearerToken(b)
 	metaHeader.SetSessionToken(s)
-	for i := uint32(0); i < depth; i++ {
+	for range depth {
 		link := metaHeader
 		metaHeader = new(session.RequestMetaHeader)
 		metaHeader.SetOrigin(link)

View file

@@ -67,9 +67,6 @@ type Prm struct {
 	// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
 	SoftAPECheck bool
-	// If true, object headers will not retrieved from storage engine.
-	WithoutHeaderRequest bool
-
 	// The request's bearer token. It is used in order to check APE overrides with the token.
 	BearerToken *bearer.Token

View file

@@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
 		nm := &netmapStub{
 			currentEpoch: 100,
 			netmaps: map[uint64]*netmapSDK.NetMap{
+				99:  netmap,
 				100: netmap,
 			},
 		}


@@ -3,6 +3,7 @@ package ape
 import (
 	"context"
 	"crypto/sha256"
+	"errors"
 	"fmt"
 	"net"
 	"strconv"
@@ -11,6 +12,7 @@ import (
 	aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -24,6 +26,8 @@ import (
 
 var defaultRequest = aperequest.Request{}
 
+var errECMissingParentObjectID = errors.New("missing EC parent object ID")
+
 func nativeSchemaRole(role acl.Role) string {
 	switch role {
 	case acl.RoleOwner:
@@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
 	var header *objectV2.Header
 	if prm.Header != nil {
 		header = prm.Header
-	} else if prm.Object != nil && !prm.WithoutHeaderRequest {
+	} else if prm.Object != nil {
 		headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
 		if err == nil {
 			header = headerObjSDK.ToV2().GetHeader()
 		}
 	}
-	header = c.fillHeaderWithECParent(ctx, prm, header)
+	header, err := c.fillHeaderWithECParent(ctx, prm, header)
+	if err != nil {
+		return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
+	}
 	reqProps := map[string]string{
 		nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
 		nativeschema.PropertyKeyActorRole:      prm.Role,
@@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
 		reqProps[xheadKey] = xhead.GetValue()
 	}
 
-	var err error
 	reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
 	if err != nil {
 		return defaultRequest, err
@@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
 	), nil
 }
 
-func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header {
+func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
 	if header == nil {
-		return header
+		return header, nil
 	}
 	if header.GetEC() == nil {
-		return header
+		return header, nil
 	}
-	if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
-		prm.Role == nativeschema.PropertyValueContainerRoleIR {
-		return header
-	}
 	parentObjRefID := header.GetEC().Parent
 	if parentObjRefID == nil {
-		return header
+		return nil, errECMissingParentObjectID
 	}
 	var parentObjID oid.ID
 	if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
-		return header
+		return nil, fmt.Errorf("EC parent object ID format error: %w", err)
 	}
 	// only container node have access to collect parent object
 	contNode, err := c.currentNodeIsContainerNode(prm.Container)
-	if err != nil || !contNode {
-		return header
+	if err != nil {
+		return nil, fmt.Errorf("check container node status: %w", err)
+	}
+	if !contNode {
+		return header, nil
 	}
 	parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
 	if err != nil {
-		return header
+		if isLogicalError(err) {
+			return header, nil
+		}
+		return nil, fmt.Errorf("EC parent header request: %w", err)
 	}
-	return parentObj.ToV2().GetHeader()
+	return parentObj.ToV2().GetHeader(), nil
+}
+
+func isLogicalError(err error) bool {
+	var errObjRemoved *apistatus.ObjectAlreadyRemoved
+	var errObjNotFound *apistatus.ObjectNotFound
+	return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
 }
 
 func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {
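`isLogicalError` lets the strict APE check distinguish outcomes: "object not found" and "object already removed" are ordinary results (the check proceeds with the part header), while any other failure, e.g. a transport error, now aborts the request instead of being silently swallowed. A small sketch of the matching behavior, assuming the frostfs-sdk-go status types implement `error` on pointer receivers as the diff's `errors.As` usage implies:

```go
package main

import (
	"errors"
	"fmt"

	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

func main() {
	// A wrapped status error still matches via errors.As, so "not found" from
	// the parent-header request counts as a logical outcome, not a failure.
	err := fmt.Errorf("EC parent header request: %w", new(apistatus.ObjectNotFound))

	var notFound *apistatus.ObjectNotFound
	fmt.Println(errors.As(err, &notFound)) // true
}
```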
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) { func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {


@@ -25,7 +25,10 @@ import (
 
 var _ transformer.ObjectWriter = (*ECWriter)(nil)
 
-var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
+var (
+	errUnsupportedECObject    = errors.New("object is not supported for erasure coding")
+	errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
+)
 
 type ECWriter struct {
 	Config *Config
@@ -37,10 +40,12 @@ type ECWriter struct {
 
 	ObjectMeta      object.ContentMeta
 	ObjectMetaValid bool
+
+	remoteRequestSignKey *ecdsa.PrivateKey
 }
 
 func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
-	relayed, err := e.relayIfNotContainerNode(ctx, obj)
+	relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
 	if err != nil {
 		return err
 	}
@@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
 		e.ObjectMetaValid = true
 	}
 
+	if isContainerNode {
+		restoreTokens := e.CommonPrm.ForgetTokens()
+		defer restoreTokens()
+		// As request executed on container node, so sign request with container key.
+		e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
+		if err != nil {
+			return err
+		}
+	} else {
+		e.remoteRequestSignKey = e.Key
+	}
+
 	if obj.ECHeader() != nil {
 		return e.writeECPart(ctx, obj)
 	}
 	return e.writeRawObject(ctx, obj)
 }
 
-func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
-	if e.Relay == nil {
-		return false, nil
-	}
+func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
 	currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
 	if err != nil {
-		return false, err
+		return false, false, err
 	}
 	if currentNodeIsContainerNode {
 		// object can be splitted or saved local
-		return false, nil
+		return false, true, nil
+	}
+	if e.Relay == nil {
+		return false, currentNodeIsContainerNode, nil
 	}
 	objID := object.AddressOf(obj).Object()
 	var index uint32
@@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
 		index = obj.ECHeader().Index()
 	}
 	if err := e.relayToContainerNode(ctx, objID, index); err != nil {
-		return false, err
+		return false, false, err
 	}
-	return true, nil
+	return true, currentNodeIsContainerNode, nil
 }
 
 func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
@@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
 			singleErr: err,
 		}
 	}
+
+	for idx := range partsProcessed {
+		if !partsProcessed[idx].Load() {
+			return errIncompletePut{
+				singleErr: errFailedToSaveAllECParts,
+			}
+		}
+	}
 	return nil
 }
 
@@ -284,7 +308,7 @@ func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
 	}
 
 	// try to save to any node not visited by current part
-	for i := range len(nodes) {
+	for i := range nodes {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
 	client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
 
 	remoteTaget := remoteWriter{
-		privateKey:        e.Key,
+		privateKey:        e.remoteRequestSignKey,
 		clientConstructor: e.Config.ClientConstructor,
 		commonPrm:         e.CommonPrm,
 		nodeInfo:          clientNodeInfo,
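`writeRawObject` could previously report success even when some EC parts were silently dropped; the added loop refuses to succeed unless every part's flag was set. A self-contained sketch of the per-part tracking pattern (a slice of `atomic.Bool` flipped by concurrent writers; names and the simulated failure are illustrative, Go 1.22+ assumed for per-iteration loop variables):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const parts = 3
	partsProcessed := make([]atomic.Bool, parts)

	var wg sync.WaitGroup
	for i := range parts {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if i != 1 { // simulate part 1 failing to be written
				partsProcessed[i].Store(true)
			}
		}()
	}
	wg.Wait()

	// The final check: a put is complete only if every part was saved.
	for idx := range partsProcessed {
		if !partsProcessed[idx].Load() {
			fmt.Printf("put incomplete: part %d was not saved\n", idx)
		}
	}
}
```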


@@ -14,6 +14,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
 	netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
@@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {
 	ownerKey, err := keys.NewPrivateKey()
 	require.NoError(t, err)
 
+	nodeKey, err := keys.NewPrivateKey()
+	require.NoError(t, err)
+
 	pool, err := ants.NewPool(4, ants.WithNonblocking(true))
 	require.NoError(t, err)
@@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
 			RemotePool:        pool,
 			Logger:            log,
 			ClientConstructor: clientConstructor{vectors: ns},
+			KeyStorage:        util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
 		},
 		PlacementOpts: append(
 			[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},


@@ -113,10 +113,9 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
 	oV2.GetHeader().SetOwnerID(ownerID)
 
 	target, err := target.New(objectwriter.Params{
-		Config:                s.Config,
-		Common:                commonPrm,
-		Header:                objectSDK.NewFromV2(oV2),
-		SignRequestPrivateKey: s.localNodeKey,
+		Config: s.Config,
+		Common: commonPrm,
+		Header: objectSDK.NewFromV2(oV2),
 	})
 	if err != nil {
 		return fmt.Errorf("target creation: %w", err)


@@ -100,11 +100,18 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
 
 // ForgetTokens forgets all the tokens read from the request's
 // meta information before.
-func (p *CommonPrm) ForgetTokens() {
+func (p *CommonPrm) ForgetTokens() func() {
 	if p != nil {
+		tk := p.token
+		br := p.bearer
 		p.token = nil
 		p.bearer = nil
+		return func() {
+			p.token = tk
+			p.bearer = br
+		}
 	}
+	return func() {}
 }
 
 func CommonPrmFromV2(req interface {
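`ForgetTokens` now returns a restore closure, so a caller can hide the session and bearer tokens for one stretch of processing and put them back afterwards; this is exactly how the EC writer uses it above (`restoreTokens := e.CommonPrm.ForgetTokens(); defer restoreTokens()`). A self-contained sketch of the pattern with a simplified stand-in for `CommonPrm`:

```go
package main

import "fmt"

// prm is a simplified stand-in for util.CommonPrm: ForgetTokens clears the
// tokens and returns a closure that restores them.
type prm struct{ token, bearer string }

func (p *prm) ForgetTokens() func() {
	if p != nil {
		tk, br := p.token, p.bearer
		p.token, p.bearer = "", ""
		return func() { p.token, p.bearer = tk, br }
	}
	return func() {}
}

func main() {
	p := &prm{token: "session", bearer: "bearer"}

	restore := p.ForgetTokens()
	fmt.Println(p.token == "" && p.bearer == "") // true: tokens forgotten

	restore()
	fmt.Println(p.token, p.bearer) // session bearer: original state is back
}
```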
func CommonPrmFromV2(req interface { func CommonPrmFromV2(req interface {


@@ -3,6 +3,7 @@ package placement
 import (
 	"crypto/sha256"
 	"fmt"
+	"slices"
 	"sync"
 
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
 		raw, ok := c.containerCache.Get(cnr)
 		c.mtx.Unlock()
 		if ok {
-			return raw, nil
+			return c.cloneResult(raw), nil
 		}
 	} else {
 		c.lastEpoch = nm.Epoch()
@@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
 		c.containerCache.Add(cnr, cn)
 	}
 	c.mtx.Unlock()
-	return cn, nil
+	return c.cloneResult(cn), nil
+}
+
+func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
+	result := make([][]netmapSDK.NodeInfo, len(nodes))
+	for repIdx := range nodes {
+		result[repIdx] = slices.Clone(nodes[repIdx])
+	}
+	return result
 }
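The clone matters because callers, notably the traverser, which now reorders node vectors in place, would otherwise mutate the shared cache entry for every later request. A standalone illustration of the aliasing bug and the per-vector `slices.Clone` fix (strings stand in for `netmapSDK.NodeInfo`, which is likewise copied by value):

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	cached := [][]string{{"node-a", "node-b"}}

	// Returning the cached slice directly lets a caller mutate the cache:
	direct := cached
	direct[0][0] = "mutated"
	fmt.Println(cached[0][0]) // "mutated" - cache is corrupted

	// Cloning each replica vector, as cloneResult does, keeps the cache intact:
	cached = [][]string{{"node-a", "node-b"}}
	cloned := make([][]string, len(cached))
	for i := range cached {
		cloned[i] = slices.Clone(cached[i])
	}
	cloned[0][0] = "mutated"
	fmt.Println(cached[0][0]) // "node-a" - cache unchanged
}
```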


@@ -0,0 +1,43 @@
+package placement
+
+import (
+	"errors"
+	"strings"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+)
+
+const (
+	attrPrefix = "$attribute:"
+)
+
+type Metric interface {
+	CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
+}
+
+func ParseMetric(raw string) (Metric, error) {
+	if attr, found := strings.CutPrefix(raw, attrPrefix); found {
+		return NewAttributeMetric(attr), nil
+	}
+	return nil, errors.New("unsupported priority metric")
+}
+
+// attributeMetric describes priority metric based on attribute.
+type attributeMetric struct {
+	attribute string
+}
+
+// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
+// the value of attribute is the same. In other case return [1].
+func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
+	fromAttr := from.Attribute(am.attribute)
+	toAttr := to.Attribute(am.attribute)
+	if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
+		return 0
+	}
+	return 1
+}
+
+func NewAttributeMetric(attr string) Metric {
+	return &attributeMetric{attribute: attr}
+}
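A usage sketch of the new API (not part of the diff): `ParseMetric` accepts only the `$attribute:`-prefixed form, and the resulting metric yields 0 for nodes whose attribute values match, which sorts them first. Written as a test inside the `placement` package, with `NodeInfo` values from the netmap SDK:

```go
package placement

import (
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

func TestAttributeMetricSketch(t *testing.T) {
	m, err := ParseMetric("$attribute:ClusterName")
	if err != nil {
		t.Fatal(err) // only "$attribute:<name>" strings are accepted
	}

	var from, same, other netmap.NodeInfo
	from.SetAttribute("ClusterName", "A")
	same.SetAttribute("ClusterName", "A")
	other.SetAttribute("ClusterName", "B")

	if m.CalculateValue(&from, &same) != 0 {
		t.Fatal("matching attribute should rank first (0)")
	}
	if m.CalculateValue(&from, &other) != 1 {
		t.Fatal("mismatching attribute should rank after (1)")
	}
}
```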


@@ -3,6 +3,7 @@ package placement
 import (
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@@ -23,6 +24,11 @@ type Builder interface {
 	BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
 }
 
+type NodeState interface {
+	// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
+	LocalNodeInfo() *netmap.NodeInfo
+}
+
 // Option represents placement traverser option.
 type Option func(*cfg)
 
@@ -50,6 +56,10 @@ type cfg struct {
 	policy netmap.PlacementPolicy
 
 	builder Builder
+
+	metrics []Metric
+
+	nodeState NodeState
 }
 
 const invalidOptsMsg = "invalid traverser options"
@@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
 	}
 
 	var rem []int
-	if cfg.flatSuccess != nil {
+	if len(cfg.metrics) > 0 && cfg.nodeState != nil {
+		rem = defaultCopiesVector(cfg.policy)
+		var unsortedVector []netmap.NodeInfo
+		var regularVector []netmap.NodeInfo
+		for i := range rem {
+			unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
+			regularVector = append(regularVector, ns[i][rem[i]:]...)
+		}
+		rem = []int{-1, -1}
+
+		sortedVector, err := sortVector(cfg, unsortedVector)
+		if err != nil {
+			return nil, err
+		}
+		ns = [][]netmap.NodeInfo{sortedVector, regularVector}
+	} else if cfg.flatSuccess != nil {
 		ns = flatNodes(ns)
 		rem = []int{int(*cfg.flatSuccess)}
 	} else {
@@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
 	return [][]netmap.NodeInfo{flat}
 }
 
+type nodeMetrics struct {
+	index   int
+	metrics []int
+}
+
+func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
+	nm := make([]nodeMetrics, len(unsortedVector))
+	node := cfg.nodeState.LocalNodeInfo()
+	for i := range unsortedVector {
+		m := make([]int, len(cfg.metrics))
+		for j, pm := range cfg.metrics {
+			m[j] = pm.CalculateValue(node, &unsortedVector[i])
+		}
+		nm[i] = nodeMetrics{
+			index:   i,
+			metrics: m,
+		}
+	}
+	slices.SortFunc(nm, func(a, b nodeMetrics) int {
+		return slices.Compare(a.metrics, b.metrics)
+	})
+	sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
+	for i := range unsortedVector {
+		sortedVector[i] = unsortedVector[nm[i].index]
+	}
+	return sortedVector, nil
+}
+
 // Node is a descriptor of storage node with information required for intra-container communication.
 type Node struct {
 	addresses network.AddressGroup
@@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
 		c.copyNumbers = v
 	}
 }
+
+// WithPriorityMetrics use provided priority metrics to sort nodes.
+func WithPriorityMetrics(m []Metric) Option {
+	return func(c *cfg) {
+		c.metrics = m
+	}
+}
+
+// WithNodeState provide state of the current node.
+func WithNodeState(s NodeState) Option {
+	return func(c *cfg) {
+		c.nodeState = s
+	}
+}
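`sortVector` compares per-node metric vectors lexicographically via `slices.Compare`: the first metric dominates, the second breaks ties, and so on, with lower values sorting first. A self-contained sketch of that ordering (node names and vectors are made up):

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	// Metric vectors per node, lower is better; compared lexicographically,
	// which is exactly what sortVector does via slices.Compare.
	vectors := map[string][]int{
		"node-0": {1, 0}, // other cluster, same location
		"node-1": {0, 1}, // same cluster, other location
		"node-2": {0, 0}, // same cluster, same location
	}

	names := []string{"node-0", "node-1", "node-2"}
	slices.SortFunc(names, func(a, b string) int {
		return slices.Compare(vectors[a], vectors[b])
	})

	fmt.Println(names) // [node-2 node-1 node-0]
}
```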


@@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
 }
 
 func testNode(v uint32) (n netmap.NodeInfo) {
-	n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
+	ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
+	n.SetNetworkEndpoints(ip)
+	n.SetPublicKey([]byte(ip))
 
 	return n
 }
@@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
 	return vc
 }
 
-func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
+func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
+	return placement(ss, rs, nil)
+}
+
+func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
+	return placement(ss, nil, ec)
+}
+
+func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
 	nodes := make([][]netmap.NodeInfo, 0, len(rs))
 	replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
 	num := uint32(0)
@@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
 		nodes = append(nodes, ns)
 
 		var rd netmap.ReplicaDescriptor
-		rd.SetNumberOfObjects(uint32(rs[i]))
+		if len(rs) > 0 {
+			rd.SetNumberOfObjects(uint32(rs[i]))
+		} else {
+			rd.SetECDataCount(uint32(ec[i][0]))
+			rd.SetECParityCount(uint32(ec[i][1]))
+		}
 
 		replicas = append(replicas, rd)
 	}
@@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
 		err = n.FromIterator(netmapcore.Node(nodes[1][0]))
 		require.NoError(t, err)
 
-		require.Equal(t, []Node{{addresses: n}}, tr.Next())
+		require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
 	})
 
 	t.Run("put scenario", func(t *testing.T) {
@@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
 	})
 }
 }
+
+type nodeState struct {
+	node *netmap.NodeInfo
+}
+
+func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
+	return n.node
+}
+
+func TestTraverserPriorityMetrics(t *testing.T) {
+	t.Run("one rep one metric", func(t *testing.T) {
+		selectors := []int{4}
+		replicas := []int{3}
+
+		nodes, cnr := testPlacement(selectors, replicas)
+
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "B")
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[0][3].SetAttribute("ClusterName", "B")
+
+		sdkNode := testNode(5)
+		sdkNode.SetAttribute("ClusterName", "B")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{NewAttributeMetric("ClusterName")}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Without priority metric `ClusterName` the order will be:
+		// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
+		// With priority metric `ClusterName` and current node in cluster B
+		// the order should be:
+		// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
+		next := tr.Next()
+		require.NotNil(t, next)
+		require.Equal(t, 3, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
+
+		next = tr.Next()
+		// The last node is
+		require.Equal(t, 1, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+
+	t.Run("two reps two metrics", func(t *testing.T) {
+		selectors := []int{3, 3}
+		replicas := []int{2, 2}
+
+		nodes, cnr := testPlacement(selectors, replicas)
+
+		// REPLICA #1
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
+
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
+
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "A")
+		nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
+
+		// REPLICA #2
+		// Node_3 ip4/0.0.0.0/tcp/3
+		nodes[1][0].SetAttribute("ClusterName", "B")
+		nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
+
+		// Node_4, PK - ip4/0.0.0.0/tcp/4
+		nodes[1][1].SetAttribute("ClusterName", "B")
+		nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
+
+		// Node_5, PK - ip4/0.0.0.0/tcp/5
+		nodes[1][2].SetAttribute("ClusterName", "B")
+		nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
+
+		sdkNode := testNode(9)
+		sdkNode.SetAttribute("ClusterName", "B")
+		sdkNode.SetAttribute("UN-LOCODE", "RU DME")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{
+			NewAttributeMetric("ClusterName"),
+			NewAttributeMetric("UN-LOCODE"),
+		}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Check that nodes in the same cluster and
+		// in the same location should be the first in slice.
+		// Nodes which are follow criteria but stay outside the replica
+		// should be in the next slice.
+		next := tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+
+		sdkNode.SetAttribute("ClusterName", "B")
+		sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
+
+		nodesCopy = copyVectors(nodes)
+
+		tr, err = NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		next = tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+
+		sdkNode.SetAttribute("ClusterName", "A")
+		sdkNode.SetAttribute("UN-LOCODE", "RU LED")
+
+		nodesCopy = copyVectors(nodes)
+
+		tr, err = NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		next = tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+
+	t.Run("ec container", func(t *testing.T) {
+		selectors := []int{4}
+		ec := [][]int{{2, 1}}
+
+		nodes, cnr := testECPlacement(selectors, ec)
+
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "B")
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[0][3].SetAttribute("ClusterName", "B")
+
+		sdkNode := testNode(5)
+		sdkNode.SetAttribute("ClusterName", "B")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{NewAttributeMetric("ClusterName")}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Without priority metric `ClusterName` the order will be:
+		// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
+		// With priority metric `ClusterName` and current node in cluster B
+		// the order should be:
+		// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
+		next := tr.Next()
+		require.NotNil(t, next)
+		require.Equal(t, 3, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
+
+		next = tr.Next()
+		// The last node is
+		require.Equal(t, 1, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+}


@@ -9,14 +9,25 @@ import (
 	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
 
 func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Policer.ProcessObject", trace.WithAttributes(
+		attribute.String("address", objInfo.Address.String()),
+		attribute.Bool("is_linking_object", objInfo.IsLinkingObject),
+		attribute.Bool("is_ec_part", objInfo.ECInfo != nil),
+		attribute.String("type", objInfo.Type.String()),
+	))
+	defer span.End()
+
 	cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
 	if err != nil {
 		if client.IsErrContainerNotFound(err) {
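The span gives policer-issued HEAD requests an identifiable parent so they can be filtered out in traces. A rough equivalent with plain OpenTelemetry, on the assumption that the frostfs-observability helper wraps the standard tracer (its exact behavior is not shown in this diff):

```go
package policerdemo

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

func process(ctx context.Context, address string) {
	// Start a named span carrying the object address as an attribute.
	ctx, span := otel.Tracer("policer").Start(ctx, "Policer.ProcessObject",
		trace.WithAttributes(attribute.String("address", address)))
	defer span.End()

	_ = ctx // downstream calls made with ctx inherit the span
}
```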


@@ -0,0 +1,207 @@
+package tree
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
+	core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
+	frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
+	checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
+	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
+	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
+	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
+	nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
+	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+	"github.com/nspcc-dev/neo-go/pkg/util"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	containerID = "73tQMTYyUkTgmvPR1HWib6pndbhSoBovbnMF7Pws8Rcy"
+
+	senderPrivateKey, _ = keys.NewPrivateKey()
+
+	senderKey = hex.EncodeToString(senderPrivateKey.PublicKey().Bytes())
+
+	rootCnr = &core.Container{Value: containerSDK.Container{}}
+)
+
+type frostfsIDProviderMock struct {
+	subjects         map[util.Uint160]*client.Subject
+	subjectsExtended map[util.Uint160]*client.SubjectExtended
+}
+
+func (f *frostfsIDProviderMock) GetSubject(key util.Uint160) (*client.Subject, error) {
+	v, ok := f.subjects[key]
+	if !ok {
+		return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
+	}
+	return v, nil
+}
+
+func (f *frostfsIDProviderMock) GetSubjectExtended(key util.Uint160) (*client.SubjectExtended, error) {
+	v, ok := f.subjectsExtended[key]
+	if !ok {
+		return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
+	}
+	return v, nil
+}
+
+var _ frostfsidcore.SubjectProvider = (*frostfsIDProviderMock)(nil)
+
+func newFrostfsIDProviderMock(t *testing.T) *frostfsIDProviderMock {
+	return &frostfsIDProviderMock{
+		subjects: map[util.Uint160]*client.Subject{
+			scriptHashFromSenderKey(t, senderKey): {
+				Namespace: "testnamespace",
+				Name:      "test",
+				KV: map[string]string{
+					"tag-attr1": "value1",
+					"tag-attr2": "value2",
+				},
+			},
+		},
+		subjectsExtended: map[util.Uint160]*client.SubjectExtended{
+			scriptHashFromSenderKey(t, senderKey): {
+				Namespace: "testnamespace",
+				Name:      "test",
+				KV: map[string]string{
+					"tag-attr1": "value1",
+					"tag-attr2": "value2",
+				},
+				Groups: []*client.Group{
+					{
+						ID:        1,
+						Name:      "test",
+						Namespace: "testnamespace",
+						KV: map[string]string{
+							"attr1": "value1",
+							"attr2": "value2",
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func scriptHashFromSenderKey(t *testing.T, senderKey string) util.Uint160 {
+	pk, err := keys.NewPublicKeyFromString(senderKey)
+	require.NoError(t, err)
+	return pk.GetScriptHash()
+}
+
+type stMock struct{}
+
+func (m *stMock) CurrentEpoch() uint64 {
+	return 8
+}
+
+func TestCheckAPE(t *testing.T) {
+	cid := cid.ID{}
+	_ = cid.DecodeString(containerID)
+
+	t.Run("put non-tombstone rule won't affect tree remove", func(t *testing.T) {
+		los := inmemory.NewInmemoryLocalStorage()
+		mcs := inmemory.NewInmemoryMorphRuleChainStorage()
+		fid := newFrostfsIDProviderMock(t)
+		s := Service{
+			cfg: cfg{
+				frostfsidSubjectProvider: fid,
+			},
+			apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
+		}
+
+		los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.AccessDenied,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+					Condition: []chain.Condition{
+						{
+							Op:    chain.CondStringNotEquals,
+							Kind:  chain.KindResource,
+							Key:   nativeschema.PropertyKeyObjectType,
+							Value: "TOMBSTONE",
+						},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.Allow,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectDelete, acl.RoleOwner, senderPrivateKey.PublicKey())
+		require.NoError(t, err)
+	})
+
+	t.Run("delete rule won't affect tree add", func(t *testing.T) {
+		los := inmemory.NewInmemoryLocalStorage()
+		mcs := inmemory.NewInmemoryMorphRuleChainStorage()
+		fid := newFrostfsIDProviderMock(t)
+		s := Service{
+			cfg: cfg{
+				frostfsidSubjectProvider: fid,
+			},
+			apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
+		}
+
+		los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.AccessDenied,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.Allow,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+					Condition: []chain.Condition{
+						{
+							Op:    chain.CondStringNotEquals,
+							Kind:  chain.KindResource,
+							Key:   nativeschema.PropertyKeyObjectType,
+							Value: "TOMBSTONE",
+						},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectPut, acl.RoleOwner, senderPrivateKey.PublicKey())
+		require.NoError(t, err)
+	})
+}


@@ -5,7 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sort"
+	"slices"
 	"sync"
 	"sync/atomic"
@@ -210,7 +210,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 		return nil, err
 	}
 
-	err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectPut)
+	err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectDelete)
 	if err != nil {
 		return nil, err
 	}
@@ -575,10 +575,9 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
 		if len(nodes) == 0 {
 			return nodes, nil
 		}
-		less := func(i, j int) bool {
-			return bytes.Compare(nodes[i].Meta.GetAttr(pilorama.AttributeFilename), nodes[j].Meta.GetAttr(pilorama.AttributeFilename)) < 0
-		}
-		sort.Slice(nodes, less)
+		slices.SortFunc(nodes, func(a, b pilorama.NodeInfo) int {
+			return bytes.Compare(a.Meta.GetAttr(pilorama.AttributeFilename), b.Meta.GetAttr(pilorama.AttributeFilename))
+		})
 		return nodes, nil
 	default:
 		return nil, fmt.Errorf("unsupported order direction: %s", d.String())
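`slices.SortFunc` takes a three-way comparator over values instead of the index-based `less` of `sort.Slice`, avoiding the reflection-driven swapping of the latter; since `bytes.Compare` is already three-way, the comparator passes it straight through. A minimal sketch of the equivalence (sample data is made up):

```go
package main

import (
	"bytes"
	"fmt"
	"slices"
	"sort"
)

func main() {
	a := [][]byte{[]byte("b"), []byte("c"), []byte("a")}
	// Old style: index-based less function, reflection under the hood.
	sort.Slice(a, func(i, j int) bool { return bytes.Compare(a[i], a[j]) < 0 })

	b := [][]byte{[]byte("b"), []byte("c"), []byte("a")}
	// New style: generic, value-based; bytes.Compare already returns -1/0/+1.
	slices.SortFunc(b, bytes.Compare)

	fmt.Println(string(a[0]), string(b[0])) // a a
}
```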


@@ -3,8 +3,14 @@ package httputil
 import (
 	"net/http"
 	"net/http/pprof"
+
+	"github.com/felixge/fgprof"
 )
 
+func init() {
+	http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
+}
+
 // initializes pprof package in order to
 // register Prometheus handlers on http.DefaultServeMux.
 var _ = pprof.Handler("")
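The `init` hook registers fgprof's wall-clock profiler alongside the standard pprof handlers; unlike the CPU profile, fgprof also samples goroutines blocked off-CPU (I/O, locks, channel waits). A minimal standalone sketch of the same wiring (the port is illustrative):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // standard on-CPU/heap/goroutine profiles

	"github.com/felixge/fgprof"
)

func main() {
	// Same registration as the init() above, on an explicit server.
	http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}
```

A profile can then be collected with something like `go tool pprof http://localhost:6060/debug/fgprof?seconds=10`.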


@@ -19,7 +19,7 @@ import (
 func GeneratePayloadPool(count uint, size uint) [][]byte {
 	var pool [][]byte
-	for i := uint(0); i < count; i++ {
+	for range count {
 		payload := make([]byte, size)
 		_, _ = rand.Read(payload)
@@ -30,8 +30,8 @@ func GeneratePayloadPool(count uint, size uint) [][]byte {
 func GenerateAttributePool(count uint) []objectSDK.Attribute {
 	var pool []objectSDK.Attribute
-	for i := uint(0); i < count; i++ {
-		for j := uint(0); j < count; j++ {
+	for i := range count {
+		for j := range count {
 			attr := *objectSDK.NewAttribute()
 			attr.SetKey(fmt.Sprintf("key%d", i))
 			attr.SetValue(fmt.Sprintf("value%d", j))
@@ -43,7 +43,7 @@ func GenerateAttributePool(count uint) []objectSDK.Attribute {
 func GenerateOwnerPool(count uint) []user.ID {
 	var pool []user.ID
-	for i := uint(0); i < count; i++ {
+	for range count {
 		pool = append(pool, usertest.ID())
 	}
 	return pool
@@ -118,7 +118,7 @@ func WithPayloadFromPool(pool [][]byte) ObjectOption {
 func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
 	return func(obj *objectSDK.Object) {
 		var attrs []objectSDK.Attribute
-		for i := uint(0); i < count; i++ {
+		for range count {
 			attrs = append(attrs, pool[rand.Intn(len(pool))])
 		}
 		obj.SetAttributes(attrs...)
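These rewrites all use Go 1.22's "range over integer" form, which is equivalent to the counting loop: when the index is unused it is dropped entirely, and when it is used (as in `GenerateAttributePool`) it still yields `0..n-1` with the integer's own type. A quick sketch:

```go
package main

import "fmt"

func main() {
	count := uint(3)

	for i := uint(0); i < count; i++ { // old form
		fmt.Println("old", i)
	}
	for i := range count { // new form; i is uint, same iteration count
		fmt.Println("new", i)
	}
}
```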


@@ -29,7 +29,7 @@ func PopulateWithObjects(
 ) {
 	digits := "0123456789"
 
-	for i := uint(0); i < count; i++ {
+	for range count {
 		obj := factory()
 
 		id := []byte(fmt.Sprintf(
@@ -59,7 +59,7 @@ func PopulateWithBigObjects(
 	count uint,
 	factory func() *objectSDK.Object,
 ) {
-	for i := uint(0); i < count; i++ {
+	for range count {
 		group.Go(func() error {
 			if err := populateWithBigObject(ctx, db, factory); err != nil {
 				return fmt.Errorf("couldn't put a big object: %w", err)
@@ -154,7 +154,7 @@ func PopulateGraveyard(
 	wg := &sync.WaitGroup{}
 	wg.Add(int(count))
 
-	for i := uint(0); i < count; i++ {
+	for range count {
 		obj := factory()
 
 		prm := meta.PutPrm{}
@@ -226,7 +226,7 @@ func PopulateLocked(
 	wg := &sync.WaitGroup{}
 	wg.Add(int(count))
 
-	for i := uint(0); i < count; i++ {
+	for range count {
 		defer wg.Done()
 
 		obj := factory()


@@ -116,7 +116,7 @@ func populate() (err error) {
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.SetLimit(int(jobs))
 
-	for i := uint(0); i < numContainers; i++ {
+	for range numContainers {
 		cid := cidtest.ID()
 
 		for _, typ := range types {