[#1437] ir: Fix contextcheck linters

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-21 12:21:01 +03:00
parent e2c9701089
commit 08dfa8e2db
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
27 changed files with 166 additions and 158 deletions

View file

@ -1091,7 +1091,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
func initLocalStorage(ctx context.Context, c *cfg) { func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...) ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber()) ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
}) })

View file

@ -92,7 +92,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
if c.cfgMorph.containerCacheSize > 0 { if c.cfgMorph.containerCacheSize > 0 {
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize) containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
subscribeToContainerCreation(c, func(e event.Event) { subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) {
ev := e.(containerEvent.PutSuccess) ev := e.(containerEvent.PutSuccess)
// read owner of the created container in order to update the reading cache. // read owner of the created container in order to update the reading cache.
@ -105,21 +105,21 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
} else { } else {
// unlike removal, we expect successful receive of the container // unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful // after successful creation, so logging can be useful
c.log.Error(context.Background(), logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification, c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
zap.Stringer("id", ev.ID), zap.Stringer("id", ev.ID),
zap.Error(err), zap.Error(err),
) )
} }
c.log.Debug(context.Background(), logs.FrostFSNodeContainerCreationEventsReceipt, c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt,
zap.Stringer("id", ev.ID), zap.Stringer("id", ev.ID),
) )
}) })
subscribeToContainerRemoval(c, func(e event.Event) { subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
ev := e.(containerEvent.DeleteSuccess) ev := e.(containerEvent.DeleteSuccess)
containerCache.handleRemoval(ev.ID) containerCache.handleRemoval(ev.ID)
c.log.Debug(context.Background(), logs.FrostFSNodeContainerRemovalEventsReceipt, c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt,
zap.Stringer("id", ev.ID), zap.Stringer("id", ev.ID),
) )
}) })

View file

@ -172,11 +172,11 @@ func initNetmapService(ctx context.Context, c *cfg) {
} }
func addNewEpochNotificationHandlers(c *cfg) { func addNewEpochNotificationHandlers(c *cfg) {
addNewEpochNotificationHandler(c, func(ev event.Event) { addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber()) c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
}) })
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
e := ev.(netmapEvent.NewEpoch).EpochNumber() e := ev.(netmapEvent.NewEpoch).EpochNumber()
c.updateContractNodeInfo(e) c.updateContractNodeInfo(e)
@ -186,15 +186,15 @@ func addNewEpochNotificationHandlers(c *cfg) {
} }
if err := c.bootstrap(); err != nil { if err := c.bootstrap(); err != nil {
c.log.Warn(context.Background(), logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err)) c.log.Warn(ctx, logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
} }
}) })
if c.cfgMorph.notaryEnabled { if c.cfgMorph.notaryEnabled {
addNewEpochAsyncNotificationHandler(c, func(_ event.Event) { addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, _ event.Event) {
_, _, err := makeNotaryDeposit(c) _, _, err := makeNotaryDeposit(c)
if err != nil { if err != nil {
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotMakeNotaryDeposit, c.log.Error(ctx, logs.FrostFSNodeCouldNotMakeNotaryDeposit,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} }

View file

@ -48,7 +48,7 @@ func initSessionService(c *cfg) {
_ = c.privateTokenStore.Close() _ = c.privateTokenStore.Close()
}) })
addNewEpochNotificationHandler(c, func(ev event.Event) { addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber()) c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber())
}) })

View file

