diff --git a/pytest_tests/testsuites/shard/test_control_shard.py b/pytest_tests/testsuites/shard/test_control_shard.py index 1de2d48c..0f3297b5 100644 --- a/pytest_tests/testsuites/shard/test_control_shard.py +++ b/pytest_tests/testsuites/shard/test_control_shard.py @@ -3,15 +3,80 @@ import pathlib import allure import pytest +from frostfs_testlib import reporter from frostfs_testlib.cli import FrostfsCli from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG -from frostfs_testlib.storage.cluster import Cluster, StorageNode +from frostfs_testlib.steps.cli.container import create_container, delete_container +from frostfs_testlib.steps.cli.object import delete_object, get_object, get_object_nodes, put_object +from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode +from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher +from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager from frostfs_testlib.storage.dataclasses.shard import Shard +from frostfs_testlib.testing import parallel +from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.utils.file_utils import generate_file @pytest.mark.shard -class TestControlShard: +class TestControlShard(ClusterTestBase): + @staticmethod + def get_object_path_and_name_file(oid: str, cid: str, node: ClusterNode) -> tuple[str, str]: + oid_path = f"{oid[0]}/{oid[1]}/{oid[2]}/{oid[3]}" + with reporter.step("Search object file"): + node_shell = node.storage_node.host.get_shell() + data_path = node.storage_node.get_data_directory() + all_datas = node_shell.exec(f"ls -la {data_path} | awk '{{ print $9 }}'").stdout.strip() + for data in all_datas.replace(".", "").strip().split("\n"): + check_dir = node_shell.exec(f" [ -d {data_path}/{data}/{oid_path} ] && echo 1 || echo 0").stdout + if "1" in check_dir: + object_path = f"{data_path}/{data}/{oid_path}" + object_name = f"{oid[4:]}.{cid}" + break + return object_path, object_name + + def set_shard_rw_mode(self, node: ClusterNode): + watcher = ShardsWatcher(node) + shards = watcher.get_shards() + for shard in shards: + watcher.set_shard_mode(shard["shard_id"], mode="read-write") + watcher.await_for_all_shards_status(status="read-write") + + @pytest.fixture() + @allure.title("Revert all shards mode") + def revert_all_shards_mode(self) -> None: + yield + parallel(self.set_shard_rw_mode, self.cluster.cluster_nodes) + + @pytest.fixture() + def oid_cid_node(self, default_wallet: str) -> tuple[str, str, ClusterNode]: + with reporter.step("Create container, and put object"): + cid = create_container( + wallet=default_wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, rule="REP 1 CBF 1" + ) + file = generate_file(5242880) + oid = put_object( + wallet=default_wallet, path=file, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint + ) + with reporter.step("Search node with object"): + nodes = get_object_nodes( + cluster=self.cluster, + wallet=default_wallet, + cid=cid, + oid=oid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + yield oid, cid, nodes[0] + + object_path, object_name = self.get_object_path_and_name_file(oid, cid, nodes[0]) + nodes[0].host.get_shell().exec(f"chmod +r {object_path}/{object_name}") + delete_object( + wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint + ) + delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint) + @staticmethod def get_shards_from_config(node: StorageNode) -> list[Shard]: config_file = node.get_shard_config_path() @@ -44,6 +109,16 @@ class TestControlShard: ) return [Shard.from_object(shard) for shard in json.loads(result.stdout.split(">", 1)[1])] + @pytest.fixture() + def change_config_storage(self, cluster_state_controller: ClusterStateController): + with reporter.step("Change threshold error shards"): + cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes( + service_type=StorageNode, values={"storage:shard_ro_error_threshold": "5"} + ) + yield + with reporter.step("Restore threshold error shards"): + cluster_state_controller.manager(ConfigStateManager).revert_all() + @allure.title("All shards are available") def test_control_shard(self, cluster: Cluster): for storage_node in cluster.storage_nodes: @@ -51,3 +126,32 @@ class TestControlShard: shards_from_cli = self.get_shards_from_cli(storage_node) assert set(shards_from_config) == set(shards_from_cli) + + @pytest.mark.failover + def test_shard_errors( + self, + default_wallet: str, + oid_cid_node: tuple[str, str, ClusterNode], + change_config_storage: None, + revert_all_shards_mode: None, + ): + oid, cid, node = oid_cid_node + object_path, object_name = self.get_object_path_and_name_file(oid, cid, node) + with reporter.step("Block read file"): + node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}") + with reporter.step("Get object, expect 6 errors"): + for _ in range(6): + with pytest.raises(RuntimeError): + get_object( + wallet=default_wallet, + cid=cid, + oid=oid, + shell=self.shell, + endpoint=node.storage_node.get_rpc_endpoint(), + ) + with reporter.step("Check shard status"): + for shard in ShardsWatcher(node).get_shards(): + if shard["blobstor"][1]["path"] in object_path: + with reporter.step(f"Shard - {shard['shard_id']} to {node.host_ip}, mode - {shard['mode']}"): + assert shard["mode"] == "read-only" + break