package main

import (
	"context"
	"crypto/ecdsa"
	"net"
	"os"
	"path"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
	"github.com/nspcc-dev/neo-go/pkg/util"
	"github.com/nspcc-dev/neofs-api-go/pkg"
	"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
	netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
	crypto "github.com/nspcc-dev/neofs-crypto"
	"github.com/nspcc-dev/neofs-node/misc"
	"github.com/nspcc-dev/neofs-node/pkg/core/container"
	netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
	"github.com/nspcc-dev/neofs-node/pkg/metrics"
	"github.com/nspcc-dev/neofs-node/pkg/morph/client"
	cntwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
	nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
	"github.com/nspcc-dev/neofs-node/pkg/morph/event"
	netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
	"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
	"github.com/nspcc-dev/neofs-node/pkg/network"
	"github.com/nspcc-dev/neofs-node/pkg/services/control"
	trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
	truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
	tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
	"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
	util2 "github.com/nspcc-dev/neofs-node/pkg/util"
	"github.com/nspcc-dev/neofs-node/pkg/util/logger"
	"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
	"github.com/panjf2000/ants/v2"
	"github.com/pkg/errors"
	"github.com/spf13/viper"
	"go.etcd.io/bbolt"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

const (
	// logger keys
	cfgLogLevel              = "logger.level"
	cfgLogFormat             = "logger.format"
	cfgLogTrace              = "logger.trace_level"
	cfgLogInitSampling       = "logger.sampling.initial"
	cfgLogThereafterSampling = "logger.sampling.thereafter"

	// pprof keys
	cfgProfilerEnable = "pprof.enabled"
	cfgProfilerAddr   = "pprof.address"
	cfgProfilerTTL    = "pprof.shutdown_ttl"

	// metrics keys
	cfgMetricsEnable = "metrics.enabled"
	cfgMetricsAddr   = "metrics.address"

	// config keys for cfgNodeInfo
	cfgNodeKey             = "node.key"
	cfgBootstrapAddress    = "node.address"
	cfgNodeAttributePrefix = "node.attribute"

	// config keys for cfgGRPC
	cfgListenAddress  = "grpc.endpoint"
	cfgMaxMsgSize     = "grpc.maxmessagesize"
	cfgReflectService = "grpc.enable_reflect_service"
	cfgDialTimeout    = "grpc.dial_timeout"

	// config keys for cfgMorph
	cfgMorphRPCAddress = "morph.rpc_endpoint"

	cfgMorphNotifyRPCAddress  = "morph.notification_endpoint"
	cfgMorphNotifyDialTimeout = "morph.dial_timeout"

	// config keys for cfgAccounting
	cfgAccountingContract = "accounting.scripthash"
	cfgAccountingFee      = "accounting.fee"

	// config keys for cfgNetmap
	cfgNetmapContract          = "netmap.scripthash"
	cfgNetmapFee               = "netmap.fee"
	cfgNetmapWorkerPoolEnabled = "netmap.async_worker.enabled"
	cfgNetmapWorkerPoolSize    = "netmap.async_worker.size"

	// config keys for cfgContainer
	cfgContainerContract          = "container.scripthash"
	cfgContainerFee               = "container.fee"
	cfgContainerWorkerPoolEnabled = "container.async_worker.enabled"
	cfgContainerWorkerPoolSize    = "container.async_worker.size"

	// config keys for cfgReputation
	cfgReputationContract          = "reputation.scripthash"
	cfgReputationWorkerPoolEnabled = "reputation.async_worker.enabled"
	cfgReputationWorkerPoolSize    = "reputation.async_worker.size"

	cfgGCQueueSize = "gc.queuesize"
	cfgGCQueueTick = "gc.duration.sleep"
	cfgGCTimeout   = "gc.duration.timeout"

	cfgPolicerWorkScope   = "policer.work_scope"
	cfgPolicerExpRate     = "policer.expansion_rate"
	cfgPolicerHeadTimeout = "policer.head_timeout"

	cfgReplicatorPutTimeout = "replicator.put_timeout"

	cfgReBootstrapRelay    = "bootstrap.relay_only"
	cfgReBootstrapEnabled  = "bootstrap.periodic.enabled"
	cfgReBootstrapInterval = "bootstrap.periodic.interval"

	cfgShutdownOfflineEnabled = "shutdown.offline.enabled"

	cfgObjectPutPoolSize       = "pool.object.put.size"
	cfgObjectGetPoolSize       = "pool.object.get.size"
	cfgObjectHeadPoolSize      = "pool.object.head.size"
	cfgObjectSearchPoolSize    = "pool.object.search.size"
	cfgObjectRangePoolSize     = "pool.object.range.size"
	cfgObjectRangeHashPoolSize = "pool.object.rangehash.size"
)

const (
	cfgLocalStorageSection = "storage"
	cfgStorageShardSection = "shard"

	cfgShardUseWriteCache = "use_write_cache"

	cfgBlobStorSection      = "blobstor"
	cfgWriteCacheSection    = "writecache"
	cfgWriteCacheMemSize    = "mem_size"
	cfgWriteCacheDBSize     = "db_size"
	cfgWriteCacheSmallSize  = "small_size"
	cfgWriteCacheMaxSize    = "max_size"
	cfgWriteCacheWrkCount   = "workers_count"
	cfgBlobStorCompress     = "compress"
	cfgBlobStorShallowDepth = "shallow_depth"
	cfgBlobStorTreePath     = "path"
	cfgBlobStorTreePerm     = "perm"
	cfgBlobStorSmallSzLimit = "small_size_limit"

	cfgBlobStorBlzSection = "blobovnicza"
	cfgBlzSize            = "size"
	cfgBlzShallowDepth    = "shallow_depth"
	cfgBlzShallowWidth    = "shallow_width"
	cfgBlzOpenedCacheSize = "opened_cache_size"

	cfgMetaBaseSection = "metabase"
	cfgMetaBasePath    = "path"
	cfgMetaBasePerm    = "perm"

	cfgGCSection          = "gc"
	cfgGCRemoverBatchSize = "remover_batch_size"
	cfgGCRemoverSleepInt  = "remover_sleep_interval"
)

const cfgTombstoneLifetime = "tombstone_lifetime"

const (
	addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
)

type cfg struct {
	ctx context.Context

	ctxCancel func()

	internalErr chan error // channel for internal application errors at runtime

	viper *viper.Viper

	log *zap.Logger

	wg *sync.WaitGroup

	key *ecdsa.PrivateKey

	apiVersion *pkg.Version

	cfgGRPC cfgGRPC

	cfgMorph cfgMorph

	cfgAccounting cfgAccounting

	cfgContainer cfgContainer

	cfgNetmap cfgNetmap

	privateTokenStore *tokenStorage.TokenStore

	cfgNodeInfo cfgNodeInfo

	localAddr *network.Address

	cfgObject cfgObject

	profiler profiler.Profiler

	metricsServer profiler.Metrics

	metricsCollector *metrics.StorageMetrics

	workers []worker

	respSvc *response.Service

	cfgControlService cfgControlService

	netStatus *atomic.Int32

	healthStatus *atomic.Int32

	closers []func()

	cfgReputation cfgReputation
}

type cfgGRPC struct {
	listener net.Listener

	server *grpc.Server

	maxChunkSize uint64

	maxAddrAmount uint64

	enableReflectService bool
}

type cfgMorph struct {
	client *client.Client

	blockTimers     []*timer.BlockTimer // all combined timers
	eigenTrustTimer *timer.BlockTimer   // timer for EigenTrust iterations
}

type cfgAccounting struct {
	scriptHash util.Uint160

	fee fixedn.Fixed8
}

type cfgContainer struct {
	scriptHash util.Uint160

	fee fixedn.Fixed8

	parsers     map[event.Type]event.Parser
	subscribers map[event.Type][]event.Handler
	workerPool  util2.WorkerPool // pool for asynchronous handlers
}

type cfgNetmap struct {
	scriptHash util.Uint160
	wrapper    *nmwrapper.Wrapper

	fee fixedn.Fixed8

	parsers map[event.Type]event.Parser

	subscribers map[event.Type][]event.Handler
	workerPool  util2.WorkerPool // pool for asynchronous handlers

	state *networkState

	reBootstrapEnabled  bool
	reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
	reBootstrapInterval uint64       // in epochs

	goOfflineEnabled bool // send `UpdateState(offline)` tx at shutdown
}

type BootstrapType uint32

type cfgNodeInfo struct {
	// values from config
	bootType   BootstrapType
	attributes []*netmap.NodeAttribute

	// values at runtime
	infoMtx sync.RWMutex
	info    netmap.NodeInfo
}

type cfgObject struct {
	netMapStorage netmapCore.Source

	cnrStorage container.Source

	cnrClient *cntwrapper.Wrapper

	pool cfgObjectRoutines

	cfgLocalStorage cfgLocalStorage
}

type cfgLocalStorage struct {
	localStorage *engine.StorageEngine

	shardOpts [][]shard.Option
}

type cfgObjectRoutines struct {
	get, head, put, search, rng, rngHash *ants.Pool
}

type cfgControlService struct {
	server *grpc.Server
}

type cfgReputation struct {
	workerPool util2.WorkerPool // pool for EigenTrust algorithm's iterations

	localTrustStorage *truststorage.Storage

	localTrustCtrl *trustcontroller.Controller

	scriptHash util.Uint160
}

const (
	_ BootstrapType = iota
	StorageNode
	RelayNode
)

func initCfg(path string) *cfg {
	viperCfg := initViper(path)

	key, err := crypto.LoadPrivateKey(viperCfg.GetString(cfgNodeKey))
	fatalOnErr(err)

	u160Accounting, err := util.Uint160DecodeStringLE(
		viperCfg.GetString(cfgAccountingContract))
	fatalOnErr(err)

	u160Netmap, err := util.Uint160DecodeStringLE(
		viperCfg.GetString(cfgNetmapContract))
	fatalOnErr(err)

	u160Container, err := util.Uint160DecodeStringLE(
		viperCfg.GetString(cfgContainerContract))
	fatalOnErr(err)

	u160Reputation, err := util.Uint160DecodeStringLE(
		viperCfg.GetString(cfgReputationContract))
	fatalOnErr(err)

	log, err := logger.NewLogger(viperCfg)
	fatalOnErr(err)

	netAddr, err := network.AddressFromString(viperCfg.GetString(cfgBootstrapAddress))
	fatalOnErr(err)

	maxChunkSize := viperCfg.GetUint64(cfgMaxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
	maxAddrAmount := maxChunkSize / addressSize               // each address is about 72 bytes

	state := newNetworkState()

	// initialize async workers if it is configured so
	containerWorkerPool, err := initContainerWorkerPool(viperCfg)
	fatalOnErr(err)

	netmapWorkerPool, err := initNetmapWorkerPool(viperCfg)
	fatalOnErr(err)

	reputationWorkerPool, err := initReputationWorkerPool(viperCfg)
	fatalOnErr(err)

	relayOnly := viperCfg.GetBool(cfgReBootstrapRelay)

	c := &cfg{
		ctx:         context.Background(),
		internalErr: make(chan error),
		viper:       viperCfg,
		log:         log,
		wg:          new(sync.WaitGroup),
		key:         key,
		apiVersion:  pkg.SDKVersion(),
		cfgAccounting: cfgAccounting{
			scriptHash: u160Accounting,
			fee:        fixedn.Fixed8(viperCfg.GetInt(cfgAccountingFee)),
		},
		cfgContainer: cfgContainer{
			scriptHash: u160Container,
			fee:        fixedn.Fixed8(viperCfg.GetInt(cfgContainerFee)),
			workerPool: containerWorkerPool,
		},
		cfgNetmap: cfgNetmap{
			scriptHash:          u160Netmap,
			fee:                 fixedn.Fixed8(viperCfg.GetInt(cfgNetmapFee)),
			state:               state,
			workerPool:          netmapWorkerPool,
			reBootstrapInterval: viperCfg.GetUint64(cfgReBootstrapInterval),
			reBootstrapEnabled:  !relayOnly && viperCfg.GetBool(cfgReBootstrapEnabled),
			reBoostrapTurnedOff: atomic.NewBool(relayOnly),
			goOfflineEnabled:    viperCfg.GetBool(cfgShutdownOfflineEnabled),
		},
		cfgNodeInfo: cfgNodeInfo{
			bootType:   StorageNode,
			attributes: parseAttributes(viperCfg),
		},
		cfgGRPC: cfgGRPC{
			maxChunkSize:         maxChunkSize,
			maxAddrAmount:        maxAddrAmount,
			enableReflectService: viperCfg.GetBool(cfgReflectService),
		},
		localAddr: netAddr,
		respSvc: response.NewService(
			response.WithNetworkState(state),
		),
		cfgObject: cfgObject{
			pool: initObjectPool(viperCfg),
		},
		netStatus:    atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)),
		healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
		cfgReputation: cfgReputation{
			scriptHash: u160Reputation,
			workerPool: reputationWorkerPool,
		},
	}

	if viperCfg.GetBool(cfgMetricsEnable) {
		c.metricsCollector = metrics.NewStorageMetrics()
	}

	initLocalStorage(c)

	return c
}

