import random
import re

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.steps.cli.container import create_container, delete_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import delete_object, lock_object, put_object, put_object_to_random_node
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file

# Metric names queried by the tests below; kept in one place to avoid typos across the many checks.
OBJECTS_TOTAL_METRIC = "frostfs_node_engine_objects_total"
CONTAINER_OBJECTS_TOTAL_METRIC = "frostfs_node_engine_container_objects_total"
CONTAINER_SIZE_METRIC = "frostfs_node_engine_container_size_byte"


class TestObjectMetrics(ClusterTestBase):
    """Checks of object counters reported in node metrics.

    Covers three scenarios: removing a container, deleting / locking / expiring
    an object, and stopping the host of a node that stores the object.
    """

    def _check_user_object_counters(
        self, nodes: list[ClusterNode], cid: str, total_exp: int, container_exp: int
    ) -> None:
        """Check the summed ``type="user"`` object counters over ``nodes``.

        Args:
            nodes: nodes whose metric values are checked together.
            cid: container ID used to filter the per-container counter.
            total_exp: expected value for ``OBJECTS_TOTAL_METRIC``.
            container_exp: expected value for ``CONTAINER_OBJECTS_TOTAL_METRIC`` of ``cid``.
        """
        check_metrics_counter(nodes, counter_exp=total_exp, command=OBJECTS_TOTAL_METRIC, type="user")
        check_metrics_counter(
            nodes,
            counter_exp=container_exp,
            command=CONTAINER_OBJECTS_TOTAL_METRIC,
            cid=cid,
            type="user",
        )

    @allure.title("Object metrics of removed container (obj_size={object_size})")
    def test_object_metrics_removed_container(
        self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
    ):
        """Put an object, delete its container and check the container's metrics are gone.

        After the container is deleted and two epochs are ticked, the per-container
        object and size counters are checked against 0 and the container ID must not
        occur anywhere in the full metrics output of the nodes that held the object.
        """
        file_path = generate_file(object_size.value)
        placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        # Expected counter value after the put; matches "REP 2" in the policy above.
        copies = 2

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Put object to random node"):
            oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, cluster)

        with reporter.step("Check metric appears in node where the object is located"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
            # Map the storage nodes holding the object back to their cluster nodes.
            object_nodes = [
                cluster_node
                for cluster_node in cluster.cluster_nodes
                if cluster_node.storage_node in object_storage_nodes
            ]

            check_metrics_counter(
                object_nodes,
                counter_exp=copies,
                command=CONTAINER_OBJECTS_TOTAL_METRIC,
                cid=cid,
                type="user",
            )

        with reporter.step("Delete container"):
            delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)

        with reporter.step("Tick Epoch"):
            self.tick_epochs(epochs_to_tick=2, wait_block=2)

        with reporter.step("Check metrics of removed containers doesn't appear in the storage node"):
            check_metrics_counter(
                object_nodes,
                counter_exp=0,
                command=CONTAINER_OBJECTS_TOTAL_METRIC,
                cid=cid,
                type="user",
            )
            check_metrics_counter(object_nodes, counter_exp=0, command=CONTAINER_SIZE_METRIC, cid=cid)

            for node in object_nodes:
                all_metrics = node.metrics.storage.get_all_metrics()
                assert (
                    cid not in all_metrics.stdout
                ), "metrics of removed containers shouldn't appear in the storage node"

    @allure.title("Object metrics, locked object (obj_size={object_size}, policy={placement_policy})")
    @pytest.mark.parametrize(
        "placement_policy", ["REP 1 IN X CBF 1 SELECT 1 FROM * AS X", "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"]
    )
    def test_object_metrics_blocked_object(
        self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, placement_policy: str
    ):
        """Track the ``user`` object counters through put, delete, lock and expiration.

        The expected change per put/removal (``metric_step``) is the number parsed
        from the ``REP <n>`` part of ``placement_policy``. The running expectation
        starts from the summed counter of the container nodes before any put.
        """
        file_path = generate_file(object_size.value)
        metric_step = int(re.search(r"REP\s(\d+)", placement_policy).group(1))

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Search container nodes"):
            container_nodes = search_nodes_with_container(
                wallet=default_wallet,
                cid=cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=cluster,
            )
        # All puts and the lock below go through the first container node.
        put_endpoint_node = container_nodes[0].storage_node

        with reporter.step("Get current metrics for metric_type=user"):
            objects_metric_counter = 0
            for node in container_nodes:
                objects_metric_counter += get_metrics_value(node, command=OBJECTS_TOTAL_METRIC, type="user")

        with reporter.step("Put object to container node"):
            oid = put_object(default_wallet, file_path, cid, self.shell, put_endpoint_node.get_rpc_endpoint())

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            self._check_user_object_counters(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Delete object"):
            delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            self._check_user_object_counters(container_nodes, cid, objects_metric_counter, 0)

        with reporter.step("Put object and lock it to next epoch"):
            oid = put_object(default_wallet, file_path, cid, self.shell, put_endpoint_node.get_rpc_endpoint())
            current_epoch = self.get_epoch()
            lock_object(
                default_wallet,
                cid,
                oid,
                self.shell,
                put_endpoint_node.get_rpc_endpoint(),
                expire_at=current_epoch + 1,
            )

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            self._check_user_object_counters(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Wait until remove locking 'the counter doesn't change'"):
            self.tick_epochs(epochs_to_tick=2)
            check_metrics_counter(
                container_nodes,
                counter_exp=objects_metric_counter,
                command=OBJECTS_TOTAL_METRIC,
                type="user",
            )

        with reporter.step("Delete object"):
            delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            self._check_user_object_counters(container_nodes, cid, objects_metric_counter, 0)

        with reporter.step("Put object with expire_at"):
            current_epoch = self.get_epoch()
            oid = put_object(
                default_wallet,
                file_path,
                cid,
                self.shell,
                put_endpoint_node.get_rpc_endpoint(),
                expire_at=current_epoch + 1,
            )

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            self._check_user_object_counters(container_nodes, cid, objects_metric_counter, metric_step)

        with reporter.step("Tick Epoch"):
            self.tick_epochs(epochs_to_tick=2)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            self._check_user_object_counters(container_nodes, cid, objects_metric_counter, 0)

    @allure.title("Object metrics, stop the node (obj_size={object_size})")
    def test_object_metrics_stop_node(
        self,
        object_size: ObjectSize,
        default_wallet: WalletInfo,
        cluster_state_controller: ClusterStateController,
    ):
        """Check object counters while the host of one object-holding node is stopped and started.

        A randomly chosen node that stores the object is hard-stopped; the counter on
        the remaining object nodes is checked with ``>=``, then the host is started
        again and the per-container counter over the object nodes is checked against
        ``copies``.
        """
        placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        file_path = generate_file(object_size.value)
        # Expected counter value after the put; matches "REP 2" in the policy above.
        copies = 2

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(default_wallet, self.shell, self.cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Check object metrics in container 'should be zero'"):
            check_metrics_counter(
                self.cluster.cluster_nodes,
                counter_exp=0,
                command=CONTAINER_OBJECTS_TOTAL_METRIC,
                type="user",
                cid=cid,
            )

        with reporter.step("Get current metrics for each nodes"):
            # Per-node baseline of the "user" objects counter taken before the put.
            objects_metric_counter: dict[ClusterNode, int] = {}
            for node in self.cluster.cluster_nodes:
                objects_metric_counter[node] = get_metrics_value(node, command=OBJECTS_TOTAL_METRIC, type="user")

        with reporter.step("Put object"):
            oid = put_object(default_wallet, file_path, cid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, self.cluster.storage_nodes)
            object_nodes = [
                cluster_node
                for cluster_node in self.cluster.cluster_nodes
                if cluster_node.storage_node in object_storage_nodes
            ]

        with reporter.step(f"Check metrics in object nodes 'the counter should increase by {copies}'"):
            counter_exp = sum(objects_metric_counter[node] for node in object_nodes) + copies
            check_metrics_counter(object_nodes, counter_exp=counter_exp, command=OBJECTS_TOTAL_METRIC, type="user")
            check_metrics_counter(
                object_nodes,
                counter_exp=copies,
                command=CONTAINER_OBJECTS_TOTAL_METRIC,
                type="user",
                cid=cid,
            )

        with reporter.step("Select node to stop"):
            node_to_stop = random.choice(object_nodes)
            alive_nodes = set(object_nodes).difference({node_to_stop})

        with reporter.step("Stop the node, wait until the object is replicated to another node"):
            cluster_state_controller.stop_node_host(node_to_stop, "hard")
            # NOTE(review): node_to_stop is excluded from alive_nodes, so this increment does not
            # influence counter_exp computed below -- confirm which node's baseline was meant.
            objects_metric_counter[node_to_stop] += 1

        with reporter.step("Check metric in alive nodes 'the counter should increase'"):
            counter_exp = sum(objects_metric_counter[node] for node in alive_nodes)
            check_metrics_counter(alive_nodes, ">=", counter_exp, command=OBJECTS_TOTAL_METRIC, type="user")

        with reporter.step("Start node"):
            cluster_state_controller.start_node_host(node_to_stop)

        with reporter.step("Check metric in restarted node, 'the counter doesn't change'"):
            check_metrics_counter(
                object_nodes,
                counter_exp=copies,
                command=CONTAINER_OBJECTS_TOTAL_METRIC,
                type="user",
                cid=cid,
            )