package main import ( "context" "crypto/ecdsa" "net" "os" "path" "strconv" "strings" "sync" "time" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/pkg/core/container" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" cntwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/timer" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/control" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" "github.com/nspcc-dev/neofs-node/pkg/services/util/response" util2 "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/profiler" "github.com/panjf2000/ants/v2" "github.com/pkg/errors" "github.com/spf13/viper" "go.etcd.io/bbolt" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" ) const ( // logger keys cfgLogLevel = "logger.level" // pprof keys cfgProfilerEnable = "pprof.enabled" cfgProfilerAddr = "pprof.address" cfgProfilerTTL = "pprof.shutdown_ttl" // metrics keys cfgMetricsEnable = "metrics.enabled" cfgMetricsAddr = "metrics.address" // config keys for cfgNodeInfo cfgNodeKey = "node.key" cfgBootstrapAddress = "node.address" cfgNodeAttributePrefix = "node.attribute" // config keys for cfgGRPC cfgListenAddress = "grpc.endpoint" cfgMaxMsgSize = "grpc.maxmessagesize" cfgReflectService = "grpc.enable_reflect_service" cfgDialTimeout = "grpc.dial_timeout" // config keys for cfgMorph cfgMorphRPCAddress = "morph.rpc_endpoint" cfgMorphNotifyRPCAddress = "morph.notification_endpoint" cfgMorphNotifyDialTimeout = "morph.dial_timeout" // config keys for cfgAccounting cfgAccountingContract = "accounting.scripthash" cfgAccountingFee = "accounting.fee" // config keys for cfgNetmap cfgNetmapContract = "netmap.scripthash" cfgNetmapFee = "netmap.fee" cfgNetmapWorkerPoolEnabled = "netmap.async_worker.enabled" cfgNetmapWorkerPoolSize = "netmap.async_worker.size" // config keys for cfgContainer cfgContainerContract = "container.scripthash" cfgContainerFee = "container.fee" cfgContainerWorkerPoolEnabled = "container.async_worker.enabled" cfgContainerWorkerPoolSize = "container.async_worker.size" // config keys for cfgReputation cfgReputationContract = "reputation.scripthash" cfgReputationWorkerPoolEnabled = "reputation.async_worker.enabled" cfgReputationWorkerPoolSize = "reputation.async_worker.size" cfgGCQueueSize = "gc.queuesize" cfgGCQueueTick = "gc.duration.sleep" cfgGCTimeout = "gc.duration.timeout" cfgPolicerWorkScope = "policer.work_scope" cfgPolicerExpRate = "policer.expansion_rate" cfgPolicerHeadTimeout = "policer.head_timeout" cfgReplicatorPutTimeout = "replicator.put_timeout" cfgReBootstrapRelay = "bootstrap.relay_only" cfgReBootstrapEnabled = "bootstrap.periodic.enabled" cfgReBootstrapInterval = "bootstrap.periodic.interval" cfgShutdownOfflineEnabled = "shutdown.offline.enabled" cfgObjectPutPoolSize = "pool.object.put.size" cfgObjectGetPoolSize = "pool.object.get.size" cfgObjectHeadPoolSize = "pool.object.head.size" cfgObjectSearchPoolSize = "pool.object.search.size" cfgObjectRangePoolSize = "pool.object.range.size" cfgObjectRangeHashPoolSize = "pool.object.rangehash.size" ) const ( cfgLocalStorageSection = "storage" cfgStorageShardSection = "shard" cfgShardUseWriteCache = "use_write_cache" cfgBlobStorSection = "blobstor" cfgWriteCacheSection = "writecache" cfgWriteCacheMemSize = "mem_size" cfgWriteCacheDBSize = "db_size" cfgWriteCacheSmallSize = "small_size" cfgWriteCacheMaxSize = "max_size" cfgWriteCacheWrkCount = "workers_count" cfgBlobStorCompress = "compress" cfgBlobStorShallowDepth = "shallow_depth" cfgBlobStorTreePath = "path" cfgBlobStorTreePerm = "perm" cfgBlobStorSmallSzLimit = "small_size_limit" cfgBlobStorBlzSection = "blobovnicza" cfgBlzSize = "size" cfgBlzShallowDepth = "shallow_depth" cfgBlzShallowWidth = "shallow_width" cfgBlzOpenedCacheSize = "opened_cache_size" cfgMetaBaseSection = "metabase" cfgMetaBasePath = "path" cfgMetaBasePerm = "perm" cfgGCSection = "gc" cfgGCRemoverBatchSize = "remover_batch_size" cfgGCRemoverSleepInt = "remover_sleep_interval" ) const cfgTombstoneLifetime = "tombstone_lifetime" const ( addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding ) type cfg struct { ctx context.Context ctxCancel func() internalErr chan error // channel for internal application errors at runtime viper *viper.Viper log *zap.Logger wg *sync.WaitGroup key *ecdsa.PrivateKey apiVersion *pkg.Version cfgGRPC cfgGRPC cfgMorph cfgMorph cfgAccounting cfgAccounting cfgContainer cfgContainer cfgNetmap cfgNetmap privateTokenStore *tokenStorage.TokenStore cfgNodeInfo cfgNodeInfo localAddr *network.Address cfgObject cfgObject profiler profiler.Profiler metricsServer profiler.Metrics metricsCollector *metrics.StorageMetrics workers []worker respSvc *response.Service cfgControlService cfgControlService netStatus *atomic.Int32 healthStatus *atomic.Int32 closers []func() cfgReputation cfgReputation } type cfgGRPC struct { listener net.Listener server *grpc.Server maxChunkSize uint64 maxAddrAmount uint64 enableReflectService bool } type cfgMorph struct { client *client.Client blockTimers []*timer.BlockTimer // all combined timers eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations } type cfgAccounting struct { scriptHash util.Uint160 fee fixedn.Fixed8 } type cfgContainer struct { scriptHash util.Uint160 fee fixedn.Fixed8 parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler workerPool util2.WorkerPool // pool for asynchronous handlers } type cfgNetmap struct { scriptHash util.Uint160 wrapper *nmwrapper.Wrapper fee fixedn.Fixed8 parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler workerPool util2.WorkerPool // pool for asynchronous handlers state *networkState reBootstrapEnabled bool reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime reBootstrapInterval uint64 // in epochs goOfflineEnabled bool // send `UpdateState(offline)` tx at shutdown } type BootstrapType uint32 type cfgNodeInfo struct { // values from config bootType BootstrapType attributes []*netmap.NodeAttribute // values at runtime infoMtx sync.RWMutex info netmap.NodeInfo } type cfgObject struct { netMapStorage netmapCore.Source cnrStorage container.Source cnrClient *cntwrapper.Wrapper pool cfgObjectRoutines cfgLocalStorage cfgLocalStorage } type cfgLocalStorage struct { localStorage *engine.StorageEngine shardOpts [][]shard.Option } type cfgObjectRoutines struct { get, head, put, search, rng, rngHash *ants.Pool } type cfgControlService struct { server *grpc.Server } type cfgReputation struct { workerPool util2.WorkerPool // pool for EigenTrust algorithm's iterations localTrustStorage *truststorage.Storage localTrustCtrl *trustcontroller.Controller scriptHash util.Uint160 } const ( _ BootstrapType = iota StorageNode RelayNode ) func initCfg(path string) *cfg { viperCfg := initViper(path) key, err := crypto.LoadPrivateKey(viperCfg.GetString(cfgNodeKey)) fatalOnErr(err) u160Accounting, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgAccountingContract)) fatalOnErr(err) u160Netmap, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgNetmapContract)) fatalOnErr(err) u160Container, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgContainerContract)) fatalOnErr(err) u160Reputation, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgReputationContract)) fatalOnErr(err) log, err := logger.NewLogger(viperCfg) fatalOnErr(err) log = log.With( zap.String("app_name", misc.NodeName), zap.String("app_version", misc.Version), ) netAddr, err := network.AddressFromString(viperCfg.GetString(cfgBootstrapAddress)) fatalOnErr(err) maxChunkSize := viperCfg.GetUint64(cfgMaxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes state := newNetworkState() // initialize async workers if it is configured so containerWorkerPool, err := initContainerWorkerPool(viperCfg) fatalOnErr(err) netmapWorkerPool, err := initNetmapWorkerPool(viperCfg) fatalOnErr(err) reputationWorkerPool, err := initReputationWorkerPool(viperCfg) fatalOnErr(err) relayOnly := viperCfg.GetBool(cfgReBootstrapRelay) c := &cfg{ ctx: context.Background(), internalErr: make(chan error), viper: viperCfg, log: log, wg: new(sync.WaitGroup), key: key, apiVersion: pkg.SDKVersion(), cfgAccounting: cfgAccounting{ scriptHash: u160Accounting, fee: fixedn.Fixed8(viperCfg.GetInt(cfgAccountingFee)), }, cfgContainer: cfgContainer{ scriptHash: u160Container, fee: fixedn.Fixed8(viperCfg.GetInt(cfgContainerFee)), workerPool: containerWorkerPool, }, cfgNetmap: cfgNetmap{ scriptHash: u160Netmap, fee: fixedn.Fixed8(viperCfg.GetInt(cfgNetmapFee)), state: state, workerPool: netmapWorkerPool, reBootstrapInterval: viperCfg.GetUint64(cfgReBootstrapInterval), reBootstrapEnabled: !relayOnly && viperCfg.GetBool(cfgReBootstrapEnabled), reBoostrapTurnedOff: atomic.NewBool(relayOnly), goOfflineEnabled: viperCfg.GetBool(cfgShutdownOfflineEnabled), }, cfgNodeInfo: cfgNodeInfo{ bootType: StorageNode, attributes: parseAttributes(viperCfg), }, cfgGRPC: cfgGRPC{ maxChunkSize: maxChunkSize, maxAddrAmount: maxAddrAmount, enableReflectService: viperCfg.GetBool(cfgReflectService), }, localAddr: netAddr, respSvc: response.NewService( response.WithNetworkState(state), ), cfgObject: cfgObject{ pool: initObjectPool(viperCfg), }, netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)), healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), cfgReputation: cfgReputation{ scriptHash: u160Reputation, workerPool: reputationWorkerPool, }, } if viperCfg.GetBool(cfgMetricsEnable) { c.metricsCollector = metrics.NewStorageMetrics() } initLocalStorage(c) return c } func initViper(path string) *viper.Viper { v := viper.New() v.SetEnvPrefix(misc.Prefix) v.AutomaticEnv() v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) defaultConfiguration(v) if path != "" { v.SetConfigFile(path) v.SetConfigType("yml") fatalOnErr(v.ReadInConfig()) } return v } func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgNodeKey, "") // node key v.SetDefault(cfgBootstrapAddress, "") // announced address of the node v.SetDefault(cfgMorphRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second) v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB v.SetDefault(cfgReflectService, false) v.SetDefault(cfgDialTimeout, 5*time.Second) v.SetDefault(cfgAccountingContract, "1aeefe1d0dfade49740fff779c02cd4a0538ffb1") v.SetDefault(cfgAccountingFee, "1") v.SetDefault(cfgContainerContract, "9d2ca84d7fb88213c4baced5a6ed4dc402309039") v.SetDefault(cfgContainerFee, "1") v.SetDefault(cfgContainerWorkerPoolEnabled, true) v.SetDefault(cfgContainerWorkerPoolSize, 10) v.SetDefault(cfgReputationWorkerPoolEnabled, true) v.SetDefault(cfgReputationWorkerPoolSize, 10) v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3") v.SetDefault(cfgNetmapFee, "1") v.SetDefault(cfgNetmapWorkerPoolEnabled, true) v.SetDefault(cfgNetmapWorkerPoolSize, 10) v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgProfilerEnable, false) v.SetDefault(cfgProfilerAddr, ":6060") v.SetDefault(cfgProfilerTTL, "30s") v.SetDefault(cfgMetricsEnable, false) v.SetDefault(cfgMetricsAddr, ":9090") v.SetDefault(cfgGCQueueSize, 1000) v.SetDefault(cfgGCQueueTick, "5s") v.SetDefault(cfgGCTimeout, "5s") v.SetDefault(cfgPolicerWorkScope, 100) v.SetDefault(cfgPolicerExpRate, 10) // in % v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second) v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second) v.SetDefault(cfgReBootstrapEnabled, false) // in epochs v.SetDefault(cfgReBootstrapInterval, 2) // in epochs v.SetDefault(cfgObjectGetPoolSize, 10) v.SetDefault(cfgObjectHeadPoolSize, 10) v.SetDefault(cfgObjectPutPoolSize, 10) v.SetDefault(cfgObjectSearchPoolSize, 10) v.SetDefault(cfgObjectRangePoolSize, 10) v.SetDefault(cfgObjectRangeHashPoolSize, 10) v.SetDefault(cfgCtrlSvcAuthorizedKeys, []string{}) v.SetDefault(cfgTombstoneLifetime, 5) } func (c *cfg) LocalAddress() *network.Address { return c.localAddr } func initLocalStorage(c *cfg) { initShardOptions(c) engineOpts := []engine.Option{engine.WithLogger(c.log)} if c.metricsCollector != nil { engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector)) } ls := engine.New(engineOpts...) for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { id, err := ls.AddShard(opts...) fatalOnErr(err) c.log.Info("shard attached to engine", zap.Stringer("id", id), ) } c.cfgObject.cfgLocalStorage.localStorage = ls c.onShutdown(func() { c.log.Info("closing components of the storage engine...") err := ls.Close() if err != nil { c.log.Info("storage engine closing failure", zap.String("error", err.Error()), ) } else { c.log.Info("all components of the storage engine closed successfully") } }) } func initShardOptions(c *cfg) { var opts [][]shard.Option for i := 0; ; i++ { prefix := configPath( cfgLocalStorageSection, cfgStorageShardSection, strconv.Itoa(i), ) useCache := c.viper.GetBool( configPath(prefix, cfgShardUseWriteCache), ) writeCachePrefix := configPath(prefix, cfgWriteCacheSection) writeCachePath := c.viper.GetString( configPath(writeCachePrefix, cfgBlobStorTreePath), ) if useCache && writeCachePath == "" { c.log.Warn("incorrect writeCache path, ignore shard") break } writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize)) writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize)) writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize)) writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize)) writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount)) blobPrefix := configPath(prefix, cfgBlobStorSection) blobPath := c.viper.GetString( configPath(blobPrefix, cfgBlobStorTreePath), ) if blobPath == "" { c.log.Warn("incorrect blobStor path, ignore shard") break } compressObjects := c.viper.GetBool( configPath(blobPrefix, cfgBlobStorCompress), ) blobPerm := os.FileMode(c.viper.GetInt( configPath(blobPrefix, cfgBlobStorTreePerm), )) shallowDepth := c.viper.GetInt( configPath(blobPrefix, cfgBlobStorShallowDepth), ) smallSzLimit := c.viper.GetUint64( configPath(blobPrefix, cfgBlobStorSmallSzLimit), ) if smallSzLimit == 0 { smallSzLimit = 1 << 20 // 1MB } if writeCacheMaxSize <= 0 { writeCacheSmallSize = smallSzLimit } blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection) blzSize := c.viper.GetUint64( configPath(blzPrefix, cfgBlzSize), ) if blzSize == 0 { blzSize = 1 << 30 // 1 GB } blzShallowDepth := c.viper.GetUint64( configPath(blzPrefix, cfgBlzShallowDepth), ) blzShallowWidth := c.viper.GetUint64( configPath(blzPrefix, cfgBlzShallowWidth), ) blzCacheSize := c.viper.GetInt( configPath(blzPrefix, cfgBlzOpenedCacheSize), ) metaPrefix := configPath(prefix, cfgMetaBaseSection) metaPath := c.viper.GetString( configPath(metaPrefix, cfgMetaBasePath), ) metaPerm := os.FileMode(c.viper.GetUint32( configPath(metaPrefix, cfgMetaBasePerm), )) fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm)) gcPrefix := configPath(prefix, cfgGCSection) rmBatchSize := c.viper.GetInt( configPath(gcPrefix, cfgGCRemoverBatchSize), ) rmSleepInterval := c.viper.GetDuration( configPath(gcPrefix, cfgGCRemoverSleepInt), ) opts = append(opts, []shard.Option{ shard.WithLogger(c.log), shard.WithBlobStorOptions( blobstor.WithRootPath(blobPath), blobstor.WithCompressObjects(compressObjects, c.log), blobstor.WithRootPerm(blobPerm), blobstor.WithShallowDepth(shallowDepth), blobstor.WithSmallSizeLimit(smallSzLimit), blobstor.WithBlobovniczaSize(blzSize), blobstor.WithBlobovniczaShallowDepth(blzShallowDepth), blobstor.WithBlobovniczaShallowWidth(blzShallowWidth), blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize), blobstor.WithLogger(c.log), ), shard.WithMetaBaseOptions( meta.WithLogger(c.log), meta.WithPath(metaPath), meta.WithPermissions(metaPerm), meta.WithBoltDBOptions(&bbolt.Options{ Timeout: 100 * time.Millisecond, }), ), shard.WithWriteCache(useCache), shard.WithWriteCacheOptions( writecache.WithPath(writeCachePath), writecache.WithLogger(c.log), writecache.WithMaxMemSize(writeCacheMemSize), writecache.WithMaxObjectSize(writeCacheMaxSize), writecache.WithSmallObjectSize(writeCacheSmallSize), writecache.WithMaxDBSize(writeCacheDBSize), writecache.WithFlushWorkersCount(writeCacheWrkCount), ), shard.WithRemoverBatchSize(rmBatchSize), shard.WithGCRemoverSleepInterval(rmSleepInterval), shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool { pool, err := ants.NewPool(sz) fatalOnErr(err) return pool }), shard.WithGCEventChannelInitializer(func() <-chan shard.Event { ch := make(chan shard.Event) addNewEpochNotificationHandler(c, func(ev event.Event) { ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) }) return ch }), }) c.log.Info("storage shard options", zap.Bool("with write cache", useCache), zap.String("with write cache path", writeCachePath), zap.String("BLOB path", blobPath), zap.Stringer("BLOB permissions", blobPerm), zap.Bool("BLOB compress", compressObjects), zap.Int("BLOB shallow depth", shallowDepth), zap.Uint64("BLOB small size limit", smallSzLimit), zap.String("metabase path", metaPath), zap.Stringer("metabase permissions", metaPerm), zap.Int("GC remover batch size", rmBatchSize), zap.Duration("GC remover sleep interval", rmSleepInterval), ) } if len(opts) == 0 { fatalOnErr(errors.New("no correctly set up shards, exit")) } c.cfgObject.cfgLocalStorage.shardOpts = opts } func configPath(sections ...string) string { return strings.Join(sections, ".") } func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { var err error optNonBlocking := ants.WithNonblocking(true) pool.get, err = ants.NewPool(cfg.GetInt(cfgObjectGetPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.head, err = ants.NewPool(cfg.GetInt(cfgObjectHeadPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.search, err = ants.NewPool(cfg.GetInt(cfgObjectSearchPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.rng, err = ants.NewPool(cfg.GetInt(cfgObjectRangePoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.rngHash, err = ants.NewPool(cfg.GetInt(cfgObjectRangeHashPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } return pool } func initNetmapWorkerPool(v *viper.Viper) (util2.WorkerPool, error) { if v.GetBool(cfgNetmapWorkerPoolEnabled) { // return async worker pool return ants.NewPool(v.GetInt(cfgNetmapWorkerPoolSize)) } // return sync worker pool return util2.SyncWorkerPool{}, nil } func initContainerWorkerPool(v *viper.Viper) (util2.WorkerPool, error) { if v.GetBool(cfgContainerWorkerPoolEnabled) { // return async worker pool return ants.NewPool(v.GetInt(cfgContainerWorkerPoolSize)) } // return sync worker pool return util2.SyncWorkerPool{}, nil } func initReputationWorkerPool(v *viper.Viper) (util2.WorkerPool, error) { if v.GetBool(cfgReputationWorkerPoolEnabled) { // return async worker pool return ants.NewPool(v.GetInt(cfgReputationWorkerPoolSize)) } // return sync worker pool return util2.SyncWorkerPool{}, nil } func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { ni := c.localNodeInfo() return ni.ToV2(), nil } // handleLocalNodeInfo rewrites local node info func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { c.cfgNodeInfo.infoMtx.Lock() if ni != nil { c.cfgNodeInfo.info = *ni } c.updateStatusWithoutLock(ni) c.cfgNodeInfo.infoMtx.Unlock() } // handleNodeInfoStatus updates node info status without rewriting whole local // node info status func (c *cfg) handleNodeInfoStatus(ni *netmap.NodeInfo) { c.cfgNodeInfo.infoMtx.Lock() c.updateStatusWithoutLock(ni) c.cfgNodeInfo.infoMtx.Unlock() } func (c *cfg) localNodeInfo() netmap.NodeInfo { c.cfgNodeInfo.infoMtx.RLock() defer c.cfgNodeInfo.infoMtx.RUnlock() return c.cfgNodeInfo.info } func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo { ni := c.localNodeInfo() ni.SetState(netmap.NodeStateOnline) return &ni } func (c *cfg) updateStatusWithoutLock(ni *netmap.NodeInfo) { var nmState netmap.NodeState if ni != nil { nmState = ni.State() } else { nmState = netmap.NodeStateOffline c.cfgNodeInfo.info.SetState(nmState) } switch nmState { default: c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED) case netmap.NodeStateOnline: c.setNetmapStatus(control.NetmapStatus_ONLINE) case netmap.NodeStateOffline: c.setNetmapStatus(control.NetmapStatus_OFFLINE) } }