package main import ( "context" "errors" "fmt" "io/fs" "net" "os" "os/signal" "sync" atomicstd "sync/atomic" "syscall" "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" neogoutil "github.com/nspcc-dev/neo-go/pkg/util" netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient" contractsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/contracts" engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object" "github.com/nspcc-dev/neofs-node/pkg/core/container" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" shardmode "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/control" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone" tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" "github.com/nspcc-dev/neofs-node/pkg/services/tree" "github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/state" "github.com/nspcc-dev/neofs-sdk-go/netmap" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" "go.etcd.io/bbolt" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" ) const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding const maxMsgSize = 4 << 20 // transport msg limit 4 MiB // capacity of the pools of the morph notification handlers // for each contract listener. const notificationHandlerPoolSize = 10 // applicationConfiguration reads and stores component-specific configuration // values. It should not store any application helpers structs (pointers to shared // structs). // It must not be used concurrently. type applicationConfiguration struct { // _read indicated whether a config // has already been read _read bool EngineCfg struct { errorThreshold uint32 shardPoolSize uint32 shards []shardCfg } } type shardCfg struct { compress bool smallSizeObjectLimit uint64 uncompressableContentType []string refillMetabase bool mode shardmode.Mode metaCfg struct { path string perm fs.FileMode maxBatchSize int maxBatchDelay time.Duration } subStorages []subStorageCfg gcCfg struct { removerBatchSize int removerSleepInterval time.Duration } writecacheCfg struct { enabled bool path string maxBatchSize int maxBatchDelay time.Duration smallObjectSize uint64 maxObjSize uint64 flushWorkerCount int maxCacheSize uint64 sizeLimit uint64 } piloramaCfg struct { enabled bool path string perm fs.FileMode noSync bool maxBatchSize int maxBatchDelay time.Duration } } type subStorageCfg struct { // common for all storages typ string path string perm fs.FileMode depth uint64 // blobovnicza-specific size uint64 width uint64 openedCacheSize int } // readConfig fills applicationConfiguration with raw configuration values // not modifying them. func (a *applicationConfiguration) readConfig(c *config.Config) error { if a._read { err := c.Reload() if err != nil { return fmt.Errorf("could not reload configuration: %w", err) } err = validateConfig(c) if err != nil { return fmt.Errorf("configuration's validation: %w", err) } // clear if it is rereading *a = applicationConfiguration{} } else { // update the status. // initial configuration validation is expected to be // performed on the higher level a._read = true } a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { var sh shardCfg sh.refillMetabase = sc.RefillMetabase() sh.mode = sc.Mode() sh.compress = sc.Compress() sh.uncompressableContentType = sc.UncompressableContentTypes() sh.smallSizeObjectLimit = sc.SmallSizeLimit() // write-cache writeCacheCfg := sc.WriteCache() if writeCacheCfg.Enabled() { wc := &sh.writecacheCfg wc.enabled = true wc.path = writeCacheCfg.Path() wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() wc.maxCacheSize = writeCacheCfg.MaxObjectSize() wc.smallObjectSize = writeCacheCfg.SmallObjectSize() wc.flushWorkerCount = writeCacheCfg.WorkersNumber() wc.sizeLimit = writeCacheCfg.SizeLimit() } // blobstor with substorages blobStorCfg := sc.BlobStor() storagesCfg := blobStorCfg.Storages() metabaseCfg := sc.Metabase() gcCfg := sc.GC() if config.BoolSafe(c.Sub("tree"), "enabled") { piloramaCfg := sc.Pilorama() pr := &sh.piloramaCfg pr.enabled = true pr.path = piloramaCfg.Path() pr.perm = piloramaCfg.Perm() pr.noSync = piloramaCfg.NoSync() pr.maxBatchSize = piloramaCfg.MaxBatchSize() pr.maxBatchDelay = piloramaCfg.MaxBatchDelay() } ss := make([]subStorageCfg, 0, len(storagesCfg)) for i := range storagesCfg { var sCfg subStorageCfg sCfg.typ = storagesCfg[i].Type() sCfg.path = storagesCfg[i].Path() sCfg.perm = storagesCfg[i].Perm() switch storagesCfg[i].Type() { case blobovniczatree.Type: sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i])) sCfg.size = sub.Size() sCfg.depth = sub.ShallowDepth() sCfg.width = sub.ShallowWidth() sCfg.openedCacheSize = sub.OpenedCacheSize() case fstree.Type: sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } ss = append(ss, sCfg) } sh.subStorages = ss // meta m := &sh.metaCfg m.path = metabaseCfg.Path() m.perm = metabaseCfg.BoltDB().Perm() m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay() m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize() // GC sh.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize() sh.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval() a.EngineCfg.shards = append(a.EngineCfg.shards, sh) return nil }) } type cfg struct { applicationConfiguration ctx context.Context appCfg *config.Config ctxCancel func() internalErr chan error // channel for internal application errors at runtime log *zap.Logger wg *sync.WaitGroup key *keys.PrivateKey binPublicKey []byte ownerIDFromKey user.ID // user ID calculated from key apiVersion version.Version cfgGRPC cfgGRPC cfgMorph cfgMorph cfgAccounting cfgAccounting cfgContainer cfgContainer cfgNetmap cfgNetmap privateTokenStore sessionStorage cfgNodeInfo cfgNodeInfo localAddr network.AddressGroup cfgObject cfgObject cfgNotifications cfgNotifications metricsCollector *metrics.NodeMetrics workers []worker respSvc *response.Service replicator *replicator.Replicator cfgControlService cfgControlService treeService *tree.Service healthStatus *atomic.Int32 closers []func() cfgReputation cfgReputation clientCache *cache.ClientCache persistate *state.PersistentStorage netMapSource netmapCore.Source // current network map netMap atomicstd.Value // type netmap.NetMap } // ReadCurrentNetMap reads network map which has been cached at the // latest epoch. Returns an error if value has not been cached yet. // // Provides interface for NetmapService server. func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error { val := c.netMap.Load() if val == nil { return errors.New("missing local network map") } val.(netmap.NetMap).WriteToV2(msg) return nil } type cfgGRPC struct { listeners []net.Listener servers []*grpc.Server maxChunkSize uint64 maxAddrAmount uint64 } type cfgMorph struct { client *client.Client notaryEnabled bool // TTL of Sidechain cached values. Non-positive value disables caching. cacheTTL time.Duration eigenTrustTicker *eigenTrustTickers // timers for EigenTrust iterations proxyScriptHash neogoutil.Uint160 } type cfgAccounting struct { scriptHash neogoutil.Uint160 } type cfgContainer struct { scriptHash neogoutil.Uint160 parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers } type cfgNetmap struct { scriptHash neogoutil.Uint160 wrapper *nmClient.Client parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers state *networkState needBootstrap bool reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime startEpoch uint64 // epoch number when application is started } type cfgNodeInfo struct { // values from config localInfo netmap.NodeInfo } type cfgObject struct { getSvc *getsvc.Service cnrSource container.Source eaclSource container.EACLSource pool cfgObjectRoutines cfgLocalStorage cfgLocalStorage } type cfgNotifications struct { enabled bool nw notificationWriter defaultTopic string } type cfgLocalStorage struct { localStorage *engine.StorageEngine } type cfgObjectRoutines struct { putRemote *ants.Pool putRemoteCapacity int replication *ants.Pool } type cfgControlService struct { server *grpc.Server } type cfgReputation struct { workerPool util.WorkerPool // pool for EigenTrust algorithm's iterations localTrustStorage *truststorage.Storage localTrustCtrl *trustcontroller.Controller scriptHash neogoutil.Uint160 } var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block") func initCfg(appCfg *config.Config) *cfg { key := nodeconfig.Key(appCfg) var logPrm logger.Prm err := logPrm.SetLevelString( loggerconfig.Level(appCfg), ) fatalOnErr(err) log, err := logger.NewLogger(logPrm) fatalOnErr(err) var netAddr network.AddressGroup relayOnly := nodeconfig.Relay(appCfg) if !relayOnly { netAddr = nodeconfig.BootstrapAddresses(appCfg) } maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes netState := newNetworkState() persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) fatalOnErr(err) containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) c := &cfg{ ctx: context.Background(), appCfg: appCfg, internalErr: make(chan error), log: log, wg: new(sync.WaitGroup), key: key, binPublicKey: key.PublicKey().Bytes(), apiVersion: version.Current(), cfgAccounting: cfgAccounting{ scriptHash: contractsconfig.Balance(appCfg), }, cfgContainer: cfgContainer{ scriptHash: contractsconfig.Container(appCfg), workerPool: containerWorkerPool, }, cfgNetmap: cfgNetmap{ scriptHash: contractsconfig.Netmap(appCfg), state: netState, workerPool: netmapWorkerPool, needBootstrap: !relayOnly, reBoostrapTurnedOff: atomic.NewBool(relayOnly), }, cfgGRPC: cfgGRPC{ maxChunkSize: maxChunkSize, maxAddrAmount: maxAddrAmount, }, cfgMorph: cfgMorph{ proxyScriptHash: contractsconfig.Proxy(appCfg), }, localAddr: netAddr, respSvc: response.NewService( response.WithNetworkState(netState), ), cfgObject: cfgObject{ pool: initObjectPool(appCfg), }, healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), cfgReputation: cfgReputation{ scriptHash: contractsconfig.Reputation(appCfg), workerPool: reputationWorkerPool, }, clientCache: cache.NewSDKClientCache(cache.ClientCacheOpts{ DialTimeout: apiclientconfig.DialTimeout(appCfg), StreamTimeout: apiclientconfig.StreamTimeout(appCfg), Key: &key.PrivateKey, AllowExternal: apiclientconfig.AllowExternal(appCfg), }), persistate: persistate, } // returned err must be nil during first time read err = c.readConfig(appCfg) if err != nil { panic(fmt.Errorf("config reading: %w", err)) } user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey) if metricsconfig.Enabled(c.appCfg) { c.metricsCollector = metrics.NewNodeMetrics() netState.metrics = c.metricsCollector } c.onShutdown(c.clientCache.CloseAll) // clean up connections c.onShutdown(func() { _ = c.persistate.Close() }) return c } func (c *cfg) engineOpts() []engine.Option { opts := make([]engine.Option, 0, 4) opts = append(opts, engine.WithShardPoolSize(c.EngineCfg.shardPoolSize), engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithLogger(c.log), ) if c.metricsCollector != nil { opts = append(opts, engine.WithMetrics(c.metricsCollector)) } return opts } type shardOptsWithMetaPath struct { metaPath string shOpts []shard.Option } func (c *cfg) shardOpts() []shardOptsWithMetaPath { shards := make([]shardOptsWithMetaPath, 0, len(c.EngineCfg.shards)) for _, shCfg := range c.EngineCfg.shards { var writeCacheOpts []writecache.Option if wcRead := shCfg.writecacheCfg; wcRead.enabled { writeCacheOpts = append(writeCacheOpts, writecache.WithPath(wcRead.path), writecache.WithMaxBatchSize(wcRead.maxBatchSize), writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), writecache.WithMaxObjectSize(wcRead.maxObjSize), writecache.WithSmallObjectSize(wcRead.smallObjectSize), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithMaxCacheSize(wcRead.sizeLimit), writecache.WithLogger(c.log), ) } var piloramaOpts []pilorama.Option if prRead := shCfg.piloramaCfg; prRead.enabled { piloramaOpts = append(piloramaOpts, pilorama.WithPath(prRead.path), pilorama.WithPerm(prRead.perm), pilorama.WithNoSync(prRead.noSync), pilorama.WithMaxBatchSize(prRead.maxBatchSize), pilorama.WithMaxBatchDelay(prRead.maxBatchDelay), ) } var ss []blobstor.SubStorage for _, sRead := range shCfg.subStorages { switch sRead.typ { case blobovniczatree.Type: ss = append(ss, blobstor.SubStorage{ Storage: blobovniczatree.NewBlobovniczaTree( blobovniczatree.WithRootPath(sRead.path), blobovniczatree.WithPermissions(sRead.perm), blobovniczatree.WithBlobovniczaSize(sRead.size), blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth), blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), blobovniczatree.WithLogger(c.log)), Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < shCfg.smallSizeObjectLimit }, }) case fstree.Type: ss = append(ss, blobstor.SubStorage{ Storage: fstree.New( fstree.WithPath(sRead.path), fstree.WithPerm(sRead.perm), fstree.WithDepth(sRead.depth)), Policy: func(_ *objectSDK.Object, data []byte) bool { return true }, }) default: // should never happen, that has already // been handled: when the config was read } } var sh shardOptsWithMetaPath sh.metaPath = shCfg.metaCfg.path sh.shOpts = []shard.Option{ shard.WithLogger(c.log), shard.WithRefillMetabase(shCfg.refillMetabase), shard.WithMode(shCfg.mode), shard.WithBlobStorOptions( blobstor.WithCompressObjects(shCfg.compress), blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType), blobstor.WithStorages(ss), blobstor.WithLogger(c.log), ), shard.WithMetaBaseOptions( meta.WithPath(shCfg.metaCfg.path), meta.WithPermissions(shCfg.metaCfg.perm), meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize), meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay), meta.WithBoltDBOptions(&bbolt.Options{ Timeout: 100 * time.Millisecond, }), meta.WithLogger(c.log), meta.WithEpochState(c.cfgNetmap.state), ), shard.WithPiloramaOptions(piloramaOpts...), shard.WithWriteCache(shCfg.writecacheCfg.enabled), shard.WithWriteCacheOptions(writeCacheOpts...), shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize), shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval), shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) fatalOnErr(err) return pool }), } shards = append(shards, sh) } return shards } func (c *cfg) LocalAddress() network.AddressGroup { return c.localAddr } func initLocalStorage(c *cfg) { ls := engine.New(c.engineOpts()...) addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) }) // allocate memory for the service; // service will be created later c.cfgObject.getSvc = new(getsvc.Service) var tssPrm tsourse.TombstoneSourcePrm tssPrm.SetGetService(c.cfgObject.getSvc) tombstoneSrc := tsourse.NewSource(tssPrm) tombstoneSource := tombstone.NewChecker( tombstone.WithLogger(c.log), tombstone.WithTombstoneSource(tombstoneSrc), ) for _, optsWithMeta := range c.shardOpts() { id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...) fatalOnErr(err) c.log.Info("shard attached to engine", zap.Stringer("id", id), ) } c.cfgObject.cfgLocalStorage.localStorage = ls c.onShutdown(func() { c.log.Info("closing components of the storage engine...") err := ls.Close() if err != nil { c.log.Info("storage engine closing failure", zap.String("error", err.Error()), ) } else { c.log.Info("all components of the storage engine closed successfully") } }) } func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { var err error optNonBlocking := ants.WithNonblocking(true) pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote() pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking) fatalOnErr(err) pool.replication, err = ants.NewPool(pool.putRemoteCapacity) fatalOnErr(err) return pool } func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { var res netmapV2.NodeInfo ni, ok := c.cfgNetmap.state.getNodeInfo() if ok { ni.WriteToV2(&res) } else { c.cfgNodeInfo.localInfo.WriteToV2(&res) } return &res, nil } // handleLocalNodeInfo rewrites local node info from the NeoFS network map. // Called with nil when storage node is outside the NeoFS network map // (before entering the network and after leaving it). func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { c.cfgNetmap.state.setNodeInfo(ni) } // bootstrap sets local node's netmap status to "online". func (c *cfg) bootstrap() error { ni := c.cfgNodeInfo.localInfo ni.SetOnline() prm := nmClient.AddPeerPrm{} prm.SetNodeInfo(ni) return c.cfgNetmap.wrapper.AddPeer(prm) } // needBootstrap checks if local node should be registered in network on bootup. func (c *cfg) needBootstrap() bool { return c.cfgNetmap.needBootstrap } // ObjectServiceLoad implements system loader interface for policer component. // It is calculated as size/capacity ratio of "remote object put" worker. // Returns float value between 0.0 and 1.0. func (c *cfg) ObjectServiceLoad() float64 { return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity) } func (c *cfg) configWatcher(ctx context.Context) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGHUP) for { select { case <-ch: c.log.Info("SIGHUP has been received, rereading configuration...") err := c.readConfig(c.appCfg) if err != nil { c.log.Error("configuration reading", zap.Error(err)) continue } var rcfg engine.ReConfiguration for _, optsWithMeta := range c.shardOpts() { rcfg.AddShard(optsWithMeta.metaPath, optsWithMeta.shOpts) } err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg) if err != nil { c.log.Error("storage engine configuration update", zap.Error(err)) continue } c.log.Info("configuration has been reloaded successfully") case <-ctx.Done(): return } } }