import random
import re

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import get_object, put_object
from frostfs_testlib.steps.metrics import check_metrics_counter
from frostfs_testlib.steps.node_management import node_shard_list, node_shard_set_mode
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers import ShardsWatcher
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel, wait_for_success
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file


@pytest.mark.nightly
class TestShardMetrics(ClusterTestBase):
    """Nightly tests for shard-level metrics: shard mode info and shard error counters."""

    @pytest.fixture()
    @allure.title("Get two shards for set mode")
    def two_shards_and_node(self, cluster: Cluster) -> tuple[str, str, ClusterNode]:
        """Yield two randomly sampled shards of a randomly chosen cluster node, plus that node.

        On teardown both shards are switched back to "read-write".
        """
        node = random.choice(cluster.cluster_nodes)
        shards = node_shard_list(node.storage_node)
        # random.sample raises ValueError when the node reports fewer than two shards.
        two_shards = random.sample(shards, k=2)

        yield two_shards[0], two_shards[1], node

        for shard in two_shards:
            node_shard_set_mode(node.storage_node, shard, "read-write")

        # Result is discarded; presumably called so the final shard state lands in the report - confirm.
        node_shard_list(node.storage_node)

    @pytest.fixture()
    @allure.title("Revert all shards mode")
    def revert_all_shards_mode(self):
        """Teardown-only fixture: after the test, run set_shard_rw_mode for every cluster node."""
        yield
        parallel(self.set_shard_rw_mode, self.cluster.cluster_nodes)

    def set_shard_rw_mode(self, node: ClusterNode):
        """Set every shard reported by the node's ShardsWatcher to "read-write" and await that status."""
        watcher = ShardsWatcher(node)
        shards = watcher.get_shards()
        for shard in shards:
            watcher.set_shard_mode(shard["shard_id"], mode="read-write")
        watcher.await_for_all_shards_status(status="read-write")

    @staticmethod
    def get_error_count_from_logs(cluster_node: ClusterNode, object_path: str, object_name: str):
        """Sum the "error count" values logged by frostfs-storage for the given object file.

        Only log lines that contain both ``object_path`` and ``object_name`` are considered.

        Args:
            cluster_node: node whose host logs are queried.
            object_path: directory of the object file, matched as a substring of the log line.
            object_name: file name of the object, matched as a substring of the log line.

        Returns:
            Sum of all integers captured from the matching lines; 0 when the log query
            raises RuntimeError or when nothing matches.
        """
        try:
            logs = cluster_node.host.get_filtered_logs("error count", unit="frostfs-storage")
        except RuntimeError:
            # Deliberate best-effort: a failed log query counts as "no errors logged".
            # NOTE(review): presumably get_filtered_logs raises RuntimeError when no line matches - confirm.
            return 0

        error_count = 0
        # search error logs for current object
        for error_line in logs.split("\n"):
            if object_path in error_line and object_name in error_line:
                result = re.findall(r'"error\scount":\s(\d+)', error_line)
                error_count += sum(map(int, result))

        return error_count

    @staticmethod
    @wait_for_success(180, 30)
    def get_object_path_and_name_file(oid: str, cid: str, node: ClusterNode) -> tuple[str, str]:
        """Locate the on-disk directory and file name of an object on the given node.

        The object is looked up under ``<data_dir>/data/<entry>/data/<o>/<i>/<d>/<_>`` where the
        last four components are the first four characters of ``oid``; the file name is the
        remainder of ``oid`` followed by ``.<cid>``.

        Args:
            oid: object ID.
            cid: container ID.
            node: cluster node whose storage data directory is searched.

        Returns:
            Tuple ``(object_path, object_name)``.

        Raises:
            AssertionError: when no entry contains the expected directory. The method is wrapped
                in ``wait_for_success(180, 30)``, which presumably retries on failure - confirm.
        """
        oid_path = f"{oid[0]}/{oid[1]}/{oid[2]}/{oid[3]}"
        object_path = None
        # Initialised alongside object_path so the name is always bound; both are set together below.
        object_name = None

        with reporter.step("Search object file"):
            node_shell = node.storage_node.host.get_shell()
            data_path = node.storage_node.get_data_directory()
            all_datas = node_shell.exec(f"ls -la {data_path}/data | awk '{{ print $9 }}'").stdout.strip()
            # Removing dots turns the "." and ".." entries of `ls -la` into empty strings.
            # NOTE(review): it also strips dots from inside real entry names - confirm entries never contain one.
            for data_dir in all_datas.replace(".", "").strip().split("\n"):
                check_dir = node_shell.exec(f" [ -d {data_path}/data/{data_dir}/data/{oid_path} ] && echo 1 || echo 0").stdout
                if "1" in check_dir:
                    object_path = f"{data_path}/data/{data_dir}/data/{oid_path}"
                    object_name = f"{oid[4:]}.{cid}"
                    break

        assert object_path is not None, f"{oid} object not found in directory - {data_path}/data"
        return object_path, object_name

    @allure.title("Metric for shard mode")
    def test_shard_metrics_set_mode(self, two_shards_and_node: tuple[str, str, ClusterNode]):
        """Switch shard modes and check that frostfs_node_engine_mode_info reports each new mode."""
        metrics_counter = 1
        shard1, shard2, node = two_shards_and_node

        with reporter.step("Shard1 set to mode 'read-only'"):
            node_shard_set_mode(node.storage_node, shard1, "read-only")

        with reporter.step("Check shard metrics, 'the mode will change to 'READ_ONLY'"):
            check_metrics_counter(
                [node],
                counter_exp=metrics_counter,
                command="frostfs_node_engine_mode_info",
                mode="READ_ONLY",
                shard_id=shard1,
            )

        with reporter.step("Shard2 set to mode 'degraded-read-only'"):
            node_shard_set_mode(node.storage_node, shard2, "degraded-read-only")

        with reporter.step("Check shard metrics, 'the mode will change to 'DEGRADED_READ_ONLY'"):
            check_metrics_counter(
                [node],
                counter_exp=metrics_counter,
                command="frostfs_node_engine_mode_info",
                mode="DEGRADED_READ_ONLY",
                shard_id=shard2,
            )

        with reporter.step("Both shards set to mode 'read-write'"):
            for shard in [shard1, shard2]:
                node_shard_set_mode(node.storage_node, shard, "read-write")

        with reporter.step("Check shard metrics, 'the mode will change to 'READ_WRITE'"):
            for shard in [shard1, shard2]:
                check_metrics_counter(
                    [node],
                    counter_exp=metrics_counter,
                    command="frostfs_node_engine_mode_info",
                    mode="READ_WRITE",
                    shard_id=shard,
                )

    @allure.title("Metric for error count on shard")
    def test_shard_metrics_error_count(self, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster, revert_all_shards_mode):
        """Make a stored object file unreadable and compare the logged error count with the metric.

        ``revert_all_shards_mode`` is requested only for its teardown.
        """
        file_path = generate_file(round(max_object_size * 0.8))

        with reporter.step("Create container"):
            cid = create_container(
                wallet=default_wallet,
                shell=self.shell,
                endpoint=cluster.default_rpc_endpoint,
                rule="REP 1 CBF 1",
                basic_acl=EACL_PUBLIC_READ_WRITE,
            )

        with reporter.step("Put object"):
            oid = put_object(default_wallet, file_path, cid, self.shell, cluster.default_rpc_endpoint)

        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
            object_nodes = [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes]
            node = random.choice(object_nodes)

        with reporter.step("Search object in system."):
            object_path, object_name = self.get_object_path_and_name_file(oid, cid, node)

        with reporter.step("Block read file"):
            node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")

        with reporter.step("Get object, expect error"):
            with pytest.raises(RuntimeError, match=OBJECT_NOT_FOUND):
                get_object(
                    wallet=default_wallet,
                    cid=cid,
                    oid=oid,
                    shell=self.shell,
                    endpoint=node.storage_node.get_rpc_endpoint(),
                )

        with reporter.step("Get shard error count from logs"):
            counter = self.get_error_count_from_logs(node, object_path, object_name)

        with reporter.step("Check shard error metrics"):
            check_metrics_counter([node], counter_exp=counter, command="frostfs_node_engine_errors_total")