diff --git a/src/frostfs_testlib/cli/frostfs_cli/object.py b/src/frostfs_testlib/cli/frostfs_cli/object.py index 8915914..476af68 100644 --- a/src/frostfs_testlib/cli/frostfs_cli/object.py +++ b/src/frostfs_testlib/cli/frostfs_cli/object.py @@ -351,3 +351,45 @@ class FrostfsCliObject(CliCommand): "object search", **{param: value for param, value in locals().items() if param not in ["self"]}, ) + + def nodes( + self, + rpc_endpoint: str, + wallet: str, + cid: str, + address: Optional[str] = None, + bearer: Optional[str] = None, + generate_key: Optional = None, + oid: Optional[str] = None, + trace: bool = False, + root: bool = False, + verify_presence_all: bool = False, + ttl: Optional[int] = None, + xhdr: Optional[dict] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """ + Search object nodes. + + Args: + address: Address of wallet account. + bearer: File with signed JSON or binary encoded bearer token. + cid: Container ID. + generate_key: Generate new private key. + oid: Object ID. + trace: Generate trace ID and print it. + root: Search for user objects. + rpc_endpoint: Remote node address (as 'multiaddr' or ':'). + verify_presence_all: Verify the actual presence of the object on all netmap nodes. + ttl: TTL value in request meta header (default 2). + wallet: WIF (NEP-2) string or path to the wallet or binary key. + xhdr: Dict with request X-Headers. + timeout: Timeout for the operation (default 15s). + + Returns: + Command's result. + """ + return self._execute( + "object nodes", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) diff --git a/src/frostfs_testlib/hosting/config.py b/src/frostfs_testlib/hosting/config.py index dd8b4b9..6679470 100644 --- a/src/frostfs_testlib/hosting/config.py +++ b/src/frostfs_testlib/hosting/config.py @@ -64,6 +64,7 @@ class HostConfig: services: list[ServiceConfig] = field(default_factory=list) clis: list[CLIConfig] = field(default_factory=list) attributes: dict[str, str] = field(default_factory=dict) + interfaces: dict[str, str] = field(default_factory=dict) def __post_init__(self) -> None: self.services = [ServiceConfig(**service) for service in self.services or []] diff --git a/src/frostfs_testlib/steps/cli/object.py b/src/frostfs_testlib/steps/cli/object.py index 9a63604..9c7c694 100644 --- a/src/frostfs_testlib/steps/cli/object.py +++ b/src/frostfs_testlib/steps/cli/object.py @@ -11,8 +11,9 @@ from frostfs_testlib.reporter import get_reporter from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG from frostfs_testlib.shell import Shell -from frostfs_testlib.storage.cluster import Cluster +from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.utils import json_utils +from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output logger = logging.getLogger("NeoLogger") reporter = get_reporter() @@ -731,3 +732,62 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict: latest_block[0].replace(":", ""): int(latest_block[1]), validated_state[0].replace(":", ""): int(validated_state[1]), } + + +@reporter.step_deco("Search object nodes") +def get_object_nodes( + cluster: Cluster, + wallet: str, + cid: str, + oid: str, + shell: Shell, + endpoint: str, + bearer: str = "", + xhdr: Optional[dict] = None, + is_direct: bool = False, + verify_presence_all: bool = False, + wallet_config: Optional[str] = None, + timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, +) -> list[ClusterNode]: + cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG) + + result_object_nodes = cli.object.nodes( + rpc_endpoint=endpoint, + wallet=wallet, + cid=cid, + oid=oid, + bearer=bearer, + ttl=1 if is_direct else None, + xhdr=xhdr, + timeout=timeout, + verify_presence_all=verify_presence_all, + ) + + parsing_output = parse_cmd_table(result_object_nodes.stdout, "|") + list_object_nodes = [ + node + for node in parsing_output + if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true" + ] + + netmap_nodes_list = parse_netmap_output( + cli.netmap.snapshot( + rpc_endpoint=endpoint, + wallet=wallet, + ).stdout + ) + netmap_nodes = [ + netmap_node + for object_node in list_object_nodes + for netmap_node in netmap_nodes_list + if object_node["node_id"] == netmap_node.node_id + ] + + result = [ + cluster_node + for netmap_node in netmap_nodes + for cluster_node in cluster.cluster_nodes + if netmap_node.node == cluster_node.host_ip + ] + + return result diff --git a/src/frostfs_testlib/steps/iptables.py b/src/frostfs_testlib/steps/iptables.py new file mode 100644 index 0000000..db0bb22 --- /dev/null +++ b/src/frostfs_testlib/steps/iptables.py @@ -0,0 +1,42 @@ +from frostfs_testlib.shell import Shell +from frostfs_testlib.storage.cluster import ClusterNode + + +class IpTablesHelper: + @staticmethod + def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None: + shell = node.host.get_shell() + for port in ports: + shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP") + + @staticmethod + def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None: + shell = node.host.get_shell() + for ip in block_ip: + shell.exec(f"iptables -A INPUT -s {ip} -j DROP") + + @staticmethod + def restore_input_traffic_to_port(node: ClusterNode) -> None: + shell = node.host.get_shell() + ports = ( + shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'") + .stdout.strip() + .split("\n") + ) + if ports[0] == "": + return + for port in ports: + shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP") + + @staticmethod + def restore_input_traffic_to_node(node: ClusterNode) -> None: + shell = node.host.get_shell() + unlock_ip = ( + shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'") + .stdout.strip() + .split("\n") + ) + if unlock_ip[0] == "": + return + for ip in unlock_ip: + shell.exec(f"iptables -D INPUT -s {ip} -j DROP") diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 3a2b509..2d439d9 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,13 +1,16 @@ import copy +import itertools import time import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell +from frostfs_testlib.steps import epoch +from frostfs_testlib.steps.iptables import IpTablesHelper from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.testing import parallel -from frostfs_testlib.testing.test_control import run_optionally +from frostfs_testlib.testing.test_control import run_optionally, wait_for_success from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_for_host_offline, @@ -24,6 +27,7 @@ class ClusterStateController: self.detached_disks: dict[str, DiskController] = {} self.stopped_storage_nodes: list[ClusterNode] = [] self.stopped_s3_gates: list[ClusterNode] = [] + self.dropped_traffic: list[ClusterNode] = [] self.cluster = cluster self.shell = shell self.suspended_services: dict[str, list[ClusterNode]] = {} @@ -191,6 +195,62 @@ class ClusterStateController: [node.host.wait_success_resume_process(process_name) for node in list_nodes] self.suspended_services = {} + @reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}") + def drop_traffic( + self, + mode: str, + node: ClusterNode, + wakeup_timeout: int, + ports: list[str] = None, + block_nodes: list[ClusterNode] = None, + ) -> None: + allowed_modes = ["ports", "nodes"] + assert mode in allowed_modes + + match mode: + case "ports": + IpTablesHelper.drop_input_traffic_to_port(node, ports) + case "nodes": + list_ip = self._parse_intefaces(block_nodes) + IpTablesHelper.drop_input_traffic_to_node(node, list_ip) + time.sleep(wakeup_timeout) + self.dropped_traffic.append(node) + + @reporter.step_deco("Ping traffic") + def ping_traffic( + self, + node: ClusterNode, + nodes_list: list[ClusterNode], + expect_result: int, + ) -> bool: + shell = node.host.get_shell() + options = CommandOptions(check=False) + ips = self._parse_intefaces(nodes_list) + for ip in ips: + code = shell.exec(f"ping {ip} -c 1", options).return_code + if code != expect_result: + return False + return True + + @reporter.step_deco("Start traffic to {node}") + def restore_traffic( + self, + mode: str, + node: ClusterNode, + ) -> None: + allowed_modes = ["ports", "nodes"] + assert mode in allowed_modes + + match mode: + case "ports": + IpTablesHelper.restore_input_traffic_to_port(node=node) + case "nodes": + IpTablesHelper.restore_input_traffic_to_node(node=node) + + @reporter.step_deco("Restore blocked nodes") + def restore_all_traffic(self): + parallel(self._restore_traffic_to_node, self.dropped_traffic) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Hard reboot host {node} via magic SysRq option") def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True): @@ -217,3 +277,16 @@ class ClusterStateController: disk_controller = DiskController(node, device, mountpoint) return disk_controller + + def _restore_traffic_to_node(self, node): + IpTablesHelper.restore_input_traffic_to_port(node) + IpTablesHelper.restore_input_traffic_to_node(node) + + def _parse_intefaces(self, nodes: list[ClusterNode]): + interfaces = [] + for node in nodes: + dict_interfaces = node.host.config.interfaces + for type, ip in dict_interfaces.items(): + if "mgmt" not in type: + interfaces.append(ip) + return interfaces diff --git a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py index dd46740..7747ea8 100644 --- a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py +++ b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py @@ -23,3 +23,21 @@ class StorageObjectInfo(ObjectRef): attributes: Optional[list[dict[str, str]]] = None tombstone: Optional[str] = None locks: Optional[list[LockObjectInfo]] = None + + +@dataclass +class NodeNetmapInfo: + node_id: str + node_status: str + node_data_ip: str + continent: str + country: str + country_code: str + external_address: str + location: str + node: str + price: int + sub_div: str + sub_div_code: int + un_locode: str + role: str diff --git a/src/frostfs_testlib/utils/cli_utils.py b/src/frostfs_testlib/utils/cli_utils.py index d869714..5bd4695 100644 --- a/src/frostfs_testlib/utils/cli_utils.py +++ b/src/frostfs_testlib/utils/cli_utils.py @@ -5,18 +5,21 @@ """ Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs. """ +import csv import json import logging import subprocess import sys from contextlib import suppress from datetime import datetime +from io import StringIO from textwrap import shorten -from typing import TypedDict, Union +from typing import Dict, List, TypedDict, Union import pexpect from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo reporter = get_reporter() logger = logging.getLogger("NeoLogger") @@ -131,3 +134,49 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None: command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n" with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'): reporter.attach(command_attachment, "Command execution") + + +def parse_netmap_output(output: str) -> list[NodeNetmapInfo]: + """ + The cli command will return something like. + + Epoch: 240 + Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080 + Continent: Europe + Country: Russia + CountryCode: RU + ExternalAddr: /ip4/10.10.11.18/tcp/8080 + Location: Moskva + Node: 10.10.10.12 + Price: 5 + SubDiv: Moskva + SubDivCode: MOW + UN-LOCODE: RU MOW + role: alphabet + + The code will parse each line and return each node as dataclass. + """ + netmap_list = output.split("Node ")[1:] + dataclass_list = [] + for node in netmap_list: + node = node.replace("\t", "").split("\n") + node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]] + dataclass_list.append(NodeNetmapInfo(*node)) + + return dataclass_list + + +def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]: + parsing_output = [] + reader = csv.reader(StringIO(output.strip()), delimiter=delimiter) + iter_reader = iter(reader) + header_row = next(iter_reader) + for row in iter_reader: + table = {} + for i in range(len(row)): + header = header_row[i].strip().lower().replace(" ", "_") + value = row[i].strip().lower() + if header: + table[header] = value + parsing_output.append(table) + return parsing_output