import logging from time import sleep from typing import Optional from frostfs_testlib import reporter from frostfs_testlib.cli import FrostfsAdm, FrostfsCli, NeoGo from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.shell import Shell from frostfs_testlib.steps.payment_neogo import get_contract_hash from frostfs_testlib.storage.cluster import Cluster, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import InnerRing, MorphChain from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils import datetime_utils, wallet_utils logger = logging.getLogger("NeoLogger") @reporter.step("Get epochs from nodes") def get_epochs_from_nodes(shell: Shell, cluster: Cluster) -> dict[str, int]: """ Get current epochs on each node. Args: shell: shell to run commands on. cluster: cluster under test. Returns: Dict of {node_ip: epoch}. """ epochs_by_node = {} for node in cluster.services(StorageNode): epochs_by_node[node.host.config.address] = get_epoch(shell, cluster, node) return epochs_by_node @reporter.step("Ensure fresh epoch") def ensure_fresh_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None) -> int: # ensure new fresh epoch to avoid epoch switch during test session alive_node = alive_node if alive_node else cluster.services(StorageNode)[0] current_epoch = get_epoch(shell, cluster, alive_node) tick_epoch(shell, cluster, alive_node) epoch = get_epoch(shell, cluster, alive_node) assert epoch > current_epoch, "Epoch wasn't ticked" return epoch @reporter.step("Wait up to {timeout} seconds for nodes on cluster to align epochs") def wait_for_epochs_align(shell: Shell, cluster: Cluster, timeout=60): @wait_for_success(timeout, 5, None, True) def check_epochs(): epochs_by_node = get_epochs_from_nodes(shell, cluster) assert len(set(epochs_by_node.values())) == 1, f"unaligned epochs found: {epochs_by_node}" check_epochs() @reporter.step("Get Epoch") def get_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None): alive_node = alive_node if alive_node else cluster.services(StorageNode)[0] endpoint = alive_node.get_rpc_endpoint() wallet_path = alive_node.get_wallet_path() wallet_config = alive_node.get_wallet_config_path() cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config) epoch = cli.netmap.epoch(endpoint, wallet_path, timeout=CLI_DEFAULT_TIMEOUT) return int(epoch.stdout) @reporter.step("Tick Epoch") def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None): """ Tick epoch using frostfs-adm or NeoGo if frostfs-adm is not available (DevEnv) Args: shell: local shell to make queries about current epoch. Remote shell will be used to tick new one cluster: cluster instance under test alive_node: node to send requests to (first node in cluster by default) """ alive_node = alive_node if alive_node else cluster.services(StorageNode)[0] remote_shell = alive_node.host.get_shell() if "force_transactions" not in alive_node.host.config.attributes: # If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests) frostfs_adm = FrostfsAdm( shell=remote_shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH, ) frostfs_adm.morph.force_new_epoch() return # Otherwise we tick epoch using transaction cur_epoch = get_epoch(shell, cluster) # Use first node by default ir_node = cluster.services(InnerRing)[0] # In case if no local_wallet_path is provided, we use wallet_path ir_wallet_path = ir_node.get_wallet_path() ir_wallet_pass = ir_node.get_wallet_password() ir_address = wallet_utils.get_last_address_from_wallet(ir_wallet_path, ir_wallet_pass) morph_chain = cluster.services(MorphChain)[0] morph_endpoint = morph_chain.get_endpoint() neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE) neogo.contract.invokefunction( wallet=ir_wallet_path, wallet_password=ir_wallet_pass, scripthash=get_contract_hash(morph_chain, "netmap.frostfs", shell=shell), method="newEpoch", arguments=f"int:{cur_epoch + 1}", multisig_hash=f"{ir_address}:Global", address=ir_address, rpc_endpoint=morph_endpoint, force=True, gas=1, ) sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))