package main import ( "context" "crypto/ecdsa" "net" "os" "path" "strconv" "strings" "sync" "time" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/pkg/core/container" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/control" tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" "github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/profiler" "github.com/panjf2000/ants/v2" "github.com/pkg/errors" "github.com/spf13/viper" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" ) const ( // logger keys cfgLogLevel = "logger.level" cfgLogFormat = "logger.format" cfgLogTrace = "logger.trace_level" cfgLogInitSampling = "logger.sampling.initial" cfgLogThereafterSampling = "logger.sampling.thereafter" // pprof keys cfgProfilerEnable = "pprof.enabled" cfgProfilerAddr = "pprof.address" cfgProfilerTTL = "pprof.shutdown_ttl" // config keys for cfgNodeInfo cfgNodeKey = "node.key" cfgBootstrapAddress = "node.address" cfgNodeAttributePrefix = "node.attribute" // config keys for cfgGRPC cfgListenAddress = "grpc.endpoint" cfgMaxMsgSize = "grpc.maxmessagesize" cfgReflectService = "grpc.enable_reflect_service" // config keys for cfgMorph cfgMorphRPCAddress = "morph.rpc_endpoint" cfgMorphNotifyRPCAddress = "morph.notification_endpoint" cfgMorphNotifyDialTimeout = "morph.dial_timeout" // config keys for cfgAccounting cfgAccountingContract = "accounting.scripthash" cfgAccountingFee = "accounting.fee" // config keys for cfgNetmap cfgNetmapContract = "netmap.scripthash" cfgNetmapFee = "netmap.fee" // config keys for cfgContainer cfgContainerContract = "container.scripthash" cfgContainerFee = "container.fee" cfgGCQueueSize = "gc.queuesize" cfgGCQueueTick = "gc.duration.sleep" cfgGCTimeout = "gc.duration.timeout" cfgPolicerWorkScope = "policer.work_scope" cfgPolicerExpRate = "policer.expansion_rate" cfgPolicerHeadTimeout = "policer.head_timeout" cfgPolicerDialTimeout = "policer.dial_timeout" cfgReplicatorPutTimeout = "replicator.put_timeout" cfgReplicatorDialTimeout = "replicator.dial_timeout" cfgReBootstrapEnabled = "bootstrap.periodic.enabled" cfgReBootstrapInterval = "bootstrap.periodic.interval" cfgObjectPutPoolSize = "pool.object.put.size" cfgObjectGetPoolSize = "pool.object.get.size" cfgObjectHeadPoolSize = "pool.object.head.size" cfgObjectSearchPoolSize = "pool.object.search.size" cfgObjectRangePoolSize = "pool.object.range.size" cfgObjectRangeHashPoolSize = "pool.object.rangehash.size" cfgObjectPutDialTimeout = "object.put.dial_timeout" cfgObjectHeadDialTimeout = "object.head.dial_timeout" cfgObjectRangeDialTimeout = "object.range.dial_timeout" cfgObjectRangeHashDialTimeout = "object.rangehash.dial_timeout" cfgObjectSearchDialTimeout = "object.search.dial_timeout" cfgObjectGetDialTimeout = "object.get.dial_timeout" ) const ( cfgLocalStorageSection = "storage" cfgStorageShardSection = "shard" cfgShardUseWriteCache = "use_write_cache" cfgBlobStorSection = "blobstor" cfgWriteCacheSection = "writecache" cfgBlobStorCompress = "compress" cfgBlobStorShallowDepth = "shallow_depth" cfgBlobStorTreePath = "path" cfgBlobStorTreePerm = "perm" cfgBlobStorSmallSzLimit = "small_size_limit" cfgBlobStorBlzSection = "blobovnicza" cfgBlzSize = "size" cfgBlzShallowDepth = "shallow_depth" cfgBlzShallowWidth = "shallow_width" cfgBlzOpenedCacheSize = "opened_cache_size" cfgMetaBaseSection = "metabase" cfgMetaBasePath = "path" cfgMetaBasePerm = "perm" ) const ( addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding ) type cfg struct { ctx context.Context ctxCancel func() internalErr chan error // channel for internal application errors at runtime viper *viper.Viper log *zap.Logger wg *sync.WaitGroup key *ecdsa.PrivateKey apiVersion *pkg.Version cfgGRPC cfgGRPC cfgMorph cfgMorph cfgAccounting cfgAccounting cfgContainer cfgContainer cfgNetmap cfgNetmap privateTokenStore *tokenStorage.TokenStore cfgNodeInfo cfgNodeInfo localAddr *network.Address cfgObject cfgObject profiler profiler.Profiler workers []worker respSvc *response.Service cfgControlService cfgControlService healthStatus *atomic.Int32 } type cfgGRPC struct { listener net.Listener server *grpc.Server maxChunkSize uint64 maxAddrAmount uint64 enableReflectService bool } type cfgMorph struct { client *client.Client } type cfgAccounting struct { scriptHash util.Uint160 fee fixedn.Fixed8 } type cfgContainer struct { scriptHash util.Uint160 fee fixedn.Fixed8 } type cfgNetmap struct { scriptHash util.Uint160 wrapper *nmwrapper.Wrapper fee fixedn.Fixed8 parsers map[event.Type]event.Parser subscribers map[event.Type][]event.Handler state *networkState reBootstrapEnabled bool reBootstrapInterval uint64 // in epochs } type BootstrapType uint32 type cfgNodeInfo struct { // values from config bootType BootstrapType attributes []*netmap.NodeAttribute // values at runtime info *netmap.NodeInfo } type cfgObject struct { netMapStorage netmapCore.Source cnrStorage container.Source cnrClient *wrapper.Wrapper pool cfgObjectRoutines cfgLocalStorage cfgLocalStorage } type cfgLocalStorage struct { localStorage *engine.StorageEngine shardOpts [][]shard.Option } type cfgObjectRoutines struct { get, head, put, search, rng, rngHash *ants.Pool } type cfgControlService struct { server *grpc.Server } const ( _ BootstrapType = iota StorageNode RelayNode ) func initCfg(path string) *cfg { viperCfg := initViper(path) key, err := crypto.LoadPrivateKey(viperCfg.GetString(cfgNodeKey)) fatalOnErr(err) u160Accounting, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgAccountingContract)) fatalOnErr(err) u160Netmap, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgNetmapContract)) fatalOnErr(err) u160Container, err := util.Uint160DecodeStringLE( viperCfg.GetString(cfgContainerContract)) fatalOnErr(err) log, err := logger.NewLogger(viperCfg) fatalOnErr(err) netAddr, err := network.AddressFromString(viperCfg.GetString(cfgBootstrapAddress)) fatalOnErr(err) maxChunkSize := viperCfg.GetUint64(cfgMaxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes state := newNetworkState() c := &cfg{ ctx: context.Background(), internalErr: make(chan error), viper: viperCfg, log: log, wg: new(sync.WaitGroup), key: key, apiVersion: pkg.SDKVersion(), cfgAccounting: cfgAccounting{ scriptHash: u160Accounting, fee: fixedn.Fixed8(viperCfg.GetInt(cfgAccountingFee)), }, cfgContainer: cfgContainer{ scriptHash: u160Container, fee: fixedn.Fixed8(viperCfg.GetInt(cfgContainerFee)), }, cfgNetmap: cfgNetmap{ scriptHash: u160Netmap, fee: fixedn.Fixed8(viperCfg.GetInt(cfgNetmapFee)), state: state, reBootstrapInterval: viperCfg.GetUint64(cfgReBootstrapInterval), reBootstrapEnabled: viperCfg.GetBool(cfgReBootstrapEnabled), }, cfgNodeInfo: cfgNodeInfo{ bootType: StorageNode, attributes: parseAttributes(viperCfg), }, cfgGRPC: cfgGRPC{ maxChunkSize: maxChunkSize, maxAddrAmount: maxAddrAmount, enableReflectService: viperCfg.GetBool(cfgReflectService), }, localAddr: netAddr, respSvc: response.NewService( response.WithNetworkState(state), ), cfgObject: cfgObject{ pool: initObjectPool(viperCfg), }, healthStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)), } initLocalStorage(c) return c } func initViper(path string) *viper.Viper { v := viper.New() v.SetEnvPrefix(misc.Prefix) v.AutomaticEnv() v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.SetDefault("app.name", misc.NodeName) v.SetDefault("app.version", misc.Version) defaultConfiguration(v) if path != "" { v.SetConfigFile(path) v.SetConfigType("yml") fatalOnErr(v.ReadInConfig()) } return v } func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgNodeKey, "") // node key v.SetDefault(cfgBootstrapAddress, "") // announced address of the node v.SetDefault(cfgMorphRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyRPCAddress, []string{}) v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second) v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB v.SetDefault(cfgReflectService, false) v.SetDefault(cfgAccountingContract, "1aeefe1d0dfade49740fff779c02cd4a0538ffb1") v.SetDefault(cfgAccountingFee, "1") v.SetDefault(cfgContainerContract, "9d2ca84d7fb88213c4baced5a6ed4dc402309039") v.SetDefault(cfgContainerFee, "1") v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3") v.SetDefault(cfgNetmapFee, "1") v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgLogFormat, "console") v.SetDefault(cfgLogTrace, "fatal") v.SetDefault(cfgLogInitSampling, 1000) v.SetDefault(cfgLogThereafterSampling, 1000) v.SetDefault(cfgProfilerEnable, false) v.SetDefault(cfgProfilerAddr, ":6060") v.SetDefault(cfgProfilerTTL, "30s") v.SetDefault(cfgGCQueueSize, 1000) v.SetDefault(cfgGCQueueTick, "5s") v.SetDefault(cfgGCTimeout, "5s") v.SetDefault(cfgPolicerWorkScope, 100) v.SetDefault(cfgPolicerExpRate, 10) // in % v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second) v.SetDefault(cfgPolicerDialTimeout, 5*time.Second) v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second) v.SetDefault(cfgReplicatorDialTimeout, 5*time.Second) v.SetDefault(cfgReBootstrapEnabled, false) // in epochs v.SetDefault(cfgReBootstrapInterval, 2) // in epochs v.SetDefault(cfgObjectGetPoolSize, 10) v.SetDefault(cfgObjectHeadPoolSize, 10) v.SetDefault(cfgObjectPutPoolSize, 10) v.SetDefault(cfgObjectSearchPoolSize, 10) v.SetDefault(cfgObjectRangePoolSize, 10) v.SetDefault(cfgObjectRangeHashPoolSize, 10) v.SetDefault(cfgCtrlSvcAuthorizedKeys, []string{}) } func (c *cfg) LocalAddress() *network.Address { return c.localAddr } func initLocalStorage(c *cfg) { initShardOptions(c) ls := engine.New( engine.WithLogger(c.log), ) for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { id, err := ls.AddShard(opts...) fatalOnErr(err) c.log.Info("shard attached to engine", zap.Stringer("id", id), ) } c.cfgObject.cfgLocalStorage.localStorage = ls } func initShardOptions(c *cfg) { var opts [][]shard.Option for i := 0; ; i++ { prefix := configPath( cfgLocalStorageSection, cfgStorageShardSection, strconv.Itoa(i), ) useCache := c.viper.GetBool( configPath(prefix, cfgShardUseWriteCache), ) writeCachePrefix := configPath(prefix, cfgWriteCacheSection) writeCachePath := c.viper.GetString( configPath(writeCachePrefix, cfgBlobStorTreePath), ) if useCache && writeCachePath == "" { c.log.Warn("incorrect writeCache path, ignore shard") break } blobPrefix := configPath(prefix, cfgBlobStorSection) blobPath := c.viper.GetString( configPath(blobPrefix, cfgBlobStorTreePath), ) if blobPath == "" { c.log.Warn("incorrect blobStor path, ignore shard") break } compressObjects := c.viper.GetBool( configPath(blobPrefix, cfgBlobStorCompress), ) blobPerm := os.FileMode(c.viper.GetInt( configPath(blobPrefix, cfgBlobStorTreePerm), )) shallowDepth := c.viper.GetInt( configPath(blobPrefix, cfgBlobStorShallowDepth), ) smallSzLimit := c.viper.GetUint64( configPath(blobPrefix, cfgBlobStorSmallSzLimit), ) blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection) blzSize := c.viper.GetUint64( configPath(blzPrefix, cfgBlzSize), ) blzShallowDepth := c.viper.GetUint64( configPath(blzPrefix, cfgBlzShallowDepth), ) blzShallowWidth := c.viper.GetUint64( configPath(blzPrefix, cfgBlzShallowWidth), ) blzCacheSize := c.viper.GetInt( configPath(blzPrefix, cfgBlzOpenedCacheSize), ) metaPrefix := configPath(prefix, cfgMetaBaseSection) metaPath := c.viper.GetString( configPath(metaPrefix, cfgMetaBasePath), ) metaPerm := os.FileMode(c.viper.GetUint32( configPath(metaPrefix, cfgMetaBasePerm), )) fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm)) opts = append(opts, []shard.Option{ shard.WithLogger(c.log), shard.WithBlobStorOptions( blobstor.WithRootPath(blobPath), blobstor.WithCompressObjects(compressObjects, c.log), blobstor.WithRootPerm(blobPerm), blobstor.WithShallowDepth(shallowDepth), blobstor.WithSmallSizeLimit(smallSzLimit), blobstor.WithBlobovniczaSize(blzSize), blobstor.WithBlobovniczaShallowDepth(blzShallowDepth), blobstor.WithBlobovniczaShallowWidth(blzShallowWidth), blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize), blobstor.WithLogger(c.log), ), shard.WithMetaBaseOptions( meta.WithLogger(c.log), meta.WithPath(metaPath), meta.WithPermissions(metaPerm), ), shard.WithWriteCache(useCache), shard.WithWriteCacheOptions( blobstor.WithRootPath(writeCachePath), blobstor.WithBlobovniczaShallowDepth(0), blobstor.WithBlobovniczaShallowWidth(1), ), }) c.log.Info("storage shard options", zap.Bool("with write cache", useCache), zap.String("with write cache path", writeCachePath), zap.String("BLOB path", blobPath), zap.Stringer("BLOB permissions", blobPerm), zap.Bool("BLOB compress", compressObjects), zap.Int("BLOB shallow depth", shallowDepth), zap.Uint64("BLOB small size limit", smallSzLimit), zap.String("metabase path", metaPath), zap.Stringer("metabase permissions", metaPerm), ) } if len(opts) == 0 { fatalOnErr(errors.New("no correctly set up shards, exit")) } c.cfgObject.cfgLocalStorage.shardOpts = opts } func configPath(sections ...string) string { return strings.Join(sections, ".") } func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { var err error optNonBlocking := ants.WithNonblocking(true) pool.get, err = ants.NewPool(cfg.GetInt(cfgObjectGetPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.head, err = ants.NewPool(cfg.GetInt(cfgObjectHeadPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.search, err = ants.NewPool(cfg.GetInt(cfgObjectSearchPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.rng, err = ants.NewPool(cfg.GetInt(cfgObjectRangePoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } pool.rngHash, err = ants.NewPool(cfg.GetInt(cfgObjectRangeHashPoolSize), optNonBlocking) if err != nil { fatalOnErr(err) } return pool }