import math import allure import pytest from frostfs_testlib import reporter from frostfs_testlib.steps.cli.container import delete_container, search_nodes_with_container, wait_for_container_deletion from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object_to_random_node from frostfs_testlib.steps.metrics import calc_metrics_count_from_stdout, check_metrics_counter, get_metrics_value from frostfs_testlib.steps.storage_policy import get_nodes_with_object from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.parallel import parallel from frostfs_testlib.utils.file_utils import TestFile, generate_file from ...helpers.container_request import PUBLIC_WITH_POLICY, ContainerRequest, requires_container from ...helpers.utility import are_numbers_similar @pytest.mark.nightly @pytest.mark.metrics class TestContainerMetrics(ClusterTestBase): @reporter.step("Put object to container: {cid}") def put_object_parallel(self, file_path: str, wallet: WalletInfo, cid: str): oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) return oid @reporter.step("Get metrics value from node") def get_metrics_search_by_greps_parallel(self, node: ClusterNode, **greps): try: content_stdout = node.metrics.storage.get_metrics_search_by_greps(greps) return calc_metrics_count_from_stdout(content_stdout) except Exception as e: return None @allure.title("Container metrics (obj_size={object_size}, policy={container_request})") @pytest.mark.parametrize( "container_request, copies", [ (PUBLIC_WITH_POLICY("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", short_name="REP"), 2), (PUBLIC_WITH_POLICY("EC 1.1 CBF 1", short_name="EC"), 1), ], indirect=["container_request"], ) def test_container_metrics( self, object_size: ObjectSize, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster, copies: int, container: str, test_file: TestFile, container_request: ContainerRequest, ): object_chunks = 1 link_object = 0 if object_size.value > max_object_size: object_chunks = math.ceil(object_size.value / max_object_size) link_object = len(search_nodes_with_container(default_wallet, container, self.shell, cluster.default_rpc_endpoint, cluster)) with reporter.step("Put object to random node"): oid = put_object_to_random_node(default_wallet, test_file.path, container, self.shell, cluster) with reporter.step("Get object nodes"): object_storage_nodes = get_nodes_with_object(container, oid, self.shell, cluster.storage_nodes) object_nodes = [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes] with reporter.step("Check metric appears in node where the object is located"): count_metrics = (object_chunks * copies) + link_object if container_request.short_name == "EC": count_metrics = (object_chunks * 2) + link_object check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=container, type="phy") check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=container, type="logic") check_metrics_counter(object_nodes, counter_exp=copies, command="container_objects_total", cid=container, type="user") with reporter.step("Delete file, wait until gc remove object"): delete_object(default_wallet, container, oid, self.shell, cluster.default_rpc_endpoint) with reporter.step(f"Check container metrics 'the counter should equal {len(object_nodes)}' in object nodes"): check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=container, type="phy") check_metrics_counter( object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=container, type="logic" ) check_metrics_counter(object_nodes, counter_exp=0, command="container_objects_total", cid=container, type="user") with reporter.step("Check metrics(Phy, Logic, User) in each nodes"): # Phy and Logic metrics are x2, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4 expect_metrics = copies * 2 check_metrics_counter( cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=container, type="phy" ) check_metrics_counter( cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=container, type="logic" ) check_metrics_counter(cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=container, type="user") @allure.title("Container size metrics (obj_size={object_size}, policy={container_request})") @requires_container( [PUBLIC_WITH_POLICY("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", short_name="REP"), PUBLIC_WITH_POLICY("EC 1.1 CBF 1", short_name="EC")] ) def test_container_size_metrics( self, object_size: ObjectSize, default_wallet: WalletInfo, test_file: TestFile, container: str, ): with reporter.step("Put object to random node"): oid = put_object_to_random_node(default_wallet, test_file.path, container, self.shell, self.cluster) with reporter.step("Get object nodes"): object_storage_nodes = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) object_nodes = [ cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes ] with reporter.step("Check metric appears in all node where the object is located"): act_metric = sum( [get_metrics_value(node, command="frostfs_node_engine_container_size_bytes", cid=container) for node in object_nodes] ) assert (act_metric // 2) == object_size.value with reporter.step("Delete file, wait until gc remove object"): id_tombstone = delete_object(default_wallet, container, oid, self.shell, self.cluster.default_rpc_endpoint) tombstone = head_object(default_wallet, container, id_tombstone, self.shell, self.cluster.default_rpc_endpoint) with reporter.step(f"Check container size metrics"): act_metric = get_metrics_value(object_nodes[0], command="frostfs_node_engine_container_size_bytes", cid=container) assert act_metric == int(tombstone["header"]["payloadLength"]) @allure.title("Container size metrics put {objects_count} objects (obj_size={object_size})") @pytest.mark.parametrize("objects_count", [5, 10, 20]) @requires_container def test_container_size_metrics_more_objects( self, object_size: ObjectSize, default_wallet: WalletInfo, objects_count: int, container: str ): with reporter.step(f"Put {objects_count} objects"): files_path = [generate_file(object_size.value) for _ in range(objects_count)] futures = parallel(self.put_object_parallel, files_path, wallet=default_wallet, cid=container) oids = [future.result() for future in futures] with reporter.step("Check metric appears in all nodes"): metric_values = [ get_metrics_value(node, command="frostfs_node_engine_container_size_bytes", cid=container) for node in self.cluster.cluster_nodes ] actual_value = sum(metric_values) // 2 # for policy REP 2, value divide by 2 expected_value = object_size.value * objects_count assert are_numbers_similar( actual_value, expected_value, tolerance_percentage=2 ), "metric container size bytes value not correct" with reporter.step("Delete file, wait until gc remove object"): tombstones_size = 0 for oid in oids: tombstone_id = delete_object(default_wallet, container, oid, self.shell, self.cluster.default_rpc_endpoint) tombstone = head_object(default_wallet, container, tombstone_id, self.shell, self.cluster.default_rpc_endpoint) tombstones_size += int(tombstone["header"]["payloadLength"]) with reporter.step(f"Check container size metrics, 'should be positive in all nodes'"): futures = parallel( get_metrics_value, self.cluster.cluster_nodes, command="frostfs_node_engine_container_size_bytes", cid=container ) metrics_value_nodes = [future.result() for future in futures] for act_metric in metrics_value_nodes: assert act_metric >= 0, "Metrics value is negative" assert sum(metrics_value_nodes) // len(self.cluster.cluster_nodes) == tombstones_size, "tomstone size of objects not correct" @allure.title("Container metrics (policy={container_request})") @pytest.mark.parametrize( "container_request, copies", [ (PUBLIC_WITH_POLICY("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", short_name="REP"), 2), (PUBLIC_WITH_POLICY("EC 1.1 CBF 1", short_name="EC"), 1), ], indirect=["container_request"], ) def test_container_metrics_delete_complex_objects( self, complex_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, copies: int, container: str, container_request: ContainerRequest, ): objects_count = 2 metric_name = "frostfs_node_engine_container_objects_total" with reporter.step(f"Put {objects_count} objects"): files_path = [generate_file(complex_object_size.value) for _ in range(objects_count)] futures = parallel(self.put_object_parallel, files_path, wallet=default_wallet, cid=container) oids = [future.result() for future in futures] with reporter.step(f"Check metrics value in each nodes, should be {objects_count} for 'user'"): check_metrics_counter( cluster.cluster_nodes, counter_exp=objects_count * copies, command=metric_name, cid=container, type="user" ) with reporter.step("Delete objects and container"): for oid in oids: delete_object(default_wallet, container, oid, self.shell, cluster.default_rpc_endpoint) delete_container(default_wallet, container, self.shell, cluster.default_rpc_endpoint) with reporter.step("Tick epoch and check container was deleted"): self.tick_epoch() wait_for_container_deletion(default_wallet, container, shell=self.shell, endpoint=cluster.default_rpc_endpoint) with reporter.step(f"Check metrics value in each nodes, should not be show any result"): futures = parallel(self.get_metrics_search_by_greps_parallel, cluster.cluster_nodes, command=metric_name, cid=container) metrics_results = [future.result() for future in futures if future.result() is not None] assert len(metrics_results) == 0, f"Metrics value is not empty in Prometheus, actual value in nodes: {metrics_results}"