[#11] Trim the old functionality

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-08-21 14:32:03 +03:00 committed by Alex Vanin
parent 783ec72d56
commit a87fdab324
235 changed files with 39 additions and 36211 deletions

View file

@ -1,314 +0,0 @@
package main
import (
"time"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/morph"
"github.com/spf13/viper"
)
func setDefaults(v *viper.Viper) {
// Logger section
{
v.SetDefault("logger.level", "debug")
v.SetDefault("logger.format", "console")
v.SetDefault("logger.trace_level", "fatal")
v.SetDefault("logger.no_disclaimer", false) // to disable app_name and app_version
v.SetDefault("logger.sampling.initial", 1000) // todo: add description
v.SetDefault("logger.sampling.thereafter", 1000) // todo: add description
}
// Transport section
{
v.SetDefault("transport.attempts_count", 5)
v.SetDefault("transport.attempts_ttl", "30s")
}
// Peers section
{
v.SetDefault("peers.metrics_timeout", "5s")
v.SetDefault("peers.connections_ttl", "30s")
v.SetDefault("peers.connections_idle", "30s")
v.SetDefault("peers.keep_alive.ttl", "30s")
v.SetDefault("peers.keep_alive.ping", "100ms")
}
// Muxer session
{
v.SetDefault("muxer.http.read_buffer_size", 0)
v.SetDefault("muxer.http.write_buffer_size", 0)
v.SetDefault("muxer.http.read_timeout", 0)
v.SetDefault("muxer.http.write_timeout", 0)
}
// Node section
{
v.SetDefault("node.proto", "tcp") // tcp or udp
v.SetDefault("node.address", ":8080")
v.SetDefault("node.shutdown_ttl", "30s")
v.SetDefault("node.private_key", "keys/node_00.key")
v.SetDefault("node.grpc.logging", true)
v.SetDefault("node.grpc.metrics", true)
v.SetDefault("node.grpc.billing", true)
// Contains public keys, which can send requests to state.DumpConfig
// for now, in the future, should be replaced with ACL or something else.
v.SetDefault("node.rpc.owners", []string{
// By default we add user.key
// TODO should be removed before public release:
// or add into default Dockerfile `NEOFS_NODE_RPC_OWNERS_0=`
"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
})
}
// Object section
{
v.SetDefault("object.max_processing_size", 100) // size in MB, use 0 to remove restriction
v.SetDefault("object.workers_count", 5)
v.SetDefault("object.assembly", true)
v.SetDefault("object.window_size", 3)
v.SetDefault("object.transformers.payload_limiter.max_payload_size", 5000) // size in KB
// algorithm used for salt applying in range hash, for now only xor is available
v.SetDefault("object.salitor", "xor")
// set true to check container ACL rules
v.SetDefault("object.check_acl", true)
v.SetDefault("object.dial_timeout", "500ms")
rpcs := []string{"put", "get", "delete", "head", "search", "range", "range_hash"}
for i := range rpcs {
v.SetDefault("object."+rpcs[i]+".timeout", "5s")
v.SetDefault("object."+rpcs[i]+".log_errs", false)
}
}
// Replication section
{
v.SetDefault("replication.manager.pool_size", 100)
v.SetDefault("replication.manager.pool_expansion_rate", 0.1)
v.SetDefault("replication.manager.read_pool_interval", "500ms")
v.SetDefault("replication.manager.push_task_timeout", "1s")
v.SetDefault("replication.manager.placement_honorer_enabled", true)
v.SetDefault("replication.manager.capacities.replicate", 1)
v.SetDefault("replication.manager.capacities.restore", 1)
v.SetDefault("replication.manager.capacities.garbage", 1)
v.SetDefault("replication.placement_honorer.chan_capacity", 1)
v.SetDefault("replication.placement_honorer.result_timeout", "1s")
v.SetDefault("replication.placement_honorer.timeouts.put", "5s")
v.SetDefault("replication.placement_honorer.timeouts.get", "5s")
v.SetDefault("replication.location_detector.chan_capacity", 1)
v.SetDefault("replication.location_detector.result_timeout", "1s")
v.SetDefault("replication.location_detector.timeouts.search", "5s")
v.SetDefault("replication.storage_validator.chan_capacity", 1)
v.SetDefault("replication.storage_validator.result_timeout", "1s")
v.SetDefault("replication.storage_validator.salt_size", 64) // size in bytes
v.SetDefault("replication.storage_validator.max_payload_range_size", 64) // size in bytes
v.SetDefault("replication.storage_validator.payload_range_count", 3)
v.SetDefault("replication.storage_validator.salitor", "xor")
v.SetDefault("replication.storage_validator.timeouts.get", "5s")
v.SetDefault("replication.storage_validator.timeouts.head", "5s")
v.SetDefault("replication.storage_validator.timeouts.range_hash", "5s")
v.SetDefault("replication.replicator.chan_capacity", 1)
v.SetDefault("replication.replicator.result_timeout", "1s")
v.SetDefault("replication.replicator.timeouts.put", "5s")
v.SetDefault("replication.restorer.chan_capacity", 1)
v.SetDefault("replication.restorer.result_timeout", "1s")
v.SetDefault("replication.restorer.timeouts.get", "5s")
v.SetDefault("replication.restorer.timeouts.head", "5s")
}
// PPROF section
{
v.SetDefault("pprof.enabled", true)
v.SetDefault("pprof.address", ":6060")
v.SetDefault("pprof.shutdown_ttl", "10s")
// v.SetDefault("pprof.read_timeout", "10s")
// v.SetDefault("pprof.read_header_timeout", "10s")
// v.SetDefault("pprof.write_timeout", "10s")
// v.SetDefault("pprof.idle_timeout", "10s")
// v.SetDefault("pprof.max_header_bytes", 1024)
}
// Metrics section
{
v.SetDefault("metrics.enabled", true)
v.SetDefault("metrics.address", ":8090")
v.SetDefault("metrics.shutdown_ttl", "10s")
// v.SetDefault("metrics.read_header_timeout", "10s")
// v.SetDefault("metrics.write_timeout", "10s")
// v.SetDefault("metrics.idle_timeout", "10s")
// v.SetDefault("metrics.max_header_bytes", 1024)
}
// Workers section
{
workers := []string{
"peers",
"boot",
"replicator",
"metrics",
"event_listener",
}
for i := range workers {
v.SetDefault("workers."+workers[i]+".immediately", true)
v.SetDefault("workers."+workers[i]+".disabled", false)
// v.SetDefault("workers."+workers[i]+".timer", "5s") // run worker every 5sec and reset timer after job
// v.SetDefault("workers."+workers[i]+".ticker", "5s") // run worker every 5sec
}
}
// Morph section
{
// Endpoint
v.SetDefault(
morph.EndpointOptPath(),
"http://morph_chain.localtest.nspcc.ru:30333",
)
// Dial timeout
v.SetDefault(
morph.DialTimeoutOptPath(),
5*time.Second,
)
v.SetDefault(
morph.MagicNumberOptPath(),
uint32(netmode.PrivNet),
)
{ // Event listener
// Endpoint
v.SetDefault(
morph.ListenerEndpointOptPath(),
"ws://morph_chain.localtest.nspcc.ru:30333/ws",
)
// Dial timeout
v.SetDefault(
morph.ListenerDialTimeoutOptPath(),
5*time.Second,
)
}
{ // Common parameters
for _, name := range morph.ContractNames {
// Script hash
v.SetDefault(
morph.ScriptHashOptPath(name),
"c77ecae9773ad0c619ad59f7f2dd6f585ddc2e70", // LE
)
// Invocation fee
v.SetDefault(
morph.InvocationFeeOptPath(name),
0,
)
}
}
{ // Container
// Set EACL method name
v.SetDefault(
morph.ContainerContractSetEACLOptPath(),
"SetEACL",
)
// Get EACL method name
v.SetDefault(
morph.ContainerContractEACLOptPath(),
"EACL",
)
// Put method name
v.SetDefault(
morph.ContainerContractPutOptPath(),
"Put",
)
// Get method name
v.SetDefault(
morph.ContainerContractGetOptPath(),
"Get",
)
// Delete method name
v.SetDefault(
morph.ContainerContractDelOptPath(),
"Delete",
)
// List method name
v.SetDefault(
morph.ContainerContractListOptPath(),
"List",
)
}
{ // Netmap
// AddPeer method name
v.SetDefault(
morph.NetmapContractAddPeerOptPath(),
"AddPeer",
)
// New epoch method name
v.SetDefault(
morph.NetmapContractNewEpochOptPath(),
"NewEpoch",
)
// Netmap method name
v.SetDefault(
morph.NetmapContractNetmapOptPath(),
"Netmap",
)
// Update state method name
v.SetDefault(
morph.NetmapContractUpdateStateOptPath(),
"UpdateState",
)
// IR list method name
v.SetDefault(
morph.NetmapContractIRListOptPath(),
"InnerRingList",
)
// New epoch event type
v.SetDefault(
morph.ContractEventOptPath(
morph.NetmapContractName,
morph.NewEpochEventType,
),
"NewEpoch",
)
}
{ // Balance
// balanceOf method name
v.SetDefault(
morph.BalanceContractBalanceOfOptPath(),
"balanceOf",
)
// decimals method name
v.SetDefault(
morph.BalanceContractDecimalsOfOptPath(),
"decimals",
)
}
}
}

View file

@ -1,146 +1,11 @@
package main
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"flag"
"os"
"time"
"github.com/nspcc-dev/neofs-api-go/service"
state2 "github.com/nspcc-dev/neofs-api-go/state"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/config"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/worker"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/node"
"github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/network/muxer"
statesrv "github.com/nspcc-dev/neofs-node/pkg/network/transport/state/grpc"
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
)
type params struct {
dig.In
Debug profiler.Profiler `optional:"true"`
Metric profiler.Metrics `optional:"true"`
Worker worker.Workers `optional:"true"`
Muxer muxer.Mux
Logger *zap.Logger
}
var (
healthCheck bool
configFile string
)
func runner(ctx context.Context, p params) error {
// create combined service, that would start/stop all
svc := fix.NewServices(p.Debug, p.Metric, p.Muxer, p.Worker)
p.Logger.Info("start services")
svc.Start(ctx)
<-ctx.Done()
p.Logger.Info("stop services")
svc.Stop()
return nil
}
func check(err error) {
if err != nil {
panic(err)
}
}
// FIXME: this is a copypaste from node settings constructor
func keyFromCfg(v *viper.Viper) (*ecdsa.PrivateKey, error) {
switch key := v.GetString("node.private_key"); key {
case "":
return nil, errors.New("`node.private_key` could not be empty")
case "generated":
return ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
default:
return crypto.LoadPrivateKey(key)
}
}
func runHealthCheck() {
if !healthCheck {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg, err := config.NewConfig(config.Params{
File: configFile,
Prefix: misc.Prefix,
Name: misc.NodeName,
Version: misc.Version,
AppDefaults: setDefaults,
})
check(err)
addr := cfg.GetString("node.address")
key, err := keyFromCfg(cfg)
if err != nil {
check(err)
}
con, err := grpc.DialContext(ctx, addr,
// TODO: we must provide grpc.WithInsecure() or set credentials
grpc.WithInsecure())
check(err)
req := new(statesrv.HealthRequest)
req.SetTTL(service.NonForwardingTTL)
if err := service.SignRequestData(key, req); err != nil {
check(err)
}
res, err := state2.NewStatusClient(con).
HealthCheck(ctx, req)
check(errors.Wrapf(err, "address: %q", addr))
var exitCode int
if !res.Healthy {
exitCode = 2
}
_, _ = os.Stdout.Write([]byte(res.Status + "\n"))
os.Exit(exitCode)
}
func main() {
flag.BoolVar(&healthCheck, "health", healthCheck, "run health-check")
ctx := grace.NewGracefulContext(nil)
// todo: if configFile is empty, we can check './config.yml' manually
flag.StringVar(&configFile, "config", configFile, "use config.yml file")
flag.Parse()
runHealthCheck()
fix.New(&fix.Settings{
File: configFile,
Name: misc.NodeName,
Prefix: misc.Prefix,
Runner: runner,
Build: misc.Build,
Version: misc.Version,
AppDefaults: setDefaults,
}, node.Module).RunAndCatch()
<-ctx.Done()
}

View file

@ -1,93 +0,0 @@
package bootstrap
import (
"crypto/ecdsa"
"errors"
"sync"
contract "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
state "github.com/nspcc-dev/neofs-node/pkg/network/transport/state/grpc"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
healthyParams struct {
dig.In
Logger *zap.Logger
Viper *viper.Viper
Place placement.Component
Checkers []state.HealthChecker `group:"healthy"`
// for ChangeState
PrivateKey *ecdsa.PrivateKey
Client *contract.Wrapper
}
healthyResult struct {
dig.Out
HealthyClient HealthyClient
StateService state.Service
}
// HealthyClient is an interface of healthiness checking tool.
HealthyClient interface {
Healthy() error
}
healthyClient struct {
*sync.RWMutex
healthy func() error
}
)
var errUnhealthy = errors.New("unhealthy")
func (h *healthyClient) setHandler(handler func() error) {
if handler == nil {
return
}
h.Lock()
h.healthy = handler
h.Unlock()
}
func (h *healthyClient) Healthy() error {
if h.healthy == nil {
return errUnhealthy
}
return h.healthy()
}
func newHealthy(p healthyParams) (res healthyResult, err error) {
sp := state.Params{
Stater: p.Place,
Logger: p.Logger,
Viper: p.Viper,
Checkers: p.Checkers,
PrivateKey: p.PrivateKey,
Client: p.Client,
}
if res.StateService, err = state.New(sp); err != nil {
return
}
healthyClient := &healthyClient{
RWMutex: new(sync.RWMutex),
}
healthyClient.setHandler(res.StateService.Healthy)
res.HealthyClient = healthyClient
return
}

View file

@ -1,8 +0,0 @@
package bootstrap
import "github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
// Module is a module of bootstrap component.
var Module = module.Module{
{Constructor: newHealthy},
}

View file

@ -1,59 +0,0 @@
package fix
import (
"fmt"
"reflect"
"go.uber.org/zap"
)
func (a *app) Catch(err error) {
if err == nil {
return
}
if a.log == nil {
panic(err)
}
a.log.Fatal("Can't run app",
zap.Error(err))
}
// CatchTrace catch errors for debugging
// use that function just for debug your application.
func (a *app) CatchTrace(err error) {
if err == nil {
return
}
// digging into the root of the problem
for {
var (
ok bool
v = reflect.ValueOf(err)
fn reflect.Value
)
if v.Type().Kind() != reflect.Struct {
break
}
if !v.FieldByName("Reason").IsValid() {
break
}
if v.FieldByName("Func").IsValid() {
fn = v.FieldByName("Func")
}
fmt.Printf("Place: %#v\nReason: %s\n\n", fn, err)
if err, ok = v.FieldByName("Reason").Interface().(error); !ok {
err = v.Interface().(error)
break
}
}
panic(err)
}

View file

@ -1,53 +0,0 @@
package config
import (
"strings"
"github.com/spf13/viper"
)
// Params groups the parameters of configuration.
type Params struct {
File string
Type string
Prefix string
Name string
Version string
AppDefaults func(v *viper.Viper)
}
// NewConfig is a configuration tool's constructor.
func NewConfig(p Params) (v *viper.Viper, err error) {
v = viper.New()
v.SetEnvPrefix(p.Prefix)
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.SetDefault("app.name", p.Name)
v.SetDefault("app.version", p.Version)
if p.AppDefaults != nil {
p.AppDefaults(v)
}
if p.fromFile() {
v.SetConfigFile(p.File)
v.SetConfigType(p.safeType())
err = v.ReadInConfig()
}
return v, err
}
func (p Params) fromFile() bool {
return p.File != ""
}
func (p Params) safeType() string {
if p.Type == "" {
p.Type = "yaml"
}
return strings.ToLower(p.Type)
}

View file

@ -1,113 +0,0 @@
package fix
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/config"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
"github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
// App is an interface of executable application.
App interface {
Run() error
RunAndCatch()
}
app struct {
err error
log *zap.Logger
di *dig.Container
runner interface{}
}
// Settings groups the application parameters.
Settings struct {
File string
Type string
Name string
Prefix string
Build string
Version string
Runner interface{}
AppDefaults func(v *viper.Viper)
}
)
func (a *app) RunAndCatch() {
err := a.Run()
if errors.Is(err, context.Canceled) {
return
}
if ok, _ := strconv.ParseBool(misc.Debug); ok {
a.CatchTrace(err)
}
a.Catch(err)
}
func (a *app) Run() error {
if a.err != nil {
return a.err
}
// setup app logger:
if err := a.di.Invoke(func(l *zap.Logger) {
a.log = l
}); err != nil {
return err
}
return a.di.Invoke(a.runner)
}
// New is an application constructor.
func New(s *Settings, mod module.Module) App {
var (
a app
err error
)
a.di = dig.New(dig.DeferAcyclicVerification())
a.runner = s.Runner
if s.Prefix == "" {
s.Prefix = s.Name
}
mod = mod.Append(
module.Module{
{Constructor: logger.NewLogger},
{Constructor: grace.NewGracefulContext},
{Constructor: func() (*viper.Viper, error) {
return config.NewConfig(config.Params{
File: s.File,
Type: s.Type,
Prefix: strings.ToUpper(s.Prefix),
Name: s.Name,
Version: fmt.Sprintf("%s(%s)", s.Version, s.Build),
AppDefaults: s.AppDefaults,
})
}},
})
if err = module.Provide(a.di, mod); err != nil {
a.err = err
}
return &a
}

