import re

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.steps.cli.container import create_container, delete_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import (
    delete_object,
    get_object_nodes,
    lock_object,
    put_object,
    put_object_to_random_node,
)
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.file_utils import generate_file


class TestObjectMetrics(ClusterTestBase):
    @wait_for_success(interval=10)
    def check_metrics_by_type(
        self, cluster_nodes: list[ClusterNode], metric_command: str, grep_by: str, metric_type: str, counter_exp: int
    ):
        """Sum `metric_command` samples with the given type label across nodes and assert the expected total."""
        counter_act = 0
        for cluster_node in cluster_nodes:
            try:
                metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps(
                    command=metric_command, grep_by=grep_by
                )
                counter_act += self.calc_metrics_count_from_stdout(metric_result.stdout, metric_type)
            except RuntimeError:
                # The metric may be missing on this node; skip it, @wait_for_success retries the whole check.
                ...
        assert (
            counter_act == counter_exp
        ), f"Expected metric {metric_type}={counter_exp}, Actual: {counter_act} in nodes: {cluster_nodes}"

    @staticmethod
    def calc_metrics_count_from_stdout(metric_result_stdout: str, metric_type: str):
        """Sum sample values whose label set ends with type="<metric_type>" in the Prometheus text output."""
        result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout)
        return sum(map(int, result))

    @wait_for_success(interval=10)
    def check_object_metrics_total_and_container(
        self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_total: int, objects_metric_container: int
    ):
        """Assert both the engine-wide and the per-container "user" object counters."""
        self.check_metrics_by_type(
            cluster_nodes,
            "frostfs_node_engine_objects_total",
            grep_by="user",
            metric_type="user",
            counter_exp=objects_metric_total,
        )

        objects_metric_container_act = 0
        for node in cluster_nodes:
            try:
                metrics_container = node.metrics.storage.get_metrics_search_by_greps(
                    command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
                )
                objects_metric_container_act += self.calc_metrics_count_from_stdout(
                    metrics_container.stdout, metric_type="user"
                )
            except RuntimeError:
                # The metric may be missing on this node; skip it, @wait_for_success retries the whole check.
                ...
        assert (
            objects_metric_container_act == objects_metric_container
        ), f"Expected {objects_metric_container} objects in container, got {objects_metric_container_act}"

    @wait_for_success(max_wait_time=120, interval=10)
    def check_object_metrics_container(
        self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_container_exp: int
    ):
        """Assert the per-container "user" object counter, waiting up to 120 seconds."""
        objects_metric_container_act = 0
        for node in cluster_nodes:
            try:
                metrics_container = node.metrics.storage.get_metrics_search_by_greps(
                    command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
                )
                objects_metric_container_act += self.calc_metrics_count_from_stdout(
                    metrics_container.stdout, metric_type="user"
                )
            except RuntimeError:
                # The metric may be missing on this node; skip it, @wait_for_success retries the whole check.
                ...
        assert (
            objects_metric_container_act == objects_metric_container_exp
        ), f"Expected {objects_metric_container_exp} objects in container, got {objects_metric_container_act}"
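
    # The tests below track the "user" object counters exported by the storage engine:
    # frostfs_node_engine_objects_total (per node), frostfs_node_engine_container_objects_total
    # and frostfs_node_engine_container_size_byte (per container).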
    @allure.title("Object metrics of removed container")
    def test_object_metrics_removed_container(
        self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
    ):
        file_path = generate_file(object_size.value)
        placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        copies = 2

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(
                default_wallet,
                rule=placement_policy,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        with reporter.step("Put object to random node"):
            storage_object_id = put_object_to_random_node(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                cluster=cluster,
            )

        with reporter.step("Check that the metric appears on the nodes where the object is located"):
            object_nodes = get_object_nodes(
                cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0]
            )
            self.check_metrics_by_type(
                object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", copies
            )

        with reporter.step("Delete container"):
            delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)

        with reporter.step("Tick Epoch"):
            self.tick_epochs(epochs_to_tick=2, wait_block=2)

        with reporter.step("Check that metrics of the removed container do not appear on the storage nodes"):
            self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", 0)

            for node in object_nodes:
                with pytest.raises(RuntimeError):
                    node.metrics.storage.get_metric_container("frostfs_node_engine_container_size_byte", cid)

    @allure.title("Object metrics, locked object (policy={placement_policy})")
    @pytest.mark.parametrize(
        "placement_policy", ["REP 1 IN X CBF 1 SELECT 1 FROM * AS X", "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"]
    )
    def test_object_metrics_blocked_object(
        self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, placement_policy: str
    ):
        file_path = generate_file(object_size.value)
        metric_step = int(re.search(r"REP\s(\d+)", placement_policy).group(1))

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(
                wallet=default_wallet,
                rule=placement_policy,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        with reporter.step("Search container nodes"):
            container_nodes = search_nodes_with_container(
                wallet=default_wallet,
                cid=cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=cluster,
            )

        with reporter.step("Get current metrics for metric_type=user"):
            objects_metric_counter = 0
            for node in container_nodes:
                metric_objects_total = node.metrics.storage.get_metrics_search_by_greps(
                    command="frostfs_node_engine_objects_total", type="user"
                )
                objects_metric_counter += self.calc_metrics_count_from_stdout(
                    metric_objects_total.stdout, metric_type="user"
                )

        with reporter.step("Put object to container node"):
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                endpoint=container_nodes[0].storage_node.get_rpc_endpoint(),
            )

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Delete object"):
            delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
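
        # Removing the object drops every replica from the container nodes, so both
        # the engine total and the per-container counter decrease by metric_step.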
        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)

        with reporter.step("Put object and lock it"):
            oid = put_object(
                default_wallet, file_path, cid, self.shell, container_nodes[0].storage_node.get_rpc_endpoint()
            )
            current_epoch = self.get_epoch()
            lock_object(
                default_wallet,
                cid,
                oid,
                self.shell,
                container_nodes[0].storage_node.get_rpc_endpoint(),
                expire_at=current_epoch + 1,
            )

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Wait until the lock expires 'the counter doesn't change'"):
            self.tick_epochs(epochs_to_tick=2)
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Delete object"):
            delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)

        with reporter.step("Put object with expire_at"):
            current_epoch = self.get_epoch()
            oid = put_object(
                default_wallet,
                file_path,
                cid,
                self.shell,
                container_nodes[0].storage_node.get_rpc_endpoint(),
                expire_at=current_epoch + 1,
            )

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Tick Epoch"):
            self.tick_epochs(epochs_to_tick=2)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)

        with reporter.step("Delete container"):
            delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)

    @allure.title("Object metrics, stop the node")
    def test_object_metrics_stop_node(
        self,
        object_size: ObjectSize,
        max_object_size: int,
        default_wallet: WalletInfo,
        cluster_state_controller: ClusterStateController,
    ):
        placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        file_path = generate_file(object_size.value)
        copies = 2

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(
                wallet=default_wallet,
                rule=placement_policy,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        with reporter.step("Search container nodes"):
            container_nodes = search_nodes_with_container(
                wallet=default_wallet,
                cid=cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=self.cluster,
            )

        with reporter.step("Get current metrics for container nodes"):
            objects_metric_counter = 0
            for node in container_nodes:
                metric_objects_total = node.metrics.storage.get_metrics_search_by_greps(
                    command="frostfs_node_engine_objects_total", type="user"
                )
                objects_metric_counter += self.calc_metrics_count_from_stdout(
                    metric_objects_total.stdout, metric_type="user"
                )

        with reporter.step("Put object to container node"):
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                endpoint=container_nodes[0].storage_node.get_rpc_endpoint(),
            )
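
        # With "REP 2" the object gets a copy on each of the two container nodes,
        # so the summed "user" counter across them should grow by `copies`.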
        with reporter.step(f"Check metric in container nodes 'the counter should increase by {copies}'"):
            objects_metric_counter += copies
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies)

        with reporter.step("Select node to stop"):
            node_to_stop = container_nodes[0]
            alive_nodes = [node for node in container_nodes if node != node_to_stop]

        with reporter.step("Stop the node, wait until the object is replicated to another node"):
            cluster_state_controller.stop_node_host(node_to_stop, "hard")

        with reporter.step("Check metric in alive nodes 'the counter should increase by 1'"):
            self.check_object_metrics_container(alive_nodes, cid, copies)

        with reporter.step("Start node"):
            cluster_state_controller.start_node_host(node_to_stop)

        with reporter.step("Check metric in container nodes 'the counter doesn't change'"):
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies)

        with reporter.step("Delete container"):
            delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)