forked from TrueCloudLab/frostfs-node

Compare commits: feat/strea ... master (25 commits)
Commits (SHA1; author and date columns were not preserved in this export):

7edec9193c, 3cf6ea745d, 9a77527f46, 5b1ba8e23d, 9902965ff4,
33ad753302, 15102e6dfd, 17ec84151b, 6c45a17af6, d19ab43500,
5bcf81d1cc, c2effcc61c, 2285cfc36f, e74d05c03f, 48862e0e63,
89892d9754, 7ac0852364, d28a5d2d7a, 87ac3c5279, d5ee6d3039,
433aab12bb, 81f4cdbb91, 3cd7d23f10, 012af5cc38, eb5336d5ff
62 changed files with 1012 additions and 256 deletions
@@ -87,5 +87,7 @@ linters:
     - perfsprint
     - testifylint
     - protogetter
+    - intrange
+    - tenv
   disable-all: true
   fast: false
@@ -1,11 +0,0 @@
-pipeline:
-  # Kludge for non-root containers under WoodPecker
-  fix-ownership:
-    image: alpine:latest
-    commands: chown -R 1234:1234 .
-
-  pre-commit:
-    image: git.frostfs.info/truecloudlab/frostfs-ci:v0.36
-    commands:
-      - export HOME="$(getent passwd $(id -u) | cut '-d:' -f6)"
-      - pre-commit run --hook-stage manual
Makefile
@@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
 HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"

 GO_VERSION ?= 1.22
-LINT_VERSION ?= 1.60.3
+LINT_VERSION ?= 1.61.0
 TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
 PROTOC_VERSION ?= 25.0
 PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
@@ -128,7 +128,7 @@ func generateConfigExample(appDir string, credSize int) (string, error) {
 	tmpl.AlphabetDir = filepath.Join(appDir, "alphabet-wallets")

 	var i innerring.GlagoliticLetter
-	for i = 0; i < innerring.GlagoliticLetter(credSize); i++ {
+	for i = range innerring.GlagoliticLetter(credSize) {
 		tmpl.Glagolitics = append(tmpl.Glagolitics, i.String())
 	}
@@ -63,7 +63,7 @@ func TestGenerateAlphabet(t *testing.T) {
 		buf.Reset()
 		v.Set(commonflags.AlphabetWalletsFlag, walletDir)
 		require.NoError(t, GenerateAlphabetCmd.Flags().Set(commonflags.AlphabetSizeFlag, strconv.FormatUint(size, 10)))
-		for i := uint64(0); i < size; i++ {
+		for i := range uint64(size) {
			buf.WriteString(strconv.FormatUint(i, 10) + "\r")
 		}
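Many commits in this range are mechanical migrations to Go 1.22's range-over-int loop form, which the `intrange` linter flags. A minimal standalone sketch of the before/after (not part of the patch):

```go
package main

import "fmt"

func main() {
	const n = 3

	// Pre-Go 1.22 counted loop.
	for i := 0; i < n; i++ {
		fmt.Println("classic:", i)
	}

	// Go 1.22+ range-over-int: same iteration, i runs 0..n-1.
	// The index can be dropped entirely when unused: `for range n { ... }`.
	for i := range n {
		fmt.Println("range:", i)
	}
}
```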
@@ -659,9 +659,7 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes

 	for {
 		n, ok = rdr.Read(buf)
-		for i := range n {
-			list = append(list, buf[i])
-		}
+		list = append(list, buf[:n]...)
 		if !ok {
 			break
 		}
@@ -672,9 +670,8 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
 		return nil, fmt.Errorf("read object list: %w", err)
 	}

-	sort.Slice(list, func(i, j int) bool {
-		lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
-		return strings.Compare(lhs, rhs) < 0
+	slices.SortFunc(list, func(a, b oid.ID) int {
+		return strings.Compare(a.EncodeToString(), b.EncodeToString())
 	})

 	return &SearchObjectsRes{
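The same `sort.Slice` to `slices.SortFunc` conversion recurs throughout this compare. A self-contained sketch of the two comparator conventions (illustrative values):

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
	"sort"
)

func main() {
	a := []string{"pear", "apple", "fig"}
	b := slices.Clone(a)

	// Classic: the comparator answers "is i less than j?".
	sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })

	// Go 1.21+: the comparator returns negative/zero/positive,
	// and the callback receives elements instead of indices.
	slices.SortFunc(b, func(x, y string) int { return cmp.Compare(x, y) })

	fmt.Println(a) // [apple fig pear]
	fmt.Println(b) // [apple fig pear]
}
```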
@@ -2,7 +2,7 @@ package common

 import (
 	"context"
-	"sort"
+	"slices"
 	"strings"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@@ -45,15 +45,11 @@ func StartClientCommandSpan(cmd *cobra.Command) {
 	})
 	commonCmd.ExitOnErr(cmd, "init tracing: %w", err)

-	var components sort.StringSlice
+	var components []string
 	for c := cmd; c != nil; c = c.Parent() {
 		components = append(components, c.Name())
 	}
-	for i, j := 0, len(components)-1; i < j; {
-		components.Swap(i, j)
-		i++
-		j--
-	}
+	slices.Reverse(components)

 	operation := strings.Join(components, ".")
 	ctx, span := tracing.StartSpanFromContext(cmd.Context(), operation)
@@ -195,7 +195,7 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
 	prmHead.SetRawFlag(true) // to get an error instead of whole object

 	eg, egCtx := errgroup.WithContext(cmd.Context())
-	for idx := range len(members) {
+	for idx := range members {
 		partObjID := members[idx]

 		eg.Go(func() error {
@@ -21,7 +21,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/gendoc"
-	"github.com/mitchellh/go-homedir"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 )
@@ -112,14 +111,16 @@ func initConfig() {
 		// Use config file from the flag.
 		viper.SetConfigFile(cfgFile)
 	} else {
-		// Find home directory.
-		home, err := homedir.Dir()
-		commonCmd.ExitOnErr(rootCmd, "", err)
-
-		// Search config in `$HOME/.config/frostfs-cli/` with name "config.yaml"
-		viper.AddConfigPath(filepath.Join(home, ".config", "frostfs-cli"))
-		viper.SetConfigName("config")
-		viper.SetConfigType("yaml")
+		// Find config directory.
+		configDir, err := os.UserConfigDir()
+		if err != nil {
+			common.PrintVerbose(rootCmd, "Get config dir: %s", err)
+		} else {
+			// Search config in `$XDG_CONFIG_HOME/frostfs-cli/` with name "config.yaml"
+			viper.AddConfigPath(filepath.Join(configDir, "frostfs-cli"))
+			viper.SetConfigName("config")
+			viper.SetConfigType("yaml")
+		}
 	}

 	viper.SetEnvPrefix(envPrefix)
@@ -11,7 +11,7 @@ func DecodeOIDs(data []byte) ([]oid.ID, error) {
 	size := r.ReadVarUint()
 	oids := make([]oid.ID, size)

-	for i := uint64(0); i < size; i++ {
+	for i := range size {
 		if err := oids[i].Decode(r.ReadVarBytes()); err != nil {
 			return nil, err
 		}
@@ -58,6 +58,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
 	objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
 	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
 	tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@@ -109,6 +110,7 @@ type applicationConfiguration struct {

 	ObjectCfg struct {
 		tombstoneLifetime uint64
+		priorityMetrics   []placement.Metric
 	}

 	EngineCfg struct {
@@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	// Object

 	a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
+	var pm []placement.Metric
+	for _, raw := range objectconfig.Get(c).Priority() {
+		m, err := placement.ParseMetric(raw)
+		if err != nil {
+			return err
+		}
+		pm = append(pm, m)
+	}
+	a.ObjectCfg.priorityMetrics = pm

 	// Storage Engine
@@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
 	return pool
 }

-func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
-	var res netmapV2.NodeInfo
-
+func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
+	var res netmap.NodeInfo
 	ni, ok := c.cfgNetmap.state.getNodeInfo()
 	if ok {
-		ni.WriteToV2(&res)
+		res = ni
 	} else {
-		c.cfgNodeInfo.localInfo.WriteToV2(&res)
+		res = c.cfgNodeInfo.localInfo
 	}

-	return &res, nil
+	return &res
 }

 // setContractNodeInfo rewrites local node info from the FrostFS network map.
@@ -1,7 +1,6 @@
 package config_test

 import (
-	"os"
 	"strings"
 	"testing"

@@ -38,8 +37,7 @@ func TestConfigEnv(t *testing.T) {

 	envName := strings.ToUpper(
 		strings.Join([]string{config.EnvPrefix, section, name}, configViper.EnvSeparator))
-	err := os.Setenv(envName, value)
-	require.NoError(t, err)
+	t.Setenv(envName, value)

 	c := configtest.EmptyConfig()
@@ -10,10 +10,17 @@ type PutConfig struct {
 	cfg *config.Config
 }

+// GetConfig is a wrapper over "get" config section which provides access
+// to object get pipeline configuration of object service.
+type GetConfig struct {
+	cfg *config.Config
+}
+
 const (
 	subsection = "object"

 	putSubsection = "put"
+	getSubsection = "get"

 	// PutPoolSizeDefault is a default value of routine pool size to
 	// process object.Put requests in object service.
@@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
 func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
 	return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
 }
+
+// Get returns structure that provides access to "get" subsection of
+// "object" section.
+func Get(c *config.Config) GetConfig {
+	return GetConfig{
+		c.Sub(subsection).Sub(getSubsection),
+	}
+}
+
+// Priority returns the value of "priority" config parameter.
+func (g GetConfig) Priority() []string {
+	return config.StringSliceSafe(g.cfg, "priority")
+}
@@ -11,8 +11,6 @@ import (
 )

 func fromFile(path string) *config.Config {
-	os.Clearenv() // ENVs have priority over config files, so we do this in tests
-
 	return config.New(path, "", "")
 }
@@ -40,15 +38,6 @@ func ForEachFileType(pref string, f func(*config.Config)) {

 // ForEnvFileType creates config from `<pref>.env` file.
 func ForEnvFileType(t testing.TB, pref string, f func(*config.Config)) {
-	envs := os.Environ()
-	t.Cleanup(func() {
-		os.Clearenv()
-		for _, env := range envs {
-			keyValue := strings.Split(env, "=")
-			os.Setenv(keyValue[0], keyValue[1])
-		}
-	})
-
 	f(fromEnvFile(t, pref+".env"))
 }
@@ -73,7 +62,6 @@ func loadEnv(t testing.TB, path string) {

 		v = strings.Trim(v, `"`)

-		err = os.Setenv(k, v)
-		require.NoError(t, err, "can't set environment variable")
+		t.Setenv(k, v)
 	}
 }
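The env-handling cleanups above lean on `testing.T.Setenv` (Go 1.17+), which snapshots the variable and restores it via an automatic cleanup, replacing the manual `os.Environ` save/restore that was deleted. A minimal sketch (hypothetical test, illustrative variable name):

```go
package config_test

import "testing"

func TestWithEnv(t *testing.T) {
	// t.Setenv registers a Cleanup that restores the previous value;
	// it also marks the test as incompatible with t.Parallel().
	t.Setenv("FROSTFS_SOME_SECTION_SOME_NAME", "value")

	// ... exercise config loading here; the old value is back after the test.
}
```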
@@ -8,6 +8,7 @@ import (
 	containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
 	morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
 	containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
 	cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
@@ -42,7 +43,7 @@ func initContainerService(_ context.Context, c *cfg) {

 	cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
 	if cacheSize > 0 {
-		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL)
+		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
 	}

 	c.shared.frostfsidClient = frostfsIDSubjectProvider
@@ -1,6 +1,7 @@
 package main

 import (
+	"strings"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
@@ -9,57 +10,101 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/util"
 )

+type subjectWithError struct {
+	subject *client.Subject
+	err     error
+}
+
+type subjectExtWithError struct {
+	subject *client.SubjectExtended
+	err     error
+}
+
 type morphFrostfsIDCache struct {
 	subjProvider frostfsidcore.SubjectProvider

-	subjCache *expirable.LRU[util.Uint160, *client.Subject]
+	subjCache *expirable.LRU[util.Uint160, subjectWithError]

-	subjExtCache *expirable.LRU[util.Uint160, *client.SubjectExtended]
+	subjExtCache *expirable.LRU[util.Uint160, subjectExtWithError]
+
+	metrics cacheMetrics
 }

-func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration) frostfsidcore.SubjectProvider {
+func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int, ttl time.Duration, metrics cacheMetrics) frostfsidcore.SubjectProvider {
 	return &morphFrostfsIDCache{
 		subjProvider: subjProvider,

-		subjCache: expirable.NewLRU(size, func(util.Uint160, *client.Subject) {}, ttl),
+		subjCache: expirable.NewLRU(size, func(util.Uint160, subjectWithError) {}, ttl),

-		subjExtCache: expirable.NewLRU(size, func(util.Uint160, *client.SubjectExtended) {}, ttl),
+		subjExtCache: expirable.NewLRU(size, func(util.Uint160, subjectExtWithError) {}, ttl),
+
+		metrics: metrics,
 	}
 }

 func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		m.metrics.AddMethodDuration("GetSubject", time.Since(startedAt), hit)
+	}()
+
 	result, found := m.subjCache.Get(addr)
 	if found {
-		return result, nil
+		hit = true
+		return result.subject, result.err
 	}

-	result, err := m.subjProvider.GetSubject(addr)
+	subj, err := m.subjProvider.GetSubject(addr)
 	if err != nil {
+		if m.isCacheableError(err) {
+			m.subjCache.Add(addr, subjectWithError{
+				err: err,
+			})
+		}
 		return nil, err
 	}

-	m.subjCache.Add(addr, result)
-	return result, nil
+	m.subjCache.Add(addr, subjectWithError{subject: subj})
+	return subj, nil
 }

 func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
-	subjExt, found := m.subjExtCache.Get(addr)
+	hit := false
+	startedAt := time.Now()
+	defer func() {
+		m.metrics.AddMethodDuration("GetSubjectExtended", time.Since(startedAt), hit)
+	}()
+
+	result, found := m.subjExtCache.Get(addr)
 	if found {
-		return subjExt, nil
+		hit = true
+		return result.subject, result.err
 	}

-	var err error
-	subjExt, err = m.subjProvider.GetSubjectExtended(addr)
+	subjExt, err := m.subjProvider.GetSubjectExtended(addr)
 	if err != nil {
+		if m.isCacheableError(err) {
+			m.subjExtCache.Add(addr, subjectExtWithError{
+				err: err,
+			})
+			m.subjCache.Add(addr, subjectWithError{
+				err: err,
+			})
+		}
 		return nil, err
 	}

-	m.subjExtCache.Add(addr, subjExt)
-	m.subjCache.Add(addr, subjectFromSubjectExtended(subjExt))
+	m.subjExtCache.Add(addr, subjectExtWithError{subject: subjExt})
+	m.subjCache.Add(addr, subjectWithError{subject: subjectFromSubjectExtended(subjExt)})

 	return subjExt, nil
 }

+func (m *morphFrostfsIDCache) isCacheableError(err error) bool {
+	return strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage)
+}
+
 func subjectFromSubjectExtended(subjExt *client.SubjectExtended) *client.Subject {
 	return &client.Subject{
 		PrimaryKey: subjExt.PrimaryKey,
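The cache change above stores errors alongside values, so repeated "subject not found" lookups stop hammering the chain. A standalone sketch of this negative-caching pattern with hashicorp's expirable LRU (types simplified; `fetch` stands in for the real provider):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

var errNotFound = errors.New("subject not found")

type entry struct {
	value string
	err   error
}

type cachedStore struct {
	cache *expirable.LRU[string, entry]
	fetch func(key string) (string, error)
}

func (s *cachedStore) Get(key string) (string, error) {
	if e, ok := s.cache.Get(key); ok {
		return e.value, e.err // a hit may be a cached error
	}
	v, err := s.fetch(key)
	if err != nil {
		if errors.Is(err, errNotFound) { // cache only "expected" errors
			s.cache.Add(key, entry{err: err})
		}
		return "", err
	}
	s.cache.Add(key, entry{value: v})
	return v, nil
}

func main() {
	calls := 0
	s := &cachedStore{
		cache: expirable.NewLRU[string, entry](10, nil, time.Minute),
		fetch: func(string) (string, error) { calls++; return "", errNotFound },
	}
	s.Get("absent")
	s.Get("absent")
	fmt.Println(calls) // 1: the second lookup is served from the negative cache
}
```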
@@ -17,15 +17,16 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
 	"github.com/nspcc-dev/neo-go/pkg/core/state"
+	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
+	"github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter"
+	"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
 	"github.com/nspcc-dev/neo-go/pkg/util"
+	"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
 	"go.uber.org/zap"
 )

 const (
 	newEpochNotification = "NewEpoch"
-
-	// amount of tries(blocks) before notary deposit timeout.
-	notaryDepositRetriesAmount = 300
 )

 func (c *cfg) initMorphComponents(ctx context.Context) {
@@ -128,7 +129,7 @@ func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
 		return
 	}

-	tx, err := makeNotaryDeposit(c)
+	tx, vub, err := makeNotaryDeposit(c)
 	fatalOnErr(err)

 	if tx.Equals(util.Uint256{}) {
@@ -139,11 +140,11 @@ func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
 		return
 	}

-	err = waitNotaryDeposit(ctx, c, tx)
+	err = waitNotaryDeposit(ctx, c, tx, vub)
 	fatalOnErr(err)
 }

-func makeNotaryDeposit(c *cfg) (util.Uint256, error) {
+func makeNotaryDeposit(c *cfg) (util.Uint256, uint32, error) {
 	const (
 		// gasMultiplier defines how many times more the notary
 		// balance must be compared to the GAS balance of the node:
@@ -157,7 +158,7 @@ func makeNotaryDeposit(c *cfg) (util.Uint256, error) {

 	depositAmount, err := client.CalculateNotaryDepositAmount(c.cfgMorph.client, gasMultiplier, gasDivisor)
 	if err != nil {
-		return util.Uint256{}, fmt.Errorf("could not calculate notary deposit: %w", err)
+		return util.Uint256{}, 0, fmt.Errorf("could not calculate notary deposit: %w", err)
 	}

 	return c.cfgMorph.client.DepositEndlessNotary(depositAmount)
@@ -168,32 +169,43 @@ var (
 	errNotaryDepositTimeout = errors.New("notary deposit tx has not appeared in the network")
 )

-func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
-	for range notaryDepositRetriesAmount {
-		c.log.Debug(logs.ClientAttemptToWaitForNotaryDepositTransactionToGetPersisted)
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		ok, err := c.cfgMorph.client.TxHalt(tx)
-		if err == nil {
-			if ok {
-				c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted)
-				return nil
-			}
-
-			return errNotaryDepositFail
-		}
-
-		err = c.cfgMorph.client.Wait(ctx, 1)
-		if err != nil {
-			return fmt.Errorf("could not wait for one block in chain: %w", err)
-		}
-	}
-
-	return errNotaryDepositTimeout
-}
+type waiterClient struct {
+	c *client.Client
+}
+
+func (w *waiterClient) Context() context.Context {
+	return context.Background()
+}
+
+func (w *waiterClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
+	return w.c.GetApplicationLog(hash, trig)
+}
+
+func (w *waiterClient) GetBlockCount() (uint32, error) {
+	return w.c.BlockCount()
+}
+
+func (w *waiterClient) GetVersion() (*result.Version, error) {
+	return w.c.GetVersion()
+}
+
+func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error {
+	w, err := waiter.NewPollingBased(&waiterClient{c: c.cfgMorph.client})
+	if err != nil {
+		return fmt.Errorf("could not create notary deposit waiter: %w", err)
+	}
+
+	res, err := w.WaitAny(ctx, vub, tx)
+	if err != nil {
+		if errors.Is(err, waiter.ErrTxNotAccepted) {
+			return errNotaryDepositTimeout
+		}
+		return fmt.Errorf("could not wait for notary deposit persists in chain: %w", err)
+	}
+	if res.Execution.VMState.HasFlag(vmstate.Halt) {
+		c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted)
+		return nil
+	}
+	return errNotaryDepositFail
+}

 func listenMorphNotifications(ctx context.Context, c *cfg) {
@@ -192,7 +192,7 @@ func addNewEpochNotificationHandlers(c *cfg) {

 	if c.cfgMorph.notaryEnabled {
 		addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
-			_, err := makeNotaryDeposit(c)
+			_, _, err := makeNotaryDeposit(c)
 			if err != nil {
 				c.log.Error(logs.FrostFSNodeCouldNotMakeNotaryDeposit,
 					zap.String("error", err.Error()),
@@ -178,7 +178,8 @@ func initObjectService(c *cfg) {

 	sSearchV2 := createSearchSvcV2(sSearch, keyStorage)

-	sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
+	sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
+		c.ObjectCfg.priorityMetrics)

 	*c.cfgObject.getSvc = *sGet // need smth better
@@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
 func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
 	coreConstructor *cache.ClientCache,
 	containerSource containercore.Source,
+	priorityMetrics []placement.Metric,
 ) *getsvc.Service {
 	ls := c.cfgObject.cfgLocalStorage.localStorage

@@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
 		ls,
 		traverseGen.WithTraverseOptions(
 			placement.SuccessAfter(1),
+			placement.WithPriorityMetrics(priorityMetrics),
+			placement.WithNodeState(c),
 		),
 		coreConstructor,
 		containerSource,
@@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
+FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"

 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
@@ -131,6 +131,9 @@
       "remote_pool_size": 100,
       "local_pool_size": 200,
       "skip_session_token_issuer_verification": true
+    },
+    "get": {
+      "priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
     }
   },
   "storage": {
@@ -114,6 +114,10 @@ object:
     remote_pool_size: 100 # number of async workers for remote PUT operations
     local_pool_size: 200 # number of async workers for local PUT operations
     skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
+  get:
+    priority: # list of metrics of nodes for prioritization
+      - $attribute:ClusterName
+      - $attribute:UN-LOCODE

 storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
@@ -407,13 +407,17 @@ Contains object-service related parameters.
 object:
   put:
     remote_pool_size: 100
+  get:
+    priority:
+      - $attribute:ClusterName
 ```

-| Parameter                   | Type  | Default value | Description                                                                                     |
-|-----------------------------|-------|---------------|-------------------------------------------------------------------------------------------------|
-| `delete.tombstone_lifetime` | `int` | `5`           | Tombstone lifetime for removed objects in epochs.                                               |
-| `put.remote_pool_size`      | `int` | `10`          | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services.  |
-| `put.local_pool_size`       | `int` | `10`          | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services.   |
+| Parameter                   | Type       | Default value | Description                                                                                           |
+|-----------------------------|------------|---------------|---------------------------------------------------------------------------------------------------------|
+| `delete.tombstone_lifetime` | `int`      | `5`           | Tombstone lifetime for removed objects in epochs.                                                     |
+| `put.remote_pool_size`      | `int`      | `10`          | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services.        |
+| `put.local_pool_size`       | `int`      | `10`          | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services.         |
+| `get.priority`              | `[]string` |               | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests.  |

 # `runtime` section
 Contains runtime parameters.
go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/cheggaaa/pb v1.0.29
 	github.com/chzyer/readline v1.5.1
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
+	github.com/felixge/fgprof v0.9.5
 	github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
 	github.com/gdamore/tcell/v2 v2.7.4
 	github.com/go-pkgz/expirable-cache/v3 v3.0.0
@@ -26,7 +27,6 @@ require (
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/klauspost/compress v1.17.4
 	github.com/mailru/easyjson v0.7.7
-	github.com/mitchellh/go-homedir v1.1.0
 	github.com/mr-tron/base58 v1.2.0
 	github.com/multiformats/go-multiaddr v0.12.1
 	github.com/nspcc-dev/neo-go v0.106.3
@@ -77,6 +77,7 @@ require (
 	github.com/go-logr/logr v1.4.2 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
 	github.com/gorilla/websocket v1.5.1 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
go.sum (BIN): binary file not shown.
@@ -142,7 +142,6 @@ const (
 	ClientNotaryRequestWithPreparedMainTXInvoked = "notary request with prepared main TX invoked"
 	ClientNotaryRequestInvoked = "notary request invoked"
 	ClientNotaryDepositTransactionWasSuccessfullyPersisted = "notary deposit transaction was successfully persisted"
-	ClientAttemptToWaitForNotaryDepositTransactionToGetPersisted = "attempt to wait for notary deposit transaction to get persisted"
 	ClientNeoClientInvoke = "neo client invoke"
 	ClientNativeGasTransferInvoke = "native gas transfer invoke"
 	ClientBatchGasTransferInvoke = "batch gas transfer invoke"
@@ -40,13 +40,14 @@ func (s *Server) depositMainNotary() (tx util.Uint256, err error) {
 	)
 }

-func (s *Server) depositSideNotary() (tx util.Uint256, err error) {
+func (s *Server) depositSideNotary() (util.Uint256, error) {
 	depositAmount, err := client.CalculateNotaryDepositAmount(s.morphClient, gasMultiplier, gasDivisor)
 	if err != nil {
 		return util.Uint256{}, fmt.Errorf("could not calculate side notary deposit amount: %w", err)
 	}

-	return s.morphClient.DepositEndlessNotary(depositAmount)
+	tx, _, err := s.morphClient.DepositEndlessNotary(depositAmount)
+	return tx, err
 }

 func (s *Server) notaryHandler(_ event.Event) {
@@ -61,7 +61,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 		require.NoError(t, b.Init())

 		storageIDs := make(map[oid.Address][]byte)
-		for i := 0; i < 100; i++ {
+		for range 100 {
 			obj := blobstortest.NewObject(64 * 1024) // 64KB object
 			data, err := obj.Marshal()
 			require.NoError(t, err)
@@ -168,7 +168,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {

 		storageIDs := make(map[oid.Address][]byte)
 		toDelete := make(map[oid.Address][]byte)
-		for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
+		for i := range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
 			obj := blobstortest.NewObject(64 * 1024)
 			data, err := obj.Marshal()
 			require.NoError(t, err)
@@ -236,7 +236,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 		require.NoError(t, b.Init())

 		storageIDs := make(map[oid.Address][]byte)
-		for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created
+		for range 100 { // 2 objects for one blobovnicza, so 50 DBs total will be created
 			obj := blobstortest.NewObject(64 * 1024)
 			data, err := obj.Marshal()
 			require.NoError(t, err)
@@ -47,7 +47,7 @@ func TestIterateObjects(t *testing.T) {

 	mObjs := make(map[string]addrData)

-	for i := uint64(0); i < objNum; i++ {
+	for i := range uint64(objNum) {
 		sz := smalSz

 		big := i < objNum/2
@@ -151,7 +151,7 @@ func TestErrorReporting(t *testing.T) {
 		checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
 	}

-	for i := uint32(0); i < 2; i++ {
+	for i := range uint32(2) {
 		_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 		require.Error(t, err)
 		checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)
@@ -1,9 +1,9 @@
 package pilorama

 import (
+	"cmp"
 	"encoding/binary"
-	"sort"
+	"slices"
 	"sync"
 	"time"

@@ -48,8 +48,8 @@ func (b *batch) run() {

 	// Sorting without a mutex is ok, because we append to this slice only if timer is non-nil.
 	// See (*boltForest).addBatch for details.
-	sort.Slice(b.operations, func(i, j int) bool {
-		return b.operations[i].Time < b.operations[j].Time
+	slices.SortFunc(b.operations, func(mi, mj *Move) int {
+		return cmp.Compare(mi.Time, mj.Time)
 	})
 	b.operations = slices.CompactFunc(b.operations, func(x, y *Move) bool { return x.Time == y.Time })
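The batch change sorts operations by logical timestamp and then collapses duplicates. Sorting first is what makes the compaction correct, since `slices.CompactFunc` only removes adjacent "equal" elements. A minimal sketch of the sort-then-compact dedup idiom:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

type move struct{ time uint64 }

func main() {
	ops := []move{{3}, {1}, {3}, {2}, {1}}

	// Order by logical timestamp.
	slices.SortFunc(ops, func(a, b move) int { return cmp.Compare(a.time, b.time) })

	// CompactFunc keeps the first of each run of equal elements,
	// so duplicates by time collapse once the slice is sorted.
	ops = slices.CompactFunc(ops, func(a, b move) bool { return a.time == b.time })

	fmt.Println(ops) // [{1} {2} {3}]
}
```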
@@ -10,7 +10,6 @@ import (
 	"os"
 	"path/filepath"
 	"slices"
-	"sort"
 	"strconv"
 	"sync"
 	"time"
@@ -705,7 +704,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
 		key, value = c.Prev()
 	}

-	for i := range len(ms) {
+	for i := range ms {
 		// Loop invariant: key represents the next stored timestamp after ms[i].Time.

 		// 2. Insert the operation.
@@ -1093,14 +1092,19 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
 	return res, last, metaerr.Wrap(err)
 }

+func sortByFilename(nodes []NodeInfo) {
+	slices.SortFunc(nodes, func(a, b NodeInfo) int {
+		return bytes.Compare(a.Meta.GetAttr(AttributeFilename), b.Meta.GetAttr(AttributeFilename))
+	})
+}
+
 func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
 	var lastBytes []byte
 	if last != nil {
 		lastBytes = []byte(*last)
 	}
-	sort.Slice(result, func(i, j int) bool {
-		return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1
-	})
+	sortByFilename(result)

 	for i := range result {
 		if lastBytes == nil || bytes.Compare(lastBytes, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
 			return result[i:]
@@ -1,7 +1,6 @@
 package pilorama

 import (
 	"bytes"
 	"context"
 	"errors"
 	"fmt"
-	"sort"
@@ -192,9 +191,7 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
 		return nil, start, nil
 	}

-	sort.Slice(res, func(i, j int) bool {
-		return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
-	})
+	sortByFilename(res)

 	r := mergeNodeInfos(res)
 	for i := range r {
@@ -1081,7 +1081,7 @@ func prepareRandomTree(nodeCount, opCount int) []Move {
 }

 func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
-	for i := uint64(0); i < uint64(nodeCount); i++ {
+	for i := range uint64(nodeCount) {
 		expectedMeta, expectedParent, err := expected.TreeGetMeta(context.Background(), cid, treeID, i)
 		require.NoError(t, err)
 		actualMeta, actualParent, err := actual.TreeGetMeta(context.Background(), cid, treeID, i)
@@ -1,6 +1,9 @@
 package pilorama

-import "sort"
+import (
+	"cmp"
+	"slices"
+)

 // nodeInfo couples parent and metadata.
 type nodeInfo struct {
@@ -131,10 +134,10 @@ func (t tree) getChildren(parent Node) []Node {
 		}
 	}

-	sort.Slice(children, func(i, j int) bool {
-		a := t.infoMap[children[i]]
-		b := t.infoMap[children[j]]
-		return a.Meta.Time < b.Meta.Time
+	slices.SortFunc(children, func(ci, cj uint64) int {
+		a := t.infoMap[ci]
+		b := t.infoMap[cj]
+		return cmp.Compare(a.Meta.Time, b.Meta.Time)
 	})
 	return children
 }
@@ -216,7 +216,7 @@ func TestRefillMetabase(t *testing.T) {
 	locked := make([]oid.ID, 1, 2)
 	locked[0] = oidtest.ID()
 	cnrLocked := cidtest.ID()
-	for i := uint64(0); i < objNum; i++ {
+	for range objNum {
 		obj := objecttest.Object()
 		obj.SetType(objectSDK.TypeRegular)

@@ -14,7 +14,7 @@ func TestLimiter(t *testing.T) {
 	l := newFlushLimiter(uint64(maxSize))
 	var currSize atomic.Int64
 	var eg errgroup.Group
-	for i := 0; i < 10_000; i++ {
+	for range 10_000 {
 		eg.Go(func() error {
 			defer l.release(single)
 			defer currSize.Add(-1)
@@ -19,6 +19,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/core/transaction"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
+	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
@@ -461,6 +462,28 @@ func (c *Client) TxHalt(h util.Uint256) (res bool, err error) {
 	return len(aer.Executions) > 0 && aer.Executions[0].VMState.HasFlag(vmstate.Halt), nil
 }

+func (c *Client) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) {
+	c.switchLock.RLock()
+	defer c.switchLock.RUnlock()
+
+	if c.inactive {
+		return nil, ErrConnectionLost
+	}
+
+	return c.client.GetApplicationLog(hash, trig)
+}
+
+func (c *Client) GetVersion() (*result.Version, error) {
+	c.switchLock.RLock()
+	defer c.switchLock.RUnlock()
+
+	if c.inactive {
+		return nil, ErrConnectionLost
+	}
+
+	return c.client.GetVersion()
+}
+
 // TxHeight returns true if transaction has been successfully executed and persisted.
 func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) {
 	c.switchLock.RLock()
@@ -4,11 +4,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
-	internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
 	morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	lru "github.com/hashicorp/golang-lru/v2"
@@ -48,7 +48,7 @@ type cfg struct {

 	morphCacheMetrics metrics.MorphCacheMetrics

-	dialerSource *internalNet.DialerSource
+	dialerSource DialerSource
 }

 const (
@@ -68,6 +68,7 @@ func defaultConfig() *cfg {
 			Scopes: transaction.Global,
 		},
 		morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{},
+		dialerSource:      &noopDialerSource{},
 	}
 }

@@ -296,7 +297,17 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
 	}
 }

-func WithDialerSource(ds *internalNet.DialerSource) Option {
+type DialerSource interface {
+	NetContextDialer() func(context.Context, string, string) (net.Conn, error)
+}
+
+type noopDialerSource struct{}
+
+func (ds *noopDialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
+	return nil
+}
+
+func WithDialerSource(ds DialerSource) Option {
 	return func(c *cfg) {
 		c.dialerSource = ds
 	}
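The constructor change swaps a concrete dependency for a narrow interface with a no-op default, the usual functional-options recipe for making a component testable without wiring everything up. A simplified sketch of the pattern (names are illustrative, not the repo's API):

```go
package main

import "fmt"

// Dialer is a narrow interface so callers and tests can swap in fakes.
type Dialer interface {
	Dial(addr string) (string, error)
}

type noopDialer struct{}

func (noopDialer) Dial(addr string) (string, error) { return "noop:" + addr, nil }

type cfg struct {
	dialer Dialer
}

type Option func(*cfg)

func WithDialer(d Dialer) Option {
	return func(c *cfg) { c.dialer = d }
}

func newCfg(opts ...Option) *cfg {
	c := &cfg{dialer: noopDialer{}} // safe default: behaves like "no custom dialer"
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newCfg() // no options: the no-op default keeps the zero config usable
	fmt.Println(c.dialer.Dial("example:8080"))
}
```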
@@ -140,7 +140,7 @@ func (c *Client) ProbeNotary() (res bool) {
 // use this function.
 //
 // This function must be invoked with notary enabled otherwise it throws panic.
-func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uint256, err error) {
+func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (util.Uint256, error) {
 	c.switchLock.RLock()
 	defer c.switchLock.RUnlock()

@@ -163,7 +163,8 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uin
 	}

 	till := max(int64(bc+delta), currentTill)
-	return c.depositNotary(amount, till)
+	res, _, err := c.depositNotary(amount, till)
+	return res, err
 }

 // DepositEndlessNotary calls notary deposit method. Unlike `DepositNotary`,
@@ -171,12 +172,12 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) (res util.Uin
 // This allows to avoid ValidAfterDeposit failures.
 //
 // This function must be invoked with notary enabled otherwise it throws panic.
-func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (res util.Uint256, err error) {
+func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (util.Uint256, uint32, error) {
 	c.switchLock.RLock()
 	defer c.switchLock.RUnlock()

 	if c.inactive {
-		return util.Uint256{}, ErrConnectionLost
+		return util.Uint256{}, 0, ErrConnectionLost
 	}

 	if c.notary == nil {
@@ -187,7 +188,7 @@ func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) (res util.Uint256, e
 	return c.depositNotary(amount, math.MaxUint32)
 }

-func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint256, err error) {
+func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (util.Uint256, uint32, error) {
 	txHash, vub, err := c.gasToken.Transfer(
 		c.accAddr,
 		c.notary.notary,
@@ -195,7 +196,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
 		[]any{c.acc.PrivateKey().GetScriptHash(), till})
 	if err != nil {
 		if !errors.Is(err, neorpc.ErrAlreadyExists) {
-			return util.Uint256{}, fmt.Errorf("can't make notary deposit: %w", err)
+			return util.Uint256{}, 0, fmt.Errorf("can't make notary deposit: %w", err)
 		}

 		// Transaction is already in mempool waiting to be processed.
@@ -205,7 +206,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
 			zap.Int64("expire_at", till),
 			zap.Uint32("vub", vub),
 			zap.Error(err))
-		return util.Uint256{}, nil
+		return util.Uint256{}, 0, nil
 	}

 	c.logger.Info(logs.ClientNotaryDepositInvoke,
@@ -214,7 +215,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) (res util.Uint2
 		zap.Uint32("vub", vub),
 		zap.Stringer("tx_hash", txHash.Reverse()))

-	return txHash, nil
+	return txHash, vub, nil
 }

 // GetNotaryDeposit returns deposit of client's account in notary contract.
@@ -8,7 +8,7 @@ import (
 )

 func tickN(t *timer.BlockTimer, n uint32) {
-	for i := uint32(0); i < n; i++ {
+	for range n {
 		t.Tick(0)
 	}
 }
@@ -537,10 +537,7 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
 		return false, err
 	}

-	in, err := isContainerNode(nm, pk, binCnrID, cont)
-	if err != nil {
-		return false, err
-	} else if in {
+	if isContainerNode(nm, pk, binCnrID, cont) {
 		return true, nil
 	}
@@ -551,24 +548,24 @@ func (ac *apeChecker) isContainerKey(pk []byte, cnrID cid.ID, cont *containercor
 		return false, err
 	}

-	return isContainerNode(nm, pk, binCnrID, cont)
+	return isContainerNode(nm, pk, binCnrID, cont), nil
 }

-func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) (bool, error) {
-	cnrVectors, err := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)
-	if err != nil {
-		return false, err
-	}
+func isContainerNode(nm *netmapSDK.NetMap, pk, binCnrID []byte, cont *containercore.Container) bool {
+	// It could be an error only if the network map doesn't have enough nodes to
+	// fulfil the policy. It's a logical error that doesn't affect actor role
+	// determination, so we ignore it.
+	cnrVectors, _ := nm.ContainerNodes(cont.Value.PlacementPolicy(), binCnrID)

 	for i := range cnrVectors {
 		for j := range cnrVectors[i] {
 			if bytes.Equal(cnrVectors[i][j].PublicKey(), pk) {
-				return true, nil
+				return true
 			}
 		}
 	}

-	return false, nil
+	return false
 }

 func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {
@@ -28,7 +28,7 @@ type executorSvc struct {
 type NodeState interface {
 	// LocalNodeInfo must return current node state
 	// in FrostFS API v2 NodeInfo structure.
-	LocalNodeInfo() (*netmap.NodeInfo, error)
+	LocalNodeInfo() *netmapSDK.NodeInfo

 	// ReadCurrentNetMap reads current local network map of the storage node
 	// into the given parameter. Returns any error encountered which prevented
@@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,

 func (s *executorSvc) LocalNodeInfo(
 	_ context.Context,
-	req *netmap.LocalNodeInfoRequest,
+	_ *netmap.LocalNodeInfoRequest,
 ) (*netmap.LocalNodeInfoResponse, error) {
-	verV2 := req.GetMetaHeader().GetVersion()
-	if verV2 == nil {
-		return nil, errors.New("missing version")
-	}
-
-	var ver versionsdk.Version
-	if err := ver.ReadFromV2(*verV2); err != nil {
-		return nil, fmt.Errorf("can't read version: %w", err)
-	}
-
-	ni, err := s.state.LocalNodeInfo()
-	if err != nil {
-		return nil, err
-	}
-
-	if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
-		ni2 := new(netmap.NodeInfo)
-		ni2.SetPublicKey(ni.GetPublicKey())
-		ni2.SetState(ni.GetState())
-		ni2.SetAttributes(ni.GetAttributes())
-		ni.IterateAddresses(func(s string) bool {
-			ni2.SetAddresses(s)
-			return true
-		})
-
-		ni = ni2
-	}
+	ni := s.state.LocalNodeInfo()
+	var nodeInfo netmap.NodeInfo
+	ni.WriteToV2(&nodeInfo)

 	body := new(netmap.LocalNodeInfoResponseBody)
 	body.SetVersion(&s.version)
-	body.SetNodeInfo(ni)
+	body.SetNodeInfo(&nodeInfo)

 	resp := new(netmap.LocalNodeInfoResponse)
 	resp.SetBody(body)
@@ -50,7 +50,7 @@ func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token)
 	metaHeader.SetBearerToken(b)
 	metaHeader.SetSessionToken(s)

-	for i := uint32(0); i < depth; i++ {
+	for range depth {
 		link := metaHeader
 		metaHeader = new(session.RequestMetaHeader)
 		metaHeader.SetOrigin(link)
@@ -67,9 +67,6 @@ type Prm struct {
 	// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
 	SoftAPECheck bool

-	// If true, object headers will not be retrieved from storage engine.
-	WithoutHeaderRequest bool
-
 	// The request's bearer token. It is used in order to check APE overrides with the token.
 	BearerToken *bearer.Token
@@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
 	nm := &netmapStub{
 		currentEpoch: 100,
 		netmaps: map[uint64]*netmapSDK.NetMap{
+			99:  netmap,
 			100: netmap,
 		},
 	}
@@ -3,6 +3,7 @@ package ape

 import (
 	"context"
 	"crypto/sha256"
+	"errors"
 	"fmt"
 	"net"
 	"strconv"
@@ -11,6 +12,7 @@ import (
 	aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -24,6 +26,8 @@ import (

 var defaultRequest = aperequest.Request{}

+var errECMissingParentObjectID = errors.New("missing EC parent object ID")
+
 func nativeSchemaRole(role acl.Role) string {
 	switch role {
 	case acl.RoleOwner:
@@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
 	var header *objectV2.Header
 	if prm.Header != nil {
 		header = prm.Header
-	} else if prm.Object != nil && !prm.WithoutHeaderRequest {
+	} else if prm.Object != nil {
 		headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
 		if err == nil {
 			header = headerObjSDK.ToV2().GetHeader()
 		}
 	}
-	header = c.fillHeaderWithECParent(ctx, prm, header)
+	header, err := c.fillHeaderWithECParent(ctx, prm, header)
+	if err != nil {
+		return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
+	}
 	reqProps := map[string]string{
 		nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
 		nativeschema.PropertyKeyActorRole:      prm.Role,
@@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
 		reqProps[xheadKey] = xhead.GetValue()
 	}

-	var err error
 	reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
 	if err != nil {
 		return defaultRequest, err
@@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
 	), nil
 }

-func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header {
+func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
 	if header == nil {
-		return header
+		return header, nil
 	}
 	if header.GetEC() == nil {
-		return header
+		return header, nil
 	}
 	if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
 		prm.Role == nativeschema.PropertyValueContainerRoleIR {
-		return header
+		return header, nil
 	}
 	parentObjRefID := header.GetEC().Parent
 	if parentObjRefID == nil {
-		return header
+		return nil, errECMissingParentObjectID
 	}
 	var parentObjID oid.ID
 	if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
-		return header
+		return nil, fmt.Errorf("EC parent object ID format error: %w", err)
 	}
 	// only container nodes have access to collect the parent object
 	contNode, err := c.currentNodeIsContainerNode(prm.Container)
-	if err != nil || !contNode {
-		return header
+	if err != nil {
+		return nil, fmt.Errorf("check container node status: %w", err)
+	}
+	if !contNode {
+		return header, nil
 	}
 	parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
 	if err != nil {
-		return header
+		if isLogicalError(err) {
+			return header, nil
+		}
+		return nil, fmt.Errorf("EC parent header request: %w", err)
 	}
-	return parentObj.ToV2().GetHeader()
+	return parentObj.ToV2().GetHeader(), nil
 }

+func isLogicalError(err error) bool {
+	var errObjRemoved *apistatus.ObjectAlreadyRemoved
+	var errObjNotFound *apistatus.ObjectNotFound
+	return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
+}
+
 func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {
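`isLogicalError` above distinguishes "expected" storage statuses (object removed or not found) from transport failures using `errors.As`, which matches through wrapping. A self-contained sketch of the idiom (the status type here is a stand-in for the SDK's `apistatus` types):

```go
package main

import (
	"errors"
	"fmt"
)

// objectNotFound stands in for apistatus.ObjectNotFound.
type objectNotFound struct{}

func (objectNotFound) Error() string { return "object not found" }

// isLogicalError reports whether err is an "expected" status
// rather than an infrastructure failure.
func isLogicalError(err error) bool {
	var notFound *objectNotFound
	return errors.As(err, &notFound)
}

func main() {
	err := fmt.Errorf("get header: %w", &objectNotFound{})
	fmt.Println(isLogicalError(err)) // true, even through the wrapping layer
}
```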
@@ -25,7 +25,10 @@ import (

 var _ transformer.ObjectWriter = (*ECWriter)(nil)

-var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
+var (
+	errUnsupportedECObject    = errors.New("object is not supported for erasure coding")
+	errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
+)

 type ECWriter struct {
 	Config *Config
@@ -37,10 +40,12 @@ type ECWriter struct {

 	ObjectMeta      object.ContentMeta
 	ObjectMetaValid bool
+
+	remoteRequestSignKey *ecdsa.PrivateKey
 }

 func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
-	relayed, err := e.relayIfNotContainerNode(ctx, obj)
+	relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
 	if err != nil {
 		return err
 	}
@@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
 		e.ObjectMetaValid = true
 	}

+	if isContainerNode {
+		restoreTokens := e.CommonPrm.ForgetTokens()
+		defer restoreTokens()
+		// As request executed on container node, so sign request with container key.
+		e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
+		if err != nil {
+			return err
+		}
+	} else {
+		e.remoteRequestSignKey = e.Key
+	}
+
 	if obj.ECHeader() != nil {
 		return e.writeECPart(ctx, obj)
 	}
 	return e.writeRawObject(ctx, obj)
 }

-func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
-	if e.Relay == nil {
-		return false, nil
-	}
+func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
 	currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
 	if err != nil {
-		return false, err
+		return false, false, err
 	}
 	if currentNodeIsContainerNode {
 		// object can be splitted or saved local
-		return false, nil
+		return false, true, nil
 	}
+	if e.Relay == nil {
+		return false, currentNodeIsContainerNode, nil
+	}
 	objID := object.AddressOf(obj).Object()
 	var index uint32
@@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
 		index = obj.ECHeader().Index()
 	}
 	if err := e.relayToContainerNode(ctx, objID, index); err != nil {
-		return false, err
+		return false, false, err
 	}
-	return true, nil
+	return true, currentNodeIsContainerNode, nil
 }

 func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
@@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
 			singleErr: err,
 		}
 	}
+	for idx := range partsProcessed {
+		if !partsProcessed[idx].Load() {
+			return errIncompletePut{
+				singleErr: errFailedToSaveAllECParts,
+			}
+		}
+	}
 	return nil
 }

@@ -284,7 +308,7 @@ func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
 	}

 	// try to save to any node not visited by current part
-	for i := range len(nodes) {
+	for i := range nodes {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
 	client.NodeInfoFromNetmapElement(&clientNodeInfo, node)

 	remoteTaget := remoteWriter{
-		privateKey:        e.Key,
+		privateKey:        e.remoteRequestSignKey,
 		clientConstructor: e.Config.ClientConstructor,
 		commonPrm:         e.CommonPrm,
 		nodeInfo:          clientNodeInfo,
@@ -14,6 +14,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
 	netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
@@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {

 	ownerKey, err := keys.NewPrivateKey()
 	require.NoError(t, err)
+	nodeKey, err := keys.NewPrivateKey()
+	require.NoError(t, err)

 	pool, err := ants.NewPool(4, ants.WithNonblocking(true))
 	require.NoError(t, err)
@@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
 			RemotePool:        pool,
 			Logger:            log,
 			ClientConstructor: clientConstructor{vectors: ns},
+			KeyStorage:        util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
 		},
 		PlacementOpts: append(
 			[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
@@ -113,10 +113,9 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
 	oV2.GetHeader().SetOwnerID(ownerID)

 	target, err := target.New(objectwriter.Params{
-		Config:                s.Config,
-		Common:                commonPrm,
-		Header:                objectSDK.NewFromV2(oV2),
-		SignRequestPrivateKey: s.localNodeKey,
+		Config: s.Config,
+		Common: commonPrm,
+		Header: objectSDK.NewFromV2(oV2),
 	})
 	if err != nil {
 		return fmt.Errorf("target creation: %w", err)
@@ -100,11 +100,18 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {

 // ForgetTokens forgets all the tokens read from the request's
 // meta information before.
-func (p *CommonPrm) ForgetTokens() {
+func (p *CommonPrm) ForgetTokens() func() {
 	if p != nil {
+		tk := p.token
+		br := p.bearer
 		p.token = nil
 		p.bearer = nil
+		return func() {
+			p.token = tk
+			p.bearer = br
+		}
 	}
+	return func() {}
 }

 func CommonPrmFromV2(req interface {
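`ForgetTokens` now returns a restore closure, so callers (like the EC writer above) can scope the "forget" with `defer`. A minimal sketch of this save/restore-closure pattern:

```go
package main

import "fmt"

type params struct{ token string }

// forgetToken clears the token and returns a closure that puts it back.
func (p *params) forgetToken() func() {
	saved := p.token
	p.token = ""
	return func() { p.token = saved }
}

func withTokenForgotten(p *params) {
	restore := p.forgetToken()
	defer restore() // the token reappears when this scope ends

	fmt.Printf("inside: %q\n", p.token) // inside: ""
}

func main() {
	p := &params{token: "secret"}
	withTokenForgotten(p)
	fmt.Printf("after: %q\n", p.token) // after: "secret"
}
```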
@ -3,6 +3,7 @@ package placement
|
|||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
|
|||
raw, ok := c.containerCache.Get(cnr)
|
||||
c.mtx.Unlock()
|
||||
if ok {
|
||||
return raw, nil
|
||||
return c.cloneResult(raw), nil
|
||||
}
|
||||
} else {
|
||||
c.lastEpoch = nm.Epoch()
|
||||
|
@@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
 		c.containerCache.Add(cnr, cn)
 	}
 	c.mtx.Unlock()
-	return cn, nil
+	return c.cloneResult(cn), nil
 }
+
+func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
+	result := make([][]netmapSDK.NodeInfo, len(nodes))
+	for repIdx := range nodes {
+		result[repIdx] = slices.Clone(nodes[repIdx])
+	}
+	return result
+}
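Returning clones matters because the cache would otherwise hand the same backing slices to every caller, and a caller that reorders or rewrites its copy in place (for example, a traverser sorting nodes by priority metrics) would silently corrupt the cached placement. A toy illustration of the aliasing hazard cloneResult guards against:

package main

import "fmt"

func main() {
	// Stand-in for the cached [][]NodeInfo vectors.
	cache := [][]string{{"node-a", "node-b"}}

	// Without cloning: the caller aliases the cache's memory.
	aliased := cache[0]
	aliased[0] = "mutated"
	fmt.Println(cache[0][0]) // prints "mutated": the cache changed too

	// With cloning (what cloneResult does per replica vector):
	clone := append([]string(nil), cache[0]...)
	clone[1] = "local-change"
	fmt.Println(cache[0][1]) // prints "node-b": the cache stays intact
}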
43
pkg/services/object_manager/placement/metrics.go
Normal file
@@ -0,0 +1,43 @@
+package placement
+
+import (
+	"errors"
+	"strings"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+)
+
+const (
+	attrPrefix = "$attribute:"
+)
+
+type Metric interface {
+	CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
+}
+
+func ParseMetric(raw string) (Metric, error) {
+	if attr, found := strings.CutPrefix(raw, attrPrefix); found {
+		return NewAttributeMetric(attr), nil
+	}
+	return nil, errors.New("unsupported priority metric")
+}
+
+// attributeMetric describes a priority metric based on a node attribute.
+type attributeMetric struct {
+	attribute string
+}
+
+// CalculateValue returns 0 if both from and to carry the attribute
+// attributeMetric.attribute with the same value; otherwise it returns 1.
+func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
+	fromAttr := from.Attribute(am.attribute)
+	toAttr := to.Attribute(am.attribute)
+	if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
+		return 0
+	}
+	return 1
+}
+
+func NewAttributeMetric(attr string) Metric {
+	return &attributeMetric{attribute: attr}
+}
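The metric above maps a raw configuration string such as "$attribute:ClusterName" (via ParseMetric) to an attribute comparison that yields 0, i.e. preferred, only when both nodes carry the same non-empty value. A runnable toy model of CalculateValue's decision, with plain maps standing in for netmap.NodeInfo:

package main

import "fmt"

type node map[string]string // toy stand-in for netmap.NodeInfo attributes

func calculateValue(from, to node, attr string) int {
	f, t := from[attr], to[attr]
	if len(f) > 0 && len(t) > 0 && f == t {
		return 0 // same non-empty attribute value: preferred
	}
	return 1
}

func main() {
	local := node{"ClusterName": "B"}
	fmt.Println(calculateValue(local, node{"ClusterName": "B"}, "ClusterName")) // 0
	fmt.Println(calculateValue(local, node{"ClusterName": "A"}, "ClusterName")) // 1
	fmt.Println(calculateValue(local, node{}, "ClusterName"))                   // 1: a missing attribute never matches
}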
@@ -3,6 +3,7 @@ package placement
 import (
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@@ -23,6 +24,11 @@ type Builder interface {
 	BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
 }
 
+type NodeState interface {
+	// LocalNodeInfo returns the current node state in the FrostFS API v2 NodeInfo structure.
+	LocalNodeInfo() *netmap.NodeInfo
+}
+
 // Option represents placement traverser option.
 type Option func(*cfg)
 
@@ -50,6 +56,10 @@ type cfg struct {
 	policy netmap.PlacementPolicy
 
 	builder Builder
+
+	metrics []Metric
+
+	nodeState NodeState
 }
 
 const invalidOptsMsg = "invalid traverser options"
@@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
 	}
 
 	var rem []int
-	if cfg.flatSuccess != nil {
+	if len(cfg.metrics) > 0 && cfg.nodeState != nil {
+		rem = defaultCopiesVector(cfg.policy)
+		var unsortedVector []netmap.NodeInfo
+		var regularVector []netmap.NodeInfo
+		for i := range rem {
+			unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
+			regularVector = append(regularVector, ns[i][rem[i]:]...)
+		}
+		rem = []int{-1, -1}
+
+		sortedVector, err := sortVector(cfg, unsortedVector)
+		if err != nil {
+			return nil, err
+		}
+		ns = [][]netmap.NodeInfo{sortedVector, regularVector}
+	} else if cfg.flatSuccess != nil {
 		ns = flatNodes(ns)
 		rem = []int{int(*cfg.flatSuccess)}
 	} else {
@@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
 	return [][]netmap.NodeInfo{flat}
 }
 
+type nodeMetrics struct {
+	index   int
+	metrics []int
+}
+
+func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
+	nm := make([]nodeMetrics, len(unsortedVector))
+	node := cfg.nodeState.LocalNodeInfo()
+
+	for i := range unsortedVector {
+		m := make([]int, len(cfg.metrics))
+		for j, pm := range cfg.metrics {
+			m[j] = pm.CalculateValue(node, &unsortedVector[i])
+		}
+		nm[i] = nodeMetrics{
+			index:   i,
+			metrics: m,
+		}
+	}
+	slices.SortFunc(nm, func(a, b nodeMetrics) int {
+		return slices.Compare(a.metrics, b.metrics)
+	})
+	sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
+	for i := range unsortedVector {
+		sortedVector[i] = unsortedVector[nm[i].index]
+	}
+	return sortedVector, nil
+}
+
 // Node is a descriptor of storage node with information required for intra-container communication.
 type Node struct {
 	addresses network.AddressGroup
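sortVector compares nodes by their whole metric vectors, so earlier metrics dominate later ones lexicographically (slices.Compare stops at the first differing element). A small runnable illustration of that ordering, with values as CalculateValue would produce them for metrics [ClusterName, UN-LOCODE]:

package main

import (
	"fmt"
	"slices"
)

func main() {
	vectors := [][]int{
		{1, 1}, // different cluster, different location
		{0, 1}, // same cluster, different location
		{0, 0}, // same cluster, same location
	}
	// Lexicographic sort, as sortVector does for nodeMetrics.metrics.
	slices.SortFunc(vectors, func(a, b []int) int {
		return slices.Compare(a, b)
	})
	fmt.Println(vectors) // [[0 0] [0 1] [1 1]]
}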
@@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
 		c.copyNumbers = v
 	}
 }
+
+// WithPriorityMetrics uses the provided priority metrics to sort nodes.
+func WithPriorityMetrics(m []Metric) Option {
+	return func(c *cfg) {
+		c.metrics = m
+	}
+}
+
+// WithNodeState provides the state of the current node.
+func WithNodeState(s NodeState) Option {
+	return func(c *cfg) {
+		c.nodeState = s
+	}
+}
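A sketch of how the new options plug together when the metric list comes from configuration. This is not a complete program: cnr, builder and nodeState are assumed to be supplied by the caller (a container, a placement.Builder and a placement.NodeState implementation, respectively).

m, err := placement.ParseMetric("$attribute:ClusterName")
if err != nil {
	return err
}
tr, err := placement.NewTraverser(
	placement.ForContainer(cnr),
	placement.UseBuilder(builder),
	placement.WithoutSuccessTracking(),
	placement.WithPriorityMetrics([]placement.Metric{m}),
	placement.WithNodeState(nodeState),
)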
@@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
 }
 
 func testNode(v uint32) (n netmap.NodeInfo) {
-	n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
+	ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
+	n.SetNetworkEndpoints(ip)
+	n.SetPublicKey([]byte(ip))
 
 	return n
 }
@@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
 	return vc
 }
 
-func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
+func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
+	return placement(ss, rs, nil)
+}
+
+func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
+	return placement(ss, nil, ec)
+}
+
+func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
 	nodes := make([][]netmap.NodeInfo, 0, len(rs))
 	replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
 	num := uint32(0)
@@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
 		nodes = append(nodes, ns)
 
 		var rd netmap.ReplicaDescriptor
-		rd.SetNumberOfObjects(uint32(rs[i]))
+		if len(rs) > 0 {
+			rd.SetNumberOfObjects(uint32(rs[i]))
+		} else {
+			rd.SetECDataCount(uint32(ec[i][0]))
+			rd.SetECParityCount(uint32(ec[i][1]))
+		}
 
 		replicas = append(replicas, rd)
 	}
@@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
 		err = n.FromIterator(netmapcore.Node(nodes[1][0]))
 		require.NoError(t, err)
 
-		require.Equal(t, []Node{{addresses: n}}, tr.Next())
+		require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
 	})
 
 	t.Run("put scenario", func(t *testing.T) {
@@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
 		})
 	}
 }
+
+type nodeState struct {
+	node *netmap.NodeInfo
+}
+
+func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
+	return n.node
+}
+
+func TestTraverserPriorityMetrics(t *testing.T) {
+	t.Run("one rep one metric", func(t *testing.T) {
+		selectors := []int{4}
+		replicas := []int{3}
+
+		nodes, cnr := testPlacement(selectors, replicas)
+
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "B")
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[0][3].SetAttribute("ClusterName", "B")
+
+		sdkNode := testNode(5)
+		sdkNode.SetAttribute("ClusterName", "B")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{NewAttributeMetric("ClusterName")}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Without priority metric `ClusterName` the order will be:
+		// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
+		// With priority metric `ClusterName` and current node in cluster B
+		// the order should be:
+		// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
+		next := tr.Next()
+		require.NotNil(t, next)
+		require.Equal(t, 3, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
+
+		next = tr.Next()
+		// The last node is Node_3.
+		require.Equal(t, 1, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+
+	t.Run("two reps two metrics", func(t *testing.T) {
+		selectors := []int{3, 3}
+		replicas := []int{2, 2}
+
+		nodes, cnr := testPlacement(selectors, replicas)
+
+		// REPLICA #1
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
+
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
+
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "A")
+		nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
+
+		// REPLICA #2
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[1][0].SetAttribute("ClusterName", "B")
+		nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
+
+		// Node_4, PK - ip4/0.0.0.0/tcp/4
+		nodes[1][1].SetAttribute("ClusterName", "B")
+		nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
+
+		// Node_5, PK - ip4/0.0.0.0/tcp/5
+		nodes[1][2].SetAttribute("ClusterName", "B")
+		nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
+
+		sdkNode := testNode(9)
+		sdkNode.SetAttribute("ClusterName", "B")
+		sdkNode.SetAttribute("UN-LOCODE", "RU DME")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{
+			NewAttributeMetric("ClusterName"),
+			NewAttributeMetric("UN-LOCODE"),
+		}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Check that nodes in the same cluster and in the same location
+		// come first in the slice. Nodes that match the criteria but sit
+		// outside the replica should land in the next slice.
+
+		next := tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+
+		sdkNode.SetAttribute("ClusterName", "B")
+		sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
+
+		nodesCopy = copyVectors(nodes)
+
+		tr, err = NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		next = tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+
+		sdkNode.SetAttribute("ClusterName", "A")
+		sdkNode.SetAttribute("UN-LOCODE", "RU LED")
+
+		nodesCopy = copyVectors(nodes)
+
+		tr, err = NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		next = tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+
+	t.Run("ec container", func(t *testing.T) {
+		selectors := []int{4}
+		ec := [][]int{{2, 1}}
+
+		nodes, cnr := testECPlacement(selectors, ec)
+
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "B")
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[0][3].SetAttribute("ClusterName", "B")
+
+		sdkNode := testNode(5)
+		sdkNode.SetAttribute("ClusterName", "B")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{NewAttributeMetric("ClusterName")}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Without priority metric `ClusterName` the order will be:
+		// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
+		// With priority metric `ClusterName` and current node in cluster B
+		// the order should be:
+		// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
+		next := tr.Next()
+		require.NotNil(t, next)
+		require.Equal(t, 3, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
+
+		next = tr.Next()
+		// The last node is Node_3.
+		require.Equal(t, 1, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+}
@@ -9,14 +9,25 @@ import (
 	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
 
 func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Policer.ProcessObject", trace.WithAttributes(
+		attribute.String("address", objInfo.Address.String()),
+		attribute.Bool("is_linking_object", objInfo.IsLinkingObject),
+		attribute.Bool("is_ec_part", objInfo.ECInfo != nil),
+		attribute.String("type", objInfo.Type.String()),
+	))
+	defer span.End()
+
 	cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
 	if err != nil {
 		if client.IsErrContainerNotFound(err) {
207
pkg/services/tree/ape_test.go
Normal file
@@ -0,0 +1,207 @@
+package tree
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
+	core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
+	frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
+	checkercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/common/ape"
+	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
+	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
+	"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
+	nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
+	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+	"github.com/nspcc-dev/neo-go/pkg/util"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	containerID = "73tQMTYyUkTgmvPR1HWib6pndbhSoBovbnMF7Pws8Rcy"
+
+	senderPrivateKey, _ = keys.NewPrivateKey()
+
+	senderKey = hex.EncodeToString(senderPrivateKey.PublicKey().Bytes())
+
+	rootCnr = &core.Container{Value: containerSDK.Container{}}
+)
+
+type frostfsIDProviderMock struct {
+	subjects         map[util.Uint160]*client.Subject
+	subjectsExtended map[util.Uint160]*client.SubjectExtended
+}
+
+func (f *frostfsIDProviderMock) GetSubject(key util.Uint160) (*client.Subject, error) {
+	v, ok := f.subjects[key]
+	if !ok {
+		return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
+	}
+	return v, nil
+}
+
+func (f *frostfsIDProviderMock) GetSubjectExtended(key util.Uint160) (*client.SubjectExtended, error) {
+	v, ok := f.subjectsExtended[key]
+	if !ok {
+		return nil, fmt.Errorf("%s", frostfsidcore.SubjectNotFoundErrorMessage)
+	}
+	return v, nil
+}
+
+var _ frostfsidcore.SubjectProvider = (*frostfsIDProviderMock)(nil)
+
+func newFrostfsIDProviderMock(t *testing.T) *frostfsIDProviderMock {
+	return &frostfsIDProviderMock{
+		subjects: map[util.Uint160]*client.Subject{
+			scriptHashFromSenderKey(t, senderKey): {
+				Namespace: "testnamespace",
+				Name:      "test",
+				KV: map[string]string{
+					"tag-attr1": "value1",
+					"tag-attr2": "value2",
+				},
+			},
+		},
+		subjectsExtended: map[util.Uint160]*client.SubjectExtended{
+			scriptHashFromSenderKey(t, senderKey): {
+				Namespace: "testnamespace",
+				Name:      "test",
+				KV: map[string]string{
+					"tag-attr1": "value1",
+					"tag-attr2": "value2",
+				},
+				Groups: []*client.Group{
+					{
+						ID:        1,
+						Name:      "test",
+						Namespace: "testnamespace",
+						KV: map[string]string{
+							"attr1": "value1",
+							"attr2": "value2",
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func scriptHashFromSenderKey(t *testing.T, senderKey string) util.Uint160 {
+	pk, err := keys.NewPublicKeyFromString(senderKey)
+	require.NoError(t, err)
+	return pk.GetScriptHash()
+}
+
+type stMock struct{}
+
+func (m *stMock) CurrentEpoch() uint64 {
+	return 8
+}
+
+func TestCheckAPE(t *testing.T) {
+	cid := cid.ID{}
+	_ = cid.DecodeString(containerID)
+
+	t.Run("put non-tombstone rule won't affect tree remove", func(t *testing.T) {
+		los := inmemory.NewInmemoryLocalStorage()
+		mcs := inmemory.NewInmemoryMorphRuleChainStorage()
+		fid := newFrostfsIDProviderMock(t)
+		s := Service{
+			cfg: cfg{
+				frostfsidSubjectProvider: fid,
+			},
+			apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
+		}
+
+		los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.AccessDenied,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+					Condition: []chain.Condition{
+						{
+							Op:    chain.CondStringNotEquals,
+							Kind:  chain.KindResource,
+							Key:   nativeschema.PropertyKeyObjectType,
+							Value: "TOMBSTONE",
+						},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.Allow,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectDelete, acl.RoleOwner, senderPrivateKey.PublicKey())
+		require.NoError(t, err)
+	})
+
+	t.Run("delete rule won't affect tree add", func(t *testing.T) {
+		los := inmemory.NewInmemoryLocalStorage()
+		mcs := inmemory.NewInmemoryMorphRuleChainStorage()
+		fid := newFrostfsIDProviderMock(t)
+		s := Service{
+			cfg: cfg{
+				frostfsidSubjectProvider: fid,
+			},
+			apeChecker: checkercore.New(los, mcs, fid, &stMock{}),
+		}
+
+		los.AddOverride(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.AccessDenied,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodDeleteObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		mcs.AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(containerID), &chain.Chain{
+			Rules: []chain.Rule{
+				{
+					Status:  chain.Allow,
+					Actions: chain.Actions{Names: []string{nativeschema.MethodPutObject}},
+					Resources: chain.Resources{
+						Names: []string{fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, containerID)},
+					},
+					Condition: []chain.Condition{
+						{
+							Op:    chain.CondStringNotEquals,
+							Kind:  chain.KindResource,
+							Key:   nativeschema.PropertyKeyObjectType,
+							Value: "TOMBSTONE",
+						},
+					},
+				},
+			},
+			MatchType: chain.MatchTypeFirstMatch,
+		})
+
+		err := s.checkAPE(context.Background(), nil, rootCnr, cid, acl.OpObjectPut, acl.RoleOwner, senderPrivateKey.PublicKey())
+		require.NoError(t, err)
+	})
+}
@@ -5,7 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sort"
+	"slices"
 	"sync"
 	"sync/atomic"
 
@@ -210,7 +210,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
 		return nil, err
 	}
 
-	err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectPut)
+	err := s.verifyClient(ctx, req, cid, b.GetBearerToken(), acl.OpObjectDelete)
 	if err != nil {
 		return nil, err
 	}
@@ -575,10 +575,9 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Direction) ([]pilorama.NodeInfo, error) {
 		if len(nodes) == 0 {
 			return nodes, nil
 		}
-		less := func(i, j int) bool {
-			return bytes.Compare(nodes[i].Meta.GetAttr(pilorama.AttributeFilename), nodes[j].Meta.GetAttr(pilorama.AttributeFilename)) < 0
-		}
-		sort.Slice(nodes, less)
+		slices.SortFunc(nodes, func(a, b pilorama.NodeInfo) int {
+			return bytes.Compare(a.Meta.GetAttr(pilorama.AttributeFilename), b.Meta.GetAttr(pilorama.AttributeFilename))
+		})
 		return nodes, nil
 	default:
 		return nil, fmt.Errorf("unsupported order direction: %s", d.String())
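The switch from sort.Slice to slices.SortFunc drops the reflection-based path and the index bookkeeping: the comparator receives the elements themselves and can return the bytes.Compare result directly instead of reducing it to a bool. A minimal standalone example of the same pattern:

package main

import (
	"fmt"
	"slices"
	"strings"
)

func main() {
	names := []string{"b.txt", "a.txt", "c.txt"}
	// Three-way comparator, as in the sortByFilename change above.
	slices.SortFunc(names, strings.Compare)
	fmt.Println(names) // [a.txt b.txt c.txt]
}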
@@ -3,8 +3,14 @@ package httputil
 import (
 	"net/http"
 	"net/http/pprof"
+
+	"github.com/felixge/fgprof"
 )
 
+func init() {
+	http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
+}
+
 // initializes pprof package in order to
 // register Prometheus handlers on http.DefaultServeMux.
 var _ = pprof.Handler("")
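With the init hook above, any node that already exposes its pprof mux also serves a wall-clock profile covering both on-CPU and off-CPU (blocked) time, which the default CPU profile does not. It can be fetched with the standard tooling, for example: go tool pprof --http=:6061 'http://localhost:6060/debug/fgprof?seconds=10' (the addresses and the seconds parameter here are illustrative).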
@@ -19,7 +19,7 @@ import (
 
 func GeneratePayloadPool(count uint, size uint) [][]byte {
 	var pool [][]byte
-	for i := uint(0); i < count; i++ {
+	for range count {
 		payload := make([]byte, size)
 		_, _ = rand.Read(payload)
 
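These loop rewrites (here and in the hunks that follow) use Go 1.22's range-over-integer form: ranging over an integer n runs the body n times, yielding i = 0..n-1 when an index variable is bound, so counter-only loops collapse to "for range count". A tiny runnable demonstration:

package main

import "fmt"

func main() {
	count := uint(3)
	for i := range count {
		fmt.Println(i) // 0, 1, 2 (i has count's type, uint)
	}
	for range count {
		fmt.Print("x") // prints "xxx": body runs count times
	}
}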
@@ -30,8 +30,8 @@ func GeneratePayloadPool(count uint, size uint) [][]byte {
 
 func GenerateAttributePool(count uint) []objectSDK.Attribute {
 	var pool []objectSDK.Attribute
-	for i := uint(0); i < count; i++ {
-		for j := uint(0); j < count; j++ {
+	for i := range count {
+		for j := range count {
 			attr := *objectSDK.NewAttribute()
 			attr.SetKey(fmt.Sprintf("key%d", i))
 			attr.SetValue(fmt.Sprintf("value%d", j))
@@ -43,7 +43,7 @@ func GenerateAttributePool(count uint) []objectSDK.Attribute {
 
 func GenerateOwnerPool(count uint) []user.ID {
 	var pool []user.ID
-	for i := uint(0); i < count; i++ {
+	for range count {
 		pool = append(pool, usertest.ID())
 	}
 	return pool
@@ -118,7 +118,7 @@ func WithPayloadFromPool(pool [][]byte) ObjectOption {
 func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
 	return func(obj *objectSDK.Object) {
 		var attrs []objectSDK.Attribute
-		for i := uint(0); i < count; i++ {
+		for range count {
 			attrs = append(attrs, pool[rand.Intn(len(pool))])
 		}
 		obj.SetAttributes(attrs...)
@@ -29,7 +29,7 @@ func PopulateWithObjects(
 ) {
 	digits := "0123456789"
 
-	for i := uint(0); i < count; i++ {
+	for range count {
 		obj := factory()
 
 		id := []byte(fmt.Sprintf(
@@ -59,7 +59,7 @@ func PopulateWithBigObjects(
 	count uint,
 	factory func() *objectSDK.Object,
 ) {
-	for i := uint(0); i < count; i++ {
+	for range count {
 		group.Go(func() error {
 			if err := populateWithBigObject(ctx, db, factory); err != nil {
 				return fmt.Errorf("couldn't put a big object: %w", err)
@@ -154,7 +154,7 @@ func PopulateGraveyard(
 	wg := &sync.WaitGroup{}
 	wg.Add(int(count))
 
-	for i := uint(0); i < count; i++ {
+	for range count {
 		obj := factory()
 
 		prm := meta.PutPrm{}
@@ -226,7 +226,7 @@ func PopulateLocked(
 	wg := &sync.WaitGroup{}
 	wg.Add(int(count))
 
-	for i := uint(0); i < count; i++ {
+	for range count {
 		defer wg.Done()
 
 		obj := factory()
@@ -116,7 +116,7 @@ func populate() (err error) {
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.SetLimit(int(jobs))
 
-	for i := uint(0); i < numContainers; i++ {
+	for range numContainers {
 		cid := cidtest.ID()
 
 		for _, typ := range types {