forked from TrueCloudLab/frostfs-node
[#1437] ir: Fix contextcheck linters
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
cdb3cea4e7
commit
c75f845f4b
27 changed files with 165 additions and 157 deletions
|
@ -1089,7 +1089,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
|
|||
func initLocalStorage(ctx context.Context, c *cfg) {
|
||||
ls := engine.New(c.engineOpts()...)
|
||||
|
||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
||||
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
|
||||
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
|
||||
})
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
if c.cfgMorph.containerCacheSize > 0 {
|
||||
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
|
||||
|
||||
subscribeToContainerCreation(c, func(e event.Event) {
|
||||
subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) {
|
||||
ev := e.(containerEvent.PutSuccess)
|
||||
|
||||
// read owner of the created container in order to update the reading cache.
|
||||
|
@ -102,21 +102,21 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
} else {
|
||||
// unlike removal, we expect successful receive of the container
|
||||
// after successful creation, so logging can be useful
|
||||
c.log.Error(context.Background(), logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
||||
c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
|
||||
zap.Stringer("id", ev.ID),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
|
||||
c.log.Debug(context.Background(), logs.FrostFSNodeContainerCreationEventsReceipt,
|
||||
c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt,
|
||||
zap.Stringer("id", ev.ID),
|
||||
)
|
||||
})
|
||||
|
||||
subscribeToContainerRemoval(c, func(e event.Event) {
|
||||
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
|
||||
ev := e.(containerEvent.DeleteSuccess)
|
||||
containerCache.handleRemoval(ev.ID)
|
||||
c.log.Debug(context.Background(), logs.FrostFSNodeContainerRemovalEventsReceipt,
|
||||
c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt,
|
||||
zap.Stringer("id", ev.ID),
|
||||
)
|
||||
})
|
||||
|
|
|
@ -175,11 +175,11 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
|||
}
|
||||
|
||||
func addNewEpochNotificationHandlers(c *cfg) {
|
||||
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
||||
addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
|
||||
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
|
||||
})
|
||||
|
||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
||||
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
|
||||
e := ev.(netmapEvent.NewEpoch).EpochNumber()
|
||||
|
||||
c.updateContractNodeInfo(e)
|
||||
|
@ -189,15 +189,15 @@ func addNewEpochNotificationHandlers(c *cfg) {
|
|||
}
|
||||
|
||||
if err := c.bootstrap(); err != nil {
|
||||
c.log.Warn(context.Background(), logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
||||
c.log.Warn(ctx, logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
||||
}
|
||||
})
|
||||
|
||||
if c.cfgMorph.notaryEnabled {
|
||||
addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
|
||||
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, _ event.Event) {
|
||||
_, _, err := makeNotaryDeposit(c)
|
||||
if err != nil {
|
||||
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotMakeNotaryDeposit,
|
||||
c.log.Error(ctx, logs.FrostFSNodeCouldNotMakeNotaryDeposit,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ func initSessionService(c *cfg) {
|
|||
_ = c.privateTokenStore.Close()
|
||||
})
|
||||
|
||||
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
||||
addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
|
||||
c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber())
|
||||
})
|
||||
|
||||
|
|
|
@ -80,10 +80,10 @@ func initTreeService(c *cfg) {
|
|||
}))
|
||||
|
||||
if d := treeConfig.SyncInterval(); d == 0 {
|
||||
addNewEpochNotificationHandler(c, func(_ event.Event) {
|
||||
addNewEpochNotificationHandler(c, func(ctx context.Context, _ event.Event) {
|
||||
err := c.treeService.SynchronizeAll()
|
||||
if err != nil {
|
||||
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
|
||||
c.log.Error(ctx, logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
|
||||
}
|
||||
})
|
||||
} else {
|
||||
|
@ -103,15 +103,15 @@ func initTreeService(c *cfg) {
|
|||
}()
|
||||
}
|
||||
|
||||
subscribeToContainerRemoval(c, func(e event.Event) {
|
||||
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
|
||||
ev := e.(containerEvent.DeleteSuccess)
|
||||
|
||||
// This is executed asynchronously, so we don't care about the operation taking some time.
|
||||
c.log.Debug(context.Background(), logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
|
||||
err := c.treeService.DropTree(context.Background(), ev.ID, "")
|
||||
c.log.Debug(ctx, logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
|
||||
err := c.treeService.DropTree(ctx, ev.ID, "")
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
|
||||
c.log.Error(context.Background(), logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
|
||||
c.log.Error(ctx, logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
|
||||
zap.Stringer("cid", ev.ID),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
|
|
@ -137,12 +137,12 @@ func (s *Server) enableNotarySupport() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initNotaryConfig() {
|
||||
func (s *Server) initNotaryConfig(ctx context.Context) {
|
||||
s.mainNotaryConfig = notaryConfigs(
|
||||
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
||||
)
|
||||
|
||||
s.log.Info(context.Background(), logs.InnerringNotarySupport,
|
||||
s.log.Info(ctx, logs.InnerringNotarySupport,
|
||||
zap.Bool("sidechain_enabled", true),
|
||||
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
|
||||
)
|
||||
|
@ -152,8 +152,8 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
|
|||
var alphaSync event.Handler
|
||||
|
||||
if s.withoutMainNet || cfg.GetBool("governance.disable") {
|
||||
alphaSync = func(event.Event) {
|
||||
s.log.Debug(context.Background(), logs.InnerringAlphabetKeysSyncIsDisabled)
|
||||
alphaSync = func(ctx context.Context, _ event.Event) {
|
||||
s.log.Debug(ctx, logs.InnerringAlphabetKeysSyncIsDisabled)
|
||||
}
|
||||
} else {
|
||||
// create governance processor
|
||||
|
@ -196,9 +196,9 @@ func (s *Server) createIRFetcher() irFetcher {
|
|||
return irf
|
||||
}
|
||||
|
||||
func (s *Server) initTimers(cfg *viper.Viper) {
|
||||
func (s *Server) initTimers(ctx context.Context, cfg *viper.Viper) {
|
||||
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
||||
newEpochHandlers: s.newEpochTickHandlers(),
|
||||
newEpochHandlers: s.newEpochTickHandlers(ctx),
|
||||
epoch: s,
|
||||
})
|
||||
|
||||
|
|
|
@ -152,7 +152,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
err = s.initConfigFromBlockchain()
|
||||
err = s.initConfigFromBlockchain(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -173,14 +173,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
prm.Validators = s.predefinedValidators
|
||||
|
||||
// vote for sidechain validator if it is prepared in config
|
||||
err = s.voteForSidechainValidator(prm)
|
||||
err = s.voteForSidechainValidator(ctx, prm)
|
||||
if err != nil {
|
||||
// we don't stop inner ring execution on this error
|
||||
s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators,
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
s.tickInitialExpoch()
|
||||
s.tickInitialExpoch(ctx)
|
||||
|
||||
morphErr := make(chan error)
|
||||
mainnnetErr := make(chan error)
|
||||
|
@ -283,11 +283,11 @@ func (s *Server) initSideNotary(ctx context.Context) error {
|
|||
)
|
||||
}
|
||||
|
||||
func (s *Server) tickInitialExpoch() {
|
||||
func (s *Server) tickInitialExpoch(ctx context.Context) {
|
||||
initialEpochTicker := timer.NewOneTickTimer(
|
||||
timer.StaticBlockMeter(s.initialEpochTickDelta),
|
||||
func() {
|
||||
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
|
||||
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
|
||||
})
|
||||
s.addBlockTimer(initialEpochTicker)
|
||||
}
|
||||
|
@ -376,7 +376,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
return nil, err
|
||||
}
|
||||
|
||||
server.initNotaryConfig()
|
||||
server.initNotaryConfig(ctx)
|
||||
|
||||
err = server.initContracts(cfg)
|
||||
if err != nil {
|
||||
|
@ -405,7 +405,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
return nil, err
|
||||
}
|
||||
|
||||
server.initTimers(cfg)
|
||||
server.initTimers(ctx, cfg)
|
||||
|
||||
err = server.initGRPCServer(cfg, log, audit)
|
||||
if err != nil {
|
||||
|
@ -573,7 +573,7 @@ func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNe
|
|||
return nc
|
||||
}
|
||||
|
||||
func (s *Server) initConfigFromBlockchain() error {
|
||||
func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
|
||||
// get current epoch
|
||||
epoch, err := s.netmapClient.Epoch()
|
||||
if err != nil {
|
||||
|
@ -602,8 +602,8 @@ func (s *Server) initConfigFromBlockchain() error {
|
|||
return err
|
||||
}
|
||||
|
||||
s.log.Debug(context.Background(), logs.InnerringReadConfigFromBlockchain,
|
||||
zap.Bool("active", s.IsActive()),
|
||||
s.log.Debug(ctx, logs.InnerringReadConfigFromBlockchain,
|
||||
zap.Bool("active", s.IsActive(ctx)),
|
||||
zap.Bool("alphabet", s.IsAlphabet()),
|
||||
zap.Uint64("epoch", epoch),
|
||||
zap.Uint32("precision", balancePrecision),
|
||||
|
@ -635,17 +635,17 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) {
|
|||
// onlyAlphabet wrapper around event handler that executes it
|
||||
// only if inner ring node is alphabet node.
|
||||
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
|
||||
return func(ev event.Event) {
|
||||
return func(ctx context.Context, ev event.Event) {
|
||||
if s.IsAlphabet() {
|
||||
f(ev)
|
||||
f(ctx, ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) newEpochTickHandlers() []newEpochHandler {
|
||||
func (s *Server) newEpochTickHandlers(ctx context.Context) []newEpochHandler {
|
||||
newEpochHandlers := []newEpochHandler{
|
||||
func() {
|
||||
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
|
||||
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -50,16 +50,16 @@ func (s *Server) depositSideNotary() (util.Uint256, error) {
|
|||
return tx, err
|
||||
}
|
||||
|
||||
func (s *Server) notaryHandler(_ event.Event) {
|
||||
func (s *Server) notaryHandler(ctx context.Context, _ event.Event) {
|
||||
if !s.mainNotaryConfig.disabled {
|
||||
_, err := s.depositMainNotary()
|
||||
if err != nil {
|
||||
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
|
||||
s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := s.depositSideNotary(); err != nil {
|
||||
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
|
||||
s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,9 +11,9 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (bp *Processor) handleLock(ev event.Event) {
|
||||
func (bp *Processor) handleLock(ctx context.Context, ev event.Event) {
|
||||
lock := ev.(balanceEvent.Lock)
|
||||
bp.log.Info(context.Background(), logs.Notification,
|
||||
bp.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "lock"),
|
||||
zap.String("value", hex.EncodeToString(lock.ID())))
|
||||
|
||||
|
@ -24,7 +24,7 @@ func (bp *Processor) handleLock(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
bp.log.Warn(context.Background(), logs.BalanceBalanceWorkerPoolDrained,
|
||||
bp.log.Warn(ctx, logs.BalanceBalanceWorkerPoolDrained,
|
||||
zap.Int("capacity", bp.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package balance
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -30,7 +31,7 @@ func TestProcessorCallsFrostFSContractForLockEvent(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err, "failed to create processor")
|
||||
|
||||
processor.handleLock(balanceEvent.Lock{})
|
||||
processor.handleLock(context.Background(), balanceEvent.Lock{})
|
||||
|
||||
for processor.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -56,7 +57,7 @@ func TestProcessorDoesntCallFrostFSContractIfNotAlphabet(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err, "failed to create processor")
|
||||
|
||||
processor.handleLock(balanceEvent.Lock{})
|
||||
processor.handleLock(context.Background(), balanceEvent.Lock{})
|
||||
|
||||
for processor.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
|
|
@ -12,11 +12,11 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (cp *Processor) handlePut(ev event.Event) {
|
||||
func (cp *Processor) handlePut(ctx context.Context, ev event.Event) {
|
||||
put := ev.(putEvent)
|
||||
|
||||
id := sha256.Sum256(put.Container())
|
||||
cp.log.Info(context.Background(), logs.Notification,
|
||||
cp.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "container put"),
|
||||
zap.String("id", base58.Encode(id[:])))
|
||||
|
||||
|
@ -27,14 +27,14 @@ func (cp *Processor) handlePut(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||
cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||
zap.Int("capacity", cp.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *Processor) handleDelete(ev event.Event) {
|
||||
func (cp *Processor) handleDelete(ctx context.Context, ev event.Event) {
|
||||
del := ev.(containerEvent.Delete)
|
||||
cp.log.Info(context.Background(), logs.Notification,
|
||||
cp.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "container delete"),
|
||||
zap.String("id", base58.Encode(del.ContainerID())))
|
||||
|
||||
|
@ -45,7 +45,7 @@ func (cp *Processor) handleDelete(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||
cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||
zap.Int("capacity", cp.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
|
@ -71,7 +72,7 @@ func TestPutEvent(t *testing.T) {
|
|||
nr: nr,
|
||||
}
|
||||
|
||||
proc.handlePut(event)
|
||||
proc.handlePut(context.Background(), event)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -143,7 +144,7 @@ func TestDeleteEvent(t *testing.T) {
|
|||
Signature: signature,
|
||||
}
|
||||
|
||||
proc.handleDelete(ev)
|
||||
proc.handleDelete(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
|
|
@ -13,11 +13,11 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (np *Processor) handleDeposit(ev event.Event) {
|
||||
func (np *Processor) handleDeposit(ctx context.Context, ev event.Event) {
|
||||
deposit := ev.(frostfsEvent.Deposit)
|
||||
depositIDBin := bytes.Clone(deposit.ID())
|
||||
slices.Reverse(depositIDBin)
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "deposit"),
|
||||
zap.String("id", hex.EncodeToString(depositIDBin)))
|
||||
|
||||
|
@ -28,16 +28,16 @@ func (np *Processor) handleDeposit(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleWithdraw(ev event.Event) {
|
||||
func (np *Processor) handleWithdraw(ctx context.Context, ev event.Event) {
|
||||
withdraw := ev.(frostfsEvent.Withdraw)
|
||||
withdrawBin := bytes.Clone(withdraw.ID())
|
||||
slices.Reverse(withdrawBin)
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "withdraw"),
|
||||
zap.String("id", hex.EncodeToString(withdrawBin)))
|
||||
|
||||
|
@ -48,14 +48,14 @@ func (np *Processor) handleWithdraw(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleCheque(ev event.Event) {
|
||||
func (np *Processor) handleCheque(ctx context.Context, ev event.Event) {
|
||||
cheque := ev.(frostfsEvent.Cheque)
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "cheque"),
|
||||
zap.String("id", hex.EncodeToString(cheque.ID())))
|
||||
|
||||
|
@ -66,14 +66,14 @@ func (np *Processor) handleCheque(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleConfig(ev event.Event) {
|
||||
func (np *Processor) handleConfig(ctx context.Context, ev event.Event) {
|
||||
cfg := ev.(frostfsEvent.Config)
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "set config"),
|
||||
zap.String("key", hex.EncodeToString(cfg.Key())),
|
||||
zap.String("value", hex.EncodeToString(cfg.Value())))
|
||||
|
@ -85,7 +85,7 @@ func (np *Processor) handleConfig(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package frostfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -36,7 +37,7 @@ func TestHandleDeposit(t *testing.T) {
|
|||
AmountValue: 1000,
|
||||
}
|
||||
|
||||
proc.handleDeposit(ev)
|
||||
proc.handleDeposit(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -57,7 +58,7 @@ func TestHandleDeposit(t *testing.T) {
|
|||
|
||||
es.epochCounter = 109
|
||||
|
||||
proc.handleDeposit(ev)
|
||||
proc.handleDeposit(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -98,7 +99,7 @@ func TestHandleWithdraw(t *testing.T) {
|
|||
AmountValue: 1000,
|
||||
}
|
||||
|
||||
proc.handleWithdraw(ev)
|
||||
proc.handleWithdraw(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -139,7 +140,7 @@ func TestHandleCheque(t *testing.T) {
|
|||
LockValue: util.Uint160{200},
|
||||
}
|
||||
|
||||
proc.handleCheque(ev)
|
||||
proc.handleCheque(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -176,7 +177,7 @@ func TestHandleConfig(t *testing.T) {
|
|||
TxHashValue: util.Uint256{100},
|
||||
}
|
||||
|
||||
proc.handleConfig(ev)
|
||||
proc.handleConfig(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (gp *Processor) HandleAlphabetSync(e event.Event) {
|
||||
func (gp *Processor) HandleAlphabetSync(ctx context.Context, e event.Event) {
|
||||
var (
|
||||
typ string
|
||||
hash util.Uint256
|
||||
|
@ -34,16 +34,16 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
|
|||
return
|
||||
}
|
||||
|
||||
gp.log.Info(context.Background(), logs.GovernanceNewEvent, zap.String("type", typ))
|
||||
gp.log.Info(ctx, logs.GovernanceNewEvent, zap.String("type", typ))
|
||||
|
||||
// send event to the worker pool
|
||||
|
||||
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
|
||||
return gp.processAlphabetSync(hash)
|
||||
return gp.processAlphabetSync(ctx, hash)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
gp.log.Warn(context.Background(), logs.GovernanceGovernanceWorkerPoolDrained,
|
||||
gp.log.Warn(ctx, logs.GovernanceGovernanceWorkerPoolDrained,
|
||||
zap.Int("capacity", gp.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package governance
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"sort"
|
||||
"testing"
|
||||
|
@ -57,7 +58,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
|
|||
txHash: util.Uint256{100},
|
||||
}
|
||||
|
||||
proc.HandleAlphabetSync(ev)
|
||||
proc.HandleAlphabetSync(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -133,7 +134,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
|
|||
Role: noderoles.NeoFSAlphabet,
|
||||
}
|
||||
|
||||
proc.HandleAlphabetSync(ev)
|
||||
proc.HandleAlphabetSync(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -226,7 +227,7 @@ type testVoter struct {
|
|||
votes []VoteValidatorPrm
|
||||
}
|
||||
|
||||
func (v *testVoter) VoteForSidechainValidator(prm VoteValidatorPrm) error {
|
||||
func (v *testVoter) VoteForSidechainValidator(_ context.Context, prm VoteValidatorPrm) error {
|
||||
v.votes = append(v.votes, prm)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -19,39 +19,39 @@ const (
|
|||
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
||||
)
|
||||
|
||||
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
||||
func (gp *Processor) processAlphabetSync(ctx context.Context, txHash util.Uint256) bool {
|
||||
if !gp.alphabetState.IsAlphabet() {
|
||||
gp.log.Info(context.Background(), logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
||||
gp.log.Info(ctx, logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
||||
return true
|
||||
}
|
||||
|
||||
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
||||
if err != nil {
|
||||
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromMainNet,
|
||||
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet,
|
||||
zap.String("error", err.Error()))
|
||||
return false
|
||||
}
|
||||
|
||||
sidechainAlphabet, err := gp.morphClient.Committee()
|
||||
if err != nil {
|
||||
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromSideChain,
|
||||
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromSideChain,
|
||||
zap.String("error", err.Error()))
|
||||
return false
|
||||
}
|
||||
|
||||
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
|
||||
if err != nil {
|
||||
gp.log.Error(context.Background(), logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
||||
gp.log.Error(ctx, logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
||||
zap.String("error", err.Error()))
|
||||
return false
|
||||
}
|
||||
|
||||
if newAlphabet == nil {
|
||||
gp.log.Info(context.Background(), logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
||||
gp.log.Info(ctx, logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
||||
return true
|
||||
}
|
||||
|
||||
gp.log.Info(context.Background(), logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
||||
gp.log.Info(ctx, logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
||||
zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)),
|
||||
zap.String("new_alphabet", prettyKeys(newAlphabet)),
|
||||
)
|
||||
|
@ -62,9 +62,9 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
|||
}
|
||||
|
||||
// 1. Vote to sidechain committee via alphabet contracts.
|
||||
err = gp.voter.VoteForSidechainValidator(votePrm)
|
||||
err = gp.voter.VoteForSidechainValidator(ctx, votePrm)
|
||||
if err != nil {
|
||||
gp.log.Error(context.Background(), logs.GovernanceCantVoteForSideChainCommittee,
|
||||
gp.log.Error(ctx, logs.GovernanceCantVoteForSideChainCommittee,
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
|||
// 4. Update FrostFS contract in the mainnet.
|
||||
gp.updateFrostFSContractInMainnet(newAlphabet)
|
||||
|
||||
gp.log.Info(context.Background(), logs.GovernanceFinishedAlphabetListUpdate)
|
||||
gp.log.Info(ctx, logs.GovernanceFinishedAlphabetListUpdate)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package governance
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
|
@ -38,7 +39,7 @@ type VoteValidatorPrm struct {
|
|||
|
||||
// Voter is a callback interface for alphabet contract voting.
|
||||
type Voter interface {
|
||||
VoteForSidechainValidator(VoteValidatorPrm) error
|
||||
VoteForSidechainValidator(context.Context, VoteValidatorPrm) error
|
||||
}
|
||||
|
||||
type (
|
||||
|
|
|
@ -12,13 +12,13 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
||||
func (np *Processor) HandleNewEpochTick(ctx context.Context, ev event.Event) {
|
||||
_ = ev.(timerEvent.NewEpochTick)
|
||||
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch"))
|
||||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", func() bool { return np.processNewEpochTick(ctx) })
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
||||
|
@ -26,28 +26,28 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleNewEpoch(ev event.Event) {
|
||||
func (np *Processor) handleNewEpoch(ctx context.Context, ev event.Event) {
|
||||
epochEvent := ev.(netmapEvent.NewEpoch)
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "new epoch"),
|
||||
zap.Uint64("value", epochEvent.EpochNumber()))
|
||||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
|
||||
return np.processNewEpoch(epochEvent)
|
||||
return np.processNewEpoch(ctx, epochEvent)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleAddPeer(ev event.Event) {
|
||||
func (np *Processor) handleAddPeer(ctx context.Context, ev event.Event) {
|
||||
newPeer := ev.(netmapEvent.AddPeer)
|
||||
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "add peer"),
|
||||
)
|
||||
|
||||
|
@ -58,14 +58,14 @@ func (np *Processor) handleAddPeer(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleUpdateState(ev event.Event) {
|
||||
func (np *Processor) handleUpdateState(ctx context.Context, ev event.Event) {
|
||||
updPeer := ev.(netmapEvent.UpdatePeer)
|
||||
np.log.Info(context.Background(), logs.Notification,
|
||||
np.log.Info(ctx, logs.Notification,
|
||||
zap.String("type", "update peer state"),
|
||||
zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes())))
|
||||
|
||||
|
@ -76,21 +76,21 @@ func (np *Processor) handleUpdateState(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Processor) handleCleanupTick(ev event.Event) {
|
||||
func (np *Processor) handleCleanupTick(ctx context.Context, ev event.Event) {
|
||||
if !np.netmapSnapshot.enabled {
|
||||
np.log.Debug(context.Background(), logs.NetmapNetmapCleanUpRoutineIsDisabled518)
|
||||
np.log.Debug(ctx, logs.NetmapNetmapCleanUpRoutineIsDisabled518)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
cleanup := ev.(netmapCleanupTick)
|
||||
|
||||
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
||||
np.log.Info(ctx, logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
||||
|
||||
// send event to the worker pool
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
|
||||
|
@ -98,7 +98,7 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
|
|||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
|
||||
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
|
||||
zap.Int("capacity", np.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -38,7 +39,7 @@ func TestNewEpochTick(t *testing.T) {
|
|||
require.NoError(t, err, "failed to create processor")
|
||||
|
||||
ev := timerEvent.NewEpochTick{}
|
||||
proc.HandleNewEpochTick(ev)
|
||||
proc.HandleNewEpochTick(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -90,7 +91,7 @@ func TestNewEpoch(t *testing.T) {
|
|||
Num: 101,
|
||||
Hash: util.Uint256{101},
|
||||
}
|
||||
proc.handleNewEpoch(ev)
|
||||
proc.handleNewEpoch(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -130,7 +131,7 @@ func TestAddPeer(t *testing.T) {
|
|||
MainTransaction: &transaction.Transaction{},
|
||||
},
|
||||
}
|
||||
proc.handleAddPeer(ev)
|
||||
proc.handleAddPeer(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -145,7 +146,7 @@ func TestAddPeer(t *testing.T) {
|
|||
MainTransaction: &transaction.Transaction{},
|
||||
},
|
||||
}
|
||||
proc.handleAddPeer(ev)
|
||||
proc.handleAddPeer(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -188,7 +189,7 @@ func TestUpdateState(t *testing.T) {
|
|||
MainTransaction: &transaction.Transaction{},
|
||||
},
|
||||
}
|
||||
proc.handleUpdateState(ev)
|
||||
proc.handleUpdateState(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -232,7 +233,7 @@ func TestCleanupTick(t *testing.T) {
|
|||
txHash: util.Uint256{123},
|
||||
}
|
||||
|
||||
proc.handleCleanupTick(ev)
|
||||
proc.handleCleanupTick(context.Background(), ev)
|
||||
|
||||
for proc.pool.Running() > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -413,6 +414,6 @@ type testEventHandler struct {
|
|||
handledEvents []event.Event
|
||||
}
|
||||
|
||||
func (h *testEventHandler) Handle(e event.Event) {
|
||||
func (h *testEventHandler) Handle(_ context.Context, e event.Event) {
|
||||
h.handledEvents = append(h.handledEvents, e)
|
||||
}
|
||||
|
|
|
@ -11,12 +11,12 @@ import (
|
|||
|
||||
// Process new epoch notification by setting global epoch value and resetting
|
||||
// local epoch timer.
|
||||
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
|
||||
func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoch) bool {
|
||||
epoch := ev.EpochNumber()
|
||||
|
||||
epochDuration, err := np.netmapClient.EpochDuration()
|
||||
if err != nil {
|
||||
np.log.Warn(context.Background(), logs.NetmapCantGetEpochDuration,
|
||||
np.log.Warn(ctx, logs.NetmapCantGetEpochDuration,
|
||||
zap.String("error", err.Error()))
|
||||
} else {
|
||||
np.epochState.SetEpochDuration(epochDuration)
|
||||
|
@ -26,46 +26,46 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
|
|||
|
||||
h, err := np.netmapClient.MorphTxHeight(ev.TxHash())
|
||||
if err != nil {
|
||||
np.log.Warn(context.Background(), logs.NetmapCantGetTransactionHeight,
|
||||
np.log.Warn(ctx, logs.NetmapCantGetTransactionHeight,
|
||||
zap.String("hash", ev.TxHash().StringLE()),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
if err := np.epochTimer.ResetEpochTimer(h); err != nil {
|
||||
np.log.Warn(context.Background(), logs.NetmapCantResetEpochTimer,
|
||||
np.log.Warn(ctx, logs.NetmapCantResetEpochTimer,
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
// get new netmap snapshot
|
||||
networkMap, err := np.netmapClient.NetMap()
|
||||
if err != nil {
|
||||
np.log.Warn(context.Background(), logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
||||
np.log.Warn(ctx, logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
np.netmapSnapshot.update(*networkMap, epoch)
|
||||
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
||||
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
|
||||
np.handleNotaryDeposit(ev)
|
||||
np.handleCleanupTick(ctx, netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
||||
np.handleAlphabetSync(ctx, governance.NewSyncEvent(ev.TxHash()))
|
||||
np.handleNotaryDeposit(ctx, ev)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Process new epoch tick by invoking new epoch method in network map contract.
|
||||
func (np *Processor) processNewEpochTick() bool {
|
||||
func (np *Processor) processNewEpochTick(ctx context.Context) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(context.Background(), logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
||||
np.log.Info(ctx, logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
||||
return true
|
||||
}
|
||||
|
||||
nextEpoch := np.epochState.EpochCounter() + 1
|
||||
np.log.Debug(context.Background(), logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
|
||||
np.log.Debug(ctx, logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
|
||||
|
||||
err := np.netmapClient.NewEpoch(nextEpoch)
|
||||
if err != nil {
|
||||
np.log.Error(context.Background(), logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
||||
np.log.Error(ctx, logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -48,8 +48,8 @@ func (s *Server) SetEpochDuration(val uint64) {
|
|||
}
|
||||
|
||||
// IsActive is a getter for a global active flag state.
|
||||
func (s *Server) IsActive() bool {
|
||||
return s.InnerRingIndex() >= 0
|
||||
func (s *Server) IsActive(ctx context.Context) bool {
|
||||
return s.InnerRingIndex(ctx) >= 0
|
||||
}
|
||||
|
||||
// IsAlphabet is a getter for a global alphabet flag state.
|
||||
|
@ -59,10 +59,10 @@ func (s *Server) IsAlphabet() bool {
|
|||
|
||||
// InnerRingIndex is a getter for a global index of node in inner ring list. Negative
|
||||
// index means that node is not in the inner ring list.
|
||||
func (s *Server) InnerRingIndex() int {
|
||||
func (s *Server) InnerRingIndex(ctx context.Context) int {
|
||||
index, err := s.statusIndex.InnerRingIndex()
|
||||
if err != nil {
|
||||
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
|
||||
s.log.Error(ctx, logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
|
||||
return -1
|
||||
}
|
||||
|
||||
|
@ -71,10 +71,10 @@ func (s *Server) InnerRingIndex() int {
|
|||
|
||||
// InnerRingSize is a getter for a global size of inner ring list. This value
|
||||
// paired with inner ring index.
|
||||
func (s *Server) InnerRingSize() int {
|
||||
func (s *Server) InnerRingSize(ctx context.Context) int {
|
||||
size, err := s.statusIndex.InnerRingSize()
|
||||
if err != nil {
|
||||
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
|
||||
s.log.Error(ctx, logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
|
||||
return 0
|
||||
}
|
||||
|
||||
|
@ -93,18 +93,18 @@ func (s *Server) AlphabetIndex() int {
|
|||
return int(index)
|
||||
}
|
||||
|
||||
func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) error {
|
||||
func (s *Server) voteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
|
||||
validators := prm.Validators
|
||||
|
||||
index := s.InnerRingIndex()
|
||||
index := s.InnerRingIndex(ctx)
|
||||
if s.contracts.alphabet.indexOutOfRange(index) {
|
||||
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
|
||||
s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(validators) == 0 {
|
||||
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
|
||||
s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
|
|||
s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) {
|
||||
_, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators)
|
||||
if err != nil {
|
||||
s.log.Warn(context.Background(), logs.InnerringCantInvokeVoteMethodInAlphabetContract,
|
||||
s.log.Warn(ctx, logs.InnerringCantInvokeVoteMethodInAlphabetContract,
|
||||
zap.Int8("alphabet_index", int8(letter)),
|
||||
zap.Uint64("epoch", epoch),
|
||||
zap.String("error", err.Error()))
|
||||
|
@ -141,9 +141,9 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
|
|||
|
||||
// VoteForSidechainValidator calls vote method on alphabet contracts with
|
||||
// the provided list of keys.
|
||||
func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) error {
|
||||
func (s *Server) VoteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
|
||||
sort.Sort(prm.Validators)
|
||||
return s.voteForSidechainValidator(prm)
|
||||
return s.voteForSidechainValidator(ctx, prm)
|
||||
}
|
||||
|
||||
// ResetEpochTimer resets the block timer that produces events to update epoch
|
||||
|
|
|
@ -46,9 +46,9 @@ func TestServerState(t *testing.T) {
|
|||
srv.setHealthStatus(context.Background(), healthStatus)
|
||||
require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status")
|
||||
|
||||
require.True(t, srv.IsActive(), "invalid IsActive result")
|
||||
require.True(t, srv.IsActive(context.Background()), "invalid IsActive result")
|
||||
require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result")
|
||||
require.Equal(t, 0, srv.InnerRingIndex(), "invalid IR index")
|
||||
require.Equal(t, 1, srv.InnerRingSize(), "invalid IR index")
|
||||
require.Equal(t, 0, srv.InnerRingIndex(context.Background()), "invalid IR index")
|
||||
require.Equal(t, 1, srv.InnerRingSize(context.Background()), "invalid IR index")
|
||||
require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index")
|
||||
}
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
)
|
||||
|
||||
// Handler is an Event processing function.
|
||||
type Handler func(Event)
|
||||
type Handler func(context.Context, Event)
|
||||
|
||||
// BlockHandler is a chain block processing function.
|
||||
type BlockHandler func(*block.Block)
|
||||
|
|
|
@ -280,7 +280,7 @@ loop:
|
|||
continue loop
|
||||
}
|
||||
|
||||
l.handleNotaryEvent(notaryEvent)
|
||||
l.handleNotaryEvent(ctx, notaryEvent)
|
||||
case b, ok := <-chs.BlockCh:
|
||||
if !ok {
|
||||
l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel)
|
||||
|
@ -307,11 +307,11 @@ func (l *listener) handleBlockEvent(b *block.Block) {
|
|||
}
|
||||
}
|
||||
|
||||
func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) {
|
||||
func (l *listener) handleNotaryEvent(ctx context.Context, notaryEvent *result.NotaryRequestEvent) {
|
||||
if err := l.pool.Submit(func() {
|
||||
l.parseAndHandleNotary(notaryEvent)
|
||||
l.parseAndHandleNotary(ctx, notaryEvent)
|
||||
}); err != nil {
|
||||
l.log.Warn(context.Background(), logs.EventListenerWorkerPoolDrained,
|
||||
l.log.Warn(ctx, logs.EventListenerWorkerPoolDrained,
|
||||
zap.Int("capacity", l.pool.Cap()))
|
||||
}
|
||||
}
|
||||
|
@ -376,11 +376,11 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent *
|
|||
}
|
||||
|
||||
for _, handler := range handlers {
|
||||
handler(event)
|
||||
handler(ctx, event)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
||||
func (l *listener) parseAndHandleNotary(ctx context.Context, nr *result.NotaryRequestEvent) {
|
||||
// prepare the notary event
|
||||
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
|
||||
if err != nil {
|
||||
|
@ -388,13 +388,13 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
|||
switch {
|
||||
case errors.Is(err, ErrTXAlreadyHandled):
|
||||
case errors.As(err, &expErr):
|
||||
l.log.Warn(context.Background(), logs.EventSkipExpiredMainTXNotaryEvent,
|
||||
l.log.Warn(ctx, logs.EventSkipExpiredMainTXNotaryEvent,
|
||||
zap.String("error", err.Error()),
|
||||
zap.Uint32("current_block_height", expErr.CurrentBlockHeight),
|
||||
zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight),
|
||||
)
|
||||
default:
|
||||
l.log.Warn(context.Background(), logs.EventCouldNotPrepareAndValidateNotaryEvent,
|
||||
l.log.Warn(ctx, logs.EventCouldNotPrepareAndValidateNotaryEvent,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
@ -418,7 +418,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
|||
l.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
log.Debug(context.Background(), logs.EventNotaryParserNotSet)
|
||||
log.Debug(ctx, logs.EventNotaryParserNotSet)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -426,7 +426,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
|||
// parse the notary event
|
||||
event, err := parser(notaryEvent)
|
||||
if err != nil {
|
||||
log.Warn(context.Background(), logs.EventCouldNotParseNotaryEvent,
|
||||
log.Warn(ctx, logs.EventCouldNotParseNotaryEvent,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
|
@ -439,14 +439,14 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
|
|||
l.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
log.Info(context.Background(), logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
|
||||
log.Info(ctx, logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
|
||||
zap.Any("event", event),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
handler(event)
|
||||
handler(ctx, event)
|
||||
}
|
||||
|
||||
// SetNotificationParser sets the parser of particular contract event.
|
||||
|
|
|
@ -59,7 +59,7 @@ func TestEventHandling(t *testing.T) {
|
|||
handledNotifications := make([]Event, 0)
|
||||
l.RegisterNotificationHandler(NotificationHandlerInfo{
|
||||
scriptHashWithType: key,
|
||||
h: func(e Event) {
|
||||
h: func(_ context.Context, e Event) {
|
||||
handledNotifications = append(handledNotifications, e)
|
||||
notificationHandled <- true
|
||||
},
|
||||
|
|
|
@ -85,12 +85,12 @@ func (s typeValue) GetType() Type {
|
|||
|
||||
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
|
||||
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
|
||||
return func(e Event) {
|
||||
return func(ctx context.Context, e Event) {
|
||||
err := w.Submit(func() {
|
||||
h(e)
|
||||
h(ctx, e)
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn(context.Background(), logs.EventCouldNotSubmitHandlerToWorkerPool,
|
||||
log.Warn(ctx, logs.EventCouldNotSubmitHandlerToWorkerPool,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue