package main import ( "context" "crypto/ecdsa" "errors" "net" "os" "path" "strconv" "strings" "sync" "time" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/pkg/core/container" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" cntwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/timer" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/control" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" "github.com/nspcc-dev/neofs-node/pkg/services/util/response" util2 "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.etcd.io/bbolt" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" ) const ( // logger keys cfgLogLevel = "logger.level" // pprof keys cfgProfilerAddr = "profiler.address" cfgProfilerShutdownTimeout = "profiler.shutdown_timeout" // metrics keys cfgMetricsAddr = "metrics.address" cfgMetricsShutdownTimeout = "metrics.shutdown_timeout" // config keys for cfgNodeInfo cfgNodeKey = "node.key" cfgBootstrapAddress = "node.address" cfgNodeAttributePrefix = "node.attribute" cfgNodeRelay = "node.relay" // config keys for cfgGRPC cfgListenAddress = "grpc.endpoint" cfgTLSEnabled = "grpc.tls.enabled" cfgTLSCertFile = "grpc.tls.certificate" cfgTLSKeyFile = "grpc.tls.key" // config keys for API client cache cfgAPIClientDialTimeout = "apiclient.dial_timeout" // config keys for cfgMorph cfgMorphRPCAddress = "morph.rpc_endpoint" cfgMorphNotifyRPCAddress = "morph.notification_endpoint" cfgMorphNotifyDialTimeout = "morph.dial_timeout" cfgMainChainRPCAddress = "mainchain.rpc_endpoint" cfgMainChainDialTimeout = "mainchain.dial_timeout" // config keys for cfgAccounting cfgAccountingContract = "accounting.scripthash" // config keys for cfgNetmap cfgNetmapContract = "netmap.scripthash" // config keys for cfgContainer cfgContainerContract = "container.scripthash" // config keys for cfgReputation cfgReputationContract = "reputation.scripthash" cfgPolicerHeadTimeout = "policer.head_timeout" cfgReplicatorPutTimeout = "replicator.put_timeout" cfgObjectPutPoolSize = "object.put.pool_size" ) const ( cfgLocalStorageSection = "storage" cfgStorageShardSection = "shard" cfgShardUseWriteCache = "use_write_cache" cfgBlobStorSection = "blobstor" cfgWriteCacheSection = "writecache" cfgWriteCacheMemSize = "mem_size" cfgWriteCacheDBSize = "db_size" cfgWriteCacheSmallSize = "small_size" cfgWriteCacheMaxSize = "max_size" cfgWriteCacheWrkCount = "workers_count" cfgBlobStorCompress = "compress" cfgBlobStorShallowDepth = "shallow_depth" cfgBlobStorTreePath = "path" cfgBlobStorTreePerm = "perm" cfgBlobStorSmallSzLimit = "small_size_limit" cfgBlobStorBlzSection = "blobovnicza" cfgBlzSize = "size" cfgBlzShallowDepth = "shallow_depth" cfgBlzShallowWidth = "shallow_width" cfgBlzOpenedCacheSize = "opened_cache_size" cfgMetaBaseSection = "metabase" cfgMetaBasePath = "path" cfgMetaBasePerm = "perm" cfgGCSection = "gc" cfgGCRemoverBatchSize = "remover_batch_size" cfgGCRemoverSleepInt = "remover_sleep_interval" ) const ( addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding ) const maxMsgSize = 4 << 20 // transport msg limit 4 MiB // capacity of the pools of the morph notification handlers // for each contract listener. const notificationHandlerPoolSize = 10 type cfg struct { ctx context.Context ctxCancel func() internalErr chan error // channel for internal application errors at runtime viper *viper.Viper log *zap.Logger wg *sync.WaitGroup key *ecdsa.PrivateKey apiVersion *pkg.Version cfgGRPC cfgGRPC cfgMorph cfgMorph cfgAccounting cfgAccounting cfgContainer cfgContainer cfgNetmap cfgNetmap privateTokenStore *tokenStorage.TokenStore cfgNodeInfo cfgNodeInfo localAddr *network.Address cfgObject cfgObject metricsCollector *metrics.StorageMetrics workers []worker respSvc *response.Service cfgControlService cfgControlService netStatus *atomic.Int32 healthStatus *atomic.Int32 closers []func() cfgReputation cfgReputation mainChainClient *client.Client } type cfgGRPC struct { listener net.Listener server *grpc.Server maxChunkSize uint64 maxAddrAmount uint64 tlsEnabled bool tlsCertFile string tlsKeyFile string } type cfgMorph struct { client *client.Client blockTimers []*timer.BlockTimer // all combined timers eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations } type cfgAccounting struct { scriptHash util.Uint160 } type cfgContainer struct { scriptHash util.Uint160 parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler workerPool util2.WorkerPool // pool for asynchronous handlers } type cfgNetmap struct { scriptHash util.Uint160 wrapper *nmwrapper.Wrapper parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler workerPool util2.WorkerPool // pool for asynchronous handlers state *networkState reBootstrapEnabled bool reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime startEpoch uint64 // epoch number when application is started } type BootstrapType uint32 type cfgNodeInfo struct { // values from config bootType BootstrapType attributes []*netmap.NodeAttribute // values at runtime infoMtx sync.RWMutex info netmap.NodeInfo } type cfgObject struct { netMapStorage netmapCore.Source cnrStorage container.Source cnrClient *cntwrapper.Wrapper pool cfgObjectRoutines cfgLocalStorage cfgLocalStorage } type cfgLocalStorage struct { localStorage *engine.StorageEngine shardOpts [][]shard.Option } type cfgObjectRoutines struct { put *ants.Pool } type cfgControlService struct { server *grpc.Server } type cfgReputation struct { workerPool util2.WorkerPool // pool for EigenTrust algorithm's iterations localTrustStorage *truststorage.Storage localTrustCtrl *trustcontroller.Controller scriptHash util.Uint160 } const ( _ BootstrapType = iota StorageNode RelayNode ) func initCfg(path string) *cfg { viperCfg := initViper(path) key, err := crypto.LoadPrivateKey(viperCfg.GetString(cfgNodeKey)) fatalOnErr(err) u160Accounting, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgAccountingContract)) fatalOnErr(err) u160Netmap, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgNetmapContract)) fatalOnErr(err) u160Container, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgContainerContract)) fatalOnErr(err) u160Reputation, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgReputationContract)) fatalOnErr(err) var logPrm logger.Prm err = logPrm.SetLevelString( viperCfg.GetString(cfgLogLevel), ) fatalOnErr(err) log, err := logger.NewLogger(logPrm) fatalOnErr(err) netAddr, err := network.AddressFromString(viperCfg.GetString(cfgBootstrapAddress)) fatalOnErr(err) maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes var ( tlsEnabled bool tlsCertFile string tlsKeyFile string ) if viperCfg.GetBool(cfgTLSEnabled) { tlsEnabled = true tlsCertFile = viperCfg.GetString(cfgTLSCertFile) tlsKeyFile = viperCfg.GetString(cfgTLSKeyFile) } state := newNetworkState() containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) relayOnly := viperCfg.GetBool(cfgNodeRelay) c := &cfg{ ctx: context.Background(), internalErr: make(chan error), viper: viperCfg, log: log, wg: new(sync.WaitGroup), key: key, apiVersion: pkg.SDKVersion(), cfgAccounting: cfgAccounting{ scriptHash: u160Accounting, }, cfgContainer: cfgContainer{ scriptHash: u160Container, workerPool: containerWorkerPool, }, cfgNetmap: cfgNetmap{ scriptHash: u160Netmap, state: state, workerPool: netmapWorkerPool, reBootstrapEnabled: !relayOnly, reBoostrapTurnedOff: atomic.NewBool(relayOnly), }, cfgNodeInfo: cfgNodeInfo{ bootType: StorageNode, attributes: parseAttributes(viperCfg), }, cfgGRPC: cfgGRPC{ maxChunkSize: maxChunkSize, maxAddrAmount: maxAddrAmount, tlsEnabled: tlsEnabled, tlsCertFile: tlsCertFile, tlsKeyFile: tlsKeyFile, }, localAddr: netAddr, respSvc: response.NewService( response.WithNetworkState(state), ), cfgObject: cfgObject{ pool: initObjectPool(viperCfg), }, netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)), healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), cfgReputation: cfgReputation{ scriptHash: u160Reputation, workerPool: reputationWorkerPool, }, } if c.viper.GetString(cfgMetricsAddr) != "" { c.metricsCollector = metrics.NewStorageMetrics() } initLocalStorage(c) return c } func initViper(path string) *viper.Viper { v := viper.New() v.SetEnvPrefix(misc.Prefix) v.AutomaticEnv() v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) defaultConfiguration(v) if path != "" { v.SetConfigFile(path) v.SetConfigType("yml") fatalOnErr(v.ReadInConfig()) } return v } func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgNodeKey, "") // node key v.SetDefault(cfgBootstrapAddress, "") // announced address of the node v.SetDefault(cfgNodeRelay, false) v.SetDefault(cfgMorphRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second) v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgTLSEnabled, false) v.SetDefault(cfgTLSCertFile, "") v.SetDefault(cfgTLSKeyFile, "") v.SetDefault(cfgAPIClientDialTimeout, 5*time.Second) v.SetDefault(cfgAccountingContract, "") v.SetDefault(cfgContainerContract, "") v.SetDefault(cfgNetmapContract, "") v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgProfilerShutdownTimeout, "30s") v.SetDefault(cfgMetricsShutdownTimeout, "30s") v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second) v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second) v.SetDefault(cfgObjectPutPoolSize, 10) v.SetDefault(cfgCtrlSvcAuthorizedKeys, []string{}) } func (c *cfg) LocalAddress() *network.Address { return c.localAddr } func initLocalStorage(c *cfg) { initShardOptions(c) engineOpts := []engine.Option{engine.WithLogger(c.log)} if c.metricsCollector != nil { engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector)) } ls := engine.New(engineOpts...) for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { id, err := ls.AddShard(opts...) fatalOnErr(err) c.log.Info("shard attached to engine", zap.Stringer("id", id), ) } c.cfgObject.cfgLocalStorage.localStorage = ls c.onShutdown(func() { c.log.Info("closing components of the storage engine...") err := ls.Close() if err != nil { c.log.Info("storage engine closing failure", zap.String("error", err.Error()), ) } else { c.log.Info("all components of the storage engine closed successfully") } }) } func initShardOptions(c *cfg) { var opts [][]shard.Option for i := 0; ; i++ { prefix := configPath( cfgLocalStorageSection, cfgStorageShardSection, strconv.Itoa(i), ) useCache := c.viper.GetBool( configPath(prefix, cfgShardUseWriteCache), ) writeCachePrefix := configPath(prefix, cfgWriteCacheSection) writeCachePath := c.viper.GetString( configPath(writeCachePrefix, cfgBlobStorTreePath), ) if useCache && writeCachePath == "" { c.log.Warn("incorrect writeCache path, ignore shard") break } writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize)) writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize)) writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize)) writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize)) writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount)) blobPrefix := configPath(prefix, cfgBlobStorSection) blobPath := c.viper.GetString( configPath(blobPrefix, cfgBlobStorTreePath), ) if blobPath == "" { c.log.Warn("incorrect blobStor path, ignore shard") break } compressObjects := c.viper.GetBool( configPath(blobPrefix, cfgBlobStorCompress), ) blobPerm := os.FileMode(c.viper.GetInt( configPath(blobPrefix, cfgBlobStorTreePerm), )) shallowDepth := c.viper.GetInt( configPath(blobPrefix, cfgBlobStorShallowDepth), ) smallSzLimit := c.viper.GetUint64( configPath(blobPrefix, cfgBlobStorSmallSzLimit), ) if smallSzLimit == 0 { smallSzLimit = 1 << 20 // 1MB } if writeCacheMaxSize <= 0 { writeCacheSmallSize = smallSzLimit } blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection) blzSize := c.viper.GetUint64( configPath(blzPrefix, cfgBlzSize), ) if blzSize == 0 { blzSize = 1 << 30 // 1 GB } blzShallowDepth := c.viper.GetUint64( configPath(blzPrefix, cfgBlzShallowDepth), ) blzShallowWidth := c.viper.GetUint64( configPath(blzPrefix, cfgBlzShallowWidth), ) blzCacheSize := c.viper.GetInt( configPath(blzPrefix, cfgBlzOpenedCacheSize), ) metaPrefix := configPath(prefix, cfgMetaBaseSection) metaPath := c.viper.GetString( configPath(metaPrefix, cfgMetaBasePath), ) metaPerm := os.FileMode(c.viper.GetUint32( configPath(metaPrefix, cfgMetaBasePerm), )) fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm)) gcPrefix := configPath(prefix, cfgGCSection) rmBatchSize := c.viper.GetInt( configPath(gcPrefix, cfgGCRemoverBatchSize), ) rmSleepInterval := c.viper.GetDuration( configPath(gcPrefix, cfgGCRemoverSleepInt), ) opts = append(opts, []shard.Option{ shard.WithLogger(c.log), shard.WithBlobStorOptions( blobstor.WithRootPath(blobPath), blobstor.WithCompressObjects(compressObjects, c.log), blobstor.WithRootPerm(blobPerm), blobstor.WithShallowDepth(shallowDepth), blobstor.WithSmallSizeLimit(smallSzLimit), blobstor.WithBlobovniczaSize(blzSize), blobstor.WithBlobovniczaShallowDepth(blzShallowDepth), blobstor.WithBlobovniczaShallowWidth(blzShallowWidth), blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize), blobstor.WithLogger(c.log), ), shard.WithMetaBaseOptions( meta.WithLogger(c.log), meta.WithPath(metaPath), meta.WithPermissions(metaPerm), meta.WithBoltDBOptions(&bbolt.Options{ Timeout: 100 * time.Millisecond, }), ), shard.WithWriteCache(useCache), shard.WithWriteCacheOptions( writecache.WithPath(writeCachePath), writecache.WithLogger(c.log), writecache.WithMaxMemSize(writeCacheMemSize), writecache.WithMaxObjectSize(writeCacheMaxSize), writecache.WithSmallObjectSize(writeCacheSmallSize), writecache.WithMaxDBSize(writeCacheDBSize), writecache.WithFlushWorkersCount(writeCacheWrkCount), ), shard.WithRemoverBatchSize(rmBatchSize), shard.WithGCRemoverSleepInterval(rmSleepInterval), shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool { pool, err := ants.NewPool(sz) fatalOnErr(err) return pool }), shard.WithGCEventChannelInitializer(func() <-chan shard.Event { ch := make(chan shard.Event) addNewEpochNotificationHandler(c, func(ev event.Event) { ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) }) return ch }), }) c.log.Info("storage shard options", zap.Bool("with write cache", useCache), zap.String("with write cache path", writeCachePath), zap.String("BLOB path", blobPath), zap.Stringer("BLOB permissions", blobPerm), zap.Bool("BLOB compress", compressObjects), zap.Int("BLOB shallow depth", shallowDepth), zap.Uint64("BLOB small size limit", smallSzLimit), zap.String("metabase path", metaPath), zap.Stringer("metabase permissions", metaPerm), zap.Int("GC remover batch size", rmBatchSize), zap.Duration("GC remover sleep interval", rmSleepInterval), ) } if len(opts) == 0 { fatalOnErr(errors.New("no correctly set up shards, exit")) } c.cfgObject.cfgLocalStorage.shardOpts = opts } func configPath(sections ...string) string { return strings.Join(sections, ".") } func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { var err error optNonBlocking := ants.WithNonblocking(true) pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } return pool } func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { ni := c.localNodeInfo() return ni.ToV2(), nil } // handleLocalNodeInfo rewrites local node info func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { c.cfgNodeInfo.infoMtx.Lock() if ni != nil { c.cfgNodeInfo.info = *ni } c.updateStatusWithoutLock(ni) c.cfgNodeInfo.infoMtx.Unlock() } // handleNodeInfoStatus updates node info status without rewriting whole local // node info status func (c *cfg) handleNodeInfoStatus(ni *netmap.NodeInfo) { c.cfgNodeInfo.infoMtx.Lock() c.updateStatusWithoutLock(ni) c.cfgNodeInfo.infoMtx.Unlock() } func (c *cfg) localNodeInfo() netmap.NodeInfo { c.cfgNodeInfo.infoMtx.RLock() defer c.cfgNodeInfo.infoMtx.RUnlock() return c.cfgNodeInfo.info } func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo { ni := c.localNodeInfo() ni.SetState(netmap.NodeStateOnline) return &ni } func (c *cfg) updateStatusWithoutLock(ni *netmap.NodeInfo) { var nmState netmap.NodeState if ni != nil { nmState = ni.State() } else { nmState = netmap.NodeStateOffline c.cfgNodeInfo.info.SetState(nmState) } switch nmState { default: c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED) case netmap.NodeStateOnline: c.setNetmapStatus(control.NetmapStatus_ONLINE) case netmap.NodeStateOffline: c.setNetmapStatus(control.NetmapStatus_OFFLINE) } }