[#328] container/load: Process Start/Stop notification events from contract

Construct used space Controller on node's app-side. Call Controller.Start on
StartEstimation event from sidechain. Call Controller.Stop on
StopEstimation event from sidechain.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-02-01 15:41:00 +03:00 committed by Leonard Lyubich
parent e9cbdc4a0d
commit b270c49b5c

View file

@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container" containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc" containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
crypto "github.com/nspcc-dev/neofs-crypto"
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container" containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
@ -18,18 +19,26 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/network/cache"
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc" containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container" containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
loadroute "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route" loadroute "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route"
placementrouter "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/storage"
containerMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph" containerMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap" "go.uber.org/zap"
) )
const (
startEstimationNotifyEvent = "StartEstimation"
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(c *cfg) { func initContainerService(c *cfg) {
staticClient, err := client.NewStatic( staticClient, err := client.NewStatic(
c.cfgMorph.client, c.cfgMorph.client,
@ -47,6 +56,69 @@ func initContainerService(c *cfg) {
c.cfgObject.cnrStorage = wrap // use RPC node as source of containers c.cfgObject.cnrStorage = wrap // use RPC node as source of containers
c.cfgObject.cnrClient = wrap c.cfgObject.cnrClient = wrap
localMetrics := &localStorageLoad{
log: c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}
pubKey := crypto.MarshalPublicKey(&c.key.PublicKey)
resultWriter := &morphLoadWriter{
log: c.log,
cnrMorphClient: wrap,
key: pubKey,
}
loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
nmSrc: c.cfgNetmap.wrapper,
cnrSrc: wrap,
}
routeBuilder := placementrouter.New(placementrouter.Prm{
PlacementBuilder: loadPlacementBuilder,
})
loadRouter := loadroute.New(
loadroute.Prm{
LocalServerInfo: c,
RemoteWriterProvider: &remoteLoadAnnounceProvider{
key: c.key,
loadAddrSrc: c,
clientCache: cache.NewSDKClientCache(), // FIXME: use shared cache
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
)
ctrl := loadcontroller.New(
loadcontroller.Prm{
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
)
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})
containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server, containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server,
containerTransportGRPC.New( containerTransportGRPC.New(
containerService.NewSignService( containerService.NewSignService(