diff --git a/src/frostfs_testlib/cli/frostfs_cli/container.py b/src/frostfs_testlib/cli/frostfs_cli/container.py index 533ff1a8..5ea8ba83 100644 --- a/src/frostfs_testlib/cli/frostfs_cli/container.py +++ b/src/frostfs_testlib/cli/frostfs_cli/container.py @@ -262,3 +262,45 @@ class FrostfsCliContainer(CliCommand): "container set-eacl", **{param: value for param, value in locals().items() if param not in ["self"]}, ) + + def search_node( + self, + rpc_endpoint: str, + wallet: str, + cid: str, + address: Optional[str] = None, + ttl: Optional[int] = None, + from_file: Optional[str] = None, + short: Optional[bool] = True, + xhdr: Optional[dict] = None, + generate_key: Optional[bool] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """ + Show the nodes participating in the container in the current epoch. + + Args: + rpc_endpoint: string Remote host address (as 'multiaddr' or ':') + wallet: WIF (NEP-2) string or path to the wallet or binary key. + cid: Container ID. + address: Address of wallet account. + ttl: TTL value in request meta header (default 2). + from_file: string File path with encoded container + timeout: duration Timeout for the operation (default 15 s) + short: shorten the output of node information. + xhdr: Dict with request X-Headers. + generate_key: Generate a new private key + + Returns: + + """ + from_str = f"--from {from_file}" if from_file else "" + + return self._execute( + f"container nodes {from_str}", + **{ + param: value + for param, value in locals().items() + if param not in ["self", "from_file", "from_str"] + }, + ) diff --git a/src/frostfs_testlib/controllers/cluster_state_controller.py b/src/frostfs_testlib/controllers/cluster_state_controller.py new file mode 100644 index 00000000..e69de29b diff --git a/src/frostfs_testlib/hosting/docker_host.py b/src/frostfs_testlib/hosting/docker_host.py index b7f4852a..1e323402 100644 --- a/src/frostfs_testlib/hosting/docker_host.py +++ b/src/frostfs_testlib/hosting/docker_host.py @@ -117,6 +117,12 @@ class DockerHost(Host): timeout=service_attributes.stop_timeout, ) + def wait_success_suspend_process(self, service_name: str): + raise NotImplementedError("Not supported for docker") + + def wait_success_resume_process(self, service_name: str): + raise NotImplementedError("Not supported for docker") + def restart_service(self, service_name: str) -> None: service_attributes = self._get_service_attributes(service_name) diff --git a/src/frostfs_testlib/hosting/interfaces.py b/src/frostfs_testlib/hosting/interfaces.py index 91785234..95536c63 100644 --- a/src/frostfs_testlib/hosting/interfaces.py +++ b/src/frostfs_testlib/hosting/interfaces.py @@ -112,6 +112,20 @@ class Host(ABC): service_name: Name of the service to restart. """ + @abstractmethod + def wait_success_suspend_process(self, process_name: str) -> None: + """Search for a service ID by its name and stop the process + Args: + process_name: Name + """ + + @abstractmethod + def wait_success_resume_process(self, process_name: str) -> None: + """Search for a service by its ID and start the process + Args: + process_name: Name + """ + @abstractmethod def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None: """Erases all data of the storage node with specified name. diff --git a/src/frostfs_testlib/steps/cli/container.py b/src/frostfs_testlib/steps/cli/container.py index 89070c47..74f445a7 100644 --- a/src/frostfs_testlib/steps/cli/container.py +++ b/src/frostfs_testlib/steps/cli/container.py @@ -1,5 +1,6 @@ import json import logging +import re from dataclasses import dataclass from time import sleep from typing import Optional, Union @@ -10,7 +11,7 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG from frostfs_testlib.shell import Shell from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node -from frostfs_testlib.storage.cluster import Cluster +from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.utils import json_utils @@ -357,3 +358,27 @@ def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str if cont_info.get("attributes", {}).get("Name", None) == name: return cid return None + + +@reporter.step_deco("Search for nodes with a container") +def search_nodes_with_container( + wallet: str, + cid: str, + shell: Shell, + endpoint: str, + cluster: Cluster, + timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, +) -> list[ClusterNode]: + cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, DEFAULT_WALLET_CONFIG) + result = cli.container.search_node( + rpc_endpoint=endpoint, wallet=wallet, cid=cid, timeout=timeout + ) + + pattern = r"[0-9]+(?:\.[0-9]+){3}" + nodes_ip = list(set(re.findall(pattern, result.stdout))) + + with reporter.step(f"nodes ips = {nodes_ip}"): + nodes_list = cluster.get_nodes_by_ip(nodes_ip) + + with reporter.step(f"Return nodes - {nodes_list}"): + return nodes_list diff --git a/src/frostfs_testlib/steps/http/http_gate.py b/src/frostfs_testlib/steps/http/http_gate.py index c9769fb6..64bb5ce0 100644 --- a/src/frostfs_testlib/steps/http/http_gate.py +++ b/src/frostfs_testlib/steps/http/http_gate.py @@ -28,7 +28,13 @@ ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/") @reporter.step_deco("Get via HTTP Gate") -def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[str] = None): +def get_via_http_gate( + cid: str, + oid: str, + endpoint: str, + request_path: Optional[str] = None, + timeout: Optional[int] = 300, +): """ This function gets given object from HTTP gate cid: container id to get object from @@ -43,7 +49,7 @@ def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[ else: request = f"{endpoint}{request_path}" - resp = requests.get(request, stream=True) + resp = requests.get(request, stream=True, timeout=timeout) if not resp.ok: raise Exception( @@ -63,7 +69,7 @@ def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[ @reporter.step_deco("Get via Zip HTTP Gate") -def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str): +def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str, timeout: Optional[int] = 300): """ This function gets given object from HTTP gate cid: container id to get object from @@ -71,7 +77,7 @@ def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str): endpoint: http gate endpoint """ request = f"{endpoint}/zip/{cid}/{prefix}" - resp = requests.get(request, stream=True) + resp = requests.get(request, stream=True, timeout=timeout) if not resp.ok: raise Exception( @@ -96,7 +102,11 @@ def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str): @reporter.step_deco("Get via HTTP Gate by attribute") def get_via_http_gate_by_attribute( - cid: str, attribute: dict, endpoint: str, request_path: Optional[str] = None + cid: str, + attribute: dict, + endpoint: str, + request_path: Optional[str] = None, + timeout: Optional[int] = 300, ): """ This function gets given object from HTTP gate @@ -113,7 +123,7 @@ def get_via_http_gate_by_attribute( else: request = f"{endpoint}{request_path}" - resp = requests.get(request, stream=True) + resp = requests.get(request, stream=True, timeout=timeout) if not resp.ok: raise Exception( @@ -133,7 +143,9 @@ def get_via_http_gate_by_attribute( @reporter.step_deco("Upload via HTTP Gate") -def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None) -> str: +def upload_via_http_gate( + cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300 +) -> str: """ This function upload given object through HTTP gate cid: CID to get object from @@ -144,7 +156,7 @@ def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[d request = f"{endpoint}/upload/{cid}" files = {"upload_file": open(path, "rb")} body = {"filename": path} - resp = requests.post(request, files=files, data=body, headers=headers) + resp = requests.post(request, files=files, data=body, headers=headers, timeout=timeout) if not resp.ok: raise Exception( diff --git a/src/frostfs_testlib/steps/s3/s3_helper.py b/src/frostfs_testlib/steps/s3/s3_helper.py index 87f929ea..0c6c4489 100644 --- a/src/frostfs_testlib/steps/s3/s3_helper.py +++ b/src/frostfs_testlib/steps/s3/s3_helper.py @@ -12,7 +12,12 @@ from frostfs_testlib.reporter import get_reporter from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC from frostfs_testlib.resources.common import CREDENTIALS_CREATE_TIMEOUT from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus -from frostfs_testlib.storage.cluster import Cluster +from frostfs_testlib.shell import Shell +from frostfs_testlib.steps.cli.container import ( + search_container_by_name, + search_nodes_with_container, +) +from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate from frostfs_testlib.utils.cli_utils import _run_with_passwd @@ -245,3 +250,18 @@ def delete_bucket_with_objects(s3_client: S3ClientWrapper, bucket: str): # Delete the bucket itself s3_client.delete_bucket(bucket) + + +@reporter.step_deco("Search nodes bucket") +def search_nodes_with_bucket( + cluster: Cluster, + bucket_name: str, + wallet: str, + shell: Shell, + endpoint: str, +) -> list[ClusterNode]: + cid = search_container_by_name(wallet=wallet, name=bucket_name, shell=shell, endpoint=endpoint) + nodes_list = search_nodes_with_container( + wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster + ) + return nodes_list diff --git a/src/frostfs_testlib/storage/cluster.py b/src/frostfs_testlib/storage/cluster.py index 2158dc2d..91487c9f 100644 --- a/src/frostfs_testlib/storage/cluster.py +++ b/src/frostfs_testlib/storage/cluster.py @@ -2,9 +2,11 @@ import random import re import yaml +from yarl import URL from frostfs_testlib.hosting import Host, Hosting from frostfs_testlib.hosting.config import ServiceConfig +from frostfs_testlib.reporter import get_reporter from frostfs_testlib.storage import get_service_registry from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.dataclasses.frostfs_services import ( @@ -17,6 +19,8 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import ( from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.service_registry import ServiceRegistry +reporter = get_reporter() + class ClusterNode: """ @@ -250,3 +254,10 @@ class Cluster: def get_morph_endpoints(self) -> list[str]: nodes: list[MorphChain] = self.services(MorphChain) return [node.get_endpoint() for node in nodes] + + def get_nodes_by_ip(self, ips: list[str]) -> list[ClusterNode]: + cluster_nodes = [ + node for node in self.cluster_nodes if URL(node.morph_chain.get_endpoint()).host in ips + ] + with reporter.step(f"Return cluster nodes - {cluster_nodes}"): + return cluster_nodes diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 705caf04..70f3e215 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -25,6 +25,7 @@ class ClusterStateController: self.stopped_storage_nodes: list[ClusterNode] = [] self.cluster = cluster self.shell = shell + self.suspended_services: dict[str, list[ClusterNode]] = {} @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop host of node {node}") @@ -130,6 +131,31 @@ class ClusterStateController: wait_all_storage_nodes_returned(self.shell, self.cluster) self.stopped_storage_nodes = [] + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Suspend {process_name} service in {node}") + def suspend_service(self, process_name: str, node: ClusterNode): + node.host.wait_success_suspend_process(process_name) + if self.suspended_services.get(process_name): + self.suspended_services[process_name].append(node) + else: + self.suspended_services[process_name] = [node] + + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Resume {process_name} service in {node}") + def resume_service(self, process_name: str, node: ClusterNode): + node.host.wait_success_resume_process(process_name) + if self.suspended_services.get(process_name): + self.suspended_services[process_name].append(node) + else: + self.suspended_services[process_name] = [node] + + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Start suspend processes services") + def resume_suspended_services(self): + for process_name, list_nodes in self.suspended_services.items(): + [node.host.wait_success_resume_process(process_name) for node in list_nodes] + self.suspended_services = {} + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Hard reboot host {node} via magic SysRq option") def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True): diff --git a/src/frostfs_testlib/storage/dataclasses/node_base.py b/src/frostfs_testlib/storage/dataclasses/node_base.py index 8fcb03b0..150b9636 100644 --- a/src/frostfs_testlib/storage/dataclasses/node_base.py +++ b/src/frostfs_testlib/storage/dataclasses/node_base.py @@ -19,6 +19,7 @@ class NodeBase(ABC): id: str name: str host: Host + _process_name: str def __init__(self, id, name, host) -> None: self.id = id @@ -48,6 +49,9 @@ class NodeBase(ABC): def get_service_systemctl_name(self) -> str: return self._get_attribute(ConfigAttributes.SERVICE_NAME) + def get_process_name(self) -> str: + return self._process_name + def start_service(self): self.host.start_service(self.name)