Compare commits

...

16 commits

Author SHA1 Message Date
d19ab43500
[#1462] node: Add off-cpu profiler
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-31 11:32:13 +03:00
5bcf81d1cc
[#1466] Remove woodpecker CI
We use forgejo actions now.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-31 10:07:33 +03:00
c2effcc61c [#1465] Makefile: Update golangci-lint, fix warnings
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-31 06:39:59 +00:00
2285cfc36f
[#1464] frostfsid: Cache subject not found error
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-30 18:27:33 +03:00
e74d05c03f
[#1464] frostfsid: Add cache metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-30 18:27:32 +03:00
48862e0e63 [#1459] .golanci.yml: Add tenv linter, fix issues
Refs #1309

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
89892d9754 [#1459] cli: Simplify slice append
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
7ac0852364 [#1459] .golangci.yml: Add intrange linter, fix issues
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-30 15:18:22 +00:00
d28a5d2d7a
[#1448] container/ape: Ignore an error when getting a role
When getting a role in the APE checker for the container services,
an error may be returned if network maps of the previous two epochs
don't have enough nodes to fulfil a container placement policy.
It's a logical error, so we should ignore it.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-30 12:51:57 +03:00
87ac3c5279 [#1458] object: Make patch not set key before target construction
* `SignRequestPrivateKey` field should be initialized either within
  `newUntrustedTarget` or within `newTrustedTarget`. Otherwise, all
  requests are signed by local node key that makes impossible to perform
  patch on non-container node.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 15:20:28 +00:00
d5ee6d3039
[#1456] morph: Use DialerSource interface instead of internal struct
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-29 17:34:14 +03:00
433aab12bb
[#1455] cli: Handle missing home directory
go-homedir library incorrectly handles some of the errors
that could occur. It is archived, so no PR, but let's fix it on our
side. The scenario in case: executing command in an empty environment.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-29 16:37:55 +03:00
81f4cdbb91 [#1439] object: Sort nodes by priority metrics to compute GET request
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-29 08:05:09 +00:00
3cd7d23f10 [#1439] node: Reduce usage of netmapAPI.NodeInfo
Remove outdated code from `netmap` service.

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-29 08:05:09 +00:00
012af5cc38 [#1406] tree: Add unit-tests for ape check
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 08:04:23 +00:00
eb5336d5ff [#1406] tree: Use delete verb instead put for Remove
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-10-29 08:04:23 +00:00
45 changed files with 804 additions and 147 deletions

View file

@ -87,5 +87,7 @@ linters:
- perfsprint - perfsprint
- testifylint - testifylint
- protogetter - protogetter
- intrange
- tenv
disable-all: true disable-all: true
fast: false fast: false

View file

@ -1,11 +0,0 @@
pipeline:
# Kludge for non-root containers under WoodPecker
fix-ownership:
image: alpine:latest
commands: chown -R 1234:1234 .
pre-commit:
image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
commands:
- export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
- pre-commit run --hook-stage manual

View file

@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')" HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.22 GO_VERSION ?= 1.22
LINT_VERSION ?= 1.60.3 LINT_VERSION ?= 1.61.0
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7 TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
PROTOC_VERSION ?= 25.0 PROTOC_VERSION ?= 25.0
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2) PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)

View file

@ -128,7 +128,7 @@ func generateConfigExample(appDir string, credSize int) (string, error) {
tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets") tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets")
var i innerring.GlagoliticLetter var i innerring.GlagoliticLetter
for i = 0; i < innerring.GlagoliticLetter(credSize); i++ { for i = range innerring.GlagoliticLetter(credSize) {
tmpl.Glagolitics = append(tmpl.Glagolitics, i.String()) tmpl.Glagolitics = append(tmpl.Glagolitics, i.String())
} }

View file

@ -63,7 +63,7 @@ func TestGenerateAlphabet(t *testing.T) {
buf.Reset() buf.Reset()
v.Set(commonflags.AlphabetWalletsFlag, walletDir) v.Set(commonflags.AlphabetWalletsFlag, walletDir)
require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10))) require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10)))
for i := uint64(0); i < size; i++ { for i := range uint64(size) {
buf.WriteString(strconv.FormatUint(i, 10) + "\r") buf.WriteString(strconv.FormatUint(i, 10) + "\r")
} }

View file

@ -659,9 +659,7 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
for { for {
n, ok = rdr.Read(buf) n, ok = rdr.Read(buf)
for i := range n { list = append(list, buf[:n]...)
list = append(list, buf[i])
}
if !ok { if !ok {
break break
} }

View file

@ -195,7 +195,7 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
prmHead.SetRawFlag(true) // to get an error instead of whole object prmHead.SetRawFlag(true) // to get an error instead of whole object
eg, egCtx := errgroup.WithContext(cmd.Context()) eg, egCtx := errgroup.WithContext(cmd.Context())
for idx := range len(members) { for idx := range members {
partObjID := members[idx] partObjID := members[idx]
eg.Go(func() error { eg.Go(func() error {

View file

@ -114,12 +114,14 @@ func initConfig() {
} else { } else {
// Find home directory. // Find home directory.
home, err := homedir.Dir() home, err := homedir.Dir()
commonCmd.ExitOnErr(rootCmd, "", err) if err != nil {
common.PrintVerbose(rootCmd, "Get homedir: %s", err)
// Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml" } else {
viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli")) // Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml"
viper.SetConfigName("config") viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli"))
viper.SetConfigType("yaml") viper.SetConfigName("config")
viper.SetConfigType("yaml")
}
} }
viper.SetEnvPrefix(envPrefix) viper.SetEnvPrefix(envPrefix)

View file

@ -11,7 +11,7 @@ func DecodeOIDs(data []byte) ([]oid.ID, error) {
size := r.ReadVarUint() size := r.ReadVarUint()
oids := make([]oid.ID, size) oids := make([]oid.ID, size)
for i := uint64(0); i < size; i++ { for i := range size {
if err := oids[i].Decode(r.ReadVarBytes()); err != nil { if err := oids[i].Decode(r.ReadVarBytes()); err != nil {
return nil, err return nil, err
} }

View file

@ -58,6 +58,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source" tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@ -109,6 +110,7 @@ type applicationConfiguration struct {
ObjectCfg struct { ObjectCfg struct {
tombstoneLifetime uint64 tombstoneLifetime uint64
priorityMetrics []placement.Metric
} }
EngineCfg struct { EngineCfg struct {
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
// Object // Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c) a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
var pm []placement.Metric
for _, raw := range objectconfig.Get(c).Priority() {
m, err := placement.ParseMetric(raw)
if err != nil {
return err
}
pm = append(pm, m)
}
a.ObjectCfg.priorityMetrics = pm
// Storage Engine // Storage Engine
@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
return pool return pool
} }
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
var res netmapV2.NodeInfo var res netmap.NodeInfo
ni, ok := c.cfgNetmap.state.getNodeInfo() ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok { if ok {
ni.WriteToV2(&res) res = ni
} else { } else {
c.cfgNodeInfo.localInfo.WriteToV2(&res) res = c.cfgNodeInfo.localInfo
} }
return &res
return &res, nil
} }
// setContractNodeInfo rewrites local node info from the FrostFS network map. // setContractNodeInfo rewrites local node info from the FrostFS network map.

View file

@ -1,7 +1,6 @@
package config_test package config_test
import ( import (
"os"
"strings" "strings"
"testing" "testing"
@ -38,8 +37,7 @@ func TestConfigEnv(t *testing.T) {
envName := strings.ToUpper( envName := strings.ToUpper(
strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator)) strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator))
err := os.Setenv(envName, value) t.Setenv(envName, value)
require.NoError(t, err)
c := configtest.EmptyConfig() c := configtest.EmptyConfig()

View file

@ -10,10 +10,17 @@ type PutConfig struct {
cfg *config.Config cfg *config.Config
} }
// GetConfig is a wrapper over "get" config section which provides access
// to object get pipeline configuration of object service.
type GetConfig struct {
cfg *config.Config
}
const ( const (
subsection = "object" subsection = "object"
putSubsection = "put" putSubsection = "put"
getSubsection = "get"
// PutPoolSizeDefault is a default value of routine pool size to // PutPoolSizeDefault is a default value of routine pool size to
// process object.Put requests in object service. // process object.Put requests in object service.
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
func (g PutConfig) SkipSessionTokenIssuerVerification() bool { func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification") return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
} }
// Get returns structure that provides access to "get" subsection of
// "object" section.
func Get(c *config.Config) GetConfig {
return GetConfig{
c.Sub(subsection).Sub(getSubsection),
}
}
// Priority returns the value of "priority" config parameter.
func (g GetConfig) Priority() []string {
return config.StringSliceSafe(g.cfg, "priority")
}

View file

@ -11,8 +11,6 @@ import (
) )
func fromFile(path string) *config.Config { func fromFile(path string) *config.Config {
os.Clearenv() // ENVs have priority over config files, so we do this in tests
return config.New(path, "", "") return config.New(path, "", "")
} }
@ -40,15 +38,6 @@ func ForEachFileType(pref string, f func(*config.Config)) {
// ForEnvFileType creates config from `<pref>.env` file. // ForEnvFileType creates config from `<pref>.env` file.
func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) { func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) {
envs := os.Environ()
t.Cleanup(func() {
os.Clearenv()
for _, env := range envs {
keyValue := strings.Split(env, "=")
os.Setenv(keyValue[0], keyValue[1])
}
})
f(fromEnvFile(t, pref+".env")) f(fromEnvFile(t, pref+".env"))
} }
@ -73,7 +62,6 @@ func loadEnv(t testing.TB, path string) {
v = strings.Trim(v, `"`) v = strings.Trim(v, `"`)
err = os.Setenv(k, v) t.Setenv(k, v)
require.NoError(t, err, "can't set environment variable")
} }
} }

View file

@ -8,6 +8,7 @@ import (
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid" frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
@ -42,7 +43,7 @@ func initContainerService(_ context.Context, c *cfg) {
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg) cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
if cacheSize > 0 { if cacheSize > 0 {
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL) frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
} }
c.shared.frostfsidClient = frostfsIDSubjectProvider c.shared.frostfsidClient = frostfsIDSubjectProvider

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"strings"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client" "git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
@ -9,57 +10,101 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
) )
type subjectWithError struct {
subject *client.Subject
err error
}
type subjectExtWithError struct {
subject *client.SubjectExtended
err error
}
type morphFrostfsIDCache struct { type morphFrostfsIDCache struct {
subjProvider frostfsidcore.SubjectProvider subjProvider frostfsidcore.SubjectProvider
subjCache *expirable.LRU[util.Uint160, *client.Subject] subjCache *expirable.LRU[util.Uint160, subjectWithError]
subjExtCache *expirable.LRU[util.Uint160, *client.SubjectExtended] subjExtCache *expirable.LRU[util.Uint160, subjectExtWithError]
metrics cacheMetrics
} }
func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration) frostfsidcore.SubjectProvider { func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration, metrics cacheMetrics) frostfsidcore.SubjectProvider {
return &morphFrostfsIDCache{ return &morphFrostfsIDCache{
subjProvider: subjProvider, subjProvider: subjProvider,
subjCache: expirable.NewLRU(size, func(util.Uint160, *client.Subject) {}, ttl), subjCache: expirable.NewLRU(size, func(util.Uint160, subjectWithError) {}, ttl),
subjExtCache: expirable.NewLRU(size, func(util.Uint160, *client.SubjectExtended) {}, ttl), subjExtCache: expirable.NewLRU(size, func(util.Uint160, subjectExtWithError) {}, ttl),
metrics: metrics,
} }
} }
func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) { func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
hit := false
startedAt := time.Now()
defer func() {
m.metrics.AddMethodDuration("GetSubject", time.Since(startedAt), hit)
}()
result, found := m.subjCache.Get(addr) result, found := m.subjCache.Get(addr)
if found { if found {
return result, nil hit = true
return result.subject, result.err
} }
result, err := m.subjProvider.GetSubject(addr) subj, err := m.subjProvider.GetSubject(addr)
if err != nil { if err != nil {
if m.isCacheableError(err) {
m.subjCache.Add(addr, subjectWithError{
err: err,
})
}
return nil, err return nil, err
} }
m.subjCache.Add(addr, result) m.subjCache.Add(addr, subjectWithError{subject: subj})
return result, nil return subj, nil
} }
func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) { func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
subjExt, found := m.subjExtCache.Get(addr) hit := false
startedAt := time.Now()
defer func() {
m.metrics.AddMethodDuration("GetSubjectExtended", time.Since(startedAt), hit)
}()
result, found := m.subjExtCache.Get(addr)
if found { if found {
return subjExt, nil hit = true
return result.subject, result.err
} }
var err error subjExt, err := m.subjProvider.GetSubjectExtended(addr)
subjExt, err = m.subjProvider.GetSubjectExtended(addr)
if err != nil { if err != nil {
if m.isCacheableError(err) {
m.subjExtCache.Add(addr, subjectExtWithError{
err: err,
})
m.subjCache.Add(addr, subjectWithError{
err: err,
})
}
return nil, err return nil, err
} }
m.subjExtCache.Add(addr, subjExt) m.subjExtCache.Add(addr, subjectExtWithError{subject: subjExt})
m.subjCache.Add(addr, subjectFromSubjectExtended(subjExt)) m.subjCache.Add(addr, subjectWithError{subject: subjectFromSubjectExtended(subjExt)})
return subjExt, nil return subjExt, nil
} }
func (m *morphFrostfsIDCache) isCacheableError(err error) bool {
return strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage)
}
func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject { func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject {
return &client.Subject{ return &client.Subject{
PrimaryKey: subjExt.PrimaryKey, PrimaryKey: subjExt.PrimaryKey,

View file

@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
sSearchV2 := createSearchSvcV2(sSearch, keyStorage) sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource) sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
*c.cfgObject.getSvc = *sGet // need smth better *c.cfgObject.getSvc = *sGet // need smth better
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache, coreConstructor *cache.ClientCache,
containerSource containercore.Source, containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *getsvc.Service { ) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
ls, ls,
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.SuccessAfter(1), placement.SuccessAfter(1),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
), ),
coreConstructor, coreConstructor,
containerSource, containerSource,

View file

@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
# Storage engine section # Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_POOL_SIZE=15

View file

@ -131,6 +131,9 @@
"remote_pool_size": 100, "remote_pool_size": 100,
"local_pool_size": 200, "local_pool_size": 200,
"skip_session_token_issuer_verification": true "skip_session_token_issuer_verification": true
},
"get": {
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
} }
}, },
"storage": { "storage": {

View file

@ -114,6 +114,10 @@ object:
remote_pool_size: 100 # number of async workers for remote PUT operations remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations local_pool_size: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
get:
priority: # list of metrics of nodes for prioritization
- $attribute:ClusterName
- $attribute:UN-LOCODE
storage: storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@ -407,13 +407,17 @@ Contains object-service related parameters.
object: object:
put: put:
remote_pool_size: 100 remote_pool_size: 100
get:
priority:
- $attribute:ClusterName
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------| |-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | | `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. | | `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
# `runtime` section # `runtime` section
Contains runtime parameters. Contains runtime parameters.

2
go.mod
View file

@ -19,6 +19,7 @@ require (
github.com/cheggaaa/pb v1.0.29 github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1 github.com/chzyer/readline v1.5.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/felixge/fgprof v0.9.5
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/gdamore/tcell/v2 v2.7.4 github.com/gdamore/tcell/v2 v2.7.4
github.com/go-pkgz/expirable-cache/v3 v3.0.0 github.com/go-pkgz/expirable-cache/v3 v3.0.0
@ -77,6 +78,7 @@ require (
github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/gorilla/websocket v1.5.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -61,7 +61,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
require.NoError(t, b.Init()) require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte) storageIDs := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ { for range 100 {
obj := blobstortest.NewObject(64 * 1024) // 64KB object obj := blobstortest.NewObject(64 * 1024) // 64KB object
data, err := obj.Marshal() data, err := obj.Marshal()
require.NoError(t, err) require.NoError(t, err)
@ -168,7 +168,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs := make(map[oid.Address][]byte) storageIDs := make(map[oid.Address][]byte)
toDelete := make(map[oid.Address][]byte) toDelete := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created for i := range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
obj := blobstortest.NewObject(64 * 1024) obj := blobstortest.NewObject(64 * 1024)
data, err := obj.Marshal() data, err := obj.Marshal()
require.NoError(t, err) require.NoError(t, err)
@ -236,7 +236,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
require.NoError(t, b.Init()) require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte) storageIDs := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created for range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
obj := blobstortest.NewObject(64 * 1024) obj := blobstortest.NewObject(64 * 1024)
data, err := obj.Marshal() data, err := obj.Marshal()
require.NoError(t, err) require.NoError(t, err)

View file

@ -47,7 +47,7 @@ func TestIterateObjects(t *testing.T) {
mObjs := make(map[string]addrData) mObjs := make(map[string]addrData)
for i := uint64(0); i < objNum; i++ { for i := range uint64(objNum) {
sz := smalSz sz := smalSz
big := i < objNum/2 big := i < objNum/2

View file

@ -151,7 +151,7 @@ func TestErrorReporting(t *testing.T) {
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
for i := uint32(0); i < 2; i++ { for i := range uint32(2) {
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err) require.Error(t, err)
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly) checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)

View file

@ -705,7 +705,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
key, value = c.Prev() key, value = c.Prev()
} }
for i := range len(ms) { for i := range ms {
// Loop invariant: key represents the next stored timestamp after ms[i].Time. // Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation. // 2. Insert the operation.

View file

@ -1081,7 +1081,7 @@ func prepareRandomTree(nodeCount, opCount int) []Move {
} }
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) { func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
for i := uint64(0); i < uint64(nodeCount); i++ { for i := range uint64(nodeCount) {
expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i) expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i)
require.NoError(t, err) require.NoError(t, err)
actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i) actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i)

View file

@ -216,7 +216,7 @@ func TestRefillMetabase(t *testing.T) {
locked := make([]oid.ID, 1, 2) locked := make([]oid.ID, 1, 2)
locked[0] = oidtest.ID() locked[0] = oidtest.ID()
cnrLocked := cidtest.ID() cnrLocked := cidtest.ID()
for i := uint64(0); i < objNum; i++ { for range objNum {
obj := objecttest.Object() obj := objecttest.Object()
obj.SetType(objectSDK.TypeRegular) obj.SetType(objectSDK.TypeRegular)

View file

@ -14,7 +14,7 @@ func TestLimiter(t *testing.T) {
l := newFlushLimiter(uint64(maxSize)) l := newFlushLimiter(uint64(maxSize))
var currSize atomic.Int64 var currSize atomic.Int64
var eg errgroup.Group var eg errgroup.Group
for i := 0; i < 10_000; i++ { for range 10_000 {
eg.Go(func() error { eg.Go(func() error {
defer l.release(single) defer l.release(single)
defer currSize.Add(-1) defer currSize.Add(-1)

View file

@ -4,11 +4,11 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics" morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
@ -48,7 +48,7 @@ type cfg struct {
morphCacheMetrics metrics.MorphCacheMetrics morphCacheMetrics metrics.MorphCacheMetrics
dialerSource *internalNet.DialerSource dialerSource DialerSource
} }
const ( const (
@ -68,6 +68,7 @@ func defaultConfig() *cfg {
Scopes: transaction.Global, Scopes: transaction.Global,
}, },
morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{}, morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{},
dialerSource: &noopDialerSource{},
} }
} }
@ -296,7 +297,17 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
} }
} }
func WithDialerSource(ds *internalNet.DialerSource) Option { type DialerSource interface {
NetContextDialer() func(context.Context, string, string) (net.Conn, error)
}
type noopDialerSource struct{}
func (ds *noopDialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
return nil
}
func WithDialerSource(ds DialerSource) Option {
return func(c *cfg) { return func(c *cfg) {
c.dialerSource = ds c.dialerSource = ds
} }

View file

@ -8,7 +8,7 @@ import (
) )
func tickN(t *timer.BlockTimer, n uint32) { func tickN(t *timer.BlockTimer, n uint32) {
for i := uint32(0); i < n; i++ { for range n {
t.Tick(0) t.Tick(0)
} }
} }

View file

@ -537,10 +537,7 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
return false, err return false, err
} }
in, err := isContainerNode(nm, pk, binCnrID, cont) if isContainerNode(nm, pk, binCnrID, cont) {
if err != nil {
return false, err
} else if in {
return true, nil return true, nil
} }
@ -551,24 +548,24 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
return false, err return false, err
} }
return isContainerNode(nm, pk, binCnrID, cont) return isContainerNode(nm, pk, binCnrID, cont), nil
} }
func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) (bool, error) { func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) bool {
cnrVectors, err := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID) // It could an error only if the network map doesn't have enough nodes to
if err != nil { // fulfil the policy. It's a logical error that doesn't affect an actor role
return false, err // determining, so we ignore it
} cnrVectors, _ := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
for i := range cnrVectors { for i := range cnrVectors {
for j := range cnrVectors[i] { for j := range cnrVectors[i] {
if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) { if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) {
return true, nil return true
} }
} }
} }
return false, nil return false
} }
func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) { func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {

View file

@ -28,7 +28,7 @@ type executorSvc struct {
type NodeState interface { type NodeState interface {
// LocalNodeInfo must return current node state // LocalNodeInfo must return current node state
// in FrostFS API v2 NodeInfo structure. // in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() (*netmap.NodeInfo, error) LocalNodeInfo() *netmapSDK.NodeInfo
// ReadCurrentNetMap reads current local network map of the storage node // ReadCurrentNetMap reads current local network map of the storage node
// into the given parameter. Returns any error encountered which prevented // into the given parameter. Returns any error encountered which prevented
@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
func (s *executorSvc) LocalNodeInfo( func (s *executorSvc) LocalNodeInfo(
_ context.Context, _ context.Context,
req *netmap.LocalNodeInfoRequest, _ *netmap.LocalNodeInfoRequest,
) (*netmap.LocalNodeInfoResponse, error) { ) (*netmap.LocalNodeInfoResponse, error) {
verV2 := req.GetMetaHeader().GetVersion() ni := s.state.LocalNodeInfo()
if verV2 == nil { var nodeInfo netmap.NodeInfo
return nil, errors.New("missing version") ni.WriteToV2(&nodeInfo)
}
var ver versionsdk.Version
if err := ver.ReadFromV2(*verV2); err != nil {
return nil, fmt.Errorf("can't read version: %w", err)
}
ni, err := s.state.LocalNodeInfo()
if err != nil {
return nil, err
}
if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
ni2 := new(netmap.NodeInfo)
ni2.SetPublicKey(ni.GetPublicKey())
ni2.SetState(ni.GetState())
ni2.SetAttributes(ni.GetAttributes())
ni.IterateAddresses(func(s string) bool {
ni2.SetAddresses(s)
return true
})
ni = ni2
}
body := new(netmap.LocalNodeInfoResponseBody) body := new(netmap.LocalNodeInfoResponseBody)
body.SetVersion(&s.version) body.SetVersion(&s.version)
body.SetNodeInfo(ni) body.SetNodeInfo(&nodeInfo)
resp := new(netmap.LocalNodeInfoResponse) resp := new(netmap.LocalNodeInfoResponse)
resp.SetBody(body) resp.SetBody(body)

View file

@ -50,7 +50,7 @@ func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token)
metaHeader.SetBearerToken(b) metaHeader.SetBearerToken(b)
metaHeader.SetSessionToken(s) metaHeader.SetSessionToken(s)
for i := uint32(0); i < depth; i++ { for range depth {
link := metaHeader link := metaHeader
metaHeader = new(session.RequestMetaHeader) metaHeader = new(session.RequestMetaHeader)
metaHeader.SetOrigin(link) metaHeader.SetOrigin(link)

View file

@ -284,7 +284,7 @@ func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
} }
// try to save to any node not visited by current part // try to save to any node not visited by current part
for i := range len(nodes) { for i := range nodes {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()

View file

@ -113,10 +113,9 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
oV2.GetHeader().SetOwnerID(ownerID) oV2.GetHeader().SetOwnerID(ownerID)
target, err := target.New(objectwriter.Params{ target, err := target.New(objectwriter.Params{
Config: s.Config, Config: s.Config,
Common: commonPrm, Common: commonPrm,
Header: objectSDK.NewFromV2(oV2), Header: objectSDK.NewFromV2(oV2),
SignRequestPrivateKey: s.localNodeKey,
}) })
if err != nil { if err != nil {
return fmt.Errorf("target creation: %w", err) return fmt.Errorf("target creation: %w", err)

View file

@ -0,0 +1,43 @@
package placement
import (
"errors"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
const (
attrPrefix = "$attribute:"
)
type Metric interface {
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
}
func ParseMetric(raw string) (Metric, error) {
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
return NewAttributeMetric(attr), nil
}
return nil, errors.New("unsupported priority metric")
}
// attributeMetric describes priority metric based on attribute.
type attributeMetric struct {
attribute string
}
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
// the value of attribute is the same. In other case return [1].
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
fromAttr := from.Attribute(am.attribute)
toAttr := to.Attribute(am.attribute)
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
return 0
}
return 1
}
func NewAttributeMetric(attr string) Metric {
return &attributeMetric{attribute: attr}
}

View file

@ -3,6 +3,7 @@ package placement
import ( import (
"errors" "errors"
"fmt" "fmt"
"slices"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -23,6 +24,11 @@ type Builder interface {
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
} }
type NodeState interface {
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() *netmap.NodeInfo
}
// Option represents placement traverser option. // Option represents placement traverser option.
type Option func(*cfg) type Option func(*cfg)
@ -50,6 +56,10 @@ type cfg struct {
policy netmap.PlacementPolicy policy netmap.PlacementPolicy
builder Builder builder Builder
metrics []Metric
nodeState NodeState
} }
const invalidOptsMsg = "invalid traverser options" const invalidOptsMsg = "invalid traverser options"
@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
} }
var rem []int var rem []int
if cfg.flatSuccess != nil { if len(cfg.metrics) > 0 && cfg.nodeState != nil {
rem = defaultCopiesVector(cfg.policy)
var unsortedVector []netmap.NodeInfo
var regularVector []netmap.NodeInfo
for i := range rem {
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
regularVector = append(regularVector, ns[i][rem[i]:]...)
}
rem = []int{-1, -1}
sortedVector, err := sortVector(cfg, unsortedVector)
if err != nil {
return nil, err
}
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
} else if cfg.flatSuccess != nil {
ns = flatNodes(ns) ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)} rem = []int{int(*cfg.flatSuccess)}
} else { } else {
@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return [][]netmap.NodeInfo{flat} return [][]netmap.NodeInfo{flat}
} }
type nodeMetrics struct {
index int
metrics []int
}
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
nm := make([]nodeMetrics, len(unsortedVector))
node := cfg.nodeState.LocalNodeInfo()
for i := range unsortedVector {
m := make([]int, len(cfg.metrics))
for j, pm := range cfg.metrics {
m[j] = pm.CalculateValue(node, &unsortedVector[i])
}
nm[i] = nodeMetrics{
index: i,
metrics: m,
}
}
slices.SortFunc(nm, func(a, b nodeMetrics) int {
return slices.Compare(a.metrics, b.metrics)
})
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
for i := range unsortedVector {
sortedVector[i] = unsortedVector[nm[i].index]
}
return sortedVector, nil
}
// Node is a descriptor of storage node with information required for intra-container communication. // Node is a descriptor of storage node with information required for intra-container communication.
type Node struct { type Node struct {
addresses network.AddressGroup addresses network.AddressGroup
@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
c.copyNumbers = v c.copyNumbers = v
} }
} }
// WithPriorityMetrics use provided priority metrics to sort nodes.
func WithPriorityMetrics(m []Metric) Option {
return func(c *cfg) {
c.metrics = m
}
}
// WithNodeState provide state of the current node.
func WithNodeState(s NodeState) Option {
return func(c *cfg) {
c.nodeState = s
}
}

