From 6921a890619cc23a2eb2d9e5b4e92cc1bc45ee41 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 21 Oct 2024 12:21:01 +0300 Subject: [PATCH] [#1437] ir: Fix contextcheck linters Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 2 +- cmd/frostfs-node/container.go | 10 +++--- cmd/frostfs-node/netmap.go | 10 +++--- cmd/frostfs-node/session.go | 2 +- cmd/frostfs-node/tree.go | 12 +++---- pkg/innerring/initialization.go | 12 +++---- pkg/innerring/innerring.go | 28 ++++++++-------- pkg/innerring/notary.go | 6 ++-- pkg/innerring/processors/balance/handlers.go | 6 ++-- .../processors/balance/handlers_test.go | 5 +-- .../processors/container/handlers.go | 12 +++---- .../processors/container/handlers_test.go | 5 +-- pkg/innerring/processors/frostfs/handlers.go | 24 +++++++------- .../processors/frostfs/handlers_test.go | 11 ++++--- .../processors/governance/handlers.go | 8 ++--- .../processors/governance/handlers_test.go | 7 ++-- .../processors/governance/process_update.go | 20 ++++++------ .../processors/governance/processor.go | 3 +- pkg/innerring/processors/netmap/handlers.go | 32 +++++++++---------- .../processors/netmap/handlers_test.go | 15 +++++---- .../processors/netmap/process_epoch.go | 24 +++++++------- pkg/innerring/state.go | 26 +++++++-------- pkg/innerring/state_test.go | 6 ++-- pkg/morph/event/handlers.go | 4 ++- pkg/morph/event/listener.go | 24 +++++++------- pkg/morph/event/listener_test.go | 2 +- pkg/morph/event/utils.go | 6 ++-- 27 files changed, 165 insertions(+), 157 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index bd1b99095..aa92e5ec5 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -1089,7 +1089,7 @@ func (c *cfg) LocalAddress() network.AddressGroup { func initLocalStorage(ctx context.Context, c *cfg) { ls := engine.New(c.engineOpts()...) - addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { + addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) { ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber()) }) diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 1a54f9ffc..3f75be235 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -89,7 +89,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c if c.cfgMorph.containerCacheSize > 0 { containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize) - subscribeToContainerCreation(c, func(e event.Event) { + subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) { ev := e.(containerEvent.PutSuccess) // read owner of the created container in order to update the reading cache. @@ -102,21 +102,21 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c } else { // unlike removal, we expect successful receive of the container // after successful creation, so logging can be useful - c.log.Error(context.Background(), logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification, + c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification, zap.Stringer("id", ev.ID), zap.Error(err), ) } - c.log.Debug(context.Background(), logs.FrostFSNodeContainerCreationEventsReceipt, + c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt, zap.Stringer("id", ev.ID), ) }) - subscribeToContainerRemoval(c, func(e event.Event) { + subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) { ev := e.(containerEvent.DeleteSuccess) containerCache.handleRemoval(ev.ID) - c.log.Debug(context.Background(), logs.FrostFSNodeContainerRemovalEventsReceipt, + c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt, zap.Stringer("id", ev.ID), ) }) diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go index 18667e636..35ab4d575 100644 --- a/cmd/frostfs-node/netmap.go +++ b/cmd/frostfs-node/netmap.go @@ -175,11 +175,11 @@ func initNetmapService(ctx context.Context, c *cfg) { } func addNewEpochNotificationHandlers(c *cfg) { - addNewEpochNotificationHandler(c, func(ev event.Event) { + addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) { c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber()) }) - addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { + addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) { e := ev.(netmapEvent.NewEpoch).EpochNumber() c.updateContractNodeInfo(e) @@ -189,15 +189,15 @@ func addNewEpochNotificationHandlers(c *cfg) { } if err := c.bootstrap(); err != nil { - c.log.Warn(context.Background(), logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err)) + c.log.Warn(ctx, logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err)) } }) if c.cfgMorph.notaryEnabled { - addNewEpochAsyncNotificationHandler(c, func(_ event.Event) { + addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, _ event.Event) { _, _, err := makeNotaryDeposit(c) if err != nil { - c.log.Error(context.Background(), logs.FrostFSNodeCouldNotMakeNotaryDeposit, + c.log.Error(ctx, logs.FrostFSNodeCouldNotMakeNotaryDeposit, zap.String("error", err.Error()), ) } diff --git a/cmd/frostfs-node/session.go b/cmd/frostfs-node/session.go index a35d4e470..2f3c9cbfe 100644 --- a/cmd/frostfs-node/session.go +++ b/cmd/frostfs-node/session.go @@ -48,7 +48,7 @@ func initSessionService(c *cfg) { _ = c.privateTokenStore.Close() }) - addNewEpochNotificationHandler(c, func(ev event.Event) { + addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) { c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber()) }) diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go index 59923ee2f..c423c0660 100644 --- a/cmd/frostfs-node/tree.go +++ b/cmd/frostfs-node/tree.go @@ -80,10 +80,10 @@ func initTreeService(c *cfg) { })) if d := treeConfig.SyncInterval(); d == 0 { - addNewEpochNotificationHandler(c, func(_ event.Event) { + addNewEpochNotificationHandler(c, func(ctx context.Context, _ event.Event) { err := c.treeService.SynchronizeAll() if err != nil { - c.log.Error(context.Background(), logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err)) + c.log.Error(ctx, logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err)) } }) } else { @@ -103,15 +103,15 @@ func initTreeService(c *cfg) { }() } - subscribeToContainerRemoval(c, func(e event.Event) { + subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) { ev := e.(containerEvent.DeleteSuccess) // This is executed asynchronously, so we don't care about the operation taking some time. - c.log.Debug(context.Background(), logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID)) - err := c.treeService.DropTree(context.Background(), ev.ID, "") + c.log.Debug(ctx, logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID)) + err := c.treeService.DropTree(ctx, ev.ID, "") if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { // Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged. - c.log.Error(context.Background(), logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved, + c.log.Error(ctx, logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved, zap.Stringer("cid", ev.ID), zap.String("error", err.Error())) } diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go index b8812819e..e08a613c3 100644 --- a/pkg/innerring/initialization.go +++ b/pkg/innerring/initialization.go @@ -137,12 +137,12 @@ func (s *Server) enableNotarySupport() error { return nil } -func (s *Server) initNotaryConfig() { +func (s *Server) initNotaryConfig(ctx context.Context) { s.mainNotaryConfig = notaryConfigs( !s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too ) - s.log.Info(context.Background(), logs.InnerringNotarySupport, + s.log.Info(ctx, logs.InnerringNotarySupport, zap.Bool("sidechain_enabled", true), zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled), ) @@ -152,8 +152,8 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli var alphaSync event.Handler if s.withoutMainNet || cfg.GetBool("governance.disable") { - alphaSync = func(event.Event) { - s.log.Debug(context.Background(), logs.InnerringAlphabetKeysSyncIsDisabled) + alphaSync = func(ctx context.Context, _ event.Event) { + s.log.Debug(ctx, logs.InnerringAlphabetKeysSyncIsDisabled) } } else { // create governance processor @@ -196,9 +196,9 @@ func (s *Server) createIRFetcher() irFetcher { return irf } -func (s *Server) initTimers(cfg *viper.Viper) { +func (s *Server) initTimers(ctx context.Context, cfg *viper.Viper) { s.epochTimer = newEpochTimer(&epochTimerArgs{ - newEpochHandlers: s.newEpochTickHandlers(), + newEpochHandlers: s.newEpochTickHandlers(ctx), epoch: s, }) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 67927c10c..e81ec6bca 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -152,7 +152,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { return err } - err = s.initConfigFromBlockchain() + err = s.initConfigFromBlockchain(ctx) if err != nil { return err } @@ -173,14 +173,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { prm.Validators = s.predefinedValidators // vote for sidechain validator if it is prepared in config - err = s.voteForSidechainValidator(prm) + err = s.voteForSidechainValidator(ctx, prm) if err != nil { // we don't stop inner ring execution on this error s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators, zap.String("error", err.Error())) } - s.tickInitialExpoch() + s.tickInitialExpoch(ctx) morphErr := make(chan error) mainnnetErr := make(chan error) @@ -283,11 +283,11 @@ func (s *Server) initSideNotary(ctx context.Context) error { ) } -func (s *Server) tickInitialExpoch() { +func (s *Server) tickInitialExpoch(ctx context.Context) { initialEpochTicker := timer.NewOneTickTimer( timer.StaticBlockMeter(s.initialEpochTickDelta), func() { - s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) + s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{}) }) s.addBlockTimer(initialEpochTicker) } @@ -376,7 +376,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } - server.initNotaryConfig() + server.initNotaryConfig(ctx) err = server.initContracts(cfg) if err != nil { @@ -405,7 +405,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } - server.initTimers(cfg) + server.initTimers(ctx, cfg) err = server.initGRPCServer(cfg, log, audit) if err != nil { @@ -573,7 +573,7 @@ func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNe return nc } -func (s *Server) initConfigFromBlockchain() error { +func (s *Server) initConfigFromBlockchain(ctx context.Context) error { // get current epoch epoch, err := s.netmapClient.Epoch() if err != nil { @@ -602,8 +602,8 @@ func (s *Server) initConfigFromBlockchain() error { return err } - s.log.Debug(context.Background(), logs.InnerringReadConfigFromBlockchain, - zap.Bool("active", s.IsActive()), + s.log.Debug(ctx, logs.InnerringReadConfigFromBlockchain, + zap.Bool("active", s.IsActive(ctx)), zap.Bool("alphabet", s.IsAlphabet()), zap.Uint64("epoch", epoch), zap.Uint32("precision", balancePrecision), @@ -635,17 +635,17 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) { // onlyAlphabet wrapper around event handler that executes it // only if inner ring node is alphabet node. func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler { - return func(ev event.Event) { + return func(ctx context.Context, ev event.Event) { if s.IsAlphabet() { - f(ev) + f(ctx, ev) } } } -func (s *Server) newEpochTickHandlers() []newEpochHandler { +func (s *Server) newEpochTickHandlers(ctx context.Context) []newEpochHandler { newEpochHandlers := []newEpochHandler{ func() { - s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) + s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{}) }, } diff --git a/pkg/innerring/notary.go b/pkg/innerring/notary.go index 902a4c30a..dd3afa2c2 100644 --- a/pkg/innerring/notary.go +++ b/pkg/innerring/notary.go @@ -50,16 +50,16 @@ func (s *Server) depositSideNotary() (util.Uint256, error) { return tx, err } -func (s *Server) notaryHandler(_ event.Event) { +func (s *Server) notaryHandler(ctx context.Context, _ event.Event) { if !s.mainNotaryConfig.disabled { _, err := s.depositMainNotary() if err != nil { - s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err)) + s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err)) } } if _, err := s.depositSideNotary(); err != nil { - s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err)) + s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err)) } } diff --git a/pkg/innerring/processors/balance/handlers.go b/pkg/innerring/processors/balance/handlers.go index 3792fc2af..5a89e6f7c 100644 --- a/pkg/innerring/processors/balance/handlers.go +++ b/pkg/innerring/processors/balance/handlers.go @@ -11,9 +11,9 @@ import ( "go.uber.org/zap" ) -func (bp *Processor) handleLock(ev event.Event) { +func (bp *Processor) handleLock(ctx context.Context, ev event.Event) { lock := ev.(balanceEvent.Lock) - bp.log.Info(context.Background(), logs.Notification, + bp.log.Info(ctx, logs.Notification, zap.String("type", "lock"), zap.String("value", hex.EncodeToString(lock.ID()))) @@ -24,7 +24,7 @@ func (bp *Processor) handleLock(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - bp.log.Warn(context.Background(), logs.BalanceBalanceWorkerPoolDrained, + bp.log.Warn(ctx, logs.BalanceBalanceWorkerPoolDrained, zap.Int("capacity", bp.pool.Cap())) } } diff --git a/pkg/innerring/processors/balance/handlers_test.go b/pkg/innerring/processors/balance/handlers_test.go index 86a9e15d0..3ef4959cc 100644 --- a/pkg/innerring/processors/balance/handlers_test.go +++ b/pkg/innerring/processors/balance/handlers_test.go @@ -1,6 +1,7 @@ package balance import ( + "context" "testing" "time" @@ -30,7 +31,7 @@ func TestProcessorCallsFrostFSContractForLockEvent(t *testing.T) { }) require.NoError(t, err, "failed to create processor") - processor.handleLock(balanceEvent.Lock{}) + processor.handleLock(context.Background(), balanceEvent.Lock{}) for processor.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -56,7 +57,7 @@ func TestProcessorDoesntCallFrostFSContractIfNotAlphabet(t *testing.T) { }) require.NoError(t, err, "failed to create processor") - processor.handleLock(balanceEvent.Lock{}) + processor.handleLock(context.Background(), balanceEvent.Lock{}) for processor.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) diff --git a/pkg/innerring/processors/container/handlers.go b/pkg/innerring/processors/container/handlers.go index b3d50d9d0..45cac513a 100644 --- a/pkg/innerring/processors/container/handlers.go +++ b/pkg/innerring/processors/container/handlers.go @@ -12,11 +12,11 @@ import ( "go.uber.org/zap" ) -func (cp *Processor) handlePut(ev event.Event) { +func (cp *Processor) handlePut(ctx context.Context, ev event.Event) { put := ev.(putEvent) id := sha256.Sum256(put.Container()) - cp.log.Info(context.Background(), logs.Notification, + cp.log.Info(ctx, logs.Notification, zap.String("type", "container put"), zap.String("id", base58.Encode(id[:]))) @@ -27,14 +27,14 @@ func (cp *Processor) handlePut(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained, + cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained, zap.Int("capacity", cp.pool.Cap())) } } -func (cp *Processor) handleDelete(ev event.Event) { +func (cp *Processor) handleDelete(ctx context.Context, ev event.Event) { del := ev.(containerEvent.Delete) - cp.log.Info(context.Background(), logs.Notification, + cp.log.Info(ctx, logs.Notification, zap.String("type", "container delete"), zap.String("id", base58.Encode(del.ContainerID()))) @@ -45,7 +45,7 @@ func (cp *Processor) handleDelete(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained, + cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained, zap.Int("capacity", cp.pool.Cap())) } } diff --git a/pkg/innerring/processors/container/handlers_test.go b/pkg/innerring/processors/container/handlers_test.go index dc1e919bb..a2fe50fa8 100644 --- a/pkg/innerring/processors/container/handlers_test.go +++ b/pkg/innerring/processors/container/handlers_test.go @@ -1,6 +1,7 @@ package container import ( + "context" "crypto/ecdsa" "encoding/hex" "testing" @@ -71,7 +72,7 @@ func TestPutEvent(t *testing.T) { nr: nr, } - proc.handlePut(event) + proc.handlePut(context.Background(), event) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -143,7 +144,7 @@ func TestDeleteEvent(t *testing.T) { Signature: signature, } - proc.handleDelete(ev) + proc.handleDelete(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) diff --git a/pkg/innerring/processors/frostfs/handlers.go b/pkg/innerring/processors/frostfs/handlers.go index 02dfbaf60..d11ad0f5c 100644 --- a/pkg/innerring/processors/frostfs/handlers.go +++ b/pkg/innerring/processors/frostfs/handlers.go @@ -13,11 +13,11 @@ import ( "go.uber.org/zap" ) -func (np *Processor) handleDeposit(ev event.Event) { +func (np *Processor) handleDeposit(ctx context.Context, ev event.Event) { deposit := ev.(frostfsEvent.Deposit) depositIDBin := bytes.Clone(deposit.ID()) slices.Reverse(depositIDBin) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "deposit"), zap.String("id", hex.EncodeToString(depositIDBin))) @@ -28,16 +28,16 @@ func (np *Processor) handleDeposit(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, + np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } -func (np *Processor) handleWithdraw(ev event.Event) { +func (np *Processor) handleWithdraw(ctx context.Context, ev event.Event) { withdraw := ev.(frostfsEvent.Withdraw) withdrawBin := bytes.Clone(withdraw.ID()) slices.Reverse(withdrawBin) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "withdraw"), zap.String("id", hex.EncodeToString(withdrawBin))) @@ -48,14 +48,14 @@ func (np *Processor) handleWithdraw(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, + np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } -func (np *Processor) handleCheque(ev event.Event) { +func (np *Processor) handleCheque(ctx context.Context, ev event.Event) { cheque := ev.(frostfsEvent.Cheque) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "cheque"), zap.String("id", hex.EncodeToString(cheque.ID()))) @@ -66,14 +66,14 @@ func (np *Processor) handleCheque(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, + np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } -func (np *Processor) handleConfig(ev event.Event) { +func (np *Processor) handleConfig(ctx context.Context, ev event.Event) { cfg := ev.(frostfsEvent.Config) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "set config"), zap.String("key", hex.EncodeToString(cfg.Key())), zap.String("value", hex.EncodeToString(cfg.Value()))) @@ -85,7 +85,7 @@ func (np *Processor) handleConfig(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, + np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } diff --git a/pkg/innerring/processors/frostfs/handlers_test.go b/pkg/innerring/processors/frostfs/handlers_test.go index 6425172bd..c1541ca40 100644 --- a/pkg/innerring/processors/frostfs/handlers_test.go +++ b/pkg/innerring/processors/frostfs/handlers_test.go @@ -1,6 +1,7 @@ package frostfs import ( + "context" "testing" "time" @@ -36,7 +37,7 @@ func TestHandleDeposit(t *testing.T) { AmountValue: 1000, } - proc.handleDeposit(ev) + proc.handleDeposit(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -57,7 +58,7 @@ func TestHandleDeposit(t *testing.T) { es.epochCounter = 109 - proc.handleDeposit(ev) + proc.handleDeposit(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -98,7 +99,7 @@ func TestHandleWithdraw(t *testing.T) { AmountValue: 1000, } - proc.handleWithdraw(ev) + proc.handleWithdraw(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -139,7 +140,7 @@ func TestHandleCheque(t *testing.T) { LockValue: util.Uint160{200}, } - proc.handleCheque(ev) + proc.handleCheque(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -176,7 +177,7 @@ func TestHandleConfig(t *testing.T) { TxHashValue: util.Uint256{100}, } - proc.handleConfig(ev) + proc.handleConfig(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) diff --git a/pkg/innerring/processors/governance/handlers.go b/pkg/innerring/processors/governance/handlers.go index dee8c13e2..7e8ab629d 100644 --- a/pkg/innerring/processors/governance/handlers.go +++ b/pkg/innerring/processors/governance/handlers.go @@ -13,7 +13,7 @@ import ( "go.uber.org/zap" ) -func (gp *Processor) HandleAlphabetSync(e event.Event) { +func (gp *Processor) HandleAlphabetSync(ctx context.Context, e event.Event) { var ( typ string hash util.Uint256 @@ -34,16 +34,16 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) { return } - gp.log.Info(context.Background(), logs.GovernanceNewEvent, zap.String("type", typ)) + gp.log.Info(ctx, logs.GovernanceNewEvent, zap.String("type", typ)) // send event to the worker pool err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool { - return gp.processAlphabetSync(hash) + return gp.processAlphabetSync(ctx, hash) }) if err != nil { // there system can be moved into controlled degradation stage - gp.log.Warn(context.Background(), logs.GovernanceGovernanceWorkerPoolDrained, + gp.log.Warn(ctx, logs.GovernanceGovernanceWorkerPoolDrained, zap.Int("capacity", gp.pool.Cap())) } } diff --git a/pkg/innerring/processors/governance/handlers_test.go b/pkg/innerring/processors/governance/handlers_test.go index 87040bdef..286935129 100644 --- a/pkg/innerring/processors/governance/handlers_test.go +++ b/pkg/innerring/processors/governance/handlers_test.go @@ -1,6 +1,7 @@ package governance import ( + "context" "encoding/binary" "sort" "testing" @@ -57,7 +58,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) { txHash: util.Uint256{100}, } - proc.HandleAlphabetSync(ev) + proc.HandleAlphabetSync(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -133,7 +134,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) { Role: noderoles.NeoFSAlphabet, } - proc.HandleAlphabetSync(ev) + proc.HandleAlphabetSync(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -226,7 +227,7 @@ type testVoter struct { votes []VoteValidatorPrm } -func (v *testVoter) VoteForSidechainValidator(prm VoteValidatorPrm) error { +func (v *testVoter) VoteForSidechainValidator(_ context.Context, prm VoteValidatorPrm) error { v.votes = append(v.votes, prm) return nil } diff --git a/pkg/innerring/processors/governance/process_update.go b/pkg/innerring/processors/governance/process_update.go index faca22f67..fdfdfa479 100644 --- a/pkg/innerring/processors/governance/process_update.go +++ b/pkg/innerring/processors/governance/process_update.go @@ -19,39 +19,39 @@ const ( alphabetUpdateIDPrefix = "AlphabetUpdate" ) -func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool { +func (gp *Processor) processAlphabetSync(ctx context.Context, txHash util.Uint256) bool { if !gp.alphabetState.IsAlphabet() { - gp.log.Info(context.Background(), logs.GovernanceNonAlphabetModeIgnoreAlphabetSync) + gp.log.Info(ctx, logs.GovernanceNonAlphabetModeIgnoreAlphabetSync) return true } mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList() if err != nil { - gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromMainNet, + gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet, zap.String("error", err.Error())) return false } sidechainAlphabet, err := gp.morphClient.Committee() if err != nil { - gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromSideChain, + gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromSideChain, zap.String("error", err.Error())) return false } newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet) if err != nil { - gp.log.Error(context.Background(), logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain, + gp.log.Error(ctx, logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain, zap.String("error", err.Error())) return false } if newAlphabet == nil { - gp.log.Info(context.Background(), logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged) + gp.log.Info(ctx, logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged) return true } - gp.log.Info(context.Background(), logs.GovernanceAlphabetListHasBeenChangedStartingUpdate, + gp.log.Info(ctx, logs.GovernanceAlphabetListHasBeenChangedStartingUpdate, zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)), zap.String("new_alphabet", prettyKeys(newAlphabet)), ) @@ -62,9 +62,9 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool { } // 1. Vote to sidechain committee via alphabet contracts. - err = gp.voter.VoteForSidechainValidator(votePrm) + err = gp.voter.VoteForSidechainValidator(ctx, votePrm) if err != nil { - gp.log.Error(context.Background(), logs.GovernanceCantVoteForSideChainCommittee, + gp.log.Error(ctx, logs.GovernanceCantVoteForSideChainCommittee, zap.String("error", err.Error())) } @@ -77,7 +77,7 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool { // 4. Update FrostFS contract in the mainnet. gp.updateFrostFSContractInMainnet(newAlphabet) - gp.log.Info(context.Background(), logs.GovernanceFinishedAlphabetListUpdate) + gp.log.Info(ctx, logs.GovernanceFinishedAlphabetListUpdate) return true } diff --git a/pkg/innerring/processors/governance/processor.go b/pkg/innerring/processors/governance/processor.go index 6daea417e..eaadfdb4f 100644 --- a/pkg/innerring/processors/governance/processor.go +++ b/pkg/innerring/processors/governance/processor.go @@ -1,6 +1,7 @@ package governance import ( + "context" "errors" "fmt" @@ -38,7 +39,7 @@ type VoteValidatorPrm struct { // Voter is a callback interface for alphabet contract voting. type Voter interface { - VoteForSidechainValidator(VoteValidatorPrm) error + VoteForSidechainValidator(context.Context, VoteValidatorPrm) error } type ( diff --git a/pkg/innerring/processors/netmap/handlers.go b/pkg/innerring/processors/netmap/handlers.go index 478ab5eab..61547e0ba 100644 --- a/pkg/innerring/processors/netmap/handlers.go +++ b/pkg/innerring/processors/netmap/handlers.go @@ -12,13 +12,13 @@ import ( "go.uber.org/zap" ) -func (np *Processor) HandleNewEpochTick(ev event.Event) { +func (np *Processor) HandleNewEpochTick(ctx context.Context, ev event.Event) { _ = ev.(timerEvent.NewEpochTick) np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch")) // send an event to the worker pool - err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick) + err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", func() bool { return np.processNewEpochTick(ctx) }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, @@ -26,28 +26,28 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) { } } -func (np *Processor) handleNewEpoch(ev event.Event) { +func (np *Processor) handleNewEpoch(ctx context.Context, ev event.Event) { epochEvent := ev.(netmapEvent.NewEpoch) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "new epoch"), zap.Uint64("value", epochEvent.EpochNumber())) // send an event to the worker pool err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool { - return np.processNewEpoch(epochEvent) + return np.processNewEpoch(ctx, epochEvent) }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, + np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } -func (np *Processor) handleAddPeer(ev event.Event) { +func (np *Processor) handleAddPeer(ctx context.Context, ev event.Event) { newPeer := ev.(netmapEvent.AddPeer) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "add peer"), ) @@ -58,14 +58,14 @@ func (np *Processor) handleAddPeer(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, + np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } -func (np *Processor) handleUpdateState(ev event.Event) { +func (np *Processor) handleUpdateState(ctx context.Context, ev event.Event) { updPeer := ev.(netmapEvent.UpdatePeer) - np.log.Info(context.Background(), logs.Notification, + np.log.Info(ctx, logs.Notification, zap.String("type", "update peer state"), zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes()))) @@ -76,21 +76,21 @@ func (np *Processor) handleUpdateState(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, + np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } -func (np *Processor) handleCleanupTick(ev event.Event) { +func (np *Processor) handleCleanupTick(ctx context.Context, ev event.Event) { if !np.netmapSnapshot.enabled { - np.log.Debug(context.Background(), logs.NetmapNetmapCleanUpRoutineIsDisabled518) + np.log.Debug(ctx, logs.NetmapNetmapCleanUpRoutineIsDisabled518) return } cleanup := ev.(netmapCleanupTick) - np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "netmap cleaner")) + np.log.Info(ctx, logs.NetmapTick, zap.String("type", "netmap cleaner")) // send event to the worker pool err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool { @@ -98,7 +98,7 @@ func (np *Processor) handleCleanupTick(ev event.Event) { }) if err != nil { // there system can be moved into controlled degradation stage - np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, + np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained, zap.Int("capacity", np.pool.Cap())) } } diff --git a/pkg/innerring/processors/netmap/handlers_test.go b/pkg/innerring/processors/netmap/handlers_test.go index a53458179..1e8be4095 100644 --- a/pkg/innerring/processors/netmap/handlers_test.go +++ b/pkg/innerring/processors/netmap/handlers_test.go @@ -1,6 +1,7 @@ package netmap import ( + "context" "fmt" "testing" "time" @@ -38,7 +39,7 @@ func TestNewEpochTick(t *testing.T) { require.NoError(t, err, "failed to create processor") ev := timerEvent.NewEpochTick{} - proc.HandleNewEpochTick(ev) + proc.HandleNewEpochTick(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -90,7 +91,7 @@ func TestNewEpoch(t *testing.T) { Num: 101, Hash: util.Uint256{101}, } - proc.handleNewEpoch(ev) + proc.handleNewEpoch(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -130,7 +131,7 @@ func TestAddPeer(t *testing.T) { MainTransaction: &transaction.Transaction{}, }, } - proc.handleAddPeer(ev) + proc.handleAddPeer(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -145,7 +146,7 @@ func TestAddPeer(t *testing.T) { MainTransaction: &transaction.Transaction{}, }, } - proc.handleAddPeer(ev) + proc.handleAddPeer(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -188,7 +189,7 @@ func TestUpdateState(t *testing.T) { MainTransaction: &transaction.Transaction{}, }, } - proc.handleUpdateState(ev) + proc.handleUpdateState(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -232,7 +233,7 @@ func TestCleanupTick(t *testing.T) { txHash: util.Uint256{123}, } - proc.handleCleanupTick(ev) + proc.handleCleanupTick(context.Background(), ev) for proc.pool.Running() > 0 { time.Sleep(10 * time.Millisecond) @@ -413,6 +414,6 @@ type testEventHandler struct { handledEvents []event.Event } -func (h *testEventHandler) Handle(e event.Event) { +func (h *testEventHandler) Handle(_ context.Context, e event.Event) { h.handledEvents = append(h.handledEvents, e) } diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 8ad295a74..e401ef4f2 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -11,12 +11,12 @@ import ( // Process new epoch notification by setting global epoch value and resetting // local epoch timer. -func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool { +func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoch) bool { epoch := ev.EpochNumber() epochDuration, err := np.netmapClient.EpochDuration() if err != nil { - np.log.Warn(context.Background(), logs.NetmapCantGetEpochDuration, + np.log.Warn(ctx, logs.NetmapCantGetEpochDuration, zap.String("error", err.Error())) } else { np.epochState.SetEpochDuration(epochDuration) @@ -26,46 +26,46 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool { h, err := np.netmapClient.MorphTxHeight(ev.TxHash()) if err != nil { - np.log.Warn(context.Background(), logs.NetmapCantGetTransactionHeight, + np.log.Warn(ctx, logs.NetmapCantGetTransactionHeight, zap.String("hash", ev.TxHash().StringLE()), zap.String("error", err.Error())) } if err := np.epochTimer.ResetEpochTimer(h); err != nil { - np.log.Warn(context.Background(), logs.NetmapCantResetEpochTimer, + np.log.Warn(ctx, logs.NetmapCantResetEpochTimer, zap.String("error", err.Error())) } // get new netmap snapshot networkMap, err := np.netmapClient.NetMap() if err != nil { - np.log.Warn(context.Background(), logs.NetmapCantGetNetmapSnapshotToPerformCleanup, + np.log.Warn(ctx, logs.NetmapCantGetNetmapSnapshotToPerformCleanup, zap.String("error", err.Error())) return false } np.netmapSnapshot.update(*networkMap, epoch) - np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) - np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash())) - np.handleNotaryDeposit(ev) + np.handleCleanupTick(ctx, netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) + np.handleAlphabetSync(ctx, governance.NewSyncEvent(ev.TxHash())) + np.handleNotaryDeposit(ctx, ev) return true } // Process new epoch tick by invoking new epoch method in network map contract. -func (np *Processor) processNewEpochTick() bool { +func (np *Processor) processNewEpochTick(ctx context.Context) bool { if !np.alphabetState.IsAlphabet() { - np.log.Info(context.Background(), logs.NetmapNonAlphabetModeIgnoreNewEpochTick) + np.log.Info(ctx, logs.NetmapNonAlphabetModeIgnoreNewEpochTick) return true } nextEpoch := np.epochState.EpochCounter() + 1 - np.log.Debug(context.Background(), logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch)) + np.log.Debug(ctx, logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch)) err := np.netmapClient.NewEpoch(nextEpoch) if err != nil { - np.log.Error(context.Background(), logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) + np.log.Error(ctx, logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) return false } diff --git a/pkg/innerring/state.go b/pkg/innerring/state.go index 2dbcd7494..85f332fb6 100644 --- a/pkg/innerring/state.go +++ b/pkg/innerring/state.go @@ -48,8 +48,8 @@ func (s *Server) SetEpochDuration(val uint64) { } // IsActive is a getter for a global active flag state. -func (s *Server) IsActive() bool { - return s.InnerRingIndex() >= 0 +func (s *Server) IsActive(ctx context.Context) bool { + return s.InnerRingIndex(ctx) >= 0 } // IsAlphabet is a getter for a global alphabet flag state. @@ -59,10 +59,10 @@ func (s *Server) IsAlphabet() bool { // InnerRingIndex is a getter for a global index of node in inner ring list. Negative // index means that node is not in the inner ring list. -func (s *Server) InnerRingIndex() int { +func (s *Server) InnerRingIndex(ctx context.Context) int { index, err := s.statusIndex.InnerRingIndex() if err != nil { - s.log.Error(context.Background(), logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error())) + s.log.Error(ctx, logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error())) return -1 } @@ -71,10 +71,10 @@ func (s *Server) InnerRingIndex() int { // InnerRingSize is a getter for a global size of inner ring list. This value // paired with inner ring index. -func (s *Server) InnerRingSize() int { +func (s *Server) InnerRingSize(ctx context.Context) int { size, err := s.statusIndex.InnerRingSize() if err != nil { - s.log.Error(context.Background(), logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error())) + s.log.Error(ctx, logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error())) return 0 } @@ -93,18 +93,18 @@ func (s *Server) AlphabetIndex() int { return int(index) } -func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) error { +func (s *Server) voteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error { validators := prm.Validators - index := s.InnerRingIndex() + index := s.InnerRingIndex(ctx) if s.contracts.alphabet.indexOutOfRange(index) { - s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange) + s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange) return nil } if len(validators) == 0 { - s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteEmptyValidatorsList) + s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteEmptyValidatorsList) return nil } @@ -129,7 +129,7 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) { _, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators) if err != nil { - s.log.Warn(context.Background(), logs.InnerringCantInvokeVoteMethodInAlphabetContract, + s.log.Warn(ctx, logs.InnerringCantInvokeVoteMethodInAlphabetContract, zap.Int8("alphabet_index", int8(letter)), zap.Uint64("epoch", epoch), zap.String("error", err.Error())) @@ -141,9 +141,9 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro // VoteForSidechainValidator calls vote method on alphabet contracts with // the provided list of keys. -func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) error { +func (s *Server) VoteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error { sort.Sort(prm.Validators) - return s.voteForSidechainValidator(prm) + return s.voteForSidechainValidator(ctx, prm) } // ResetEpochTimer resets the block timer that produces events to update epoch diff --git a/pkg/innerring/state_test.go b/pkg/innerring/state_test.go index 9313edf78..17ab995af 100644 --- a/pkg/innerring/state_test.go +++ b/pkg/innerring/state_test.go @@ -46,9 +46,9 @@ func TestServerState(t *testing.T) { srv.setHealthStatus(context.Background(), healthStatus) require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status") - require.True(t, srv.IsActive(), "invalid IsActive result") + require.True(t, srv.IsActive(context.Background()), "invalid IsActive result") require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result") - require.Equal(t, 0, srv.InnerRingIndex(), "invalid IR index") - require.Equal(t, 1, srv.InnerRingSize(), "invalid IR index") + require.Equal(t, 0, srv.InnerRingIndex(context.Background()), "invalid IR index") + require.Equal(t, 1, srv.InnerRingSize(context.Background()), "invalid IR index") require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index") } diff --git a/pkg/morph/event/handlers.go b/pkg/morph/event/handlers.go index 182b4667e..bda83ba54 100644 --- a/pkg/morph/event/handlers.go +++ b/pkg/morph/event/handlers.go @@ -1,11 +1,13 @@ package event import ( + "context" + "github.com/nspcc-dev/neo-go/pkg/core/block" ) // Handler is an Event processing function. -type Handler func(Event) +type Handler func(context.Context, Event) // BlockHandler is a chain block processing function. type BlockHandler func(*block.Block) diff --git a/pkg/morph/event/listener.go b/pkg/morph/event/listener.go index 3d3d806a4..eeec46540 100644 --- a/pkg/morph/event/listener.go +++ b/pkg/morph/event/listener.go @@ -280,7 +280,7 @@ loop: continue loop } - l.handleNotaryEvent(notaryEvent) + l.handleNotaryEvent(ctx, notaryEvent) case b, ok := <-chs.BlockCh: if !ok { l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel) @@ -307,11 +307,11 @@ func (l *listener) handleBlockEvent(b *block.Block) { } } -func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) { +func (l *listener) handleNotaryEvent(ctx context.Context, notaryEvent *result.NotaryRequestEvent) { if err := l.pool.Submit(func() { - l.parseAndHandleNotary(notaryEvent) + l.parseAndHandleNotary(ctx, notaryEvent) }); err != nil { - l.log.Warn(context.Background(), logs.EventListenerWorkerPoolDrained, + l.log.Warn(ctx, logs.EventListenerWorkerPoolDrained, zap.Int("capacity", l.pool.Cap())) } } @@ -376,11 +376,11 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent * } for _, handler := range handlers { - handler(event) + handler(ctx, event) } } -func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { +func (l *listener) parseAndHandleNotary(ctx context.Context, nr *result.NotaryRequestEvent) { // prepare the notary event notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest) if err != nil { @@ -388,13 +388,13 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { switch { case errors.Is(err, ErrTXAlreadyHandled): case errors.As(err, &expErr): - l.log.Warn(context.Background(), logs.EventSkipExpiredMainTXNotaryEvent, + l.log.Warn(ctx, logs.EventSkipExpiredMainTXNotaryEvent, zap.String("error", err.Error()), zap.Uint32("current_block_height", expErr.CurrentBlockHeight), zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight), ) default: - l.log.Warn(context.Background(), logs.EventCouldNotPrepareAndValidateNotaryEvent, + l.log.Warn(ctx, logs.EventCouldNotPrepareAndValidateNotaryEvent, zap.String("error", err.Error()), ) } @@ -418,7 +418,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { l.mtx.RUnlock() if !ok { - log.Debug(context.Background(), logs.EventNotaryParserNotSet) + log.Debug(ctx, logs.EventNotaryParserNotSet) return } @@ -426,7 +426,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { // parse the notary event event, err := parser(notaryEvent) if err != nil { - log.Warn(context.Background(), logs.EventCouldNotParseNotaryEvent, + log.Warn(ctx, logs.EventCouldNotParseNotaryEvent, zap.String("error", err.Error()), ) @@ -439,14 +439,14 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { l.mtx.RUnlock() if !ok { - log.Info(context.Background(), logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered, + log.Info(ctx, logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered, zap.Any("event", event), ) return } - handler(event) + handler(ctx, event) } // SetNotificationParser sets the parser of particular contract event. diff --git a/pkg/morph/event/listener_test.go b/pkg/morph/event/listener_test.go index 5f7cf9f43..214daf694 100644 --- a/pkg/morph/event/listener_test.go +++ b/pkg/morph/event/listener_test.go @@ -59,7 +59,7 @@ func TestEventHandling(t *testing.T) { handledNotifications := make([]Event, 0) l.RegisterNotificationHandler(NotificationHandlerInfo{ scriptHashWithType: key, - h: func(e Event) { + h: func(_ context.Context, e Event) { handledNotifications = append(handledNotifications, e) notificationHandled <- true }, diff --git a/pkg/morph/event/utils.go b/pkg/morph/event/utils.go index 31bbf4432..99ea9a7f0 100644 --- a/pkg/morph/event/utils.go +++ b/pkg/morph/event/utils.go @@ -85,12 +85,12 @@ func (s typeValue) GetType() Type { // WorkerPoolHandler sets closure over worker pool w with passed handler h. func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler { - return func(e Event) { + return func(ctx context.Context, e Event) { err := w.Submit(func() { - h(e) + h(ctx, e) }) if err != nil { - log.Warn(context.Background(), logs.EventCouldNotSubmitHandlerToWorkerPool, + log.Warn(ctx, logs.EventCouldNotSubmitHandlerToWorkerPool, zap.String("error", err.Error()), ) }