View file

@ -1,35 +0,0 @@
package module
import (
"go.uber.org/dig"
)
type (
// Module type
Module []*Provider
// Provider struct
Provider struct {
Constructor interface{}
Options []dig.ProvideOption
}
)
// Append module to target module and return new module
func (m Module) Append(mods ...Module) Module {
var result = m
for _, mod := range mods {
result = append(result, mod...)
}
return result
}
// Provide set providers functions to DI container
func Provide(dic *dig.Container, providers Module) error {
for _, p := range providers {
if err := dic.Provide(p.Constructor, p.Options...); err != nil {
return err
}
}
return nil
}

View file

@ -1,46 +0,0 @@
package fix
import (
"context"
)
type (
// Service interface
Service interface {
Start(context.Context)
Stop()
}
combiner []Service
)
var _ Service = (combiner)(nil)
// NewServices creates single runner.
func NewServices(items ...Service) Service {
var svc = make(combiner, 0, len(items))
for _, item := range items {
if item == nil {
continue
}
svc = append(svc, item)
}
return svc
}
// Start all services.
func (c combiner) Start(ctx context.Context) {
for _, svc := range c {
svc.Start(ctx)
}
}
// Stop all services.
func (c combiner) Stop() {
for _, svc := range c {
svc.Stop()
}
}

View file

@ -1,79 +0,0 @@
package worker
import (
"context"
"sync"
"sync/atomic"
"time"
)
type (
// Workers is an interface of worker tool.
Workers interface {
Start(context.Context)
Stop()
Add(Job Handler)
}
workers struct {
cancel context.CancelFunc
started *int32
wg *sync.WaitGroup
jobs []Handler
}
// Handler is a worker's handling function.
Handler func(ctx context.Context)
// Jobs is a map of worker names to handlers.
Jobs map[string]Handler
// Job groups the parameters of worker's job.
Job struct {
Disabled bool
Immediately bool
Timer time.Duration
Ticker time.Duration
Handler Handler
}
)
// New is a constructor of workers.
func New() Workers {
return &workers{
started: new(int32),
wg: new(sync.WaitGroup),
}
}
func (w *workers) Add(job Handler) {
w.jobs = append(w.jobs, job)
}
func (w *workers) Stop() {
if !atomic.CompareAndSwapInt32(w.started, 1, 0) {
// already stopped
return
}
w.cancel()
w.wg.Wait()
}
func (w *workers) Start(ctx context.Context) {
if !atomic.CompareAndSwapInt32(w.started, 0, 1) {
// already started
return
}
ctx, w.cancel = context.WithCancel(ctx)
for _, job := range w.jobs {
w.wg.Add(1)
go func(handler Handler) {
defer w.wg.Done()
handler(ctx)
}(job)
}
}

