diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 1eb56a82a..cde64c2b8 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -6,6 +6,7 @@ import ( "net" "strings" "sync" + "time" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-api-go/pkg" @@ -20,6 +21,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/network" tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -57,6 +59,9 @@ const ( // config keys for cfgMorph cfgMorphRPCAddress = "morph.endpoint" + cfgMorphNotifyRPCAddress = "morph.notification.endpoint" + cfgMorphNotifyDialTimeout = "morph.notification.dial_timeout" + // config keys for cfgAccounting cfgAccountingContract = "accounting.scripthash" cfgAccountingFee = "accounting.fee" @@ -150,6 +155,10 @@ type cfgNetmap struct { wrapper *nmwrapper.Wrapper fee util.Fixed8 + + parsers map[event.Type]event.Parser + + subscribers map[event.Type][]event.Handler } type BootstrapType uint32 @@ -277,6 +286,8 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgMaxObjectSize, 1024*1024) // default max object size 1 megabyte v.SetDefault(cfgMorphRPCAddress, "http://morph_chain.localtest.nspcc.ru:30333/") + v.SetDefault(cfgMorphNotifyRPCAddress, "ws://morph_chain:30333/ws") + v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second) v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 243d45443..bf9f27093 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -39,6 +39,8 @@ func init_(c *cfg) { initSessionService(c) initObjectService(c) initProfiler(c) + + listenMorphNotifications(c) } func bootUp(c *cfg) { diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index f83ae19c4..fc9c36a99 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -1,11 +1,19 @@ package main import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" ) +const newEpochNotification = "NewEpoch" + func initMorphComponents(c *cfg) { var err error @@ -28,3 +36,51 @@ func initMorphComponents(c *cfg) { c.cfgObject.netMapStorage = wrap c.cfgNetmap.wrapper = wrap } + +func listenMorphNotifications(c *cfg) { + subs, err := subscriber.New(c.ctx, &subscriber.Params{ + Log: c.log, + Endpoint: c.viper.GetString(cfgMorphNotifyRPCAddress), + DialTimeout: c.viper.GetDuration(cfgMorphNotifyDialTimeout), + }) + fatalOnErr(err) + + lis, err := event.NewListener(event.ListenerParams{ + Logger: c.log, + Subscriber: subs, + }) + fatalOnErr(err) + + c.workers = append(c.workers, newWorkerFromFunc(lis.Listen)) + + setNetmapNotificationParser(c, newEpochNotification, netmapEvent.ParseNewEpoch) + registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers) +} + +func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.Parser, + subs map[event.Type][]event.Handler) { + for typ, handlers := range subs { + pi := event.ParserInfo{} + pi.SetType(typ) + pi.SetScriptHash(scHash) + + p, ok := parsers[typ] + if !ok { + panic(fmt.Sprintf("missing parser for event %s", typ)) + } + + pi.SetParser(p) + + lis.SetParser(pi) + + for _, h := range handlers { + hi := event.HandlerInfo{} + hi.SetType(typ) + hi.SetScriptHash(scHash) + hi.SetHandler(h) + + lis.RegisterHandler(hi) + } + + } +} diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index 8164ef433..5f79fdfdb 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -4,6 +4,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/netmap" netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc" crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc" netmapService "github.com/nspcc-dev/neofs-node/pkg/services/netmap" "github.com/pkg/errors" @@ -34,3 +35,27 @@ func bootstrapNode(c *cfg) { err := c.cfgNetmap.wrapper.AddPeer(c.cfgNodeInfo.info) fatalOnErr(errors.Wrap(err, "bootstrap error")) } + +func addNetmapNotificationHandler(c *cfg, sTyp string, h event.Handler) { + typ := event.TypeFromString(sTyp) + + if c.cfgNetmap.subscribers == nil { + c.cfgNetmap.subscribers = make(map[event.Type][]event.Handler, 1) + } + + c.cfgNetmap.subscribers[typ] = append(c.cfgNetmap.subscribers[typ], h) +} + +func setNetmapNotificationParser(c *cfg, sTyp string, p event.Parser) { + typ := event.TypeFromString(sTyp) + + if c.cfgNetmap.parsers == nil { + c.cfgNetmap.parsers = make(map[event.Type]event.Parser, 1) + } + + c.cfgNetmap.parsers[typ] = p +} + +func addNewEpochNotificationHandler(c *cfg, h event.Handler) { + addNetmapNotificationHandler(c, newEpochNotification, h) +} diff --git a/cmd/neofs-node/worker.go b/cmd/neofs-node/worker.go index 6accf9d01..eebce73c2 100644 --- a/cmd/neofs-node/worker.go +++ b/cmd/neofs-node/worker.go @@ -8,6 +8,20 @@ type worker interface { Run(context.Context) } +type workerFromFunc struct { + fn func(context.Context) +} + +func newWorkerFromFunc(fn func(ctx context.Context)) worker { + return &workerFromFunc{ + fn: fn, + } +} + +func (w *workerFromFunc) Run(ctx context.Context) { + w.fn(ctx) +} + func startWorkers(c *cfg) { for _, wrk := range c.workers { c.wg.Add(1)