@ -79,10 +79,10 @@ func initTreeService(c *cfg) {
})) }))
if d := treeConfig.SyncInterval(); d == 0 { if d := treeConfig.SyncInterval(); d == 0 {
addNewEpochNotificationHandler(c, func(_ event.Event) { addNewEpochNotificationHandler(c, func(ctx context.Context, _ event.Event) {
err := c.treeService.SynchronizeAll() err := c.treeService.SynchronizeAll()
if err != nil { if err != nil {
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err)) c.log.Error(ctx, logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
} }
}) })
} else { } else {
@ -102,15 +102,15 @@ func initTreeService(c *cfg) {
}() }()
} }
subscribeToContainerRemoval(c, func(e event.Event) { subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
ev := e.(containerEvent.DeleteSuccess) ev := e.(containerEvent.DeleteSuccess)
// This is executed asynchronously, so we don't care about the operation taking some time. // This is executed asynchronously, so we don't care about the operation taking some time.
c.log.Debug(context.Background(), logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID)) c.log.Debug(ctx, logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
err := c.treeService.DropTree(context.Background(), ev.ID, "") err := c.treeService.DropTree(ctx, ev.ID, "")
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged. // Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
c.log.Error(context.Background(), logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved, c.log.Error(ctx, logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
zap.Stringer("cid", ev.ID), zap.Stringer("cid", ev.ID),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }

View file

@ -139,12 +139,12 @@ func (s *Server) enableNotarySupport() error {
return nil return nil
} }
func (s *Server) initNotaryConfig() { func (s *Server) initNotaryConfig(ctx context.Context) {
s.mainNotaryConfig = notaryConfigs( s.mainNotaryConfig = notaryConfigs(
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too !s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
) )
s.log.Info(context.Background(), logs.InnerringNotarySupport, s.log.Info(ctx, logs.InnerringNotarySupport,
zap.Bool("sidechain_enabled", true), zap.Bool("sidechain_enabled", true),
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled), zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
) )
@ -154,8 +154,8 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
var alphaSync event.Handler var alphaSync event.Handler
if s.withoutMainNet || cfg.GetBool("governance.disable") { if s.withoutMainNet || cfg.GetBool("governance.disable") {
alphaSync = func(event.Event) { alphaSync = func(ctx context.Context, _ event.Event) {
s.log.Debug(context.Background(), logs.InnerringAlphabetKeysSyncIsDisabled) s.log.Debug(ctx, logs.InnerringAlphabetKeysSyncIsDisabled)
} }
} else { } else {
// create governance processor // create governance processor
@ -198,11 +198,11 @@ func (s *Server) createIRFetcher() irFetcher {
return irf return irf
} }
func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) { func (s *Server) initTimers(ctx context.Context, cfg *viper.Viper, morphClients *serverMorphClients) {
s.epochTimer = newEpochTimer(&epochTimerArgs{ s.epochTimer = newEpochTimer(&epochTimerArgs{
l: s.log, l: s.log,
alphabetState: s, alphabetState: s,
newEpochHandlers: s.newEpochTickHandlers(), newEpochHandlers: s.newEpochTickHandlers(ctx),
cnrWrapper: morphClients.CnrClient, cnrWrapper: morphClients.CnrClient,
epoch: s, epoch: s,
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),

View file

@ -152,7 +152,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
return err return err
} }
err = s.initConfigFromBlockchain() err = s.initConfigFromBlockchain(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -173,14 +173,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
prm.Validators = s.predefinedValidators prm.Validators = s.predefinedValidators
// vote for sidechain validator if it is prepared in config // vote for sidechain validator if it is prepared in config
err = s.voteForSidechainValidator(prm) err = s.voteForSidechainValidator(ctx, prm)
if err != nil { if err != nil {
// we don't stop inner ring execution on this error // we don't stop inner ring execution on this error
s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators, s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators,
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
s.tickInitialExpoch() s.tickInitialExpoch(ctx)
morphErr := make(chan error) morphErr := make(chan error)
mainnnetErr := make(chan error) mainnnetErr := make(chan error)
@ -283,11 +283,11 @@ func (s *Server) initSideNotary(ctx context.Context) error {
) )
} }
func (s *Server) tickInitialExpoch() { func (s *Server) tickInitialExpoch(ctx context.Context) {
initialEpochTicker := timer.NewOneTickTimer( initialEpochTicker := timer.NewOneTickTimer(
timer.StaticBlockMeter(s.initialEpochTickDelta), timer.StaticBlockMeter(s.initialEpochTickDelta),
func() { func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
}) })
s.addBlockTimer(initialEpochTicker) s.addBlockTimer(initialEpochTicker)
} }
@ -376,7 +376,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err return nil, err
} }
server.initNotaryConfig() server.initNotaryConfig(ctx)
err = server.initContracts(cfg) err = server.initContracts(cfg)
if err != nil { if err != nil {
@ -405,7 +405,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err return nil, err
} }
server.initTimers(cfg, morphClients) server.initTimers(ctx, cfg, morphClients)
err = server.initGRPCServer(cfg, log, audit) err = server.initGRPCServer(cfg, log, audit)
if err != nil { if err != nil {
@ -573,7 +573,7 @@ func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNe
return nc return nc
} }
func (s *Server) initConfigFromBlockchain() error { func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
// get current epoch // get current epoch
epoch, err := s.netmapClient.Epoch() epoch, err := s.netmapClient.Epoch()
if err != nil { if err != nil {
@ -602,8 +602,8 @@ func (s *Server) initConfigFromBlockchain() error {
return err return err
} }
s.log.Debug(context.Background(), logs.InnerringReadConfigFromBlockchain, s.log.Debug(ctx, logs.InnerringReadConfigFromBlockchain,
zap.Bool("active", s.IsActive()), zap.Bool("active", s.IsActive(ctx)),
zap.Bool("alphabet", s.IsAlphabet()), zap.Bool("alphabet", s.IsAlphabet()),
zap.Uint64("epoch", epoch), zap.Uint64("epoch", epoch),
zap.Uint32("precision", balancePrecision), zap.Uint32("precision", balancePrecision),
@ -635,17 +635,17 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) {
// onlyAlphabet wrapper around event handler that executes it // onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node. // only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler { func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) { return func(ctx context.Context, ev event.Event) {
if s.IsAlphabet() { if s.IsAlphabet() {
f(ev) f(ctx, ev)
} }
} }
} }
func (s *Server) newEpochTickHandlers() []newEpochHandler { func (s *Server) newEpochTickHandlers(ctx context.Context) []newEpochHandler {
newEpochHandlers := []newEpochHandler{ newEpochHandlers := []newEpochHandler{
func() { func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
}, },
} }

View file

@ -50,16 +50,16 @@ func (s *Server) depositSideNotary() (util.Uint256, error) {
return tx, err return tx, err
} }
func (s *Server) notaryHandler(_ event.Event) { func (s *Server) notaryHandler(ctx context.Context, _ event.Event) {
if !s.mainNotaryConfig.disabled { if !s.mainNotaryConfig.disabled {
_, err := s.depositMainNotary() _, err := s.depositMainNotary()
if err != nil { if err != nil {
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err)) s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
} }
} }
if _, err := s.depositSideNotary(); err != nil { if _, err := s.depositSideNotary(); err != nil {
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err)) s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
} }
} }

