forked from TrueCloudLab/frostfs-node
[#1437] ir: Fix contextcheck linters
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
507e889924
commit
6a2eeadb8f
27 changed files with 165 additions and 157 deletions
|
@ -1091,7 +1091,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
|
||||||
func initLocalStorage(ctx context.Context, c *cfg) {
|
func initLocalStorage(ctx context.Context, c *cfg) {
|
||||||
ls := engine.New(c.engineOpts()...)
|
ls := engine.New(c.engineOpts()...)
|
||||||
|
|
||||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
|
||||||
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
|
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
if c.cfgMorph.containerCacheSize > 0 {
|
if c.cfgMorph.containerCacheSize > 0 {
|
||||||
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
|
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
|
||||||
|
|
||||||
subscribeToContainerCreation(c, func(e event.Event) {
|
subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) {
|
||||||
ev := e.(containerEvent.PutSuccess)
|
ev := e.(containerEvent.PutSuccess)
|
||||||
|
|
||||||
// read owner of the created container in order to update the reading cache.
|
// read owner of the created container in order to update the reading cache.
|
||||||
|
@ -105,21 +105,21 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
} else {
|
} else {
|
||||||
// unlike removal, we expect successful receive of the container
|
// unlike removal, we expect successful receive of the container
|
||||||
// after successful creation, so logging can be useful
|
// after successful creation, so logging can be useful
|
||||||
c.log.Error(context.Background(), logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
||||||
zap.Stringer("id", ev.ID),
|
zap.Stringer("id", ev.ID),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log.Debug(context.Background(), logs.FrostFSNodeContainerCreationEventsReceipt,
|
c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt,
|
||||||
zap.Stringer("id", ev.ID),
|
zap.Stringer("id", ev.ID),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
subscribeToContainerRemoval(c, func(e event.Event) {
|
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
|
||||||
ev := e.(containerEvent.DeleteSuccess)
|
ev := e.(containerEvent.DeleteSuccess)
|
||||||
containerCache.handleRemoval(ev.ID)
|
containerCache.handleRemoval(ev.ID)
|
||||||
c.log.Debug(context.Background(), logs.FrostFSNodeContainerRemovalEventsReceipt,
|
c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt,
|
||||||
zap.Stringer("id", ev.ID),
|
zap.Stringer("id", ev.ID),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
|
@ -172,11 +172,11 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func addNewEpochNotificationHandlers(c *cfg) {
|
func addNewEpochNotificationHandlers(c *cfg) {
|
||||||
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
|
||||||
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
|
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
|
||||||
})
|
})
|
||||||
|
|
||||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
|
||||||
e := ev.(netmapEvent.NewEpoch).EpochNumber()
|
e := ev.(netmapEvent.NewEpoch).EpochNumber()
|
||||||
|
|
||||||
c.updateContractNodeInfo(e)
|
c.updateContractNodeInfo(e)
|
||||||
|
@ -186,15 +186,15 @@ func addNewEpochNotificationHandlers(c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.bootstrap(); err != nil {
|
if err := c.bootstrap(); err != nil {
|
||||||
c.log.Warn(context.Background(), logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
c.log.Warn(ctx, logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if c.cfgMorph.notaryEnabled {
|
if c.cfgMorph.notaryEnabled {
|
||||||
addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
|
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, _ event.Event) {
|
||||||
_, _, err := makeNotaryDeposit(c)
|
_, _, err := makeNotaryDeposit(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotMakeNotaryDeposit,
|
c.log.Error(ctx, logs.FrostFSNodeCouldNotMakeNotaryDeposit,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ func initSessionService(c *cfg) {
|
||||||
_ = c.privateTokenStore.Close()
|
_ = c.privateTokenStore.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
|
||||||
c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber())
|
c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -79,10 +79,10 @@ func initTreeService(c *cfg) {
|
||||||
}))
|
}))
|
||||||
|
|
||||||
if d := treeConfig.SyncInterval(); d == 0 {
|
if d := treeConfig.SyncInterval(); d == 0 {
|
||||||
addNewEpochNotificationHandler(c, func(_ event.Event) {
|
addNewEpochNotificationHandler(c, func(ctx context.Context, _ event.Event) {
|
||||||
err := c.treeService.SynchronizeAll()
|
err := c.treeService.SynchronizeAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
|
c.log.Error(ctx, logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
|
@ -102,15 +102,15 @@ func initTreeService(c *cfg) {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribeToContainerRemoval(c, func(e event.Event) {
|
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
|
||||||
ev := e.(containerEvent.DeleteSuccess)
|
ev := e.(containerEvent.DeleteSuccess)
|
||||||
|
|
||||||
// This is executed asynchronously, so we don't care about the operation taking some time.
|
// This is executed asynchronously, so we don't care about the operation taking some time.
|
||||||
c.log.Debug(context.Background(), logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
|
c.log.Debug(ctx, logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
|
||||||
err := c.treeService.DropTree(context.Background(), ev.ID, "")
|
err := c.treeService.DropTree(ctx, ev.ID, "")
|
||||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
|
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
|
||||||
c.log.Error(context.Background(), logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
|
c.log.Error(ctx, logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
|
||||||
zap.Stringer("cid", ev.ID),
|
zap.Stringer("cid", ev.ID),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -137,12 +137,12 @@ func (s *Server) enableNotarySupport() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) initNotaryConfig() {
|
func (s *Server) initNotaryConfig(ctx context.Context) {
|
||||||
s.mainNotaryConfig = notaryConfigs(
|
s.mainNotaryConfig = notaryConfigs(
|
||||||
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
||||||
)
|
)
|
||||||
|
|
||||||
s.log.Info(context.Background(), logs.InnerringNotarySupport,
|
s.log.Info(ctx, logs.InnerringNotarySupport,
|
||||||
zap.Bool("sidechain_enabled", true),
|
zap.Bool("sidechain_enabled", true),
|
||||||
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
|
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
|
||||||
)
|
)
|
||||||
|
@ -152,8 +152,8 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
|
||||||
var alphaSync event.Handler
|
var alphaSync event.Handler
|
||||||
|
|
||||||
if s.withoutMainNet || cfg.GetBool("governance.disable") {
|
if s.withoutMainNet || cfg.GetBool("governance.disable") {
|
||||||
alphaSync = func(event.Event) {
|
alphaSync = func(ctx context.Context, _ event.Event) {
|
||||||
s.log.Debug(context.Background(), logs.InnerringAlphabetKeysSyncIsDisabled)
|
s.log.Debug(ctx, logs.InnerringAlphabetKeysSyncIsDisabled)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// create governance processor
|
// create governance processor
|
||||||
|
@ -196,9 +196,9 @@ func (s *Server) createIRFetcher() irFetcher {
|
||||||
return irf
|
return irf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) initTimers(cfg *viper.Viper) {
|
func (s *Server) initTimers(ctx context.Context, cfg *viper.Viper) {
|
||||||
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
||||||
newEpochHandlers: s.newEpochTickHandlers(),
|
newEpochHandlers: s.newEpochTickHandlers(ctx),
|
||||||
epoch: s,
|
epoch: s,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.initConfigFromBlockchain()
|
err = s.initConfigFromBlockchain(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -173,14 +173,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
||||||
prm.Validators = s.predefinedValidators
|
prm.Validators = s.predefinedValidators
|
||||||
|
|
||||||
// vote for sidechain validator if it is prepared in config
|
// vote for sidechain validator if it is prepared in config
|
||||||
err = s.voteForSidechainValidator(prm)
|
err = s.voteForSidechainValidator(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// we don't stop inner ring execution on this error
|
// we don't stop inner ring execution on this error
|
||||||
s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators,
|
s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
s.tickInitialExpoch()
|
s.tickInitialExpoch(ctx)
|
||||||
|
|
||||||
morphErr := make(chan error)
|
morphErr := make(chan error)
|
||||||
mainnnetErr := make(chan error)
|
mainnnetErr := make(chan error)
|
||||||
|
@ -283,11 +283,11 @@ func (s *Server) initSideNotary(ctx context.Context) error {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) tickInitialExpoch() {
|
func (s *Server) tickInitialExpoch(ctx context.Context) {
|
||||||
initialEpochTicker := timer.NewOneTickTimer(
|
initialEpochTicker := timer.NewOneTickTimer(
|
||||||
timer.StaticBlockMeter(s.initialEpochTickDelta),
|
timer.StaticBlockMeter(s.initialEpochTickDelta),
|
||||||
func() {
|
func() {
|
||||||
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
|
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
|
||||||
})
|
})
|
||||||
s.addBlockTimer(initialEpochTicker)
|
s.addBlockTimer(initialEpochTicker)
|
||||||
}
|
}
|
||||||
|
@ -376,7 +376,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
server.initNotaryConfig()
|
server.initNotaryConfig(ctx)
|
||||||
|
|
||||||
err = server.initContracts(cfg)
|
err = server.initContracts(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -405,7 +405,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
server.initTimers(cfg)
|
server.initTimers(ctx, cfg)
|
||||||
|
|
||||||
err = server.initGRPCServer(cfg, log, audit)
|
err = server.initGRPCServer(cfg, log, audit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -573,7 +573,7 @@ func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNe
|
||||||
return nc
|
return nc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) initConfigFromBlockchain() error {
|
func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
|
||||||
// get current epoch
|
// get current epoch
|
||||||
epoch, err := s.netmapClient.Epoch()
|
epoch, err := s.netmapClient.Epoch()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -602,8 +602,8 @@ func (s *Server) initConfigFromBlockchain() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.Debug(context.Background(), logs.InnerringReadConfigFromBlockchain,
|
s.log.Debug(ctx, logs.InnerringReadConfigFromBlockchain,
|
||||||
zap.Bool("active", s.IsActive()),
|
zap.Bool("active", s.IsActive(ctx)),
|
||||||
zap.Bool("alphabet", s.IsAlphabet()),
|
zap.Bool("alphabet", s.IsAlphabet()),
|
||||||
zap.Uint64("epoch", epoch),
|
zap.Uint64("epoch", epoch),
|
||||||
zap.Uint32("precision", balancePrecision),
|
zap.Uint32("precision", balancePrecision),
|
||||||
|
@ -635,17 +635,17 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) {
|
||||||
// onlyAlphabet wrapper around event handler that executes it
|
// onlyAlphabet wrapper around event handler that executes it
|
||||||
// only if inner ring node is alphabet node.
|
// only if inner ring node is alphabet node.
|
||||||
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
|
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
|
||||||
return func(ev event.Event) {
|
return func(ctx context.Context, ev event.Event) {
|
||||||
if s.IsAlphabet() {
|
if s.IsAlphabet() {
|
||||||
f(ev)
|
f(ctx, ev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) newEpochTickHandlers() []newEpochHandler {
|
func (s *Server) newEpochTickHandlers(ctx context.Context) []newEpochHandler {
|
||||||
newEpochHandlers := []newEpochHandler{
|
newEpochHandlers := []newEpochHandler{
|
||||||
func() {
|
func() {
|
||||||
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
|
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,16 +50,16 @@ func (s *Server) depositSideNotary() (util.Uint256, error) {
|
||||||
return tx, err
|
return tx, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) notaryHandler(_ event.Event) {
|
func (s *Server) notaryHandler(ctx context.Context, _ event.Event) {
|
||||||
if !s.mainNotaryConfig.disabled {
|
if !s.mainNotaryConfig.disabled {
|
||||||
_, err := s.depositMainNotary()
|
_, err := s.depositMainNotary()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
|
s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := s.depositSideNotary(); err != nil {
|
if _, err := s.depositSideNotary(); err != nil {
|
||||||
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
|
s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,9 +11,9 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (bp *Processor) handleLock(ev event.Event) {
|
func (bp *Processor) handleLock(ctx context.Context, ev event.Event) {
|
||||||
lock := ev.(balanceEvent.Lock)
|
lock := ev.(balanceEvent.Lock)
|
||||||
bp.log.Info(context.Background(), logs.Notification,
|
bp.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "lock"),
|
zap.String("type", "lock"),
|
||||||
zap.String("value", hex.EncodeToString(lock.ID())))
|
zap.String("value", hex.EncodeToString(lock.ID())))
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ func (bp *Processor) handleLock(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
bp.log.Warn(context.Background(), logs.BalanceBalanceWorkerPoolDrained,
|
bp.log.Warn(ctx, logs.BalanceBalanceWorkerPoolDrained,
|
||||||
zap.Int("capacity", bp.pool.Cap()))
|
zap.Int("capacity", bp.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package balance
|
package balance
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,7 +31,7 @@ func TestProcessorCallsFrostFSContractForLockEvent(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "failed to create processor")
|
require.NoError(t, err, "failed to create processor")
|
||||||
|
|
||||||
processor.handleLock(balanceEvent.Lock{})
|
processor.handleLock(context.Background(), balanceEvent.Lock{})
|
||||||
|
|
||||||
for processor.pool.Running() > 0 {
|
for processor.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -56,7 +57,7 @@ func TestProcessorDoesntCallFrostFSContractIfNotAlphabet(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "failed to create processor")
|
require.NoError(t, err, "failed to create processor")
|
||||||
|
|
||||||
processor.handleLock(balanceEvent.Lock{})
|
processor.handleLock(context.Background(), balanceEvent.Lock{})
|
||||||
|
|
||||||
for processor.pool.Running() > 0 {
|
for processor.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
|
@ -12,11 +12,11 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (cp *Processor) handlePut(ev event.Event) {
|
func (cp *Processor) handlePut(ctx context.Context, ev event.Event) {
|
||||||
put := ev.(putEvent)
|
put := ev.(putEvent)
|
||||||
|
|
||||||
id := sha256.Sum256(put.Container())
|
id := sha256.Sum256(put.Container())
|
||||||
cp.log.Info(context.Background(), logs.Notification,
|
cp.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "container put"),
|
zap.String("type", "container put"),
|
||||||
zap.String("id", base58.Encode(id[:])))
|
zap.String("id", base58.Encode(id[:])))
|
||||||
|
|
||||||
|
@ -27,14 +27,14 @@ func (cp *Processor) handlePut(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained,
|
cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||||
zap.Int("capacity", cp.pool.Cap()))
|
zap.Int("capacity", cp.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) handleDelete(ev event.Event) {
|
func (cp *Processor) handleDelete(ctx context.Context, ev event.Event) {
|
||||||
del := ev.(containerEvent.Delete)
|
del := ev.(containerEvent.Delete)
|
||||||
cp.log.Info(context.Background(), logs.Notification,
|
cp.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "container delete"),
|
zap.String("type", "container delete"),
|
||||||
zap.String("id", base58.Encode(del.ContainerID())))
|
zap.String("id", base58.Encode(del.ContainerID())))
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func (cp *Processor) handleDelete(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained,
|
cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||||
zap.Int("capacity", cp.pool.Cap()))
|
zap.Int("capacity", cp.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -71,7 +72,7 @@ func TestPutEvent(t *testing.T) {
|
||||||
nr: nr,
|
nr: nr,
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handlePut(event)
|
proc.handlePut(context.Background(), event)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -143,7 +144,7 @@ func TestDeleteEvent(t *testing.T) {
|
||||||
Signature: signature,
|
Signature: signature,
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handleDelete(ev)
|
proc.handleDelete(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
|
@ -13,11 +13,11 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (np *Processor) handleDeposit(ev event.Event) {
|
func (np *Processor) handleDeposit(ctx context.Context, ev event.Event) {
|
||||||
deposit := ev.(frostfsEvent.Deposit)
|
deposit := ev.(frostfsEvent.Deposit)
|
||||||
depositIDBin := bytes.Clone(deposit.ID())
|
depositIDBin := bytes.Clone(deposit.ID())
|
||||||
slices.Reverse(depositIDBin)
|
slices.Reverse(depositIDBin)
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "deposit"),
|
zap.String("type", "deposit"),
|
||||||
zap.String("id", hex.EncodeToString(depositIDBin)))
|
zap.String("id", hex.EncodeToString(depositIDBin)))
|
||||||
|
|
||||||
|
@ -28,16 +28,16 @@ func (np *Processor) handleDeposit(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleWithdraw(ev event.Event) {
|
func (np *Processor) handleWithdraw(ctx context.Context, ev event.Event) {
|
||||||
withdraw := ev.(frostfsEvent.Withdraw)
|
withdraw := ev.(frostfsEvent.Withdraw)
|
||||||
withdrawBin := bytes.Clone(withdraw.ID())
|
withdrawBin := bytes.Clone(withdraw.ID())
|
||||||
slices.Reverse(withdrawBin)
|
slices.Reverse(withdrawBin)
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "withdraw"),
|
zap.String("type", "withdraw"),
|
||||||
zap.String("id", hex.EncodeToString(withdrawBin)))
|
zap.String("id", hex.EncodeToString(withdrawBin)))
|
||||||
|
|
||||||
|
@ -48,14 +48,14 @@ func (np *Processor) handleWithdraw(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleCheque(ev event.Event) {
|
func (np *Processor) handleCheque(ctx context.Context, ev event.Event) {
|
||||||
cheque := ev.(frostfsEvent.Cheque)
|
cheque := ev.(frostfsEvent.Cheque)
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "cheque"),
|
zap.String("type", "cheque"),
|
||||||
zap.String("id", hex.EncodeToString(cheque.ID())))
|
zap.String("id", hex.EncodeToString(cheque.ID())))
|
||||||
|
|
||||||
|
@ -66,14 +66,14 @@ func (np *Processor) handleCheque(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleConfig(ev event.Event) {
|
func (np *Processor) handleConfig(ctx context.Context, ev event.Event) {
|
||||||
cfg := ev.(frostfsEvent.Config)
|
cfg := ev.(frostfsEvent.Config)
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "set config"),
|
zap.String("type", "set config"),
|
||||||
zap.String("key", hex.EncodeToString(cfg.Key())),
|
zap.String("key", hex.EncodeToString(cfg.Key())),
|
||||||
zap.String("value", hex.EncodeToString(cfg.Value())))
|
zap.String("value", hex.EncodeToString(cfg.Value())))
|
||||||
|
@ -85,7 +85,7 @@ func (np *Processor) handleConfig(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package frostfs
|
package frostfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -36,7 +37,7 @@ func TestHandleDeposit(t *testing.T) {
|
||||||
AmountValue: 1000,
|
AmountValue: 1000,
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handleDeposit(ev)
|
proc.handleDeposit(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -57,7 +58,7 @@ func TestHandleDeposit(t *testing.T) {
|
||||||
|
|
||||||
es.epochCounter = 109
|
es.epochCounter = 109
|
||||||
|
|
||||||
proc.handleDeposit(ev)
|
proc.handleDeposit(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -98,7 +99,7 @@ func TestHandleWithdraw(t *testing.T) {
|
||||||
AmountValue: 1000,
|
AmountValue: 1000,
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handleWithdraw(ev)
|
proc.handleWithdraw(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -139,7 +140,7 @@ func TestHandleCheque(t *testing.T) {
|
||||||
LockValue: util.Uint160{200},
|
LockValue: util.Uint160{200},
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handleCheque(ev)
|
proc.handleCheque(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -176,7 +177,7 @@ func TestHandleConfig(t *testing.T) {
|
||||||
TxHashValue: util.Uint256{100},
|
TxHashValue: util.Uint256{100},
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handleConfig(ev)
|
proc.handleConfig(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (gp *Processor) HandleAlphabetSync(e event.Event) {
|
func (gp *Processor) HandleAlphabetSync(ctx context.Context, e event.Event) {
|
||||||
var (
|
var (
|
||||||
typ string
|
typ string
|
||||||
hash util.Uint256
|
hash util.Uint256
|
||||||
|
@ -34,16 +34,16 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
gp.log.Info(context.Background(), logs.GovernanceNewEvent, zap.String("type", typ))
|
gp.log.Info(ctx, logs.GovernanceNewEvent, zap.String("type", typ))
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
|
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
|
||||||
return gp.processAlphabetSync(hash)
|
return gp.processAlphabetSync(ctx, hash)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
gp.log.Warn(context.Background(), logs.GovernanceGovernanceWorkerPoolDrained,
|
gp.log.Warn(ctx, logs.GovernanceGovernanceWorkerPoolDrained,
|
||||||
zap.Int("capacity", gp.pool.Cap()))
|
zap.Int("capacity", gp.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package governance
|
package governance
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -57,7 +58,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
|
||||||
txHash: util.Uint256{100},
|
txHash: util.Uint256{100},
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.HandleAlphabetSync(ev)
|
proc.HandleAlphabetSync(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -133,7 +134,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
|
||||||
Role: noderoles.NeoFSAlphabet,
|
Role: noderoles.NeoFSAlphabet,
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.HandleAlphabetSync(ev)
|
proc.HandleAlphabetSync(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -226,7 +227,7 @@ type testVoter struct {
|
||||||
votes []VoteValidatorPrm
|
votes []VoteValidatorPrm
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *testVoter) VoteForSidechainValidator(prm VoteValidatorPrm) error {
|
func (v *testVoter) VoteForSidechainValidator(_ context.Context, prm VoteValidatorPrm) error {
|
||||||
v.votes = append(v.votes, prm)
|
v.votes = append(v.votes, prm)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,39 +19,39 @@ const (
|
||||||
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
func (gp *Processor) processAlphabetSync(ctx context.Context, txHash util.Uint256) bool {
|
||||||
if !gp.alphabetState.IsAlphabet() {
|
if !gp.alphabetState.IsAlphabet() {
|
||||||
gp.log.Info(context.Background(), logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
gp.log.Info(ctx, logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromMainNet,
|
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
sidechainAlphabet, err := gp.morphClient.Committee()
|
sidechainAlphabet, err := gp.morphClient.Committee()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromSideChain,
|
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromSideChain,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
|
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(context.Background(), logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
gp.log.Error(ctx, logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if newAlphabet == nil {
|
if newAlphabet == nil {
|
||||||
gp.log.Info(context.Background(), logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
gp.log.Info(ctx, logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
gp.log.Info(context.Background(), logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
gp.log.Info(ctx, logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
||||||
zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)),
|
zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)),
|
||||||
zap.String("new_alphabet", prettyKeys(newAlphabet)),
|
zap.String("new_alphabet", prettyKeys(newAlphabet)),
|
||||||
)
|
)
|
||||||
|
@ -62,9 +62,9 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. Vote to sidechain committee via alphabet contracts.
|
// 1. Vote to sidechain committee via alphabet contracts.
|
||||||
err = gp.voter.VoteForSidechainValidator(votePrm)
|
err = gp.voter.VoteForSidechainValidator(ctx, votePrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(context.Background(), logs.GovernanceCantVoteForSideChainCommittee,
|
gp.log.Error(ctx, logs.GovernanceCantVoteForSideChainCommittee,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
||||||
// 4. Update FrostFS contract in the mainnet.
|
// 4. Update FrostFS contract in the mainnet.
|
||||||
gp.updateFrostFSContractInMainnet(newAlphabet)
|
gp.updateFrostFSContractInMainnet(newAlphabet)
|
||||||
|
|
||||||
gp.log.Info(context.Background(), logs.GovernanceFinishedAlphabetListUpdate)
|
gp.log.Info(ctx, logs.GovernanceFinishedAlphabetListUpdate)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package governance
|
package governance
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -38,7 +39,7 @@ type VoteValidatorPrm struct {
|
||||||
|
|
||||||
// Voter is a callback interface for alphabet contract voting.
|
// Voter is a callback interface for alphabet contract voting.
|
||||||
type Voter interface {
|
type Voter interface {
|
||||||
VoteForSidechainValidator(VoteValidatorPrm) error
|
VoteForSidechainValidator(context.Context, VoteValidatorPrm) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|
|
@ -12,13 +12,13 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
func (np *Processor) HandleNewEpochTick(ctx context.Context, ev event.Event) {
|
||||||
_ = ev.(timerEvent.NewEpochTick)
|
_ = ev.(timerEvent.NewEpochTick)
|
||||||
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch"))
|
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch"))
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", func() bool { return np.processNewEpochTick(ctx) })
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
||||||
|
@ -26,28 +26,28 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleNewEpoch(ev event.Event) {
|
func (np *Processor) handleNewEpoch(ctx context.Context, ev event.Event) {
|
||||||
epochEvent := ev.(netmapEvent.NewEpoch)
|
epochEvent := ev.(netmapEvent.NewEpoch)
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "new epoch"),
|
zap.String("type", "new epoch"),
|
||||||
zap.Uint64("value", epochEvent.EpochNumber()))
|
zap.Uint64("value", epochEvent.EpochNumber()))
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
|
||||||
return np.processNewEpoch(epochEvent)
|
return np.processNewEpoch(ctx, epochEvent)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleAddPeer(ev event.Event) {
|
func (np *Processor) handleAddPeer(ctx context.Context, ev event.Event) {
|
||||||
newPeer := ev.(netmapEvent.AddPeer)
|
newPeer := ev.(netmapEvent.AddPeer)
|
||||||
|
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "add peer"),
|
zap.String("type", "add peer"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -58,14 +58,14 @@ func (np *Processor) handleAddPeer(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleUpdateState(ev event.Event) {
|
func (np *Processor) handleUpdateState(ctx context.Context, ev event.Event) {
|
||||||
updPeer := ev.(netmapEvent.UpdatePeer)
|
updPeer := ev.(netmapEvent.UpdatePeer)
|
||||||
np.log.Info(context.Background(), logs.Notification,
|
np.log.Info(ctx, logs.Notification,
|
||||||
zap.String("type", "update peer state"),
|
zap.String("type", "update peer state"),
|
||||||
zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes())))
|
zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes())))
|
||||||
|
|
||||||
|
@ -76,21 +76,21 @@ func (np *Processor) handleUpdateState(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) handleCleanupTick(ev event.Event) {
|
func (np *Processor) handleCleanupTick(ctx context.Context, ev event.Event) {
|
||||||
if !np.netmapSnapshot.enabled {
|
if !np.netmapSnapshot.enabled {
|
||||||
np.log.Debug(context.Background(), logs.NetmapNetmapCleanUpRoutineIsDisabled518)
|
np.log.Debug(ctx, logs.NetmapNetmapCleanUpRoutineIsDisabled518)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup := ev.(netmapCleanupTick)
|
cleanup := ev.(netmapCleanupTick)
|
||||||
|
|
||||||
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
np.log.Info(ctx, logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
|
||||||
|
@ -98,7 +98,7 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package netmap
|
package netmap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -38,7 +39,7 @@ func TestNewEpochTick(t *testing.T) {
|
||||||
require.NoError(t, err, "failed to create processor")
|
require.NoError(t, err, "failed to create processor")
|
||||||
|
|
||||||
ev := timerEvent.NewEpochTick{}
|
ev := timerEvent.NewEpochTick{}
|
||||||
proc.HandleNewEpochTick(ev)
|
proc.HandleNewEpochTick(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -90,7 +91,7 @@ func TestNewEpoch(t *testing.T) {
|
||||||
Num: 101,
|
Num: 101,
|
||||||
Hash: util.Uint256{101},
|
Hash: util.Uint256{101},
|
||||||
}
|
}
|
||||||
proc.handleNewEpoch(ev)
|
proc.handleNewEpoch(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -130,7 +131,7 @@ func TestAddPeer(t *testing.T) {
|
||||||
MainTransaction: &transaction.Transaction{},
|
MainTransaction: &transaction.Transaction{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
proc.handleAddPeer(ev)
|
proc.handleAddPeer(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -145,7 +146,7 @@ func TestAddPeer(t *testing.T) {
|
||||||
MainTransaction: &transaction.Transaction{},
|
MainTransaction: &transaction.Transaction{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
proc.handleAddPeer(ev)
|
proc.handleAddPeer(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -188,7 +189,7 @@ func TestUpdateState(t *testing.T) {
|
||||||
MainTransaction: &transaction.Transaction{},
|
MainTransaction: &transaction.Transaction{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
proc.handleUpdateState(ev)
|
proc.handleUpdateState(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -232,7 +233,7 @@ func TestCleanupTick(t *testing.T) {
|
||||||
txHash: util.Uint256{123},
|
txHash: util.Uint256{123},
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.handleCleanupTick(ev)
|
proc.handleCleanupTick(context.Background(), ev)
|
||||||
|
|
||||||
for proc.pool.Running() > 0 {
|
for proc.pool.Running() > 0 {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -413,6 +414,6 @@ type testEventHandler struct {
|
||||||
handledEvents []event.Event
|
handledEvents []event.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *testEventHandler) Handle(e event.Event) {
|
func (h *testEventHandler) Handle(_ context.Context, e event.Event) {
|
||||||
h.handledEvents = append(h.handledEvents, e)
|
h.handledEvents = append(h.handledEvents, e)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,12 +11,12 @@ import (
|
||||||
|
|
||||||
// Process new epoch notification by setting global epoch value and resetting
|
// Process new epoch notification by setting global epoch value and resetting
|
||||||
// local epoch timer.
|
// local epoch timer.
|
||||||
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
|
func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoch) bool {
|
||||||
epoch := ev.EpochNumber()
|
epoch := ev.EpochNumber()
|
||||||
|
|
||||||
epochDuration, err := np.netmapClient.EpochDuration()
|
epochDuration, err := np.netmapClient.EpochDuration()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Warn(context.Background(), logs.NetmapCantGetEpochDuration,
|
np.log.Warn(ctx, logs.NetmapCantGetEpochDuration,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
} else {
|
} else {
|
||||||
np.epochState.SetEpochDuration(epochDuration)
|
np.epochState.SetEpochDuration(epochDuration)
|
||||||
|
@ -26,46 +26,46 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
|
||||||
|
|
||||||
h, err := np.netmapClient.MorphTxHeight(ev.TxHash())
|
h, err := np.netmapClient.MorphTxHeight(ev.TxHash())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Warn(context.Background(), logs.NetmapCantGetTransactionHeight,
|
np.log.Warn(ctx, logs.NetmapCantGetTransactionHeight,
|
||||||
zap.String("hash", ev.TxHash().StringLE()),
|
zap.String("hash", ev.TxHash().StringLE()),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := np.epochTimer.ResetEpochTimer(h); err != nil {
|
if err := np.epochTimer.ResetEpochTimer(h); err != nil {
|
||||||
np.log.Warn(context.Background(), logs.NetmapCantResetEpochTimer,
|
np.log.Warn(ctx, logs.NetmapCantResetEpochTimer,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// get new netmap snapshot
|
// get new netmap snapshot
|
||||||
networkMap, err := np.netmapClient.NetMap()
|
networkMap, err := np.netmapClient.NetMap()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Warn(context.Background(), logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
np.log.Warn(ctx, logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
np.netmapSnapshot.update(*networkMap, epoch)
|
np.netmapSnapshot.update(*networkMap, epoch)
|
||||||
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
np.handleCleanupTick(ctx, netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
||||||
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
|
np.handleAlphabetSync(ctx, governance.NewSyncEvent(ev.TxHash()))
|
||||||
np.handleNotaryDeposit(ev)
|
np.handleNotaryDeposit(ctx, ev)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process new epoch tick by invoking new epoch method in network map contract.
|
// Process new epoch tick by invoking new epoch method in network map contract.
|
||||||
func (np *Processor) processNewEpochTick() bool {
|
func (np *Processor) processNewEpochTick(ctx context.Context) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(context.Background(), logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
np.log.Info(ctx, logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
nextEpoch := np.epochState.EpochCounter() + 1
|
nextEpoch := np.epochState.EpochCounter() + 1
|
||||||
np.log.Debug(context.Background(), logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
|
np.log.Debug(ctx, logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
|
||||||
|
|
||||||
err := np.netmapClient.NewEpoch(nextEpoch)
|
err := np.netmapClient.NewEpoch(nextEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(context.Background(), logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
np.log.Error(ctx, logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,8 +48,8 @@ func (s *Server) SetEpochDuration(val uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsActive is a getter for a global active flag state.
|
// IsActive is a getter for a global active flag state.
|
||||||
func (s *Server) IsActive() bool {
|
func (s *Server) IsActive(ctx context.Context) bool {
|
||||||
return s.InnerRingIndex() >= 0
|
return s.InnerRingIndex(ctx) >= 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsAlphabet is a getter for a global alphabet flag state.
|
// IsAlphabet is a getter for a global alphabet flag state.
|
||||||
|
@ -59,10 +59,10 @@ func (s *Server) IsAlphabet() bool {
|
||||||
|
|
||||||
// InnerRingIndex is a getter for a global index of node in inner ring list. Negative
|
// InnerRingIndex is a getter for a global index of node in inner ring list. Negative
|
||||||
// index means that node is not in the inner ring list.
|
// index means that node is not in the inner ring list.
|
||||||
func (s *Server) InnerRingIndex() int {
|
func (s *Server) InnerRingIndex(ctx context.Context) int {
|
||||||
index, err := s.statusIndex.InnerRingIndex()
|
index, err := s.statusIndex.InnerRingIndex()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
|
s.log.Error(ctx, logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,10 +71,10 @@ func (s *Server) InnerRingIndex() int {
|
||||||
|
|
||||||
// InnerRingSize is a getter for a global size of inner ring list. This value
|
// InnerRingSize is a getter for a global size of inner ring list. This value
|
||||||
// paired with inner ring index.
|
// paired with inner ring index.
|
||||||
func (s *Server) InnerRingSize() int {
|
func (s *Server) InnerRingSize(ctx context.Context) int {
|
||||||
size, err := s.statusIndex.InnerRingSize()
|
size, err := s.statusIndex.InnerRingSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
|
s.log.Error(ctx, logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,18 +93,18 @@ func (s *Server) AlphabetIndex() int {
|
||||||
return int(index)
|
return int(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) error {
|
func (s *Server) voteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
|
||||||
validators := prm.Validators
|
validators := prm.Validators
|
||||||
|
|
||||||
index := s.InnerRingIndex()
|
index := s.InnerRingIndex(ctx)
|
||||||
if s.contracts.alphabet.indexOutOfRange(index) {
|
if s.contracts.alphabet.indexOutOfRange(index) {
|
||||||
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
|
s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(validators) == 0 {
|
if len(validators) == 0 {
|
||||||
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
|
s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -129,7 +129,7 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
|
||||||
s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) {
|
s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) {
|
||||||
_, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators)
|
_, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(context.Background(), logs.InnerringCantInvokeVoteMethodInAlphabetContract,
|
s.log.Warn(ctx, logs.InnerringCantInvokeVoteMethodInAlphabetContract,
|
||||||
zap.Int8("alphabet_index", int8(letter)),
|
zap.Int8("alphabet_index", int8(letter)),
|
||||||
zap.Uint64("epoch", epoch),
|
zap.Uint64("epoch", epoch),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
@ -141,9 +141,9 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
|
||||||
|
|
||||||
// VoteForSidechainValidator calls vote method on alphabet contracts with
|
// VoteForSidechainValidator calls vote method on alphabet contracts with
|
||||||
// the provided list of keys.
|
// the provided list of keys.
|
||||||
func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) error {
|
func (s *Server) VoteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
|
||||||
sort.Sort(prm.Validators)
|
sort.Sort(prm.Validators)
|
||||||
return s.voteForSidechainValidator(prm)
|
return s.voteForSidechainValidator(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResetEpochTimer resets the block timer that produces events to update epoch
|
// ResetEpochTimer resets the block timer that produces events to update epoch
|
||||||
|
|
|
@ -46,9 +46,9 @@ func TestServerState(t *testing.T) {
|
||||||
srv.setHealthStatus(context.Background(), healthStatus)
|
srv.setHealthStatus(context.Background(), healthStatus)
|
||||||
require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status")
|
require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status")
|
||||||
|
|
||||||
require.True(t, srv.IsActive(), "invalid IsActive result")
|
require.True(t, srv.IsActive(context.Background()), "invalid IsActive result")
|
||||||
require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result")
|
require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result")
|
||||||
require.Equal(t, 0, srv.InnerRingIndex(), "invalid IR index")
|
require.Equal(t, 0, srv.InnerRingIndex(context.Background()), "invalid IR index")
|
||||||
require.Equal(t, 1, srv.InnerRingSize(), "invalid IR index")
|
require.Equal(t, 1, srv.InnerRingSize(context.Background()), "invalid IR index")
|
||||||
require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index")
|
require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index")
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package event
|
package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Handler is an Event processing function.
|
// Handler is an Event processing function.
|
||||||
type Handler func(Event)
|
type Handler func(context.Context, Event)
|
||||||
|
|
||||||
// BlockHandler is a chain block processing function.
|
// BlockHandler is a chain block processing function.
|
||||||
type BlockHandler func(*block.Block)
|
type BlockHandler func(*block.Block)
|
||||||
|
|
|
@ -280,7 +280,7 @@ loop:
|
||||||
continue loop
|
continue loop
|
||||||
}
|
}
|
||||||
|
|
||||||
l.handleNotaryEvent(notaryEvent)
|
l.handleNotaryEvent(ctx, notaryEvent)
|
||||||
case b, ok := <-chs.BlockCh:
|
case b, ok := <-chs.BlockCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel)
|
l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel)
|
||||||
|
@ -307,11 +307,11 @@ func (l *listener) handleBlockEvent(b *block.Block) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) {
|
func (l *listener) handleNotaryEvent(ctx context.Context, notaryEvent *result.NotaryRequestEvent) {
|
||||||
if err := l.pool.Submit(func() {
|
if err := l.pool.Submit(func() {
|
||||||
l.parseAndHandleNotary(notaryEvent)
|
l.parseAndHandleNotary(ctx, notaryEvent)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
l.log.Warn(context.Background(), logs.EventListenerWorkerPoolDrained,
|
l.log.Warn(ctx, logs.EventListenerWorkerPoolDrained,
|
||||||
zap.Int("capacity", l.pool.Cap()))
|
zap.Int("capacity", l.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -376,11 +376,11 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent *
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, handler := range handlers {
|
for _, handler := range handlers {
|
||||||
handler(event)
|
handler(ctx, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
func (l *listener) parseAndHandleNotary(ctx context.Context, nr *result.NotaryRequestEvent) {
|
||||||
// prepare the notary event
|
// prepare the notary event
|
||||||
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
|
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -388,13 +388,13 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, ErrTXAlreadyHandled):
|
case errors.Is(err, ErrTXAlreadyHandled):
|
||||||
case errors.As(err, &expErr):
|
case errors.As(err, &expErr):
|
||||||
l.log.Warn(context.Background(), logs.EventSkipExpiredMainTXNotaryEvent,
|
l.log.Warn(ctx, logs.EventSkipExpiredMainTXNotaryEvent,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
zap.Uint32("current_block_height", expErr.CurrentBlockHeight),
|
zap.Uint32("current_block_height", expErr.CurrentBlockHeight),
|
||||||
zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight),
|
zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight),
|
||||||
)
|
)
|
||||||
default:
|
default:
|
||||||
l.log.Warn(context.Background(), logs.EventCouldNotPrepareAndValidateNotaryEvent,
|
l.log.Warn(ctx, logs.EventCouldNotPrepareAndValidateNotaryEvent,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -418,7 +418,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
||||||
l.mtx.RUnlock()
|
l.mtx.RUnlock()
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Debug(context.Background(), logs.EventNotaryParserNotSet)
|
log.Debug(ctx, logs.EventNotaryParserNotSet)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -426,7 +426,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
||||||
// parse the notary event
|
// parse the notary event
|
||||||
event, err := parser(notaryEvent)
|
event, err := parser(notaryEvent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(context.Background(), logs.EventCouldNotParseNotaryEvent,
|
log.Warn(ctx, logs.EventCouldNotParseNotaryEvent,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -439,14 +439,14 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
||||||
l.mtx.RUnlock()
|
l.mtx.RUnlock()
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Info(context.Background(), logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
|
log.Info(ctx, logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
|
||||||
zap.Any("event", event),
|
zap.Any("event", event),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
handler(event)
|
handler(ctx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetNotificationParser sets the parser of particular contract event.
|
// SetNotificationParser sets the parser of particular contract event.
|
||||||
|
|
|
@ -59,7 +59,7 @@ func TestEventHandling(t *testing.T) {
|
||||||
handledNotifications := make([]Event, 0)
|
handledNotifications := make([]Event, 0)
|
||||||
l.RegisterNotificationHandler(NotificationHandlerInfo{
|
l.RegisterNotificationHandler(NotificationHandlerInfo{
|
||||||
scriptHashWithType: key,
|
scriptHashWithType: key,
|
||||||
h: func(e Event) {
|
h: func(_ context.Context, e Event) {
|
||||||
handledNotifications = append(handledNotifications, e)
|
handledNotifications = append(handledNotifications, e)
|
||||||
notificationHandled <- true
|
notificationHandled <- true
|
||||||
},
|
},
|
||||||
|
|
|
@ -85,12 +85,12 @@ func (s typeValue) GetType() Type {
|
||||||
|
|
||||||
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
|
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
|
||||||
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
|
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
|
||||||
return func(e Event) {
|
return func(ctx context.Context, e Event) {
|
||||||
err := w.Submit(func() {
|
err := w.Submit(func() {
|
||||||
h(e)
|
h(ctx, e)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(context.Background(), logs.EventCouldNotSubmitHandlerToWorkerPool,
|
log.Warn(ctx, logs.EventCouldNotSubmitHandlerToWorkerPool,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue