Compare commits

..

1 commit

Author SHA1 Message Date
e249c95844 [#824] frostfs-cli: Support passing chain ID in add-rule command
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-11-21 17:04:41 +03:00
40 changed files with 246 additions and 270 deletions

View file

@ -575,10 +575,8 @@ func (c *initializeContext) getContractDeployData(ctrName string, keysParam []an
c.Contracts[containerContract].Hash,
keysParam,
configParam)
case proxyContract:
case proxyContract, policyContract:
items = nil
case policyContract:
items = []any{nil}
default:
panic(fmt.Sprintf("invalid contract name: %s", ctrName))
}

View file

@ -39,7 +39,7 @@ var accountingBalanceCmd = &cobra.Command{
var prm internalclient.BalanceOfPrm
prm.SetClient(cli)
prm.Account = idUser
prm.Account = &idUser
res, err := internalclient.BalanceOf(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

View file

@ -1,6 +1,7 @@
package accounting
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -17,7 +18,9 @@ var Cmd = &cobra.Command{
_ = viper.BindPFlag(commonflags.WalletPath, flags.Lookup(commonflags.WalletPath))
_ = viper.BindPFlag(commonflags.Account, flags.Lookup(commonflags.Account))
_ = viper.BindPFlag(commonflags.RPC, flags.Lookup(commonflags.RPC))
common.StartClientCommandSpan(cmd)
},
PersistentPostRun: common.StopClientCommandSpan,
}
func init() {

View file

@ -47,7 +47,7 @@ var listContainersCmd = &cobra.Command{
var prm internalclient.ListContainersPrm
prm.SetClient(cli)
prm.Account = idUser
prm.Account = &idUser
res, err := internalclient.ListContainers(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

View file

@ -1,6 +1,7 @@
package container
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"github.com/spf13/cobra"
)
@ -15,7 +16,9 @@ var Cmd = &cobra.Command{
// the viper before execution
commonflags.Bind(cmd)
commonflags.BindAPI(cmd)
common.StartClientCommandSpan(cmd)
},
PersistentPostRun: common.StopClientCommandSpan,
}
func init() {

View file

@ -1,6 +1,8 @@
package control
import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@ -33,7 +35,6 @@ func initControlIRRemoveContainerCmd() {
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
flags.String(ownerFlag, "", "Container owner's wallet address.")
removeContainerCmd.MarkFlagsMutuallyExclusive(commonflags.CIDFlag, ownerFlag)
removeContainerCmd.MarkFlagsOneRequired(commonflags.CIDFlag, ownerFlag)
}
func removeContainer(cmd *cobra.Command, _ []string) {
@ -73,6 +74,10 @@ func prepareRemoveContainerRequest(cmd *cobra.Command) *ircontrol.RemoveContaine
ownerStr, err := cmd.Flags().GetString(ownerFlag)
commonCmd.ExitOnErr(cmd, "failed to get owner: ", err)
if len(ownerStr) == 0 && len(cidStr) == 0 {
commonCmd.ExitOnErr(cmd, "invalid usage: %w", errors.New("neither owner's wallet address nor container ID are specified"))
}
if len(ownerStr) > 0 {
var owner user.ID
commonCmd.ExitOnErr(cmd, "invalid owner ID: %w", owner.DecodeString(ownerStr))

View file

@ -1,6 +1,7 @@
package netmap
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"github.com/spf13/cobra"
)
@ -14,7 +15,9 @@ var Cmd = &cobra.Command{
// the viper before execution
commonflags.Bind(cmd)
commonflags.BindAPI(cmd)
common.StartClientCommandSpan(cmd)
},
PersistentPostRun: common.StopClientCommandSpan,
}
func init() {

View file

@ -94,7 +94,7 @@ var objectLockCmd = &cobra.Command{
obj := objectSDK.New()
obj.SetContainerID(cnr)
obj.SetOwnerID(idOwner)
obj.SetOwnerID(&idOwner)
obj.SetType(objectSDK.TypeLock)
obj.SetAttributes(expirationAttr)
obj.SetPayload(lock.Marshal())

View file

@ -93,7 +93,7 @@ func putObject(cmd *cobra.Command, _ []string) {
attrs := getAllObjectAttributes(cmd)
obj.SetContainerID(cnr)
obj.SetOwnerID(ownerID)
obj.SetOwnerID(&ownerID)
obj.SetAttributes(attrs...)
notificationInfo, err := parseObjectNotifications(cmd)
@ -160,7 +160,7 @@ func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, us
commonCmd.ExitOnErr(cmd, "can't unmarshal object from given file: %w", objTemp.Unmarshal(buf))
payloadReader := bytes.NewReader(objTemp.Payload())
cnr, _ := objTemp.ContainerID()
ownerID := objTemp.OwnerID()
ownerID := *objTemp.OwnerID()
return payloadReader, cnr, ownerID
}

View file

@ -1,6 +1,7 @@
package object
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"github.com/spf13/cobra"
)
@ -15,7 +16,9 @@ var Cmd = &cobra.Command{
// the viper before execution
commonflags.Bind(cmd)
commonflags.BindAPI(cmd)
common.StartClientCommandSpan(cmd)
},
PersistentPostRun: common.StopClientCommandSpan,
}
func init() {

View file

@ -46,10 +46,6 @@ of frostfs-api and some useful utilities for compiling ACL rules from JSON
notation, managing container access through protocol gates, querying network map
and much more!`,
Run: entryPoint,
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
common.StartClientCommandSpan(cmd)
},
PersistentPostRun: common.StopClientCommandSpan,
}
// Execute adds all child commands to the root command and sets flags appropriately.
@ -61,7 +57,6 @@ func Execute() {
func init() {
cobra.OnInitialize(initConfig)
cobra.EnableTraverseRunHooks = true
// use stdout as default output for cmd.Print()
rootCmd.SetOut(os.Stdout)

View file

@ -6,6 +6,7 @@ import (
"os"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
@ -32,7 +33,9 @@ var createCmd = &cobra.Command{
PersistentPreRun: func(cmd *cobra.Command, args []string) {
_ = viper.BindPFlag(commonflags.WalletPath, cmd.Flags().Lookup(commonflags.WalletPath))
_ = viper.BindPFlag(commonflags.Account, cmd.Flags().Lookup(commonflags.Account))
common.StartClientCommandSpan(cmd)
},
PersistentPostRun: common.StopClientCommandSpan,
}
func init() {

View file

@ -12,29 +12,38 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
)
type netValueReader[K any, V any] func(K) (V, error)
type valueWithError[V any] struct {
type valueWithTime[V any] struct {
v V
t time.Time
// cached error in order to not repeat failed request for some time
e error
}
// entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
cache *expirable.LRU[K, *valueWithError[V]]
netRdr netValueReader[K, V]
ttl time.Duration
sz int
cache *lru.Cache[K, *valueWithTime[V]]
netRdr netValueReader[K, V]
keyLocker *utilSync.KeyLocker[K]
}
// complicates netValueReader with TTL caching mechanism.
func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V]) *ttlNetCache[K, V] {
cache := expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)
cache, err := lru.New[K, *valueWithTime[V]](sz)
fatalOnErr(err)
return &ttlNetCache[K, V]{
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
keyLocker: utilSync.NewKeyLocker[K](),
@ -48,7 +57,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
// returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key)
if ok {
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
@ -56,14 +65,15 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
defer c.keyLocker.Unlock(key)
val, ok = c.cache.Peek(key)
if ok {
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
v, err := c.netRdr(key)
c.cache.Add(key, &valueWithError[V]{
c.cache.Add(key, &valueWithTime[V]{
v: v,
t: time.Now(),
e: err,
})
@ -74,8 +84,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
c.keyLocker.Lock(k)
defer c.keyLocker.Unlock(k)
c.cache.Add(k, &valueWithError[V]{
c.cache.Add(k, &valueWithTime[V]{
v: v,
t: time.Now(),
e: e,
})
}

View file

@ -1,56 +0,0 @@
package main
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestTTLNetCache(t *testing.T) {
ttlDuration := time.Millisecond * 50
cache := newNetworkTTLCache[string, time.Time](10, ttlDuration, testNetValueReader)
key := "key"
t.Run("Test Add and Get", func(t *testing.T) {
ti := time.Now()
cache.set(key, ti, nil)
val, err := cache.get(key)
require.NoError(t, err)
require.Equal(t, ti, val)
})
t.Run("Test TTL", func(t *testing.T) {
ti := time.Now()
cache.set(key, ti, nil)
time.Sleep(2 * ttlDuration)
val, err := cache.get(key)
require.NoError(t, err)
require.NotEqual(t, val, ti)
})
t.Run("Test Remove", func(t *testing.T) {
ti := time.Now()
cache.set(key, ti, nil)
cache.remove(key)
val, err := cache.get(key)
require.NoError(t, err)
require.NotEqual(t, val, ti)
})
t.Run("Test Cache Error", func(t *testing.T) {
cache.set("error", time.Now(), errors.New("mock error"))
_, err := cache.get("error")
require.Error(t, err)
require.Equal(t, "mock error", err.Error())
})
}
func testNetValueReader(key string) (time.Time, error) {
if key == "error" {
return time.Now(), errors.New("mock error")
}
return time.Now(), nil
}

View file

@ -126,10 +126,10 @@ type shardCfg struct {
subStorages []subStorageCfg
gcCfg struct {
removerBatchSize int
removerSleepInterval time.Duration
expiredCollectorBatchSize int
expiredCollectorWorkerCount int
removerBatchSize int
removerSleepInterval time.Duration
expiredCollectorBatchSize int
expiredCollectorWorkersCount int
}
writecacheCfg struct {
@ -256,7 +256,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
wc.flushWorkerCount = writeCacheCfg.WorkersNumber()
wc.sizeLimit = writeCacheCfg.SizeLimit()
wc.noSync = writeCacheCfg.NoSync()
wc.gcInterval = writeCacheCfg.GCInterval()
@ -328,7 +328,7 @@ func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *s
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
newConfig.gcCfg.expiredCollectorWorkersCount = gcCfg.ExpiredCollectorWorkersCount()
}
// internals contains application-specific internals that are created
@ -888,7 +888,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)

View file

@ -74,7 +74,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, "tmp/0/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 30, wc.WorkersNumber())
require.EqualValues(t, 3221225472, wc.SizeLimit())
require.Equal(t, "tmp/0/meta", meta.Path())
@ -108,7 +108,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 150, gc.RemoverBatchSize())
require.Equal(t, 2*time.Minute, gc.RemoverSleepInterval())
require.Equal(t, 1500, gc.ExpiredCollectorBatchSize())
require.Equal(t, 15, gc.ExpiredCollectorWorkerCount())
require.Equal(t, 15, gc.ExpiredCollectorWorkersCount())
require.Equal(t, false, sc.RefillMetabase())
require.Equal(t, mode.ReadOnly, sc.Mode())
@ -125,7 +125,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, "tmp/1/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 30, wc.WorkersNumber())
require.EqualValues(t, 4294967296, wc.SizeLimit())
require.Equal(t, "tmp/1/meta", meta.Path())
@ -157,7 +157,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 200, gc.RemoverBatchSize())
require.Equal(t, 5*time.Minute, gc.RemoverSleepInterval())
require.Equal(t, gcconfig.ExpiredCollectorBatchSizeDefault, gc.ExpiredCollectorBatchSize())
require.Equal(t, gcconfig.ExpiredCollectorWorkersCountDefault, gc.ExpiredCollectorWorkerCount())
require.Equal(t, gcconfig.ExpiredCollectorWorkersCountDefault, gc.ExpiredCollectorWorkersCount())
require.Equal(t, true, sc.RefillMetabase())
require.Equal(t, mode.ReadWrite, sc.Mode())

View file

@ -63,14 +63,14 @@ func (x *Config) RemoverSleepInterval() time.Duration {
return RemoverSleepIntervalDefault
}
// ExpiredCollectorWorkerCount returns the value of "expired_collector_worker_count"
// ExpiredCollectorWorkersCount returns the value of "expired_collector_workers_count"
// config parameter.
//
// Returns ExpiredCollectorWorkersCountDefault if the value is not a positive number.
func (x *Config) ExpiredCollectorWorkerCount() int {
func (x *Config) ExpiredCollectorWorkersCount() int {
s := config.IntSafe(
(*config.Config)(x),
"expired_collector_worker_count",
"expired_collector_workers_count",
)
if s > 0 {

View file

@ -106,13 +106,13 @@ func (x *Config) MaxObjectSize() uint64 {
return MaxSizeDefault
}
// WorkerCount returns the value of "flush_worker_count" config parameter.
// WorkersNumber returns the value of "workers_number" config parameter.
//
// Returns WorkersNumberDefault if the value is not a positive number.
func (x *Config) WorkerCount() int {
func (x *Config) WorkersNumber() int {
c := config.IntSafe(
(*config.Config)(x),
"flush_worker_count",
"workers_number",
)
if c > 0 {

View file

@ -28,11 +28,11 @@ func Put(c *config.Config) PutConfig {
}
}
// PoolSizeRemote returns the value of "remote_pool_size" config parameter.
// PoolSizeRemote returns the value of "pool_size_remote" config parameter.
//
// Returns PutPoolSizeDefault if the value is not a positive number.
func (g PutConfig) PoolSizeRemote() int {
v := config.Int(g.cfg, "remote_pool_size")
v := config.Int(g.cfg, "pool_size_remote")
if v > 0 {
return int(v)
}
@ -40,11 +40,11 @@ func (g PutConfig) PoolSizeRemote() int {
return PutPoolSizeDefault
}
// PoolSizeLocal returns the value of "local_pool_size" config parameter.
// PoolSizeLocal returns the value of "pool_size_local" config parameter.
//
// Returns PutPoolSizeDefault if the value is not a positive number.
func (g PutConfig) PoolSizeLocal() int {
v := config.Int(g.cfg, "local_pool_size")
v := config.Int(g.cfg, "pool_size_local")
if v > 0 {
return int(v)
}

View file

@ -84,8 +84,8 @@ FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
FROSTFS_REPLICATOR_POOL_SIZE=10
# Object service section
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
FROSTFS_OBJECT_PUT_POOL_SIZE_LOCAL=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
@ -103,7 +103,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_NO_SYNC=true
FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH=tmp/0/cache
FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_WORKERS_NUMBER=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
### Metabase config
FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
@ -142,7 +142,7 @@ FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
#### Limit of objects to be marked expired by the garbage collector
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
#### Limit of concurrent workers collecting expired objects by the garbage collector
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKERS_COUNT=15
## 1 shard
### Flag to refill Metabase from BlobStor
@ -154,7 +154,7 @@ FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true
FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
FROSTFS_STORAGE_SHARD_1_WRITECACHE_SMALL_OBJECT_SIZE=16384
FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_1_WRITECACHE_WORKERS_NUMBER=30
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296
### Metabase config
FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta

View file

@ -129,8 +129,8 @@
"tombstone_lifetime": 10
},
"put": {
"remote_pool_size": 100,
"local_pool_size": 200,
"pool_size_remote": 100,
"pool_size_local": 200,
"skip_session_token_issuer_verification": true
}
},
@ -147,7 +147,7 @@
"path": "tmp/0/cache",
"small_object_size": 16384,
"max_object_size": 134217728,
"flush_worker_count": 30,
"workers_number": 30,
"capacity": 3221225472
},
"metabase": {
@ -190,7 +190,7 @@
"remover_batch_size": 150,
"remover_sleep_interval": "2m",
"expired_collector_batch_size": 1500,
"expired_collector_worker_count": 15
"expired_collector_workers_count": 15
}
},
"1": {
@ -203,7 +203,7 @@
"memcache_capacity": 2147483648,
"small_object_size": 16384,
"max_object_size": 134217728,
"flush_worker_count": 30,
"workers_number": 30,
"capacity": 4294967296
},
"metabase": {

View file

@ -108,8 +108,8 @@ object:
delete:
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
put:
remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations
pool_size_remote: 100 # number of async workers for remote PUT operations
pool_size_local: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
storage:
@ -126,7 +126,7 @@ storage:
type: bbolt
small_object_size: 16k # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes
max_object_size: 134217728 # size threshold for "big" objects which bypass write-cache and go to the storage directly, bytes
flush_worker_count: 30 # number of write-cache flusher threads
workers_number: 30 # number of write-cache flusher threads
metabase:
perm: 0644 # permissions for metabase files(directories: +x for current user and group)
@ -196,7 +196,7 @@ storage:
remover_batch_size: 150 # number of objects to be removed by the garbage collector
remover_sleep_interval: 2m # frequency of the garbage collector invocation
expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
expired_collector_workers_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
1:
writecache:

View file

@ -50,8 +50,8 @@ prometheus:
object:
put:
remote_pool_size: 100
local_pool_size: 100
pool_size_remote: 100
pool_size_local: 100
morph:
rpc_endpoint:

View file

@ -243,7 +243,7 @@ gc:
remover_batch_size: 200
remover_sleep_interval: 5m
expired_collector_batch_size: 500
expired_collector_worker_count: 5
expired_collector_workers_count: 5
```
| Parameter | Type | Default value | Description |
@ -251,7 +251,7 @@ gc:
| `remover_batch_size` | `int` | `100` | Amount of objects to grab in a single batch. |
| `remover_sleep_interval` | `duration` | `1m` | Time to sleep between iterations. |
| `expired_collector_batch_size` | `int` | `500` | Max amount of expired objects to grab in a single batch. |
| `expired_collector_worker_count` | `int` | `5` | Max amount of concurrent expired objects workers. |
| `expired_collector_workers_count` | `int` | `5` | Max amount of concurrent expired objects workers. |
### `metabase` subsection
@ -280,7 +280,7 @@ writecache:
capacity: 4294967296
small_object_size: 16384
max_object_size: 134217728
flush_worker_count: 30
workers_number: 30
```
| Parameter | Type | Default value | Description |
@ -290,7 +290,7 @@ writecache:
| `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. |
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `workers_number` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
@ -415,7 +415,7 @@ replicator:
| Parameter | Type | Default value | Description |
|---------------|------------|----------------------------------------|---------------------------------------------|
| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. |
| `pool_size` | `int` | Equal to `object.put.remote_pool_size` | Maximum amount of concurrent replications. |
| `pool_size` | `int` | Equal to `object.put.pool_size_remote` | Maximum amount of concurrent replications. |
# `object` section
Contains object-service related parameters.
@ -423,14 +423,14 @@ Contains object-service related parameters.
```yaml
object:
put:
remote_pool_size: 100
pool_size_remote: 100
```
| Parameter | Type | Default value | Description |
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `put.pool_size_remote` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.pool_size_local` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
# `runtime` section
Contains runtime parameters.

12
go.mod
View file

@ -6,7 +6,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20231031104748-498877e378fd
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.1-0.20231102065436-9ed3845aa989
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231122162120-56debcfa569e
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231114081800-3787477133f3
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20231115094736-5db67021e10f
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
@ -27,11 +27,11 @@ require (
github.com/paulmach/orb v0.9.2
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cast v1.5.1
github.com/spf13/cobra v1.8.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
go.etcd.io/bbolt v1.3.8
go.etcd.io/bbolt v1.3.7
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.26.0
@ -63,7 +63,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/badger/v4 v4.1.0
@ -124,8 +124,8 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
gopkg.in/ini.v1 v1.67.0 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -64,9 +64,6 @@ const (
TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
TreeSynchronizeTree = "synchronize tree"
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
TreeFailedToParseAddressForTreeSynchronization = "failed to parse address for tree synchronization"
TreeFailedToConnectForTreeSynchronization = "failed to connect for tree synchronization"
TreeSyncingTrees = "syncing trees..."
TreeCouldNotFetchContainers = "could not fetch containers"
TreeTreesHaveBeenSynchronized = "trees have been synchronized"

View file

@ -165,7 +165,7 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
}
token := obj.SessionToken()
ownerID := obj.OwnerID()
ownerID := *obj.OwnerID()
if token == nil || !token.AssertAuthKey(&key) {
return v.checkOwnerKey(ownerID, key)
@ -412,7 +412,7 @@ func (v *FormatValidator) checkAttributes(obj *objectSDK.Object) error {
var errIncorrectOwner = errors.New("incorrect object owner")
func (v *FormatValidator) checkOwner(obj *objectSDK.Object) error {
if idOwner := obj.OwnerID(); idOwner.IsEmpty() {
if idOwner := obj.OwnerID(); idOwner == nil || len(idOwner.WalletBytes()) == 0 {
return errIncorrectOwner
}

View file

@ -32,7 +32,7 @@ func blankValidObject(key *ecdsa.PrivateKey) *objectSDK.Object {
obj := objectSDK.New()
obj.SetContainerID(cidtest.ID())
obj.SetOwnerID(idOwner)
obj.SetOwnerID(&idOwner)
return obj
}
@ -107,7 +107,7 @@ func TestFormatValidator_Validate(t *testing.T) {
obj := objectSDK.New()
obj.SetContainerID(cidtest.ID())
obj.SetSessionToken(tok)
obj.SetOwnerID(idOwner)
obj.SetOwnerID(&idOwner)
require.NoError(t, objectSDK.SetIDWithSignature(ownerKey.PrivateKey, obj))
@ -303,7 +303,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
obj := objectSDK.New()
obj.SetContainerID(cidtest.ID())
obj.SetSessionToken(tok)
obj.SetOwnerID(owner)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
require.NoError(t, v.Validate(context.Background(), obj, false))
@ -352,7 +352,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(owner)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
require.NoError(t, v.Validate(context.Background(), obj, false))
@ -386,7 +386,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(owner)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
v := NewFormatValidator(
@ -459,7 +459,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(owner)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
v := NewFormatValidator(
@ -535,7 +535,7 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(owner)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
v := NewFormatValidator(

View file

@ -36,7 +36,6 @@ func TestStorageEngine_Inhume(t *testing.T) {
link.SetSplitID(splitID)
t.Run("delete small object", func(t *testing.T) {
t.Parallel()
e := testNewEngine(t).setShardsNum(t, 1).engine
defer e.Close(context.Background())
@ -55,7 +54,6 @@ func TestStorageEngine_Inhume(t *testing.T) {
})
t.Run("delete big object", func(t *testing.T) {
t.Parallel()
s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2)

View file

@ -12,65 +12,88 @@ import (
"github.com/stretchr/testify/require"
)
func TestShard_Delete_SmallObject(t *testing.T) {
t.Run("small object without write cache", func(t *testing.T) {
func TestShard_Delete(t *testing.T) {
t.Parallel()
t.Run("without write cache", func(t *testing.T) {
t.Parallel()
testShard(t, false, 1<<5)
testShardDelete(t, false)
})
t.Run("small object with write cache", func(t *testing.T) {
t.Run("with write cache", func(t *testing.T) {
t.Parallel()
testShard(t, true, 1<<5)
testShardDelete(t, true)
})
}
func TestShard_Delete_BigObject(t *testing.T) {
t.Run("big object without write cache", func(t *testing.T) {
t.Parallel()
testShard(t, false, 1<<20)
})
t.Run("big object with write cache", func(t *testing.T) {
t.Parallel()
testShard(t, true, 1<<20)
})
}
func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
func testShardDelete(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)
testutil.AddAttribute(obj, "foo", "bar")
testutil.AddPayload(obj, payloadSize)
var putPrm PutPrm
putPrm.SetObject(obj)
var getPrm GetPrm
getPrm.SetAddress(object.AddressOf(obj))
var delPrm DeletePrm
delPrm.SetAddresses(object.AddressOf(obj))
t.Run("big object", func(t *testing.T) {
testutil.AddPayload(obj, 1<<20)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
putPrm.SetObject(obj)
getPrm.SetAddress(object.AddressOf(obj))
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
var delPrm DeletePrm
delPrm.SetAddresses(object.AddressOf(obj))
if hasWriteCache {
sh.FlushWriteCache(context.Background(), FlushWriteCachePrm{ignoreErrors: false})
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 30*time.Second, 10*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
}
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 30*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
})
t.Run("small object", func(t *testing.T) {
obj := testutil.GenerateObjectWithCID(cnr)
testutil.AddAttribute(obj, "foo", "bar")
testutil.AddPayload(obj, 1<<5)
putPrm.SetObject(obj)
getPrm.SetAddress(object.AddressOf(obj))
var delPrm DeletePrm
delPrm.SetAddresses(object.AddressOf(obj))
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 10*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
})
}

View file

@ -115,8 +115,8 @@ type gcCfg struct {
workerPoolInit func(int) util.WorkerPool
expiredCollectorWorkerCount int
expiredCollectorBatchSize int
expiredCollectorWorkersCount int
expiredCollectorBatchSize int
metrics GCMectrics
@ -313,16 +313,16 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
return
}
func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
workerCount = minExpiredWorkers
func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
workersCount = minExpiredWorkers
batchSize = minExpiredBatchSize
if s.gc.gcCfg.expiredCollectorBatchSize > batchSize {
batchSize = s.gc.gcCfg.expiredCollectorBatchSize
}
if s.gc.gcCfg.expiredCollectorWorkerCount > workerCount {
workerCount = s.gc.gcCfg.expiredCollectorWorkerCount
if s.gc.gcCfg.expiredCollectorWorkersCount > workersCount {
workersCount = s.gc.gcCfg.expiredCollectorWorkersCount
}
return
}

View file

@ -365,11 +365,11 @@ func WithExpiredCollectorBatchSize(size int) Option {
}
}
// WithExpiredCollectorWorkerCount returns option to set concurrent
// WithExpiredCollectorWorkersCount returns option to set concurrent
// workers count of expired object collection operation.
func WithExpiredCollectorWorkerCount(count int) Option {
func WithExpiredCollectorWorkersCount(count int) Option {
return func(c *cfg) {
c.gcCfg.expiredCollectorWorkerCount = count
c.gcCfg.expiredCollectorWorkersCount = count
}
}

View file

@ -39,14 +39,14 @@ func TestStickyCheck(t *testing.T) {
info.SetSenderKey(make([]byte, 33)) // any non-empty key
info.SetRequestRole(acl.RoleContainer)
require.True(t, checker.StickyBitCheck(info, usertest.ID()))
require.True(t, checker.StickyBitCheck(info, *usertest.ID()))
var basicACL acl.Basic
basicACL.MakeSticky()
info.SetBasicACL(basicACL)
require.True(t, checker.StickyBitCheck(info, usertest.ID()))
require.True(t, checker.StickyBitCheck(info, *usertest.ID()))
})
t.Run("owner ID and/or public key emptiness", func(t *testing.T) {
@ -72,7 +72,7 @@ func TestStickyCheck(t *testing.T) {
var ownerID user.ID
if withOwner {
ownerID = usertest.ID()
ownerID = *usertest.ID()
}
require.Equal(t, expected, checker.StickyBitCheck(info, ownerID))

View file

@ -62,8 +62,8 @@ func headersFromObject(obj *objectSDK.Object, cnr cid.ID, oid *oid.ID) []eaclSDK
res = append(res, oidHeader(*oid))
}
if idOwner := obj.OwnerID(); !idOwner.IsEmpty() {
res = append(res, ownerIDHeader(idOwner))
if idOwner := obj.OwnerID(); idOwner != nil {
res = append(res, ownerIDHeader(*idOwner))
}
cs, ok := obj.PayloadChecksum()

View file

@ -176,11 +176,11 @@ func (exec *execCtx) initTombstoneObject() error {
tokenSession := exec.commonParameters().SessionToken()
if tokenSession != nil {
issuer := tokenSession.Issuer()
exec.tombstoneObj.SetOwnerID(issuer)
exec.tombstoneObj.SetOwnerID(&issuer)
} else {
// make local node a tombstone object owner
localUser := exec.svc.netInfo.LocalNodeID()
exec.tombstoneObj.SetOwnerID(localUser)
exec.tombstoneObj.SetOwnerID(&localUser)
}
var a objectSDK.Attribute

View file

@ -112,7 +112,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
// If it isn't owner key, replication attempts will fail, thus this check.
if sToken == nil {
ownerObj := prm.hdr.OwnerID()
if ownerObj.IsEmpty() {
if ownerObj == nil {
return errors.New("missing object owner")
}

View file

@ -25,7 +25,7 @@ func TestNewKeyStorage(t *testing.T) {
tokenStor := tokenStorage.NewTokenStore()
stor := util.NewKeyStorage(&nodeKey.PrivateKey, tokenStor, mockedNetworkState{42})
owner := usertest.ID()
owner := *usertest.ID()
t.Run("node key", func(t *testing.T) {
key, err := stor.GetKey(nil)
@ -36,7 +36,7 @@ func TestNewKeyStorage(t *testing.T) {
t.Run("unknown token", func(t *testing.T) {
_, err = stor.GetKey(&util.SessionInfo{
ID: uuid.New(),
Owner: usertest.ID(),
Owner: *usertest.ID(),
})
require.Error(t, err)
})

View file

@ -22,7 +22,7 @@ func TestTokenStore(t *testing.T) {
defer ts.Close()
owner := usertest.ID()
owner := *usertest.ID()
var ownerV2 refs.OwnerID
owner.WriteToV2(&ownerV2)
@ -66,7 +66,7 @@ func TestTokenStore_Persistent(t *testing.T) {
ts, err := NewTokenStore(path)
require.NoError(t, err)
idOwner := usertest.ID()
idOwner := *usertest.ID()
var idOwnerV2 refs.OwnerID
idOwner.WriteToV2(&idOwnerV2)
@ -127,7 +127,7 @@ func TestTokenStore_RemoveOld(t *testing.T) {
defer ts.Close()
owner := usertest.ID()
owner := *usertest.ID()
var ownerV2 refs.OwnerID
owner.WriteToV2(&ownerV2)

View file

@ -9,7 +9,6 @@ import (
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -217,44 +216,45 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
}
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
) error {
treeClient := NewTreeServiceClient(cc)
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move,
) (uint64, error) {
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: height,
},
}
if err := SignMessage(req, s.key); err != nil {
return err
}
for {
newHeight := height
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: newHeight,
},
}
if err := SignMessage(req, s.key); err != nil {
return 0, err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.ParentId,
Child: lm.ChildId,
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return 0, fmt.Errorf("can't initialize client: %w", err)
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return err
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.ParentId,
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return 0, err
}
opsCh <- m
}
opsCh <- m
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
return newHeight, err
}
height = newHeight
}
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
}
// synchronizeTree synchronizes operations getting them from different nodes.
@ -287,44 +287,50 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return nil
})
var allNodesSynced atomic.Bool
allNodesSynced.Store(true)
for i, n := range nodes {
i := i
n := n
errGroup.Go(func() error {
var nodeSynced bool
height := from
n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address
if err := a.FromString(addr); err != nil {
s.log.Warn(logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
}
cc, err := s.dialCtx(egCtx, a)
cc, err := grpc.DialContext(egCtx, a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
// Failed to connect, try the next address.
return false
}
defer cc.Close()
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
if err != nil {
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
treeClient := NewTreeServiceClient(cc)
for {
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
if height < h {
height = h
}
if err != nil || h <= height {
// Error with the response, try the next node.
return true
}
}
nodeSynced = err == nil
return true
})
close(nodeOperationStreams[i])
if !nodeSynced {
allNodesSynced.Store(false)
}
return nil
})
}
if err := errGroup.Wait(); err != nil {
allNodesSynced.Store(false)
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
}
@ -334,23 +340,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
} else {
newHeight++
}
if allNodesSynced.Load() {
return newHeight
}
return from
}
func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
return grpc.DialContext(egCtx, a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
return newHeight
}
// ErrAlreadySyncing is returned when a service synchronization has already