View file

@ -11,9 +11,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (bp *Processor) handleLock(ev event.Event) { func (bp *Processor) handleLock(ctx context.Context, ev event.Event) {
lock := ev.(balanceEvent.Lock) lock := ev.(balanceEvent.Lock)
bp.log.Info(context.Background(), logs.Notification, bp.log.Info(ctx, logs.Notification,
zap.String("type", "lock"), zap.String("type", "lock"),
zap.String("value", hex.EncodeToString(lock.ID()))) zap.String("value", hex.EncodeToString(lock.ID())))
@ -24,7 +24,7 @@ func (bp *Processor) handleLock(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
bp.log.Warn(context.Background(), logs.BalanceBalanceWorkerPoolDrained, bp.log.Warn(ctx, logs.BalanceBalanceWorkerPoolDrained,
zap.Int("capacity", bp.pool.Cap())) zap.Int("capacity", bp.pool.Cap()))
} }
} }

View file

@ -1,6 +1,7 @@
package balance package balance
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -30,7 +31,7 @@ func TestProcessorCallsFrostFSContractForLockEvent(t *testing.T) {
}) })
require.NoError(t, err, "failed to create processor") require.NoError(t, err, "failed to create processor")
processor.handleLock(balanceEvent.Lock{}) processor.handleLock(context.Background(), balanceEvent.Lock{})
for processor.pool.Running() > 0 { for processor.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -56,7 +57,7 @@ func TestProcessorDoesntCallFrostFSContractIfNotAlphabet(t *testing.T) {
}) })
require.NoError(t, err, "failed to create processor") require.NoError(t, err, "failed to create processor")
processor.handleLock(balanceEvent.Lock{}) processor.handleLock(context.Background(), balanceEvent.Lock{})
for processor.pool.Running() > 0 { for processor.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)

View file

@ -12,11 +12,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (cp *Processor) handlePut(ev event.Event) { func (cp *Processor) handlePut(ctx context.Context, ev event.Event) {
put := ev.(putEvent) put := ev.(putEvent)
id := sha256.Sum256(put.Container()) id := sha256.Sum256(put.Container())
cp.log.Info(context.Background(), logs.Notification, cp.log.Info(ctx, logs.Notification,
zap.String("type", "container put"), zap.String("type", "container put"),
zap.String("id", base58.Encode(id[:]))) zap.String("id", base58.Encode(id[:])))
@ -27,14 +27,14 @@ func (cp *Processor) handlePut(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained, cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
zap.Int("capacity", cp.pool.Cap())) zap.Int("capacity", cp.pool.Cap()))
} }
} }
func (cp *Processor) handleDelete(ev event.Event) { func (cp *Processor) handleDelete(ctx context.Context, ev event.Event) {
del := ev.(containerEvent.Delete) del := ev.(containerEvent.Delete)
cp.log.Info(context.Background(), logs.Notification, cp.log.Info(ctx, logs.Notification,
zap.String("type", "container delete"), zap.String("type", "container delete"),
zap.String("id", base58.Encode(del.ContainerID()))) zap.String("id", base58.Encode(del.ContainerID())))
@ -45,7 +45,7 @@ func (cp *Processor) handleDelete(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained, cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
zap.Int("capacity", cp.pool.Cap())) zap.Int("capacity", cp.pool.Cap()))
} }
} }

View file

@ -1,6 +1,7 @@
package container package container
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"encoding/hex" "encoding/hex"
"testing" "testing"
@ -71,7 +72,7 @@ func TestPutEvent(t *testing.T) {
nr: nr, nr: nr,
} }
proc.handlePut(event) proc.handlePut(context.Background(), event)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -143,7 +144,7 @@ func TestDeleteEvent(t *testing.T) {
Signature: signature, Signature: signature,
} }
proc.handleDelete(ev) proc.handleDelete(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)

View file