View file

@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
} }
func testNode(v uint32) (n netmap.NodeInfo) { func testNode(v uint32) (n netmap.NodeInfo) {
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))) ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
n.SetNetworkEndpoints(ip)
n.SetPublicKey([]byte(ip))
return n return n
} }
@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return vc return vc
} }
func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) { func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
return placement(ss, rs, nil)
}
func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
return placement(ss, nil, ec)
}
func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
nodes := make([][]netmap.NodeInfo, 0, len(rs)) nodes := make([][]netmap.NodeInfo, 0, len(rs))
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs)) replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
num := uint32(0) num := uint32(0)
@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
nodes = append(nodes, ns) nodes = append(nodes, ns)
var rd netmap.ReplicaDescriptor var rd netmap.ReplicaDescriptor
rd.SetNumberOfObjects(uint32(rs[i])) if len(rs) > 0 {
rd.SetNumberOfObjects(uint32(rs[i]))
} else {
rd.SetECDataCount(uint32(ec[i][0]))
rd.SetECParityCount(uint32(ec[i][1]))
}
replicas = append(replicas, rd) replicas = append(replicas, rd)
} }
@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
err = n.FromIterator(netmapcore.Node(nodes[1][0])) err = n.FromIterator(netmapcore.Node(nodes[1][0]))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []Node{{addresses: n}}, tr.Next()) require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
}) })
t.Run("put scenario", func(t *testing.T) { t.Run("put scenario", func(t *testing.T) {
@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
}) })
} }
} }
type nodeState struct {
node *netmap.NodeInfo
}
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
return n.node
}
func TestTraverserPriorityMetrics(t *testing.T) {
t.Run("one rep one metric", func(t *testing.T) {
selectors := []int{4}
replicas := []int{3}
nodes, cnr := testPlacement(selectors, replicas)
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "B")
// Node_3, PK - ip4/0.0.0.0/tcp/3
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Without priority metric `ClusterName` the order will be:
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
// With priority metric `ClusterName` and current node in cluster B
// the order should be:
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
// The last node is
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("two reps two metrics", func(t *testing.T) {
selectors := []int{3, 3}
replicas := []int{2, 2}
nodes, cnr := testPlacement(selectors, replicas)
// REPLICA #1
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
// REPLICA #2
// Node_3 ip4/0.0.0.0/tcp/3
nodes[1][0].SetAttribute("ClusterName", "B")
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
// Node_4, PK - ip4/0.0.0.0/tcp/4
nodes[1][1].SetAttribute("ClusterName", "B")
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
// Node_5, PK - ip4/0.0.0.0/tcp/5
nodes[1][2].SetAttribute("ClusterName", "B")
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
sdkNode := testNode(9)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
nodesCopy := copyVectors(nodes)
m := []Metric{
NewAttributeMetric("ClusterName"),
NewAttributeMetric("UN-LOCODE"),
}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Check that nodes in the same cluster and
// in the same location should be the first in slice.
// Nodes which are follow criteria but stay outside the replica
// should be in the next slice.
next := tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "A")
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("ec container", func(t *testing.T) {
selectors := []int{4}
ec := [][]int{{2, 1}}
nodes, cnr := testECPlacement(selectors, ec)
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "B")
// Node_3, PK - ip4/0.0.0.0/tcp/3
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Without priority metric `ClusterName` the order will be:
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
// With priority metric `ClusterName` and current node in cluster B
// the order should be:
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
// The last node is
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
}

View file

@ -0,0 +1,207 @@
package tree
import (
"context"
"encoding/hex"
"fmt"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
var (
containerID = "73tQMTYyUkTgmvPR1HWib6pndbhSoBovbnMF7Pws8Rcy"
senderPrivateKey, _ = keys.NewPrivateKey()
senderKey = hex.EncodeToString(senderPrivateKey.PublicKey().Bytes())
rootCnr = &core.Container{Value: containerSDK.Container{}}
)
type frostfsIDProviderMock struct {
subjects map[util.Uint160]*client.Subject
subjectsExtended map[util.Uint160]*client.SubjectExtended
}
func (f *frostfsIDProviderMock) GetSubject(key util.Uint160) (*client.Subject, error) {
v, ok := f.subjects[key]
if !ok {
return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
}
return v, nil
}
func (f *frostfsIDProviderMock) GetSubjectExtended(key util.Uint160) (*client.SubjectExtended, error) {
v, ok := f.subjectsExtended[key]
if !ok {
return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
}
return v, nil
}
var _ frostfsidcore.SubjectProvider = (*frostfsIDProviderMock)(nil)
func newFrostfsIDProviderMock(t *testing.T) *frostfsIDProviderMock {
return &frostfsIDProviderMock{
subjects: map[util.Uint160]*client.Subject{
scriptHashFromSenderKey(t, senderKey): {
Namespace: "testnamespace",
Name: "test",
KV: map[string]string{
"tag-attr1": "value1",
"tag-attr2": "value2",
},
},
},
subjectsExtended: map[util.Uint160]*client.SubjectExtended{
scriptHashFromSenderKey(t, senderKey): {
Namespace: "testnamespace",
Name: "test",
KV: map[string]string{
"tag-attr1": "value1",
"tag-attr2": "value2",
},
Groups: []*client.Group{
{
ID: 1,
Name: "test",
Namespace: "testnamespace",
KV: map[string]string{
"attr1": "value1",
"attr2": "value2",
},
},
},
},
},
}
}
func scriptHashFromSenderKey(t *testing.T, senderKey string) util.Uint160 {
pk, err := keys.NewPublicKeyFromString(senderKey)
require.NoError(t, err)
return pk.GetScriptHash()
}
type stMock struct{}
func (m *stMock) CurrentEpoch() uint64 {
return 8
}
func TestCheckAPE(t *testing.T) {
cid := cid.ID{}
_ = cid.DecodeString(containerID)
t.Run("put non-tombstone rule won't affect tree remove", func(t *testing.T) {
los := inmemory.NewInmemoryLocalStorage()
mcs := inmemory.NewInmemoryMorphRuleChainStorage()
fid := newFrostfsIDProviderMock(t)
s := Service{
cfg: cfg{
frostfsidSubjectProvider: fid,
},
apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
}
los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
Condition: []chain.Condition{
{
Op: chain.CondStringNotEquals,
Kind: chain.KindResource,
Key: nativeschema.PropertyKeyObjectType,
Value: "TOMBSTONE",
},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectDelete, acl.RoleOwner, senderPrivateKey.PublicKey())
require.NoError(t, err)
})
t.Run("delete rule won't affect tree add", func(t *testing.T) {
los := inmemory.NewInmemoryLocalStorage()
mcs := inmemory.NewInmemoryMorphRuleChainStorage()
fid := newFrostfsIDProviderMock(t)
s := Service{
cfg: cfg{
frostfsidSubjectProvider: fid,
},
apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
}
los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
Resources: chain.Resources{
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
},
Condition: []chain.Condition{
{
Op: chain.CondStringNotEquals,
Kind: chain.KindResource,
Key: nativeschema.PropertyKeyObjectType,
Value: "TOMBSTONE",
},
},
},
},
MatchType: chain.MatchTypeFirstMatch,
})
err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectPut, acl.RoleOwner, senderPrivateKey.PublicKey())
require.NoError(t, err)
})
}

