forked from TrueCloudLab/frostfs-node
[#493] cmd/node: Remove viper from storage node
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
cd947bb580
commit
e9e986ac71
2 changed files with 11 additions and 56 deletions
|
@ -6,7 +6,6 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -23,7 +22,7 @@ import (
|
||||||
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
|
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
|
||||||
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
|
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
|
||||||
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
|
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
|
||||||
"github.com/nspcc-dev/neofs-node/misc"
|
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||||
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
@ -47,27 +46,13 @@ import (
|
||||||
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
|
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/spf13/viper"
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
|
||||||
// config keys for API client cache
|
|
||||||
cfgAPIClientDialTimeout = "apiclient.dial_timeout"
|
|
||||||
|
|
||||||
cfgPolicerHeadTimeout = "policer.head_timeout"
|
|
||||||
|
|
||||||
cfgReplicatorPutTimeout = "replicator.put_timeout"
|
|
||||||
|
|
||||||
cfgObjectPutPoolSize = "object.put.pool_size"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
|
|
||||||
)
|
|
||||||
|
|
||||||
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
|
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
|
||||||
|
|
||||||
|
@ -84,8 +69,6 @@ type cfg struct {
|
||||||
|
|
||||||
internalErr chan error // channel for internal application errors at runtime
|
internalErr chan error // channel for internal application errors at runtime
|
||||||
|
|
||||||
viper *viper.Viper
|
|
||||||
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
|
@ -241,8 +224,6 @@ func initCfg(path string) *cfg {
|
||||||
config.WithConfigFile(path),
|
config.WithConfigFile(path),
|
||||||
)
|
)
|
||||||
|
|
||||||
viperCfg := initViper(path)
|
|
||||||
|
|
||||||
key, err := crypto.LoadPrivateKey(nodeconfig.Key(appCfg))
|
key, err := crypto.LoadPrivateKey(nodeconfig.Key(appCfg))
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
@ -296,7 +277,6 @@ func initCfg(path string) *cfg {
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
appCfg: appCfg,
|
appCfg: appCfg,
|
||||||
internalErr: make(chan error),
|
internalErr: make(chan error),
|
||||||
viper: viperCfg,
|
|
||||||
log: log,
|
log: log,
|
||||||
wg: new(sync.WaitGroup),
|
wg: new(sync.WaitGroup),
|
||||||
key: key,
|
key: key,
|
||||||
|
@ -331,7 +311,7 @@ func initCfg(path string) *cfg {
|
||||||
response.WithNetworkState(state),
|
response.WithNetworkState(state),
|
||||||
),
|
),
|
||||||
cfgObject: cfgObject{
|
cfgObject: cfgObject{
|
||||||
pool: initObjectPool(viperCfg),
|
pool: initObjectPool(appCfg),
|
||||||
},
|
},
|
||||||
netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)),
|
netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)),
|
||||||
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
|
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
|
||||||
|
@ -350,34 +330,6 @@ func initCfg(path string) *cfg {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func initViper(path string) *viper.Viper {
|
|
||||||
v := viper.New()
|
|
||||||
|
|
||||||
v.SetEnvPrefix(misc.Prefix)
|
|
||||||
v.AutomaticEnv()
|
|
||||||
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
|
||||||
|
|
||||||
defaultConfiguration(v)
|
|
||||||
|
|
||||||
if path != "" {
|
|
||||||
v.SetConfigFile(path)
|
|
||||||
v.SetConfigType("yml")
|
|
||||||
fatalOnErr(v.ReadInConfig())
|
|
||||||
}
|
|
||||||
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaultConfiguration(v *viper.Viper) {
|
|
||||||
v.SetDefault(cfgAPIClientDialTimeout, 5*time.Second)
|
|
||||||
|
|
||||||
v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)
|
|
||||||
|
|
||||||
v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second)
|
|
||||||
|
|
||||||
v.SetDefault(cfgObjectPutPoolSize, 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cfg) LocalAddress() *network.Address {
|
func (c *cfg) LocalAddress() *network.Address {
|
||||||
return c.localAddr
|
return c.localAddr
|
||||||
}
|
}
|
||||||
|
@ -494,12 +446,12 @@ func initShardOptions(c *cfg) {
|
||||||
c.cfgObject.cfgLocalStorage.shardOpts = opts
|
c.cfgObject.cfgLocalStorage.shardOpts = opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
|
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
optNonBlocking := ants.WithNonblocking(true)
|
optNonBlocking := ants.WithNonblocking(true)
|
||||||
|
|
||||||
pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking)
|
pool.put, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,9 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/util/signature"
|
"github.com/nspcc-dev/neofs-api-go/util/signature"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
||||||
|
apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient"
|
||||||
|
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
|
||||||
|
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
|
@ -153,7 +156,7 @@ func initObjectService(c *cfg) {
|
||||||
nodeOwner.SetNeo3Wallet(neo3Wallet)
|
nodeOwner.SetNeo3Wallet(neo3Wallet)
|
||||||
|
|
||||||
clientCache := cache.NewSDKClientCache(
|
clientCache := cache.NewSDKClientCache(
|
||||||
client.WithDialTimeout(c.viper.GetDuration(cfgAPIClientDialTimeout)))
|
client.WithDialTimeout(apiclientconfig.DialTimeout(c.appCfg)))
|
||||||
|
|
||||||
c.onShutdown(clientCache.CloseAll)
|
c.onShutdown(clientCache.CloseAll)
|
||||||
|
|
||||||
|
@ -177,7 +180,7 @@ func initObjectService(c *cfg) {
|
||||||
repl := replicator.New(
|
repl := replicator.New(
|
||||||
replicator.WithLogger(c.log),
|
replicator.WithLogger(c.log),
|
||||||
replicator.WithPutTimeout(
|
replicator.WithPutTimeout(
|
||||||
c.viper.GetDuration(cfgReplicatorPutTimeout),
|
replicatorconfig.PutTimeout(c.appCfg),
|
||||||
),
|
),
|
||||||
replicator.WithLocalStorage(ls),
|
replicator.WithLocalStorage(ls),
|
||||||
replicator.WithRemoteSender(
|
replicator.WithRemoteSender(
|
||||||
|
@ -204,7 +207,7 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
policer.WithLocalAddressSource(c),
|
policer.WithLocalAddressSource(c),
|
||||||
policer.WithHeadTimeout(
|
policer.WithHeadTimeout(
|
||||||
c.viper.GetDuration(cfgPolicerHeadTimeout),
|
policerconfig.HeadTimeout(c.appCfg),
|
||||||
),
|
),
|
||||||
policer.WithReplicator(repl),
|
policer.WithReplicator(repl),
|
||||||
policer.WithRedundantCopyCallback(func(addr *objectSDK.Address) {
|
policer.WithRedundantCopyCallback(func(addr *objectSDK.Address) {
|
||||||
|
|
Loading…
Reference in a new issue