@ -13,11 +13,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (np *Processor) handleDeposit(ev event.Event) { func (np *Processor) handleDeposit(ctx context.Context, ev event.Event) {
deposit := ev.(frostfsEvent.Deposit) deposit := ev.(frostfsEvent.Deposit)
depositIDBin := bytes.Clone(deposit.ID()) depositIDBin := bytes.Clone(deposit.ID())
slices.Reverse(depositIDBin) slices.Reverse(depositIDBin)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "deposit"), zap.String("type", "deposit"),
zap.String("id", hex.EncodeToString(depositIDBin))) zap.String("id", hex.EncodeToString(depositIDBin)))
@ -28,16 +28,16 @@ func (np *Processor) handleDeposit(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }
func (np *Processor) handleWithdraw(ev event.Event) { func (np *Processor) handleWithdraw(ctx context.Context, ev event.Event) {
withdraw := ev.(frostfsEvent.Withdraw) withdraw := ev.(frostfsEvent.Withdraw)
withdrawBin := bytes.Clone(withdraw.ID()) withdrawBin := bytes.Clone(withdraw.ID())
slices.Reverse(withdrawBin) slices.Reverse(withdrawBin)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "withdraw"), zap.String("type", "withdraw"),
zap.String("id", hex.EncodeToString(withdrawBin))) zap.String("id", hex.EncodeToString(withdrawBin)))
@ -48,14 +48,14 @@ func (np *Processor) handleWithdraw(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }
func (np *Processor) handleCheque(ev event.Event) { func (np *Processor) handleCheque(ctx context.Context, ev event.Event) {
cheque := ev.(frostfsEvent.Cheque) cheque := ev.(frostfsEvent.Cheque)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "cheque"), zap.String("type", "cheque"),
zap.String("id", hex.EncodeToString(cheque.ID()))) zap.String("id", hex.EncodeToString(cheque.ID())))
@ -66,14 +66,14 @@ func (np *Processor) handleCheque(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }
func (np *Processor) handleConfig(ev event.Event) { func (np *Processor) handleConfig(ctx context.Context, ev event.Event) {
cfg := ev.(frostfsEvent.Config) cfg := ev.(frostfsEvent.Config)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "set config"), zap.String("type", "set config"),
zap.String("key", hex.EncodeToString(cfg.Key())), zap.String("key", hex.EncodeToString(cfg.Key())),
zap.String("value", hex.EncodeToString(cfg.Value()))) zap.String("value", hex.EncodeToString(cfg.Value())))
@ -85,7 +85,7 @@ func (np *Processor) handleConfig(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }

View file

@ -1,6 +1,7 @@
package frostfs package frostfs
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -36,7 +37,7 @@ func TestHandleDeposit(t *testing.T) {
AmountValue: 1000, AmountValue: 1000,
} }
proc.handleDeposit(ev) proc.handleDeposit(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -57,7 +58,7 @@ func TestHandleDeposit(t *testing.T) {
es.epochCounter = 109 es.epochCounter = 109
proc.handleDeposit(ev) proc.handleDeposit(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -98,7 +99,7 @@ func TestHandleWithdraw(t *testing.T) {
AmountValue: 1000, AmountValue: 1000,
} }
proc.handleWithdraw(ev) proc.handleWithdraw(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -139,7 +140,7 @@ func TestHandleCheque(t *testing.T) {
LockValue: util.Uint160{200}, LockValue: util.Uint160{200},
} }
proc.handleCheque(ev) proc.handleCheque(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -176,7 +177,7 @@ func TestHandleConfig(t *testing.T) {
TxHashValue: util.Uint256{100}, TxHashValue: util.Uint256{100},
} }
proc.handleConfig(ev) proc.handleConfig(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)

View file

@ -13,7 +13,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (gp *Processor) HandleAlphabetSync(e event.Event) { func (gp *Processor) HandleAlphabetSync(ctx context.Context, e event.Event) {
var ( var (
typ string typ string
hash util.Uint256 hash util.Uint256
@ -34,16 +34,16 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
return return
} }
gp.log.Info(context.Background(), logs.GovernanceNewEvent, zap.String("type", typ)) gp.log.Info(ctx, logs.GovernanceNewEvent, zap.String("type", typ))
// send event to the worker pool // send event to the worker pool
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool { err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
return gp.processAlphabetSync(hash) return gp.processAlphabetSync(ctx, hash)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
gp.log.Warn(context.Background(), logs.GovernanceGovernanceWorkerPoolDrained, gp.log.Warn(ctx, logs.GovernanceGovernanceWorkerPoolDrained,
zap.Int("capacity", gp.pool.Cap())) zap.Int("capacity", gp.pool.Cap()))
} }
} }

View file

@ -1,6 +1,7 @@
package governance package governance
import ( import (
"context"
"encoding/binary" "encoding/binary"
"sort" "sort"
"testing" "testing"
@ -57,7 +58,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
txHash: util.Uint256{100}, txHash: util.Uint256{100},
} }
proc.HandleAlphabetSync(ev) proc.HandleAlphabetSync(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -133,7 +134,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
Role: noderoles.NeoFSAlphabet, Role: noderoles.NeoFSAlphabet,
} }
proc.HandleAlphabetSync(ev) proc.HandleAlphabetSync(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -226,7 +227,7 @@ type testVoter struct {
votes []VoteValidatorPrm votes []VoteValidatorPrm
} }
func (v *testVoter) VoteForSidechainValidator(prm VoteValidatorPrm) error { func (v *testVoter) VoteForSidechainValidator(_ context.Context, prm VoteValidatorPrm) error {
v.votes = append(v.votes, prm) v.votes = append(v.votes, prm)
return nil return nil
} }

View file

