[#756] adm: Add polling interval increase

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-11-01 11:07:56 +03:00
parent c7a7229484
commit 0b0e5dab24
2 changed files with 72 additions and 23 deletions

View file

@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
@ -376,56 +377,69 @@ func (c *clientContext) awaitTx(cmd *cobra.Command) error {
func awaitTx(cmd *cobra.Command, c Client, txs []hashVUBPair) error { func awaitTx(cmd *cobra.Command, c Client, txs []hashVUBPair) error {
cmd.Println("Waiting for transactions to persist...") cmd.Println("Waiting for transactions to persist...")
const pollInterval = time.Second
tick := time.NewTicker(pollInterval)
defer tick.Stop()
at := trigger.Application at := trigger.Application
var retErr error var retErr error
loop:
for i := range txs {
var it int
var pollInterval time.Duration
var pollIntervalChanged bool
for {
// We must fetch current height before application log, to avoid race condition.
currBlock, err := c.GetBlockCount() currBlock, err := c.GetBlockCount()
if err != nil { if err != nil {
return fmt.Errorf("can't fetch current block height: %w", err) return fmt.Errorf("can't fetch current block height: %w", err)
} }
res, err := c.GetApplicationLog(txs[i].hash, &at)
if err == nil {
if retErr == nil && len(res.Executions) > 0 && res.Executions[0].VMState != vmstate.Halt {
retErr = fmt.Errorf("tx %d persisted in %s state: %s",
i, res.Executions[0].VMState, res.Executions[0].FaultException)
}
continue loop
}
if txs[i].vub < currBlock {
return fmt.Errorf("tx was not persisted: vub=%d, height=%d", txs[i].vub, currBlock)
}
loop: pollInterval, pollIntervalChanged = nextPollInterval(it, pollInterval)
for i := range txs { if pollIntervalChanged && viper.GetBool(commonflags.Verbose) {
res, err := c.GetApplicationLog(txs[i].hash, &at) cmd.Printf("Pool interval to check transaction persistence changed: %s\n", pollInterval.String())
if err == nil {
if retErr == nil && len(res.Executions) > 0 && res.Executions[0].VMState != vmstate.Halt {
retErr = fmt.Errorf("tx %d persisted in %s state: %s",
i, res.Executions[0].VMState, res.Executions[0].FaultException)
} }
continue loop
} timer := time.NewTimer(pollInterval)
if txs[i].vub < currBlock { select {
return fmt.Errorf("tx was not persisted: vub=%d, height=%d", txs[i].vub, currBlock) case <-cmd.Context().Done():
} return cmd.Context().Err()
for range tick.C { case <-timer.C:
// We must fetch current height before application log, to avoid race condition.
currBlock, err = c.GetBlockCount()
if err != nil {
return fmt.Errorf("can't fetch current block height: %w", err)
}
res, err := c.GetApplicationLog(txs[i].hash, &at)
if err == nil {
if retErr == nil && len(res.Executions) > 0 && res.Executions[0].VMState != vmstate.Halt {
retErr = fmt.Errorf("tx %d persisted in %s state: %s",
i, res.Executions[0].VMState, res.Executions[0].FaultException)
}
continue loop
}
if txs[i].vub < currBlock {
return fmt.Errorf("tx was not persisted: vub=%d, height=%d", txs[i].vub, currBlock)
} }
it++
} }
} }
return retErr return retErr
} }
func nextPollInterval(it int, previous time.Duration) (time.Duration, bool) {
const minPollInterval = 1 * time.Second
const maxPollInterval = 16 * time.Second
const changeAfter = 5
if it == 0 {
return minPollInterval, true
}
if it%changeAfter != 0 {
return previous, false
}
nextInterval := previous * 2
if nextInterval > maxPollInterval {
return maxPollInterval, previous != maxPollInterval
}
return nextInterval, true
}
// sendCommitteeTx creates transaction from script, signs it by committee nodes and sends it to RPC. // sendCommitteeTx creates transaction from script, signs it by committee nodes and sends it to RPC.
// If tryGroup is false, global scope is used for the signer (useful when // If tryGroup is false, global scope is used for the signer (useful when
// working with native contracts). // working with native contracts).

View file

@ -145,3 +145,38 @@ func setTestCredentials(v *viper.Viper, size int) {
} }
v.Set("credentials.contract", testContractPassword) v.Set("credentials.contract", testContractPassword)
} }
func TestNextPollInterval(t *testing.T) {
var pollInterval time.Duration
var iteration int
pollInterval, hasChanged := nextPollInterval(iteration, pollInterval)
require.True(t, hasChanged)
require.Equal(t, time.Second, pollInterval)
iteration = 4
pollInterval, hasChanged = nextPollInterval(iteration, pollInterval)
require.False(t, hasChanged)
require.Equal(t, time.Second, pollInterval)
iteration = 5
pollInterval, hasChanged = nextPollInterval(iteration, pollInterval)
require.True(t, hasChanged)
require.Equal(t, 2*time.Second, pollInterval)
iteration = 10
pollInterval, hasChanged = nextPollInterval(iteration, pollInterval)
require.True(t, hasChanged)
require.Equal(t, 4*time.Second, pollInterval)
iteration = 20
pollInterval = 32 * time.Second
pollInterval, hasChanged = nextPollInterval(iteration, pollInterval)
require.True(t, hasChanged) // from 32s to 16s
require.Equal(t, 16*time.Second, pollInterval)
pollInterval = 16 * time.Second
pollInterval, hasChanged = nextPollInterval(iteration, pollInterval)
require.False(t, hasChanged)
require.Equal(t, 16*time.Second, pollInterval)
}