forked from TrueCloudLab/frostfs-testlib
Compare commits
10 commits
2c2af7f8ed
...
3af4dfd977
Author | SHA1 | Date | |
---|---|---|---|
3af4dfd977 | |||
8a360683ae | |||
f4111a1374 | |||
b1a3d740e9 | |||
0c3bb20af5 | |||
e1f3444e92 | |||
cff5db5a67 | |||
1c3bbe26f7 | |||
dd347dd8fb | |||
98f9c78f09 |
17 changed files with 290 additions and 124 deletions
|
@ -18,11 +18,11 @@ keywords = ["frostfs", "test"]
|
|||
dependencies = [
|
||||
"allure-python-commons>=2.13.2",
|
||||
"docker>=4.4.0",
|
||||
"importlib_metadata>=5.0; python_version < '3.10'",
|
||||
"pyyaml==6.0.1",
|
||||
"neo-mamba==1.0.0",
|
||||
"paramiko>=2.10.3",
|
||||
"pexpect>=4.8.0",
|
||||
"requests>=2.28.0",
|
||||
"requests==2.28.1",
|
||||
"docstring_parser>=0.15",
|
||||
"testrail-api>=1.12.0",
|
||||
"pytest==7.1.2",
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
allure-python-commons==2.13.2
|
||||
docker==4.4.0
|
||||
importlib_metadata==5.0.0
|
||||
neo-mamba==1.0.0
|
||||
paramiko==2.10.3
|
||||
pexpect==4.8.0
|
||||
|
|
|
@ -11,7 +11,7 @@ import docker
|
|||
from requests import HTTPError
|
||||
|
||||
from frostfs_testlib.hosting.config import ParsedAttributes
|
||||
from frostfs_testlib.hosting.interfaces import DiskInfo, Host
|
||||
from frostfs_testlib.hosting.interfaces import DiskInfo, Host, HostStatus
|
||||
from frostfs_testlib.shell import LocalShell, Shell, SSHShell
|
||||
from frostfs_testlib.shell.command_inspectors import SudoInspector
|
||||
|
||||
|
@ -87,6 +87,15 @@ class DockerHost(Host):
|
|||
for service_config in self._config.services:
|
||||
self.start_service(service_config.name)
|
||||
|
||||
def get_host_status(self) -> HostStatus:
|
||||
# We emulate host status by checking all services.
|
||||
for service_config in self._config.services:
|
||||
state = self._get_container_state(service_config.name)
|
||||
if state != "running":
|
||||
return HostStatus.OFFLINE
|
||||
|
||||
return HostStatus.ONLINE
|
||||
|
||||
def stop_host(self) -> None:
|
||||
# We emulate stopping machine by stopping all services
|
||||
# As an alternative we can probably try to stop docker service...
|
||||
|
@ -117,6 +126,14 @@ class DockerHost(Host):
|
|||
timeout=service_attributes.stop_timeout,
|
||||
)
|
||||
|
||||
def mask_service(self, service_name: str) -> None:
|
||||
# Not required for Docker
|
||||
return
|
||||
|
||||
def unmask_service(self, service_name: str) -> None:
|
||||
# Not required for Docker
|
||||
return
|
||||
|
||||
def wait_success_suspend_process(self, service_name: str):
|
||||
raise NotImplementedError("Not supported for docker")
|
||||
|
||||
|
@ -212,11 +229,36 @@ class DockerHost(Host):
|
|||
with open(file_path, "wb") as file:
|
||||
file.write(logs)
|
||||
|
||||
def get_filtered_logs(
|
||||
self,
|
||||
filter_regex: str,
|
||||
since: Optional[datetime] = None,
|
||||
until: Optional[datetime] = None,
|
||||
unit: Optional[str] = None,
|
||||
) -> str:
|
||||
client = self._get_docker_client()
|
||||
filtered_logs = ""
|
||||
for service_config in self._config.services:
|
||||
container_name = self._get_service_attributes(service_config.name).container_name
|
||||
try:
|
||||
filtered_logs = client.logs(container_name, since=since, until=until)
|
||||
except HTTPError as exc:
|
||||
logger.info(f"Got exception while dumping logs of '{container_name}': {exc}")
|
||||
continue
|
||||
|
||||
matches = re.findall(filter_regex, filtered_logs, re.IGNORECASE + re.MULTILINE)
|
||||
found = list(matches)
|
||||
if found:
|
||||
filtered_logs += f"{container_name}:\n{os.linesep.join(found)}"
|
||||
|
||||
return filtered_logs
|
||||
|
||||
def is_message_in_logs(
|
||||
self,
|
||||
message_regex: str,
|
||||
since: Optional[datetime] = None,
|
||||
until: Optional[datetime] = None,
|
||||
unit: Optional[str] = None,
|
||||
) -> bool:
|
||||
client = self._get_docker_client()
|
||||
for service_config in self._config.services:
|
||||
|
@ -268,11 +310,16 @@ class DockerHost(Host):
|
|||
# To speed things up, we break timeout in smaller iterations and check container state
|
||||
# several times. This way waiting stops as soon as container reaches the expected state
|
||||
for _ in range(iterations):
|
||||
container = self._get_container_by_name(container_name)
|
||||
logger.debug(f"Current container state\n:{json.dumps(container, indent=2)}")
|
||||
state = self._get_container_state(container_name)
|
||||
|
||||
if container and container["State"] == expected_state:
|
||||
if state == expected_state:
|
||||
return
|
||||
time.sleep(iteration_wait_time)
|
||||
|
||||
raise RuntimeError(f"Container {container_name} is not in {expected_state} state.")
|
||||
|
||||
def _get_container_state(self, container_name: str) -> str:
|
||||
container = self._get_container_by_name(container_name)
|
||||
logger.debug(f"Current container state\n:{json.dumps(container, indent=2)}")
|
||||
|
||||
return container.get("State", None)
|
||||
|
|
|
@ -4,6 +4,13 @@ from typing import Optional
|
|||
|
||||
from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
|
||||
from frostfs_testlib.shell.interfaces import Shell
|
||||
from frostfs_testlib.testing.readable import HumanReadableEnum
|
||||
|
||||
|
||||
class HostStatus(HumanReadableEnum):
|
||||
ONLINE = "Online"
|
||||
OFFLINE = "Offline"
|
||||
UNKNOWN = "Unknown"
|
||||
|
||||
|
||||
class DiskInfo(dict):
|
||||
|
@ -79,6 +86,10 @@ class Host(ABC):
|
|||
def start_host(self) -> None:
|
||||
"""Starts the host machine."""
|
||||
|
||||
@abstractmethod
|
||||
def get_host_status(self) -> HostStatus:
|
||||
"""Check host status."""
|
||||
|
||||
@abstractmethod
|
||||
def stop_host(self, mode: str) -> None:
|
||||
"""Stops the host machine.
|
||||
|
@ -107,6 +118,26 @@ class Host(ABC):
|
|||
service_name: Name of the service to stop.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def mask_service(self, service_name: str) -> None:
|
||||
"""Prevent the service from start by any activity by masking it.
|
||||
|
||||
The service must be hosted on this host.
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to mask.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def unmask_service(self, service_name: str) -> None:
|
||||
"""Allow the service to start by any activity by unmasking it.
|
||||
|
||||
The service must be hosted on this host.
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to unmask.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def restart_service(self, service_name: str) -> None:
|
||||
"""Restarts the service with specified name and waits until it starts.
|
||||
|
@ -115,7 +146,6 @@ class Host(ABC):
|
|||
service_name: Name of the service to restart.
|
||||
"""
|
||||
|
||||
|
||||
@abstractmethod
|
||||
def get_data_directory(self, service_name: str) -> str:
|
||||
"""
|
||||
|
@ -126,7 +156,6 @@ class Host(ABC):
|
|||
service_name: Name of storage node service.
|
||||
"""
|
||||
|
||||
|
||||
@abstractmethod
|
||||
def wait_success_suspend_process(self, process_name: str) -> None:
|
||||
"""Search for a service ID by its name and stop the process
|
||||
|
@ -251,12 +280,34 @@ class Host(ABC):
|
|||
filter_regex: regex to filter output
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_filtered_logs(
|
||||
self,
|
||||
filter_regex: str,
|
||||
since: Optional[datetime] = None,
|
||||
until: Optional[datetime] = None,
|
||||
unit: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Get logs from host filtered by regex.
|
||||
|
||||
Args:
|
||||
filter_regex: regex filter for logs.
|
||||
since: If set, limits the time from which logs should be collected. Must be in UTC.
|
||||
until: If set, limits the time until which logs should be collected. Must be in UTC.
|
||||
unit: required unit.
|
||||
|
||||
Returns:
|
||||
Found entries as str if any found.
|
||||
Empty string otherwise.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def is_message_in_logs(
|
||||
self,
|
||||
message_regex: str,
|
||||
since: Optional[datetime] = None,
|
||||
until: Optional[datetime] = None,
|
||||
unit: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Checks logs on host for specified message regex.
|
||||
|
||||
|
@ -270,9 +321,10 @@ class Host(ABC):
|
|||
False otherwise.
|
||||
"""
|
||||
|
||||
|
||||
@abstractmethod
|
||||
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
|
||||
def wait_for_service_to_be_in_state(
|
||||
self, systemd_service_name: str, expected_state: str, timeout: int
|
||||
) -> None:
|
||||
"""
|
||||
Waites for service to be in specified state.
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ class LoadScenario(Enum):
|
|||
gRPC_CAR = "grpc_car"
|
||||
S3 = "s3"
|
||||
S3_CAR = "s3_car"
|
||||
S3_MULTIPART = "s3_multipart"
|
||||
HTTP = "http"
|
||||
VERIFY = "verify"
|
||||
LOCAL = "local"
|
||||
|
@ -37,10 +38,11 @@ all_load_scenarios = [
|
|||
LoadScenario.S3_CAR,
|
||||
LoadScenario.gRPC_CAR,
|
||||
LoadScenario.LOCAL,
|
||||
LoadScenario.S3_MULTIPART
|
||||
]
|
||||
all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY]
|
||||
|
||||
constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL]
|
||||
constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL, LoadScenario.S3_MULTIPART]
|
||||
constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]
|
||||
|
||||
grpc_preset_scenarios = [
|
||||
|
@ -49,7 +51,7 @@ grpc_preset_scenarios = [
|
|||
LoadScenario.gRPC_CAR,
|
||||
LoadScenario.LOCAL,
|
||||
]
|
||||
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR]
|
||||
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART]
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -172,7 +174,7 @@ class LoadParams:
|
|||
k6_url: Optional[str] = None
|
||||
# No ssl verification flag
|
||||
no_verify_ssl: Optional[bool] = metadata_field(
|
||||
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.VERIFY, LoadScenario.HTTP],
|
||||
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.VERIFY, LoadScenario.HTTP],
|
||||
"no-verify-ssl",
|
||||
"NO_VERIFY_SSL",
|
||||
False,
|
||||
|
@ -258,6 +260,14 @@ class LoadParams:
|
|||
constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True
|
||||
)
|
||||
|
||||
# Multipart
|
||||
# Number of parts to upload in parallel
|
||||
writers_multipart: Optional[int] = metadata_field(
|
||||
[LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True
|
||||
)
|
||||
# part size must be greater than (5 MB)
|
||||
write_object_part_size: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False)
|
||||
|
||||
# Period of time to apply the rate value.
|
||||
time_unit: Optional[str] = metadata_field(
|
||||
constant_arrival_rate_scenarios, None, "TIME_UNIT", False
|
||||
|
|
|
@ -196,6 +196,7 @@ def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> Metr
|
|||
LoadScenario.HTTP: GrpcMetrics,
|
||||
LoadScenario.S3: S3Metrics,
|
||||
LoadScenario.S3_CAR: S3Metrics,
|
||||
LoadScenario.S3_MULTIPART: S3Metrics,
|
||||
LoadScenario.VERIFY: VerifyMetrics,
|
||||
LoadScenario.LOCAL: LocalMetrics,
|
||||
}
|
||||
|
|
|
@ -92,6 +92,7 @@ class LoadReport:
|
|||
model_map = {
|
||||
LoadScenario.gRPC: "closed model",
|
||||
LoadScenario.S3: "closed model",
|
||||
LoadScenario.S3_MULTIPART: "closed model",
|
||||
LoadScenario.HTTP: "closed model",
|
||||
LoadScenario.gRPC_CAR: "open model",
|
||||
LoadScenario.S3_CAR: "open model",
|
||||
|
|
|
@ -1,12 +1,6 @@
|
|||
import sys
|
||||
from importlib.metadata import entry_points
|
||||
from typing import Any
|
||||
|
||||
if sys.version_info < (3, 10):
|
||||
# On Python prior 3.10 we need to use backport of entry points
|
||||
from importlib_metadata import entry_points
|
||||
else:
|
||||
from importlib.metadata import entry_points
|
||||
|
||||
|
||||
def load_plugin(plugin_group: str, name: str) -> Any:
|
||||
"""Loads plugin using entry point specification.
|
||||
|
|
|
@ -39,7 +39,7 @@ class LocalShell(Shell):
|
|||
log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output
|
||||
|
||||
try:
|
||||
command_process = pexpect.spawn(command, timeout=options.timeout)
|
||||
command_process = pexpect.spawn(command, timeout=options.timeout, use_poll=True)
|
||||
except (pexpect.ExceptionPexpect, OSError) as exc:
|
||||
raise RuntimeError(f"Command: {command}") from exc
|
||||
|
||||
|
|
|
@ -15,8 +15,7 @@ from frostfs_testlib.resources.cli import (
|
|||
)
|
||||
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.steps.epoch import tick_epoch
|
||||
from frostfs_testlib.steps.epoch import wait_for_epochs_align
|
||||
from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
|
||||
from frostfs_testlib.storage.cluster import Cluster, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
|
||||
from frostfs_testlib.utils import datetime_utils
|
||||
|
@ -41,44 +40,6 @@ class HealthStatus:
|
|||
return HealthStatus(network, health)
|
||||
|
||||
|
||||
@reporter.step_deco("Stop random storage nodes")
|
||||
def stop_random_storage_nodes(number: int, nodes: list[StorageNode]) -> list[StorageNode]:
|
||||
"""
|
||||
Shuts down the given number of randomly selected storage nodes.
|
||||
Args:
|
||||
number: the number of storage nodes to stop
|
||||
nodes: the list of storage nodes to stop
|
||||
Returns:
|
||||
the list of nodes that were stopped
|
||||
"""
|
||||
nodes_to_stop = random.sample(nodes, number)
|
||||
for node in nodes_to_stop:
|
||||
node.stop_service()
|
||||
return nodes_to_stop
|
||||
|
||||
|
||||
@reporter.step_deco("Start storage node")
|
||||
def start_storage_nodes(nodes: list[StorageNode]) -> None:
|
||||
"""
|
||||
The function starts specified storage nodes.
|
||||
Args:
|
||||
nodes: the list of nodes to start
|
||||
"""
|
||||
for node in nodes:
|
||||
node.start_service()
|
||||
|
||||
|
||||
@reporter.step_deco("Stop storage node")
|
||||
def stop_storage_nodes(nodes: list[StorageNode]) -> None:
|
||||
"""
|
||||
The function starts specified storage nodes.
|
||||
Args:
|
||||
nodes: the list of nodes to start
|
||||
"""
|
||||
for node in nodes:
|
||||
node.stop_service()
|
||||
|
||||
|
||||
@reporter.step_deco("Get Locode from random storage node")
|
||||
def get_locode_from_random_node(cluster: Cluster) -> str:
|
||||
node = random.choice(cluster.services(StorageNode))
|
||||
|
@ -329,25 +290,3 @@ def _run_control_command(node: StorageNode, command: str) -> None:
|
|||
f"--wallet {wallet_path} --config {wallet_config_path}"
|
||||
)
|
||||
return result.stdout
|
||||
|
||||
|
||||
@reporter.step_deco("Start services s3gate ")
|
||||
def start_s3gates(cluster: Cluster) -> None:
|
||||
"""
|
||||
The function starts specified storage nodes.
|
||||
Args:
|
||||
cluster: cluster instance under test
|
||||
"""
|
||||
for gate in cluster.services(S3Gate):
|
||||
gate.start_service()
|
||||
|
||||
|
||||
@reporter.step_deco("Stop services s3gate ")
|
||||
def stop_s3gates(cluster: Cluster) -> None:
|
||||
"""
|
||||
The function starts specified storage nodes.
|
||||
Args:
|
||||
cluster: cluster instance under test
|
||||
"""
|
||||
for gate in cluster.services(S3Gate):
|
||||
gate.stop_service()
|
||||
|
|
|
@ -17,6 +17,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
|||
StorageNode,
|
||||
)
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
|
||||
reporter = get_reporter()
|
||||
|
@ -121,6 +122,40 @@ class ClusterNode:
|
|||
config.attributes[ConfigAttributes.SERVICE_NAME] for config in self.host.config.services
|
||||
]
|
||||
|
||||
def get_all_interfaces(self) -> dict[str, str]:
|
||||
return self.host.config.interfaces
|
||||
|
||||
def get_interface(self, interface: Interfaces) -> str:
|
||||
return self.host.config.interfaces[interface.value]
|
||||
|
||||
def get_data_interfaces(self) -> list[str]:
|
||||
return [
|
||||
ip_address
|
||||
for name_interface, ip_address in self.host.config.interfaces.items()
|
||||
if "data" in name_interface
|
||||
]
|
||||
|
||||
def get_data_interface(self, search_interface: str) -> list[str]:
|
||||
return [
|
||||
self.host.config.interfaces[interface]
|
||||
for interface in self.host.config.interfaces.keys()
|
||||
if search_interface == interface
|
||||
]
|
||||
|
||||
def get_internal_interfaces(self) -> list[str]:
|
||||
return [
|
||||
ip_address
|
||||
for name_interface, ip_address in self.host.config.interfaces.items()
|
||||
if "internal" in name_interface
|
||||
]
|
||||
|
||||
def get_internal_interface(self, search_internal: str) -> list[str]:
|
||||
return [
|
||||
self.host.config.interfaces[interface]
|
||||
for interface in self.host.config.interfaces.keys()
|
||||
if search_internal == interface
|
||||
]
|
||||
|
||||
|
||||
class Cluster:
|
||||
"""
|
||||
|
@ -173,6 +208,42 @@ class Cluster:
|
|||
def morph_chain(self) -> list[MorphChain]:
|
||||
return self.services(MorphChain)
|
||||
|
||||
def nodes(self, services: list[ServiceClass]) -> list[ClusterNode]:
|
||||
"""
|
||||
Resolve which cluster nodes hosting the specified services.
|
||||
|
||||
Args:
|
||||
services: list of services to resolve hosting cluster nodes.
|
||||
|
||||
Returns:
|
||||
list of cluster nodes which host specified services.
|
||||
"""
|
||||
|
||||
cluster_nodes = set()
|
||||
for service in services:
|
||||
cluster_nodes.update(
|
||||
[node for node in self.cluster_nodes if node.service(type(service)) == service]
|
||||
)
|
||||
|
||||
return list(cluster_nodes)
|
||||
|
||||
def node(self, service: ServiceClass) -> ClusterNode:
|
||||
"""
|
||||
Resolve single cluster node hosting the specified service.
|
||||
|
||||
Args:
|
||||
services: list of services to resolve hosting cluster nodes.
|
||||
|
||||
Returns:
|
||||
list of cluster nodes which host specified services.
|
||||
"""
|
||||
|
||||
nodes = [node for node in self.cluster_nodes if node.service(type(service)) == service]
|
||||
if not len(nodes):
|
||||
raise RuntimeError(f"Cannot find service {service} on any node")
|
||||
|
||||
return nodes[0]
|
||||
|
||||
def services(self, service_type: type[ServiceClass]) -> list[ServiceClass]:
|
||||
"""
|
||||
Get all services in a cluster of specified type.
|
||||
|
|
|
@ -10,6 +10,7 @@ class ConfigAttributes:
|
|||
ENDPOINT_DATA_0 = "endpoint_data0"
|
||||
ENDPOINT_DATA_1 = "endpoint_data1"
|
||||
ENDPOINT_INTERNAL = "endpoint_internal0"
|
||||
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
|
||||
CONTROL_ENDPOINT = "control_endpoint"
|
||||
UN_LOCODE = "un_locode"
|
||||
HTTP_HOSTNAME = "http_hostname"
|
||||
|
|
|
@ -41,10 +41,10 @@ class ClusterStateController:
|
|||
provider = SshConnectionProvider()
|
||||
provider.drop(node.host_ip)
|
||||
|
||||
self.stopped_nodes.append(node)
|
||||
with reporter.step(f"Stop host {node.host.config.address}"):
|
||||
node.host.stop_host(mode=mode)
|
||||
wait_for_host_offline(self.shell, node.storage_node)
|
||||
self.stopped_nodes.append(node)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Shutdown whole cluster")
|
||||
|
@ -135,9 +135,9 @@ class ClusterStateController:
|
|||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop storage service on {node}")
|
||||
def stop_storage_service(self, node: ClusterNode):
|
||||
node.storage_node.stop_service()
|
||||
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
|
||||
self.stopped_storage_nodes.append(node)
|
||||
node.storage_node.stop_service(mask)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop all {service_type} services")
|
||||
|
@ -171,9 +171,11 @@ class ClusterStateController:
|
|||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop {service_type} service on {node}")
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||
def stop_service_of_type(
|
||||
self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True
|
||||
):
|
||||
service = node.service(service_type)
|
||||
service.stop_service()
|
||||
service.stop_service(mask)
|
||||
self.stopped_services.add(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
|
@ -207,8 +209,8 @@ class ClusterStateController:
|
|||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop s3 gate on {node}")
|
||||
def stop_s3_gate(self, node: ClusterNode):
|
||||
node.s3_gate.stop_service()
|
||||
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
|
||||
node.s3_gate.stop_service(mask)
|
||||
self.stopped_s3_gates.append(node)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
|
|
|
@ -110,6 +110,10 @@ class MorphChain(NodeBase):
|
|||
def label(self) -> str:
|
||||
return f"{self.name}: {self.get_endpoint()}"
|
||||
|
||||
def get_http_endpoint(self) -> str:
|
||||
return self._get_attribute("http_endpoint")
|
||||
|
||||
|
||||
class StorageNode(NodeBase):
|
||||
"""
|
||||
Class represents storage node in a storage cluster
|
||||
|
@ -141,6 +145,9 @@ class StorageNode(NodeBase):
|
|||
def get_shard_config_path(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.SHARD_CONFIG_PATH)
|
||||
|
||||
def get_shards_config(self) -> tuple[str, dict]:
|
||||
return self.get_config(self.get_shard_config_path())
|
||||
|
||||
def get_control_endpoint(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from abc import abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Tuple, TypedDict, TypeVar
|
||||
from typing import Optional, TypedDict, TypeVar
|
||||
|
||||
import yaml
|
||||
|
||||
|
@ -57,6 +57,9 @@ class NodeBase(HumanReadableABC):
|
|||
return self._process_name
|
||||
|
||||
def start_service(self):
|
||||
with reporter.step(f"Unmask {self.name} service on {self.host.config.address}"):
|
||||
self.host.unmask_service(self.name)
|
||||
|
||||
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
|
||||
self.host.start_service(self.name)
|
||||
|
||||
|
@ -64,7 +67,14 @@ class NodeBase(HumanReadableABC):
|
|||
def service_healthcheck(self) -> bool:
|
||||
"""Service healthcheck."""
|
||||
|
||||
def stop_service(self):
|
||||
def get_metrics_endpoint(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
|
||||
|
||||
def stop_service(self, mask: bool = True):
|
||||
if mask:
|
||||
with reporter.step(f"Mask {self.name} service on {self.host.config.address}"):
|
||||
self.host.mask_service(self.name)
|
||||
|
||||
with reporter.step(f"Stop {self.name} service on {self.host.config.address}"):
|
||||
self.host.stop_service(self.name)
|
||||
|
||||
|
@ -103,8 +113,10 @@ class NodeBase(HumanReadableABC):
|
|||
ConfigAttributes.WALLET_CONFIG,
|
||||
)
|
||||
|
||||
def get_config(self) -> Tuple[str, dict]:
|
||||
def get_config(self, config_file_path: Optional[str] = None) -> tuple[str, dict]:
|
||||
if config_file_path is None:
|
||||
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
|
||||
|
||||
shell = self.host.get_shell()
|
||||
|
||||
result = shell.exec(f"cat {config_file_path}")
|
||||
|
@ -113,8 +125,10 @@ class NodeBase(HumanReadableABC):
|
|||
config = yaml.safe_load(config_text)
|
||||
return config_file_path, config
|
||||
|
||||
def save_config(self, new_config: dict) -> None:
|
||||
def save_config(self, new_config: dict, config_file_path: Optional[str] = None) -> None:
|
||||
if config_file_path is None:
|
||||
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
|
||||
|
||||
shell = self.host.get_shell()
|
||||
|
||||
config_str = yaml.dump(new_config)
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from frostfs_testlib.testing.readable import HumanReadableEnum
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectRef:
|
||||
|
@ -29,15 +32,24 @@ class StorageObjectInfo(ObjectRef):
|
|||
class NodeNetmapInfo:
|
||||
node_id: str = None
|
||||
node_status: str = None
|
||||
node_data_ip: str = None
|
||||
node_data_ips: list[str] = None
|
||||
cluster_name: str = None
|
||||
continent: str = None
|
||||
country: str = None
|
||||
country_code: str = None
|
||||
external_address: str = None
|
||||
external_address: list[str] = None
|
||||
location: str = None
|
||||
node: str = None
|
||||
price: int = None
|
||||
sub_div: str = None
|
||||
sub_div_code: int = None
|
||||
un_locode: str = None
|
||||
role: str = None
|
||||
|
||||
|
||||
class Interfaces(HumanReadableEnum):
|
||||
DATA_O: str = "data0"
|
||||
DATA_1: str = "data1"
|
||||
MGMT: str = "mgmt"
|
||||
INTERNAL_0: str = "internal0"
|
||||
INTERNAL_1: str = "internal1"
|
||||
|
|
|
@ -8,6 +8,7 @@ Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
|
|||
import csv
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from contextlib import suppress
|
||||
|
@ -138,32 +139,47 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
|
|||
|
||||
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
|
||||
"""
|
||||
The cli command will return something like.
|
||||
|
||||
Epoch: 240
|
||||
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
|
||||
Continent: Europe
|
||||
Country: Russia
|
||||
CountryCode: RU
|
||||
ExternalAddr: /ip4/10.10.11.18/tcp/8080
|
||||
Location: Moskva
|
||||
Node: 10.10.10.12
|
||||
Price: 5
|
||||
SubDiv: Moskva
|
||||
SubDivCode: MOW
|
||||
UN-LOCODE: RU MOW
|
||||
role: alphabet
|
||||
|
||||
The code will parse each line and return each node as dataclass.
|
||||
"""
|
||||
netmap_list = output.split("Node ")[1:]
|
||||
dataclass_list = []
|
||||
for node in netmap_list:
|
||||
node = node.replace("\t", "").split("\n")
|
||||
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
|
||||
dataclass_list.append(NodeNetmapInfo(*node))
|
||||
netmap_nodes = output.split("Node ")[1:]
|
||||
dataclasses_netmap = []
|
||||
result_netmap = {}
|
||||
|
||||
return dataclass_list
|
||||
regexes = {
|
||||
"node_id": r"\d+: (?P<node_id>\w+)",
|
||||
"node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
|
||||
"node_status": r"(?P<node_status>ONLINE|OFFLINE)",
|
||||
"cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
|
||||
"continent": r"Continent: (?P<continent>\w+)",
|
||||
"country": r"Country: (?P<country>\w+)",
|
||||
"country_code": r"CountryCode: (?P<country_code>\w+)",
|
||||
"external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
|
||||
"location": r"Location: (?P<location>\w+.*)",
|
||||
"node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
|
||||
"price": r"Price: (?P<price>\d+)",
|
||||
"sub_div": r"SubDiv: (?P<sub_div>.*)",
|
||||
"sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
|
||||
"un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
|
||||
"role": r"role: (?P<role>\w+)",
|
||||
}
|
||||
|
||||
for node in netmap_nodes:
|
||||
for key, regex in regexes.items():
|
||||
search_result = re.search(regex, node, flags=re.MULTILINE)
|
||||
if key == "node_data_ips":
|
||||
result_netmap[key] = search_result[key].strip().split(" ")
|
||||
continue
|
||||
if key == "external_address":
|
||||
result_netmap[key] = search_result[key].strip().split(",")
|
||||
continue
|
||||
if search_result == None:
|
||||
result_netmap[key] = None
|
||||
continue
|
||||
result_netmap[key] = search_result[key].strip()
|
||||
|
||||
dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))
|
||||
|
||||
return dataclasses_netmap
|
||||
|
||||
|
||||
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
|
||||
|
|
Loading…
Reference in a new issue