@ -19,39 +19,39 @@ const (
alphabetUpdateIDPrefix = "AlphabetUpdate" alphabetUpdateIDPrefix = "AlphabetUpdate"
) )
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool { func (gp *Processor) processAlphabetSync(ctx context.Context, txHash util.Uint256) bool {
if !gp.alphabetState.IsAlphabet() { if !gp.alphabetState.IsAlphabet() {
gp.log.Info(context.Background(), logs.GovernanceNonAlphabetModeIgnoreAlphabetSync) gp.log.Info(ctx, logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
return true return true
} }
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList() mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
if err != nil { if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromMainNet, gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return false return false
} }
sidechainAlphabet, err := gp.morphClient.Committee() sidechainAlphabet, err := gp.morphClient.Committee()
if err != nil { if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromSideChain, gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromSideChain,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return false return false
} }
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet) newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
if err != nil { if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain, gp.log.Error(ctx, logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return false return false
} }
if newAlphabet == nil { if newAlphabet == nil {
gp.log.Info(context.Background(), logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged) gp.log.Info(ctx, logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
return true return true
} }
gp.log.Info(context.Background(), logs.GovernanceAlphabetListHasBeenChangedStartingUpdate, gp.log.Info(ctx, logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)), zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)),
zap.String("new_alphabet", prettyKeys(newAlphabet)), zap.String("new_alphabet", prettyKeys(newAlphabet)),
) )
@ -62,9 +62,9 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
} }
// 1. Vote to sidechain committee via alphabet contracts. // 1. Vote to sidechain committee via alphabet contracts.
err = gp.voter.VoteForSidechainValidator(votePrm) err = gp.voter.VoteForSidechainValidator(ctx, votePrm)
if err != nil { if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantVoteForSideChainCommittee, gp.log.Error(ctx, logs.GovernanceCantVoteForSideChainCommittee,
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
@ -77,7 +77,7 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
// 4. Update FrostFS contract in the mainnet. // 4. Update FrostFS contract in the mainnet.
gp.updateFrostFSContractInMainnet(newAlphabet) gp.updateFrostFSContractInMainnet(newAlphabet)
gp.log.Info(context.Background(), logs.GovernanceFinishedAlphabetListUpdate) gp.log.Info(ctx, logs.GovernanceFinishedAlphabetListUpdate)
return true return true
} }

View file

@ -1,6 +1,7 @@
package governance package governance
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
@ -38,7 +39,7 @@ type VoteValidatorPrm struct {
// Voter is a callback interface for alphabet contract voting. // Voter is a callback interface for alphabet contract voting.
type Voter interface { type Voter interface {
VoteForSidechainValidator(VoteValidatorPrm) error VoteForSidechainValidator(context.Context, VoteValidatorPrm) error
} }
type ( type (

View file

@ -12,13 +12,13 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (np *Processor) HandleNewEpochTick(ev event.Event) { func (np *Processor) HandleNewEpochTick(ctx context.Context, ev event.Event) {
_ = ev.(timerEvent.NewEpochTick) _ = ev.(timerEvent.NewEpochTick)
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch")) np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch"))
// send an event to the worker pool // send an event to the worker pool
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick) err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", func() bool { return np.processNewEpochTick(ctx) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
@ -26,28 +26,28 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
} }
} }
func (np *Processor) handleNewEpoch(ev event.Event) { func (np *Processor) handleNewEpoch(ctx context.Context, ev event.Event) {
epochEvent := ev.(netmapEvent.NewEpoch) epochEvent := ev.(netmapEvent.NewEpoch)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "new epoch"), zap.String("type", "new epoch"),
zap.Uint64("value", epochEvent.EpochNumber())) zap.Uint64("value", epochEvent.EpochNumber()))
// send an event to the worker pool // send an event to the worker pool
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
return np.processNewEpoch(epochEvent) return np.processNewEpoch(ctx, epochEvent)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }
func (np *Processor) handleAddPeer(ev event.Event) { func (np *Processor) handleAddPeer(ctx context.Context, ev event.Event) {
newPeer := ev.(netmapEvent.AddPeer) newPeer := ev.(netmapEvent.AddPeer)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "add peer"), zap.String("type", "add peer"),
) )
@ -58,14 +58,14 @@ func (np *Processor) handleAddPeer(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }
func (np *Processor) handleUpdateState(ev event.Event) { func (np *Processor) handleUpdateState(ctx context.Context, ev event.Event) {
updPeer := ev.(netmapEvent.UpdatePeer) updPeer := ev.(netmapEvent.UpdatePeer)
np.log.Info(context.Background(), logs.Notification, np.log.Info(ctx, logs.Notification,
zap.String("type", "update peer state"), zap.String("type", "update peer state"),
zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes()))) zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes())))
@ -76,21 +76,21 @@ func (np *Processor) handleUpdateState(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }
func (np *Processor) handleCleanupTick(ev event.Event) { func (np *Processor) handleCleanupTick(ctx context.Context, ev event.Event) {
if !np.netmapSnapshot.enabled { if !np.netmapSnapshot.enabled {
np.log.Debug(context.Background(), logs.NetmapNetmapCleanUpRoutineIsDisabled518) np.log.Debug(ctx, logs.NetmapNetmapCleanUpRoutineIsDisabled518)
return return
} }
cleanup := ev.(netmapCleanupTick) cleanup := ev.(netmapCleanupTick)
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "netmap cleaner")) np.log.Info(ctx, logs.NetmapTick, zap.String("type", "netmap cleaner"))
// send event to the worker pool // send event to the worker pool
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
@ -98,7 +98,7 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap())) zap.Int("capacity", np.pool.Cap()))
} }
} }

