forked from TrueCloudLab/frostfs-node
Compare commits
16 commits
feat/get-p
...
master
Author | SHA1 | Date | |
---|---|---|---|
d19ab43500 | |||
5bcf81d1cc | |||
c2effcc61c | |||
2285cfc36f | |||
e74d05c03f | |||
48862e0e63 | |||
89892d9754 | |||
7ac0852364 | |||
d28a5d2d7a | |||
87ac3c5279 | |||
d5ee6d3039 | |||
433aab12bb | |||
81f4cdbb91 | |||
3cd7d23f10 | |||
012af5cc38 | |||
eb5336d5ff |
45 changed files with 804 additions and 147 deletions
|
@ -87,5 +87,7 @@ linters:
|
||||||
- perfsprint
|
- perfsprint
|
||||||
- testifylint
|
- testifylint
|
||||||
- protogetter
|
- protogetter
|
||||||
|
- intrange
|
||||||
|
- tenv
|
||||||
disable-all: true
|
disable-all: true
|
||||||
fast: false
|
fast: false
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
pipeline:
|
|
||||||
# Kludge for non-root containers under WoodPecker
|
|
||||||
fix-ownership:
|
|
||||||
image: alpine:latest
|
|
||||||
commands: chown -R 1234:1234 .
|
|
||||||
|
|
||||||
pre-commit:
|
|
||||||
image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
|
|
||||||
commands:
|
|
||||||
- export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
|
|
||||||
- pre-commit run --hook-stage manual
|
|
2
Makefile
2
Makefile
|
@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
|
||||||
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
|
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
|
||||||
|
|
||||||
GO_VERSION ?= 1.22
|
GO_VERSION ?= 1.22
|
||||||
LINT_VERSION ?= 1.60.3
|
LINT_VERSION ?= 1.61.0
|
||||||
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
|
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
|
||||||
PROTOC_VERSION ?= 25.0
|
PROTOC_VERSION ?= 25.0
|
||||||
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
|
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
|
||||||
|
|
|
@ -128,7 +128,7 @@ func generateConfigExample(appDir string, credSize int) (string, error) {
|
||||||
tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets")
|
tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets")
|
||||||
|
|
||||||
var i innerring.GlagoliticLetter
|
var i innerring.GlagoliticLetter
|
||||||
for i = 0; i < innerring.GlagoliticLetter(credSize); i++ {
|
for i = range innerring.GlagoliticLetter(credSize) {
|
||||||
tmpl.Glagolitics = append(tmpl.Glagolitics, i.String())
|
tmpl.Glagolitics = append(tmpl.Glagolitics, i.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ func TestGenerateAlphabet(t *testing.T) {
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
v.Set(commonflags.AlphabetWalletsFlag, walletDir)
|
v.Set(commonflags.AlphabetWalletsFlag, walletDir)
|
||||||
require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10)))
|
require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10)))
|
||||||
for i := uint64(0); i < size; i++ {
|
for i := range uint64(size) {
|
||||||
buf.WriteString(strconv.FormatUint(i, 10) + "\r")
|
buf.WriteString(strconv.FormatUint(i, 10) + "\r")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -659,9 +659,7 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, ok = rdr.Read(buf)
|
n, ok = rdr.Read(buf)
|
||||||
for i := range n {
|
list = append(list, buf[:n]...)
|
||||||
list = append(list, buf[i])
|
|
||||||
}
|
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,7 +195,7 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
|
||||||
prmHead.SetRawFlag(true) // to get an error instead of whole object
|
prmHead.SetRawFlag(true) // to get an error instead of whole object
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(cmd.Context())
|
eg, egCtx := errgroup.WithContext(cmd.Context())
|
||||||
for idx := range len(members) {
|
for idx := range members {
|
||||||
partObjID := members[idx]
|
partObjID := members[idx]
|
||||||
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
|
|
|
@ -114,12 +114,14 @@ func initConfig() {
|
||||||
} else {
|
} else {
|
||||||
// Find home directory.
|
// Find home directory.
|
||||||
home, err := homedir.Dir()
|
home, err := homedir.Dir()
|
||||||
commonCmd.ExitOnErr(rootCmd, "", err)
|
if err != nil {
|
||||||
|
common.PrintVerbose(rootCmd, "Get homedir: %s", err)
|
||||||
// Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml"
|
} else {
|
||||||
viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli"))
|
// Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml"
|
||||||
viper.SetConfigName("config")
|
viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli"))
|
||||||
viper.SetConfigType("yaml")
|
viper.SetConfigName("config")
|
||||||
|
viper.SetConfigType("yaml")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
viper.SetEnvPrefix(envPrefix)
|
viper.SetEnvPrefix(envPrefix)
|
||||||
|
|
|
@ -11,7 +11,7 @@ func DecodeOIDs(data []byte) ([]oid.ID, error) {
|
||||||
size := r.ReadVarUint()
|
size := r.ReadVarUint()
|
||||||
oids := make([]oid.ID, size)
|
oids := make([]oid.ID, size)
|
||||||
|
|
||||||
for i := uint64(0); i < size; i++ {
|
for i := range size {
|
||||||
if err := oids[i].Decode(r.ReadVarBytes()); err != nil {
|
if err := oids[i].Decode(r.ReadVarBytes()); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
||||||
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
@ -109,6 +110,7 @@ type applicationConfiguration struct {
|
||||||
|
|
||||||
ObjectCfg struct {
|
ObjectCfg struct {
|
||||||
tombstoneLifetime uint64
|
tombstoneLifetime uint64
|
||||||
|
priorityMetrics []placement.Metric
|
||||||
}
|
}
|
||||||
|
|
||||||
EngineCfg struct {
|
EngineCfg struct {
|
||||||
|
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
// Object
|
// Object
|
||||||
|
|
||||||
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
||||||
|
var pm []placement.Metric
|
||||||
|
for _, raw := range objectconfig.Get(c).Priority() {
|
||||||
|
m, err := placement.ParseMetric(raw)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pm = append(pm, m)
|
||||||
|
}
|
||||||
|
a.ObjectCfg.priorityMetrics = pm
|
||||||
|
|
||||||
// Storage Engine
|
// Storage Engine
|
||||||
|
|
||||||
|
@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||||
return pool
|
return pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
|
func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
|
||||||
var res netmapV2.NodeInfo
|
var res netmap.NodeInfo
|
||||||
|
|
||||||
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
||||||
if ok {
|
if ok {
|
||||||
ni.WriteToV2(&res)
|
res = ni
|
||||||
} else {
|
} else {
|
||||||
c.cfgNodeInfo.localInfo.WriteToV2(&res)
|
res = c.cfgNodeInfo.localInfo
|
||||||
}
|
}
|
||||||
|
return &res
|
||||||
return &res, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setContractNodeInfo rewrites local node info from the FrostFS network map.
|
// setContractNodeInfo rewrites local node info from the FrostFS network map.
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package config_test
|
package config_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -38,8 +37,7 @@ func TestConfigEnv(t *testing.T) {
|
||||||
|
|
||||||
envName := strings.ToUpper(
|
envName := strings.ToUpper(
|
||||||
strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator))
|
strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator))
|
||||||
err := os.Setenv(envName, value)
|
t.Setenv(envName, value)
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
c := configtest.EmptyConfig()
|
c := configtest.EmptyConfig()
|
||||||
|
|
||||||
|
|
|
@ -10,10 +10,17 @@ type PutConfig struct {
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetConfig is a wrapper over "get" config section which provides access
|
||||||
|
// to object get pipeline configuration of object service.
|
||||||
|
type GetConfig struct {
|
||||||
|
cfg *config.Config
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subsection = "object"
|
subsection = "object"
|
||||||
|
|
||||||
putSubsection = "put"
|
putSubsection = "put"
|
||||||
|
getSubsection = "get"
|
||||||
|
|
||||||
// PutPoolSizeDefault is a default value of routine pool size to
|
// PutPoolSizeDefault is a default value of routine pool size to
|
||||||
// process object.Put requests in object service.
|
// process object.Put requests in object service.
|
||||||
|
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
|
||||||
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
||||||
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns structure that provides access to "get" subsection of
|
||||||
|
// "object" section.
|
||||||
|
func Get(c *config.Config) GetConfig {
|
||||||
|
return GetConfig{
|
||||||
|
c.Sub(subsection).Sub(getSubsection),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Priority returns the value of "priority" config parameter.
|
||||||
|
func (g GetConfig) Priority() []string {
|
||||||
|
return config.StringSliceSafe(g.cfg, "priority")
|
||||||
|
}
|
||||||
|
|
|
@ -11,8 +11,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func fromFile(path string) *config.Config {
|
func fromFile(path string) *config.Config {
|
||||||
os.Clearenv() // ENVs have priority over config files, so we do this in tests
|
|
||||||
|
|
||||||
return config.New(path, "", "")
|
return config.New(path, "", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,15 +38,6 @@ func ForEachFileType(pref string, f func(*config.Config)) {
|
||||||
|
|
||||||
// ForEnvFileType creates config from `<pref>.env` file.
|
// ForEnvFileType creates config from `<pref>.env` file.
|
||||||
func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) {
|
func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) {
|
||||||
envs := os.Environ()
|
|
||||||
t.Cleanup(func() {
|
|
||||||
os.Clearenv()
|
|
||||||
for _, env := range envs {
|
|
||||||
keyValue := strings.Split(env, "=")
|
|
||||||
os.Setenv(keyValue[0], keyValue[1])
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
f(fromEnvFile(t, pref+".env"))
|
f(fromEnvFile(t, pref+".env"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,7 +62,6 @@ func loadEnv(t testing.TB, path string) {
|
||||||
|
|
||||||
v = strings.Trim(v, `"`)
|
v = strings.Trim(v, `"`)
|
||||||
|
|
||||||
err = os.Setenv(k, v)
|
t.Setenv(k, v)
|
||||||
require.NoError(t, err, "can't set environment variable")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
||||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
|
@ -42,7 +43,7 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
|
|
||||||
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
|
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
|
||||||
if cacheSize > 0 {
|
if cacheSize > 0 {
|
||||||
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL)
|
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
|
||||||
}
|
}
|
||||||
|
|
||||||
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
|
||||||
|
@ -9,57 +10,101 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type subjectWithError struct {
|
||||||
|
subject *client.Subject
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type subjectExtWithError struct {
|
||||||
|
subject *client.SubjectExtended
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
type morphFrostfsIDCache struct {
|
type morphFrostfsIDCache struct {
|
||||||
subjProvider frostfsidcore.SubjectProvider
|
subjProvider frostfsidcore.SubjectProvider
|
||||||
|
|
||||||
subjCache *expirable.LRU[util.Uint160, *client.Subject]
|
subjCache *expirable.LRU[util.Uint160, subjectWithError]
|
||||||
|
|
||||||
subjExtCache *expirable.LRU[util.Uint160, *client.SubjectExtended]
|
subjExtCache *expirable.LRU[util.Uint160, subjectExtWithError]
|
||||||
|
|
||||||
|
metrics cacheMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration) frostfsidcore.SubjectProvider {
|
func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration, metrics cacheMetrics) frostfsidcore.SubjectProvider {
|
||||||
return &morphFrostfsIDCache{
|
return &morphFrostfsIDCache{
|
||||||
subjProvider: subjProvider,
|
subjProvider: subjProvider,
|
||||||
|
|
||||||
subjCache: expirable.NewLRU(size, func(util.Uint160, *client.Subject) {}, ttl),
|
subjCache: expirable.NewLRU(size, func(util.Uint160, subjectWithError) {}, ttl),
|
||||||
|
|
||||||
subjExtCache: expirable.NewLRU(size, func(util.Uint160, *client.SubjectExtended) {}, ttl),
|
subjExtCache: expirable.NewLRU(size, func(util.Uint160, subjectExtWithError) {}, ttl),
|
||||||
|
|
||||||
|
metrics: metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
|
func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
|
||||||
|
hit := false
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
m.metrics.AddMethodDuration("GetSubject", time.Since(startedAt), hit)
|
||||||
|
}()
|
||||||
|
|
||||||
result, found := m.subjCache.Get(addr)
|
result, found := m.subjCache.Get(addr)
|
||||||
if found {
|
if found {
|
||||||
return result, nil
|
hit = true
|
||||||
|
return result.subject, result.err
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := m.subjProvider.GetSubject(addr)
|
subj, err := m.subjProvider.GetSubject(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if m.isCacheableError(err) {
|
||||||
|
m.subjCache.Add(addr, subjectWithError{
|
||||||
|
err: err,
|
||||||
|
})
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.subjCache.Add(addr, result)
|
m.subjCache.Add(addr, subjectWithError{subject: subj})
|
||||||
return result, nil
|
return subj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
|
func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
|
||||||
subjExt, found := m.subjExtCache.Get(addr)
|
hit := false
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
m.metrics.AddMethodDuration("GetSubjectExtended", time.Since(startedAt), hit)
|
||||||
|
}()
|
||||||
|
|
||||||
|
result, found := m.subjExtCache.Get(addr)
|
||||||
if found {
|
if found {
|
||||||
return subjExt, nil
|
hit = true
|
||||||
|
return result.subject, result.err
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
subjExt, err := m.subjProvider.GetSubjectExtended(addr)
|
||||||
subjExt, err = m.subjProvider.GetSubjectExtended(addr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if m.isCacheableError(err) {
|
||||||
|
m.subjExtCache.Add(addr, subjectExtWithError{
|
||||||
|
err: err,
|
||||||
|
})
|
||||||
|
m.subjCache.Add(addr, subjectWithError{
|
||||||
|
err: err,
|
||||||
|
})
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.subjExtCache.Add(addr, subjExt)
|
m.subjExtCache.Add(addr, subjectExtWithError{subject: subjExt})
|
||||||
m.subjCache.Add(addr, subjectFromSubjectExtended(subjExt))
|
m.subjCache.Add(addr, subjectWithError{subject: subjectFromSubjectExtended(subjExt)})
|
||||||
|
|
||||||
return subjExt, nil
|
return subjExt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *morphFrostfsIDCache) isCacheableError(err error) bool {
|
||||||
|
return strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage)
|
||||||
|
}
|
||||||
|
|
||||||
func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject {
|
func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject {
|
||||||
return &client.Subject{
|
return &client.Subject{
|
||||||
PrimaryKey: subjExt.PrimaryKey,
|
PrimaryKey: subjExt.PrimaryKey,
|
||||||
|
|
|
@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||||
|
|
||||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
||||||
|
c.ObjectCfg.priorityMetrics)
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
|
||||||
|
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
||||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||||
coreConstructor *cache.ClientCache,
|
coreConstructor *cache.ClientCache,
|
||||||
containerSource containercore.Source,
|
containerSource containercore.Source,
|
||||||
|
priorityMetrics []placement.Metric,
|
||||||
) *getsvc.Service {
|
) *getsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
||||||
ls,
|
ls,
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
|
placement.WithPriorityMetrics(priorityMetrics),
|
||||||
|
placement.WithNodeState(c),
|
||||||
),
|
),
|
||||||
coreConstructor,
|
coreConstructor,
|
||||||
containerSource,
|
containerSource,
|
||||||
|
|
|
@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
||||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||||
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
||||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
|
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
|
|
|
@ -131,6 +131,9 @@
|
||||||
"remote_pool_size": 100,
|
"remote_pool_size": 100,
|
||||||
"local_pool_size": 200,
|
"local_pool_size": 200,
|
||||||
"skip_session_token_issuer_verification": true
|
"skip_session_token_issuer_verification": true
|
||||||
|
},
|
||||||
|
"get": {
|
||||||
|
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
|
|
|
@ -114,6 +114,10 @@ object:
|
||||||
remote_pool_size: 100 # number of async workers for remote PUT operations
|
remote_pool_size: 100 # number of async workers for remote PUT operations
|
||||||
local_pool_size: 200 # number of async workers for local PUT operations
|
local_pool_size: 200 # number of async workers for local PUT operations
|
||||||
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
||||||
|
get:
|
||||||
|
priority: # list of metrics of nodes for prioritization
|
||||||
|
- $attribute:ClusterName
|
||||||
|
- $attribute:UN-LOCODE
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
|
|
|
@ -407,13 +407,17 @@ Contains object-service related parameters.
|
||||||
object:
|
object:
|
||||||
put:
|
put:
|
||||||
remote_pool_size: 100
|
remote_pool_size: 100
|
||||||
|
get:
|
||||||
|
priority:
|
||||||
|
- $attribute:ClusterName
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
||||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||||
|
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
||||||
|
|
||||||
# `runtime` section
|
# `runtime` section
|
||||||
Contains runtime parameters.
|
Contains runtime parameters.
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -19,6 +19,7 @@ require (
|
||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
github.com/chzyer/readline v1.5.1
|
github.com/chzyer/readline v1.5.1
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||||
|
github.com/felixge/fgprof v0.9.5
|
||||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||||
github.com/gdamore/tcell/v2 v2.7.4
|
github.com/gdamore/tcell/v2 v2.7.4
|
||||||
github.com/go-pkgz/expirable-cache/v3 v3.0.0
|
github.com/go-pkgz/expirable-cache/v3 v3.0.0
|
||||||
|
@ -77,6 +78,7 @@ require (
|
||||||
github.com/go-logr/logr v1.4.2 // indirect
|
github.com/go-logr/logr v1.4.2 // indirect
|
||||||
github.com/go-logr/stdr v1.2.2 // indirect
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
github.com/golang/snappy v0.0.4 // indirect
|
github.com/golang/snappy v0.0.4 // indirect
|
||||||
|
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
|
||||||
github.com/gorilla/websocket v1.5.1 // indirect
|
github.com/gorilla/websocket v1.5.1 // indirect
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
|
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
|
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -61,7 +61,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
require.NoError(t, b.Init())
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
storageIDs := make(map[oid.Address][]byte)
|
storageIDs := make(map[oid.Address][]byte)
|
||||||
for i := 0; i < 100; i++ {
|
for range 100 {
|
||||||
obj := blobstortest.NewObject(64 * 1024) // 64KB object
|
obj := blobstortest.NewObject(64 * 1024) // 64KB object
|
||||||
data, err := obj.Marshal()
|
data, err := obj.Marshal()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -168,7 +168,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
|
|
||||||
storageIDs := make(map[oid.Address][]byte)
|
storageIDs := make(map[oid.Address][]byte)
|
||||||
toDelete := make(map[oid.Address][]byte)
|
toDelete := make(map[oid.Address][]byte)
|
||||||
for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
|
for i := range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
|
||||||
obj := blobstortest.NewObject(64 * 1024)
|
obj := blobstortest.NewObject(64 * 1024)
|
||||||
data, err := obj.Marshal()
|
data, err := obj.Marshal()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -236,7 +236,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
require.NoError(t, b.Init())
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
storageIDs := make(map[oid.Address][]byte)
|
storageIDs := make(map[oid.Address][]byte)
|
||||||
for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
|
for range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
|
||||||
obj := blobstortest.NewObject(64 * 1024)
|
obj := blobstortest.NewObject(64 * 1024)
|
||||||
data, err := obj.Marshal()
|
data, err := obj.Marshal()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -47,7 +47,7 @@ func TestIterateObjects(t *testing.T) {
|
||||||
|
|
||||||
mObjs := make(map[string]addrData)
|
mObjs := make(map[string]addrData)
|
||||||
|
|
||||||
for i := uint64(0); i < objNum; i++ {
|
for i := range uint64(objNum) {
|
||||||
sz := smalSz
|
sz := smalSz
|
||||||
|
|
||||||
big := i < objNum/2
|
big := i < objNum/2
|
||||||
|
|
|
@ -151,7 +151,7 @@ func TestErrorReporting(t *testing.T) {
|
||||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := uint32(0); i < 2; i++ {
|
for i := range uint32(2) {
|
||||||
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
|
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)
|
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)
|
||||||
|
|
|
@ -705,7 +705,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
|
||||||
key, value = c.Prev()
|
key, value = c.Prev()
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range len(ms) {
|
for i := range ms {
|
||||||
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
|
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
|
||||||
|
|
||||||
// 2. Insert the operation.
|
// 2. Insert the operation.
|
||||||
|
|
|
@ -1081,7 +1081,7 @@ func prepareRandomTree(nodeCount, opCount int) []Move {
|
||||||
}
|
}
|
||||||
|
|
||||||
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
|
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
|
||||||
for i := uint64(0); i < uint64(nodeCount); i++ {
|
for i := range uint64(nodeCount) {
|
||||||
expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i)
|
expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i)
|
actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i)
|
||||||
|
|
|
@ -216,7 +216,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
locked := make([]oid.ID, 1, 2)
|
locked := make([]oid.ID, 1, 2)
|
||||||
locked[0] = oidtest.ID()
|
locked[0] = oidtest.ID()
|
||||||
cnrLocked := cidtest.ID()
|
cnrLocked := cidtest.ID()
|
||||||
for i := uint64(0); i < objNum; i++ {
|
for range objNum {
|
||||||
obj := objecttest.Object()
|
obj := objecttest.Object()
|
||||||
obj.SetType(objectSDK.TypeRegular)
|
obj.SetType(objectSDK.TypeRegular)
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ func TestLimiter(t *testing.T) {
|
||||||
l := newFlushLimiter(uint64(maxSize))
|
l := newFlushLimiter(uint64(maxSize))
|
||||||
var currSize atomic.Int64
|
var currSize atomic.Int64
|
||||||
var eg errgroup.Group
|
var eg errgroup.Group
|
||||||
for i := 0; i < 10_000; i++ {
|
for range 10_000 {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
defer l.release(single)
|
defer l.release(single)
|
||||||
defer currSize.Add(-1)
|
defer currSize.Add(-1)
|
||||||
|
|
|
@ -4,11 +4,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
|
||||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
@ -48,7 +48,7 @@ type cfg struct {
|
||||||
|
|
||||||
morphCacheMetrics metrics.MorphCacheMetrics
|
morphCacheMetrics metrics.MorphCacheMetrics
|
||||||
|
|
||||||
dialerSource *internalNet.DialerSource
|
dialerSource DialerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -68,6 +68,7 @@ func defaultConfig() *cfg {
|
||||||
Scopes: transaction.Global,
|
Scopes: transaction.Global,
|
||||||
},
|
},
|
||||||
morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{},
|
morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{},
|
||||||
|
dialerSource: &noopDialerSource{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,7 +297,17 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithDialerSource(ds *internalNet.DialerSource) Option {
|
type DialerSource interface {
|
||||||
|
NetContextDialer() func(context.Context, string, string) (net.Conn, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopDialerSource struct{}
|
||||||
|
|
||||||
|
func (ds *noopDialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDialerSource(ds DialerSource) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.dialerSource = ds
|
c.dialerSource = ds
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func tickN(t *timer.BlockTimer, n uint32) {
|
func tickN(t *timer.BlockTimer, n uint32) {
|
||||||
for i := uint32(0); i < n; i++ {
|
for range n {
|
||||||
t.Tick(0)
|
t.Tick(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -537,10 +537,7 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
in, err := isContainerNode(nm, pk, binCnrID, cont)
|
if isContainerNode(nm, pk, binCnrID, cont) {
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
} else if in {
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,24 +548,24 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return isContainerNode(nm, pk, binCnrID, cont)
|
return isContainerNode(nm, pk, binCnrID, cont), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) (bool, error) {
|
func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) bool {
|
||||||
cnrVectors, err := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
|
// It could an error only if the network map doesn't have enough nodes to
|
||||||
if err != nil {
|
// fulfil the policy. It's a logical error that doesn't affect an actor role
|
||||||
return false, err
|
// determining, so we ignore it
|
||||||
}
|
cnrVectors, _ := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
|
||||||
|
|
||||||
for i := range cnrVectors {
|
for i := range cnrVectors {
|
||||||
for j := range cnrVectors[i] {
|
for j := range cnrVectors[i] {
|
||||||
if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) {
|
if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) {
|
||||||
return true, nil
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {
|
func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {
|
||||||
|
|
|
@ -28,7 +28,7 @@ type executorSvc struct {
|
||||||
type NodeState interface {
|
type NodeState interface {
|
||||||
// LocalNodeInfo must return current node state
|
// LocalNodeInfo must return current node state
|
||||||
// in FrostFS API v2 NodeInfo structure.
|
// in FrostFS API v2 NodeInfo structure.
|
||||||
LocalNodeInfo() (*netmap.NodeInfo, error)
|
LocalNodeInfo() *netmapSDK.NodeInfo
|
||||||
|
|
||||||
// ReadCurrentNetMap reads current local network map of the storage node
|
// ReadCurrentNetMap reads current local network map of the storage node
|
||||||
// into the given parameter. Returns any error encountered which prevented
|
// into the given parameter. Returns any error encountered which prevented
|
||||||
|
@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
|
||||||
|
|
||||||
func (s *executorSvc) LocalNodeInfo(
|
func (s *executorSvc) LocalNodeInfo(
|
||||||
_ context.Context,
|
_ context.Context,
|
||||||
req *netmap.LocalNodeInfoRequest,
|
_ *netmap.LocalNodeInfoRequest,
|
||||||
) (*netmap.LocalNodeInfoResponse, error) {
|
) (*netmap.LocalNodeInfoResponse, error) {
|
||||||
verV2 := req.GetMetaHeader().GetVersion()
|
ni := s.state.LocalNodeInfo()
|
||||||
if verV2 == nil {
|
var nodeInfo netmap.NodeInfo
|
||||||
return nil, errors.New("missing version")
|
ni.WriteToV2(&nodeInfo)
|
||||||
}
|
|
||||||
|
|
||||||
var ver versionsdk.Version
|
|
||||||
if err := ver.ReadFromV2(*verV2); err != nil {
|
|
||||||
return nil, fmt.Errorf("can't read version: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ni, err := s.state.LocalNodeInfo()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
|
|
||||||
ni2 := new(netmap.NodeInfo)
|
|
||||||
ni2.SetPublicKey(ni.GetPublicKey())
|
|
||||||
ni2.SetState(ni.GetState())
|
|
||||||
ni2.SetAttributes(ni.GetAttributes())
|
|
||||||
ni.IterateAddresses(func(s string) bool {
|
|
||||||
ni2.SetAddresses(s)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
ni = ni2
|
|
||||||
}
|
|
||||||
|
|
||||||
body := new(netmap.LocalNodeInfoResponseBody)
|
body := new(netmap.LocalNodeInfoResponseBody)
|
||||||
body.SetVersion(&s.version)
|
body.SetVersion(&s.version)
|
||||||
body.SetNodeInfo(ni)
|
body.SetNodeInfo(&nodeInfo)
|
||||||
|
|
||||||
resp := new(netmap.LocalNodeInfoResponse)
|
resp := new(netmap.LocalNodeInfoResponse)
|
||||||
resp.SetBody(body)
|
resp.SetBody(body)
|
||||||
|
|
|
@ -50,7 +50,7 @@ func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token)
|
||||||
metaHeader.SetBearerToken(b)
|
metaHeader.SetBearerToken(b)
|
||||||
metaHeader.SetSessionToken(s)
|
metaHeader.SetSessionToken(s)
|
||||||
|
|
||||||
for i := uint32(0); i < depth; i++ {
|
for range depth {
|
||||||
link := metaHeader
|
link := metaHeader
|
||||||
metaHeader = new(session.RequestMetaHeader)
|
metaHeader = new(session.RequestMetaHeader)
|
||||||
metaHeader.SetOrigin(link)
|
metaHeader.SetOrigin(link)
|
||||||
|
|
|
@ -284,7 +284,7 @@ func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to save to any node not visited by current part
|
// try to save to any node not visited by current part
|
||||||
for i := range len(nodes) {
|
for i := range nodes {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
|
|
@ -113,10 +113,9 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||||
oV2.GetHeader().SetOwnerID(ownerID)
|
oV2.GetHeader().SetOwnerID(ownerID)
|
||||||
|
|
||||||
target, err := target.New(objectwriter.Params{
|
target, err := target.New(objectwriter.Params{
|
||||||
Config: s.Config,
|
Config: s.Config,
|
||||||
Common: commonPrm,
|
Common: commonPrm,
|
||||||
Header: objectSDK.NewFromV2(oV2),
|
Header: objectSDK.NewFromV2(oV2),
|
||||||
SignRequestPrivateKey: s.localNodeKey,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("target creation: %w", err)
|
return fmt.Errorf("target creation: %w", err)
|
||||||
|
|
43
pkg/services/object_manager/placement/metrics.go
Normal file
43
pkg/services/object_manager/placement/metrics.go
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
package placement
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
attrPrefix = "$attribute:"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Metric interface {
|
||||||
|
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseMetric(raw string) (Metric, error) {
|
||||||
|
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
|
||||||
|
return NewAttributeMetric(attr), nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unsupported priority metric")
|
||||||
|
}
|
||||||
|
|
||||||
|
// attributeMetric describes priority metric based on attribute.
|
||||||
|
type attributeMetric struct {
|
||||||
|
attribute string
|
||||||
|
}
|
||||||
|
|
||||||
|
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
|
||||||
|
// the value of attribute is the same. In other case return [1].
|
||||||
|
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
|
||||||
|
fromAttr := from.Attribute(am.attribute)
|
||||||
|
toAttr := to.Attribute(am.attribute)
|
||||||
|
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAttributeMetric(attr string) Metric {
|
||||||
|
return &attributeMetric{attribute: attr}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ package placement
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -23,6 +24,11 @@ type Builder interface {
|
||||||
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type NodeState interface {
|
||||||
|
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
|
||||||
|
LocalNodeInfo() *netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
// Option represents placement traverser option.
|
// Option represents placement traverser option.
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
@ -50,6 +56,10 @@ type cfg struct {
|
||||||
policy netmap.PlacementPolicy
|
policy netmap.PlacementPolicy
|
||||||
|
|
||||||
builder Builder
|
builder Builder
|
||||||
|
|
||||||
|
metrics []Metric
|
||||||
|
|
||||||
|
nodeState NodeState
|
||||||
}
|
}
|
||||||
|
|
||||||
const invalidOptsMsg = "invalid traverser options"
|
const invalidOptsMsg = "invalid traverser options"
|
||||||
|
@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var rem []int
|
var rem []int
|
||||||
if cfg.flatSuccess != nil {
|
if len(cfg.metrics) > 0 && cfg.nodeState != nil {
|
||||||
|
rem = defaultCopiesVector(cfg.policy)
|
||||||
|
var unsortedVector []netmap.NodeInfo
|
||||||
|
var regularVector []netmap.NodeInfo
|
||||||
|
for i := range rem {
|
||||||
|
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
|
||||||
|
regularVector = append(regularVector, ns[i][rem[i]:]...)
|
||||||
|
}
|
||||||
|
rem = []int{-1, -1}
|
||||||
|
|
||||||
|
sortedVector, err := sortVector(cfg, unsortedVector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
|
||||||
|
} else if cfg.flatSuccess != nil {
|
||||||
ns = flatNodes(ns)
|
ns = flatNodes(ns)
|
||||||
rem = []int{int(*cfg.flatSuccess)}
|
rem = []int{int(*cfg.flatSuccess)}
|
||||||
} else {
|
} else {
|
||||||
|
@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
||||||
return [][]netmap.NodeInfo{flat}
|
return [][]netmap.NodeInfo{flat}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nodeMetrics struct {
|
||||||
|
index int
|
||||||
|
metrics []int
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
|
||||||
|
nm := make([]nodeMetrics, len(unsortedVector))
|
||||||
|
node := cfg.nodeState.LocalNodeInfo()
|
||||||
|
|
||||||
|
for i := range unsortedVector {
|
||||||
|
m := make([]int, len(cfg.metrics))
|
||||||
|
for j, pm := range cfg.metrics {
|
||||||
|
m[j] = pm.CalculateValue(node, &unsortedVector[i])
|
||||||
|
}
|
||||||
|
nm[i] = nodeMetrics{
|
||||||
|
index: i,
|
||||||
|
metrics: m,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slices.SortFunc(nm, func(a, b nodeMetrics) int {
|
||||||
|
return slices.Compare(a.metrics, b.metrics)
|
||||||
|
})
|
||||||
|
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
|
||||||
|
for i := range unsortedVector {
|
||||||
|
sortedVector[i] = unsortedVector[nm[i].index]
|
||||||
|
}
|
||||||
|
return sortedVector, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Node is a descriptor of storage node with information required for intra-container communication.
|
// Node is a descriptor of storage node with information required for intra-container communication.
|
||||||
type Node struct {
|
type Node struct {
|
||||||
addresses network.AddressGroup
|
addresses network.AddressGroup
|
||||||
|
@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
|
||||||
c.copyNumbers = v
|
c.copyNumbers = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithPriorityMetrics use provided priority metrics to sort nodes.
|
||||||
|
func WithPriorityMetrics(m []Metric) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.metrics = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNodeState provide state of the current node.
|
||||||
|
func WithNodeState(s NodeState) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.nodeState = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNode(v uint32) (n netmap.NodeInfo) {
|
func testNode(v uint32) (n netmap.NodeInfo) {
|
||||||
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
|
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
|
||||||
|
n.SetNetworkEndpoints(ip)
|
||||||
|
n.SetPublicKey([]byte(ip))
|
||||||
|
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
||||||
return vc
|
return vc
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
|
return placement(ss, rs, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
|
return placement(ss, nil, ec)
|
||||||
|
}
|
||||||
|
|
||||||
|
func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
nodes := make([][]netmap.NodeInfo, 0, len(rs))
|
nodes := make([][]netmap.NodeInfo, 0, len(rs))
|
||||||
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
|
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
|
||||||
num := uint32(0)
|
num := uint32(0)
|
||||||
|
@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
nodes = append(nodes, ns)
|
nodes = append(nodes, ns)
|
||||||
|
|
||||||
var rd netmap.ReplicaDescriptor
|
var rd netmap.ReplicaDescriptor
|
||||||
rd.SetNumberOfObjects(uint32(rs[i]))
|
if len(rs) > 0 {
|
||||||
|
rd.SetNumberOfObjects(uint32(rs[i]))
|
||||||
|
} else {
|
||||||
|
rd.SetECDataCount(uint32(ec[i][0]))
|
||||||
|
rd.SetECParityCount(uint32(ec[i][1]))
|
||||||
|
}
|
||||||
|
|
||||||
replicas = append(replicas, rd)
|
replicas = append(replicas, rd)
|
||||||
}
|
}
|
||||||
|
@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, []Node{{addresses: n}}, tr.Next())
|
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put scenario", func(t *testing.T) {
|
t.Run("put scenario", func(t *testing.T) {
|
||||||
|
@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nodeState struct {
|
||||||
|
node *netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
|
||||||
|
return n.node
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTraverserPriorityMetrics(t *testing.T) {
|
||||||
|
t.Run("one rep one metric", func(t *testing.T) {
|
||||||
|
selectors := []int{4}
|
||||||
|
replicas := []int{3}
|
||||||
|
|
||||||
|
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
|
||||||
|
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||||
|
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||||
|
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
sdkNode := testNode(5)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Without priority metric `ClusterName` the order will be:
|
||||||
|
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||||
|
// With priority metric `ClusterName` and current node in cluster B
|
||||||
|
// the order should be:
|
||||||
|
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||||
|
next := tr.Next()
|
||||||
|
require.NotNil(t, next)
|
||||||
|
require.Equal(t, 3, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
// The last node is
|
||||||
|
require.Equal(t, 1, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("two reps two metrics", func(t *testing.T) {
|
||||||
|
selectors := []int{3, 3}
|
||||||
|
replicas := []int{2, 2}
|
||||||
|
|
||||||
|
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
|
||||||
|
// REPLICA #1
|
||||||
|
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
|
||||||
|
|
||||||
|
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
// REPLICA #2
|
||||||
|
// Node_3 ip4/0.0.0.0/tcp/3
|
||||||
|
nodes[1][0].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
// Node_4, PK - ip4/0.0.0.0/tcp/4
|
||||||
|
nodes[1][1].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
|
||||||
|
// Node_5, PK - ip4/0.0.0.0/tcp/5
|
||||||
|
nodes[1][2].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
sdkNode := testNode(9)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{
|
||||||
|
NewAttributeMetric("ClusterName"),
|
||||||
|
NewAttributeMetric("UN-LOCODE"),
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Check that nodes in the same cluster and
|
||||||
|
// in the same location should be the first in slice.
|
||||||
|
// Nodes which are follow criteria but stay outside the replica
|
||||||
|
// should be in the next slice.
|
||||||
|
|
||||||
|
next := tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
nodesCopy = copyVectors(nodes)
|
||||||
|
|
||||||
|
tr, err = NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
|
||||||
|
sdkNode.SetAttribute("ClusterName", "A")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
nodesCopy = copyVectors(nodes)
|
||||||
|
|
||||||
|
tr, err = NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ec container", func(t *testing.T) {
|
||||||
|
selectors := []int{4}
|
||||||
|
ec := [][]int{{2, 1}}
|
||||||
|
|
||||||
|
nodes, cnr := testECPlacement(selectors, ec)
|
||||||
|
|
||||||
|
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||||
|
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||||
|
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
sdkNode := testNode(5)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Without priority metric `ClusterName` the order will be:
|
||||||
|
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||||
|
// With priority metric `ClusterName` and current node in cluster B
|
||||||
|
// the order should be:
|
||||||
|
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||||
|
next := tr.Next()
|
||||||
|
require.NotNil(t, next)
|
||||||
|
require.Equal(t, 3, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
// The last node is
|
||||||
|
require.Equal(t, 1, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
207
pkg/services/tree/ape_test.go
Normal file
207
pkg/services/tree/ape_test.go
Normal file
|
@ -0,0 +1,207 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
|
||||||
|
core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||||
|
checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||||
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||||
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||||
|
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
containerID = "73tQMTYyUkTgmvPR1HWib6pndbhSoBovbnMF7Pws8Rcy"
|
||||||
|
|
||||||
|
senderPrivateKey, _ = keys.NewPrivateKey()
|
||||||
|
|
||||||
|
senderKey = hex.EncodeToString(senderPrivateKey.PublicKey().Bytes())
|
||||||
|
|
||||||
|
rootCnr = &core.Container{Value: containerSDK.Container{}}
|
||||||
|
)
|
||||||
|
|
||||||
|
type frostfsIDProviderMock struct {
|
||||||
|
subjects map[util.Uint160]*client.Subject
|
||||||
|
subjectsExtended map[util.Uint160]*client.SubjectExtended
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *frostfsIDProviderMock) GetSubject(key util.Uint160) (*client.Subject, error) {
|
||||||
|
v, ok := f.subjects[key]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *frostfsIDProviderMock) GetSubjectExtended(key util.Uint160) (*client.SubjectExtended, error) {
|
||||||
|
v, ok := f.subjectsExtended[key]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ frostfsidcore.SubjectProvider = (*frostfsIDProviderMock)(nil)
|
||||||
|
|
||||||
|
func newFrostfsIDProviderMock(t *testing.T) *frostfsIDProviderMock {
|
||||||
|
return &frostfsIDProviderMock{
|
||||||
|
subjects: map[util.Uint160]*client.Subject{
|
||||||
|
scriptHashFromSenderKey(t, senderKey): {
|
||||||
|
Namespace: "testnamespace",
|
||||||
|
Name: "test",
|
||||||
|
KV: map[string]string{
|
||||||
|
"tag-attr1": "value1",
|
||||||
|
"tag-attr2": "value2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
subjectsExtended: map[util.Uint160]*client.SubjectExtended{
|
||||||
|
scriptHashFromSenderKey(t, senderKey): {
|
||||||
|
Namespace: "testnamespace",
|
||||||
|
Name: "test",
|
||||||
|
KV: map[string]string{
|
||||||
|
"tag-attr1": "value1",
|
||||||
|
"tag-attr2": "value2",
|
||||||
|
},
|
||||||
|
Groups: []*client.Group{
|
||||||
|
{
|
||||||
|
ID: 1,
|
||||||
|
Name: "test",
|
||||||
|
Namespace: "testnamespace",
|
||||||
|
KV: map[string]string{
|
||||||
|
"attr1": "value1",
|
||||||
|
"attr2": "value2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func scriptHashFromSenderKey(t *testing.T, senderKey string) util.Uint160 {
|
||||||
|
pk, err := keys.NewPublicKeyFromString(senderKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return pk.GetScriptHash()
|
||||||
|
}
|
||||||
|
|
||||||
|
type stMock struct{}
|
||||||
|
|
||||||
|
func (m *stMock) CurrentEpoch() uint64 {
|
||||||
|
return 8
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckAPE(t *testing.T) {
|
||||||
|
cid := cid.ID{}
|
||||||
|
_ = cid.DecodeString(containerID)
|
||||||
|
|
||||||
|
t.Run("put non-tombstone rule won't affect tree remove", func(t *testing.T) {
|
||||||
|
los := inmemory.NewInmemoryLocalStorage()
|
||||||
|
mcs := inmemory.NewInmemoryMorphRuleChainStorage()
|
||||||
|
fid := newFrostfsIDProviderMock(t)
|
||||||
|
s := Service{
|
||||||
|
cfg: cfg{
|
||||||
|
frostfsidSubjectProvider: fid,
|
||||||
|
},
|
||||||
|
apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
|
||||||
|
Rules: []chain.Rule{
|
||||||
|
{
|
||||||
|
Status: chain.AccessDenied,
|
||||||
|
Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
|
||||||
|
Resources: chain.Resources{
|
||||||
|
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
|
||||||
|
},
|
||||||
|
Condition: []chain.Condition{
|
||||||
|
{
|
||||||
|
Op: chain.CondStringNotEquals,
|
||||||
|
Kind: chain.KindResource,
|
||||||
|
Key: nativeschema.PropertyKeyObjectType,
|
||||||
|
Value: "TOMBSTONE",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
MatchType: chain.MatchTypeFirstMatch,
|
||||||
|
})
|
||||||
|
|
||||||
|
mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
|
||||||
|
Rules: []chain.Rule{
|
||||||
|
{
|
||||||
|
Status: chain.Allow,
|
||||||
|
Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
|
||||||
|
Resources: chain.Resources{
|
||||||
|
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
MatchType: chain.MatchTypeFirstMatch,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectDelete, acl.RoleOwner, senderPrivateKey.PublicKey())
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("delete rule won't affect tree add", func(t *testing.T) {
|
||||||
|
los := inmemory.NewInmemoryLocalStorage()
|
||||||
|
mcs := inmemory.NewInmemoryMorphRuleChainStorage()
|
||||||
|
fid := newFrostfsIDProviderMock(t)
|
||||||
|
s := Service{
|
||||||
|
cfg: cfg{
|
||||||
|
frostfsidSubjectProvider: fid,
|
||||||
|
},
|
||||||
|
apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
|
||||||
|
Rules: []chain.Rule{
|
||||||
|
{
|
||||||
|
Status: chain.AccessDenied,
|
||||||
|
Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
|
||||||
|
Resources: chain.Resources{
|
||||||
|
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
MatchType: chain.MatchTypeFirstMatch,
|
||||||
|
})
|
||||||
|
|
||||||
|
mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
|
||||||
|
Rules: []chain.Rule{
|
||||||
|
{
|
||||||
|
Status: chain.Allow,
|
||||||
|
Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
|
||||||
|
Resources: chain.Resources{
|
||||||
|
Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
|
||||||
|
},
|
||||||
|
Condition: []chain.Condition{
|
||||||
|
{
|
||||||
|
Op: chain.CondStringNotEquals,
|
||||||
|
Kind: chain.KindResource,
|
||||||
|
Key: nativeschema.PropertyKeyObjectType,
|
||||||
|
Value: "TOMBSTONE",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
MatchType: chain.MatchTypeFirstMatch,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectPut, acl.RoleOwner, senderPrivateKey.PublicKey())
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
|
@ -210,7 +210,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectPut)
|
err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectDelete)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,14 @@ package httputil
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/pprof"
|
"net/http/pprof"
|
||||||
|
|
||||||
|
"github.com/felixge/fgprof"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
|
||||||
|
}
|
||||||
|
|
||||||
// initializes pprof package in order to
|
// initializes pprof package in order to
|
||||||
// register Prometheus handlers on http.DefaultServeMux.
|
// register Prometheus handlers on http.DefaultServeMux.
|
||||||
var _ = pprof.Handler("")
|
var _ = pprof.Handler("")
|
||||||
|
|
|
@ -19,7 +19,7 @@ import (
|
||||||
|
|
||||||
func GeneratePayloadPool(count uint, size uint) [][]byte {
|
func GeneratePayloadPool(count uint, size uint) [][]byte {
|
||||||
var pool [][]byte
|
var pool [][]byte
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
payload := make([]byte, size)
|
payload := make([]byte, size)
|
||||||
_, _ = rand.Read(payload)
|
_, _ = rand.Read(payload)
|
||||||
|
|
||||||
|
@ -30,8 +30,8 @@ func GeneratePayloadPool(count uint, size uint) [][]byte {
|
||||||
|
|
||||||
func GenerateAttributePool(count uint) []objectSDK.Attribute {
|
func GenerateAttributePool(count uint) []objectSDK.Attribute {
|
||||||
var pool []objectSDK.Attribute
|
var pool []objectSDK.Attribute
|
||||||
for i := uint(0); i < count; i++ {
|
for i := range count {
|
||||||
for j := uint(0); j < count; j++ {
|
for j := range count {
|
||||||
attr := *objectSDK.NewAttribute()
|
attr := *objectSDK.NewAttribute()
|
||||||
attr.SetKey(fmt.Sprintf("key%d", i))
|
attr.SetKey(fmt.Sprintf("key%d", i))
|
||||||
attr.SetValue(fmt.Sprintf("value%d", j))
|
attr.SetValue(fmt.Sprintf("value%d", j))
|
||||||
|
@ -43,7 +43,7 @@ func GenerateAttributePool(count uint) []objectSDK.Attribute {
|
||||||
|
|
||||||
func GenerateOwnerPool(count uint) []user.ID {
|
func GenerateOwnerPool(count uint) []user.ID {
|
||||||
var pool []user.ID
|
var pool []user.ID
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
pool = append(pool, usertest.ID())
|
pool = append(pool, usertest.ID())
|
||||||
}
|
}
|
||||||
return pool
|
return pool
|
||||||
|
@ -118,7 +118,7 @@ func WithPayloadFromPool(pool [][]byte) ObjectOption {
|
||||||
func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
|
func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
|
||||||
return func(obj *objectSDK.Object) {
|
return func(obj *objectSDK.Object) {
|
||||||
var attrs []objectSDK.Attribute
|
var attrs []objectSDK.Attribute
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
attrs = append(attrs, pool[rand.Intn(len(pool))])
|
attrs = append(attrs, pool[rand.Intn(len(pool))])
|
||||||
}
|
}
|
||||||
obj.SetAttributes(attrs...)
|
obj.SetAttributes(attrs...)
|
||||||
|
|
|
@ -29,7 +29,7 @@ func PopulateWithObjects(
|
||||||
) {
|
) {
|
||||||
digits := "0123456789"
|
digits := "0123456789"
|
||||||
|
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
obj := factory()
|
obj := factory()
|
||||||
|
|
||||||
id := []byte(fmt.Sprintf(
|
id := []byte(fmt.Sprintf(
|
||||||
|
@ -59,7 +59,7 @@ func PopulateWithBigObjects(
|
||||||
count uint,
|
count uint,
|
||||||
factory func() *objectSDK.Object,
|
factory func() *objectSDK.Object,
|
||||||
) {
|
) {
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
if err := populateWithBigObject(ctx, db, factory); err != nil {
|
if err := populateWithBigObject(ctx, db, factory); err != nil {
|
||||||
return fmt.Errorf("couldn't put a big object: %w", err)
|
return fmt.Errorf("couldn't put a big object: %w", err)
|
||||||
|
@ -154,7 +154,7 @@ func PopulateGraveyard(
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(int(count))
|
wg.Add(int(count))
|
||||||
|
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
obj := factory()
|
obj := factory()
|
||||||
|
|
||||||
prm := meta.PutPrm{}
|
prm := meta.PutPrm{}
|
||||||
|
@ -226,7 +226,7 @@ func PopulateLocked(
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(int(count))
|
wg.Add(int(count))
|
||||||
|
|
||||||
for i := uint(0); i < count; i++ {
|
for range count {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
obj := factory()
|
obj := factory()
|
||||||
|
|
|
@ -116,7 +116,7 @@ func populate() (err error) {
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
eg.SetLimit(int(jobs))
|
eg.SetLimit(int(jobs))
|
||||||
|
|
||||||
for i := uint(0); i < numContainers; i++ {
|
for range numContainers {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
|
|
||||||
for _, typ := range types {
|
for _, typ := range types {
|
||||||
|
|
Loading…
Reference in a new issue