View file

@ -1,141 +0,0 @@
package grpc
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/nspcc-dev/neofs-api-go/refs"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
type (
billingStream struct {
grpc.ServerStream
*grpc.StreamServerInfo
input int
output int
cid string
}
cider interface {
CID() refs.CID
}
)
const (
typeInput = "input"
typeOutput = "output"
labelType = "type"
labelMethod = "method"
labelContainer = "container"
)
var (
serviceBillingBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "neofs",
Name: "billing_bytes",
Help: "Count of bytes received / sent for method and container",
}, []string{labelType, labelMethod, labelContainer})
serviceBillingCalls = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "neofs",
Name: "billing_calls",
Help: "Count of calls for api methods",
}, []string{labelMethod, labelContainer})
)
func init() {
// Register billing metrics
prometheus.MustRegister(serviceBillingBytes)
prometheus.MustRegister(serviceBillingCalls)
}
func getProtoSize(val interface{}) int {
if msg, ok := val.(proto.Message); ok && msg != nil {
return proto.Size(msg)
}
return 0
}
func getProtoContainer(val interface{}) string {
if t, ok := val.(cider); ok && t != nil {
return t.CID().String()
}
return ""
}
func (b *billingStream) RecvMsg(msg interface{}) error {
err := b.ServerStream.RecvMsg(msg)
b.input += getProtoSize(msg)
if cid := getProtoContainer(msg); cid != "" {
b.cid = cid
}
return err
}
func (b *billingStream) SendMsg(msg interface{}) error {
b.output += getProtoSize(msg)
return b.ServerStream.SendMsg(msg)
}
func (b *billingStream) report() {
labels := prometheus.Labels{
labelMethod: b.FullMethod,
labelContainer: b.cid,
}
serviceBillingCalls.With(labels).Inc()
labels[labelType] = typeInput
serviceBillingBytes.With(labels).Add(float64(b.input))
labels[labelType] = typeOutput
serviceBillingBytes.With(labels).Add(float64(b.output))
}
func streamBilling(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
stream := &billingStream{
ServerStream: ss,
StreamServerInfo: info,
}
err := handler(srv, stream)
stream.report()
return err
}
func unaryBilling(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) {
input := getProtoSize(req)
cid := getProtoContainer(req)
labels := prometheus.Labels{
labelMethod: info.FullMethod,
labelContainer: cid,
}
serviceBillingCalls.With(labels).Inc()
if res, err = handler(ctx, req); err != nil {
return
}
output := getProtoSize(res)
labels[labelType] = typeInput
serviceBillingBytes.With(labels).Add(float64(input))
labels[labelType] = typeOutput
serviceBillingBytes.With(labels).Add(float64(output))
return
}

View file

@ -1,8 +0,0 @@
package grpc
import "github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
// Module is a gRPC layer module.
var Module = module.Module{
{Constructor: routing},
}

View file

@ -1,115 +0,0 @@
// About "github.com/nspcc-dev/neofs-node/lib/grpc"
// there's just alias for "google.golang.org/grpc"
// with Service-interface
package grpc
import (
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
gZap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
libgrpc "github.com/nspcc-dev/neofs-node/pkg/network/transport/grpc"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type (
Service = libgrpc.Service
// ServerParams to create gRPC-server
// and provide service-handlers
ServerParams struct {
dig.In
Services []Service
Logger *zap.Logger
Viper *viper.Viper
}
// ServicesResult ...
ServicesResult struct {
dig.Out
Services []Service
}
// Server type-alias
Server = grpc.Server
// CallOption type-alias
CallOption = grpc.CallOption
// ClientConn type-alias
ClientConn = grpc.ClientConn
// ServerOption type-alias
ServerOption = grpc.ServerOption
)
var (
// DialContext func-alias
DialContext = grpc.DialContext
// WithBlock func-alias
WithBlock = grpc.WithBlock
// WithInsecure func-alias
WithInsecure = grpc.WithInsecure
)
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opts ...ServerOption) *Server {
return grpc.NewServer(opts...)
}
// creates new gRPC server and attach handlers.
func routing(p ServerParams) *grpc.Server {
var (
options []ServerOption
stream []grpc.StreamServerInterceptor
unary []grpc.UnaryServerInterceptor
)
if p.Viper.GetBool("node.grpc.billing") {
unary = append(unary, unaryBilling)
stream = append(stream, streamBilling)
}
if p.Viper.GetBool("node.grpc.logging") {
stream = append(stream, gZap.StreamServerInterceptor(p.Logger))
unary = append(unary, gZap.UnaryServerInterceptor(p.Logger))
}
if p.Viper.GetBool("node.grpc.metrics") {
stream = append(stream, prometheus.StreamServerInterceptor)
unary = append(unary, prometheus.UnaryServerInterceptor)
}
// Add stream options:
if len(stream) > 0 {
options = append(options,
grpc.StreamInterceptor(middleware.ChainStreamServer(stream...)),
)
}
// Add unary options:
if len(unary) > 0 {
options = append(options,
grpc.UnaryInterceptor(middleware.ChainUnaryServer(unary...)),
)
}
g := grpc.NewServer(options...)
// Service services here:
for _, service := range p.Services {
p.Logger.Info("register gRPC service",
zap.String("service", service.Name()))
service.Register(g)
}
return g
}

View file

@ -1,69 +0,0 @@
package morph
import (
contract "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance"
clientWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper"
accounting "github.com/nspcc-dev/neofs-node/pkg/network/transport/accounting/grpc"
"github.com/pkg/errors"
"go.uber.org/dig"
)
type balanceContractResult struct {
dig.Out
Client *clientWrapper.Wrapper
AccountingService accounting.Service
}
// BalanceContractName is a name of Balance contract config sub-section.
const BalanceContractName = "balance"
const (
balanceContractBalanceOfOpt = "balance_of_method"
balanceContractDecimalsOfOpt = "decimals_method"
)
// BalanceContractBalanceOfOptPath is a path to balanceOf method name option.
func BalanceContractBalanceOfOptPath() string {
return optPath(prefix, BalanceContractName, balanceContractBalanceOfOpt)
}
// BalanceContractDecimalsOfOptPath is a path to decimals method name option.
func BalanceContractDecimalsOfOptPath() string {
return optPath(prefix, BalanceContractName, balanceContractDecimalsOfOpt)
}
func newBalanceContract(p contractParams) (res balanceContractResult, err error) {
client, ok := p.MorphContracts[BalanceContractName]
if !ok {
err = errors.Errorf("missing %s contract client", BalanceContractName)
return
}
var (
balanceOfMethod = p.Viper.GetString(BalanceContractBalanceOfOptPath())
decimalsMethod = p.Viper.GetString(BalanceContractDecimalsOfOptPath())
)
var c *contract.Client
if c, err = contract.New(client,
contract.WithBalanceOfMethod(balanceOfMethod),
contract.WithDecimalsMethod(decimalsMethod),
); err != nil {
return
}
if res.Client, err = clientWrapper.New(c); err != nil {
return
}
if res.AccountingService, err = accounting.New(accounting.Params{
ContractClient: res.Client,
}); err != nil {
return
}
return
}

View file