func initViper(path string) *viper.Viper {
	v := viper.New()

	v.SetEnvPrefix(misc.Prefix)
	v.AutomaticEnv()
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

	v.SetDefault("app.name", misc.NodeName)
	v.SetDefault("app.version", misc.Version)

	defaultConfiguration(v)

	if path != "" {
		v.SetConfigFile(path)
		v.SetConfigType("yml")
		fatalOnErr(v.ReadInConfig())
	}

	return v
}

func defaultConfiguration(v *viper.Viper) {
	v.SetDefault(cfgNodeKey, "")          // node key
	v.SetDefault(cfgBootstrapAddress, "") // announced address of the node

	v.SetDefault(cfgMorphRPCAddress, []string{})
	v.SetDefault(cfgMorphNotifyRPCAddress, []string{})
	v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second)

	v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address
	v.SetDefault(cfgMaxMsgSize, 4<<20)                // transport msg limit 4 MiB
	v.SetDefault(cfgReflectService, false)
	v.SetDefault(cfgDialTimeout, 5*time.Second)

	v.SetDefault(cfgAccountingContract, "1aeefe1d0dfade49740fff779c02cd4a0538ffb1")
	v.SetDefault(cfgAccountingFee, "1")

	v.SetDefault(cfgContainerContract, "9d2ca84d7fb88213c4baced5a6ed4dc402309039")
	v.SetDefault(cfgContainerFee, "1")
	v.SetDefault(cfgContainerWorkerPoolEnabled, true)
	v.SetDefault(cfgContainerWorkerPoolSize, 10)

	v.SetDefault(cfgReputationWorkerPoolEnabled, true)
	v.SetDefault(cfgReputationWorkerPoolSize, 10)

	v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3")
	v.SetDefault(cfgNetmapFee, "1")
	v.SetDefault(cfgNetmapWorkerPoolEnabled, true)
	v.SetDefault(cfgNetmapWorkerPoolSize, 10)

	v.SetDefault(cfgLogLevel, "info")
	v.SetDefault(cfgLogFormat, "console")
	v.SetDefault(cfgLogTrace, "fatal")
	v.SetDefault(cfgLogInitSampling, 1000)
	v.SetDefault(cfgLogThereafterSampling, 1000)

	v.SetDefault(cfgProfilerEnable, false)
	v.SetDefault(cfgProfilerAddr, ":6060")
	v.SetDefault(cfgProfilerTTL, "30s")

	v.SetDefault(cfgMetricsEnable, false)
	v.SetDefault(cfgMetricsAddr, ":9090")

	v.SetDefault(cfgGCQueueSize, 1000)
	v.SetDefault(cfgGCQueueTick, "5s")
	v.SetDefault(cfgGCTimeout, "5s")

	v.SetDefault(cfgPolicerWorkScope, 100)
	v.SetDefault(cfgPolicerExpRate, 10) // in %
	v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)

	v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second)

	v.SetDefault(cfgReBootstrapEnabled, false) // in epochs
	v.SetDefault(cfgReBootstrapInterval, 2)    // in epochs

	v.SetDefault(cfgObjectGetPoolSize, 10)
	v.SetDefault(cfgObjectHeadPoolSize, 10)
	v.SetDefault(cfgObjectPutPoolSize, 10)
	v.SetDefault(cfgObjectSearchPoolSize, 10)
	v.SetDefault(cfgObjectRangePoolSize, 10)
	v.SetDefault(cfgObjectRangeHashPoolSize, 10)

	v.SetDefault(cfgCtrlSvcAuthorizedKeys, []string{})

	v.SetDefault(cfgTombstoneLifetime, 5)
}

