Compare commits

...

12 commits

Author SHA1 Message Date
5a46da975e [#225] Restore invalid_obj check
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-05-16 09:57:02 +00:00
8f7f222e0d [#213] Return response in complete_multipart_upload function 2024-04-24 14:55:31 +03:00
e3d5c95bde [#147] Prettify verifier messages for error rates
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-12-13 14:02:06 +03:00
3e8614f912 [#141] Executive command changed
Added exception of error 'Too many requests' in log analyzer

Signed-off-by: Mikhail Kadilov m.kadilov@yadro.com
2023-12-11 14:15:00 +03:00
5bebfddc33 [OBJECT-138] Executive command changed
Added exception of error 'Too many requests' in log analyzer and fixed arguments ordr in get_filtered_logs

Signed-off-by: Mikhail Kadilov m.kadilov@yadro.com
2023-12-06 17:58:06 +03:00
8966ec20f2 [#135] Add method uptime service
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-12-04 07:07:38 +00:00
74eb72f59d [#129] Change local timeout
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-11-23 14:20:38 +03:00
5db091ca60 [#116] Updates for local scenario teardown
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-22 11:11:23 +03:00
7cc2bb2d1d [#110] Move chattr call after get_results call
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-22 11:11:18 +03:00
bb10456ec5 Store k6 output and add socket info collection
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-20 16:09:22 +03:00
44e7c7eb29 Add AWS retries
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-11-14 10:39:44 +03:00
5687b79b38 [#109] Update CSC with healthchecks
(cherry picked from commit e970fe2788)
Signed-off-by: Dmitry Anurin <d.anurin@yadro.com>
2023-11-13 12:03:59 +03:00
20 changed files with 355 additions and 270 deletions

View file

@ -50,10 +50,10 @@ basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
[tool.isort]
profile = "black"
src_paths = ["src", "tests"]
line_length = 100
line_length = 120
[tool.black]
line-length = 100
line-length = 120
target-version = ["py310"]
[tool.bumpver]

View file

@ -1,5 +1,7 @@
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.storage.cluster import ClusterNode
@ -9,6 +11,33 @@ reporter = get_reporter()
class BasicHealthcheck(Healthcheck):
@reporter.step_deco("Perform healthcheck for {cluster_node}")
def perform(self, cluster_node: ClusterNode):
health_check = storage_node_healthcheck(cluster_node.storage_node)
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
raise AssertionError("Node {cluster_node} is not healthy")
result = self.storage_healthcheck(cluster_node)
if result:
raise AssertionError(result)
@reporter.step_deco("Tree healthcheck on {cluster_node}")
def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None:
host = cluster_node.host
service_config = host.get_service_config(cluster_node.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
remote_cli = FrostfsCli(
shell,
host.get_cli_config(FROSTFS_CLI_EXEC).exec_path,
config_file=wallet_config_path,
)
result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080")
if result.return_code != 0:
return f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}"
@reporter.step_deco("Storage healthcheck on {cluster_node}")
def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None:
result = storage_node_healthcheck(cluster_node.storage_node)
if result.health_status != "READY" or result.network_status != "ONLINE":
return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}"

View file

@ -7,3 +7,11 @@ class Healthcheck(ABC):
@abstractmethod
def perform(self, cluster_node: ClusterNode):
"""Perform healthcheck on the target cluster node"""
@abstractmethod
def tree_healthcheck(self, cluster_node: ClusterNode):
"""Check tree sync status on target cluster node"""
@abstractmethod
def storage_healthcheck(self, cluster_node: ClusterNode):
"""Perform storage node healthcheck on target cluster node"""

View file

@ -235,6 +235,7 @@ class DockerHost(Host):
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
) -> str:
client = self._get_docker_client()
filtered_logs = ""
@ -246,8 +247,11 @@ class DockerHost(Host):
logger.info(f"Got exception while dumping logs of '{container_name}': {exc}")
continue
if exclude_filter:
filtered_logs = filtered_logs.replace(exclude_filter, "")
matches = re.findall(filter_regex, filtered_logs, re.IGNORECASE + re.MULTILINE)
found = list(matches)
if found:
filtered_logs += f"{container_name}:\n{os.linesep.join(found)}"

View file

@ -287,6 +287,7 @@ class Host(ABC):
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
) -> str:
"""Get logs from host filtered by regex.

View file

@ -1,4 +1,5 @@
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy,
K6ProcessAllocationStrategy,

View file

@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from frostfs_testlib.shell.interfaces import Shell
class Loader(ABC):
@abstractmethod
def get_shell(self) -> Shell:
"""Get shell for the loader"""
@property
@abstractmethod
def ip(self):
"""Get address of the loader"""

View file

@ -1,20 +1,8 @@
from abc import ABC, abstractmethod
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import LoadParams
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
class Loader(ABC):
@abstractmethod
def get_shell(self) -> Shell:
"""Get shell for the loader"""
@property
@abstractmethod
def ip(self):
"""Get address of the loader"""
class ScenarioRunner(ABC):
@ -32,6 +20,10 @@ class ScenarioRunner(ABC):
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
"""Init K6 instances"""
@abstractmethod
def get_k6_instances(self) -> list[K6]:
"""Get K6 instances"""
@abstractmethod
def start(self):
"""Start K6 instances"""

View file

@ -8,7 +8,7 @@ from time import sleep
from typing import Any
from urllib.parse import urlparse
from frostfs_testlib.load.interfaces import Loader
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import (
K6ProcessAllocationStrategy,
LoadParams,
@ -59,6 +59,7 @@ class K6:
self.loader: Loader = loader
self.shell: Shell = shell
self.wallet = wallet
self.preset_output: str = ""
self.summary_json: str = os.path.join(
self.load_params.working_dir,
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
@ -104,7 +105,9 @@ class K6:
assert (
result.return_code == EXIT_RESULT_CODE
), f"Return code of preset is not zero: {result.stdout}"
return result.stdout.strip("\n")
self.preset_output = result.stdout.strip("\n")
return self.preset_output
@reporter.step_deco("Generate K6 command")
def _generate_env_variables(self) -> str:

View file

@ -49,30 +49,20 @@ class LoadVerifier:
if deleters and not delete_operations:
issues.append(f"No any delete operation was performed")
if (
write_operations
and writers
and write_errors / write_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Write error rate is greater than threshold: {write_errors / write_operations * 100} > {self.load_params.error_threshold}"
)
if (
read_operations
and readers
and read_errors / read_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Read error rate is greater than threshold: {read_errors / read_operations * 100} > {self.load_params.error_threshold}"
)
if (
delete_operations
and deleters
and delete_errors / delete_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Delete error rate is greater than threshold: {delete_errors / delete_operations * 100} > {self.load_params.error_threshold}"
)
error_rate = self._get_error_rate(writers, write_operations, write_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Write errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
error_rate = self._get_error_rate(readers, read_operations, read_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Read errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
error_rate = self._get_error_rate(deleters, delete_operations, delete_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Delete errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
return issues
@ -89,9 +79,17 @@ class LoadVerifier:
)
return verify_issues
def _collect_verify_issues_on_process(
self, label, load_summary, verification_summary
) -> list[str]:
def _get_error_rate(self, vus: int, operations: int, errors: int) -> float:
if not operations or not vus:
return 0
error_rate = errors / operations * 100
return error_rate
def _get_rate_str(self, rate: float, minimal: float = 0.01) -> str:
return f"{rate:.2f}%" if rate >= minimal else f"~{minimal}%"
def _collect_verify_issues_on_process(self, label, load_summary, verification_summary) -> list[str]:
issues = []
load_metrics = get_metrics_object(self.load_params.scenario, load_summary)
@ -110,6 +108,8 @@ class LoadVerifier:
invalid_objects = verify_metrics.read_failed_iterations
total_left_objects = load_metrics.write_success_iterations - delete_success
if invalid_objects > 0:
issues.append(f"There were {invalid_objects} verification fails (hash mismatch).")
# Due to interruptions we may see total verified objects to be less than written on writers count
if abs(total_left_objects - verified_objects) > writers:
issues.append(

View file

@ -1,4 +1,4 @@
from frostfs_testlib.load.interfaces import Loader
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.resources.load_params import (
LOAD_NODE_SSH_PASSWORD,
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,

View file

@ -3,7 +3,6 @@ import itertools
import math
import re
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import fields
from typing import Optional
from urllib.parse import urlparse
@ -11,7 +10,8 @@ from urllib.parse import urlparse
import yaml
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
@ -19,18 +19,17 @@ from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import (
BACKGROUND_LOAD_VUS_COUNT_DIVISOR,
LOAD_NODE_SSH_USER,
LOAD_NODES,
)
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR, LOAD_NODE_SSH_USER, LOAD_NODES
from frostfs_testlib.shell.command_inspectors import SuInspector
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.utils import FileKeeper, datetime_utils
from frostfs_testlib.testing.test_control import retry
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_keeper import FileKeeper
reporter = get_reporter()
@ -52,6 +51,9 @@ class RunnerBase(ScenarioRunner):
return any([future.result() for future in futures])
def get_k6_instances(self):
return self.k6_instances
class DefaultRunner(RunnerBase):
loaders: list[Loader]
@ -81,14 +83,10 @@ class DefaultRunner(RunnerBase):
with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [
node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes
]
s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
grpc_peer = storage_node.get_rpc_endpoint()
parallel(
self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir
)
parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)
def _prepare_loader(
self,
@ -110,9 +108,9 @@ class DefaultRunner(RunnerBase):
wallet_password=self.loaders_wallet.password,
).stdout
aws_access_key_id = str(
re.search(
r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output
).group("aws_access_key_id")
re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
"aws_access_key_id"
)
)
aws_secret_access_key = str(
re.search(
@ -123,9 +121,7 @@ class DefaultRunner(RunnerBase):
configure_input = [
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
InteractiveInput(
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
),
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""),
]
@ -142,16 +138,12 @@ class DefaultRunner(RunnerBase):
}
endpoints_generators = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle(
[[endpoint] for endpoint in endpoints]
),
K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle([[endpoint] for endpoint in endpoints]),
}
k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy]
endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy]
distributed_load_params_list = self._get_distributed_load_params_list(
load_params, k6_processes_count
)
distributed_load_params_list = self._get_distributed_load_params_list(load_params, k6_processes_count)
futures = parallel(
self._init_k6_instance,
@ -162,9 +154,7 @@ class DefaultRunner(RunnerBase):
)
self.k6_instances = [future.result() for future in futures]
def _init_k6_instance(
self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str
):
def _init_k6_instance(self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str):
shell = loader.get_shell()
with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
with reporter.step(f"Make working directory"):
@ -202,9 +192,7 @@ class DefaultRunner(RunnerBase):
and getattr(original_load_params, field.name) is not None
):
original_value = getattr(original_load_params, field.name)
distribution = self._get_distribution(
math.ceil(original_value / divisor), workers_count
)
distribution = self._get_distribution(math.ceil(original_value / divisor), workers_count)
for i in range(workers_count):
setattr(distributed_load_params[i], field.name, distribution[i])
@ -231,10 +219,7 @@ class DefaultRunner(RunnerBase):
# Remainder of clients left to be distributed
remainder = clients_count - clients_per_worker * workers_count
distribution = [
clients_per_worker + 1 if i < remainder else clients_per_worker
for i in range(workers_count)
]
distribution = [clients_per_worker + 1 if i < remainder else clients_per_worker for i in range(workers_count)]
return distribution
def start(self):
@ -243,9 +228,7 @@ class DefaultRunner(RunnerBase):
parallel([k6.start for k6 in self.k6_instances])
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
with reporter.step(
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
):
with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
time.sleep(wait_after_start_time)
def stop(self):
@ -296,13 +279,31 @@ class LocalRunner(RunnerBase):
nodes_under_load: list[ClusterNode],
k6_dir: str,
):
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params)
@retry(3, 5, expected_result=True)
def allow_user_to_login_in_system(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
result = None
try:
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
self.lock_passwd_on_node(cluster_node)
options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)])
result = shell.exec("whoami", options)
finally:
if not result or result.return_code:
self.restore_passwd_on_node(cluster_node)
return False
return True
@reporter.step_deco("Prepare node {cluster_node}")
def prepare_node(cluster_node: ClusterNode):
def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
shell = cluster_node.host.get_shell()
with reporter.step("Allow storage user to login into system"):
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
shell.exec("sudo chattr +i /etc/passwd")
self.allow_user_to_login_in_system(cluster_node)
with reporter.step("Update limits.conf"):
limits_path = "/etc/security/limits.conf"
@ -317,20 +318,11 @@ class LocalRunner(RunnerBase):
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"):
self.wallet = WalletInfo(
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
)
self.wallet = WalletInfo(f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml")
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor:
result = executor.map(prepare_node, nodes_under_load)
# Check for exceptions
for _ in result:
pass
@reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
@ -368,30 +360,30 @@ class LocalRunner(RunnerBase):
def start(self):
load_params = self.k6_instances[0].load_params
self.cluster_state_controller.stop_all_s3_gates()
self.cluster_state_controller.stop_all_storage_services()
self.cluster_state_controller.stop_services_of_type(S3Gate)
self.cluster_state_controller.stop_services_of_type(StorageNode)
parallel([k6.start for k6 in self.k6_instances])
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
with reporter.step(
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
):
with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
time.sleep(wait_after_start_time)
@reporter.step_deco("Restore passwd on {cluster_node}")
def restore_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
@reporter.step_deco("Lock passwd on {cluster_node}")
def lock_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr +i /etc/passwd")
def stop(self):
for k6_instance in self.k6_instances:
k6_instance.stop()
@reporter.step_deco("Restore passwd on {cluster_node}")
def restore_passwd_attr_on_node(cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
parallel(restore_passwd_attr_on_node, self.nodes_under_load)
self.cluster_state_controller.start_stopped_storage_services()
self.cluster_state_controller.start_stopped_s3_gates()
self.cluster_state_controller.start_all_stopped_services()
def get_results(self) -> dict:
results = {}
@ -399,4 +391,6 @@ class LocalRunner(RunnerBase):
result = k6_instance.get_results()
results[k6_instance.loader.ip] = result
parallel(self.restore_passwd_on_node, self.nodes_under_load)
return results

View file

@ -43,6 +43,6 @@ with open(DEFAULT_WALLET_CONFIG, "w") as file:
# Number of attempts that S3 clients will attempt per each request (1 means single attempt
# without any retries)
MAX_REQUEST_ATTEMPTS = 1
MAX_REQUEST_ATTEMPTS = 5
RETRY_MODE = "standard"
CREDENTIALS_CREATE_TIMEOUT = "1m"

View file

@ -7,12 +7,7 @@ from time import sleep
from typing import Literal, Optional, Union
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.common import (
ASSETS_DIR,
MAX_REQUEST_ATTEMPTS,
RETRY_MODE,
S3_SYNC_WAIT_TIME,
)
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell
@ -22,7 +17,7 @@ from frostfs_testlib.utils.cli_utils import _configure_aws_cli
reporter = get_reporter()
logger = logging.getLogger("NeoLogger")
command_options = CommandOptions(timeout=240)
command_options = CommandOptions(timeout=480)
class AwsCliClient(S3ClientWrapper):
@ -128,9 +123,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
tags_json = {
"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
}
tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
cmd = (
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}"
@ -140,8 +133,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list:
cmd = (
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
@ -149,10 +141,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list:
cmd = (
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response.get("Grants")
@ -160,8 +149,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
@ -169,10 +157,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = (
f"aws {self.common_flags} s3api list-objects --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
@ -183,10 +168,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = (
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
@ -371,10 +353,7 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> None:
cmd = (
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
f" --endpoint {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f" --endpoint {self.s3gate_endpoint}"
if acl:
cmd += f" --acl {acl}"
if grant_write:
@ -442,9 +421,7 @@ class AwsCliClient(S3ClientWrapper):
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
# Delete objects without creating delete markers
for object_version in object_versions:
self.delete_object(
bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]
)
self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
@reporter.step_deco("Get object attributes")
def get_object_attributes(
@ -480,10 +457,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response.get("Policy")
@ -505,10 +479,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response.get("CORSRules")
@ -524,8 +495,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None:
cmd = (
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
)
self.local_shell.exec(cmd)
@ -608,10 +578,7 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None,
metadata: Optional[dict] = None,
) -> dict:
cmd = (
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} "
f"--endpoint-url {self.s3gate_endpoint}"
)
cmd = f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint}"
if metadata:
cmd += " --metadata"
for key, value in metadata.items():
@ -674,9 +641,7 @@ class AwsCliClient(S3ClientWrapper):
self.local_shell.exec(cmd)
@reporter.step_deco("Upload part S3")
def upload_part(
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
) -> str:
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
cmd = (
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
@ -688,9 +653,7 @@ class AwsCliClient(S3ClientWrapper):
return response["ETag"]
@reporter.step_deco("Upload copy part S3")
def upload_part_copy(
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
) -> str:
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
cmd = (
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
@ -698,9 +661,7 @@ class AwsCliClient(S3ClientWrapper):
)
output = self.local_shell.exec(cmd, command_options).stdout
response = self._to_json(output)
assert response.get("CopyPartResult", []).get(
"ETag"
), f"Expected ETag in response:\n{response}"
assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}"
return response["CopyPartResult"]["ETag"]
@ -732,7 +693,10 @@ class AwsCliClient(S3ClientWrapper):
f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} "
f"--endpoint-url {self.s3gate_endpoint}"
)
self.local_shell.exec(cmd)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step_deco("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:

View file

@ -573,6 +573,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
)
log_command_execution("S3 Complete multipart upload", response)
return response
@reporter.step_deco("Put object retention")
@report_error
def put_object_retention(

View file

@ -2,7 +2,7 @@ import copy
from typing import Optional
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.load.interfaces import ScenarioRunner
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy,
LoadParams,

View file

@ -1,15 +1,15 @@
import copy
import time
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
wait_for_host_offline,
@ -22,18 +22,36 @@ if_up_down_helper = IfUpDownHelper()
class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster) -> None:
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.stopped_s3_gates: list[ClusterNode] = []
self.dropped_traffic: list[ClusterNode] = []
self.stopped_services: set[NodeBase] = set()
self.cluster = cluster
self.healthcheck = healthcheck
self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {}
self.nodes_with_modified_interface: list[ClusterNode] = []
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
return set(stopped_by_node)
def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)]
return set(stopped_by_type)
def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes])
return set(stopped_on_nodes)
def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_svc = self._get_stopped_by_type(service_type).union(
self._from_stopped_nodes(service_type)
)
online_svc = set(self.cluster.services(service_type)) - stopped_svc
return online_svc
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str):
@ -65,34 +83,16 @@ class ClusterStateController:
for node in nodes:
wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_storage_service(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_s3_gate(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode):
def start_node_host(self, node: ClusterNode, tree_healthcheck: bool = True):
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
self.stopped_nodes.remove(node)
wait_for_node_online(node.storage_node)
if tree_healthcheck:
self.wait_tree_healthcheck()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped hosts")
@ -104,13 +104,10 @@ class ClusterStateController:
for node in nodes:
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
if node in self.stopped_storage_nodes:
self.stopped_storage_nodes.remove(node)
self.stopped_services.difference_update(self._get_stopped_by_node(node))
if node in self.stopped_s3_gates:
self.stopped_s3_gates.remove(node)
self.stopped_nodes = []
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
@ -133,42 +130,58 @@ class ClusterStateController:
disk_controller.attach()
self.detached_disks = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stopped_storage_nodes.append(node)
node.storage_node.stop_service(mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services")
def stop_services_of_type(self, service_type: type[ServiceClass]):
def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True):
services = self.cluster.services(service_type)
self.stopped_services.update(services)
parallel([service.stop_service for service in services])
parallel([service.stop_service for service in services], mask=mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all {service_type} services")
def start_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.start_service for service in services])
self.stopped_services.difference_update(set(services))
if service_type == StorageNode:
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.wait_after_storage_startup()
self.stopped_services = self.stopped_services - set(services)
@wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"):
result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
assert (
'address="127.0.0.1' in result.stdout
), "S3Gate should connect to local storage node"
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
def wait_s3gates(self):
online_s3gates = self._get_online(S3Gate)
if online_s3gates:
parallel(self.wait_s3gate, online_s3gates)
@wait_for_success(600, 60)
def wait_tree_healthcheck(self):
nodes = self.cluster.nodes(self._get_online(StorageNode))
parallel(self.healthcheck.tree_healthcheck, nodes)
@reporter.step_deco("Wait for storage reconnection to the system")
def wait_after_storage_startup(self):
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.wait_s3gates()
self.wait_tree_healthcheck()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped services")
def start_all_stopped_services(self):
stopped_storages = self._get_stopped_by_type(StorageNode)
parallel([service.start_service for service in self.stopped_services])
for service in self.stopped_services:
if isinstance(service, StorageNode):
wait_all_storage_nodes_returned(self.shell, self.cluster)
break
self.stopped_services.clear()
if stopped_storages:
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type(
@ -183,50 +196,78 @@ class ClusterStateController:
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type)
service.start_service()
if service in self.stopped_services:
self.stopped_services.remove(service)
self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc:
return
parallel([svc.start_service for svc in stopped_svc])
self.stopped_services.difference_update(stopped_svc)
if service_type == StorageNode:
self.wait_after_storage_startup()
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, S3Gate)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stop_service_of_type(node, StorageNode, mask)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start storage service on {node}")
def start_storage_service(self, node: ClusterNode):
node.storage_node.start_service()
self.stopped_storage_nodes.remove(node)
self.start_service_of_type(node, StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped storage services")
def start_stopped_storage_services(self):
if not self.stopped_storage_nodes:
return
# In case if we stopped couple services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_storage_nodes = []
self.start_stopped_services_of_type(StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop s3 gate on {node}")
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
node.s3_gate.stop_service(mask)
self.stopped_s3_gates.append(node)
self.stop_service_of_type(node, S3Gate, mask)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start s3 gate on {node}")
def start_s3_gate(self, node: ClusterNode):
node.s3_gate.start_service()
self.stopped_s3_gates.remove(node)
self.start_service_of_type(node, S3Gate)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped S3 gates")
def start_stopped_s3_gates(self):
if not self.stopped_s3_gates:
return
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
self.stopped_s3_gates = []
self.start_stopped_services_of_type(S3Gate)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}")
@ -312,7 +353,9 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
def panic_reboot_host(
self, node: ClusterNode, wait_for_return: bool = True, tree_healthcheck: bool = True
):
shell = node.host.get_shell()
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
@ -329,6 +372,8 @@ class ClusterStateController:
time.sleep(10)
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
if tree_healthcheck:
self.wait_tree_healthcheck()
@reporter.step_deco("Down {interface} to {nodes}")
def down_interface(self, nodes: list[ClusterNode], interface: str):

View file

@ -1,12 +1,15 @@
from abc import abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, TypedDict, TypeVar
import yaml
from dateutil import parser
from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.hosting.interfaces import Host
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.testing.readable import HumanReadableABC
from frostfs_testlib.utils import wallet_utils
@ -67,6 +70,12 @@ class NodeBase(HumanReadableABC):
def service_healthcheck(self) -> bool:
"""Service healthcheck."""
# TODO: Migrate to sub-class Metrcis (not yet exists :))
def get_metric(self, metric: str) -> CommandResult:
shell = self.host.get_shell()
result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'")
return result
def get_metrics_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
@ -157,6 +166,15 @@ class NodeBase(HumanReadableABC):
def _get_service_config(self) -> ServiceConfig:
return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec(
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time
return active_time
ServiceClass = TypeVar("ServiceClass", bound=NodeBase)

View file

@ -3,6 +3,3 @@ import frostfs_testlib.utils.datetime_utils
import frostfs_testlib.utils.json_utils
import frostfs_testlib.utils.string_utils
import frostfs_testlib.utils.wallet_utils
# TODO: Circullar dependency FileKeeper -> NodeBase -> Utils -> FileKeeper -> NodeBase
from frostfs_testlib.utils.file_keeper import FileKeeper

View file

@ -12,6 +12,7 @@ from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, NodeBase, StorageNode
from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import retry, wait_for_success
from frostfs_testlib.utils.datetime_utils import parse_time
@ -26,10 +27,15 @@ def ping_host(shell: Shell, host: Host):
return shell.exec(f"ping {host.config.address} -c 1", options).return_code
# TODO: Move to ClusterStateController
@reporter.step_deco("Wait for storage nodes returned to cluster")
def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None:
for node in cluster.services(StorageNode):
with reporter.step(f"Run health check for storage at '{node}'"):
nodes = cluster.services(StorageNode)
parallel(_wait_for_storage_node, nodes, shell=shell)
@reporter.step_deco("Run health check for storage at '{node}'")
def _wait_for_storage_node(node: StorageNode, shell: Shell) -> None:
wait_for_host_online(shell, node)
wait_for_node_online(node)
@ -64,10 +70,17 @@ def wait_for_node_online(node: StorageNode):
except Exception as err:
logger.warning(f"Node healthcheck fails with error {err}")
return False
finally:
gather_socket_info(node)
return health_check.health_status == "READY" and health_check.network_status == "ONLINE"
@reporter.step_deco("Gather socket info for {node}")
def gather_socket_info(node: StorageNode):
node.host.get_shell().exec("ss -tuln | grep 8080", CommandOptions(check=False))
@reporter.step_deco("Check and return status of given service")
def service_status(service: str, shell: Shell) -> str:
return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip()