import random
import re

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import delete_object, put_object, put_object_to_random_node
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.file_utils import generate_file


class TestGarbageCollectorMetrics(ClusterTestBase):
    """Tests that the storage node garbage collector counters change as expected."""

    @wait_for_success(interval=10)
    def check_metrics_in_node(self, cluster_node: ClusterNode, counter_exp: int, **metrics_greps: str):
        """Assert that the summed metric value on ``cluster_node`` equals ``counter_exp``.

        The method is wrapped in ``wait_for_success``, so a failing assertion is
        retried by that decorator.

        Args:
            cluster_node: Node whose storage metrics are queried.
            counter_exp: Expected sum of all matching metric samples.
            **metrics_greps: Filters forwarded to ``get_metrics_search_by_greps``.

        Raises:
            AssertionError: If the actual sum differs from ``counter_exp``.
        """
        try:
            metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
        except RuntimeError:
            # Deliberate best effort: a failed metrics query counts as zero samples.
            # NOTE(review): presumably raised when no series matches the greps - confirm.
            counter_act = 0
        else:
            counter_act = self.calc_metrics_count_from_stdout(metric_result.stdout)

        assert counter_act == counter_exp, f"Expected: {counter_exp}, Actual: {counter_act} in node: {cluster_node}"

    @staticmethod
    def calc_metrics_count_from_stdout(metric_result_stdout: str) -> int:
        """Sum every integer that follows a ``}`` and one whitespace character in the output.

        Args:
            metric_result_stdout: Raw text output of a metrics query.

        Returns:
            Sum of all matched integer values; 0 when nothing matches.
        """
        result = re.findall(r"}\s(\d+)", metric_result_stdout)
        return sum(map(int, result))

    def _get_object_cluster_nodes(self, cid: str, oid: str, cluster: Cluster) -> list:
        """Return the cluster nodes whose storage node holds object ``oid`` of container ``cid``."""
        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
            return [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes]

    @allure.title("Garbage collector expire_at object")
    def test_garbage_collector_metrics_expire_at_object(
        self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
    ):
        """Check the marked-for-removal counter after an object with ``expire_at`` expires.

        The counter is expected to grow by one on every node that stored the
        object and to stay unchanged on all other cluster nodes.
        """
        file_path = generate_file(simple_object_size.value)
        placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        metrics_step = 1
        metric_name = "frostfs_node_garbage_collector_marked_for_removal_objects_total"

        with reporter.step("Get current garbage collector metrics for each nodes"):
            metrics_counter = {node: get_metrics_value(node, command=metric_name) for node in cluster.cluster_nodes}

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Put object to random node with expire_at"):
            current_epoch = self.get_epoch()
            oid = put_object_to_random_node(
                default_wallet,
                file_path,
                cid,
                self.shell,
                cluster,
                expire_at=current_epoch + 1,
            )

        object_nodes = self._get_object_cluster_nodes(cid, oid, cluster)

        with reporter.step("Tick Epoch"):
            # The object expires at current_epoch + 1, so two ticks move past that epoch.
            self.tick_epochs(epochs_to_tick=2, wait_block=2)

        with reporter.step(f"Check garbage collector metrics 'the counter should increase by {metrics_step}' in object nodes"):
            for node in object_nodes:
                metrics_counter[node] += metrics_step

            # Every cluster node is checked: the nodes without the object must keep their baseline.
            for node, counter in metrics_counter.items():
                check_metrics_counter(
                    [node],
                    counter_exp=counter,
                    command=metric_name,
                )

    @allure.title("Garbage collector delete object")
    def test_garbage_collector_metrics_deleted_objects(
        self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
    ):
        """Check the deleted-objects counter after an object is explicitly deleted.

        The counter is expected to grow by one on every node that stored the object.
        """
        file_path = generate_file(simple_object_size.value)
        placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        metrics_step = 1
        metric_name = "frostfs_node_garbage_collector_deleted_objects_total"

        with reporter.step("Get current garbage collector metrics for each nodes"):
            metrics_counter = {node: get_metrics_value(node, command=metric_name) for node in cluster.cluster_nodes}

        with reporter.step(f"Create container with policy {placement_policy}"):
            # Use the cluster default endpoint rather than a loop variable left over
            # from the metrics collection above.
            cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Put object to random node"):
            oid = put_object_to_random_node(
                default_wallet,
                file_path,
                cid,
                self.shell,
                cluster,
            )

        object_nodes = self._get_object_cluster_nodes(cid, oid, cluster)

        with reporter.step("Delete file, wait until gc remove object"):
            delete_object(default_wallet, cid, oid, self.shell, cluster.default_rpc_endpoint)

        with reporter.step(f"Check garbage collector metrics 'the counter should increase by {metrics_step}'"):
            for node in object_nodes:
                exp_metrics_counter = metrics_counter[node] + metrics_step
                check_metrics_counter(
                    [node],
                    counter_exp=exp_metrics_counter,
                    command=metric_name,
                )