func (c *cfg) LocalAddress() *network.Address {
	return c.localAddr
}

func initLocalStorage(c *cfg) {
	initShardOptions(c)

	engineOpts := []engine.Option{engine.WithLogger(c.log)}
	if c.metricsCollector != nil {
		engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector))
	}

	ls := engine.New(engineOpts...)

	for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts {
		id, err := ls.AddShard(opts...)
		fatalOnErr(err)

		c.log.Info("shard attached to engine",
			zap.Stringer("id", id),
		)
	}

	c.cfgObject.cfgLocalStorage.localStorage = ls

	c.onShutdown(func() {
		c.log.Info("closing components of the storage engine...")

		err := ls.Close()
		if err != nil {
			c.log.Info("storage engine closing failure",
				zap.String("error", err.Error()),
			)
		} else {
			c.log.Info("all components of the storage engine closed successfully")
		}
	})
}

func initShardOptions(c *cfg) {
	var opts [][]shard.Option

	for i := 0; ; i++ {
		prefix := configPath(
			cfgLocalStorageSection,
			cfgStorageShardSection,
			strconv.Itoa(i),
		)

		useCache := c.viper.GetBool(
			configPath(prefix, cfgShardUseWriteCache),
		)

		writeCachePrefix := configPath(prefix, cfgWriteCacheSection)

		writeCachePath := c.viper.GetString(
			configPath(writeCachePrefix, cfgBlobStorTreePath),
		)
		if useCache && writeCachePath == "" {
			c.log.Warn("incorrect writeCache path, ignore shard")
			break
		}
		writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize))
		writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize))
		writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize))
		writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize))
		writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount))

		blobPrefix := configPath(prefix, cfgBlobStorSection)

		blobPath := c.viper.GetString(
			configPath(blobPrefix, cfgBlobStorTreePath),
		)
		if blobPath == "" {
			c.log.Warn("incorrect blobStor path, ignore shard")
			break
		}

		compressObjects := c.viper.GetBool(
			configPath(blobPrefix, cfgBlobStorCompress),
		)

		blobPerm := os.FileMode(c.viper.GetInt(
			configPath(blobPrefix, cfgBlobStorTreePerm),
		))

		shallowDepth := c.viper.GetInt(
			configPath(blobPrefix, cfgBlobStorShallowDepth),
		)

		smallSzLimit := c.viper.GetUint64(
			configPath(blobPrefix, cfgBlobStorSmallSzLimit),
		)
		if smallSzLimit == 0 {
			smallSzLimit = 1 << 20 // 1MB
		}
		if writeCacheMaxSize <= 0 {
			writeCacheSmallSize = smallSzLimit
		}

		blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection)

		blzSize := c.viper.GetUint64(
			configPath(blzPrefix, cfgBlzSize),
		)
		if blzSize == 0 {
			blzSize = 1 << 30 // 1 GB
		}

		blzShallowDepth := c.viper.GetUint64(
			configPath(blzPrefix, cfgBlzShallowDepth),
		)

		blzShallowWidth := c.viper.GetUint64(
			configPath(blzPrefix, cfgBlzShallowWidth),
		)

		blzCacheSize := c.viper.GetInt(
			configPath(blzPrefix, cfgBlzOpenedCacheSize),
		)

		metaPrefix := configPath(prefix, cfgMetaBaseSection)

		metaPath := c.viper.GetString(
			configPath(metaPrefix, cfgMetaBasePath),
		)

		metaPerm := os.FileMode(c.viper.GetUint32(
			configPath(metaPrefix, cfgMetaBasePerm),
		))

		fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm))

		gcPrefix := configPath(prefix, cfgGCSection)

		rmBatchSize := c.viper.GetInt(
			configPath(gcPrefix, cfgGCRemoverBatchSize),
		)

		rmSleepInterval := c.viper.GetDuration(
			configPath(gcPrefix, cfgGCRemoverSleepInt),
		)

		opts = append(opts, []shard.Option{
			shard.WithLogger(c.log),
			shard.WithBlobStorOptions(
				blobstor.WithRootPath(blobPath),
				blobstor.WithCompressObjects(compressObjects, c.log),
				blobstor.WithRootPerm(blobPerm),
				blobstor.WithShallowDepth(shallowDepth),
				blobstor.WithSmallSizeLimit(smallSzLimit),
				blobstor.WithBlobovniczaSize(blzSize),
				blobstor.WithBlobovniczaShallowDepth(blzShallowDepth),
				blobstor.WithBlobovniczaShallowWidth(blzShallowWidth),
				blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize),
				blobstor.WithLogger(c.log),
			),
			shard.WithMetaBaseOptions(
				meta.WithLogger(c.log),
				meta.WithPath(metaPath),
				meta.WithPermissions(metaPerm),
				meta.WithBoltDBOptions(&bbolt.Options{
					Timeout: 100 * time.Millisecond,
				}),
			),
			shard.WithWriteCache(useCache),
			shard.WithWriteCacheOptions(
				writecache.WithPath(writeCachePath),
				writecache.WithLogger(c.log),
				writecache.WithMaxMemSize(writeCacheMemSize),
				writecache.WithMaxObjectSize(writeCacheMaxSize),
				writecache.WithSmallObjectSize(writeCacheSmallSize),
				writecache.WithMaxDBSize(writeCacheDBSize),
				writecache.WithFlushWorkersCount(writeCacheWrkCount),
			),
			shard.WithRemoverBatchSize(rmBatchSize),
			shard.WithGCRemoverSleepInterval(rmSleepInterval),
			shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool {
				pool, err := ants.NewPool(sz)
				fatalOnErr(err)

				return pool
			}),
			shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
				ch := make(chan shard.Event)

				addNewEpochNotificationHandler(c, func(ev event.Event) {
					ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
				})

				return ch
			}),
		})

		c.log.Info("storage shard options",
			zap.Bool("with write cache", useCache),
			zap.String("with write cache path", writeCachePath),
			zap.String("BLOB path", blobPath),
			zap.Stringer("BLOB permissions", blobPerm),
			zap.Bool("BLOB compress", compressObjects),
			zap.Int("BLOB shallow depth", shallowDepth),
			zap.Uint64("BLOB small size limit", smallSzLimit),
			zap.String("metabase path", metaPath),
			zap.Stringer("metabase permissions", metaPerm),
			zap.Int("GC remover batch size", rmBatchSize),
			zap.Duration("GC remover sleep interval", rmSleepInterval),
		)
	}

	if len(opts) == 0 {
		fatalOnErr(errors.New("no correctly set up shards, exit"))
	}

	c.cfgObject.cfgLocalStorage.shardOpts = opts
}

