214 lines
12 KiB
Python
214 lines
12 KiB
Python
import math
|
|
|
|
import allure
|
|
from frostfs_testlib.testing.parallel import parallel
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.steps.cli.container import create_container, delete_container, search_nodes_with_container, wait_for_container_deletion
|
|
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object_to_random_node
|
|
from frostfs_testlib.steps.metrics import calc_metrics_count_from_stdout, check_metrics_counter, get_metrics_value
|
|
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.utils.file_utils import generate_file
|
|
|
|
from ...helpers.utility import are_numbers_similar
|
|
|
|
|
|
@pytest.mark.nightly
@pytest.mark.container
class TestContainerMetrics(ClusterTestBase):
    """Tests for the per-container object-count and size metrics reported by cluster nodes."""

    @reporter.step("Put object to container: {cid}")
    def put_object_parallel(self, file_path: str, wallet: WalletInfo, cid: str):
        """Put ``file_path`` into container ``cid`` through a random node and return the new object ID.

        Used in this class as the worker passed to ``parallel(...)``, which supplies one file path per call.
        """
        return put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

    @reporter.step("Get metrics value from node")
    def get_metrics_search_by_greps_parallel(self, node: ClusterNode, **greps):
        """Search the storage metrics of ``node`` using ``greps`` and return the parsed count.

        Args:
            node: cluster node whose storage metrics are queried.
            **greps: search filters forwarded as keyword arguments
                (the caller in this class passes ``command`` and ``cid``).

        Returns:
            The result of ``calc_metrics_count_from_stdout`` for the search output,
            or ``None`` if the search or the parsing raised any exception.
        """
        try:
            # Forward the filters as keyword arguments. Passing the dict positionally would raise
            # TypeError against a keyword-only signature, and the handler below would silently turn
            # that into ``None`` - making the "no metrics left" assertion pass without querying anything.
            # NOTE(review): assumes the testlib method is declared as get_metrics_search_by_greps(**greps) - confirm.
            content_stdout = node.metrics.storage.get_metrics_search_by_greps(**greps)
            return calc_metrics_count_from_stdout(content_stdout)
        except Exception:
            # Deliberate best effort: the caller filters out ``None`` results.
            return None

    @allure.title("Container metrics (obj_size={object_size},policy={policy})")
    @pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")])
    def test_container_metrics(
        self,
        object_size: ObjectSize,
        max_object_size: int,
        default_wallet: WalletInfo,
        cluster: Cluster,
        placement_policy: str,
        policy: str,
    ):
        """Check ``container_objects_total`` (phy/logic/user) after putting and after deleting one object.

        The expected counters are derived from the number of chunks the object is split into,
        the number of copies for the policy, and (for objects larger than ``max_object_size``)
        the number of nodes returned by ``search_nodes_with_container``.
        """
        file_path = generate_file(object_size.value)
        copies = 2 if policy == "REP" else 1
        object_chunks = 1
        link_object = 0

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)

        # An object larger than the maximum object size is counted as several chunks plus link objects.
        if object_size.value > max_object_size:
            object_chunks = math.ceil(object_size.value / max_object_size)
            link_object = len(search_nodes_with_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint, cluster))

        with reporter.step("Put object to random node"):
            oid = put_object_to_random_node(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                cluster=cluster,
            )

        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
            object_nodes = [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes]

        with reporter.step("Check metric appears in node where the object is located"):
            count_metrics = (object_chunks * copies) + link_object
            if policy == "EC":
                # For EC the phy/logic expectation uses a factor of 2 per chunk instead of ``copies``.
                count_metrics = (object_chunks * 2) + link_object
            check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="phy")
            check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="logic")
            check_metrics_counter(object_nodes, counter_exp=copies, command="container_objects_total", cid=cid, type="user")

        with reporter.step("Delete file, wait until gc remove object"):
            delete_object(default_wallet, cid, oid, self.shell, cluster.default_rpc_endpoint)

        with reporter.step(f"Check container metrics 'the counter should equal {len(object_nodes)}' in object nodes"):
            check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="phy")
            check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="logic")
            check_metrics_counter(object_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user")

        with reporter.step("Check metrics(Phy, Logic, User) in each nodes"):
            # Phy and Logic metrics are 4, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4
            expect_metrics = 4 if policy == "REP" else 2
            check_metrics_counter(cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=cid, type="phy")
            check_metrics_counter(
                cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=cid, type="logic"
            )
            check_metrics_counter(cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user")

    @allure.title("Container size metrics (obj_size={object_size},policy={policy})")
    @pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")])
    def test_container_size_metrics(
        self,
        object_size: ObjectSize,
        default_wallet: WalletInfo,
        placement_policy: str,
        policy: str,
    ):
        """Check ``frostfs_node_engine_container_size_bytes`` after putting and after deleting one object.

        After the put, half of the metric summed over the nodes holding the object must equal the object size.
        After the delete, the metric on the first of those nodes must equal the tombstone payload length.
        """
        file_path = generate_file(object_size.value)

        with reporter.step(f"Create container with policy {policy}"):
            cid = create_container(default_wallet, self.shell, self.cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Put object to random node"):
            oid = put_object_to_random_node(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                cluster=self.cluster,
            )

        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, self.cluster.storage_nodes)
            object_nodes = [
                cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes
            ]

        with reporter.step("Check metric appears in all node where the object is located"):
            act_metric = sum(get_metrics_value(node, command="frostfs_node_engine_container_size_bytes", cid=cid) for node in object_nodes)
            assert (act_metric // 2) == object_size.value

        with reporter.step("Delete file, wait until gc remove object"):
            id_tombstone = delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
            tombstone = head_object(default_wallet, cid, id_tombstone, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step("Check container size metrics"):
            act_metric = get_metrics_value(object_nodes[0], command="frostfs_node_engine_container_size_bytes", cid=cid)
            assert act_metric == int(tombstone["header"]["payloadLength"])

    @allure.title("Container size metrics put {objects_count} objects (obj_size={object_size})")
    @pytest.mark.parametrize("objects_count", [5, 10, 20])
    def test_container_size_metrics_more_objects(
        self,
        object_size: ObjectSize,
        default_wallet: WalletInfo,
        objects_count: int,
    ):
        """Check the container size metric after putting ``objects_count`` objects in parallel and after deleting them.

        After the puts, half of the metric summed over all cluster nodes must be within 2% of the total payload size.
        After the deletes, no node may report a negative value, and the per-node average (integer division)
        must equal the summed tombstone payload lengths.
        """
        with reporter.step("Create container"):
            cid = create_container(default_wallet, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Put {objects_count} objects"):
            files_path = [generate_file(object_size.value) for _ in range(objects_count)]
            futures = parallel(self.put_object_parallel, files_path, wallet=default_wallet, cid=cid)
            oids = [future.result() for future in futures]

        with reporter.step("Check metric appears in all nodes"):
            metric_values = [get_metrics_value(node, command="frostfs_node_engine_container_size_bytes", cid=cid) for node in self.cluster.cluster_nodes]
            actual_value = sum(metric_values) // 2  # for policy REP 2, value divide by 2
            expected_value = object_size.value * objects_count
            assert are_numbers_similar(actual_value, expected_value, tolerance_percentage=2), "metric container size bytes value not correct"

        with reporter.step("Delete file, wait until gc remove object"):
            tombstones_size = 0
            for oid in oids:
                tombstone_id = delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
                tombstone = head_object(default_wallet, cid, tombstone_id, self.shell, self.cluster.default_rpc_endpoint)
                tombstones_size += int(tombstone["header"]["payloadLength"])

        with reporter.step("Check container size metrics, 'should be positive in all nodes'"):
            futures = parallel(get_metrics_value, self.cluster.cluster_nodes, command="frostfs_node_engine_container_size_bytes", cid=cid)
            metrics_value_nodes = [future.result() for future in futures]
            for act_metric in metrics_value_nodes:
                assert act_metric >= 0, "Metrics value is negative"
            assert sum(metrics_value_nodes) // len(self.cluster.cluster_nodes) == tombstones_size, "tomstone size of objects not correct"

    @allure.title("Container metrics (policy={policy})")
    @pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")])
    def test_container_metrics_delete_complex_objects(
        self,
        complex_object_size: ObjectSize,
        default_wallet: WalletInfo,
        cluster: Cluster,
        placement_policy: str,
        policy: str,
    ):
        """Check that container object metrics are no longer reported once the objects and the container are deleted.

        Puts two complex objects, verifies the 'user' counter, deletes the objects and the container,
        ticks an epoch, and then expects the metrics search to yield no result on every node.
        """
        copies = 2 if policy == "REP" else 1
        objects_count = 2
        metric_name = "frostfs_node_engine_container_objects_total"

        with reporter.step("Create container"):
            cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, rule=placement_policy)

        with reporter.step(f"Put {objects_count} objects"):
            files_path = [generate_file(complex_object_size.value) for _ in range(objects_count)]
            futures = parallel(self.put_object_parallel, files_path, wallet=default_wallet, cid=cid)
            oids = [future.result() for future in futures]

        with reporter.step(f"Check metrics value in each nodes, should be {objects_count} for 'user'"):
            check_metrics_counter(cluster.cluster_nodes, counter_exp=objects_count * copies, command=metric_name, cid=cid, type="user")

        with reporter.step("Delete objects and container"):
            for oid in oids:
                delete_object(default_wallet, cid, oid, self.shell, cluster.default_rpc_endpoint)

            delete_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint)

        with reporter.step("Tick epoch and check container was deleted"):
            self.tick_epoch()
            wait_for_container_deletion(default_wallet, cid, shell=self.shell, endpoint=cluster.default_rpc_endpoint)

        with reporter.step("Check metrics value in each nodes, should not be show any result"):
            futures = parallel(self.get_metrics_search_by_greps_parallel, cluster.cluster_nodes, command=metric_name, cid=cid)
            # ``None`` means the helper found nothing (or failed) on that node; keep only real values.
            node_results = [future.result() for future in futures]
            metrics_results = [value for value in node_results if value is not None]
            assert len(metrics_results) == 0, f"Metrics value is not empty in Prometheus, actual value in nodes: {metrics_results}"
|