Compare commits

...

12 commits

Author SHA1 Message Date
5a46da975e [#225] Restore invalid_obj check
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-05-16 09:57:02 +00:00
8f7f222e0d [#213] Return response in complete_multipart_upload function 2024-04-24 14:55:31 +03:00
e3d5c95bde [#147] Prettify verifier messages for error rates
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-12-13 14:02:06 +03:00
3e8614f912 [#141] Executive command changed
Added exception of error 'Too many requests' in log analyzer

Signed-off-by: Mikhail Kadilov m.kadilov@yadro.com
2023-12-11 14:15:00 +03:00
5bebfddc33 [OBJECT-138] Executive command changed
Added exception of error 'Too many requests' in log analyzer and fixed arguments ordr in get_filtered_logs

Signed-off-by: Mikhail Kadilov m.kadilov@yadro.com
2023-12-06 17:58:06 +03:00
8966ec20f2 [#135] Add method uptime service
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-12-04 07:07:38 +00:00
74eb72f59d [#129] Change local timeout
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-11-23 14:20:38 +03:00
5db091ca60 [#116] Updates for local scenario teardown
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-22 11:11:23 +03:00
7cc2bb2d1d [#110] Move chattr call after get_results call
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-22 11:11:18 +03:00
bb10456ec5 Store k6 output and add socket info collection
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-20 16:09:22 +03:00
44e7c7eb29 Add AWS retries
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-11-14 10:39:44 +03:00
5687b79b38 [#109] Update CSC with healthchecks
(cherry picked from commit e970fe2788)
Signed-off-by: Dmitry Anurin <d.anurin@yadro.com>
2023-11-13 12:03:59 +03:00
20 changed files with 355 additions and 270 deletions

View file

@ -50,10 +50,10 @@ basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
[tool.isort] [tool.isort]
profile = "black" profile = "black"
src_paths = ["src", "tests"] src_paths = ["src", "tests"]
line_length = 100 line_length = 120
[tool.black] [tool.black]
line-length = 100 line-length = 120
target-version = ["py310"] target-version = ["py310"]
[tool.bumpver] [tool.bumpver]

View file

@ -1,5 +1,7 @@
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.steps.node_management import storage_node_healthcheck from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
@ -9,6 +11,33 @@ reporter = get_reporter()
class BasicHealthcheck(Healthcheck): class BasicHealthcheck(Healthcheck):
@reporter.step_deco("Perform healthcheck for {cluster_node}") @reporter.step_deco("Perform healthcheck for {cluster_node}")
def perform(self, cluster_node: ClusterNode): def perform(self, cluster_node: ClusterNode):
health_check = storage_node_healthcheck(cluster_node.storage_node) result = self.storage_healthcheck(cluster_node)
if health_check.health_status != "READY" or health_check.network_status != "ONLINE": if result:
raise AssertionError("Node {cluster_node} is not healthy") raise AssertionError(result)
@reporter.step_deco("Tree healthcheck on {cluster_node}")
def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None:
host = cluster_node.host
service_config = host.get_service_config(cluster_node.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
remote_cli = FrostfsCli(
shell,
host.get_cli_config(FROSTFS_CLI_EXEC).exec_path,
config_file=wallet_config_path,
)
result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080")
if result.return_code != 0:
return f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}"
@reporter.step_deco("Storage healthcheck on {cluster_node}")
def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None:
result = storage_node_healthcheck(cluster_node.storage_node)
if result.health_status != "READY" or result.network_status != "ONLINE":
return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}"

View file

@ -7,3 +7,11 @@ class Healthcheck(ABC):
@abstractmethod @abstractmethod
def perform(self, cluster_node: ClusterNode): def perform(self, cluster_node: ClusterNode):
"""Perform healthcheck on the target cluster node""" """Perform healthcheck on the target cluster node"""
@abstractmethod
def tree_healthcheck(self, cluster_node: ClusterNode):
"""Check tree sync status on target cluster node"""
@abstractmethod
def storage_healthcheck(self, cluster_node: ClusterNode):
"""Perform storage node healthcheck on target cluster node"""

View file

@ -235,6 +235,7 @@ class DockerHost(Host):
since: Optional[datetime] = None, since: Optional[datetime] = None,
until: Optional[datetime] = None, until: Optional[datetime] = None,
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
) -> str: ) -> str:
client = self._get_docker_client() client = self._get_docker_client()
filtered_logs = "" filtered_logs = ""
@ -246,8 +247,11 @@ class DockerHost(Host):
logger.info(f"Got exception while dumping logs of '{container_name}': {exc}") logger.info(f"Got exception while dumping logs of '{container_name}': {exc}")
continue continue
if exclude_filter:
filtered_logs = filtered_logs.replace(exclude_filter, "")
matches = re.findall(filter_regex, filtered_logs, re.IGNORECASE + re.MULTILINE) matches = re.findall(filter_regex, filtered_logs, re.IGNORECASE + re.MULTILINE)
found = list(matches) found = list(matches)
if found: if found:
filtered_logs += f"{container_name}:\n{os.linesep.join(found)}" filtered_logs += f"{container_name}:\n{os.linesep.join(found)}"

View file

@ -287,6 +287,7 @@ class Host(ABC):
since: Optional[datetime] = None, since: Optional[datetime] = None,
until: Optional[datetime] = None, until: Optional[datetime] = None,
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
) -> str: ) -> str:
"""Get logs from host filtered by regex. """Get logs from host filtered by regex.

View file

@ -1,4 +1,5 @@
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.load_config import ( from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy, EndpointSelectionStrategy,
K6ProcessAllocationStrategy, K6ProcessAllocationStrategy,

View file

@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from frostfs_testlib.shell.interfaces import Shell
class Loader(ABC):
@abstractmethod
def get_shell(self) -> Shell:
"""Get shell for the loader"""
@property
@abstractmethod
def ip(self):
"""Get address of the loader"""

View file

@ -1,20 +1,8 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import LoadParams from frostfs_testlib.load.load_config import LoadParams
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
class Loader(ABC):
@abstractmethod
def get_shell(self) -> Shell:
"""Get shell for the loader"""
@property
@abstractmethod
def ip(self):
"""Get address of the loader"""
class ScenarioRunner(ABC): class ScenarioRunner(ABC):
@ -32,6 +20,10 @@ class ScenarioRunner(ABC):
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
"""Init K6 instances""" """Init K6 instances"""
@abstractmethod
def get_k6_instances(self) -> list[K6]:
"""Get K6 instances"""
@abstractmethod @abstractmethod
def start(self): def start(self):
"""Start K6 instances""" """Start K6 instances"""

View file

@ -8,7 +8,7 @@ from time import sleep
from typing import Any from typing import Any
from urllib.parse import urlparse from urllib.parse import urlparse
from frostfs_testlib.load.interfaces import Loader from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import ( from frostfs_testlib.load.load_config import (
K6ProcessAllocationStrategy, K6ProcessAllocationStrategy,
LoadParams, LoadParams,
@ -59,6 +59,7 @@ class K6:
self.loader: Loader = loader self.loader: Loader = loader
self.shell: Shell = shell self.shell: Shell = shell
self.wallet = wallet self.wallet = wallet
self.preset_output: str = ""
self.summary_json: str = os.path.join( self.summary_json: str = os.path.join(
self.load_params.working_dir, self.load_params.working_dir,
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json", f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
@ -104,7 +105,9 @@ class K6:
assert ( assert (
result.return_code == EXIT_RESULT_CODE result.return_code == EXIT_RESULT_CODE
), f"Return code of preset is not zero: {result.stdout}" ), f"Return code of preset is not zero: {result.stdout}"
return result.stdout.strip("\n")
self.preset_output = result.stdout.strip("\n")
return self.preset_output
@reporter.step_deco("Generate K6 command") @reporter.step_deco("Generate K6 command")
def _generate_env_variables(self) -> str: def _generate_env_variables(self) -> str:

View file

@ -49,30 +49,20 @@ class LoadVerifier:
if deleters and not delete_operations: if deleters and not delete_operations:
issues.append(f"No any delete operation was performed") issues.append(f"No any delete operation was performed")
if ( error_rate = self._get_error_rate(writers, write_operations, write_errors)
write_operations if error_rate > self.load_params.error_threshold:
and writers rate_str = self._get_rate_str(error_rate)
and write_errors / write_operations * 100 > self.load_params.error_threshold issues.append(f"Write errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
):
issues.append( error_rate = self._get_error_rate(readers, read_operations, read_errors)
f"Write error rate is greater than threshold: {write_errors / write_operations * 100} > {self.load_params.error_threshold}" if error_rate > self.load_params.error_threshold:
) rate_str = self._get_rate_str(error_rate)
if ( issues.append(f"Read errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
read_operations
and readers error_rate = self._get_error_rate(deleters, delete_operations, delete_errors)
and read_errors / read_operations * 100 > self.load_params.error_threshold if error_rate > self.load_params.error_threshold:
): rate_str = self._get_rate_str(error_rate)
issues.append( issues.append(f"Delete errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
f"Read error rate is greater than threshold: {read_errors / read_operations * 100} > {self.load_params.error_threshold}"
)
if (
delete_operations
and deleters
and delete_errors / delete_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Delete error rate is greater than threshold: {delete_errors / delete_operations * 100} > {self.load_params.error_threshold}"
)
return issues return issues
@ -89,9 +79,17 @@ class LoadVerifier:
) )
return verify_issues return verify_issues
def _collect_verify_issues_on_process( def _get_error_rate(self, vus: int, operations: int, errors: int) -> float:
self, label, load_summary, verification_summary if not operations or not vus:
) -> list[str]: return 0
error_rate = errors / operations * 100
return error_rate
def _get_rate_str(self, rate: float, minimal: float = 0.01) -> str:
return f"{rate:.2f}%" if rate >= minimal else f"~{minimal}%"
def _collect_verify_issues_on_process(self, label, load_summary, verification_summary) -> list[str]:
issues = [] issues = []
load_metrics = get_metrics_object(self.load_params.scenario, load_summary) load_metrics = get_metrics_object(self.load_params.scenario, load_summary)
@ -110,6 +108,8 @@ class LoadVerifier:
invalid_objects = verify_metrics.read_failed_iterations invalid_objects = verify_metrics.read_failed_iterations
total_left_objects = load_metrics.write_success_iterations - delete_success total_left_objects = load_metrics.write_success_iterations - delete_success
if invalid_objects > 0:
issues.append(f"There were {invalid_objects} verification fails (hash mismatch).")
# Due to interruptions we may see total verified objects to be less than written on writers count # Due to interruptions we may see total verified objects to be less than written on writers count
if abs(total_left_objects - verified_objects) > writers: if abs(total_left_objects - verified_objects) > writers:
issues.append( issues.append(

View file

@ -1,4 +1,4 @@
from frostfs_testlib.load.interfaces import Loader from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.resources.load_params import ( from frostfs_testlib.resources.load_params import (
LOAD_NODE_SSH_PASSWORD, LOAD_NODE_SSH_PASSWORD,
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,

View file

@ -3,7 +3,6 @@ import itertools
import math import math
import re import re
import time import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import fields from dataclasses import fields
from typing import Optional from typing import Optional
from urllib.parse import urlparse from urllib.parse import urlparse
@ -11,7 +10,8 @@ from urllib.parse import urlparse
import yaml import yaml
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.k6 import K6 from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
@ -19,18 +19,17 @@ from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources import optionals from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import STORAGE_USER_NAME from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import ( from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR, LOAD_NODE_SSH_USER, LOAD_NODES
BACKGROUND_LOAD_VUS_COUNT_DIVISOR, from frostfs_testlib.shell.command_inspectors import SuInspector
LOAD_NODE_SSH_USER,
LOAD_NODES,
)
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel, run_optionally from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.utils import FileKeeper, datetime_utils from frostfs_testlib.testing.test_control import retry
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_keeper import FileKeeper
reporter = get_reporter() reporter = get_reporter()
@ -52,6 +51,9 @@ class RunnerBase(ScenarioRunner):
return any([future.result() for future in futures]) return any([future.result() for future in futures])
def get_k6_instances(self):
return self.k6_instances
class DefaultRunner(RunnerBase): class DefaultRunner(RunnerBase):
loaders: list[Loader] loaders: list[Loader]
@ -81,14 +83,10 @@ class DefaultRunner(RunnerBase):
with reporter.step("Init s3 client on loaders"): with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode) storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [ s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes
]
grpc_peer = storage_node.get_rpc_endpoint() grpc_peer = storage_node.get_rpc_endpoint()
parallel( parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)
self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir
)
def _prepare_loader( def _prepare_loader(
self, self,
@ -110,9 +108,9 @@ class DefaultRunner(RunnerBase):
wallet_password=self.loaders_wallet.password, wallet_password=self.loaders_wallet.password,
).stdout ).stdout
aws_access_key_id = str( aws_access_key_id = str(
re.search( re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output "aws_access_key_id"
).group("aws_access_key_id") )
) )
aws_secret_access_key = str( aws_secret_access_key = str(
re.search( re.search(
@ -123,9 +121,7 @@ class DefaultRunner(RunnerBase):
configure_input = [ configure_input = [
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id), InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
InteractiveInput( InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
),
InteractiveInput(prompt_pattern=r".*", input=""), InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""), InteractiveInput(prompt_pattern=r".*", input=""),
] ]
@ -142,16 +138,12 @@ class DefaultRunner(RunnerBase):
} }
endpoints_generators = { endpoints_generators = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]), K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle( K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle([[endpoint] for endpoint in endpoints]),
[[endpoint] for endpoint in endpoints]
),
} }
k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy] k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy]
endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy] endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy]
distributed_load_params_list = self._get_distributed_load_params_list( distributed_load_params_list = self._get_distributed_load_params_list(load_params, k6_processes_count)
load_params, k6_processes_count
)
futures = parallel( futures = parallel(
self._init_k6_instance, self._init_k6_instance,
@ -162,9 +154,7 @@ class DefaultRunner(RunnerBase):
) )
self.k6_instances = [future.result() for future in futures] self.k6_instances = [future.result() for future in futures]
def _init_k6_instance( def _init_k6_instance(self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str):
self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str
):
shell = loader.get_shell() shell = loader.get_shell()
with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"): with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
with reporter.step(f"Make working directory"): with reporter.step(f"Make working directory"):
@ -202,9 +192,7 @@ class DefaultRunner(RunnerBase):
and getattr(original_load_params, field.name) is not None and getattr(original_load_params, field.name) is not None
): ):
original_value = getattr(original_load_params, field.name) original_value = getattr(original_load_params, field.name)
distribution = self._get_distribution( distribution = self._get_distribution(math.ceil(original_value / divisor), workers_count)
math.ceil(original_value / divisor), workers_count
)
for i in range(workers_count): for i in range(workers_count):
setattr(distributed_load_params[i], field.name, distribution[i]) setattr(distributed_load_params[i], field.name, distribution[i])
@ -231,10 +219,7 @@ class DefaultRunner(RunnerBase):
# Remainder of clients left to be distributed # Remainder of clients left to be distributed
remainder = clients_count - clients_per_worker * workers_count remainder = clients_count - clients_per_worker * workers_count
distribution = [ distribution = [clients_per_worker + 1 if i < remainder else clients_per_worker for i in range(workers_count)]
clients_per_worker + 1 if i < remainder else clients_per_worker
for i in range(workers_count)
]
return distribution return distribution
def start(self): def start(self):
@ -243,9 +228,7 @@ class DefaultRunner(RunnerBase):
parallel([k6.start for k6 in self.k6_instances]) parallel([k6.start for k6 in self.k6_instances])
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
with reporter.step( with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
):
time.sleep(wait_after_start_time) time.sleep(wait_after_start_time)
def stop(self): def stop(self):
@ -296,40 +279,49 @@ class LocalRunner(RunnerBase):
nodes_under_load: list[ClusterNode], nodes_under_load: list[ClusterNode],
k6_dir: str, k6_dir: str,
): ):
@reporter.step_deco("Prepare node {cluster_node}") parallel(self.prepare_node, nodes_under_load, k6_dir, load_params)
def prepare_node(cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
with reporter.step("Allow storage user to login into system"): @retry(3, 5, expected_result=True)
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") def allow_user_to_login_in_system(self, cluster_node: ClusterNode):
shell.exec("sudo chattr +i /etc/passwd") shell = cluster_node.host.get_shell()
with reporter.step("Update limits.conf"): result = None
limits_path = "/etc/security/limits.conf" try:
self.file_keeper.add(cluster_node.storage_node, limits_path) shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" self.lock_passwd_on_node(cluster_node)
shell.exec(f"echo '{content}' | sudo tee {limits_path}") options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)])
result = shell.exec("whoami", options)
finally:
if not result or result.return_code:
self.restore_passwd_on_node(cluster_node)
return False
with reporter.step("Download K6"): return True
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"): @reporter.step_deco("Prepare node {cluster_node}")
self.wallet = WalletInfo( def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" shell = cluster_node.host.get_shell()
)
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor: with reporter.step("Allow storage user to login into system"):
result = executor.map(prepare_node, nodes_under_load) self.allow_user_to_login_in_system(cluster_node)
# Check for exceptions with reporter.step("Update limits.conf"):
for _ in result: limits_path = "/etc/security/limits.conf"
pass self.file_keeper.add(cluster_node.storage_node, limits_path)
content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
with reporter.step("Download K6"):
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"):
self.wallet = WalletInfo(f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml")
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
@reporter.step_deco("Init k6 instances") @reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
@ -368,30 +360,30 @@ class LocalRunner(RunnerBase):
def start(self): def start(self):
load_params = self.k6_instances[0].load_params load_params = self.k6_instances[0].load_params
self.cluster_state_controller.stop_all_s3_gates() self.cluster_state_controller.stop_services_of_type(S3Gate)
self.cluster_state_controller.stop_all_storage_services() self.cluster_state_controller.stop_services_of_type(StorageNode)
parallel([k6.start for k6 in self.k6_instances]) parallel([k6.start for k6 in self.k6_instances])
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
with reporter.step( with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
):
time.sleep(wait_after_start_time) time.sleep(wait_after_start_time)
@reporter.step_deco("Restore passwd on {cluster_node}")
def restore_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
@reporter.step_deco("Lock passwd on {cluster_node}")
def lock_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr +i /etc/passwd")
def stop(self): def stop(self):
for k6_instance in self.k6_instances: for k6_instance in self.k6_instances:
k6_instance.stop() k6_instance.stop()
@reporter.step_deco("Restore passwd on {cluster_node}") self.cluster_state_controller.start_all_stopped_services()
def restore_passwd_attr_on_node(cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
parallel(restore_passwd_attr_on_node, self.nodes_under_load)
self.cluster_state_controller.start_stopped_storage_services()
self.cluster_state_controller.start_stopped_s3_gates()
def get_results(self) -> dict: def get_results(self) -> dict:
results = {} results = {}
@ -399,4 +391,6 @@ class LocalRunner(RunnerBase):
result = k6_instance.get_results() result = k6_instance.get_results()
results[k6_instance.loader.ip] = result results[k6_instance.loader.ip] = result
parallel(self.restore_passwd_on_node, self.nodes_under_load)
return results return results

View file

@ -43,6 +43,6 @@ with open(DEFAULT_WALLET_CONFIG, "w") as file:
# Number of attempts that S3 clients will attempt per each request (1 means single attempt # Number of attempts that S3 clients will attempt per each request (1 means single attempt
# without any retries) # without any retries)
MAX_REQUEST_ATTEMPTS = 1 MAX_REQUEST_ATTEMPTS = 5
RETRY_MODE = "standard" RETRY_MODE = "standard"
CREDENTIALS_CREATE_TIMEOUT = "1m" CREDENTIALS_CREATE_TIMEOUT = "1m"

View file

@ -7,12 +7,7 @@ from time import sleep
from typing import Literal, Optional, Union from typing import Literal, Optional, Union
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.common import ( from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
ASSETS_DIR,
MAX_REQUEST_ATTEMPTS,
RETRY_MODE,
S3_SYNC_WAIT_TIME,
)
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
@ -22,7 +17,7 @@ from frostfs_testlib.utils.cli_utils import _configure_aws_cli
reporter = get_reporter() reporter = get_reporter()
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
command_options = CommandOptions(timeout=240) command_options = CommandOptions(timeout=480)
class AwsCliClient(S3ClientWrapper): class AwsCliClient(S3ClientWrapper):
@ -128,9 +123,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Put bucket tagging") @reporter.step_deco("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None: def put_bucket_tagging(self, bucket: str, tags: list) -> None:
tags_json = { tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
}
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}" f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}"
@ -140,8 +133,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket tagging") @reporter.step_deco("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list: def get_bucket_tagging(self, bucket: str) -> list:
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"--endpoint {self.s3gate_endpoint}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -149,10 +141,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket acl") @reporter.step_deco("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> list:
cmd = ( cmd = f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
return response.get("Grants") return response.get("Grants")
@ -160,8 +149,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket location") @reporter.step_deco("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict: def get_bucket_location(self, bucket: str) -> dict:
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"--endpoint {self.s3gate_endpoint}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -169,10 +157,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("List objects S3") @reporter.step_deco("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = ( cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api list-objects --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -183,10 +168,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("List objects S3 v2") @reporter.step_deco("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = ( cmd = f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -371,10 +353,7 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None, grant_write: Optional[str] = None,
grant_read: Optional[str] = None, grant_read: Optional[str] = None,
) -> None: ) -> None:
cmd = ( cmd = f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f" --endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
f" --endpoint {self.s3gate_endpoint}"
)
if acl: if acl:
cmd += f" --acl {acl}" cmd += f" --acl {acl}"
if grant_write: if grant_write:
@ -442,9 +421,7 @@ class AwsCliClient(S3ClientWrapper):
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None: def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
# Delete objects without creating delete markers # Delete objects without creating delete markers
for object_version in object_versions: for object_version in object_versions:
self.delete_object( self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]
)
@reporter.step_deco("Get object attributes") @reporter.step_deco("Get object attributes")
def get_object_attributes( def get_object_attributes(
@ -480,10 +457,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket policy") @reporter.step_deco("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> dict: def get_bucket_policy(self, bucket: str) -> dict:
cmd = ( cmd = f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
return response.get("Policy") return response.get("Policy")
@ -505,10 +479,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket cors") @reporter.step_deco("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict: def get_bucket_cors(self, bucket: str) -> dict:
cmd = ( cmd = f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
return response.get("CORSRules") return response.get("CORSRules")
@ -524,8 +495,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Delete bucket cors") @reporter.step_deco("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None: def delete_bucket_cors(self, bucket: str) -> None:
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"--endpoint {self.s3gate_endpoint}"
) )
self.local_shell.exec(cmd) self.local_shell.exec(cmd)
@ -608,10 +578,7 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None, acl: Optional[str] = None,
metadata: Optional[dict] = None, metadata: Optional[dict] = None,
) -> dict: ) -> dict:
cmd = ( cmd = f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint}"
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} "
f"--endpoint-url {self.s3gate_endpoint}"
)
if metadata: if metadata:
cmd += " --metadata" cmd += " --metadata"
for key, value in metadata.items(): for key, value in metadata.items():
@ -674,9 +641,7 @@ class AwsCliClient(S3ClientWrapper):
self.local_shell.exec(cmd) self.local_shell.exec(cmd)
@reporter.step_deco("Upload part S3") @reporter.step_deco("Upload part S3")
def upload_part( def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
) -> str:
cmd = ( cmd = (
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} " f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
@ -688,9 +653,7 @@ class AwsCliClient(S3ClientWrapper):
return response["ETag"] return response["ETag"]
@reporter.step_deco("Upload copy part S3") @reporter.step_deco("Upload copy part S3")
def upload_part_copy( def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
) -> str:
cmd = ( cmd = (
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} " f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
@ -698,9 +661,7 @@ class AwsCliClient(S3ClientWrapper):
) )
output = self.local_shell.exec(cmd, command_options).stdout output = self.local_shell.exec(cmd, command_options).stdout
response = self._to_json(output) response = self._to_json(output)
assert response.get("CopyPartResult", []).get( assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}"
"ETag"
), f"Expected ETag in response:\n{response}"
return response["CopyPartResult"]["ETag"] return response["CopyPartResult"]["ETag"]
@ -732,7 +693,10 @@ class AwsCliClient(S3ClientWrapper):
f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} " f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} "
f"--endpoint-url {self.s3gate_endpoint}" f"--endpoint-url {self.s3gate_endpoint}"
) )
self.local_shell.exec(cmd) output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step_deco("Put object lock configuration") @reporter.step_deco("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict: def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:

View file

@ -573,6 +573,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
) )
log_command_execution("S3 Complete multipart upload", response) log_command_execution("S3 Complete multipart upload", response)
return response
@reporter.step_deco("Put object retention") @reporter.step_deco("Put object retention")
@report_error @report_error
def put_object_retention( def put_object_retention(

View file

@ -2,7 +2,7 @@ import copy
from typing import Optional from typing import Optional
import frostfs_testlib.resources.optionals as optionals import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.load.interfaces import ScenarioRunner from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.load_config import ( from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy, EndpointSelectionStrategy,
LoadParams, LoadParams,

View file

@ -1,15 +1,15 @@
import copy
import time import time
import frostfs_testlib.resources.optionals as optionals import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import ( from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned, wait_all_storage_nodes_returned,
wait_for_host_offline, wait_for_host_offline,
@ -22,18 +22,36 @@ if_up_down_helper = IfUpDownHelper()
class ClusterStateController: class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster) -> None: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.stopped_s3_gates: list[ClusterNode] = []
self.dropped_traffic: list[ClusterNode] = [] self.dropped_traffic: list[ClusterNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
self.healthcheck = healthcheck
self.shell = shell self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {} self.suspended_services: dict[str, list[ClusterNode]] = {}
self.nodes_with_modified_interface: list[ClusterNode] = [] self.nodes_with_modified_interface: list[ClusterNode] = []
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
return set(stopped_by_node)
def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)]
return set(stopped_by_type)
def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes])
return set(stopped_on_nodes)
def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_svc = self._get_stopped_by_type(service_type).union(
self._from_stopped_nodes(service_type)
)
online_svc = set(self.cluster.services(service_type)) - stopped_svc
return online_svc
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}") @reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str): def stop_node_host(self, node: ClusterNode, mode: str):
@ -65,34 +83,16 @@ class ClusterStateController:
for node in nodes: for node in nodes:
wait_for_host_offline(self.shell, node.storage_node) wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_storage_service(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_s3_gate(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}") @reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode): def start_node_host(self, node: ClusterNode, tree_healthcheck: bool = True):
with reporter.step(f"Start host {node.host.config.address}"): with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host() node.host.start_host()
wait_for_host_online(self.shell, node.storage_node) wait_for_host_online(self.shell, node.storage_node)
self.stopped_nodes.remove(node)
wait_for_node_online(node.storage_node) wait_for_node_online(node.storage_node)
self.stopped_nodes.remove(node) if tree_healthcheck:
self.wait_tree_healthcheck()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped hosts") @reporter.step_deco("Start stopped hosts")
@ -104,13 +104,10 @@ class ClusterStateController:
for node in nodes: for node in nodes:
with reporter.step(f"Start host {node.host.config.address}"): with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host() node.host.start_host()
if node in self.stopped_storage_nodes: self.stopped_services.difference_update(self._get_stopped_by_node(node))
self.stopped_storage_nodes.remove(node)
if node in self.stopped_s3_gates:
self.stopped_s3_gates.remove(node)
self.stopped_nodes = [] self.stopped_nodes = []
wait_all_storage_nodes_returned(self.shell, self.cluster) self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}") @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
@ -133,42 +130,58 @@ class ClusterStateController:
disk_controller.attach() disk_controller.attach()
self.detached_disks = {} self.detached_disks = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stopped_storage_nodes.append(node)
node.storage_node.stop_service(mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services") @reporter.step_deco("Stop all {service_type} services")
def stop_services_of_type(self, service_type: type[ServiceClass]): def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
self.stopped_services.update(services) self.stopped_services.update(services)
parallel([service.stop_service for service in services]) parallel([service.stop_service for service in services], mask=mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all {service_type} services") @reporter.step_deco("Start all {service_type} services")
def start_services_of_type(self, service_type: type[ServiceClass]): def start_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
parallel([service.start_service for service in services]) parallel([service.start_service for service in services])
self.stopped_services.difference_update(set(services))
if service_type == StorageNode: if service_type == StorageNode:
wait_all_storage_nodes_returned(self.shell, self.cluster) self.wait_after_storage_startup()
self.stopped_services = self.stopped_services - set(services) @wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"):
result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
assert (
'address="127.0.0.1' in result.stdout
), "S3Gate should connect to local storage node"
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
def wait_s3gates(self):
online_s3gates = self._get_online(S3Gate)
if online_s3gates:
parallel(self.wait_s3gate, online_s3gates)
@wait_for_success(600, 60)
def wait_tree_healthcheck(self):
nodes = self.cluster.nodes(self._get_online(StorageNode))
parallel(self.healthcheck.tree_healthcheck, nodes)
@reporter.step_deco("Wait for storage reconnection to the system")
def wait_after_storage_startup(self):
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.wait_s3gates()
self.wait_tree_healthcheck()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped services") @reporter.step_deco("Start all stopped services")
def start_all_stopped_services(self): def start_all_stopped_services(self):
stopped_storages = self._get_stopped_by_type(StorageNode)
parallel([service.start_service for service in self.stopped_services]) parallel([service.start_service for service in self.stopped_services])
for service in self.stopped_services:
if isinstance(service, StorageNode):
wait_all_storage_nodes_returned(self.shell, self.cluster)
break
self.stopped_services.clear() self.stopped_services.clear()
if stopped_storages:
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}") @reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type( def stop_service_of_type(
@ -183,50 +196,78 @@ class ClusterStateController:
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type) service = node.service(service_type)
service.start_service() service.start_service()
if service in self.stopped_services: self.stopped_services.discard(service)
self.stopped_services.remove(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc:
return
parallel([svc.start_service for svc in stopped_svc])
self.stopped_services.difference_update(stopped_svc)
if service_type == StorageNode:
self.wait_after_storage_startup()
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, S3Gate)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stop_service_of_type(node, StorageNode, mask)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start storage service on {node}") @reporter.step_deco("Start storage service on {node}")
def start_storage_service(self, node: ClusterNode): def start_storage_service(self, node: ClusterNode):
node.storage_node.start_service() self.start_service_of_type(node, StorageNode)
self.stopped_storage_nodes.remove(node)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped storage services") @reporter.step_deco("Start stopped storage services")
def start_stopped_storage_services(self): def start_stopped_storage_services(self):
if not self.stopped_storage_nodes: self.start_stopped_services_of_type(StorageNode)
return
# In case if we stopped couple services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_storage_nodes = []
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop s3 gate on {node}") @reporter.step_deco("Stop s3 gate on {node}")
def stop_s3_gate(self, node: ClusterNode, mask: bool = True): def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
node.s3_gate.stop_service(mask) self.stop_service_of_type(node, S3Gate, mask)
self.stopped_s3_gates.append(node)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start s3 gate on {node}") @reporter.step_deco("Start s3 gate on {node}")
def start_s3_gate(self, node: ClusterNode): def start_s3_gate(self, node: ClusterNode):
node.s3_gate.start_service() self.start_service_of_type(node, S3Gate)
self.stopped_s3_gates.remove(node)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped S3 gates") @reporter.step_deco("Start stopped S3 gates")
def start_stopped_s3_gates(self): def start_stopped_s3_gates(self):
if not self.stopped_s3_gates: self.start_stopped_services_of_type(S3Gate)
return
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
self.stopped_s3_gates = []
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}") @reporter.step_deco("Suspend {process_name} service in {node}")
@ -312,7 +353,9 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option") @reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True): def panic_reboot_host(
self, node: ClusterNode, wait_for_return: bool = True, tree_healthcheck: bool = True
):
shell = node.host.get_shell() shell = node.host.get_shell()
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"') shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
@ -329,6 +372,8 @@ class ClusterStateController:
time.sleep(10) time.sleep(10)
wait_for_host_online(self.shell, node.storage_node) wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node) wait_for_node_online(node.storage_node)
if tree_healthcheck:
self.wait_tree_healthcheck()
@reporter.step_deco("Down {interface} to {nodes}") @reporter.step_deco("Down {interface} to {nodes}")
def down_interface(self, nodes: list[ClusterNode], interface: str): def down_interface(self, nodes: list[ClusterNode], interface: str):