@ -1,138 +0,0 @@
package morph
import (
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
// SmartContracts maps smart contract name to contract client.
type SmartContracts map[string]*client.StaticClient
// EventHandlers maps notification event name to handler information.
type EventHandlers map[string]event.HandlerInfo
type morphContractsParams struct {
dig.In
Viper *viper.Viper
Client *client.Client
Listener event.Listener
}
type contractParams struct {
dig.In
Viper *viper.Viper
Logger *zap.Logger
MorphContracts SmartContracts
NodeInfo netmap.Info
}
func newMorphContracts(p morphContractsParams) (SmartContracts, EventHandlers, error) {
mContracts := make(map[string]*client.StaticClient, len(ContractNames))
mHandlers := make(map[string]event.HandlerInfo)
for _, contractName := range ContractNames {
scHash, err := util.Uint160DecodeStringLE(
p.Viper.GetString(
ScriptHashOptPath(contractName),
),
)
if err != nil {
return nil, nil, err
}
fee := util.Fixed8FromInt64(
p.Viper.GetInt64(
InvocationFeeOptPath(contractName),
),
)
mContracts[contractName], err = client.NewStatic(p.Client, scHash, fee)
if err != nil {
return nil, nil, err
}
// set event parsers
parserInfo := event.ParserInfo{}
parserInfo.SetScriptHash(scHash)
handlerInfo := event.HandlerInfo{}
handlerInfo.SetScriptHash(scHash)
for _, item := range mParsers[contractName] {
parserInfo.SetParser(item.parser)
optPath := ContractEventOptPath(contractName, item.typ)
typEvent := event.TypeFromString(
p.Viper.GetString(optPath),
)
parserInfo.SetType(typEvent)
handlerInfo.SetType(typEvent)
p.Listener.SetParser(parserInfo)
mHandlers[optPath] = handlerInfo
}
}
return mContracts, mHandlers, nil
}
const prefix = "morph"
const (
endpointOpt = "endpoint"
dialTimeoutOpt = "dial_timeout"
magicNumberOpt = "magic_number"
scriptHashOpt = "script_hash"
invocationFeeOpt = "invocation_fee"
)
// ContractNames is a list of smart contract names.
var ContractNames = []string{
containerContractName,
NetmapContractName,
BalanceContractName,
}
// EndpointOptPath returns the config path to goclient endpoint.
func EndpointOptPath() string {
return optPath(prefix, endpointOpt)
}
// MagicNumberOptPath returns the config path to goclient magic number.
func MagicNumberOptPath() string {
return optPath(prefix, magicNumberOpt)
}
// DialTimeoutOptPath returns the config path to goclient dial timeout.
func DialTimeoutOptPath() string {
return optPath(prefix, dialTimeoutOpt)
}
// ScriptHashOptPath calculates the config path to script hash config of particular contract.
func ScriptHashOptPath(name string) string {
return optPath(prefix, name, scriptHashOpt)
}
// InvocationFeeOptPath calculates the config path to invocation fee config of particular contract.
func InvocationFeeOptPath(name string) string {
return optPath(prefix, name, invocationFeeOpt)
}

View file

@ -1,103 +0,0 @@
package morph
import (
eacl "github.com/nspcc-dev/neofs-node/pkg/core/container/acl/extended/storage"
"github.com/nspcc-dev/neofs-node/pkg/core/container/storage"
contract "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
clientWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
"github.com/pkg/errors"
"go.uber.org/dig"
)
type containerContractResult struct {
dig.Out
ExtendedACLStore eacl.Storage
ContainerStorage storage.Storage
}
const (
containerContractName = "container"
containerContractSetEACLOpt = "set_eacl_method"
containerContractEACLOpt = "get_eacl_method"
containerContractPutOpt = "put_method"
containerContractGetOpt = "get_method"
containerContractDelOpt = "delete_method"
containerContractListOpt = "list_method"
)
// ContainerContractSetEACLOptPath returns the config path to set eACL method name of Container contract.
func ContainerContractSetEACLOptPath() string {
return optPath(prefix, containerContractName, containerContractSetEACLOpt)
}
// ContainerContractEACLOptPath returns the config path to get eACL method name of Container contract.
func ContainerContractEACLOptPath() string {
return optPath(prefix, containerContractName, containerContractEACLOpt)
}
// ContainerContractPutOptPath returns the config path to put container method name of Container contract.
func ContainerContractPutOptPath() string {
return optPath(prefix, containerContractName, containerContractPutOpt)
}
// ContainerContractGetOptPath returns the config path to get container method name of Container contract.
func ContainerContractGetOptPath() string {
return optPath(prefix, containerContractName, containerContractGetOpt)
}
// ContainerContractDelOptPath returns the config path to delete container method name of Container contract.
func ContainerContractDelOptPath() string {
return optPath(prefix, containerContractName, containerContractDelOpt)
}
// ContainerContractListOptPath returns the config path to list containers method name of Container contract.
func ContainerContractListOptPath() string {
return optPath(prefix, containerContractName, containerContractListOpt)
}
func newContainerContract(p contractParams) (res containerContractResult, err error) {
client, ok := p.MorphContracts[containerContractName]
if !ok {
err = errors.Errorf("missing %s contract client", containerContractName)
return
}
var (
setEACLMethod = p.Viper.GetString(ContainerContractSetEACLOptPath())
eaclMethod = p.Viper.GetString(ContainerContractEACLOptPath())
getMethod = p.Viper.GetString(ContainerContractGetOptPath())
putMethod = p.Viper.GetString(ContainerContractPutOptPath())
deleteMethod = p.Viper.GetString(ContainerContractDelOptPath())
listMethod = p.Viper.GetString(ContainerContractListOptPath())
)
var containerClient *contract.Client
if containerClient, err = contract.New(client,
contract.WithSetEACLMethod(setEACLMethod),
contract.WithEACLMethod(eaclMethod),
contract.WithGetMethod(getMethod),
contract.WithPutMethod(putMethod),
contract.WithDeleteMethod(deleteMethod),
contract.WithListMethod(listMethod),
); err != nil {
return
}
var wrapClient *clientWrapper.Wrapper
if wrapClient, err = clientWrapper.New(containerClient); err != nil {
return
}
res.ContainerStorage = wrapClient
res.ExtendedACLStore = wrapClient
return res, nil
}

View file

@ -1,28 +0,0 @@
package morph
import (
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
)
const eventOpt = "event"
// NewEpochEventType is a config section of new epoch notification event.
const NewEpochEventType = "new_epoch"
// ContractEventOptPath returns the config path to notification event name of particular contract.
func ContractEventOptPath(contract, event string) string {
return optPath(prefix, contract, eventOpt, event)
}
var mParsers = map[string][]struct {
typ string
parser event.Parser
}{
NetmapContractName: {
{
typ: NewEpochEventType,
parser: netmap.ParseNewEpoch,
},
},
}

View file

@ -1,31 +0,0 @@
package morph
import (
"crypto/ecdsa"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type morphClientParams struct {
dig.In
Viper *viper.Viper
Logger *zap.Logger
Key *ecdsa.PrivateKey
}
func newClient(p morphClientParams) (*client.Client, error) {
return client.New(
p.Key,
p.Viper.GetString(optPath(prefix, endpointOpt)),
client.WithLogger(p.Logger),
client.WithDialTimeout(p.Viper.GetDuration(optPath(prefix, dialTimeoutOpt))),
client.WithMagic(netmode.Magic(p.Viper.GetUint32(optPath(prefix, magicNumberOpt)))),
)
}

View file

@ -1,53 +0,0 @@
package morph
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type eventListenerParams struct {
dig.In
Viper *viper.Viper
Logger *zap.Logger
}
var listenerPrefix = optPath(prefix, "listener")
const (
listenerEndpointOpt = "endpoint"
listenerDialTimeoutOpt = "dial_timeout"
)
// ListenerEndpointOptPath returns the config path to event listener's endpoint.
func ListenerEndpointOptPath() string {
return optPath(listenerPrefix, listenerEndpointOpt)
}
// ListenerDialTimeoutOptPath returns the config path to event listener's dial timeout.
func ListenerDialTimeoutOptPath() string {
return optPath(listenerPrefix, listenerDialTimeoutOpt)
}
func newEventListener(p eventListenerParams) (event.Listener, error) {
sub, err := subscriber.New(context.Background(), &subscriber.Params{
Log: p.Logger,
Endpoint: p.Viper.GetString(ListenerEndpointOptPath()),
DialTimeout: p.Viper.GetDuration(ListenerDialTimeoutOptPath()),
})
if err != nil {
return nil, err
}
return event.NewListener(event.ListenerParams{
Logger: p.Logger,
Subscriber: sub,
})
}

View file

@ -1,21 +0,0 @@
package morph
import (
"strings"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
)
// Module is a Neo:Morph module.
var Module = module.Module{
{Constructor: newClient},
{Constructor: newMorphContracts},
{Constructor: newContainerContract},
{Constructor: newNetmapContract},
{Constructor: newEventListener},
{Constructor: newBalanceContract},
}
func optPath(sections ...string) string {
return strings.Join(sections, ".")
}

View file

@ -1,94 +0,0 @@
package morph
import (
contract "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
clientWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/network/bootstrap"
"github.com/pkg/errors"
"go.uber.org/dig"
)
type netmapContractResult struct {
dig.Out
Client *clientWrapper.Wrapper
NodeRegisterer *bootstrap.Registerer
}
const (
// NetmapContractName is a Netmap contract's config section name.
NetmapContractName = "netmap"
netmapContractAddPeerOpt = "add_peer_method"
netmapContractNewEpochOpt = "new_epoch_method"
netmapContractNetmapOpt = "netmap_method"
netmapContractUpdStateOpt = "update_state_method"
netmapContractIRListOpt = "ir_list_method"
)
// NetmapContractAddPeerOptPath returns the config path to add peer method of Netmap contract.
func NetmapContractAddPeerOptPath() string {
return optPath(prefix, NetmapContractName, netmapContractAddPeerOpt)
}
// NetmapContractNewEpochOptPath returns the config path to new epoch method of Netmap contract.
func NetmapContractNewEpochOptPath() string {
return optPath(prefix, NetmapContractName, netmapContractNewEpochOpt)
}
// NetmapContractNetmapOptPath returns the config path to get netmap method of Netmap contract.
func NetmapContractNetmapOptPath() string {
return optPath(prefix, NetmapContractName, netmapContractNetmapOpt)
}
// NetmapContractUpdateStateOptPath returns the config path to update state method of Netmap contract.
func NetmapContractUpdateStateOptPath() string {
return optPath(prefix, NetmapContractName, netmapContractUpdStateOpt)
}
// NetmapContractIRListOptPath returns the config path to inner ring list method of Netmap contract.
func NetmapContractIRListOptPath() string {
return optPath(prefix, NetmapContractName, netmapContractIRListOpt)
}
func newNetmapContract(p contractParams) (res netmapContractResult, err error) {
client, ok := p.MorphContracts[NetmapContractName]
if !ok {
err = errors.Errorf("missing %s contract client", NetmapContractName)
return
}
var (
addPeerMethod = p.Viper.GetString(NetmapContractAddPeerOptPath())
newEpochMethod = p.Viper.GetString(NetmapContractNewEpochOptPath())
netmapMethod = p.Viper.GetString(NetmapContractNetmapOptPath())
updStateMethod = p.Viper.GetString(NetmapContractUpdateStateOptPath())
irListMethod = p.Viper.GetString(NetmapContractIRListOptPath())
)
var c *contract.Client
if c, err = contract.New(client,
contract.WithAddPeerMethod(addPeerMethod),
contract.WithNewEpochMethod(newEpochMethod),
contract.WithNetMapMethod(netmapMethod),
contract.WithUpdateStateMethod(updStateMethod),
contract.WithInnerRingListMethod(irListMethod),
); err != nil {
return
}
if res.Client, err = clientWrapper.New(c); err != nil {
return
}
if res.NodeRegisterer, err = bootstrap.New(res.Client, p.NodeInfo); err != nil {
return
}
return res, nil
}

View file

@ -1,49 +0,0 @@
package network
import (
"github.com/fasthttp/router"
svc "github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/bootstrap"
"github.com/valyala/fasthttp"
"go.uber.org/dig"
)
type (
handlerParams struct {
dig.In
Healthy svc.HealthyClient
}
)
const (
healthyState = "NeoFS node is "
defaultContentType = "text/plain; charset=utf-8"
)
func newHTTPHandler(p handlerParams) (fasthttp.RequestHandler, error) {
r := router.New()
r.RedirectTrailingSlash = true
r.GET("/-/ready/", func(c *fasthttp.RequestCtx) {
c.SetStatusCode(fasthttp.StatusOK)
c.SetBodyString(healthyState + "ready")
})
r.GET("/-/healthy/", func(c *fasthttp.RequestCtx) {
code := fasthttp.StatusOK
msg := "healthy"
err := p.Healthy.Healthy()
if err != nil {
code = fasthttp.StatusBadRequest
msg = "unhealthy: " + err.Error()
}
c.Response.Reset()
c.SetStatusCode(code)
c.SetContentType(defaultContentType)
c.SetBodyString(healthyState + msg)
})
return r.Handler, nil
}

View file

@ -1,19 +0,0 @@
package network
import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
)
// Module is a network layer module.
var Module = module.Module{
{Constructor: newMuxer},
{Constructor: newPeers},
{Constructor: newPlacement},
// Metrics is prometheus handler
{Constructor: profiler.NewMetrics},
// Profiler is pprof handler
{Constructor: profiler.NewProfiler},
{Constructor: newHTTPHandler},
}

View file

@ -1,53 +0,0 @@
package network
import (
"time"
"github.com/multiformats/go-multiaddr"
"github.com/nspcc-dev/neofs-node/pkg/network/muxer"
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.uber.org/dig"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type muxerParams struct {
dig.In
Logger *zap.Logger
P2P *grpc.Server
Address multiaddr.Multiaddr
ShutdownTTL time.Duration `name:"shutdown_ttl"`
API fasthttp.RequestHandler
Viper *viper.Viper
}
const appName = "neofs-node"
func newFastHTTPServer(p muxerParams) *fasthttp.Server {
srv := new(fasthttp.Server)
srv.Name = appName
srv.ReadBufferSize = p.Viper.GetInt("muxer.http.read_buffer_size")
srv.WriteBufferSize = p.Viper.GetInt("muxer.http.write_buffer_size")
srv.ReadTimeout = p.Viper.GetDuration("muxer.http.read_timeout")
srv.WriteTimeout = p.Viper.GetDuration("muxer.http.write_timeout")
srv.GetOnly = true
srv.DisableHeaderNamesNormalizing = true
srv.NoDefaultServerHeader = true
srv.NoDefaultContentType = true
srv.Handler = p.API
return srv
}
func newMuxer(p muxerParams) muxer.Mux {
return muxer.New(muxer.Params{
P2P: p.P2P,
Logger: p.Logger,
Address: p.Address,
ShutdownTTL: p.ShutdownTTL,
API: newFastHTTPServer(p),
})
}

View file

@ -1,28 +0,0 @@
package network
import (
"github.com/multiformats/go-multiaddr"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type peersParams struct {
dig.In
Viper *viper.Viper
Logger *zap.Logger
Address multiaddr.Multiaddr
}
func newPeers(p peersParams) (peers.Interface, error) {
return peers.New(peers.Params{
Logger: p.Logger,
ConnectionTTL: p.Viper.GetDuration("peers.connections_ttl"),
ConnectionIDLE: p.Viper.GetDuration("peers.connections_idle"),
MetricsTimeout: p.Viper.GetDuration("peers.metrics_timeout"),
KeepAliveTTL: p.Viper.GetDuration("peers.keep_alive.ttl"),
KeepAlivePingTTL: p.Viper.GetDuration("peers.keep_alive.ping"),
})
}

View file

@ -1,79 +0,0 @@
package network
import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/morph"
"github.com/nspcc-dev/neofs-node/pkg/core/container/storage"
contract "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapevent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
state "github.com/nspcc-dev/neofs-node/pkg/network/transport/state/grpc"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
placementParams struct {
dig.In
Log *zap.Logger
Peers peers.Store
Fetcher storage.Storage
MorphEventListener event.Listener
NetMapClient *contract.Wrapper
MorphEventHandlers morph.EventHandlers
}
placementOutput struct {
dig.Out
Placement placement.Component
Healthy state.HealthChecker `group:"healthy"`
}
)
const defaultChronologyDuraion = 2
func newPlacement(p placementParams) placementOutput {
place := placement.New(placement.Params{
Log: p.Log,
Peerstore: p.Peers,
Fetcher: p.Fetcher,
ChronologyDuration: defaultChronologyDuraion,
})
if handlerInfo, ok := p.MorphEventHandlers[morph.ContractEventOptPath(
morph.NetmapContractName,
morph.NewEpochEventType,
)]; ok {
handlerInfo.SetHandler(func(ev event.Event) {
nm, err := p.NetMapClient.GetNetMap()
if err != nil {
p.Log.Error("could not get network map",
zap.String("error", err.Error()),
)
return
}
if err := place.Update(
ev.(netmapevent.NewEpoch).EpochNumber(),
nm,
); err != nil {
p.Log.Error("could not update network map in placement component",
zap.String("error", err.Error()),
)
}
})
p.MorphEventListener.RegisterHandler(handlerInfo)
}
return placementOutput{
Placement: place,
Healthy: place.(state.HealthChecker),
}
}

View file

@ -1,65 +0,0 @@
package node
import (
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/session"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
object "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/replication/storage"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transport"
"github.com/spf13/viper"
"go.uber.org/zap"
)
type (
cnrHandlerParams struct {
*viper.Viper
*zap.Logger
Placer *placement.PlacementWrapper
PeerStore peers.Store
Peers peers.Interface
TimeoutsPrefix string
Key *ecdsa.PrivateKey
TokenStore session.PrivateTokenStore
}
)
func newObjectsContainerHandler(p cnrHandlerParams) (transport.SelectiveContainerExecutor, error) {
as, err := storage.NewAddressStore(p.PeerStore, p.Logger)
if err != nil {
return nil, err
}
multiTransport, err := object.NewMultiTransport(object.MultiTransportParams{
AddressStore: as,
EpochReceiver: p.Placer,
RemoteService: object.NewRemoteService(p.Peers),
Logger: p.Logger,
Key: p.Key,
PutTimeout: p.Viper.GetDuration(p.TimeoutsPrefix + ".timeouts.put"),
GetTimeout: p.Viper.GetDuration(p.TimeoutsPrefix + ".timeouts.get"),
HeadTimeout: p.Viper.GetDuration(p.TimeoutsPrefix + ".timeouts.head"),
SearchTimeout: p.Viper.GetDuration(p.TimeoutsPrefix + ".timeouts.search"),
RangeHashTimeout: p.Viper.GetDuration(p.TimeoutsPrefix + ".timeouts.range_hash"),
DialTimeout: p.Viper.GetDuration("object.dial_timeout"),
PrivateTokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
exec, err := transport.NewContainerTraverseExecutor(multiTransport)
if err != nil {
return nil, err
}
return transport.NewObjectContainerHandler(transport.ObjectContainerHandlerParams{
NodeLister: p.Placer,
Executor: exec,
Logger: p.Logger,
})
}

View file

@ -1,31 +0,0 @@
package node
import (
svc "github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/bootstrap"
eacl "github.com/nspcc-dev/neofs-node/pkg/core/container/acl/extended/storage"
"github.com/nspcc-dev/neofs-node/pkg/core/container/storage"
container "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
"go.uber.org/dig"
"go.uber.org/zap"
)
type cnrParams struct {
dig.In
Logger *zap.Logger
Healthy svc.HealthyClient
ExtendedACLStore eacl.Storage
ContainerStorage storage.Storage
}
func newContainerService(p cnrParams) (container.Service, error) {
return container.New(container.Params{
Logger: p.Logger,
Healthy: p.Healthy,
Store: p.ContainerStorage,
ExtendedACLStore: p.ExtendedACLStore,
})
}

View file

@ -1,35 +0,0 @@
package node
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/boltdb"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/fsbucket"
"github.com/spf13/viper"
)
type Buckets map[string]bucket.Bucket
const (
fsBucket = "fsbucket"
boltBucket = "bolt"
)
func newBuckets(v *viper.Viper) (Buckets, error) {
var (
err error
mBuckets = make(Buckets)
)
if mBuckets[fsBucket], err = fsbucket.NewBucket(v); err != nil {
return nil, err
}
boltOpts, err := boltdb.NewOptions(v)
if err != nil {
return nil, err
} else if mBuckets[boltBucket], err = boltdb.NewBucket(&boltOpts); err != nil {
return nil, err
}
return mBuckets, nil
}

View file

@ -1,53 +0,0 @@
package node
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"go.uber.org/atomic"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
localstoreParams struct {
dig.In
Logger *zap.Logger
Buckets Buckets
Counter *atomic.Float64
Collector metrics2.Collector
}
metaIterator struct {
iter localstore.Iterator
}
)
func newMetaIterator(iter localstore.Iterator) meta2.Iterator {
return &metaIterator{iter: iter}
}
func (m *metaIterator) Iterate(handler meta2.IterateFunc) error {
return m.iter.Iterate(nil, func(objMeta *localstore.ObjectMeta) bool {
return handler == nil || handler(objMeta.Object) != nil
})
}
func newLocalstore(p localstoreParams) (localstore.Localstore, error) {
local, err := localstore.New(localstore.Params{
BlobBucket: p.Buckets[fsBucket],
MetaBucket: p.Buckets[boltBucket],
Logger: p.Logger,
Collector: p.Collector,
})
if err != nil {
return nil, err
}
iter := newMetaIterator(local)
p.Collector.SetCounter(local)
p.Collector.SetIterator(iter)
return local, nil
}

View file

@ -1,46 +0,0 @@
package node
import (
metrics "github.com/nspcc-dev/neofs-node/pkg/network/transport/metrics/grpc"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"github.com/spf13/viper"
"go.uber.org/atomic"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
metricsParams struct {
dig.In
Logger *zap.Logger
Options []string `name:"node_options"`
Viper *viper.Viper
Buckets Buckets
}
metricsServiceParams struct {
dig.In
Logger *zap.Logger
Collector metrics2.Collector
}
)
func newObjectCounter() *atomic.Float64 { return atomic.NewFloat64(0) }
func newMetricsService(p metricsServiceParams) (metrics.Service, error) {
return metrics.New(metrics.Params{
Logger: p.Logger,
Collector: p.Collector,
})
}
func newMetricsCollector(p metricsParams) (metrics2.Collector, error) {
return metrics2.New(metrics2.Params{
Options: p.Options,
Logger: p.Logger,
Interval: p.Viper.GetDuration("metrics_collector.interval"),
MetricsStore: p.Buckets[fsBucket],
})
}

View file

@ -1,89 +0,0 @@
package node
import (
"github.com/nspcc-dev/neofs-api-go/session"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/bootstrap"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/worker"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/grpc"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/morph"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/network"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/settings"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/workers"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
libboot "github.com/nspcc-dev/neofs-node/pkg/network/bootstrap"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/replication"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type jobParams struct {
dig.In
Logger *zap.Logger
Viper *viper.Viper
Peers peers.Store
Replicator replication.Manager
PeersInterface peers.Interface
Metrics metrics2.Collector
MorphEventListener event.Listener
NodeRegisterer *libboot.Registerer
}
// Module is a NeoFS node module.
var Module = module.Module{
{Constructor: attachJobs},
{Constructor: newPeerstore},
{Constructor: attachServices},
{Constructor: newBuckets},
{Constructor: newMetricsCollector},
{Constructor: newObjectCounter},
// -- Container gRPC handlers -- //
{Constructor: newContainerService},
// -- gRPC Services -- //
// -- Local store -- //
{Constructor: newLocalstore},
// -- Object manager -- //
{Constructor: newObjectManager},
// -- Replication manager -- //
{Constructor: newReplicationManager},
// -- Session service -- //
{Constructor: session.NewMapTokenStore},
{Constructor: newSessionService},
// -- Placement tool -- //
{Constructor: newPlacementTool},
// metrics service -- //
{Constructor: newMetricsService},
}.Append(
// app specific modules:
grpc.Module,
network.Module,
workers.Module,
settings.Module,
bootstrap.Module,
morph.Module,
)
func attachJobs(p jobParams) worker.Jobs {
return worker.Jobs{
"peers": p.PeersInterface.Job,
"metrics": p.Metrics.Start,
"event_listener": p.MorphEventListener.Listen,
"replicator": p.Replicator.Process,
"boot": p.NodeRegisterer.Bootstrap,
}
}

View file

@ -1,201 +0,0 @@
package node
import (
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/bootstrap"
"github.com/nspcc-dev/neofs-api-go/hash"
apiobj "github.com/nspcc-dev/neofs-api-go/object"
"github.com/nspcc-dev/neofs-api-go/session"
eacl "github.com/nspcc-dev/neofs-node/pkg/core/container/acl/extended/storage"
"github.com/nspcc-dev/neofs-node/pkg/core/container/storage"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
contract "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
object "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
storage2 "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/replication/storage"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transport"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transport/storagegroup"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
objectManagerParams struct {
dig.In
Logger *zap.Logger
Viper *viper.Viper
LocalStore localstore.Localstore
PeersInterface peers.Interface
Peers peers.Store
Placement placement.Component
TokenStore session.PrivateTokenStore
Options []string `name:"node_options"`
Key *ecdsa.PrivateKey
NetMapClient *contract.Wrapper
Placer *placement.PlacementWrapper
ExtendedACLStore eacl.Storage
ContainerStorage storage.Storage
}
)
const (
transformersSectionPath = "object.transformers."
)
const xorSalitor = "xor"
func newObjectManager(p objectManagerParams) (object.Service, error) {
var sltr object.Salitor
if p.Viper.GetString("object.salitor") == xorSalitor {
sltr = hash.SaltXOR
}
as, err := storage2.NewAddressStore(p.Peers, p.Logger)
if err != nil {
return nil, err
}
rs := object.NewRemoteService(p.PeersInterface)
pto := p.Viper.GetDuration("object.put.timeout")
gto := p.Viper.GetDuration("object.get.timeout")
hto := p.Viper.GetDuration("object.head.timeout")
sto := p.Viper.GetDuration("object.search.timeout")
rhto := p.Viper.GetDuration("object.range_hash.timeout")
dto := p.Viper.GetDuration("object.dial_timeout")
tr, err := object.NewMultiTransport(object.MultiTransportParams{
AddressStore: as,
EpochReceiver: p.Placer,
RemoteService: rs,
Logger: p.Logger,
Key: p.Key,
PutTimeout: pto,
GetTimeout: gto,
HeadTimeout: hto,
SearchTimeout: sto,
RangeHashTimeout: rhto,
DialTimeout: dto,
PrivateTokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
exec, err := transport.NewContainerTraverseExecutor(tr)
if err != nil {
return nil, err
}
selectiveExec, err := transport.NewObjectContainerHandler(transport.ObjectContainerHandlerParams{
NodeLister: p.Placer,
Executor: exec,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
sgInfoRecv, err := storagegroup.NewStorageGroupInfoReceiver(storagegroup.StorageGroupInfoReceiverParams{
SelectiveContainerExecutor: selectiveExec,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
verifier, err := storage2.NewLocalIntegrityVerifier()
if err != nil {
return nil, err
}
trans, err := transformer.NewTransformer(transformer.Params{
SGInfoReceiver: sgInfoRecv,
EpochReceiver: p.Placer,
SizeLimit: uint64(p.Viper.GetInt64(transformersSectionPath+"payload_limiter.max_payload_size") * apiobj.UnitsKB),
Verifier: verifier,
})
if err != nil {
return nil, err
}
verifier, err = storage2.NewLocalHeadIntegrityVerifier()
if err != nil {
return nil, err
}
return object.New(&object.Params{
Verifier: verifier,
Salitor: sltr,
LocalStore: p.LocalStore,
MaxProcessingSize: p.Viper.GetUint64("object.max_processing_size") * uint64(apiobj.UnitsMB),
StorageCapacity: bootstrap.NodeInfo{Options: p.Options}.Capacity() * uint64(apiobj.UnitsGB),
PoolSize: p.Viper.GetInt("object.workers_count"),
Placer: p.Placer,
Transformer: trans,
ObjectRestorer: transformer.NewRestorePipeline(
transformer.SplitRestorer(),
),
RemoteService: rs,
AddressStore: as,
Logger: p.Logger,
TokenStore: p.TokenStore,
EpochReceiver: p.Placer,
PlacementWrapper: p.Placer,
Key: p.Key,
CheckACL: p.Viper.GetBool("object.check_acl"),
DialTimeout: p.Viper.GetDuration("object.dial_timeout"),
MaxPayloadSize: p.Viper.GetUint64("object.transformers.payload_limiter.max_payload_size") * uint64(apiobj.UnitsKB),
PutParams: object.OperationParams{
Timeout: pto,
LogErrors: p.Viper.GetBool("object.put.log_errs"),
},
GetParams: object.OperationParams{
Timeout: gto,
LogErrors: p.Viper.GetBool("object.get.log_errs"),
},
HeadParams: object.OperationParams{
Timeout: hto,
LogErrors: p.Viper.GetBool("object.head.log_errs"),
},
DeleteParams: object.OperationParams{
Timeout: p.Viper.GetDuration("object.delete.timeout"),
LogErrors: p.Viper.GetBool("object.get.log_errs"),
},
SearchParams: object.OperationParams{
Timeout: sto,
LogErrors: p.Viper.GetBool("object.search.log_errs"),
},
RangeParams: object.OperationParams{
Timeout: p.Viper.GetDuration("object.range.timeout"),
LogErrors: p.Viper.GetBool("object.range.log_errs"),
},
RangeHashParams: object.OperationParams{
Timeout: rhto,
LogErrors: p.Viper.GetBool("object.range_hash.log_errs"),
},
Assembly: p.Viper.GetBool("object.assembly"),
WindowSize: p.Viper.GetInt("object.window_size"),
ContainerStorage: p.ContainerStorage,
NetmapClient: p.NetMapClient,
SGInfoReceiver: sgInfoRecv,
ExtendedACLSource: p.ExtendedACLStore,
})
}

View file

@ -1,28 +0,0 @@
package node
import (
"crypto/ecdsa"
"github.com/multiformats/go-multiaddr"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
"go.uber.org/dig"
"go.uber.org/zap"
)
type peerstoreParams struct {
dig.In
Logger *zap.Logger
PrivateKey *ecdsa.PrivateKey
Address multiaddr.Multiaddr
Store peers.Storage `optional:"true"`
}
func newPeerstore(p peerstoreParams) (peers.Store, error) {
return peers.NewStore(peers.StoreParams{
Storage: p.Store,
Logger: p.Logger,
Addr: p.Address,
Key: p.PrivateKey,
})
}

View file

@ -1,28 +0,0 @@
package node
import (
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"go.uber.org/dig"
)
type (
placementToolParams struct {
dig.In
Placement placement.Component
}
placementToolResult struct {
dig.Out
Placer *placement.PlacementWrapper
}
)
func newPlacementTool(p placementToolParams) (res placementToolResult, err error) {
if res.Placer, err = placement.NewObjectPlacer(p.Placement); err != nil {
return
}
return
}

View file

@ -1,385 +0,0 @@
package node
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/hash"
"github.com/nspcc-dev/neofs-api-go/session"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/morph"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/replication"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/replication/storage"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
replicationManagerParams struct {
dig.In
Viper *viper.Viper
PeersInterface peers.Interface
LocalStore localstore.Localstore
Peers peers.Store
Placement placement.Component
Logger *zap.Logger
Key *ecdsa.PrivateKey
Placer *placement.PlacementWrapper
TokenStore session.PrivateTokenStore
MorphEventListener event.Listener
MorphEventHandlers morph.EventHandlers
}
)
const (
mainReplicationPrefix = "replication"
managerPrefix = "manager"
placementHonorerPrefix = "placement_honorer"
locationDetectorPrefix = "location_detector"
storageValidatorPrefix = "storage_validator"
replicatorPrefix = "replicator"
restorerPrefix = "restorer"
)
func newReplicationManager(p replicationManagerParams) (replication.Manager, error) {
as, err := storage.NewAddressStore(p.Peers, p.Logger)
if err != nil {
return nil, err
}
ms, err := replication.NewMultiSolver(replication.MultiSolverParams{
AddressStore: as,
Placement: p.Placement,
})
if err != nil {
return nil, err
}
op := replication.NewObjectPool()
schd, err := replication.NewReplicationScheduler(replication.SchedulerParams{
ContainerActualityChecker: ms,
Iterator: p.LocalStore,
})
if err != nil {
return nil, err
}
integrityVerifier, err := storage.NewLocalIntegrityVerifier()
if err != nil {
return nil, err
}
verifier, err := storage.NewObjectValidator(&storage.ObjectValidatorParams{
AddressStore: ms,
Localstore: p.LocalStore,
Logger: p.Logger,
Verifier: integrityVerifier,
})
if err != nil {
return nil, err
}
placementHonorer, err := newPlacementHonorer(p, ms)
if err != nil {
return nil, err
}
locationDetector, err := newLocationDetector(p, ms)
if err != nil {
return nil, err
}
storageValidator, err := newStorageValidator(p, ms)
if err != nil {
return nil, err
}
replicator, err := newObjectReplicator(p, ms)
if err != nil {
return nil, err
}
restorer, err := newRestorer(p, ms)
if err != nil {
return nil, err
}
prefix := mainReplicationPrefix + "." + managerPrefix + "."
capPrefix := prefix + "capacities."
mngr, err := replication.NewManager(replication.ManagerParams{
Interval: p.Viper.GetDuration(prefix + "read_pool_interval"),
PushTaskTimeout: p.Viper.GetDuration(prefix + "push_task_timeout"),
InitPoolSize: p.Viper.GetInt(prefix + "pool_size"),
ExpansionRate: p.Viper.GetFloat64(prefix + "pool_expansion_rate"),
PlacementHonorerEnabled: p.Viper.GetBool(prefix + "placement_honorer_enabled"),
ReplicateTaskChanCap: p.Viper.GetInt(capPrefix + "replicate"),
RestoreTaskChanCap: p.Viper.GetInt(capPrefix + "restore"),
GarbageChanCap: p.Viper.GetInt(capPrefix + "garbage"),
ObjectPool: op,
ObjectVerifier: verifier,
PlacementHonorer: placementHonorer,
ObjectLocationDetector: locationDetector,
StorageValidator: storageValidator,
ObjectReplicator: replicator,
ObjectRestorer: restorer,
Scheduler: schd,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
if handlerInfo, ok := p.MorphEventHandlers[morph.ContractEventOptPath(
morph.NetmapContractName,
morph.NewEpochEventType,
)]; ok {
handlerInfo.SetHandler(func(ev event.Event) {
mngr.HandleEpoch(
context.Background(),
ev.(netmap.NewEpoch).EpochNumber(),
)
})
p.MorphEventListener.RegisterHandler(handlerInfo)
}
return mngr, nil
}
func newPlacementHonorer(p replicationManagerParams, rss replication.RemoteStorageSelector) (replication.PlacementHonorer, error) {
prefix := mainReplicationPrefix + "." + placementHonorerPrefix
och, err := newObjectsContainerHandler(cnrHandlerParams{
Viper: p.Viper,
Logger: p.Logger,
Placer: p.Placer,
PeerStore: p.Peers,
Peers: p.PeersInterface,
TimeoutsPrefix: prefix,
Key: p.Key,
TokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
storage, err := storage.NewObjectStorage(storage.ObjectStorageParams{
Localstore: p.LocalStore,
SelectiveContainerExecutor: och,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
return replication.NewPlacementHonorer(replication.PlacementHonorerParams{
ObjectSource: storage,
ObjectReceptacle: storage,
RemoteStorageSelector: rss,
PresenceChecker: p.LocalStore,
Logger: p.Logger,
TaskChanCap: p.Viper.GetInt(prefix + ".chan_capacity"),
ResultTimeout: p.Viper.GetDuration(prefix + ".result_timeout"),
})
}
func newLocationDetector(p replicationManagerParams, ms replication.MultiSolver) (replication.ObjectLocationDetector, error) {
prefix := mainReplicationPrefix + "." + locationDetectorPrefix
och, err := newObjectsContainerHandler(cnrHandlerParams{
Viper: p.Viper,
Logger: p.Logger,
Placer: p.Placer,
PeerStore: p.Peers,
Peers: p.PeersInterface,
TimeoutsPrefix: prefix,
Key: p.Key,
TokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
locator, err := storage.NewObjectLocator(storage.LocatorParams{
SelectiveContainerExecutor: och,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
return replication.NewLocationDetector(&replication.LocationDetectorParams{
WeightComparator: ms,
ObjectLocator: locator,
ReservationRatioReceiver: ms,
PresenceChecker: p.LocalStore,
Logger: p.Logger,
TaskChanCap: p.Viper.GetInt(prefix + ".chan_capacity"),
ResultTimeout: p.Viper.GetDuration(prefix + ".result_timeout"),
})
}
func newStorageValidator(p replicationManagerParams, as replication.AddressStore) (replication.StorageValidator, error) {
prefix := mainReplicationPrefix + "." + storageValidatorPrefix
var sltr storage.Salitor
switch v := p.Viper.GetString(prefix + ".salitor"); v {
case xorSalitor:
sltr = hash.SaltXOR
default:
return nil, errors.Errorf("unsupported salitor: %s", v)
}
och, err := newObjectsContainerHandler(cnrHandlerParams{
Viper: p.Viper,
Logger: p.Logger,
Placer: p.Placer,
PeerStore: p.Peers,
Peers: p.PeersInterface,
TimeoutsPrefix: prefix,
Key: p.Key,
TokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
headVerifier, err := storage.NewLocalHeadIntegrityVerifier()
if err != nil {
return nil, err
}
verifier, err := storage.NewObjectValidator(&storage.ObjectValidatorParams{
AddressStore: as,
Localstore: p.LocalStore,
SelectiveContainerExecutor: och,
Logger: p.Logger,
Salitor: sltr,
SaltSize: p.Viper.GetInt(prefix + ".salt_size"),
MaxPayloadRangeSize: p.Viper.GetUint64(prefix + ".max_payload_range_size"),
PayloadRangeCount: p.Viper.GetInt(prefix + ".payload_range_count"),
Verifier: headVerifier,
})
if err != nil {
return nil, err
}
return replication.NewStorageValidator(replication.StorageValidatorParams{
ObjectVerifier: verifier,
PresenceChecker: p.LocalStore,
Logger: p.Logger,
TaskChanCap: p.Viper.GetInt(prefix + ".chan_capacity"),
ResultTimeout: p.Viper.GetDuration(prefix + ".result_timeout"),
AddrStore: as,
})
}
func newObjectReplicator(p replicationManagerParams, rss replication.RemoteStorageSelector) (replication.ObjectReplicator, error) {
prefix := mainReplicationPrefix + "." + replicatorPrefix
och, err := newObjectsContainerHandler(cnrHandlerParams{
Viper: p.Viper,
Logger: p.Logger,
Placer: p.Placer,
PeerStore: p.Peers,
Peers: p.PeersInterface,
TimeoutsPrefix: prefix,
Key: p.Key,
TokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
storage, err := storage.NewObjectStorage(storage.ObjectStorageParams{
Localstore: p.LocalStore,
SelectiveContainerExecutor: och,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
return replication.NewReplicator(replication.ObjectReplicatorParams{
RemoteStorageSelector: rss,
ObjectSource: storage,
ObjectReceptacle: storage,
PresenceChecker: p.LocalStore,
Logger: p.Logger,
TaskChanCap: p.Viper.GetInt(prefix + ".chan_capacity"),
ResultTimeout: p.Viper.GetDuration(prefix + ".result_timeout"),
})
}
func newRestorer(p replicationManagerParams, ms replication.MultiSolver) (replication.ObjectRestorer, error) {
prefix := mainReplicationPrefix + "." + restorerPrefix
och, err := newObjectsContainerHandler(cnrHandlerParams{
Viper: p.Viper,
Logger: p.Logger,
Placer: p.Placer,
PeerStore: p.Peers,
Peers: p.PeersInterface,
TimeoutsPrefix: prefix,
Key: p.Key,
TokenStore: p.TokenStore,
})
if err != nil {
return nil, err
}
integrityVerifier, err := storage.NewLocalIntegrityVerifier()
if err != nil {
return nil, err
}
verifier, err := storage.NewObjectValidator(&storage.ObjectValidatorParams{
AddressStore: ms,
Localstore: p.LocalStore,
SelectiveContainerExecutor: och,
Logger: p.Logger,
Verifier: integrityVerifier,
})
if err != nil {
return nil, err
}
storage, err := storage.NewObjectStorage(storage.ObjectStorageParams{
Localstore: p.LocalStore,
Logger: p.Logger,
})
if err != nil {
return nil, err
}
return replication.NewObjectRestorer(&replication.ObjectRestorerParams{
ObjectVerifier: verifier,
ObjectReceptacle: storage,
EpochReceiver: ms,
RemoteStorageSelector: ms,
PresenceChecker: p.LocalStore,
Logger: p.Logger,
TaskChanCap: p.Viper.GetInt(prefix + ".chan_capacity"),
ResultTimeout: p.Viper.GetDuration(prefix + ".result_timeout"),
})
}

View file

@ -1,36 +0,0 @@
package node
import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/grpc"
accounting "github.com/nspcc-dev/neofs-node/pkg/network/transport/accounting/grpc"
container "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
metrics "github.com/nspcc-dev/neofs-node/pkg/network/transport/metrics/grpc"
object "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
session "github.com/nspcc-dev/neofs-node/pkg/network/transport/session/grpc"
state "github.com/nspcc-dev/neofs-node/pkg/network/transport/state/grpc"
"go.uber.org/dig"
)
type servicesParams struct {
dig.In
Status state.Service
Container container.Service
Object object.Service
Session session.Service
Accounting accounting.Service
Metrics metrics.Service
}
func attachServices(p servicesParams) grpc.ServicesResult {
return grpc.ServicesResult{
Services: []grpc.Service{
p.Status,
p.Container,
p.Accounting,
p.Metrics,
p.Session,
p.Object,
},
}
}

View file

@ -1,26 +0,0 @@
package node
import (
session "github.com/nspcc-dev/neofs-node/pkg/network/transport/session/grpc"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"go.uber.org/dig"
"go.uber.org/zap"
)
type sessionParams struct {
dig.In
Logger *zap.Logger
TokenStore session.TokenStore
EpochReceiver *placement.PlacementWrapper
}
func newSessionService(p sessionParams) (session.Service, error) {
return session.New(session.Params{
TokenStore: p.TokenStore,
Logger: p.Logger,
EpochReceiver: p.EpochReceiver,
}), nil
}

View file

@ -1,108 +0,0 @@
package settings
import (
"net"
"strconv"
"strings"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)
const (
protoTCP = "tcp"
protoUDP = "udp"
protoQUIC = "quic"
)
const emptyAddr = "0.0.0.0"
const ip4ColonCount = 1
var (
errEmptyAddress = errors.New("`node.address` could not be empty")
errEmptyProtocol = errors.New("`node.protocol` could not be empty")
errUnknownProtocol = errors.New("`node.protocol` unknown protocol")
errEmptyShutdownTTL = errors.New("`node.shutdown_ttl` could not be empty")
)
func ipVersion(address string) string {
if strings.Count(address, ":") > ip4ColonCount {
return "ip6"
}
return "ip4"
}
func prepareAddress(address string) (string, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
return "", errors.Wrapf(err, "could not fetch host/port: %s", address)
} else if host == "" {
host = emptyAddr
}
addr, err := net.ResolveIPAddr("ip", host)
if err != nil {
return "", errors.Wrapf(err, "could not resolve address: %s:%s", host, port)
}
return net.JoinHostPort(addr.IP.String(), port), nil
}
func resolveAddress(proto, address string) (string, string, error) {
var (
ip net.IP
host, port string
)
switch proto {
case protoTCP:
addr, err := net.ResolveTCPAddr(protoTCP, address)
if err != nil {
return "", "", errors.Wrapf(err, "could not parse address: '%s'", address)
}
ip = addr.IP
port = strconv.Itoa(addr.Port)
case protoUDP, protoQUIC:
addr, err := net.ResolveUDPAddr(protoUDP, address)
if err != nil {
return "", "", errors.Wrapf(err, "could not parse address: '%s'", address)
}
ip = addr.IP
port = strconv.Itoa(addr.Port)
default:
return "", "", errors.Wrapf(errUnknownProtocol, "unknown protocol: '%s'", proto)
}
if host = ip.String(); ip == nil {
host = emptyAddr
}
return host, port, nil
}
func multiAddressFromProtoAddress(proto, addr string) (multiaddr.Multiaddr, error) {
var (
err error
host, port string
ipVer = ipVersion(addr)
)
if host, port, err = resolveAddress(proto, addr); err != nil {
return nil, errors.Wrapf(err, "could not resolve address: (%s) '%s'", proto, addr)
}
items := []string{
ipVer,
host,
proto,
port,
}
addr = "/" + strings.Join(items, "/")
return multiaddr.NewMultiaddr(addr)
}

View file

@ -1,8 +0,0 @@
package settings
import "github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
// Module is a node settings module.
var Module = module.Module{
{Constructor: newNodeSettings},
}

View file

@ -1,150 +0,0 @@
package settings
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"fmt"
"strconv"
"strings"
"time"
"github.com/multiformats/go-multiaddr"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network/peers"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
nodeSettings struct {
dig.Out
Address multiaddr.Multiaddr
PrivateKey *ecdsa.PrivateKey
NodeOpts []string `name:"node_options"`
ShutdownTTL time.Duration `name:"shutdown_ttl"`
NodeInfo netmap.Info
}
)
const generateKey = "generated"
var errEmptyNodeSettings = errors.New("node settings could not be empty")
func newNodeSettings(v *viper.Viper, l *zap.Logger) (cfg nodeSettings, err error) {
// check, that we have node settings in provided config
if !v.IsSet("node") {
err = errEmptyNodeSettings
return
}
// try to load and setup ecdsa.PrivateKey
key := v.GetString("node.private_key")
switch key {
case "":
err = crypto.ErrEmptyPrivateKey
return cfg, err
case generateKey:
if cfg.PrivateKey, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader); err != nil {
return cfg, err
}
default:
if cfg.PrivateKey, err = crypto.LoadPrivateKey(key); err != nil {
return cfg, errors.Wrap(err, "cannot unmarshal private key")
}
}
id := peers.IDFromPublicKey(&cfg.PrivateKey.PublicKey)
pub := crypto.MarshalPublicKey(&cfg.PrivateKey.PublicKey)
l.Debug("private key loaded successful",
zap.String("file", v.GetString("node.private_key")),
zap.Binary("public", pub),
zap.Stringer("node-id", id))
var (
addr string
proto string
)
// fetch shutdown timeout from settings
if cfg.ShutdownTTL = v.GetDuration("node.shutdown_ttl"); cfg.ShutdownTTL == 0 {
return cfg, errEmptyShutdownTTL
}
// fetch address and protocol from settings
if addr = v.GetString("node.address"); addr == "" {
return cfg, errors.Wrapf(errEmptyAddress, "given '%s'", addr)
} else if addr, err := prepareAddress(addr); err != nil {
return cfg, err
} else if proto = v.GetString("node.proto"); proto == "" {
return cfg, errors.Wrapf(errEmptyProtocol, "given '%s'", proto)
} else if cfg.Address, err = multiAddressFromProtoAddress(proto, addr); err != nil {
return cfg, errors.Wrapf(err, "given '%s' '%s'", proto, addr)
}
// add well-known options
items := map[string]string{
"Capacity": "capacity",
"Price": "price",
"Location": "location",
"Country": "country",
"City": "city",
}
// TODO: use const namings
prefix := "node."
for opt, path := range items {
val := v.GetString(prefix + path)
if len(val) == 0 {
err = errors.Errorf("node option %s must be set explicitly", opt)
return
}
cfg.NodeOpts = append(cfg.NodeOpts,
fmt.Sprintf("/%s:%s",
opt,
val,
),
)
}
// add other options
var (
i int
val string
)
loop:
for ; ; i++ {
val = v.GetString("node.options." + strconv.Itoa(i))
if val == "" {
break
}
for opt := range items {
if strings.Contains(val, "/"+opt) {
continue loop
}
}
cfg.NodeOpts = append(cfg.NodeOpts, val)
}
nodeInfo := netmap.Info{}
nodeInfo.SetAddress(cfg.Address.String())
nodeInfo.SetPublicKey(crypto.MarshalPublicKey(&cfg.PrivateKey.PublicKey))
nodeInfo.SetOptions(cfg.NodeOpts)
cfg.NodeInfo = nodeInfo
l.Debug("loaded node options",
zap.Strings("options", cfg.NodeOpts))
return cfg, err
}

View file

@ -1,8 +0,0 @@
package workers
import "github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/module"
// Module is a workers module.
var Module = module.Module{
{Constructor: prepare},
}

View file

@ -1,132 +0,0 @@
package workers
import (
"context"
"time"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/modules/fix/worker"
"github.com/spf13/viper"
"go.uber.org/dig"
"go.uber.org/zap"
)
type (
// Result returns wrapped workers group for DI.
Result struct {
dig.Out
Workers []*worker.Job
}
// Params is dependencies for create workers slice.
Params struct {
dig.In
Jobs worker.Jobs
Viper *viper.Viper
Logger *zap.Logger
}
)
func prepare(p Params) worker.Workers {
w := worker.New()
for name, handler := range p.Jobs {
if job := byConfig(name, handler, p.Logger, p.Viper); job != nil {
p.Logger.Debug("worker: add new job",
zap.String("name", name))
w.Add(job)
}
}
return w
}
func byTicker(d time.Duration, h worker.Handler) worker.Handler {
return func(ctx context.Context) {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
default:
select {
case <-ctx.Done():
return
case <-ticker.C:
h(ctx)
}
}
}
}
}
func byTimer(d time.Duration, h worker.Handler) worker.Handler {
return func(ctx context.Context) {
timer := time.NewTimer(d)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
default:
select {
case <-ctx.Done():
return
case <-timer.C:
h(ctx)
timer.Reset(d)
}
}
}
}
}
func byConfig(name string, h worker.Handler, l *zap.Logger, v *viper.Viper) worker.Handler {
var job worker.Handler
if !v.IsSet("workers." + name) {
l.Info("worker: has no configuration",
zap.String("worker", name))
return nil
}
if v.GetBool("workers." + name + ".disabled") {
l.Info("worker: disabled",
zap.String("worker", name))
return nil
}
if ticker := v.GetDuration("workers." + name + ".ticker"); ticker > 0 {
job = byTicker(ticker, h)
}
if timer := v.GetDuration("workers." + name + ".timer"); timer > 0 {
job = byTimer(timer, h)
}
if v.GetBool("workers." + name + ".immediately") {
return func(ctx context.Context) {
h(ctx)
if job == nil {
return
}
// check context before run immediately job again
select {
case <-ctx.Done():
return
default:
}
job(ctx)
}
}
return job
}