View file

@ -210,7 +210,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err return nil, err
} }
err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectPut) err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectDelete)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -3,8 +3,14 @@ package httputil
import ( import (
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
"github.com/felixge/fgprof"
) )
func init() {
http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
}
// initializes pprof package in order to // initializes pprof package in order to
// register Prometheus handlers on http.DefaultServeMux. // register Prometheus handlers on http.DefaultServeMux.
var _ = pprof.Handler("") var _ = pprof.Handler("")

View file

@ -19,7 +19,7 @@ import (
func GeneratePayloadPool(count uint, size uint) [][]byte { func GeneratePayloadPool(count uint, size uint) [][]byte {
var pool [][]byte var pool [][]byte
for i := uint(0); i < count; i++ { for range count {
payload := make([]byte, size) payload := make([]byte, size)
_, _ = rand.Read(payload) _, _ = rand.Read(payload)
@ -30,8 +30,8 @@ func GeneratePayloadPool(count uint, size uint) [][]byte {
func GenerateAttributePool(count uint) []objectSDK.Attribute { func GenerateAttributePool(count uint) []objectSDK.Attribute {
var pool []objectSDK.Attribute var pool []objectSDK.Attribute
for i := uint(0); i < count; i++ { for i := range count {
for j := uint(0); j < count; j++ { for j := range count {
attr := *objectSDK.NewAttribute() attr := *objectSDK.NewAttribute()
attr.SetKey(fmt.Sprintf("key%d", i)) attr.SetKey(fmt.Sprintf("key%d", i))
attr.SetValue(fmt.Sprintf("value%d", j)) attr.SetValue(fmt.Sprintf("value%d", j))
@ -43,7 +43,7 @@ func GenerateAttributePool(count uint) []objectSDK.Attribute {
func GenerateOwnerPool(count uint) []user.ID { func GenerateOwnerPool(count uint) []user.ID {
var pool []user.ID var pool []user.ID
for i := uint(0); i < count; i++ { for range count {
pool = append(pool, usertest.ID()) pool = append(pool, usertest.ID())
} }
return pool return pool
@ -118,7 +118,7 @@ func WithPayloadFromPool(pool [][]byte) ObjectOption {
func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption { func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
return func(obj *objectSDK.Object) { return func(obj *objectSDK.Object) {
var attrs []objectSDK.Attribute var attrs []objectSDK.Attribute
for i := uint(0); i < count; i++ { for range count {
attrs = append(attrs, pool[rand.Intn(len(pool))]) attrs = append(attrs, pool[rand.Intn(len(pool))])
} }
obj.SetAttributes(attrs...) obj.SetAttributes(attrs...)

View file

@ -29,7 +29,7 @@ func PopulateWithObjects(
) { ) {
digits := "0123456789" digits := "0123456789"
for i := uint(0); i < count; i++ { for range count {
obj := factory() obj := factory()
id := []byte(fmt.Sprintf( id := []byte(fmt.Sprintf(
@ -59,7 +59,7 @@ func PopulateWithBigObjects(
count uint, count uint,
factory func() *objectSDK.Object, factory func() *objectSDK.Object,
) { ) {
for i := uint(0); i < count; i++ { for range count {
group.Go(func() error { group.Go(func() error {
if err := populateWithBigObject(ctx, db, factory); err != nil { if err := populateWithBigObject(ctx, db, factory); err != nil {
return fmt.Errorf("couldn't put a big object: %w", err) return fmt.Errorf("couldn't put a big object: %w", err)
@ -154,7 +154,7 @@ func PopulateGraveyard(
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(int(count)) wg.Add(int(count))
for i := uint(0); i < count; i++ { for range count {
obj := factory() obj := factory()
prm := meta.PutPrm{} prm := meta.PutPrm{}
@ -226,7 +226,7 @@ func PopulateLocked(
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(int(count)) wg.Add(int(count))
for i := uint(0); i < count; i++ { for range count {
defer wg.Done() defer wg.Done()
obj := factory() obj := factory()

View file

@ -116,7 +116,7 @@ func populate() (err error) {
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(int(jobs)) eg.SetLimit(int(jobs))
for i := uint(0); i < numContainers; i++ { for range numContainers {
cid := cidtest.ID() cid := cidtest.ID()
for _, typ := range types { for _, typ := range types {