View file

@ -1,6 +1,7 @@
package netmap package netmap
import ( import (
"context"
"fmt" "fmt"
"testing" "testing"
"time" "time"
@ -39,7 +40,7 @@ func TestNewEpochTick(t *testing.T) {
require.NoError(t, err, "failed to create processor") require.NoError(t, err, "failed to create processor")
ev := timerEvent.NewEpochTick{} ev := timerEvent.NewEpochTick{}
proc.HandleNewEpochTick(ev) proc.HandleNewEpochTick(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -93,7 +94,7 @@ func TestNewEpoch(t *testing.T) {
Num: 101, Num: 101,
Hash: util.Uint256{101}, Hash: util.Uint256{101},
} }
proc.handleNewEpoch(ev) proc.handleNewEpoch(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -138,7 +139,7 @@ func TestAddPeer(t *testing.T) {
MainTransaction: &transaction.Transaction{}, MainTransaction: &transaction.Transaction{},
}, },
} }
proc.handleAddPeer(ev) proc.handleAddPeer(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -153,7 +154,7 @@ func TestAddPeer(t *testing.T) {
MainTransaction: &transaction.Transaction{}, MainTransaction: &transaction.Transaction{},
}, },
} }
proc.handleAddPeer(ev) proc.handleAddPeer(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -196,7 +197,7 @@ func TestUpdateState(t *testing.T) {
MainTransaction: &transaction.Transaction{}, MainTransaction: &transaction.Transaction{},
}, },
} }
proc.handleUpdateState(ev) proc.handleUpdateState(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -240,7 +241,7 @@ func TestCleanupTick(t *testing.T) {
txHash: util.Uint256{123}, txHash: util.Uint256{123},
} }
proc.handleCleanupTick(ev) proc.handleCleanupTick(context.Background(), ev)
for proc.pool.Running() > 0 { for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -432,6 +433,6 @@ type testEventHandler struct {
handledEvents []event.Event handledEvents []event.Event
} }
func (h *testEventHandler) Handle(e event.Event) { func (h *testEventHandler) Handle(_ context.Context, e event.Event) {
h.handledEvents = append(h.handledEvents, e) h.handledEvents = append(h.handledEvents, e)
} }

View file

