Forked from TrueCloudLab/frostfs-node

Compare commits: master...feature/ch (1 commit)

| Author | SHA1 | Date |
|--------|------------|------|
|        | e249c95844 |      |

40 changed files with 246 additions and 270 deletions
@@ -575,10 +575,8 @@ func (c *initializeContext) getContractDeployData(ctrName string, keysParam []an
 			c.Contracts[containerContract].Hash,
 			keysParam,
 			configParam)
-	case proxyContract:
+	case proxyContract, policyContract:
 		items = nil
-	case policyContract:
-		items = []any{nil}
 	default:
 		panic(fmt.Sprintf("invalid contract name: %s", ctrName))
 	}
@@ -39,7 +39,7 @@ var accountingBalanceCmd = &cobra.Command{

 		var prm internalclient.BalanceOfPrm
 		prm.SetClient(cli)
-		prm.Account = idUser
+		prm.Account = &idUser

 		res, err := internalclient.BalanceOf(cmd.Context(), prm)
 		commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
@@ -1,6 +1,7 @@
 package accounting

 import (
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
@@ -17,7 +18,9 @@ var Cmd = &cobra.Command{
 		_ = viper.BindPFlag(commonflags.WalletPath, flags.Lookup(commonflags.WalletPath))
 		_ = viper.BindPFlag(commonflags.Account, flags.Lookup(commonflags.Account))
 		_ = viper.BindPFlag(commonflags.RPC, flags.Lookup(commonflags.RPC))
+		common.StartClientCommandSpan(cmd)
 	},
+	PersistentPostRun: common.StopClientCommandSpan,
 }

 func init() {
@@ -47,7 +47,7 @@ var listContainersCmd = &cobra.Command{

 		var prm internalclient.ListContainersPrm
 		prm.SetClient(cli)
-		prm.Account = idUser
+		prm.Account = &idUser

 		res, err := internalclient.ListContainers(cmd.Context(), prm)
 		commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
@@ -1,6 +1,7 @@
 package container

 import (
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
 	"github.com/spf13/cobra"
 )
@@ -15,7 +16,9 @@ var Cmd = &cobra.Command{
 		// the viper before execution
 		commonflags.Bind(cmd)
 		commonflags.BindAPI(cmd)
+		common.StartClientCommandSpan(cmd)
 	},
+	PersistentPostRun: common.StopClientCommandSpan,
 }

 func init() {
@@ -1,6 +1,8 @@
 package control

 import (
+	"errors"
+
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
 	rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@@ -33,7 +35,6 @@ func initControlIRRemoveContainerCmd() {
 	flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
 	flags.String(ownerFlag, "", "Container owner's wallet address.")
 	removeContainerCmd.MarkFlagsMutuallyExclusive(commonflags.CIDFlag, ownerFlag)
-	removeContainerCmd.MarkFlagsOneRequired(commonflags.CIDFlag, ownerFlag)
 }

 func removeContainer(cmd *cobra.Command, _ []string) {
@@ -73,6 +74,10 @@ func prepareRemoveContainerRequest(cmd *cobra.Command) *ircontrol.RemoveContaine
 	ownerStr, err := cmd.Flags().GetString(ownerFlag)
 	commonCmd.ExitOnErr(cmd, "failed to get owner: ", err)

+	if len(ownerStr) == 0 && len(cidStr) == 0 {
+		commonCmd.ExitOnErr(cmd, "invalid usage: %w", errors.New("neither owner's wallet address nor container ID are specified"))
+	}
+
 	if len(ownerStr) > 0 {
 		var owner user.ID
 		commonCmd.ExitOnErr(cmd, "invalid owner ID: %w", owner.DecodeString(ownerStr))
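The pinned cobra v1.7.0 (see the go.mod hunks below) has no MarkFlagsOneRequired helper, so the "at least one of --cid / --owner" rule moves into the request-building code above. A minimal, self-contained sketch of the same idea, with hypothetical flag names rather than the frostfs-cli wiring:

```go
package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "remove-container",
		RunE: func(cmd *cobra.Command, _ []string) error {
			cid, _ := cmd.Flags().GetString("cid")
			owner, _ := cmd.Flags().GetString("owner")

			// cobra 1.7 still offers MarkFlagsMutuallyExclusive, but the
			// "one of the two is required" check has to be done by hand.
			if cid == "" && owner == "" {
				return errors.New("neither owner's wallet address nor container ID are specified")
			}
			fmt.Println("ok")
			return nil
		},
	}
	cmd.Flags().String("cid", "", "container ID")
	cmd.Flags().String("owner", "", "container owner's wallet address")
	cmd.MarkFlagsMutuallyExclusive("cid", "owner")

	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}
```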
@@ -1,6 +1,7 @@
 package netmap

 import (
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
 	"github.com/spf13/cobra"
 )
@@ -14,7 +15,9 @@ var Cmd = &cobra.Command{
 		// the viper before execution
 		commonflags.Bind(cmd)
 		commonflags.BindAPI(cmd)
+		common.StartClientCommandSpan(cmd)
 	},
+	PersistentPostRun: common.StopClientCommandSpan,
 }

 func init() {
@@ -94,7 +94,7 @@ var objectLockCmd = &cobra.Command{

 		obj := objectSDK.New()
 		obj.SetContainerID(cnr)
-		obj.SetOwnerID(idOwner)
+		obj.SetOwnerID(&idOwner)
 		obj.SetType(objectSDK.TypeLock)
 		obj.SetAttributes(expirationAttr)
 		obj.SetPayload(lock.Marshal())
@@ -93,7 +93,7 @@ func putObject(cmd *cobra.Command, _ []string) {
 	attrs := getAllObjectAttributes(cmd)

 	obj.SetContainerID(cnr)
-	obj.SetOwnerID(ownerID)
+	obj.SetOwnerID(&ownerID)
 	obj.SetAttributes(attrs...)

 	notificationInfo, err := parseObjectNotifications(cmd)
@@ -160,7 +160,7 @@ func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, us
 	commonCmd.ExitOnErr(cmd, "can't unmarshal object from given file: %w", objTemp.Unmarshal(buf))
 	payloadReader := bytes.NewReader(objTemp.Payload())
 	cnr, _ := objTemp.ContainerID()
-	ownerID := objTemp.OwnerID()
+	ownerID := *objTemp.OwnerID()
 	return payloadReader, cnr, ownerID
 }

@@ -1,6 +1,7 @@
 package object

 import (
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
 	"github.com/spf13/cobra"
 )
@@ -15,7 +16,9 @@ var Cmd = &cobra.Command{
 		// the viper before execution
 		commonflags.Bind(cmd)
 		commonflags.BindAPI(cmd)
+		common.StartClientCommandSpan(cmd)
 	},
+	PersistentPostRun: common.StopClientCommandSpan,
 }

 func init() {
@@ -46,10 +46,6 @@ of frostfs-api and some useful utilities for compiling ACL rules from JSON
 notation, managing container access through protocol gates, querying network map
 and much more!`,
 	Run: entryPoint,
-	PersistentPreRun: func(cmd *cobra.Command, _ []string) {
-		common.StartClientCommandSpan(cmd)
-	},
-	PersistentPostRun: common.StopClientCommandSpan,
 }

 // Execute adds all child commands to the root command and sets flags appropriately.
@@ -61,7 +57,6 @@ func Execute() {

 func init() {
 	cobra.OnInitialize(initConfig)
-	cobra.EnableTraverseRunHooks = true

 	// use stdout as default output for cmd.Print()
 	rootCmd.SetOut(os.Stdout)
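The root command loses its span hooks because cobra v1.7.0 has no EnableTraverseRunHooks: when a subcommand declares its own PersistentPreRun, the parent's hook does not run, so every leaf command in the hunks above starts and stops its own span. A small sketch of that behaviour; the print statements stand in for the real common.StartClientCommandSpan / StopClientCommandSpan calls:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	root := &cobra.Command{
		Use: "cli",
		// In cobra 1.7 this hook is skipped for any subcommand that
		// defines its own PersistentPreRun.
		PersistentPreRun: func(*cobra.Command, []string) { fmt.Println("root: start span") },
	}

	child := &cobra.Command{
		Use: "balance",
		// So each subcommand starts/stops its own span explicitly.
		PersistentPreRun:  func(*cobra.Command, []string) { fmt.Println("balance: start span") },
		PersistentPostRun: func(*cobra.Command, []string) { fmt.Println("balance: stop span") },
		Run:               func(*cobra.Command, []string) { fmt.Println("balance: run") },
	}

	root.AddCommand(child)
	root.SetArgs([]string{"balance"})
	_ = root.Execute()
}
```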
@@ -6,6 +6,7 @@ import (
 	"os"

 	internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
 	commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
@@ -32,7 +33,9 @@ var createCmd = &cobra.Command{
 	PersistentPreRun: func(cmd *cobra.Command, args []string) {
 		_ = viper.BindPFlag(commonflags.WalletPath, cmd.Flags().Lookup(commonflags.WalletPath))
 		_ = viper.BindPFlag(commonflags.Account, cmd.Flags().Lookup(commonflags.Account))
+		common.StartClientCommandSpan(cmd)
 	},
+	PersistentPostRun: common.StopClientCommandSpan,
 }

 func init() {
@@ -12,29 +12,38 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
-	"github.com/hashicorp/golang-lru/v2/expirable"
+	lru "github.com/hashicorp/golang-lru/v2"
 )

 type netValueReader[K any, V any] func(K) (V, error)

-type valueWithError[V any] struct {
+type valueWithTime[V any] struct {
 	v V
+	t time.Time
 	// cached error in order to not repeat failed request for some time
 	e error
 }

 // entity that provides TTL cache interface.
 type ttlNetCache[K comparable, V any] struct {
-	cache  *expirable.LRU[K, *valueWithError[V]]
-	netRdr netValueReader[K, V]
+	ttl time.Duration
+
+	sz int
+
+	cache *lru.Cache[K, *valueWithTime[V]]
+
+	netRdr netValueReader[K, V]
+
 	keyLocker *utilSync.KeyLocker[K]
 }

 // complicates netValueReader with TTL caching mechanism.
 func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V]) *ttlNetCache[K, V] {
-	cache := expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)
+	cache, err := lru.New[K, *valueWithTime[V]](sz)
+	fatalOnErr(err)

 	return &ttlNetCache[K, V]{
+		ttl:       ttl,
+		sz:        sz,
 		cache:     cache,
 		netRdr:    netRdr,
 		keyLocker: utilSync.NewKeyLocker[K](),
@@ -48,7 +57,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 // returned value should not be modified.
 func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 	val, ok := c.cache.Peek(key)
-	if ok {
+	if ok && time.Since(val.t) < c.ttl {
 		return val.v, val.e
 	}

@@ -56,14 +65,15 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 	defer c.keyLocker.Unlock(key)

 	val, ok = c.cache.Peek(key)
-	if ok {
+	if ok && time.Since(val.t) < c.ttl {
 		return val.v, val.e
 	}

 	v, err := c.netRdr(key)

-	c.cache.Add(key, &valueWithError[V]{
+	c.cache.Add(key, &valueWithTime[V]{
 		v: v,
+		t: time.Now(),
 		e: err,
 	})

@@ -74,8 +84,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
 	c.keyLocker.Lock(k)
 	defer c.keyLocker.Unlock(k)

-	c.cache.Add(k, &valueWithError[V]{
+	c.cache.Add(k, &valueWithTime[V]{
 		v: v,
+		t: time.Now(),
 		e: e,
 	})
 }
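Instead of letting expirable.LRU evict entries, the reinstated cache stores an insertion timestamp next to each value and checks freshness on every read. A stripped-down sketch of that read path; it uses a plain map instead of the LRU and key locker, so it is not concurrency safe:

```go
package main

import (
	"fmt"
	"time"
)

type valueWithTime[V any] struct {
	v V
	t time.Time
	e error // cached error, so a failed request is not repeated immediately
}

type ttlCache[K comparable, V any] struct {
	ttl     time.Duration
	entries map[K]valueWithTime[V]
	netRdr  func(K) (V, error)
}

func (c *ttlCache[K, V]) get(key K) (V, error) {
	if val, ok := c.entries[key]; ok && time.Since(val.t) < c.ttl {
		return val.v, val.e // still fresh: serve value (or cached error)
	}
	v, err := c.netRdr(key) // stale or missing: go to the network
	c.entries[key] = valueWithTime[V]{v: v, t: time.Now(), e: err}
	return v, err
}

func main() {
	calls := 0
	c := &ttlCache[string, int]{
		ttl:     50 * time.Millisecond,
		entries: map[string]valueWithTime[int]{},
		netRdr:  func(string) (int, error) { calls++; return calls, nil },
	}
	v, _ := c.get("k")
	fmt.Println(v) // 1: first read goes to the network
	v, _ = c.get("k")
	fmt.Println(v) // 1: served from the cache
	time.Sleep(60 * time.Millisecond)
	v, _ = c.get("k")
	fmt.Println(v) // 2: entry expired, re-read
}
```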
@@ -1,56 +0,0 @@
-package main
-
-import (
-	"errors"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestTTLNetCache(t *testing.T) {
-	ttlDuration := time.Millisecond * 50
-	cache := newNetworkTTLCache[string, time.Time](10, ttlDuration, testNetValueReader)
-
-	key := "key"
-
-	t.Run("Test Add and Get", func(t *testing.T) {
-		ti := time.Now()
-		cache.set(key, ti, nil)
-		val, err := cache.get(key)
-		require.NoError(t, err)
-		require.Equal(t, ti, val)
-	})
-
-	t.Run("Test TTL", func(t *testing.T) {
-		ti := time.Now()
-		cache.set(key, ti, nil)
-		time.Sleep(2 * ttlDuration)
-		val, err := cache.get(key)
-		require.NoError(t, err)
-		require.NotEqual(t, val, ti)
-	})
-
-	t.Run("Test Remove", func(t *testing.T) {
-		ti := time.Now()
-		cache.set(key, ti, nil)
-		cache.remove(key)
-		val, err := cache.get(key)
-		require.NoError(t, err)
-		require.NotEqual(t, val, ti)
-	})
-
-	t.Run("Test Cache Error", func(t *testing.T) {
-		cache.set("error", time.Now(), errors.New("mock error"))
-		_, err := cache.get("error")
-		require.Error(t, err)
-		require.Equal(t, "mock error", err.Error())
-	})
-}
-
-func testNetValueReader(key string) (time.Time, error) {
-	if key == "error" {
-		return time.Now(), errors.New("mock error")
-	}
-	return time.Now(), nil
-}
@@ -126,10 +126,10 @@ type shardCfg struct {
 	subStorages []subStorageCfg

 	gcCfg struct {
-		removerBatchSize            int
-		removerSleepInterval        time.Duration
-		expiredCollectorBatchSize   int
-		expiredCollectorWorkerCount int
+		removerBatchSize             int
+		removerSleepInterval         time.Duration
+		expiredCollectorBatchSize    int
+		expiredCollectorWorkersCount int
 	}

 	writecacheCfg struct {
@@ -256,7 +256,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
 	wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
 	wc.maxObjSize = writeCacheCfg.MaxObjectSize()
 	wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
-	wc.flushWorkerCount = writeCacheCfg.WorkerCount()
+	wc.flushWorkerCount = writeCacheCfg.WorkersNumber()
 	wc.sizeLimit = writeCacheCfg.SizeLimit()
 	wc.noSync = writeCacheCfg.NoSync()
 	wc.gcInterval = writeCacheCfg.GCInterval()
@@ -328,7 +328,7 @@ func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *s
 	newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
 	newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
 	newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
-	newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
+	newConfig.gcCfg.expiredCollectorWorkersCount = gcCfg.ExpiredCollectorWorkersCount()
 }

 // internals contains application-specific internals that are created
@@ -888,7 +888,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
 		shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
 		shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
 		shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
-		shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
+		shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
 		shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
 			pool, err := ants.NewPool(sz)
 			fatalOnErr(err)
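The option list above also passes a GC worker-pool initializer built on ants. A minimal sketch of that initializer pattern; the WorkerPool interface here is an assumption standing in for util.WorkerPool:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

// WorkerPool is a stand-in for the util.WorkerPool interface used by the shard.
type WorkerPool interface {
	Submit(func()) error
}

// newPoolInitializer mirrors the WithGCWorkerPoolInitializer callback:
// given a size, it returns a bounded pool and treats construction errors as fatal.
func newPoolInitializer() func(sz int) WorkerPool {
	return func(sz int) WorkerPool {
		pool, err := ants.NewPool(sz)
		if err != nil {
			panic(err)
		}
		return pool
	}
}

func main() {
	pool := newPoolInitializer()(4)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		i := i
		wg.Add(1)
		_ = pool.Submit(func() { defer wg.Done(); fmt.Println("task", i) })
	}
	wg.Wait()
}
```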
@@ -74,7 +74,7 @@ func TestEngineSection(t *testing.T) {
 			require.Equal(t, "tmp/0/cache", wc.Path())
 			require.EqualValues(t, 16384, wc.SmallObjectSize())
 			require.EqualValues(t, 134217728, wc.MaxObjectSize())
-			require.EqualValues(t, 30, wc.WorkerCount())
+			require.EqualValues(t, 30, wc.WorkersNumber())
 			require.EqualValues(t, 3221225472, wc.SizeLimit())

 			require.Equal(t, "tmp/0/meta", meta.Path())
@@ -108,7 +108,7 @@ func TestEngineSection(t *testing.T) {
 			require.EqualValues(t, 150, gc.RemoverBatchSize())
 			require.Equal(t, 2*time.Minute, gc.RemoverSleepInterval())
 			require.Equal(t, 1500, gc.ExpiredCollectorBatchSize())
-			require.Equal(t, 15, gc.ExpiredCollectorWorkerCount())
+			require.Equal(t, 15, gc.ExpiredCollectorWorkersCount())

 			require.Equal(t, false, sc.RefillMetabase())
 			require.Equal(t, mode.ReadOnly, sc.Mode())
@@ -125,7 +125,7 @@ func TestEngineSection(t *testing.T) {
 			require.Equal(t, "tmp/1/cache", wc.Path())
 			require.EqualValues(t, 16384, wc.SmallObjectSize())
 			require.EqualValues(t, 134217728, wc.MaxObjectSize())
-			require.EqualValues(t, 30, wc.WorkerCount())
+			require.EqualValues(t, 30, wc.WorkersNumber())
 			require.EqualValues(t, 4294967296, wc.SizeLimit())

 			require.Equal(t, "tmp/1/meta", meta.Path())
@@ -157,7 +157,7 @@ func TestEngineSection(t *testing.T) {
 			require.EqualValues(t, 200, gc.RemoverBatchSize())
 			require.Equal(t, 5*time.Minute, gc.RemoverSleepInterval())
 			require.Equal(t, gcconfig.ExpiredCollectorBatchSizeDefault, gc.ExpiredCollectorBatchSize())
-			require.Equal(t, gcconfig.ExpiredCollectorWorkersCountDefault, gc.ExpiredCollectorWorkerCount())
+			require.Equal(t, gcconfig.ExpiredCollectorWorkersCountDefault, gc.ExpiredCollectorWorkersCount())

 			require.Equal(t, true, sc.RefillMetabase())
 			require.Equal(t, mode.ReadWrite, sc.Mode())
@@ -63,14 +63,14 @@ func (x *Config) RemoverSleepInterval() time.Duration {
 	return RemoverSleepIntervalDefault
 }

-// ExpiredCollectorWorkerCount returns the value of "expired_collector_worker_count"
+// ExpiredCollectorWorkersCount returns the value of "expired_collector_workers_count"
 // config parameter.
 //
 // Returns ExpiredCollectorWorkersCountDefault if the value is not a positive number.
-func (x *Config) ExpiredCollectorWorkerCount() int {
+func (x *Config) ExpiredCollectorWorkersCount() int {
 	s := config.IntSafe(
 		(*config.Config)(x),
-		"expired_collector_worker_count",
+		"expired_collector_workers_count",
 	)

 	if s > 0 {
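All of these getters follow the same shape: read an integer from the config tree and fall back to a package-level default when the key is missing or non-positive, which is why only the key string and method name change in the rename. A generic sketch of that pattern, assuming a simple map-backed lookup rather than the node's real config package:

```go
package main

import "fmt"

// Default used when the key is absent; 5 matches the documented default below.
const ExpiredCollectorWorkersCountDefault = 5

// intSafe mimics config.IntSafe: missing or malformed values yield 0 instead of an error.
func intSafe(cfg map[string]any, key string) int64 {
	v, ok := cfg[key].(int64)
	if !ok {
		return 0
	}
	return v
}

// expiredCollectorWorkersCount returns the configured value, or the default
// when the value is missing or non-positive.
func expiredCollectorWorkersCount(cfg map[string]any) int {
	if s := intSafe(cfg, "expired_collector_workers_count"); s > 0 {
		return int(s)
	}
	return ExpiredCollectorWorkersCountDefault
}

func main() {
	fmt.Println(expiredCollectorWorkersCount(map[string]any{"expired_collector_workers_count": int64(15)})) // 15
	fmt.Println(expiredCollectorWorkersCount(map[string]any{}))                                             // 5 (default)
}
```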
@@ -106,13 +106,13 @@ func (x *Config) MaxObjectSize() uint64 {
 	return MaxSizeDefault
 }

-// WorkerCount returns the value of "flush_worker_count" config parameter.
+// WorkersNumber returns the value of "workers_number" config parameter.
 //
 // Returns WorkersNumberDefault if the value is not a positive number.
-func (x *Config) WorkerCount() int {
+func (x *Config) WorkersNumber() int {
 	c := config.IntSafe(
 		(*config.Config)(x),
-		"flush_worker_count",
+		"workers_number",
 	)

 	if c > 0 {
@@ -28,11 +28,11 @@ func Put(c *config.Config) PutConfig {
 	}
 }

-// PoolSizeRemote returns the value of "remote_pool_size" config parameter.
+// PoolSizeRemote returns the value of "pool_size_remote" config parameter.
 //
 // Returns PutPoolSizeDefault if the value is not a positive number.
 func (g PutConfig) PoolSizeRemote() int {
-	v := config.Int(g.cfg, "remote_pool_size")
+	v := config.Int(g.cfg, "pool_size_remote")
 	if v > 0 {
 		return int(v)
 	}
@@ -40,11 +40,11 @@ func (g PutConfig) PoolSizeRemote() int {
 	return PutPoolSizeDefault
 }

-// PoolSizeLocal returns the value of "local_pool_size" config parameter.
+// PoolSizeLocal returns the value of "pool_size_local" config parameter.
 //
 // Returns PutPoolSizeDefault if the value is not a positive number.
 func (g PutConfig) PoolSizeLocal() int {
-	v := config.Int(g.cfg, "local_pool_size")
+	v := config.Int(g.cfg, "pool_size_local")
 	if v > 0 {
 		return int(v)
 	}
@@ -84,8 +84,8 @@ FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
 FROSTFS_REPLICATOR_POOL_SIZE=10

 # Object service section
-FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
-FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
+FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
+FROSTFS_OBJECT_PUT_POOL_SIZE_LOCAL=200
 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10

@@ -103,7 +103,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
-FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
+FROSTFS_STORAGE_SHARD_0_WRITECACHE_WORKERS_NUMBER=30
 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
 ### Metabase config
 FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
@@ -142,7 +142,7 @@ FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
 #### Limit of objects to be marked expired by the garbage collector
 FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
 #### Limit of concurrent workers collecting expired objects by the garbage collector
-FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15
+FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKERS_COUNT=15

 ## 1 shard
 ### Flag to refill Metabase from BlobStor
@@ -154,7 +154,7 @@ FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
-FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
+FROSTFS_STORAGE_SHARD_1_WRITECACHE_WORKERS_NUMBER=30
 FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296
 ### Metabase config
 FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta
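These .env entries are the flat environment form of the YAML keys changed further down: the FROSTFS_ prefix plus the upper-cased key path with dots turned into underscores, so object.put.pool_size_remote becomes FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE. A viper-style sketch of that mapping, assuming the usual prefix/replacer setup rather than the node's own config loader:

```go
package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100 maps to object.put.pool_size_remote.
	os.Setenv("FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE", "100")

	v := viper.New()
	v.SetEnvPrefix("frostfs")
	v.AutomaticEnv()
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

	fmt.Println(v.GetInt("object.put.pool_size_remote")) // 100
}
```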
@@ -129,8 +129,8 @@
       "tombstone_lifetime": 10
     },
     "put": {
-      "remote_pool_size": 100,
-      "local_pool_size": 200,
+      "pool_size_remote": 100,
+      "pool_size_local": 200,
       "skip_session_token_issuer_verification": true
     }
   },
@@ -147,7 +147,7 @@
         "path": "tmp/0/cache",
         "small_object_size": 16384,
         "max_object_size": 134217728,
-        "flush_worker_count": 30,
+        "workers_number": 30,
         "capacity": 3221225472
       },
       "metabase": {
@@ -190,7 +190,7 @@
         "remover_batch_size": 150,
         "remover_sleep_interval": "2m",
         "expired_collector_batch_size": 1500,
-        "expired_collector_worker_count": 15
+        "expired_collector_workers_count": 15
       }
     },
     "1": {
@@ -203,7 +203,7 @@
         "memcache_capacity": 2147483648,
         "small_object_size": 16384,
         "max_object_size": 134217728,
-        "flush_worker_count": 30,
+        "workers_number": 30,
         "capacity": 4294967296
       },
       "metabase": {
@@ -108,8 +108,8 @@ object:
   delete:
     tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
   put:
-    remote_pool_size: 100 # number of async workers for remote PUT operations
-    local_pool_size: 200 # number of async workers for local PUT operations
+    pool_size_remote: 100 # number of async workers for remote PUT operations
+    pool_size_local: 200 # number of async workers for local PUT operations
     skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true

 storage:
@@ -126,7 +126,7 @@ storage:
       type: bbolt
       small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes
       max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes
-      flush_worker_count: 30 # number of write-cache flusher threads
+      workers_number: 30 # number of write-cache flusher threads

     metabase:
       perm: 0644 # permissions for metabase files(directories: +x for current user and group)
@@ -196,7 +196,7 @@ storage:
       remover_batch_size: 150 # number of objects to be removed by the garbage collector
       remover_sleep_interval: 2m # frequency of the garbage collector invocation
       expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
-      expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
+      expired_collector_workers_count: 15 # number of concurrent workers collecting expired objects by the garbage collector

   1:
     writecache:
@@ -50,8 +50,8 @@ prometheus:

 object:
   put:
-    remote_pool_size: 100
-    local_pool_size: 100
+    pool_size_remote: 100
+    pool_size_local: 100

 morph:
   rpc_endpoint:
@@ -243,7 +243,7 @@ gc:
   remover_batch_size: 200
   remover_sleep_interval: 5m
   expired_collector_batch_size: 500
-  expired_collector_worker_count: 5
+  expired_collector_workers_count: 5
 ```

 | Parameter | Type | Default value | Description |
@@ -251,7 +251,7 @@ gc:
 | `remover_batch_size` | `int` | `100` | Amount of objects to grab in a single batch. |
 | `remover_sleep_interval` | `duration` | `1m` | Time to sleep between iterations. |
 | `expired_collector_batch_size` | `int` | `500` | Max amount of expired objects to grab in a single batch. |
-| `expired_collector_worker_count` | `int` | `5` | Max amount of concurrent expired objects workers. |
+| `expired_collector_workers_count` | `int` | `5` | Max amount of concurrent expired objects workers. |

 ### `metabase` subsection
@@ -280,7 +280,7 @@ writecache:
   capacity: 4294967296
   small_object_size: 16384
   max_object_size: 134217728
-  flush_worker_count: 30
+  workers_number: 30
 ```

 | Parameter | Type | Default value | Description |
@@ -290,7 +290,7 @@ writecache:
 | `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
 | `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. |
 | `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
-| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
+| `workers_number` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
 | `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
 | `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
@@ -415,7 +415,7 @@ replicator:
 | Parameter | Type | Default value | Description |
 |---------------|------------|----------------------------------------|---------------------------------------------|
 | `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. |
-| `pool_size` | `int` | Equal to `object.put.remote_pool_size` | Maximum amount of concurrent replications. |
+| `pool_size` | `int` | Equal to `object.put.pool_size_remote` | Maximum amount of concurrent replications. |

 # `object` section
 Contains object-service related parameters.
@@ -423,14 +423,14 @@ Contains object-service related parameters.
 ```yaml
 object:
   put:
-    remote_pool_size: 100
+    pool_size_remote: 100
 ```

 | Parameter | Type | Default value | Description |
 |-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
 | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
-| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
-| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
+| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
+| `put.pool_size_local` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |

 # `runtime` section
 Contains runtime parameters.
go.mod (12 changed lines)
@@ -6,7 +6,7 @@ require (
 	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20231031104748-498877e378fd
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.1-0.20231102065436-9ed3845aa989
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
-	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231122162120-56debcfa569e
+	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231114081800-3787477133f3
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20231115094736-5db67021e10f
 	git.frostfs.info/TrueCloudLab/tzhash v1.8.0
@@ -27,11 +27,11 @@ require (
 	github.com/paulmach/orb v0.9.2
 	github.com/prometheus/client_golang v1.16.0
 	github.com/spf13/cast v1.5.1
-	github.com/spf13/cobra v1.8.0
+	github.com/spf13/cobra v1.7.0
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.16.0
 	github.com/stretchr/testify v1.8.4
-	go.etcd.io/bbolt v1.3.8
+	go.etcd.io/bbolt v1.3.7
 	go.opentelemetry.io/otel v1.16.0
 	go.opentelemetry.io/otel/trace v1.16.0
 	go.uber.org/zap v1.26.0
@@ -63,7 +63,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/consensys/bavard v0.1.13 // indirect
 	github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb // indirect
-	github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
+	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
 	github.com/dgraph-io/badger/v4 v4.1.0
@@ -124,8 +124,8 @@ require (
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.14.0 // indirect
 	golang.org/x/net v0.17.0 // indirect
-	golang.org/x/sys v0.14.0 // indirect
-	golang.org/x/text v0.14.0 // indirect
+	golang.org/x/sys v0.13.0 // indirect
+	golang.org/x/text v0.13.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
go.sum (binary view: file changed, contents not shown)
@@ -64,9 +64,6 @@ const (
 	TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
 	TreeSynchronizeTree = "synchronize tree"
 	TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
-	TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
-	TreeFailedToParseAddressForTreeSynchronization = "failed to parse address for tree synchronization"
-	TreeFailedToConnectForTreeSynchronization = "failed to connect for tree synchronization"
 	TreeSyncingTrees = "syncing trees..."
 	TreeCouldNotFetchContainers = "could not fetch containers"
 	TreeTreesHaveBeenSynchronized = "trees have been synchronized"
@@ -165,7 +165,7 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
 	}

 	token := obj.SessionToken()
-	ownerID := obj.OwnerID()
+	ownerID := *obj.OwnerID()

 	if token == nil || !token.AssertAuthKey(&key) {
 		return v.checkOwnerKey(ownerID, key)
@@ -412,7 +412,7 @@ func (v *FormatValidator) checkAttributes(obj *objectSDK.Object) error {
 var errIncorrectOwner = errors.New("incorrect object owner")

 func (v *FormatValidator) checkOwner(obj *objectSDK.Object) error {
-	if idOwner := obj.OwnerID(); idOwner.IsEmpty() {
+	if idOwner := obj.OwnerID(); idOwner == nil || len(idOwner.WalletBytes()) == 0 {
 		return errIncorrectOwner
 	}

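With the pinned SDK revision OwnerID() returns a pointer, so validation has to guard against a nil pointer as well as an ID with no wallet bytes, as checkOwner now does. A tiny sketch of that guard, with a hypothetical ownerID type standing in for the SDK's user.ID:

```go
package main

import (
	"errors"
	"fmt"
)

// ownerID is a stand-in for the SDK's user.ID; only the part relevant
// to the check is modelled here.
type ownerID struct{ wallet []byte }

func (o *ownerID) WalletBytes() []byte { return o.wallet }

var errIncorrectOwner = errors.New("incorrect object owner")

// checkOwner mirrors the validator's rule: a missing owner pointer and an
// owner with empty wallet bytes are both rejected.
func checkOwner(id *ownerID) error {
	if id == nil || len(id.WalletBytes()) == 0 {
		return errIncorrectOwner
	}
	return nil
}

func main() {
	fmt.Println(checkOwner(nil))                               // incorrect object owner
	fmt.Println(checkOwner(&ownerID{}))                        // incorrect object owner
	fmt.Println(checkOwner(&ownerID{wallet: []byte{1, 2, 3}})) // <nil>
}
```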
@@ -32,7 +32,7 @@ func blankValidObject(key *ecdsa.PrivateKey) *objectSDK.Object {

 	obj := objectSDK.New()
 	obj.SetContainerID(cidtest.ID())
-	obj.SetOwnerID(idOwner)
+	obj.SetOwnerID(&idOwner)

 	return obj
 }
@@ -107,7 +107,7 @@ func TestFormatValidator_Validate(t *testing.T) {
 		obj := objectSDK.New()
 		obj.SetContainerID(cidtest.ID())
 		obj.SetSessionToken(tok)
-		obj.SetOwnerID(idOwner)
+		obj.SetOwnerID(&idOwner)

 		require.NoError(t, objectSDK.SetIDWithSignature(ownerKey.PrivateKey, obj))

@@ -303,7 +303,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 		obj := objectSDK.New()
 		obj.SetContainerID(cidtest.ID())
 		obj.SetSessionToken(tok)
-		obj.SetOwnerID(owner)
+		obj.SetOwnerID(&owner)
 		require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))

 		require.NoError(t, v.Validate(context.Background(), obj, false))
@@ -352,7 +352,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 		obj := objectSDK.New()
 		obj.SetContainerID(cnrID)
 		obj.SetSessionToken(tok)
-		obj.SetOwnerID(owner)
+		obj.SetOwnerID(&owner)
 		require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))

 		require.NoError(t, v.Validate(context.Background(), obj, false))
@@ -386,7 +386,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 		obj := objectSDK.New()
 		obj.SetContainerID(cnrID)
 		obj.SetSessionToken(tok)
-		obj.SetOwnerID(owner)
+		obj.SetOwnerID(&owner)
 		require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))

 		v := NewFormatValidator(
@@ -459,7 +459,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 		obj := objectSDK.New()
 		obj.SetContainerID(cnrID)
 		obj.SetSessionToken(tok)
-		obj.SetOwnerID(owner)
+		obj.SetOwnerID(&owner)
 		require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))

 		v := NewFormatValidator(
@@ -535,7 +535,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
 		obj := objectSDK.New()
 		obj.SetContainerID(cnrID)
 		obj.SetSessionToken(tok)
-		obj.SetOwnerID(owner)
+		obj.SetOwnerID(&owner)
 		require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))

 		v := NewFormatValidator(
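The recurring test change in this file, and in the usertest-based tests below, is purely mechanical: with the pinned SDK the test helper returns a pointer, so call sites that need a value dereference it and call sites that need a pointer take its address. A trivial sketch of that shape, with a hypothetical id type standing in for user.ID:

```go
package main

import "fmt"

// id is a stand-in for the SDK's user.ID value type.
type id struct{ s string }

// newID mimics a test helper that returns a pointer, as usertest.ID does
// in the SDK revision this branch pins.
func newID() *id { return &id{s: "owner"} }

func main() {
	// Value needed (struct fields, StickyBitCheck below): dereference.
	owner := *newID()
	fmt.Println(owner.s)

	// Pointer needed (SetOwnerID in the hunks above): take the address again.
	ptr := &owner
	fmt.Println(ptr.s)
}
```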
@@ -36,7 +36,6 @@ func TestStorageEngine_Inhume(t *testing.T) {
 	link.SetSplitID(splitID)

 	t.Run("delete small object", func(t *testing.T) {
-		t.Parallel()
 		e := testNewEngine(t).setShardsNum(t, 1).engine
 		defer e.Close(context.Background())

@@ -55,7 +54,6 @@ func TestStorageEngine_Inhume(t *testing.T) {
 	})

 	t.Run("delete big object", func(t *testing.T) {
-		t.Parallel()
 		s1 := testNewShard(t, 1)
 		s2 := testNewShard(t, 2)

@@ -12,65 +12,88 @@ import (
 	"github.com/stretchr/testify/require"
 )

-func TestShard_Delete_SmallObject(t *testing.T) {
-	t.Run("small object without write cache", func(t *testing.T) {
+func TestShard_Delete(t *testing.T) {
+	t.Parallel()
+
+	t.Run("without write cache", func(t *testing.T) {
 		t.Parallel()
-		testShard(t, false, 1<<5)
+		testShardDelete(t, false)
 	})

-	t.Run("small object with write cache", func(t *testing.T) {
+	t.Run("with write cache", func(t *testing.T) {
 		t.Parallel()
-		testShard(t, true, 1<<5)
+		testShardDelete(t, true)
 	})
 }

-func TestShard_Delete_BigObject(t *testing.T) {
-	t.Run("big object without write cache", func(t *testing.T) {
-		t.Parallel()
-		testShard(t, false, 1<<20)
-	})
-
-	t.Run("big object with write cache", func(t *testing.T) {
-		t.Parallel()
-		testShard(t, true, 1<<20)
-	})
-}
-
-func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
+func testShardDelete(t *testing.T, hasWriteCache bool) {
 	sh := newShard(t, hasWriteCache)

 	cnr := cidtest.ID()

 	obj := testutil.GenerateObjectWithCID(cnr)
 	testutil.AddAttribute(obj, "foo", "bar")
-	testutil.AddPayload(obj, payloadSize)

 	var putPrm PutPrm
-	putPrm.SetObject(obj)

 	var getPrm GetPrm
-	getPrm.SetAddress(object.AddressOf(obj))

-	var delPrm DeletePrm
-	delPrm.SetAddresses(object.AddressOf(obj))
+	t.Run("big object", func(t *testing.T) {
+		testutil.AddPayload(obj, 1<<20)

-	_, err := sh.Put(context.Background(), putPrm)
-	require.NoError(t, err)
+		putPrm.SetObject(obj)
+		getPrm.SetAddress(object.AddressOf(obj))

-	_, err = testGet(t, sh, getPrm, hasWriteCache)
-	require.NoError(t, err)
+		var delPrm DeletePrm
+		delPrm.SetAddresses(object.AddressOf(obj))

-	if hasWriteCache {
-		sh.FlushWriteCache(context.Background(), FlushWriteCachePrm{ignoreErrors: false})
-		require.Eventually(t, func() bool {
-			_, err = sh.Delete(context.Background(), delPrm)
-			return err == nil
-		}, 30*time.Second, 10*time.Millisecond)
-	} else {
-		_, err = sh.Delete(context.Background(), delPrm)
+		_, err := sh.Put(context.Background(), putPrm)
 		require.NoError(t, err)
-	}

-	_, err = sh.Get(context.Background(), getPrm)
-	require.True(t, client.IsErrObjectNotFound(err))
+		_, err = testGet(t, sh, getPrm, hasWriteCache)
+		require.NoError(t, err)
+
+		if hasWriteCache {
+			require.Eventually(t, func() bool {
+				_, err = sh.Delete(context.Background(), delPrm)
+				return err == nil
+			}, 30*time.Second, 100*time.Millisecond)
+		} else {
+			_, err = sh.Delete(context.Background(), delPrm)
+			require.NoError(t, err)
+		}
+
+		_, err = sh.Get(context.Background(), getPrm)
+		require.True(t, client.IsErrObjectNotFound(err))
+	})
+
+	t.Run("small object", func(t *testing.T) {
+		obj := testutil.GenerateObjectWithCID(cnr)
+		testutil.AddAttribute(obj, "foo", "bar")
+		testutil.AddPayload(obj, 1<<5)
+
+		putPrm.SetObject(obj)
+		getPrm.SetAddress(object.AddressOf(obj))
+
+		var delPrm DeletePrm
+		delPrm.SetAddresses(object.AddressOf(obj))
+
+		_, err := sh.Put(context.Background(), putPrm)
+		require.NoError(t, err)
+
+		_, err = sh.Get(context.Background(), getPrm)
+		require.NoError(t, err)
+
+		if hasWriteCache {
+			require.Eventually(t, func() bool {
+				_, err = sh.Delete(context.Background(), delPrm)
+				return err == nil
+			}, 10*time.Second, 100*time.Millisecond)
+		} else {
+			_, err = sh.Delete(context.Background(), delPrm)
+			require.NoError(t, err)
+		}
+
+		_, err = sh.Get(context.Background(), getPrm)
+		require.True(t, client.IsErrObjectNotFound(err))
+	})
 }
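When the write-cache is enabled, the object may still be on its way to the blobstor, so the reworked test retries the delete with require.Eventually instead of flushing the cache explicitly. A self-contained sketch of that retry idiom:

```go
package example

import (
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestRetryUntilReady(t *testing.T) {
	ready := time.Now().Add(200 * time.Millisecond)

	// deleteObj stands in for sh.Delete: it fails until the background
	// flush has (conceptually) happened, then starts succeeding.
	deleteObj := func() error {
		if time.Now().Before(ready) {
			return errors.New("object not yet flushed")
		}
		return nil
	}

	// Same shape as the shard test: poll the operation until it succeeds,
	// bounded by a total timeout and a polling interval.
	require.Eventually(t, func() bool {
		return deleteObj() == nil
	}, 10*time.Second, 100*time.Millisecond)
}
```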
@@ -115,8 +115,8 @@ type gcCfg struct {

 	workerPoolInit func(int) util.WorkerPool

-	expiredCollectorWorkerCount int
-	expiredCollectorBatchSize   int
+	expiredCollectorWorkersCount int
+	expiredCollectorBatchSize    int

 	metrics GCMectrics

@@ -313,16 +313,16 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
 	return
 }

-func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
-	workerCount = minExpiredWorkers
+func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
+	workersCount = minExpiredWorkers
 	batchSize = minExpiredBatchSize

 	if s.gc.gcCfg.expiredCollectorBatchSize > batchSize {
 		batchSize = s.gc.gcCfg.expiredCollectorBatchSize
 	}

-	if s.gc.gcCfg.expiredCollectorWorkerCount > workerCount {
-		workerCount = s.gc.gcCfg.expiredCollectorWorkerCount
+	if s.gc.gcCfg.expiredCollectorWorkersCount > workersCount {
+		workersCount = s.gc.gcCfg.expiredCollectorWorkersCount
 	}
 	return
 }
@@ -365,11 +365,11 @@ func WithExpiredCollectorBatchSize(size int) Option {
 	}
 }

-// WithExpiredCollectorWorkerCount returns option to set concurrent
+// WithExpiredCollectorWorkersCount returns option to set concurrent
 // workers count of expired object collection operation.
-func WithExpiredCollectorWorkerCount(count int) Option {
+func WithExpiredCollectorWorkersCount(count int) Option {
 	return func(c *cfg) {
-		c.gcCfg.expiredCollectorWorkerCount = count
+		c.gcCfg.expiredCollectorWorkersCount = count
 	}
 }

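WithExpiredCollectorWorkersCount follows the shard's functional-option style: an Option is a closure that mutates the internal cfg before the shard is built, which is why the rename only touches the option name and the field it sets. A generic sketch of the pattern; the names and defaults here are illustrative, not the shard's real fields:

```go
package main

import "fmt"

type cfg struct {
	expiredCollectorWorkersCount int
	expiredCollectorBatchSize    int
}

// Option mutates the configuration before construction, like the shard options above.
type Option func(*cfg)

func WithExpiredCollectorWorkersCount(count int) Option {
	return func(c *cfg) { c.expiredCollectorWorkersCount = count }
}

func WithExpiredCollectorBatchSize(size int) Option {
	return func(c *cfg) { c.expiredCollectorBatchSize = size }
}

func newConfig(opts ...Option) cfg {
	c := cfg{expiredCollectorWorkersCount: 15, expiredCollectorBatchSize: 16} // illustrative defaults
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig(WithExpiredCollectorWorkersCount(30)))
}
```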
@@ -39,14 +39,14 @@ func TestStickyCheck(t *testing.T) {
 		info.SetSenderKey(make([]byte, 33)) // any non-empty key
 		info.SetRequestRole(acl.RoleContainer)

-		require.True(t, checker.StickyBitCheck(info, usertest.ID()))
+		require.True(t, checker.StickyBitCheck(info, *usertest.ID()))

 		var basicACL acl.Basic
 		basicACL.MakeSticky()

 		info.SetBasicACL(basicACL)

-		require.True(t, checker.StickyBitCheck(info, usertest.ID()))
+		require.True(t, checker.StickyBitCheck(info, *usertest.ID()))
 	})

 	t.Run("owner ID and/or public key emptiness", func(t *testing.T) {
@@ -72,7 +72,7 @@ func TestStickyCheck(t *testing.T) {
 		var ownerID user.ID

 		if withOwner {
-			ownerID = usertest.ID()
+			ownerID = *usertest.ID()
 		}

 		require.Equal(t, expected, checker.StickyBitCheck(info, ownerID))
@@ -62,8 +62,8 @@ func headersFromObject(obj *objectSDK.Object, cnr cid.ID, oid *oid.ID) []eaclSDK
 		res = append(res, oidHeader(*oid))
 	}

-	if idOwner := obj.OwnerID(); !idOwner.IsEmpty() {
-		res = append(res, ownerIDHeader(idOwner))
+	if idOwner := obj.OwnerID(); idOwner != nil {
+		res = append(res, ownerIDHeader(*idOwner))
 	}

 	cs, ok := obj.PayloadChecksum()
@@ -176,11 +176,11 @@ func (exec *execCtx) initTombstoneObject() error {
 	tokenSession := exec.commonParameters().SessionToken()
 	if tokenSession != nil {
 		issuer := tokenSession.Issuer()
-		exec.tombstoneObj.SetOwnerID(issuer)
+		exec.tombstoneObj.SetOwnerID(&issuer)
 	} else {
 		// make local node a tombstone object owner
 		localUser := exec.svc.netInfo.LocalNodeID()
-		exec.tombstoneObj.SetOwnerID(localUser)
+		exec.tombstoneObj.SetOwnerID(&localUser)
 	}

 	var a objectSDK.Attribute
@@ -112,7 +112,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
 		// If it isn't owner key, replication attempts will fail, thus this check.
 		if sToken == nil {
 			ownerObj := prm.hdr.OwnerID()
-			if ownerObj.IsEmpty() {
+			if ownerObj == nil {
 				return errors.New("missing object owner")
 			}

@@ -25,7 +25,7 @@ func TestNewKeyStorage(t *testing.T) {
 	tokenStor := tokenStorage.NewTokenStore()
 	stor := util.NewKeyStorage(&nodeKey.PrivateKey, tokenStor, mockedNetworkState{42})

-	owner := usertest.ID()
+	owner := *usertest.ID()

 	t.Run("node key", func(t *testing.T) {
 		key, err := stor.GetKey(nil)
@@ -36,7 +36,7 @@ func TestNewKeyStorage(t *testing.T) {
 	t.Run("unknown token", func(t *testing.T) {
 		_, err = stor.GetKey(&util.SessionInfo{
 			ID:    uuid.New(),
-			Owner: usertest.ID(),
+			Owner: *usertest.ID(),
 		})
 		require.Error(t, err)
 	})
@@ -22,7 +22,7 @@ func TestTokenStore(t *testing.T) {

 	defer ts.Close()

-	owner := usertest.ID()
+	owner := *usertest.ID()

 	var ownerV2 refs.OwnerID
 	owner.WriteToV2(&ownerV2)
@@ -66,7 +66,7 @@ func TestTokenStore_Persistent(t *testing.T) {
 	ts, err := NewTokenStore(path)
 	require.NoError(t, err)

-	idOwner := usertest.ID()
+	idOwner := *usertest.ID()

 	var idOwnerV2 refs.OwnerID
 	idOwner.WriteToV2(&idOwnerV2)
@@ -127,7 +127,7 @@ func TestTokenStore_RemoveOld(t *testing.T) {

 	defer ts.Close()

-	owner := usertest.ID()
+	owner := *usertest.ID()

 	var ownerV2 refs.OwnerID
 	owner.WriteToV2(&ownerV2)
@@ -9,7 +9,6 @@ import (
 	"math"
 	"math/rand"
 	"sync"
-	"sync/atomic"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -217,44 +216,45 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
 }

 func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
-	height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
-) error {
-	treeClient := NewTreeServiceClient(cc)
-
+	height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move,
+) (uint64, error) {
 	rawCID := make([]byte, sha256.Size)
 	cid.Encode(rawCID)

-	req := &GetOpLogRequest{
-		Body: &GetOpLogRequest_Body{
-			ContainerId: rawCID,
-			TreeId:      treeID,
-			Height:      height,
-		},
-	}
-	if err := SignMessage(req, s.key); err != nil {
-		return err
-	}
-
-	c, err := treeClient.GetOpLog(ctx, req)
-	if err != nil {
-		return fmt.Errorf("can't initialize client: %w", err)
-	}
-	res, err := c.Recv()
-	for ; err == nil; res, err = c.Recv() {
-		lm := res.GetBody().GetOperation()
-		m := &pilorama.Move{
-			Parent: lm.ParentId,
-			Child:  lm.ChildId,
-		}
-		if err := m.Meta.FromBytes(lm.Meta); err != nil {
-			return err
-		}
-		opsCh <- m
-	}
-	if err != nil && !errors.Is(err, io.EOF) {
-		return err
-	}
-	return nil
+	for {
+		newHeight := height
+		req := &GetOpLogRequest{
+			Body: &GetOpLogRequest_Body{
+				ContainerId: rawCID,
+				TreeId:      treeID,
+				Height:      newHeight,
+			},
+		}
+		if err := SignMessage(req, s.key); err != nil {
+			return 0, err
+		}
+
+		c, err := treeClient.GetOpLog(ctx, req)
+		if err != nil {
+			return 0, fmt.Errorf("can't initialize client: %w", err)
+		}
+		res, err := c.Recv()
+		for ; err == nil; res, err = c.Recv() {
+			lm := res.GetBody().GetOperation()
+			m := &pilorama.Move{
+				Parent: lm.ParentId,
+				Child:  lm.ChildId,
+			}
+			if err := m.Meta.FromBytes(lm.Meta); err != nil {
+				return 0, err
+			}
+			opsCh <- m
+		}
+		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
+			return newHeight, err
+		}
+		height = newHeight
+	}
 }

 // synchronizeTree synchronizes operations getting them from different nodes.
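In this version startStream reports the last processed height, and its caller keeps re-opening the operation-log stream from that height until a pass makes no further progress. A compact sketch of that resume loop against a hypothetical fetch function, not the real TreeService client:

```go
package main

import "fmt"

// fetchFrom stands in for one startStream pass: it returns the operations at
// or above `from` and the new height reached after applying them.
func fetchFrom(log []uint64, from uint64) (ops []uint64, newHeight uint64) {
	newHeight = from
	for _, h := range log {
		if h >= from {
			ops = append(ops, h)
			newHeight = h + 1
		}
	}
	return ops, newHeight
}

func main() {
	log := []uint64{1, 2, 3, 5, 8}
	height := uint64(0)

	// Same shape as the synchronization loop: repeat until a pass returns
	// no greater height, i.e. no new operations were streamed.
	for {
		ops, h := fetchFrom(log, height)
		fmt.Println("applied:", ops)
		if h <= height {
			break
		}
		height = h
	}
	fmt.Println("synchronized up to height", height)
}
```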
@@ -287,44 +287,50 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
 		return nil
 	})

-	var allNodesSynced atomic.Bool
-	allNodesSynced.Store(true)
-
 	for i, n := range nodes {
 		i := i
 		n := n
 		errGroup.Go(func() error {
-			var nodeSynced bool
+			height := from
 			n.IterateNetworkEndpoints(func(addr string) bool {
 				var a network.Address
 				if err := a.FromString(addr); err != nil {
-					s.log.Warn(logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
 					return false
 				}

-				cc, err := s.dialCtx(egCtx, a)
+				cc, err := grpc.DialContext(egCtx, a.URIAddr(),
+					grpc.WithChainUnaryInterceptor(
+						metrics.NewUnaryClientInterceptor(),
+						tracing_grpc.NewUnaryClientInteceptor(),
+					),
+					grpc.WithChainStreamInterceptor(
+						metrics.NewStreamClientInterceptor(),
+						tracing_grpc.NewStreamClientInterceptor(),
+					),
+					grpc.WithTransportCredentials(insecure.NewCredentials()))
 				if err != nil {
-					s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
+					// Failed to connect, try the next address.
 					return false
 				}
 				defer cc.Close()

-				err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
-				if err != nil {
-					s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
-				}
-				nodeSynced = err == nil
-				return true
+				treeClient := NewTreeServiceClient(cc)
+				for {
+					h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
+					if height < h {
+						height = h
+					}
+					if err != nil || h <= height {
+						// Error with the response, try the next node.
+						return true
+					}
+				}
 			})
 			close(nodeOperationStreams[i])
-			if !nodeSynced {
-				allNodesSynced.Store(false)
-			}
 			return nil
 		})
 	}
 	if err := errGroup.Wait(); err != nil {
-		allNodesSynced.Store(false)
 		s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
 	}

@@ -334,23 +340,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
 	} else {
 		newHeight++
 	}
-	if allNodesSynced.Load() {
-		return newHeight
-	}
-	return from
-}
-
-func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
-	return grpc.DialContext(egCtx, a.URIAddr(),
-		grpc.WithChainUnaryInterceptor(
-			metrics.NewUnaryClientInterceptor(),
-			tracing_grpc.NewUnaryClientInteceptor(),
-		),
-		grpc.WithChainStreamInterceptor(
-			metrics.NewStreamClientInterceptor(),
-			tracing_grpc.NewStreamClientInterceptor(),
-		),
-		grpc.WithTransportCredentials(insecure.NewCredentials()))
+	return newHeight
 }

 // ErrAlreadySyncing is returned when a service synchronization has already