diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index 279024bb..92c7a06e 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -23,7 +23,7 @@ from frostfs_testlib.resources.common import ( ) from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus from frostfs_testlib.shell import LocalShell, Shell -from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE, DEFAULT_EC_PLACEMENT_RULE +from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE from frostfs_testlib.steps.cli.object import get_netmap_netinfo from frostfs_testlib.steps.s3 import s3_helper from frostfs_testlib.storage import get_service_registry @@ -210,6 +210,7 @@ def object_size( return complex_object_size + @pytest.fixture(scope="session") def rep_placement_policy() -> PlacementPolicy: return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE) @@ -234,6 +235,7 @@ def placement_policy( return ec_placement_policy + @pytest.fixture(scope="session") def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster: cluster = Cluster(hosting) @@ -266,10 +268,12 @@ def healthcheck(cluster: Cluster) -> Healthcheck: return healthcheck_cls() -@pytest.fixture(scope="session") +@pytest.fixture def cluster_state_controller(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController: controller = ClusterStateController(client_shell, cluster, healthcheck) yield controller + controller.start_stopped_hosts() + controller.start_all_stopped_services() @pytest.fixture(scope="session") diff --git a/pytest_tests/testsuites/metrics/test_object_metrics.py b/pytest_tests/testsuites/metrics/test_object_metrics.py new file mode 100644 index 00000000..ef78f7c8 --- /dev/null +++ b/pytest_tests/testsuites/metrics/test_object_metrics.py @@ -0,0 +1,318 @@ +import random +import re + +import allure +import pytest +from frostfs_testlib import reporter +from frostfs_testlib.steps.cli.container import create_container, delete_container, search_nodes_with_container +from frostfs_testlib.steps.cli.object import ( + delete_object, + get_object_nodes, + lock_object, + put_object, + put_object_to_random_node, +) +from frostfs_testlib.storage.cluster import Cluster, ClusterNode +from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController +from frostfs_testlib.storage.dataclasses.object_size import ObjectSize +from frostfs_testlib.storage.dataclasses.wallet import WalletInfo +from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.testing.test_control import wait_for_success +from frostfs_testlib.utils.file_utils import generate_file + + +class TestObjectMetrics(ClusterTestBase): + @wait_for_success(interval=10) + def check_metrics_by_type( + self, cluster_nodes: list[ClusterNode], metric_command: str, grep_by: str, metric_type: str, counter_exp: int + ): + counter_act = 0 + for cluster_node in cluster_nodes: + try: + metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps( + command=metric_command, grep_by=grep_by + ) + counter_act += self.calc_metrics_count_from_stdout(metric_result.stdout, metric_type) + except RuntimeError as e: + ... + assert ( + counter_act == counter_exp + ), f"Expected metric {metric_type}={counter_exp}, Actual: {counter_act} in nodes: {cluster_nodes}" + + @staticmethod + def calc_metrics_count_from_stdout(metric_result_stdout: str, metric_type: str): + result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout) + return sum(map(int, result)) + + @wait_for_success(interval=10) + def check_object_metrics_total_and_container( + self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_total: int, objects_metric_container: int + ): + self.check_metrics_by_type( + cluster_nodes, + "frostfs_node_engine_objects_total", + grep_by="user", + metric_type="user", + counter_exp=objects_metric_total, + ) + + objects_metric_container_act = 0 + for node in cluster_nodes: + try: + metrics_container = node.metrics.storage.get_metrics_search_by_greps( + command="frostfs_node_engine_container_objects_total", cid=cid, type="user" + ) + objects_metric_container_act += self.calc_metrics_count_from_stdout( + metrics_container.stdout, metric_type="user" + ) + except RuntimeError as e: + ... + assert ( + objects_metric_container_act == objects_metric_container + ), f"Expected {objects_metric_container} objects in container" + + @wait_for_success(max_wait_time=120, interval=10) + def check_object_metrics_container( + self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_container_exp: int + ): + objects_metric_container_act = 0 + for node in cluster_nodes: + try: + metrics_container = node.metrics.storage.get_metrics_search_by_greps( + command="frostfs_node_engine_container_objects_total", cid=cid, type="user" + ) + objects_metric_container_act += self.calc_metrics_count_from_stdout( + metrics_container.stdout, metric_type="user" + ) + except RuntimeError as e: + ... + assert ( + objects_metric_container_act == objects_metric_container_exp + ), f"Expected {objects_metric_container_exp} objects in container" + + @allure.title("Object metrics of removed container") + def test_object_metrics_removed_container( + self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster + ): + file_path = generate_file(object_size.value) + placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X" + copies = 2 + + with reporter.step(f"Create container with policy {placement_policy}"): + cid = create_container( + default_wallet, + rule=placement_policy, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with reporter.step("Put object to random node"): + storage_object_id = put_object_to_random_node( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + cluster=cluster, + ) + + with reporter.step("Check metric appears in node where the object is located"): + object_nodes = get_object_nodes( + cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0] + ) + + self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", copies) + + with reporter.step("Delete container"): + delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint) + + with reporter.step("Tick Epoch"): + self.tick_epochs(epochs_to_tick=2, wait_block=2) + + with reporter.step("Check metrics of removed containers doesn't appear in the storage node"): + self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", 0) + + for node in object_nodes: + with pytest.raises(RuntimeError): + node.metrics.storage.get_metric_container(f"frostfs_node_engine_container_size_byte", cid) + + @allure.title("Object metrics, locked object, (policy={placement_policy})") + @pytest.mark.parametrize( + "placement_policy", ["REP 1 IN X CBF 1 SELECT 1 FROM * AS X", "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"] + ) + def test_object_metrics_blocked_object( + self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, placement_policy: str + ): + file_path = generate_file(object_size.value) + metric_step = int(re.search(r"REP\s(\d+)", placement_policy).group(1)) + + with reporter.step(f"Create container with policy {placement_policy}"): + cid = create_container( + wallet=default_wallet, + rule=placement_policy, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with reporter.step("Search container nodes"): + container_nodes = search_nodes_with_container( + wallet=default_wallet, + cid=cid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + cluster=cluster, + ) + + with reporter.step("Get current metrics for metric_type=user"): + objects_metric_counter = 0 + for node in container_nodes: + metric_objects_total = node.metrics.storage.get_metrics_search_by_greps( + command="frostfs_node_engine_objects_total", type="user" + ) + objects_metric_counter += self.calc_metrics_count_from_stdout( + metric_objects_total.stdout, metric_type="user" + ) + + with reporter.step("Put object to container node"): + oid = put_object( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + endpoint=container_nodes[0].storage_node.get_rpc_endpoint(), + ) + + with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"): + objects_metric_counter += metric_step + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step) + + with reporter.step("Delete object"): + delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"): + objects_metric_counter -= metric_step + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0) + + with reporter.step("Put object and lock it"): + oid = put_object( + default_wallet, file_path, cid, self.shell, container_nodes[0].storage_node.get_rpc_endpoint() + ) + current_epoch = self.get_epoch() + lock_object( + default_wallet, + cid, + oid, + self.shell, + container_nodes[0].storage_node.get_rpc_endpoint(), + expire_at=current_epoch + 1, + ) + + with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"): + objects_metric_counter += metric_step + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step) + + with reporter.step(f"Wait until remove locking 'the counter doesn't change'"): + self.tick_epochs(epochs_to_tick=2) + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step) + + with reporter.step("Delete object"): + delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"): + objects_metric_counter -= metric_step + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0) + + with reporter.step("Put object with expire_at"): + current_epoch = self.get_epoch() + oid = put_object( + default_wallet, + file_path, + cid, + self.shell, + container_nodes[0].storage_node.get_rpc_endpoint(), + expire_at=current_epoch + 1, + ) + + with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"): + objects_metric_counter += metric_step + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step) + + with reporter.step("Tick Epoch"): + self.tick_epochs(epochs_to_tick=2) + + with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"): + objects_metric_counter -= metric_step + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0) + + with reporter.step("Delete container"): + delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint) + + @allure.title("Object metrics, stop the node") + def test_object_metrics_stop_node( + self, + object_size: ObjectSize, + max_object_size: int, + default_wallet: WalletInfo, + cluster_state_controller: ClusterStateController, + ): + placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X" + file_path = generate_file(object_size.value) + copies = 2 + + with reporter.step(f"Create container with policy {placement_policy}"): + cid = create_container( + wallet=default_wallet, + rule=placement_policy, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with reporter.step("Search container nodes"): + container_nodes = search_nodes_with_container( + wallet=default_wallet, + cid=cid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + cluster=self.cluster, + ) + + with reporter.step("Get current metrics for container nodes"): + objects_metric_counter = 0 + for node in container_nodes: + metric_objects_total = node.metrics.storage.get_metrics_search_by_greps( + command="frostfs_node_engine_objects_total", type="user" + ) + objects_metric_counter += self.calc_metrics_count_from_stdout( + metric_objects_total.stdout, metric_type="user" + ) + + with reporter.step("Put object to container node"): + oid = put_object( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + endpoint=container_nodes[0].storage_node.get_rpc_endpoint(), + ) + + with reporter.step(f"Check metric in container nodes 'the counter should increase by {copies}'"): + objects_metric_counter += copies + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies) + + with reporter.step(f"Select node to stop"): + node_to_stop = container_nodes[0] + alive_nodes = [node for node in container_nodes if node != node_to_stop] + + with reporter.step(f"Stop the node, wait until the object is replicated to another node"): + cluster_state_controller.stop_node_host(node_to_stop, "hard") + + with reporter.step(f"Check metric in alive nodes 'the counter should increase by 1'"): + self.check_object_metrics_container(alive_nodes, cid, copies) + + with reporter.step("Start node"): + cluster_state_controller.start_node_host(node_to_stop) + + with reporter.step(f"Check metric in container nodes 'the counter doesn't change'"): + self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies) + + with reporter.step("Delete container"): + delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)