View file

@ -1,12 +1,15 @@
from abc import abstractmethod from abc import abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, TypedDict, TypeVar from typing import Optional, TypedDict, TypeVar
import yaml import yaml
from dateutil import parser
from frostfs_testlib.hosting.config import ServiceConfig from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.hosting.interfaces import Host from frostfs_testlib.hosting.interfaces import Host
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.testing.readable import HumanReadableABC from frostfs_testlib.testing.readable import HumanReadableABC
from frostfs_testlib.utils import wallet_utils from frostfs_testlib.utils import wallet_utils
@ -67,6 +70,12 @@ class NodeBase(HumanReadableABC):
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
# TODO: Migrate to sub-class Metrcis (not yet exists :))
def get_metric(self, metric: str) -> CommandResult:
shell = self.host.get_shell()
result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'")
return result
def get_metrics_endpoint(self) -> str: def get_metrics_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS) return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
@ -157,6 +166,15 @@ class NodeBase(HumanReadableABC):
def _get_service_config(self) -> ServiceConfig: def _get_service_config(self) -> ServiceConfig:
return self.host.get_service_config(self.name) return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec(
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time
return active_time
ServiceClass = TypeVar("ServiceClass", bound=NodeBase) ServiceClass = TypeVar("ServiceClass", bound=NodeBase)

View file

@ -3,6 +3,3 @@ import frostfs_testlib.utils.datetime_utils
import frostfs_testlib.utils.json_utils import frostfs_testlib.utils.json_utils
import frostfs_testlib.utils.string_utils import frostfs_testlib.utils.string_utils
import frostfs_testlib.utils.wallet_utils import frostfs_testlib.utils.wallet_utils
# TODO: Circullar dependency FileKeeper -> NodeBase -> Utils -> FileKeeper -> NodeBase
from frostfs_testlib.utils.file_keeper import FileKeeper

View file

@ -12,6 +12,7 @@ from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.steps.storage_policy import get_nodes_with_object from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, NodeBase, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, NodeBase, StorageNode
from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import retry, wait_for_success from frostfs_testlib.testing.test_control import retry, wait_for_success
from frostfs_testlib.utils.datetime_utils import parse_time from frostfs_testlib.utils.datetime_utils import parse_time
@ -26,12 +27,17 @@ def ping_host(shell: Shell, host: Host):
return shell.exec(f"ping {host.config.address} -c 1", options).return_code return shell.exec(f"ping {host.config.address} -c 1", options).return_code
# TODO: Move to ClusterStateController
@reporter.step_deco("Wait for storage nodes returned to cluster") @reporter.step_deco("Wait for storage nodes returned to cluster")
def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None: def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None:
for node in cluster.services(StorageNode): nodes = cluster.services(StorageNode)
with reporter.step(f"Run health check for storage at '{node}'"): parallel(_wait_for_storage_node, nodes, shell=shell)
wait_for_host_online(shell, node)
wait_for_node_online(node)
@reporter.step_deco("Run health check for storage at '{node}'")
def _wait_for_storage_node(node: StorageNode, shell: Shell) -> None:
wait_for_host_online(shell, node)
wait_for_node_online(node)
@retry(max_attempts=60, sleep_interval=5, expected_result=0) @retry(max_attempts=60, sleep_interval=5, expected_result=0)
@ -64,10 +70,17 @@ def wait_for_node_online(node: StorageNode):
except Exception as err: except Exception as err:
logger.warning(f"Node healthcheck fails with error {err}") logger.warning(f"Node healthcheck fails with error {err}")
return False return False
finally:
gather_socket_info(node)
return health_check.health_status == "READY" and health_check.network_status == "ONLINE" return health_check.health_status == "READY" and health_check.network_status == "ONLINE"
@reporter.step_deco("Gather socket info for {node}")
def gather_socket_info(node: StorageNode):
node.host.get_shell().exec("ss -tuln | grep 8080", CommandOptions(check=False))
@reporter.step_deco("Check and return status of given service") @reporter.step_deco("Check and return status of given service")
def service_status(service: str, shell: Shell) -> str: def service_status(service: str, shell: Shell) -> str:
return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip() return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip()