New methods with nodes
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
parent
123b5425a8
commit
28886a5c76
10 changed files with 171 additions and 11 deletions
|
@ -262,3 +262,45 @@ class FrostfsCliContainer(CliCommand):
|
|||
"container set-eacl",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def search_node(
|
||||
self,
|
||||
rpc_endpoint: str,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
address: Optional[str] = None,
|
||||
ttl: Optional[int] = None,
|
||||
from_file: Optional[str] = None,
|
||||
short: Optional[bool] = True,
|
||||
xhdr: Optional[dict] = None,
|
||||
generate_key: Optional[bool] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""
|
||||
Show the nodes participating in the container in the current epoch.
|
||||
|
||||
Args:
|
||||
rpc_endpoint: string Remote host address (as 'multiaddr' or '<host>:<port>')
|
||||
wallet: WIF (NEP-2) string or path to the wallet or binary key.
|
||||
cid: Container ID.
|
||||
address: Address of wallet account.
|
||||
ttl: TTL value in request meta header (default 2).
|
||||
from_file: string File path with encoded container
|
||||
timeout: duration Timeout for the operation (default 15 s)
|
||||
short: shorten the output of node information.
|
||||
xhdr: Dict with request X-Headers.
|
||||
generate_key: Generate a new private key
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
from_str = f"--from {from_file}" if from_file else ""
|
||||
|
||||
return self._execute(
|
||||
f"container nodes {from_str}",
|
||||
**{
|
||||
param: value
|
||||
for param, value in locals().items()
|
||||
if param not in ["self", "from_file", "from_str"]
|
||||
},
|
||||
)
|
||||
|
|
|
@ -117,6 +117,12 @@ class DockerHost(Host):
|
|||
timeout=service_attributes.stop_timeout,
|
||||
)
|
||||
|
||||
def wait_success_suspend_process(self, service_name: str):
|
||||
raise NotImplementedError("Not supported for docker")
|
||||
|
||||
def wait_success_resume_process(self, service_name: str):
|
||||
raise NotImplementedError("Not supported for docker")
|
||||
|
||||
def restart_service(self, service_name: str) -> None:
|
||||
service_attributes = self._get_service_attributes(service_name)
|
||||
|
||||
|
@ -134,7 +140,7 @@ class DockerHost(Host):
|
|||
|
||||
def delete_blobovnicza(self, service_name: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
|
||||
def delete_pilorama(self, service_name: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
|
|
|
@ -112,6 +112,20 @@ class Host(ABC):
|
|||
service_name: Name of the service to restart.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def wait_success_suspend_process(self, process_name: str) -> None:
|
||||
"""Search for a service ID by its name and stop the process
|
||||
Args:
|
||||
process_name: Name
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def wait_success_resume_process(self, process_name: str) -> None:
|
||||
"""Search for a service by its ID and start the process
|
||||
Args:
|
||||
process_name: Name
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
|
||||
"""Erases all data of the storage node with specified name.
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from time import sleep
|
||||
from typing import Optional, Union
|
||||
|
@ -10,7 +11,7 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
|
|||
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.utils import json_utils
|
||||
|
@ -357,3 +358,27 @@ def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str
|
|||
if cont_info.get("attributes", {}).get("Name", None) == name:
|
||||
return cid
|
||||
return None
|
||||
|
||||
|
||||
@reporter.step_deco("Search for nodes with a container")
|
||||
def search_nodes_with_container(
|
||||
wallet: str,
|
||||
cid: str,
|
||||
shell: Shell,
|
||||
endpoint: str,
|
||||
cluster: Cluster,
|
||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||
) -> list[ClusterNode]:
|
||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, DEFAULT_WALLET_CONFIG)
|
||||
result = cli.container.search_node(
|
||||
rpc_endpoint=endpoint, wallet=wallet, cid=cid, timeout=timeout
|
||||
)
|
||||
|
||||
pattern = r"[0-9]+(?:\.[0-9]+){3}"
|
||||
nodes_ip = list(set(re.findall(pattern, result.stdout)))
|
||||
|
||||
with reporter.step(f"nodes ips = {nodes_ip}"):
|
||||
nodes_list = cluster.get_nodes_by_ip(nodes_ip)
|
||||
|
||||
with reporter.step(f"Return nodes - {nodes_list}"):
|
||||
return nodes_list
|
||||
|
|
|
@ -28,7 +28,13 @@ ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/")
|
|||
|
||||
|
||||
@reporter.step_deco("Get via HTTP Gate")
|
||||
def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[str] = None):
|
||||
def get_via_http_gate(
|
||||
cid: str,
|
||||
oid: str,
|
||||
endpoint: str,
|
||||
request_path: Optional[str] = None,
|
||||
timeout: Optional[int] = 300,
|
||||
):
|
||||
"""
|
||||
This function gets given object from HTTP gate
|
||||
cid: container id to get object from
|
||||
|
@ -43,7 +49,7 @@ def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[
|
|||
else:
|
||||
request = f"{endpoint}{request_path}"
|
||||
|
||||
resp = requests.get(request, stream=True)
|
||||
resp = requests.get(request, stream=True, timeout=timeout)
|
||||
|
||||
if not resp.ok:
|
||||
raise Exception(
|
||||
|
@ -63,7 +69,7 @@ def get_via_http_gate(cid: str, oid: str, endpoint: str, request_path: Optional[
|
|||
|
||||
|
||||
@reporter.step_deco("Get via Zip HTTP Gate")
|
||||
def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
|
||||
def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str, timeout: Optional[int] = 300):
|
||||
"""
|
||||
This function gets given object from HTTP gate
|
||||
cid: container id to get object from
|
||||
|
@ -71,7 +77,7 @@ def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
|
|||
endpoint: http gate endpoint
|
||||
"""
|
||||
request = f"{endpoint}/zip/{cid}/{prefix}"
|
||||
resp = requests.get(request, stream=True)
|
||||
resp = requests.get(request, stream=True, timeout=timeout)
|
||||
|
||||
if not resp.ok:
|
||||
raise Exception(
|
||||
|
@ -96,7 +102,11 @@ def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
|
|||
|
||||
@reporter.step_deco("Get via HTTP Gate by attribute")
|
||||
def get_via_http_gate_by_attribute(
|
||||
cid: str, attribute: dict, endpoint: str, request_path: Optional[str] = None
|
||||
cid: str,
|
||||
attribute: dict,
|
||||
endpoint: str,
|
||||
request_path: Optional[str] = None,
|
||||
timeout: Optional[int] = 300,
|
||||
):
|
||||
"""
|
||||
This function gets given object from HTTP gate
|
||||
|
@ -113,7 +123,7 @@ def get_via_http_gate_by_attribute(
|
|||
else:
|
||||
request = f"{endpoint}{request_path}"
|
||||
|
||||
resp = requests.get(request, stream=True)
|
||||
resp = requests.get(request, stream=True, timeout=timeout)
|
||||
|
||||
if not resp.ok:
|
||||
raise Exception(
|
||||
|
@ -133,7 +143,9 @@ def get_via_http_gate_by_attribute(
|
|||
|
||||
|
||||
@reporter.step_deco("Upload via HTTP Gate")
|
||||
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None) -> str:
|
||||
def upload_via_http_gate(
|
||||
cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300
|
||||
) -> str:
|
||||
"""
|
||||
This function upload given object through HTTP gate
|
||||
cid: CID to get object from
|
||||
|
@ -144,7 +156,7 @@ def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[d
|
|||
request = f"{endpoint}/upload/{cid}"
|
||||
files = {"upload_file": open(path, "rb")}
|
||||
body = {"filename": path}
|
||||
resp = requests.post(request, files=files, data=body, headers=headers)
|
||||
resp = requests.post(request, files=files, data=body, headers=headers, timeout=timeout)
|
||||
|
||||
if not resp.ok:
|
||||
raise Exception(
|
||||
|
|
|
@ -12,7 +12,12 @@ from frostfs_testlib.reporter import get_reporter
|
|||
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
|
||||
from frostfs_testlib.resources.common import CREDENTIALS_CREATE_TIMEOUT
|
||||
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.steps.cli.container import (
|
||||
search_container_by_name,
|
||||
search_nodes_with_container,
|
||||
)
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
|
||||
from frostfs_testlib.utils.cli_utils import _run_with_passwd
|
||||
|
||||
|
@ -245,3 +250,18 @@ def delete_bucket_with_objects(s3_client: S3ClientWrapper, bucket: str):
|
|||
|
||||
# Delete the bucket itself
|
||||
s3_client.delete_bucket(bucket)
|
||||
|
||||
|
||||
@reporter.step_deco("Search nodes bucket")
|
||||
def search_nodes_with_bucket(
|
||||
cluster: Cluster,
|
||||
bucket_name: str,
|
||||
wallet: str,
|
||||
shell: Shell,
|
||||
endpoint: str,
|
||||
) -> list[ClusterNode]:
|
||||
cid = search_container_by_name(wallet=wallet, name=bucket_name, shell=shell, endpoint=endpoint)
|
||||
nodes_list = search_nodes_with_container(
|
||||
wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster
|
||||
)
|
||||
return nodes_list
|
||||
|
|
|
@ -2,9 +2,11 @@ import random
|
|||
import re
|
||||
|
||||
import yaml
|
||||
from yarl import URL
|
||||
|
||||
from frostfs_testlib.hosting import Host, Hosting
|
||||
from frostfs_testlib.hosting.config import ServiceConfig
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.storage import get_service_registry
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||
|
@ -17,6 +19,8 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
|||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
|
||||
reporter = get_reporter()
|
||||
|
||||
|
||||
class ClusterNode:
|
||||
"""
|
||||
|
@ -235,3 +239,10 @@ class Cluster:
|
|||
def get_morph_endpoints(self) -> list[str]:
|
||||
nodes: list[MorphChain] = self.services(MorphChain)
|
||||
return [node.get_endpoint() for node in nodes]
|
||||
|
||||
def get_nodes_by_ip(self, ips: list[str]) -> list[ClusterNode]:
|
||||
cluster_nodes = [
|
||||
node for node in self.cluster_nodes if URL(node.morph_chain.get_endpoint()).host in ips
|
||||
]
|
||||
with reporter.step(f"Return cluster nodes - {cluster_nodes}"):
|
||||
return cluster_nodes
|
||||
|
|
|
@ -26,6 +26,7 @@ class ClusterStateController:
|
|||
self.stopped_storage_nodes: list[StorageNode] = []
|
||||
self.cluster = cluster
|
||||
self.shell = shell
|
||||
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop host of node {node}")
|
||||
|
@ -92,6 +93,31 @@ class ClusterStateController:
|
|||
node.start_service()
|
||||
self.stopped_storage_nodes = []
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Suspend {process_name} service in {node}")
|
||||
def suspend_service(self, process_name: str, node: ClusterNode):
|
||||
node.host.wait_success_suspend_process(process_name)
|
||||
if self.suspended_services.get(process_name):
|
||||
self.suspended_services[process_name].append(node)
|
||||
else:
|
||||
self.suspended_services[process_name] = [node]
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Resume {process_name} service in {node}")
|
||||
def resume_service(self, process_name: str, node: ClusterNode):
|
||||
node.host.wait_success_resume_process(process_name)
|
||||
if self.suspended_services.get(process_name):
|
||||
self.suspended_services[process_name].append(node)
|
||||
else:
|
||||
self.suspended_services[process_name] = [node]
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Start suspend processes services")
|
||||
def resume_suspended_services(self):
|
||||
for process_name, list_nodes in self.suspended_services.items():
|
||||
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
|
||||
self.suspended_services = {}
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
||||
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
|
||||
|
|
|
@ -17,6 +17,7 @@ class NodeBase(ABC):
|
|||
id: str
|
||||
name: str
|
||||
host: Host
|
||||
_process_name: str
|
||||
|
||||
def __init__(self, id, name, host) -> None:
|
||||
self.id = id
|
||||
|
@ -46,6 +47,9 @@ class NodeBase(ABC):
|
|||
def get_service_systemctl_name(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.SERVICE_NAME)
|
||||
|
||||
def get_process_name(self) -> str:
|
||||
return self._process_name
|
||||
|
||||
def start_service(self):
|
||||
self.host.start_service(self.name)
|
||||
|
||||
|
|
Loading…
Reference in a new issue