func configPath(sections ...string) string {
	return strings.Join(sections, ".")
}

func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
	var err error

	optNonBlocking := ants.WithNonblocking(true)

	pool.get, err = ants.NewPool(cfg.GetInt(cfgObjectGetPoolSize), optNonBlocking)
	if err != nil {
		fatalOnErr(err)
	}

	pool.head, err = ants.NewPool(cfg.GetInt(cfgObjectHeadPoolSize), optNonBlocking)
	if err != nil {
		fatalOnErr(err)
	}

	pool.search, err = ants.NewPool(cfg.GetInt(cfgObjectSearchPoolSize), optNonBlocking)
	if err != nil {
		fatalOnErr(err)
	}

	pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking)
	if err != nil {
		fatalOnErr(err)
	}

	pool.rng, err = ants.NewPool(cfg.GetInt(cfgObjectRangePoolSize), optNonBlocking)
	if err != nil {
		fatalOnErr(err)
	}

	pool.rngHash, err = ants.NewPool(cfg.GetInt(cfgObjectRangeHashPoolSize), optNonBlocking)
	if err != nil {
		fatalOnErr(err)
	}

	return pool
}

func initNetmapWorkerPool(v *viper.Viper) (util2.WorkerPool, error) {
	if v.GetBool(cfgNetmapWorkerPoolEnabled) {
		// return async worker pool
		return ants.NewPool(v.GetInt(cfgNetmapWorkerPoolSize))
	}

	// return sync worker pool
	return util2.SyncWorkerPool{}, nil
}

