[#1437] ir: Fix contextcheck linters

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-21 12:21:01 +03:00
parent 47f43db29b
commit 802d2b449f
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
27 changed files with 166 additions and 158 deletions

View file

@ -1091,7 +1091,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
})

View file

@ -92,7 +92,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
if c.cfgMorph.containerCacheSize > 0 {
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
subscribeToContainerCreation(c, func(e event.Event) {
subscribeToContainerCreation(c, func(ctx context.Context, e event.Event) {
ev := e.(containerEvent.PutSuccess)
// read owner of the created container in order to update the reading cache.
@ -105,21 +105,21 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful
c.log.Error(context.Background(), logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
c.log.Error(ctx, logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
zap.Stringer("id", ev.ID),
zap.Error(err),
)
}
c.log.Debug(context.Background(), logs.FrostFSNodeContainerCreationEventsReceipt,
c.log.Debug(ctx, logs.FrostFSNodeContainerCreationEventsReceipt,
zap.Stringer("id", ev.ID),
)
})
subscribeToContainerRemoval(c, func(e event.Event) {
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
ev := e.(containerEvent.DeleteSuccess)
containerCache.handleRemoval(ev.ID)
c.log.Debug(context.Background(), logs.FrostFSNodeContainerRemovalEventsReceipt,
c.log.Debug(ctx, logs.FrostFSNodeContainerRemovalEventsReceipt,
zap.Stringer("id", ev.ID),
)
})

View file

@ -172,11 +172,11 @@ func initNetmapService(ctx context.Context, c *cfg) {
}
func addNewEpochNotificationHandlers(c *cfg) {
addNewEpochNotificationHandler(c, func(ev event.Event) {
addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
})
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
e := ev.(netmapEvent.NewEpoch).EpochNumber()
c.updateContractNodeInfo(e)
@ -186,15 +186,15 @@ func addNewEpochNotificationHandlers(c *cfg) {
}
if err := c.bootstrap(); err != nil {
c.log.Warn(context.Background(), logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
c.log.Warn(ctx, logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
}
})
if c.cfgMorph.notaryEnabled {
addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, _ event.Event) {
_, _, err := makeNotaryDeposit(c)
if err != nil {
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotMakeNotaryDeposit,
c.log.Error(ctx, logs.FrostFSNodeCouldNotMakeNotaryDeposit,
zap.String("error", err.Error()),
)
}

View file

@ -48,7 +48,7 @@ func initSessionService(c *cfg) {
_ = c.privateTokenStore.Close()
})
addNewEpochNotificationHandler(c, func(ev event.Event) {
addNewEpochNotificationHandler(c, func(_ context.Context, ev event.Event) {
c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber())
})

View file

@ -79,10 +79,10 @@ func initTreeService(c *cfg) {
}))
if d := treeConfig.SyncInterval(); d == 0 {
addNewEpochNotificationHandler(c, func(_ event.Event) {
addNewEpochNotificationHandler(c, func(ctx context.Context, _ event.Event) {
err := c.treeService.SynchronizeAll()
if err != nil {
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
c.log.Error(ctx, logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
}
})
} else {
@ -102,15 +102,15 @@ func initTreeService(c *cfg) {
}()
}
subscribeToContainerRemoval(c, func(e event.Event) {
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
ev := e.(containerEvent.DeleteSuccess)
// This is executed asynchronously, so we don't care about the operation taking some time.
c.log.Debug(context.Background(), logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
err := c.treeService.DropTree(context.Background(), ev.ID, "")
c.log.Debug(ctx, logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
err := c.treeService.DropTree(ctx, ev.ID, "")
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
c.log.Error(context.Background(), logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
c.log.Error(ctx, logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
zap.Stringer("cid", ev.ID),
zap.String("error", err.Error()))
}

View file

@ -139,12 +139,12 @@ func (s *Server) enableNotarySupport() error {
return nil
}
func (s *Server) initNotaryConfig() {
func (s *Server) initNotaryConfig(ctx context.Context) {
s.mainNotaryConfig = notaryConfigs(
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
)
s.log.Info(context.Background(), logs.InnerringNotarySupport,
s.log.Info(ctx, logs.InnerringNotarySupport,
zap.Bool("sidechain_enabled", true),
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
)
@ -154,8 +154,8 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
var alphaSync event.Handler
if s.withoutMainNet || cfg.GetBool("governance.disable") {
alphaSync = func(event.Event) {
s.log.Debug(context.Background(), logs.InnerringAlphabetKeysSyncIsDisabled)
alphaSync = func(ctx context.Context, _ event.Event) {
s.log.Debug(ctx, logs.InnerringAlphabetKeysSyncIsDisabled)
}
} else {
// create governance processor
@ -198,11 +198,11 @@ func (s *Server) createIRFetcher() irFetcher {
return irf
}
func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) {
func (s *Server) initTimers(ctx context.Context, cfg *viper.Viper, morphClients *serverMorphClients) {
s.epochTimer = newEpochTimer(&epochTimerArgs{
l: s.log,
alphabetState: s,
newEpochHandlers: s.newEpochTickHandlers(),
newEpochHandlers: s.newEpochTickHandlers(ctx),
cnrWrapper: morphClients.CnrClient,
epoch: s,
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),

View file

@ -152,7 +152,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
return err
}
err = s.initConfigFromBlockchain()
err = s.initConfigFromBlockchain(ctx)
if err != nil {
return err
}
@ -173,14 +173,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
prm.Validators = s.predefinedValidators
// vote for sidechain validator if it is prepared in config
err = s.voteForSidechainValidator(prm)
err = s.voteForSidechainValidator(ctx, prm)
if err != nil {
// we don't stop inner ring execution on this error
s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators,
zap.String("error", err.Error()))
}
s.tickInitialExpoch()
s.tickInitialExpoch(ctx)
morphErr := make(chan error)
mainnnetErr := make(chan error)
@ -283,11 +283,11 @@ func (s *Server) initSideNotary(ctx context.Context) error {
)
}
func (s *Server) tickInitialExpoch() {
func (s *Server) tickInitialExpoch(ctx context.Context) {
initialEpochTicker := timer.NewOneTickTimer(
timer.StaticBlockMeter(s.initialEpochTickDelta),
func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
})
s.addBlockTimer(initialEpochTicker)
}
@ -376,7 +376,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err
}
server.initNotaryConfig()
server.initNotaryConfig(ctx)
err = server.initContracts(cfg)
if err != nil {
@ -405,7 +405,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err
}
server.initTimers(cfg, morphClients)
server.initTimers(ctx, cfg, morphClients)
err = server.initGRPCServer(cfg, log, audit)
if err != nil {
@ -573,7 +573,7 @@ func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNe
return nc
}
func (s *Server) initConfigFromBlockchain() error {
func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
// get current epoch
epoch, err := s.netmapClient.Epoch()
if err != nil {
@ -602,8 +602,8 @@ func (s *Server) initConfigFromBlockchain() error {
return err
}
s.log.Debug(context.Background(), logs.InnerringReadConfigFromBlockchain,
zap.Bool("active", s.IsActive()),
s.log.Debug(ctx, logs.InnerringReadConfigFromBlockchain,
zap.Bool("active", s.IsActive(ctx)),
zap.Bool("alphabet", s.IsAlphabet()),
zap.Uint64("epoch", epoch),
zap.Uint32("precision", balancePrecision),
@ -635,17 +635,17 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) {
// onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
return func(ctx context.Context, ev event.Event) {
if s.IsAlphabet() {
f(ev)
f(ctx, ev)
}
}
}
func (s *Server) newEpochTickHandlers() []newEpochHandler {
func (s *Server) newEpochTickHandlers(ctx context.Context) []newEpochHandler {
newEpochHandlers := []newEpochHandler{
func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
},
}

View file

@ -50,16 +50,16 @@ func (s *Server) depositSideNotary() (util.Uint256, error) {
return tx, err
}
func (s *Server) notaryHandler(_ event.Event) {
func (s *Server) notaryHandler(ctx context.Context, _ event.Event) {
if !s.mainNotaryConfig.disabled {
_, err := s.depositMainNotary()
if err != nil {
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInMainChain, zap.Error(err))
}
}
if _, err := s.depositSideNotary(); err != nil {
s.log.Error(context.Background(), logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
s.log.Error(ctx, logs.InnerringCantMakeNotaryDepositInSideChain, zap.Error(err))
}
}

View file

@ -11,9 +11,9 @@ import (
"go.uber.org/zap"
)
func (bp *Processor) handleLock(ev event.Event) {
func (bp *Processor) handleLock(ctx context.Context, ev event.Event) {
lock := ev.(balanceEvent.Lock)
bp.log.Info(context.Background(), logs.Notification,
bp.log.Info(ctx, logs.Notification,
zap.String("type", "lock"),
zap.String("value", hex.EncodeToString(lock.ID())))
@ -24,7 +24,7 @@ func (bp *Processor) handleLock(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
bp.log.Warn(context.Background(), logs.BalanceBalanceWorkerPoolDrained,
bp.log.Warn(ctx, logs.BalanceBalanceWorkerPoolDrained,
zap.Int("capacity", bp.pool.Cap()))
}
}

View file

@ -1,6 +1,7 @@
package balance
import (
"context"
"testing"
"time"
@ -30,7 +31,7 @@ func TestProcessorCallsFrostFSContractForLockEvent(t *testing.T) {
})
require.NoError(t, err, "failed to create processor")
processor.handleLock(balanceEvent.Lock{})
processor.handleLock(context.Background(), balanceEvent.Lock{})
for processor.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -56,7 +57,7 @@ func TestProcessorDoesntCallFrostFSContractIfNotAlphabet(t *testing.T) {
})
require.NoError(t, err, "failed to create processor")
processor.handleLock(balanceEvent.Lock{})
processor.handleLock(context.Background(), balanceEvent.Lock{})
for processor.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)

View file

@ -12,11 +12,11 @@ import (
"go.uber.org/zap"
)
func (cp *Processor) handlePut(ev event.Event) {
func (cp *Processor) handlePut(ctx context.Context, ev event.Event) {
put := ev.(putEvent)
id := sha256.Sum256(put.Container())
cp.log.Info(context.Background(), logs.Notification,
cp.log.Info(ctx, logs.Notification,
zap.String("type", "container put"),
zap.String("id", base58.Encode(id[:])))
@ -27,14 +27,14 @@ func (cp *Processor) handlePut(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained,
cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
zap.Int("capacity", cp.pool.Cap()))
}
}
func (cp *Processor) handleDelete(ev event.Event) {
func (cp *Processor) handleDelete(ctx context.Context, ev event.Event) {
del := ev.(containerEvent.Delete)
cp.log.Info(context.Background(), logs.Notification,
cp.log.Info(ctx, logs.Notification,
zap.String("type", "container delete"),
zap.String("id", base58.Encode(del.ContainerID())))
@ -45,7 +45,7 @@ func (cp *Processor) handleDelete(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
cp.log.Warn(context.Background(), logs.ContainerContainerProcessorWorkerPoolDrained,
cp.log.Warn(ctx, logs.ContainerContainerProcessorWorkerPoolDrained,
zap.Int("capacity", cp.pool.Cap()))
}
}

View file

@ -1,6 +1,7 @@
package container
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"testing"
@ -71,7 +72,7 @@ func TestPutEvent(t *testing.T) {
nr: nr,
}
proc.handlePut(event)
proc.handlePut(context.Background(), event)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -143,7 +144,7 @@ func TestDeleteEvent(t *testing.T) {
Signature: signature,
}
proc.handleDelete(ev)
proc.handleDelete(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)

View file

@ -13,11 +13,11 @@ import (
"go.uber.org/zap"
)
func (np *Processor) handleDeposit(ev event.Event) {
func (np *Processor) handleDeposit(ctx context.Context, ev event.Event) {
deposit := ev.(frostfsEvent.Deposit)
depositIDBin := bytes.Clone(deposit.ID())
slices.Reverse(depositIDBin)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "deposit"),
zap.String("id", hex.EncodeToString(depositIDBin)))
@ -28,16 +28,16 @@ func (np *Processor) handleDeposit(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleWithdraw(ev event.Event) {
func (np *Processor) handleWithdraw(ctx context.Context, ev event.Event) {
withdraw := ev.(frostfsEvent.Withdraw)
withdrawBin := bytes.Clone(withdraw.ID())
slices.Reverse(withdrawBin)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "withdraw"),
zap.String("id", hex.EncodeToString(withdrawBin)))
@ -48,14 +48,14 @@ func (np *Processor) handleWithdraw(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleCheque(ev event.Event) {
func (np *Processor) handleCheque(ctx context.Context, ev event.Event) {
cheque := ev.(frostfsEvent.Cheque)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "cheque"),
zap.String("id", hex.EncodeToString(cheque.ID())))
@ -66,14 +66,14 @@ func (np *Processor) handleCheque(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleConfig(ev event.Event) {
func (np *Processor) handleConfig(ctx context.Context, ev event.Event) {
cfg := ev.(frostfsEvent.Config)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "set config"),
zap.String("key", hex.EncodeToString(cfg.Key())),
zap.String("value", hex.EncodeToString(cfg.Value())))
@ -85,7 +85,7 @@ func (np *Processor) handleConfig(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.FrostFSFrostfsProcessorWorkerPoolDrained,
np.log.Warn(ctx, logs.FrostFSFrostfsProcessorWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}

View file

@ -1,6 +1,7 @@
package frostfs
import (
"context"
"testing"
"time"
@ -36,7 +37,7 @@ func TestHandleDeposit(t *testing.T) {
AmountValue: 1000,
}
proc.handleDeposit(ev)
proc.handleDeposit(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -57,7 +58,7 @@ func TestHandleDeposit(t *testing.T) {
es.epochCounter = 109
proc.handleDeposit(ev)
proc.handleDeposit(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -98,7 +99,7 @@ func TestHandleWithdraw(t *testing.T) {
AmountValue: 1000,
}
proc.handleWithdraw(ev)
proc.handleWithdraw(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -139,7 +140,7 @@ func TestHandleCheque(t *testing.T) {
LockValue: util.Uint160{200},
}
proc.handleCheque(ev)
proc.handleCheque(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -176,7 +177,7 @@ func TestHandleConfig(t *testing.T) {
TxHashValue: util.Uint256{100},
}
proc.handleConfig(ev)
proc.handleConfig(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)

View file

@ -13,7 +13,7 @@ import (
"go.uber.org/zap"
)
func (gp *Processor) HandleAlphabetSync(e event.Event) {
func (gp *Processor) HandleAlphabetSync(ctx context.Context, e event.Event) {
var (
typ string
hash util.Uint256
@ -34,16 +34,16 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
return
}
gp.log.Info(context.Background(), logs.GovernanceNewEvent, zap.String("type", typ))
gp.log.Info(ctx, logs.GovernanceNewEvent, zap.String("type", typ))
// send event to the worker pool
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
return gp.processAlphabetSync(hash)
return gp.processAlphabetSync(ctx, hash)
})
if err != nil {
// there system can be moved into controlled degradation stage
gp.log.Warn(context.Background(), logs.GovernanceGovernanceWorkerPoolDrained,
gp.log.Warn(ctx, logs.GovernanceGovernanceWorkerPoolDrained,
zap.Int("capacity", gp.pool.Cap()))
}
}

View file

@ -1,6 +1,7 @@
package governance
import (
"context"
"encoding/binary"
"sort"
"testing"
@ -57,7 +58,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
txHash: util.Uint256{100},
}
proc.HandleAlphabetSync(ev)
proc.HandleAlphabetSync(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -133,7 +134,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
Role: noderoles.NeoFSAlphabet,
}
proc.HandleAlphabetSync(ev)
proc.HandleAlphabetSync(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -226,7 +227,7 @@ type testVoter struct {
votes []VoteValidatorPrm
}
func (v *testVoter) VoteForSidechainValidator(prm VoteValidatorPrm) error {
func (v *testVoter) VoteForSidechainValidator(_ context.Context, prm VoteValidatorPrm) error {
v.votes = append(v.votes, prm)
return nil
}

View file

@ -19,39 +19,39 @@ const (
alphabetUpdateIDPrefix = "AlphabetUpdate"
)
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
func (gp *Processor) processAlphabetSync(ctx context.Context, txHash util.Uint256) bool {
if !gp.alphabetState.IsAlphabet() {
gp.log.Info(context.Background(), logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
gp.log.Info(ctx, logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
return true
}
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromMainNet,
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet,
zap.String("error", err.Error()))
return false
}
sidechainAlphabet, err := gp.morphClient.Committee()
if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantFetchAlphabetListFromSideChain,
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromSideChain,
zap.String("error", err.Error()))
return false
}
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
gp.log.Error(ctx, logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
zap.String("error", err.Error()))
return false
}
if newAlphabet == nil {
gp.log.Info(context.Background(), logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
gp.log.Info(ctx, logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
return true
}
gp.log.Info(context.Background(), logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
gp.log.Info(ctx, logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
zap.String("side_chain_alphabet", prettyKeys(sidechainAlphabet)),
zap.String("new_alphabet", prettyKeys(newAlphabet)),
)
@ -62,9 +62,9 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
}
// 1. Vote to sidechain committee via alphabet contracts.
err = gp.voter.VoteForSidechainValidator(votePrm)
err = gp.voter.VoteForSidechainValidator(ctx, votePrm)
if err != nil {
gp.log.Error(context.Background(), logs.GovernanceCantVoteForSideChainCommittee,
gp.log.Error(ctx, logs.GovernanceCantVoteForSideChainCommittee,
zap.String("error", err.Error()))
}
@ -77,7 +77,7 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
// 4. Update FrostFS contract in the mainnet.
gp.updateFrostFSContractInMainnet(newAlphabet)
gp.log.Info(context.Background(), logs.GovernanceFinishedAlphabetListUpdate)
gp.log.Info(ctx, logs.GovernanceFinishedAlphabetListUpdate)
return true
}

View file

@ -1,6 +1,7 @@
package governance
import (
"context"
"errors"
"fmt"
@ -38,7 +39,7 @@ type VoteValidatorPrm struct {
// Voter is a callback interface for alphabet contract voting.
type Voter interface {
VoteForSidechainValidator(VoteValidatorPrm) error
VoteForSidechainValidator(context.Context, VoteValidatorPrm) error
}
type (

View file

@ -12,13 +12,13 @@ import (
"go.uber.org/zap"
)
func (np *Processor) HandleNewEpochTick(ev event.Event) {
func (np *Processor) HandleNewEpochTick(ctx context.Context, ev event.Event) {
_ = ev.(timerEvent.NewEpochTick)
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "epoch"))
// send an event to the worker pool
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", func() bool { return np.processNewEpochTick(ctx) })
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
@ -26,28 +26,28 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
}
}
func (np *Processor) handleNewEpoch(ev event.Event) {
func (np *Processor) handleNewEpoch(ctx context.Context, ev event.Event) {
epochEvent := ev.(netmapEvent.NewEpoch)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "new epoch"),
zap.Uint64("value", epochEvent.EpochNumber()))
// send an event to the worker pool
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
return np.processNewEpoch(epochEvent)
return np.processNewEpoch(ctx, epochEvent)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleAddPeer(ev event.Event) {
func (np *Processor) handleAddPeer(ctx context.Context, ev event.Event) {
newPeer := ev.(netmapEvent.AddPeer)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "add peer"),
)
@ -58,14 +58,14 @@ func (np *Processor) handleAddPeer(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleUpdateState(ev event.Event) {
func (np *Processor) handleUpdateState(ctx context.Context, ev event.Event) {
updPeer := ev.(netmapEvent.UpdatePeer)
np.log.Info(context.Background(), logs.Notification,
np.log.Info(ctx, logs.Notification,
zap.String("type", "update peer state"),
zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes())))
@ -76,21 +76,21 @@ func (np *Processor) handleUpdateState(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleCleanupTick(ev event.Event) {
func (np *Processor) handleCleanupTick(ctx context.Context, ev event.Event) {
if !np.netmapSnapshot.enabled {
np.log.Debug(context.Background(), logs.NetmapNetmapCleanUpRoutineIsDisabled518)
np.log.Debug(ctx, logs.NetmapNetmapCleanUpRoutineIsDisabled518)
return
}
cleanup := ev.(netmapCleanupTick)
np.log.Info(context.Background(), logs.NetmapTick, zap.String("type", "netmap cleaner"))
np.log.Info(ctx, logs.NetmapTick, zap.String("type", "netmap cleaner"))
// send event to the worker pool
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
@ -98,7 +98,7 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(context.Background(), logs.NetmapNetmapWorkerPoolDrained,
np.log.Warn(ctx, logs.NetmapNetmapWorkerPoolDrained,
zap.Int("capacity", np.pool.Cap()))
}
}

View file

@ -1,6 +1,7 @@
package netmap
import (
"context"
"fmt"
"testing"
"time"
@ -39,7 +40,7 @@ func TestNewEpochTick(t *testing.T) {
require.NoError(t, err, "failed to create processor")
ev := timerEvent.NewEpochTick{}
proc.HandleNewEpochTick(ev)
proc.HandleNewEpochTick(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -93,7 +94,7 @@ func TestNewEpoch(t *testing.T) {
Num: 101,
Hash: util.Uint256{101},
}
proc.handleNewEpoch(ev)
proc.handleNewEpoch(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -138,7 +139,7 @@ func TestAddPeer(t *testing.T) {
MainTransaction: &transaction.Transaction{},
},
}
proc.handleAddPeer(ev)
proc.handleAddPeer(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -153,7 +154,7 @@ func TestAddPeer(t *testing.T) {
MainTransaction: &transaction.Transaction{},
},
}
proc.handleAddPeer(ev)
proc.handleAddPeer(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -196,7 +197,7 @@ func TestUpdateState(t *testing.T) {
MainTransaction: &transaction.Transaction{},
},
}
proc.handleUpdateState(ev)
proc.handleUpdateState(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -240,7 +241,7 @@ func TestCleanupTick(t *testing.T) {
txHash: util.Uint256{123},
}
proc.handleCleanupTick(ev)
proc.handleCleanupTick(context.Background(), ev)
for proc.pool.Running() > 0 {
time.Sleep(10 * time.Millisecond)
@ -432,6 +433,6 @@ type testEventHandler struct {
handledEvents []event.Event
}
func (h *testEventHandler) Handle(e event.Event) {
func (h *testEventHandler) Handle(_ context.Context, e event.Event) {
h.handledEvents = append(h.handledEvents, e)
}

View file

@ -12,12 +12,12 @@ import (
// Process new epoch notification by setting global epoch value and resetting
// local epoch timer.
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoch) bool {
epoch := ev.EpochNumber()
epochDuration, err := np.netmapClient.EpochDuration()
if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantGetEpochDuration,
np.log.Warn(ctx, logs.NetmapCantGetEpochDuration,
zap.String("error", err.Error()))
} else {
np.epochState.SetEpochDuration(epochDuration)
@ -27,20 +27,20 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
h, err := np.netmapClient.MorphTxHeight(ev.TxHash())
if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantGetTransactionHeight,
np.log.Warn(ctx, logs.NetmapCantGetTransactionHeight,
zap.String("hash", ev.TxHash().StringLE()),
zap.String("error", err.Error()))
}
if err := np.epochTimer.ResetEpochTimer(h); err != nil {
np.log.Warn(context.Background(), logs.NetmapCantResetEpochTimer,
np.log.Warn(ctx, logs.NetmapCantResetEpochTimer,
zap.String("error", err.Error()))
}
// get new netmap snapshot
networkMap, err := np.netmapClient.NetMap()
if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
np.log.Warn(ctx, logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
zap.String("error", err.Error()))
return false
@ -54,33 +54,33 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
if epoch > 0 && np.alphabetState.IsAlphabet() { // estimates are invalid in genesis epoch
err = np.containerWrp.StartEstimation(prm)
if err != nil {
np.log.Warn(context.Background(), logs.NetmapCantStartContainerSizeEstimation,
np.log.Warn(ctx, logs.NetmapCantStartContainerSizeEstimation,
zap.Uint64("epoch", epoch),
zap.String("error", err.Error()))
}
}
np.netmapSnapshot.update(*networkMap, epoch)
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev)
np.handleCleanupTick(ctx, netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(ctx, governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ctx, ev)
return true
}
// Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() bool {
func (np *Processor) processNewEpochTick(ctx context.Context) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(context.Background(), logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
np.log.Info(ctx, logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
return true
}
nextEpoch := np.epochState.EpochCounter() + 1
np.log.Debug(context.Background(), logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
np.log.Debug(ctx, logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
err := np.netmapClient.NewEpoch(nextEpoch)
if err != nil {
np.log.Error(context.Background(), logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
np.log.Error(ctx, logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
return false
}

View file

@ -48,8 +48,8 @@ func (s *Server) SetEpochDuration(val uint64) {
}
// IsActive is a getter for a global active flag state.
func (s *Server) IsActive() bool {
return s.InnerRingIndex() >= 0
func (s *Server) IsActive(ctx context.Context) bool {
return s.InnerRingIndex(ctx) >= 0
}
// IsAlphabet is a getter for a global alphabet flag state.
@ -59,10 +59,10 @@ func (s *Server) IsAlphabet() bool {
// InnerRingIndex is a getter for a global index of node in inner ring list. Negative
// index means that node is not in the inner ring list.
func (s *Server) InnerRingIndex() int {
func (s *Server) InnerRingIndex(ctx context.Context) int {
index, err := s.statusIndex.InnerRingIndex()
if err != nil {
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
s.log.Error(ctx, logs.InnerringCantGetInnerRingIndex, zap.String("error", err.Error()))
return -1
}
@ -71,10 +71,10 @@ func (s *Server) InnerRingIndex() int {
// InnerRingSize is a getter for a global size of inner ring list. This value
// paired with inner ring index.
func (s *Server) InnerRingSize() int {
func (s *Server) InnerRingSize(ctx context.Context) int {
size, err := s.statusIndex.InnerRingSize()
if err != nil {
s.log.Error(context.Background(), logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
s.log.Error(ctx, logs.InnerringCantGetInnerRingSize, zap.String("error", err.Error()))
return 0
}
@ -93,18 +93,18 @@ func (s *Server) AlphabetIndex() int {
return int(index)
}
func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) error {
func (s *Server) voteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
validators := prm.Validators
index := s.InnerRingIndex()
index := s.InnerRingIndex(ctx)
if s.contracts.alphabet.indexOutOfRange(index) {
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteNodeNotInAlphabetRange)
return nil
}
if len(validators) == 0 {
s.log.Info(context.Background(), logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
s.log.Info(ctx, logs.InnerringIgnoreValidatorVoteEmptyValidatorsList)
return nil
}
@ -129,7 +129,7 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
s.contracts.alphabet.iterate(func(letter GlagoliticLetter, contract util.Uint160) {
_, err := s.morphClient.NotaryInvoke(contract, s.feeConfig.SideChainFee(), nonce, vubP, voteMethod, epoch, validators)
if err != nil {
s.log.Warn(context.Background(), logs.InnerringCantInvokeVoteMethodInAlphabetContract,
s.log.Warn(ctx, logs.InnerringCantInvokeVoteMethodInAlphabetContract,
zap.Int8("alphabet_index", int8(letter)),
zap.Uint64("epoch", epoch),
zap.String("error", err.Error()))
@ -141,9 +141,9 @@ func (s *Server) voteForSidechainValidator(prm governance.VoteValidatorPrm) erro
// VoteForSidechainValidator calls vote method on alphabet contracts with
// the provided list of keys.
func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) error {
func (s *Server) VoteForSidechainValidator(ctx context.Context, prm governance.VoteValidatorPrm) error {
sort.Sort(prm.Validators)
return s.voteForSidechainValidator(prm)
return s.voteForSidechainValidator(ctx, prm)
}
// ResetEpochTimer resets the block timer that produces events to update epoch

View file

@ -46,9 +46,9 @@ func TestServerState(t *testing.T) {
srv.setHealthStatus(context.Background(), healthStatus)
require.Equal(t, healthStatus, srv.HealthStatus(), "invalid health status")
require.True(t, srv.IsActive(), "invalid IsActive result")
require.True(t, srv.IsActive(context.Background()), "invalid IsActive result")
require.True(t, srv.IsAlphabet(), "invalid IsAlphabet result")
require.Equal(t, 0, srv.InnerRingIndex(), "invalid IR index")
require.Equal(t, 1, srv.InnerRingSize(), "invalid IR index")
require.Equal(t, 0, srv.InnerRingIndex(context.Background()), "invalid IR index")
require.Equal(t, 1, srv.InnerRingSize(context.Background()), "invalid IR index")
require.Equal(t, 0, srv.AlphabetIndex(), "invalid alphabet index")
}

View file

@ -1,11 +1,13 @@
package event
import (
"context"
"github.com/nspcc-dev/neo-go/pkg/core/block"
)
// Handler is an Event processing function.
type Handler func(Event)
type Handler func(context.Context, Event)
// BlockHandler is a chain block processing function.
type BlockHandler func(*block.Block)

View file

@ -280,7 +280,7 @@ loop:
continue loop
}
l.handleNotaryEvent(notaryEvent)
l.handleNotaryEvent(ctx, notaryEvent)
case b, ok := <-chs.BlockCh:
if !ok {
l.log.Warn(ctx, logs.EventStopEventListenerByBlockChannel)
@ -307,11 +307,11 @@ func (l *listener) handleBlockEvent(b *block.Block) {
}
}
func (l *listener) handleNotaryEvent(notaryEvent *result.NotaryRequestEvent) {
func (l *listener) handleNotaryEvent(ctx context.Context, notaryEvent *result.NotaryRequestEvent) {
if err := l.pool.Submit(func() {
l.parseAndHandleNotary(notaryEvent)
l.parseAndHandleNotary(ctx, notaryEvent)
}); err != nil {
l.log.Warn(context.Background(), logs.EventListenerWorkerPoolDrained,
l.log.Warn(ctx, logs.EventListenerWorkerPoolDrained,
zap.Int("capacity", l.pool.Cap()))
}
}
@ -376,11 +376,11 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent *
}
for _, handler := range handlers {
handler(event)
handler(ctx, event)
}
}
func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
func (l *listener) parseAndHandleNotary(ctx context.Context, nr *result.NotaryRequestEvent) {
// prepare the notary event
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
if err != nil {
@ -388,13 +388,13 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
switch {
case errors.Is(err, ErrTXAlreadyHandled):
case errors.As(err, &expErr):
l.log.Warn(context.Background(), logs.EventSkipExpiredMainTXNotaryEvent,
l.log.Warn(ctx, logs.EventSkipExpiredMainTXNotaryEvent,
zap.String("error", err.Error()),
zap.Uint32("current_block_height", expErr.CurrentBlockHeight),
zap.Uint32("fallback_tx_not_valid_before_height", expErr.FallbackTXNotValidBeforeHeight),
)
default:
l.log.Warn(context.Background(), logs.EventCouldNotPrepareAndValidateNotaryEvent,
l.log.Warn(ctx, logs.EventCouldNotPrepareAndValidateNotaryEvent,
zap.String("error", err.Error()),
)
}
@ -418,7 +418,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
l.mtx.RUnlock()
if !ok {
log.Debug(context.Background(), logs.EventNotaryParserNotSet)
log.Debug(ctx, logs.EventNotaryParserNotSet)
return
}
@ -426,7 +426,7 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
// parse the notary event
event, err := parser(notaryEvent)
if err != nil {
log.Warn(context.Background(), logs.EventCouldNotParseNotaryEvent,
log.Warn(ctx, logs.EventCouldNotParseNotaryEvent,
zap.String("error", err.Error()),
)
@ -439,14 +439,14 @@ func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
l.mtx.RUnlock()
if !ok {
log.Info(context.Background(), logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
log.Info(ctx, logs.EventNotaryHandlersForParsedNotificationEventWereNotRegistered,
zap.Any("event", event),
)
return
}
handler(event)
handler(ctx, event)
}
// SetNotificationParser sets the parser of particular contract event.

View file

@ -59,7 +59,7 @@ func TestEventHandling(t *testing.T) {
handledNotifications := make([]Event, 0)
l.RegisterNotificationHandler(NotificationHandlerInfo{
scriptHashWithType: key,
h: func(e Event) {
h: func(_ context.Context, e Event) {
handledNotifications = append(handledNotifications, e)
notificationHandled <- true
},

View file

@ -85,12 +85,12 @@ func (s typeValue) GetType() Type {
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
return func(e Event) {
return func(ctx context.Context, e Event) {
err := w.Submit(func() {
h(e)
h(ctx, e)
})
if err != nil {
log.Warn(context.Background(), logs.EventCouldNotSubmitHandlerToWorkerPool,
log.Warn(ctx, logs.EventCouldNotSubmitHandlerToWorkerPool,
zap.String("error", err.Error()),
)
}