@ -12,12 +12,12 @@ import (
// Process new epoch notification by setting global epoch value and resetting // Process new epoch notification by setting global epoch value and resetting
// local epoch timer. // local epoch timer.
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool { func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoch) bool {
epoch := ev.EpochNumber() epoch := ev.EpochNumber()
epochDuration, err := np.netmapClient.EpochDuration() epochDuration, err := np.netmapClient.EpochDuration()
if err != nil { if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantGetEpochDuration, np.log.Warn(ctx, logs.NetmapCantGetEpochDuration,
zap.String("error", err.Error())) zap.String("error", err.Error()))
} else { } else {
np.epochState.SetEpochDuration(epochDuration) np.epochState.SetEpochDuration(epochDuration)
@ -27,20 +27,20 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
h, err := np.netmapClient.MorphTxHeight(ev.TxHash()) h, err := np.netmapClient.MorphTxHeight(ev.TxHash())
if err != nil { if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantGetTransactionHeight, np.log.Warn(ctx, logs.NetmapCantGetTransactionHeight,
zap.String("hash", ev.TxHash().StringLE()), zap.String("hash", ev.TxHash().StringLE()),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
if err := np.epochTimer.ResetEpochTimer(h); err != nil { if err := np.epochTimer.ResetEpochTimer(h); err != nil {
np.log.Warn(context.Background(), logs.NetmapCantResetEpochTimer, np.log.Warn(ctx, logs.NetmapCantResetEpochTimer,
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
// get new netmap snapshot // get new netmap snapshot
networkMap, err := np.netmapClient.NetMap() networkMap, err := np.netmapClient.NetMap()
if err != nil { if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantGetNetmapSnapshotToPerformCleanup, np.log.Warn(ctx, logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return false return false
@ -54,33 +54,33 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
if epoch > 0 && np.alphabetState.IsAlphabet() { // estimates are invalid in genesis epoch if epoch > 0 && np.alphabetState.IsAlphabet() { // estimates are invalid in genesis epoch
err = np.containerWrp.StartEstimation(prm) err = np.containerWrp.StartEstimation(prm)
if err != nil { if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantStartContainerSizeEstimation, np.log.Warn(ctx, logs.NetmapCantStartContainerSizeEstimation,
zap.Uint64("epoch", epoch), zap.Uint64("epoch", epoch),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
} }
np.netmapSnapshot.update(*networkMap, epoch) np.netmapSnapshot.update(*networkMap, epoch)
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) np.handleCleanupTick(ctx, netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash())) np.handleAlphabetSync(ctx, governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev) np.handleNotaryDeposit(ctx, ev)
return true return true
} }
// Process new epoch tick by invoking new epoch method in network map contract. // Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() bool { func (np *Processor) processNewEpochTick(ctx context.Context) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(context.Background(), logs.NetmapNonAlphabetModeIgnoreNewEpochTick) np.log.Info(ctx, logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
return true return true
} }
nextEpoch := np.epochState.EpochCounter() + 1 nextEpoch := np.epochState.EpochCounter() + 1
np.log.Debug(context.Background(), logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch)) np.log.Debug(ctx, logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
err := np.netmapClient.NewEpoch(nextEpoch) err := np.netmapClient.NewEpoch(nextEpoch)
if err != nil { if err != nil {
np.log.Error(context.Background(), logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) np.log.Error(ctx, logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
return false return false
} }

View file

@ -48,8 +48,8 @@ func (s *Server) SetEpochDuration(val uint64) {
} }
// IsActive is a getter for a global active flag state. // IsActive is a getter for a global active flag state.
func (s *Server) IsActive() bool { func (s *Server) IsActive(ctx context.Context) bool {
return s.InnerRingIndex() >= 0 return s.InnerRingIndex(ctx) >= 0
} }
// IsAlphabet is a getter for a global alphabet flag state. // IsAlphabet is a getter for a global alphabet flag state.
@ -59,10 +59,10 @@ func (s *Server) IsAlphabet() bool {
// InnerRingIndex is a getter for a global index of node in inner ring list. Negative // InnerRingIndex is a getter for a global index of node in inner ring list. Negative
// index means that node is not in the inner ring list. // index means that node is not in the inner ring list.
func (s *Server) InnerRingIndex() int { func (s *Server) InnerRingIndex(ctx context.Context) int {
index, err := s.statusIndex.InnerRingIndex() index, err := s.statusIndex.InnerRingIndex()
if err != nil { if err != nil {
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error())) s.log.Error(ctx, logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
return -1 return -1
} }
@ -71,10 +71,10 @@ func (s *Server) InnerRingIndex() int {
// InnerRingSize is a getter for a global size of inner ring list. This value // InnerRingSize is a getter for a global size of inner ring list. This value
// paired with inner ring index. // paired with inner ring index.
func (s *Server) InnerRingSize() int { func (s *Server) InnerRingSize(ctx context.Context) int {
size, err := s.statusIndex.InnerRingSize() size, err := s.statusIndex.InnerRingSize()
if err != nil { if err != nil {
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error())) s.log.Error(ctx, logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
return 0 return 0
} }
@ -93,18 +93,18 @@ func (s *Server) AlphabetIndex() int {
return int(index) return int(index)
} }
func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) error { func (s *Server) voteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
validators := prm.Validators validators := prm.Validators
index := s.InnerRingIndex() index := s.InnerRingIndex(ctx)
if s.contracts.alphabet.indexOutOfRange(index) { if s.contracts.alphabet.indexOutOfRange(index) {
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange) s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
return nil return nil
} }
if len(validators) == 0 { if len(validators) == 0 {
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteEmptyValidatorsList) s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
return nil return nil
} }
@ -129,7 +129,7 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) { s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) {
_, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators) _, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators)
if err != nil { if err != nil {
s.log.Warn(context.Background(), logs.InnerringCantInvokeVoteMethodInAlphabetContract, s.log.Warn(ctx, logs.InnerringCantInvokeVoteMethodInAlphabetContract,
zap.Int8("alphabet_index", int8(letter)), zap.Int8("alphabet_index", int8(letter)),
zap.Uint64("epoch", epoch), zap.Uint64("epoch", epoch),
zap.String("error", err.Error())) zap.String("error", err.Error()))
@ -141,9 +141,9 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
// VoteForSidechainValidator calls vote method on alphabet contracts with // VoteForSidechainValidator calls vote method on alphabet contracts with
// the provided list of keys. // the provided list of keys.
func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) error { func (s *Server) VoteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
sort.Sort(prm.Validators) sort.Sort(prm.Validators)
return s.voteForSidechainValidator(prm) return s.voteForSidechainValidator(ctx, prm)
} }
// ResetEpochTimer resets the block timer that produces events to update epoch // ResetEpochTimer resets the block timer that produces events to update epoch

View file

@ -46,9 +46,9 @@ func TestServerState(t *testing.T) {
srv.setHealthStatus(context.Background(), healthStatus) srv.setHealthStatus(context.Background(), healthStatus)
require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status") require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status")
require.True(t, srv.IsActive(), "invalid IsActive result") require.True(t, srv.IsActive(context.Background()), "invalid IsActive result")
require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result") require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result")
require.Equal(t, 0, srv.InnerRingIndex(), "invalid IR index") require.Equal(t, 0, srv.InnerRingIndex(context.Background()), "invalid IR index")
require.Equal(t, 1, srv.InnerRingSize(), "invalid IR index") require.Equal(t, 1, srv.InnerRingSize(context.Background()), "invalid IR index")
require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index") require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index")
} }

