New methods for stopping processes #37

Merged
ylukoyan merged 1 commit from d.zayakin/frostfs-testlib:add-test-suspend-service into master 2023-06-05 14:12:43 +00:00
10 changed files with 171 additions and 11 deletions

View file

@ -262,3 +262,45 @@ class FrostfsCliContainer(CliCommand):
"container set-eacl",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def search_node(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Show the nodes participating in the container in the current epoch.
Args:
rpc_endpoint: string Remote host address (as 'multiaddr' or '<host>:<port>')
wallet: WIF (NEP-2) string or path to the wallet or binary key.
cid: Container ID.
address: Address of wallet account.
ttl: TTL value in request meta header (default 2).
from_file: string File path with encoded container
timeout: duration Timeout for the operation (default 15 s)
short: shorten the output of node information.
xhdr: Dict with request X-Headers.
generate_key: Generate a new private key
Returns:
"""
from_str = f"--from {from_file}" if from_file else ""
return self._execute(
f"container nodes {from_str}",
**{
param: value
for param, value in locals().items()
if param not in ["self", "from_file", "from_str"]
},
)

View file

@ -117,6 +117,12 @@ class DockerHost(Host):
timeout=service_attributes.stop_timeout,
)
def wait_success_suspend_process(self, service_name: str):
raise NotImplementedError("Not supported for docker")
def wait_success_resume_process(self, service_name: str):
raise NotImplementedError("Not supported for docker")
def restart_service(self, service_name: str) -> None:
service_attributes = self._get_service_attributes(service_name)
@ -134,7 +140,7 @@ class DockerHost(Host):
def delete_blobovnicza(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_pilorama(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker")

View file

@ -112,6 +112,20 @@ class Host(ABC):
service_name: Name of the service to restart.
"""
@abstractmethod
def wait_success_suspend_process(self, process_name: str) -> None:
"""Search for a service ID by its name and stop the process
Args:
process_name: Name
"""
@abstractmethod
def wait_success_resume_process(self, process_name: str) -> None:
"""Search for a service by its ID and start the process
Args:
process_name: Name
"""
@abstractmethod
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
"""Erases all data of the storage node with specified name.

View file

@ -1,5 +1,6 @@
import json
import logging
import re
from dataclasses import dataclass
from time import sleep
from typing import Optional, Union
@ -10,7 +11,7 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.utils import json_utils
@ -357,3 +358,27 @@ def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str
if cont_info.get("attributes", {}).get("Name", None) == name:
return cid
return None
@reporter.step_deco("Search for nodes with a container")
def search_nodes_with_container(
wallet: str,
cid: str,
shell: Shell,
endpoint: str,
cluster: Cluster,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[ClusterNode]:
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, DEFAULT_WALLET_CONFIG)
result = cli.container.search_node(
rpc_endpoint=endpoint, wallet=wallet, cid=cid, timeout=timeout
)
pattern = r"[0-9]+(?:\.[0-9]+){3}"
nodes_ip = list(set(re.findall(pattern, result.stdout)))
with reporter.step(f"nodes ips = {nodes_ip}"):
nodes_list = cluster.get_nodes_by_ip(nodes_ip)
with reporter.step(f"Return nodes - {nodes_list}"):
return nodes_list

View file

@ -28,7 +28,13 @@ ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/")
@reporter.step_deco("Get via HTTP Gate")
def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[str] = None):
def get_via_http_gate(
cid: str,
oid: str,
endpoint: str,
request_path: Optional[str] = None,
timeout: Optional[int] = 300,
):
"""
This function gets given object from HTTP gate
cid: container id to get object from
@ -43,7 +49,7 @@ def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[
else:
request = f"{endpoint}{request_path}"
resp = requests.get(request, stream=True)
resp = requests.get(request, stream=True, timeout=timeout)
if not resp.ok:
raise Exception(
@ -63,7 +69,7 @@ def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[
@reporter.step_deco("Get via Zip HTTP Gate")
def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str, timeout: Optional[int] = 300):
"""
This function gets given object from HTTP gate
cid: container id to get object from
@ -71,7 +77,7 @@ def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
endpoint: http gate endpoint
"""
request = f"{endpoint}/zip/{cid}/{prefix}"
resp = requests.get(request, stream=True)
resp = requests.get(request, stream=True, timeout=timeout)
if not resp.ok:
raise Exception(
@ -96,7 +102,11 @@ def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
@reporter.step_deco("Get via HTTP Gate by attribute")
def get_via_http_gate_by_attribute(
cid: str, attribute: dict, endpoint: str, request_path: Optional[str] = None
cid: str,
attribute: dict,
endpoint: str,
request_path: Optional[str] = None,
timeout: Optional[int] = 300,
):
"""
This function gets given object from HTTP gate
@ -113,7 +123,7 @@ def get_via_http_gate_by_attribute(
else:
request = f"{endpoint}{request_path}"
resp = requests.get(request, stream=True)
resp = requests.get(request, stream=True, timeout=timeout)
if not resp.ok:
raise Exception(
@ -133,7 +143,9 @@ def get_via_http_gate_by_attribute(
@reporter.step_deco("Upload via HTTP Gate")
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None) -> str:
def upload_via_http_gate(
cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300
) -> str:
"""
This function upload given object through HTTP gate
cid: CID to get object from
@ -144,7 +156,7 @@ def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[d
request = f"{endpoint}/upload/{cid}"
files = {"upload_file": open(path, "rb")}
body = {"filename": path}
resp = requests.post(request, files=files, data=body, headers=headers)
resp = requests.post(request, files=files, data=body, headers=headers, timeout=timeout)
if not resp.ok:
raise Exception(

View file

@ -12,7 +12,12 @@ from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import CREDENTIALS_CREATE_TIMEOUT
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import (
search_container_by_name,
search_nodes_with_container,
)
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.utils.cli_utils import _run_with_passwd
@ -245,3 +250,18 @@ def delete_bucket_with_objects(s3_client: S3ClientWrapper, bucket: str):
# Delete the bucket itself
s3_client.delete_bucket(bucket)
@reporter.step_deco("Search nodes bucket")
def search_nodes_with_bucket(
cluster: Cluster,
bucket_name: str,
wallet: str,
shell: Shell,
endpoint: str,
) -> list[ClusterNode]:
cid = search_container_by_name(wallet=wallet, name=bucket_name, shell=shell, endpoint=endpoint)
nodes_list = search_nodes_with_container(
wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster
)
return nodes_list

View file

@ -2,9 +2,11 @@ import random
import re
import yaml
from yarl import URL
from frostfs_testlib.hosting import Host, Hosting
from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import (
@ -17,6 +19,8 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.service_registry import ServiceRegistry
reporter = get_reporter()
class ClusterNode:
"""
@ -235,3 +239,10 @@ class Cluster:
def get_morph_endpoints(self) -> list[str]:
nodes: list[MorphChain] = self.services(MorphChain)
return [node.get_endpoint() for node in nodes]
def get_nodes_by_ip(self, ips: list[str]) -> list[ClusterNode]:
cluster_nodes = [
node for node in self.cluster_nodes if URL(node.morph_chain.get_endpoint()).host in ips
]
with reporter.step(f"Return cluster nodes - {cluster_nodes}"):
return cluster_nodes

View file

@ -26,6 +26,7 @@ class ClusterStateController:
self.stopped_storage_nodes: list[StorageNode] = []
self.cluster = cluster
self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}")
@ -92,6 +93,31 @@ class ClusterStateController:
node.start_service()
self.stopped_storage_nodes = []
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}")
def suspend_service(self, process_name: str, node: ClusterNode):
node.host.wait_success_suspend_process(process_name)
if self.suspended_services.get(process_name):
self.suspended_services[process_name].append(node)
else:
self.suspended_services[process_name] = [node]
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Resume {process_name} service in {node}")
def resume_service(self, process_name: str, node: ClusterNode):
node.host.wait_success_resume_process(process_name)
if self.suspended_services.get(process_name):
self.suspended_services[process_name].append(node)
else:
self.suspended_services[process_name] = [node]
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start suspend processes services")
def resume_suspended_services(self):
for process_name, list_nodes in self.suspended_services.items():
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
self.suspended_services = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):

View file

@ -17,6 +17,7 @@ class NodeBase(ABC):
id: str
name: str
host: Host
_process_name: str
def __init__(self, id, name, host) -> None:
self.id = id
@ -46,6 +47,9 @@ class NodeBase(ABC):
def get_service_systemctl_name(self) -> str:
return self._get_attribute(ConfigAttributes.SERVICE_NAME)
def get_process_name(self) -> str:
return self._process_name
def start_service(self):
self.host.start_service(self.name)