319 lines
14 KiB
Python
319 lines
14 KiB
Python
|
import random
|
||
|
import re
|
||
|
|
||
|
import allure
|
||
|
import pytest
|
||
|
from frostfs_testlib import reporter
|
||
|
from frostfs_testlib.steps.cli.container import create_container, delete_container, search_nodes_with_container
|
||
|
from frostfs_testlib.steps.cli.object import (
|
||
|
delete_object,
|
||
|
get_object_nodes,
|
||
|
lock_object,
|
||
|
put_object,
|
||
|
put_object_to_random_node,
|
||
|
)
|
||
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||
|
from frostfs_testlib.testing.test_control import wait_for_success
|
||
|
from frostfs_testlib.utils.file_utils import generate_file
|
||
|
|
||
|
|
||
|
class TestObjectMetrics(ClusterTestBase):
|
||
|
@wait_for_success(interval=10)
|
||
|
def check_metrics_by_type(
|
||
|
self, cluster_nodes: list[ClusterNode], metric_command: str, grep_by: str, metric_type: str, counter_exp: int
|
||
|
):
|
||
|
counter_act = 0
|
||
|
for cluster_node in cluster_nodes:
|
||
|
try:
|
||
|
metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps(
|
||
|
command=metric_command, grep_by=grep_by
|
||
|
)
|
||
|
counter_act += self.calc_metrics_count_from_stdout(metric_result.stdout, metric_type)
|
||
|
except RuntimeError as e:
|
||
|
...
|
||
|
assert (
|
||
|
counter_act == counter_exp
|
||
|
), f"Expected metric {metric_type}={counter_exp}, Actual: {counter_act} in nodes: {cluster_nodes}"
|
||
|
|
||
|
@staticmethod
|
||
|
def calc_metrics_count_from_stdout(metric_result_stdout: str, metric_type: str):
|
||
|
result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout)
|
||
|
return sum(map(int, result))
|
||
|
|
||
|
@wait_for_success(interval=10)
|
||
|
def check_object_metrics_total_and_container(
|
||
|
self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_total: int, objects_metric_container: int
|
||
|
):
|
||
|
self.check_metrics_by_type(
|
||
|
cluster_nodes,
|
||
|
"frostfs_node_engine_objects_total",
|
||
|
grep_by="user",
|
||
|
metric_type="user",
|
||
|
counter_exp=objects_metric_total,
|
||
|
)
|
||
|
|
||
|
objects_metric_container_act = 0
|
||
|
for node in cluster_nodes:
|
||
|
try:
|
||
|
metrics_container = node.metrics.storage.get_metrics_search_by_greps(
|
||
|
command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
|
||
|
)
|
||
|
objects_metric_container_act += self.calc_metrics_count_from_stdout(
|
||
|
metrics_container.stdout, metric_type="user"
|
||
|
)
|
||
|
except RuntimeError as e:
|
||
|
...
|
||
|
assert (
|
||
|
objects_metric_container_act == objects_metric_container
|
||
|
), f"Expected {objects_metric_container} objects in container"
|
||
|
|
||
|
@wait_for_success(max_wait_time=120, interval=10)
|
||
|
def check_object_metrics_container(
|
||
|
self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_container_exp: int
|
||
|
):
|
||
|
objects_metric_container_act = 0
|
||
|
for node in cluster_nodes:
|
||
|
try:
|
||
|
metrics_container = node.metrics.storage.get_metrics_search_by_greps(
|
||
|
command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
|
||
|
)
|
||
|
objects_metric_container_act += self.calc_metrics_count_from_stdout(
|
||
|
metrics_container.stdout, metric_type="user"
|
||
|
)
|
||
|
except RuntimeError as e:
|
||
|
...
|
||
|
assert (
|
||
|
objects_metric_container_act == objects_metric_container_exp
|
||
|
), f"Expected {objects_metric_container_exp} objects in container"
|
||
|
|
||
|
@allure.title("Object metrics of removed container")
|
||
|
def test_object_metrics_removed_container(
|
||
|
self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
|
||
|
):
|
||
|
file_path = generate_file(object_size.value)
|
||
|
placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
||
|
copies = 2
|
||
|
|
||
|
with reporter.step(f"Create container with policy {placement_policy}"):
|
||
|
cid = create_container(
|
||
|
default_wallet,
|
||
|
rule=placement_policy,
|
||
|
shell=self.shell,
|
||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||
|
)
|
||
|
|
||
|
with reporter.step("Put object to random node"):
|
||
|
storage_object_id = put_object_to_random_node(
|
||
|
wallet=default_wallet,
|
||
|
path=file_path,
|
||
|
cid=cid,
|
||
|
shell=self.shell,
|
||
|
cluster=cluster,
|
||
|
)
|
||
|
|
||
|
with reporter.step("Check metric appears in node where the object is located"):
|
||
|
object_nodes = get_object_nodes(
|
||
|
cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0]
|
||
|
)
|
||
|
|
||
|
self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", copies)
|
||
|
|
||
|
with reporter.step("Delete container"):
|
||
|
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||
|
|
||
|
with reporter.step("Tick Epoch"):
|
||
|
self.tick_epochs(epochs_to_tick=2, wait_block=2)
|
||
|
|
||
|
with reporter.step("Check metrics of removed containers doesn't appear in the storage node"):
|
||
|
self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", 0)
|
||
|
|
||
|
for node in object_nodes:
|
||
|
with pytest.raises(RuntimeError):
|
||
|
node.metrics.storage.get_metric_container(f"frostfs_node_engine_container_size_byte", cid)
|
||
|
|
||
|
@allure.title("Object metrics, locked object, (policy={placement_policy})")
|
||
|
@pytest.mark.parametrize(
|
||
|
"placement_policy", ["REP 1 IN X CBF 1 SELECT 1 FROM * AS X", "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"]
|
||
|
)
|
||
|
def test_object_metrics_blocked_object(
|
||
|
self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, placement_policy: str
|
||
|
):
|
||
|
file_path = generate_file(object_size.value)
|
||
|
metric_step = int(re.search(r"REP\s(\d+)", placement_policy).group(1))
|
||
|
|
||
|
with reporter.step(f"Create container with policy {placement_policy}"):
|
||
|
cid = create_container(
|
||
|
wallet=default_wallet,
|
||
|
rule=placement_policy,
|
||
|
shell=self.shell,
|
||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||
|
)
|
||
|
|
||
|
with reporter.step("Search container nodes"):
|
||
|
container_nodes = search_nodes_with_container(
|
||
|
wallet=default_wallet,
|
||
|
cid=cid,
|
||
|
shell=self.shell,
|
||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||
|
cluster=cluster,
|
||
|
)
|
||
|
|
||
|
with reporter.step("Get current metrics for metric_type=user"):
|
||
|
objects_metric_counter = 0
|
||
|
for node in container_nodes:
|
||
|
metric_objects_total = node.metrics.storage.get_metrics_search_by_greps(
|
||
|
command="frostfs_node_engine_objects_total", type="user"
|
||
|
)
|
||
|
objects_metric_counter += self.calc_metrics_count_from_stdout(
|
||
|
metric_objects_total.stdout, metric_type="user"
|
||
|
)
|
||
|
|
||
|
with reporter.step("Put object to container node"):
|
||
|
oid = put_object(
|
||
|
wallet=default_wallet,
|
||
|
path=file_path,
|
||
|
cid=cid,
|
||
|
shell=self.shell,
|
||
|
endpoint=container_nodes[0].storage_node.get_rpc_endpoint(),
|
||
|
)
|
||
|
|
||
|
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
||
|
objects_metric_counter += metric_step
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||
|
|
||
|
with reporter.step("Delete object"):
|
||
|
delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
|
||
|
|
||
|
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
||
|
objects_metric_counter -= metric_step
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)
|
||
|
|
||
|
with reporter.step("Put object and lock it"):
|
||
|
oid = put_object(
|
||
|
default_wallet, file_path, cid, self.shell, container_nodes[0].storage_node.get_rpc_endpoint()
|
||
|
)
|
||
|
current_epoch = self.get_epoch()
|
||
|
lock_object(
|
||
|
default_wallet,
|
||
|
cid,
|
||
|
oid,
|
||
|
self.shell,
|
||
|
container_nodes[0].storage_node.get_rpc_endpoint(),
|
||
|
expire_at=current_epoch + 1,
|
||
|
)
|
||
|
|
||
|
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
||
|
objects_metric_counter += metric_step
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||
|
|
||
|
with reporter.step(f"Wait until remove locking 'the counter doesn't change'"):
|
||
|
self.tick_epochs(epochs_to_tick=2)
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||
|
|
||
|
with reporter.step("Delete object"):
|
||
|
delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
|
||
|
|
||
|
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
||
|
objects_metric_counter -= metric_step
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)
|
||
|
|
||
|
with reporter.step("Put object with expire_at"):
|
||
|
current_epoch = self.get_epoch()
|
||
|
oid = put_object(
|
||
|
default_wallet,
|
||
|
file_path,
|
||
|
cid,
|
||
|
self.shell,
|
||
|
container_nodes[0].storage_node.get_rpc_endpoint(),
|
||
|
expire_at=current_epoch + 1,
|
||
|
)
|
||
|
|
||
|
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
||
|
objects_metric_counter += metric_step
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||
|
|
||
|
with reporter.step("Tick Epoch"):
|
||
|
self.tick_epochs(epochs_to_tick=2)
|
||
|
|
||
|
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
||
|
objects_metric_counter -= metric_step
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)
|
||
|
|
||
|
with reporter.step("Delete container"):
|
||
|
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||
|
|
||
|
@allure.title("Object metrics, stop the node")
|
||
|
def test_object_metrics_stop_node(
|
||
|
self,
|
||
|
object_size: ObjectSize,
|
||
|
max_object_size: int,
|
||
|
default_wallet: WalletInfo,
|
||
|
cluster_state_controller: ClusterStateController,
|
||
|
):
|
||
|
placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
||
|
file_path = generate_file(object_size.value)
|
||
|
copies = 2
|
||
|
|
||
|
with reporter.step(f"Create container with policy {placement_policy}"):
|
||
|
cid = create_container(
|
||
|
wallet=default_wallet,
|
||
|
rule=placement_policy,
|
||
|
shell=self.shell,
|
||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||
|
)
|
||
|
|
||
|
with reporter.step("Search container nodes"):
|
||
|
container_nodes = search_nodes_with_container(
|
||
|
wallet=default_wallet,
|
||
|
cid=cid,
|
||
|
shell=self.shell,
|
||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||
|
cluster=self.cluster,
|
||
|
)
|
||
|
|
||
|
with reporter.step("Get current metrics for container nodes"):
|
||
|
objects_metric_counter = 0
|
||
|
for node in container_nodes:
|
||
|
metric_objects_total = node.metrics.storage.get_metrics_search_by_greps(
|
||
|
command="frostfs_node_engine_objects_total", type="user"
|
||
|
)
|
||
|
objects_metric_counter += self.calc_metrics_count_from_stdout(
|
||
|
metric_objects_total.stdout, metric_type="user"
|
||
|
)
|
||
|
|
||
|
with reporter.step("Put object to container node"):
|
||
|
oid = put_object(
|
||
|
wallet=default_wallet,
|
||
|
path=file_path,
|
||
|
cid=cid,
|
||
|
shell=self.shell,
|
||
|
endpoint=container_nodes[0].storage_node.get_rpc_endpoint(),
|
||
|
)
|
||
|
|
||
|
with reporter.step(f"Check metric in container nodes 'the counter should increase by {copies}'"):
|
||
|
objects_metric_counter += copies
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies)
|
||
|
|
||
|
with reporter.step(f"Select node to stop"):
|
||
|
node_to_stop = container_nodes[0]
|
||
|
alive_nodes = [node for node in container_nodes if node != node_to_stop]
|
||
|
|
||
|
with reporter.step(f"Stop the node, wait until the object is replicated to another node"):
|
||
|
cluster_state_controller.stop_node_host(node_to_stop, "hard")
|
||
|
|
||
|
with reporter.step(f"Check metric in alive nodes 'the counter should increase by 1'"):
|
||
|
self.check_object_metrics_container(alive_nodes, cid, copies)
|
||
|
|
||
|
with reporter.step("Start node"):
|
||
|
cluster_state_controller.start_node_host(node_to_stop)
|
||
|
|
||
|
with reporter.step(f"Check metric in container nodes 'the counter doesn't change'"):
|
||
|
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies)
|
||
|
|
||
|
with reporter.step("Delete container"):
|
||
|
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|