View file

@ -1,11 +1,13 @@
package event package event
import ( import (
"context"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
) )
// Handler is an Event processing function. // Handler is an Event processing function.
type Handler func(Event) type Handler func(context.Context, Event)
// BlockHandler is a chain block processing function. // BlockHandler is a chain block processing function.
type BlockHandler func(*block.Block) type BlockHandler func(*block.Block)

View file

@ -280,7 +280,7 @@ loop:
continue loop continue loop
} }
l.handleNotaryEvent(notaryEvent) l.handleNotaryEvent(ctx, notaryEvent)
case b, ok := <-chs.BlockCh: case b, ok := <-chs.BlockCh:
if !ok { if !ok {
l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel) l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel)
@ -307,11 +307,11 @@ func (l *listener) handleBlockEvent(b *block.Block) {
} }
} }
func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) { func (l *listener) handleNotaryEvent(ctx context.Context, notaryEvent *result.NotaryRequestEvent) {
if err := l.pool.Submit(func() { if err := l.pool.Submit(func() {
l.parseAndHandleNotary(notaryEvent) l.parseAndHandleNotary(ctx, notaryEvent)
}); err != nil { }); err != nil {
l.log.Warn(context.Background(), logs.EventListenerWorkerPoolDrained, l.log.Warn(ctx, logs.EventListenerWorkerPoolDrained,
zap.Int("capacity", l.pool.Cap())) zap.Int("capacity", l.pool.Cap()))
} }
} }
@ -376,11 +376,11 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent *
} }
for _, handler := range handlers { for _, handler := range handlers {
handler(event) handler(ctx, event)
} }
} }
func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) { func (l *listener) parseAndHandleNotary(ctx context.Context, nr *result.NotaryRequestEvent) {
// prepare the notary event // prepare the notary event
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest) notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
if err != nil { if err != nil {
@ -388,13 +388,13 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
switch { switch {
case errors.Is(err, ErrTXAlreadyHandled): case errors.Is(err, ErrTXAlreadyHandled):
case errors.As(err, &expErr): case errors.As(err, &expErr):
l.log.Warn(context.Background(), logs.EventSkipExpiredMainTXNotaryEvent, l.log.Warn(ctx, logs.EventSkipExpiredMainTXNotaryEvent,
zap.String("error", err.Error()), zap.String("error", err.Error()),
zap.Uint32("current_block_height", expErr.CurrentBlockHeight), zap.Uint32("current_block_height", expErr.CurrentBlockHeight),
zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight), zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight),
) )
default: default:
l.log.Warn(context.Background(), logs.EventCouldNotPrepareAndValidateNotaryEvent, l.log.Warn(ctx, logs.EventCouldNotPrepareAndValidateNotaryEvent,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} }
@ -418,7 +418,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
l.mtx.RUnlock() l.mtx.RUnlock()
if !ok { if !ok {
log.Debug(context.Background(), logs.EventNotaryParserNotSet) log.Debug(ctx, logs.EventNotaryParserNotSet)
return return
} }
@ -426,7 +426,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
// parse the notary event // parse the notary event
event, err := parser(notaryEvent) event, err := parser(notaryEvent)
if err != nil { if err != nil {
log.Warn(context.Background(), logs.EventCouldNotParseNotaryEvent, log.Warn(ctx, logs.EventCouldNotParseNotaryEvent,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
@ -439,14 +439,14 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
l.mtx.RUnlock() l.mtx.RUnlock()
if !ok { if !ok {
log.Info(context.Background(), logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered, log.Info(ctx, logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
zap.Any("event", event), zap.Any("event", event),
) )
return return
} }
handler(event) handler(ctx, event)
} }
// SetNotificationParser sets the parser of particular contract event. // SetNotificationParser sets the parser of particular contract event.

View file

@ -59,7 +59,7 @@ func TestEventHandling(t *testing.T) {
handledNotifications := make([]Event, 0) handledNotifications := make([]Event, 0)
l.RegisterNotificationHandler(NotificationHandlerInfo{ l.RegisterNotificationHandler(NotificationHandlerInfo{
scriptHashWithType: key, scriptHashWithType: key,
h: func(e Event) { h: func(_ context.Context, e Event) {
handledNotifications = append(handledNotifications, e) handledNotifications = append(handledNotifications, e)
notificationHandled <- true notificationHandled <- true
}, },

View file

@ -85,12 +85,12 @@ func (s typeValue) GetType() Type {
// WorkerPoolHandler sets closure over worker pool w with passed handler h. // WorkerPoolHandler sets closure over worker pool w with passed handler h.
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler { func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
return func(e Event) { return func(ctx context.Context, e Event) {
err := w.Submit(func() { err := w.Submit(func() {
h(e) h(ctx, e)
}) })
if err != nil { if err != nil {
log.Warn(context.Background(), logs.EventCouldNotSubmitHandlerToWorkerPool, log.Warn(ctx, logs.EventCouldNotSubmitHandlerToWorkerPool,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} }