[#355] innerring: Produce container size estimation notifications

There are two notifications:
- start estimation notification produced at the beginning of the
  epoch,
- stop estimation notifications should be produced before
  basic audit settlement starts.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-01-29 10:48:47 +03:00 committed by Alex Vanin
parent 6848a816f9
commit 5b550bff22
5 changed files with 62 additions and 7 deletions

View file

@ -76,6 +76,8 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("timers.epoch", "0") cfg.SetDefault("timers.epoch", "0")
cfg.SetDefault("timers.emit", "0") cfg.SetDefault("timers.emit", "0")
cfg.SetDefault("timers.stop_estimation.mul", 1)
cfg.SetDefault("timers.stop_estimation.div", 1)
cfg.SetDefault("workers.netmap", "10") cfg.SetDefault("workers.netmap", "10")
cfg.SetDefault("workers.balance", "10") cfg.SetDefault("workers.balance", "10")

View file

@ -4,13 +4,26 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
container "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
"go.uber.org/zap"
) )
type ( type (
epochState interface {
EpochCounter() uint64
}
epochTimerArgs struct { epochTimerArgs struct {
l *zap.Logger
nm *netmap.Processor // to handle new epoch tick nm *netmap.Processor // to handle new epoch tick
epochDuration uint32 // in blocks cnrWrapper *container.Wrapper // to invoke stop container estimation
epoch epochState // to specify which epoch to stop
epochDuration uint32 // in blocks
stopEstimationDMul uint32 // X: X/Y of epoch in blocks
stopEstimationDDiv uint32 // Y: X/Y of epoch in blocks
} }
emitTimerArgs struct { emitTimerArgs struct {
@ -48,6 +61,25 @@ func newEpochTimer(args *epochTimerArgs) *timers.BlockTimer {
}, },
) )
// sub-timer for epoch timer to tick stop container estimation events at
// some block in epoch
epochTimer.OnDelta(
args.stopEstimationDMul,
args.stopEstimationDDiv,
func() {
epochN := args.epoch.EpochCounter()
if epochN == 0 { // estimates are invalid in genesis epoch
return
}
err := args.cnrWrapper.StopEstimation(epochN - 1)
if err != nil {
args.l.Warn("can't stop epoch estimation",
zap.Uint64("epoch", epochN),
zap.String("error", err.Error()))
}
})
return epochTimer return epochTimer
} }

View file

@ -324,6 +324,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
ActiveState: server, ActiveState: server,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
ContainerWrapper: cnrClient,
HandleAudit: server.onlyActiveEventHandler( HandleAudit: server.onlyActiveEventHandler(
auditProcessor.StartAuditHandler(), auditProcessor.StartAuditHandler(),
), ),
@ -425,8 +426,13 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
// initialize epoch timers // initialize epoch timers
server.epochTimer = newEpochTimer(&epochTimerArgs{ server.epochTimer = newEpochTimer(&epochTimerArgs{
nm: netmapProcessor, l: server.log,
epochDuration: cfg.GetUint32("timers.epoch"), nm: netmapProcessor,
cnrWrapper: cnrClient,
epoch: server,
epochDuration: cfg.GetUint32("timers.epoch"),
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
}) })
server.addBlockTimer(server.epochTimer) server.addBlockTimer(server.epochTimer)

View file

@ -25,6 +25,15 @@ func (np *Processor) processNewEpoch(epoch uint64) {
return return
} }
if epoch > 0 { // estimates are invalid in genesis epoch
err = np.containerWrp.StartEstimation(epoch - 1)
if err != nil {
np.log.Warn("can't start container size estimation",
zap.Uint64("epoch", epoch),
zap.String("error", err.Error()))
}
}
np.netmapSnapshot.update(snapshot, epoch) np.netmapSnapshot.update(snapshot, epoch)
np.handleCleanupTick(netmapCleanupTick{epoch: epoch}) np.handleCleanupTick(netmapCleanupTick{epoch: epoch})
np.handleNewAudit(audit.NewAuditStartEvent(epoch)) np.handleNewAudit(audit.NewAuditStartEvent(epoch))

View file

@ -3,6 +3,7 @@ package netmap
import ( import (
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client"
container "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
@ -36,12 +37,13 @@ type (
epochTimer EpochTimerReseter epochTimer EpochTimerReseter
epochState EpochState epochState EpochState
activeState ActiveState activeState ActiveState
morphClient *client.Client
morphClient *client.Client
containerWrp *container.Wrapper
netmapSnapshot cleanupTable netmapSnapshot cleanupTable
handleNewAudit event.Handler handleNewAudit event.Handler
handleAuditSettlements event.Handler handleAuditSettlements event.Handler
} }
@ -56,8 +58,9 @@ type (
ActiveState ActiveState ActiveState ActiveState
CleanupEnabled bool CleanupEnabled bool
CleanupThreshold uint64 // in epochs CleanupThreshold uint64 // in epochs
HandleAudit event.Handler ContainerWrapper *container.Wrapper
HandleAudit event.Handler
AuditSettlementsHandler event.Handler AuditSettlementsHandler event.Handler
} }
) )
@ -85,6 +88,8 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/netmap: audit handler is not set") return nil, errors.New("ir/netmap: audit handler is not set")
case p.AuditSettlementsHandler == nil: case p.AuditSettlementsHandler == nil:
return nil, errors.New("ir/netmap: audit settlement handler is not set") return nil, errors.New("ir/netmap: audit settlement handler is not set")
case p.ContainerWrapper == nil:
return nil, errors.New("ir/netmap: container contract wrapper is not set")
} }
p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize)) p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize))
@ -102,6 +107,7 @@ func New(p *Params) (*Processor, error) {
epochState: p.EpochState, epochState: p.EpochState,
activeState: p.ActiveState, activeState: p.ActiveState,
morphClient: p.MorphClient, morphClient: p.MorphClient,
containerWrp: p.ContainerWrapper,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),
handleNewAudit: p.HandleAudit, handleNewAudit: p.HandleAudit,