func initContainerWorkerPool(v *viper.Viper) (util2.WorkerPool, error) {
	if v.GetBool(cfgContainerWorkerPoolEnabled) {
		// return async worker pool
		return ants.NewPool(v.GetInt(cfgContainerWorkerPoolSize))
	}

	// return sync worker pool
	return util2.SyncWorkerPool{}, nil
}

func initReputationWorkerPool(v *viper.Viper) (util2.WorkerPool, error) {
	if v.GetBool(cfgReputationWorkerPoolEnabled) {
		// return async worker pool
		return ants.NewPool(v.GetInt(cfgReputationWorkerPoolSize))
	}

	// return sync worker pool
	return util2.SyncWorkerPool{}, nil
}

func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
	ni := c.localNodeInfo()
	return ni.ToV2(), nil
}

// handleLocalNodeInfo rewrites local node info
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
	c.cfgNodeInfo.infoMtx.Lock()

	if ni != nil {
		c.cfgNodeInfo.info = *ni
	}

	c.updateStatusWithoutLock(ni)

	c.cfgNodeInfo.infoMtx.Unlock()
}

// handleNodeInfoStatus updates node info status without rewriting whole local
// node info status
func (c *cfg) handleNodeInfoStatus(ni *netmap.NodeInfo) {
	c.cfgNodeInfo.infoMtx.Lock()

	c.updateStatusWithoutLock(ni)

	c.cfgNodeInfo.infoMtx.Unlock()
}

func (c *cfg) localNodeInfo() netmap.NodeInfo {
	c.cfgNodeInfo.infoMtx.RLock()
	defer c.cfgNodeInfo.infoMtx.RUnlock()

	return c.cfgNodeInfo.info
}

func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo {
	ni := c.localNodeInfo()
	ni.SetState(netmap.NodeStateOnline)

	return &ni
}

func (c *cfg) updateStatusWithoutLock(ni *netmap.NodeInfo) {
	var nmState netmap.NodeState

	if ni != nil {
		nmState = ni.State()
	} else {
		nmState = netmap.NodeStateOffline
		c.cfgNodeInfo.info.SetState(nmState)
	}

	switch nmState {
	default:
		c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED)
	case netmap.NodeStateOnline:
		c.setNetmapStatus(control.NetmapStatus_ONLINE)
	case netmap.NodeStateOffline:
		c.setNetmapStatus(control.NetmapStatus_OFFLINE)
	}
}