From 5b550bff225fbfc8a8f7f5466ead87e4c599face Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 29 Jan 2021 10:48:47 +0300 Subject: [PATCH] [#355] innerring: Produce container size estimation notifications There are two notifications: - start estimation notification produced at the beginning of the epoch, - stop estimation notifications should be produced before basic audit settlement starts. Signed-off-by: Alex Vanin --- cmd/neofs-ir/defaults.go | 2 ++ pkg/innerring/blocktimer.go | 34 ++++++++++++++++++- pkg/innerring/innerring.go | 10 ++++-- .../processors/netmap/process_epoch.go | 9 +++++ pkg/innerring/processors/netmap/processor.go | 14 +++++--- 5 files changed, 62 insertions(+), 7 deletions(-) diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 808cd174e..5ee1504da 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -76,6 +76,8 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("timers.epoch", "0") cfg.SetDefault("timers.emit", "0") + cfg.SetDefault("timers.stop_estimation.mul", 1) + cfg.SetDefault("timers.stop_estimation.div", 1) cfg.SetDefault("workers.netmap", "10") cfg.SetDefault("workers.balance", "10") diff --git a/pkg/innerring/blocktimer.go b/pkg/innerring/blocktimer.go index 456ad80d7..fefa4cc63 100644 --- a/pkg/innerring/blocktimer.go +++ b/pkg/innerring/blocktimer.go @@ -4,13 +4,26 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" + container "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" + "go.uber.org/zap" ) type ( + epochState interface { + EpochCounter() uint64 + } + epochTimerArgs struct { + l *zap.Logger + nm *netmap.Processor // to handle new epoch tick - epochDuration uint32 // in blocks + cnrWrapper *container.Wrapper // to invoke stop container estimation + epoch epochState // to specify which epoch to stop + + epochDuration uint32 // in blocks + stopEstimationDMul uint32 // X: X/Y of epoch in blocks + stopEstimationDDiv uint32 // Y: X/Y of epoch in blocks } emitTimerArgs struct { @@ -48,6 +61,25 @@ func newEpochTimer(args *epochTimerArgs) *timers.BlockTimer { }, ) + // sub-timer for epoch timer to tick stop container estimation events at + // some block in epoch + epochTimer.OnDelta( + args.stopEstimationDMul, + args.stopEstimationDDiv, + func() { + epochN := args.epoch.EpochCounter() + if epochN == 0 { // estimates are invalid in genesis epoch + return + } + + err := args.cnrWrapper.StopEstimation(epochN - 1) + if err != nil { + args.l.Warn("can't stop epoch estimation", + zap.Uint64("epoch", epochN), + zap.String("error", err.Error())) + } + }) + return epochTimer } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index fe9a0fd7d..cc1041a6c 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -324,6 +324,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error ActiveState: server, CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), + ContainerWrapper: cnrClient, HandleAudit: server.onlyActiveEventHandler( auditProcessor.StartAuditHandler(), ), @@ -425,8 +426,13 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error // initialize epoch timers server.epochTimer = newEpochTimer(&epochTimerArgs{ - nm: netmapProcessor, - epochDuration: cfg.GetUint32("timers.epoch"), + l: server.log, + nm: netmapProcessor, + cnrWrapper: cnrClient, + epoch: server, + epochDuration: cfg.GetUint32("timers.epoch"), + stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), + stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), }) server.addBlockTimer(server.epochTimer) diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 64ac439c1..18ae7946a 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -25,6 +25,15 @@ func (np *Processor) processNewEpoch(epoch uint64) { return } + if epoch > 0 { // estimates are invalid in genesis epoch + err = np.containerWrp.StartEstimation(epoch - 1) + if err != nil { + np.log.Warn("can't start container size estimation", + zap.Uint64("epoch", epoch), + zap.String("error", err.Error())) + } + } + np.netmapSnapshot.update(snapshot, epoch) np.handleCleanupTick(netmapCleanupTick{epoch: epoch}) np.handleNewAudit(audit.NewAuditStartEvent(epoch)) diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index e94504008..b84e40dc3 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -3,6 +3,7 @@ package netmap import ( "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client" + container "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/panjf2000/ants/v2" @@ -36,12 +37,13 @@ type ( epochTimer EpochTimerReseter epochState EpochState activeState ActiveState - morphClient *client.Client + + morphClient *client.Client + containerWrp *container.Wrapper netmapSnapshot cleanupTable - handleNewAudit event.Handler - + handleNewAudit event.Handler handleAuditSettlements event.Handler } @@ -56,8 +58,9 @@ type ( ActiveState ActiveState CleanupEnabled bool CleanupThreshold uint64 // in epochs - HandleAudit event.Handler + ContainerWrapper *container.Wrapper + HandleAudit event.Handler AuditSettlementsHandler event.Handler } ) @@ -85,6 +88,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/netmap: audit handler is not set") case p.AuditSettlementsHandler == nil: return nil, errors.New("ir/netmap: audit settlement handler is not set") + case p.ContainerWrapper == nil: + return nil, errors.New("ir/netmap: container contract wrapper is not set") } p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize)) @@ -102,6 +107,7 @@ func New(p *Params) (*Processor, error) { epochState: p.EpochState, activeState: p.ActiveState, morphClient: p.MorphClient, + containerWrp: p.ContainerWrapper, netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), handleNewAudit: p.HandleAudit,