diff --git a/src/frostfs_testlib/cli/frostfs_adm/morph.py b/src/frostfs_testlib/cli/frostfs_adm/morph.py index a1693ac5..1d753d9f 100644 --- a/src/frostfs_testlib/cli/frostfs_adm/morph.py +++ b/src/frostfs_testlib/cli/frostfs_adm/morph.py @@ -27,11 +27,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph deposit-notary", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def dump_balances( @@ -56,11 +52,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph dump-balances", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def dump_config(self, rpc_endpoint: str) -> CommandResult: @@ -74,11 +66,25 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph dump-config", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, + ) + + def set_config( + self, set_key_value: str, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None + ) -> CommandResult: + """Add/update global config value in the FrostFS network. + + Args: + set_key_value: key1=val1 [key2=val2 ...] + alphabet_wallets: Path to alphabet wallets dir + rpc_endpoint: N3 RPC node endpoint + + Returns: + Command's result. + """ + return self._execute( + f"morph set-config {set_key_value}", + **{param: param_value for param, param_value in locals().items() if param not in ["self", "set_key_value"]}, ) def dump_containers( @@ -101,11 +107,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph dump-containers", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def dump_hashes(self, rpc_endpoint: str) -> CommandResult: @@ -119,11 +121,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph dump-hashes", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def force_new_epoch( @@ -140,11 +138,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph force-new-epoch", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def generate_alphabet( @@ -165,11 +159,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph generate-alphabet", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def generate_storage_wallet( @@ -192,11 +182,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph generate-storage-wallet", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def init( @@ -232,11 +218,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph init", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def refill_gas( @@ -259,11 +241,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph refill-gas", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def restore_containers( @@ -286,11 +264,7 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph restore-containers", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def set_policy( @@ -348,17 +322,13 @@ class FrostfsAdmMorph(CliCommand): """ return self._execute( "morph update-contracts", - **{ - param: param_value - for param, param_value in locals().items() - if param not in ["self"] - }, + **{param: param_value for param, param_value in locals().items() if param not in ["self"]}, ) def remove_nodes( self, node_netmap_keys: list[str], rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None ) -> CommandResult: - """ Move node to the Offline state in the candidates list + """Move node to the Offline state in the candidates list and tick an epoch to update the netmap using frostfs-adm Args: @@ -371,7 +341,7 @@ class FrostfsAdmMorph(CliCommand): """ if not len(node_netmap_keys): raise AttributeError("Got empty node_netmap_keys list") - + return self._execute( f"morph remove-nodes {' '.join(node_netmap_keys)}", **{ @@ -379,4 +349,4 @@ class FrostfsAdmMorph(CliCommand): for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"] }, - ) \ No newline at end of file + ) diff --git a/src/frostfs_testlib/cli/frostfs_cli/cli.py b/src/frostfs_testlib/cli/frostfs_cli/cli.py index a78da8b6..c20a9877 100644 --- a/src/frostfs_testlib/cli/frostfs_cli/cli.py +++ b/src/frostfs_testlib/cli/frostfs_cli/cli.py @@ -3,6 +3,7 @@ from typing import Optional from frostfs_testlib.cli.frostfs_cli.accounting import FrostfsCliAccounting from frostfs_testlib.cli.frostfs_cli.acl import FrostfsCliACL from frostfs_testlib.cli.frostfs_cli.container import FrostfsCliContainer +from frostfs_testlib.cli.frostfs_cli.control import FrostfsCliControl from frostfs_testlib.cli.frostfs_cli.netmap import FrostfsCliNetmap from frostfs_testlib.cli.frostfs_cli.object import FrostfsCliObject from frostfs_testlib.cli.frostfs_cli.session import FrostfsCliSession @@ -25,6 +26,7 @@ class FrostfsCli: storagegroup: FrostfsCliStorageGroup util: FrostfsCliUtil version: FrostfsCliVersion + control: FrostfsCliControl def __init__(self, shell: Shell, frostfs_cli_exec_path: str, config_file: Optional[str] = None): self.accounting = FrostfsCliAccounting(shell, frostfs_cli_exec_path, config=config_file) @@ -38,3 +40,4 @@ class FrostfsCli: self.util = FrostfsCliUtil(shell, frostfs_cli_exec_path, config=config_file) self.version = FrostfsCliVersion(shell, frostfs_cli_exec_path, config=config_file) self.tree = FrostfsCliTree(shell, frostfs_cli_exec_path, config=config_file) + self.control = FrostfsCliControl(shell, frostfs_cli_exec_path, config=config_file) diff --git a/src/frostfs_testlib/cli/frostfs_cli/control.py b/src/frostfs_testlib/cli/frostfs_cli/control.py new file mode 100644 index 00000000..bfcd6eca --- /dev/null +++ b/src/frostfs_testlib/cli/frostfs_cli/control.py @@ -0,0 +1,58 @@ +from typing import Optional + +from frostfs_testlib.cli.cli_command import CliCommand +from frostfs_testlib.shell import CommandResult + + +class FrostfsCliControl(CliCommand): + def set_status( + self, + endpoint: str, + status: str, + wallet: Optional[str] = None, + force: Optional[bool] = None, + address: Optional[str] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """Set status of the storage node in FrostFS network map + + Args: + wallet: Path to the wallet or binary key + address: Address of wallet account + endpoint: Remote node control address (as 'multiaddr' or ':') + force: Force turning to local maintenance + status: New netmap status keyword ('online', 'offline', 'maintenance') + timeout: Timeout for an operation (default 15s) + + Returns: + Command`s result. + """ + return self._execute( + "control set-status", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) + + def healthcheck( + self, + endpoint: str, + wallet: Optional[str] = None, + address: Optional[str] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """Set status of the storage node in FrostFS network map + + Args: + wallet: Path to the wallet or binary key + address: Address of wallet account + endpoint: Remote node control address (as 'multiaddr' or ':') + force: Force turning to local maintenance + status: New netmap status keyword ('online', 'offline', 'maintenance') + timeout: Timeout for an operation (default 15s) + + Returns: + Command`s result. + """ + return self._execute( + "control healthcheck", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) diff --git a/src/frostfs_testlib/cli/netmap_parser.py b/src/frostfs_testlib/cli/netmap_parser.py new file mode 100644 index 00000000..6d2eaaac --- /dev/null +++ b/src/frostfs_testlib/cli/netmap_parser.py @@ -0,0 +1,86 @@ +import re + +from frostfs_testlib.storage.cluster import ClusterNode +from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo + + +class NetmapParser: + @staticmethod + def netinfo(output: str) -> NodeNetInfo: + regexes = { + "epoch": r"Epoch: (?P\d+)", + "network_magic": r"Network magic: (?P.*$)", + "time_per_block": r"Time per block: (?P\d+\w+)", + "container_fee": r"Container fee: (?P\d+)", + "epoch_duration": r"Epoch duration: (?P\d+)", + "inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P\d+)", + "maximum_object_size": r"Maximum object size: (?P\d+)", + "withdrawal_fee": r"Withdrawal fee: (?P\d+)", + "homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?Ptrue|false)", + "maintenance_mode_allowed": r"Maintenance mode allowed: (?Ptrue|false)", + "eigen_trust_alpha": r"EigenTrustAlpha: (?P\d+\w+$)", + "eigen_trust_iterations": r"EigenTrustIterations: (?P\d+)", + } + parse_result = {} + + for key, regex in regexes.items(): + search_result = re.search(regex, output, flags=re.MULTILINE) + if search_result == None: + parse_result[key] = None + continue + parse_result[key] = search_result[key].strip() + + node_netinfo = NodeNetInfo(**parse_result) + + return node_netinfo + + @staticmethod + def snapshot_all_nodes(output: str) -> list[NodeNetmapInfo]: + """The code will parse each line and return each node as dataclass.""" + netmap_nodes = output.split("Node ")[1:] + dataclasses_netmap = [] + result_netmap = {} + + regexes = { + "node_id": r"\d+: (?P\w+)", + "node_data_ips": r"(?P/ip4/.+?)$", + "node_status": r"(?PONLINE|OFFLINE)", + "cluster_name": r"ClusterName: (?P\w+)", + "continent": r"Continent: (?P\w+)", + "country": r"Country: (?P\w+)", + "country_code": r"CountryCode: (?P\w+)", + "external_address": r"ExternalAddr: (?P/ip[4].+?)$", + "location": r"Location: (?P\w+.*)", + "node": r"Node: (?P\d+\.\d+\.\d+\.\d+)", + "price": r"Price: (?P\d+)", + "sub_div": r"SubDiv: (?P.*)", + "sub_div_code": r"SubDivCode: (?P\w+)", + "un_locode": r"UN-LOCODE: (?P\w+.*)", + "role": r"role: (?P\w+)", + } + + for node in netmap_nodes: + for key, regex in regexes.items(): + search_result = re.search(regex, node, flags=re.MULTILINE) + if key == "node_data_ips": + result_netmap[key] = search_result[key].strip().split(" ") + continue + if key == "external_address": + result_netmap[key] = search_result[key].strip().split(",") + continue + if search_result == None: + result_netmap[key] = None + continue + result_netmap[key] = search_result[key].strip() + + dataclasses_netmap.append(NodeNetmapInfo(**result_netmap)) + + return dataclasses_netmap + + @staticmethod + def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None: + snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output) + snapshot_node = [node for node in snapshot_nodes if node.node == cluster_node.host_ip] + if not snapshot_node: + return None + return snapshot_node[0] diff --git a/src/frostfs_testlib/shell/local_shell.py b/src/frostfs_testlib/shell/local_shell.py index fa07890b..26c7e9bb 100644 --- a/src/frostfs_testlib/shell/local_shell.py +++ b/src/frostfs_testlib/shell/local_shell.py @@ -62,7 +62,8 @@ class LocalShell(Shell): if options.check and result.return_code != 0: raise RuntimeError( f"Command: {command}\nreturn code: {result.return_code}\n" - f"Output: {result.stdout}" + f"Output: {result.stdout}\n" + f"Stderr: {result.stderr}\n" ) return result @@ -94,9 +95,7 @@ class LocalShell(Shell): return_code=exc.returncode, ) raise RuntimeError( - f"Command: {command}\nError:\n" - f"return code: {exc.returncode}\n" - f"output: {exc.output}" + f"Command: {command}\nError:\n" f"return code: {exc.returncode}\n" f"output: {exc.output}" ) from exc except OSError as exc: raise RuntimeError(f"Command: {command}\nOutput: {exc.strerror}") from exc diff --git a/src/frostfs_testlib/shell/ssh_shell.py b/src/frostfs_testlib/shell/ssh_shell.py index 6db7d517..6b12f81b 100644 --- a/src/frostfs_testlib/shell/ssh_shell.py +++ b/src/frostfs_testlib/shell/ssh_shell.py @@ -6,27 +6,11 @@ from functools import lru_cache, wraps from time import sleep from typing import ClassVar, Optional, Tuple -from paramiko import ( - AutoAddPolicy, - Channel, - ECDSAKey, - Ed25519Key, - PKey, - RSAKey, - SSHClient, - SSHException, - ssh_exception, -) +from paramiko import AutoAddPolicy, Channel, ECDSAKey, Ed25519Key, PKey, RSAKey, SSHClient, SSHException, ssh_exception from paramiko.ssh_exception import AuthenticationException from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.shell.interfaces import ( - CommandInspector, - CommandOptions, - CommandResult, - Shell, - SshCredentials, -) +from frostfs_testlib.shell.interfaces import CommandInspector, CommandOptions, CommandResult, Shell, SshCredentials logger = logging.getLogger("frostfs.testlib.shell") reporter = get_reporter() @@ -97,8 +81,7 @@ class SshConnectionProvider: ) else: logger.info( - f"Trying to connect to host {host} as {creds.ssh_login} using password " - f"(attempt {attempt})" + f"Trying to connect to host {host} as {creds.ssh_login} using password " f"(attempt {attempt})" ) connection.connect( hostname=host, @@ -141,9 +124,7 @@ class HostIsNotAvailable(Exception): def log_command(func): @wraps(func) - def wrapper( - shell: "SSHShell", command: str, options: CommandOptions, *args, **kwargs - ) -> CommandResult: + def wrapper(shell: "SSHShell", command: str, options: CommandOptions, *args, **kwargs) -> CommandResult: command_info = command.removeprefix("$ProgressPreference='SilentlyContinue'\n") with reporter.step(command_info): logger.info(f'Execute command "{command}" on "{shell.host}"') @@ -238,15 +219,13 @@ class SSHShell(Shell): if options.check and result.return_code != 0: raise RuntimeError( - f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}" + f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}\nStderr: {result.stderr}\n" ) return result @log_command def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult: - stdin, stdout, stderr = self._connection.exec_command( - command, timeout=options.timeout, get_pty=True - ) + stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, get_pty=True) for interactive_input in options.interactive_inputs: input = interactive_input.input if not input.endswith("\n"): diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 45c08b3c..27fa034b 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -3,9 +3,13 @@ import time from typing import TypeVar import frostfs_testlib.resources.optionals as optionals +from frostfs_testlib.cli import FrostfsAdm, FrostfsCli +from frostfs_testlib.cli.netmap_parser import NetmapParser from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.plugins import load_all from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC +from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode @@ -13,6 +17,7 @@ from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.testing import parallel from frostfs_testlib.testing.test_control import run_optionally, wait_for_success +from frostfs_testlib.utils.datetime_utils import parse_time from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_for_host_offline, @@ -426,6 +431,79 @@ class ClusterStateController: return parallel(self._disable_date_synchronizer, self.cluster.cluster_nodes) + @reporter.step_deco("Set MaintenanceModeAllowed - {status}") + def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None: + frostfs_adm = FrostfsAdm( + shell=cluster_node.host.get_shell(), + frostfs_adm_exec_path=FROSTFS_ADM_EXEC, + config_file=FROSTFS_ADM_CONFIG_PATH, + ) + frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}") + + @reporter.step_deco("Set mode node to {status}") + def set_mode_node(self, cluster_node: ClusterNode, wallet: str, status: str, await_tick: bool = True) -> None: + rpc_endpoint = cluster_node.storage_node.get_rpc_endpoint() + control_endpoint = cluster_node.service(StorageNode).get_control_endpoint() + + frostfs_adm, frostfs_cli, frostfs_cli_remote = self._get_cli(local_shell=self.shell, cluster_node=cluster_node) + node_netinfo = NetmapParser.netinfo(frostfs_cli.netmap.netinfo(rpc_endpoint=rpc_endpoint, wallet=wallet).stdout) + + with reporter.step("If status maintenance, then check that the option is enabled"): + if node_netinfo.maintenance_mode_allowed == "false": + frostfs_adm.morph.set_config(set_key_value="MaintenanceModeAllowed=true") + + with reporter.step(f"Change the status to {status}"): + frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status=status) + + if not await_tick: + return + + with reporter.step("Tick 1 epoch, and await 2 block"): + frostfs_adm.morph.force_new_epoch() + time.sleep(parse_time(MORPH_BLOCK_TIME) * 2) + + self.check_node_status(status=status, wallet=wallet, cluster_node=cluster_node) + + @wait_for_success(80, 8) + @reporter.step_deco("Check status node, status - {status}") + def check_node_status(self, status: str, wallet: str, cluster_node: ClusterNode): + frostfs_cli = FrostfsCli( + shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG + ) + netmap = NetmapParser.snapshot_all_nodes( + frostfs_cli.netmap.snapshot(rpc_endpoint=cluster_node.storage_node.get_rpc_endpoint(), wallet=wallet).stdout + ) + netmap = [node for node in netmap if cluster_node.host_ip == node.node] + if status == "offline": + assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline" + else: + assert netmap[0].node_status == status.upper(), f"Node state - {netmap[0].node_status} != {status} expect" + + def _get_cli(self, local_shell: Shell, cluster_node: ClusterNode) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]: + # TODO Move to service config + host = cluster_node.host + service_config = host.get_service_config(cluster_node.storage_node.name) + wallet_path = service_config.attributes["wallet_path"] + wallet_password = service_config.attributes["wallet_password"] + + shell = host.get_shell() + wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml" + wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"' + shell.exec(f"echo '{wallet_config}' > {wallet_config_path}") + + frostfs_adm = FrostfsAdm( + shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH + ) + frostfs_cli = FrostfsCli( + shell=local_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG + ) + frostfs_cli_remote = FrostfsCli( + shell=shell, + frostfs_cli_exec_path=FROSTFS_CLI_EXEC, + config_file=wallet_config_path, + ) + return frostfs_adm, frostfs_cli, frostfs_cli_remote + def _enable_date_synchronizer(self, cluster_node: ClusterNode): shell = cluster_node.host.get_shell() shell.exec("timedatectl set-ntp true") diff --git a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py index d670d8ec..63a3cf21 100644 --- a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py +++ b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from enum import Enum from typing import Optional from frostfs_testlib.testing.readable import HumanReadableEnum @@ -28,10 +27,16 @@ class StorageObjectInfo(ObjectRef): locks: Optional[list[LockObjectInfo]] = None +class ModeNode(HumanReadableEnum): + MAINTENANCE: str = "maintenance" + ONLINE: str = "online" + OFFLINE: str = "offline" + + @dataclass class NodeNetmapInfo: node_id: str = None - node_status: str = None + node_status: ModeNode = None node_data_ips: list[str] = None cluster_name: str = None continent: str = None @@ -53,3 +58,19 @@ class Interfaces(HumanReadableEnum): MGMT: str = "mgmt" INTERNAL_0: str = "internal0" INTERNAL_1: str = "internal1" + + +@dataclass +class NodeNetInfo: + epoch: str = None + network_magic: str = None + time_per_block: str = None + container_fee: str = None + epoch_duration: str = None + inner_ring_candidate_fee: str = None + maximum_object_size: str = None + withdrawal_fee: str = None + homomorphic_hashing_disabled: str = None + maintenance_mode_allowed: str = None + eigen_trust_alpha: str = None + eigen_trust_iterations: str = None