diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 4ee5a6c7d..65d889e8b 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/netmap" containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container" containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc" + crypto "github.com/nspcc-dev/neofs-crypto" containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -18,18 +19,26 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc" containerService "github.com/nspcc-dev/neofs-node/pkg/services/container" loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" loadroute "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route" + placementrouter "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route/placement" + loadstorage "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/storage" containerMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/pkg/errors" "go.uber.org/zap" ) +const ( + startEstimationNotifyEvent = "StartEstimation" + stopEstimationNotifyEvent = "StopEstimation" +) + func initContainerService(c *cfg) { staticClient, err := client.NewStatic( c.cfgMorph.client, @@ -47,6 +56,69 @@ func initContainerService(c *cfg) { c.cfgObject.cnrStorage = wrap // use RPC node as source of containers c.cfgObject.cnrClient = wrap + localMetrics := &localStorageLoad{ + log: c.log, + engine: c.cfgObject.cfgLocalStorage.localStorage, + } + + pubKey := crypto.MarshalPublicKey(&c.key.PublicKey) + + resultWriter := &morphLoadWriter{ + log: c.log, + cnrMorphClient: wrap, + key: pubKey, + } + + loadAccumulator := loadstorage.New(loadstorage.Prm{}) + + loadPlacementBuilder := &loadPlacementBuilder{ + log: c.log, + nmSrc: c.cfgNetmap.wrapper, + cnrSrc: wrap, + } + + routeBuilder := placementrouter.New(placementrouter.Prm{ + PlacementBuilder: loadPlacementBuilder, + }) + + loadRouter := loadroute.New( + loadroute.Prm{ + LocalServerInfo: c, + RemoteWriterProvider: &remoteLoadAnnounceProvider{ + key: c.key, + loadAddrSrc: c, + clientCache: cache.NewSDKClientCache(), // FIXME: use shared cache + deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator), + }, + Builder: routeBuilder, + }, + loadroute.WithLogger(c.log), + ) + + ctrl := loadcontroller.New( + loadcontroller.Prm{ + LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics), + AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator), + LocalAnnouncementTarget: loadRouter, + ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter), + }, + loadcontroller.WithLogger(c.log), + ) + + setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation) + addContainerNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) { + ctrl.Start(loadcontroller.StartPrm{ + Epoch: ev.(containerEvent.StartEstimation).Epoch(), + }) + }) + + setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation) + addContainerNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) { + ctrl.Stop(loadcontroller.StopPrm{ + Epoch: ev.(containerEvent.StopEstimation).Epoch(), + }) + }) + containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server, containerTransportGRPC.New( containerService.NewSignService(