import random
import re

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import BucketContainerResolver, S3ClientWrapper
from frostfs_testlib.steps import s3_helper
from frostfs_testlib.steps.cli.container import delete_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import delete_object, lock_object, put_object, put_object_to_random_node
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import TestFile, generate_file, split_file

from ...helpers.container_request import PUBLIC_WITH_POLICY, ContainerRequest, requires_container


@pytest.mark.order(-16)
@pytest.mark.nightly
@pytest.mark.metrics
class TestObjectMetrics(ClusterTestBase):
    @allure.title("Object metrics of removed container (obj_size={object_size})")
    @requires_container(PUBLIC_WITH_POLICY("REP 2 IN X CBF 2 SELECT 2 FROM * AS X"))
    def test_object_metrics_removed_container(self, default_wallet: WalletInfo, cluster: Cluster, container: str, test_file: TestFile):
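        # Scenario: put an object, check the per-container metrics on the nodes holding it,
        # then delete the container and verify the metrics disappear after epoch ticks.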
        with reporter.step("Put object to random node"):
            oid = put_object_to_random_node(default_wallet, test_file.path, container, self.shell, cluster)

        with reporter.step("Check metric appears on the nodes where the object is located"):
            object_storage_nodes = get_nodes_with_object(container, oid, self.shell, cluster.storage_nodes)
            object_nodes = [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes]

            check_metrics_counter(
                object_nodes,
                counter_exp=2,
                command="frostfs_node_engine_container_objects_total",
                cid=container,
                type="user",
            )

        with reporter.step("Delete container"):
            delete_container(default_wallet, container, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
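
        # Container removal is processed asynchronously: tick epochs and wait for blocks
        # so the garbage collector has a chance to remove the container's objects.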
with reporter.step("Tick Epoch"):
|
|
self.tick_epochs(epochs_to_tick=2, wait_block=2)
|
|
|
|
with reporter.step("Check metrics of removed containers doesn't appear in the storage node"):
|
|
check_metrics_counter(
|
|
object_nodes, counter_exp=0, command="frostfs_node_engine_container_objects_total", cid=container, type="user"
|
|
)
|
|
check_metrics_counter(object_nodes, counter_exp=0, command="frostfs_node_engine_container_size_byte", cid=container)
|
|
|
|
with reporter.step("Check removed containers shouldn't appear in the storage node"):
|
|
for node in object_nodes:
|
|
try:
|
|
all_metrics = node.metrics.storage.get_metrics_search_by_greps(command="frostfs_node_engine_container_size_byte")
|
|
except:
|
|
continue
|
|
assert container not in all_metrics.stdout, "metrics of removed containers shouldn't appear in the storage node"

    @allure.title("Object metrics, locked object (obj_size={object_size}, policy={container_request})")
    @requires_container(
        [
            PUBLIC_WITH_POLICY("REP 1 IN X CBF 1 SELECT 1 FROM * AS X", short_name="REP 1"),
            PUBLIC_WITH_POLICY("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", short_name="REP 2"),
        ]
    )
    def test_object_metrics_blocked_object(
        self, default_wallet: WalletInfo, cluster: Cluster, container: str, container_request: ContainerRequest, test_file: TestFile
    ):
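        # The replica count (REP N) from the placement policy defines how much the user
        # object counter is expected to change with each put/delete.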
        metric_step = int(re.search(r"REP\s(\d+)", container_request.policy).group(1))

        with reporter.step("Search container nodes"):
            container_nodes = search_nodes_with_container(
                wallet=default_wallet,
                cid=container,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=cluster,
            )

        with reporter.step("Get current metrics for metric_type=user"):
            objects_metric_counter = 0
            for node in container_nodes:
                objects_metric_counter += get_metrics_value(node, command="frostfs_node_engine_objects_total", type="user")

        with reporter.step("Put object to container node"):
            oid = put_object(default_wallet, test_file.path, container, self.shell, container_nodes[0].storage_node.get_rpc_endpoint())

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            check_metrics_counter(
                container_nodes,
                operator=">=",
                counter_exp=objects_metric_counter,
                command="frostfs_node_engine_objects_total",
                type="user",
            )
            check_metrics_counter(
                container_nodes,
                counter_exp=metric_step,
                command="frostfs_node_engine_container_objects_total",
                cid=container,
                type="user",
            )

        with reporter.step("Delete object"):
            delete_object(default_wallet, container, oid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            check_metrics_counter(
                container_nodes,
                operator=">=",
                counter_exp=objects_metric_counter,
                command="frostfs_node_engine_objects_total",
                type="user",
            )
            check_metrics_counter(
                container_nodes,
                counter_exp=0,
                command="frostfs_node_engine_container_objects_total",
                cid=container,
                type="user",
            )

        with reporter.step("Put object and lock it until the next epoch"):
            oid = put_object(default_wallet, test_file.path, container, self.shell, container_nodes[0].storage_node.get_rpc_endpoint())
            current_epoch = self.get_epoch()
            lock_object(
                default_wallet,
                container,
                oid,
                self.shell,
                container_nodes[0].storage_node.get_rpc_endpoint(),
                expire_at=current_epoch + 1,
            )

        with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
            objects_metric_counter += metric_step
            check_metrics_counter(
                container_nodes,
                operator=">=",
                counter_exp=objects_metric_counter,
                command="frostfs_node_engine_objects_total",
                type="user",
            )
            check_metrics_counter(
                container_nodes,
                counter_exp=metric_step,
                command="frostfs_node_engine_container_objects_total",
                cid=container,
                type="user",
            )
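
        # A locked object must survive epoch ticks, so the user counter must not drop below the expected value.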
        with reporter.step("Wait until the lock expires 'the counter doesn't change'"):
            self.tick_epochs(epochs_to_tick=2)
            check_metrics_counter(
                container_nodes,
                operator="<=",
                counter_exp=objects_metric_counter,
                command="frostfs_node_engine_objects_total",
                type="user",
            )

        with reporter.step("Delete object"):
            delete_object(default_wallet, container, oid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
            objects_metric_counter -= metric_step
            check_metrics_counter(
                container_nodes,
                operator=">=",
                counter_exp=objects_metric_counter,
                command="frostfs_node_engine_objects_total",
                type="user",
            )
            check_metrics_counter(
                container_nodes,
                counter_exp=0,
                command="frostfs_node_engine_container_objects_total",
                cid=container,
                type="user",
            )
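
        # An object put with expire_at should be removed automatically once its expiration epoch has passed.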
with reporter.step("Put object with expire_at"):
|
|
current_epoch = self.get_epoch()
|
|
oid = put_object(
|
|
default_wallet,
|
|
test_file.path,
|
|
container,
|
|
self.shell,
|
|
container_nodes[0].storage_node.get_rpc_endpoint(),
|
|
expire_at=current_epoch + 1,
|
|
)
|
|
|
|
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
|
objects_metric_counter += metric_step
|
|
check_metrics_counter(
|
|
container_nodes,
|
|
operator=">=",
|
|
counter_exp=objects_metric_counter,
|
|
command="frostfs_node_engine_objects_total",
|
|
type="user",
|
|
)
|
|
check_metrics_counter(
|
|
container_nodes,
|
|
counter_exp=metric_step,
|
|
command="frostfs_node_engine_container_objects_total",
|
|
cid=container,
|
|
type="user",
|
|
)
|
|
|
|
with reporter.step("Tick Epoch"):
|
|
self.tick_epochs(epochs_to_tick=2)
|
|
|
|
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
|
objects_metric_counter -= metric_step
|
|
check_metrics_counter(
|
|
container_nodes,
|
|
operator=">=",
|
|
counter_exp=objects_metric_counter,
|
|
command="frostfs_node_engine_objects_total",
|
|
type="user",
|
|
)
|
|
check_metrics_counter(
|
|
container_nodes,
|
|
counter_exp=0,
|
|
command="frostfs_node_engine_container_objects_total",
|
|
cid=container,
|
|
type="user",
|
|
)

    @allure.title("Object metrics, stop the node (obj_size={object_size})")
    @requires_container(PUBLIC_WITH_POLICY("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", short_name="REP 2"))
    def test_object_metrics_stop_node(
        self,
        default_wallet: WalletInfo,
        cluster_state_controller: ClusterStateController,
        container: str,
        test_file: TestFile,
    ):
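        # The container policy is REP 2, so every object is expected to have two copies.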
        copies = 2

        with reporter.step("Check object metrics in container 'should be zero'"):
            check_metrics_counter(
                self.cluster.cluster_nodes,
                counter_exp=0,
                command="frostfs_node_engine_container_objects_total",
                type="user",
                cid=container,
            )

        with reporter.step("Get current metrics for each node"):
            objects_metric_counter: dict[ClusterNode, int] = {}
            for node in self.cluster.cluster_nodes:
                objects_metric_counter[node] = get_metrics_value(node, command="frostfs_node_engine_objects_total", type="user")

        with reporter.step("Put object"):
            oid = put_object(default_wallet, test_file.path, container, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes)
            object_nodes = [
                cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes
            ]

        with reporter.step(f"Check metrics in object nodes 'the counter should increase by {copies}'"):
            counter_exp = sum(objects_metric_counter[node] for node in object_nodes) + copies
            check_metrics_counter(object_nodes, counter_exp=counter_exp, command="frostfs_node_engine_objects_total", type="user")
            check_metrics_counter(
                object_nodes,
                counter_exp=copies,
                command="frostfs_node_engine_container_objects_total",
                type="user",
                cid=container,
            )

        with reporter.step("Select node to stop"):
            node_to_stop = random.choice(object_nodes)
            alive_nodes = set(object_nodes).difference({node_to_stop})
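
        # With REP 2, stopping one holder should trigger replication of the object to another
        # node so that two copies remain available.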
with reporter.step(f"Stop the node, wait until the object is replicated to another node"):
|
|
cluster_state_controller.stop_node_host(node_to_stop, "hard")
|
|
objects_metric_counter[node_to_stop] += 1
|
|
|
|
with reporter.step(f"Check metric in alive nodes 'the counter should increase'"):
|
|
counter_exp = sum(objects_metric_counter[node] for node in alive_nodes)
|
|
check_metrics_counter(alive_nodes, ">=", counter_exp, command="frostfs_node_engine_objects_total", type="user")
|
|
|
|
with reporter.step("Start node"):
|
|
cluster_state_controller.start_node_host(node_to_stop)
|
|
|
|
with reporter.step(f"Check metric in restarted node, 'the counter doesn't change'"):
|
|
check_metrics_counter(
|
|
object_nodes,
|
|
counter_exp=copies,
|
|
command="frostfs_node_engine_container_objects_total",
|
|
type="user",
|
|
cid=container,
|
|
)
|
|
|

    @allure.title("Multipart object metrics, PUT over S3, check part counts (s3_client={s3_client}, parts_count={parts_count})")
    @pytest.mark.parametrize("parts_count", [4, 5, 10])
    def test_multipart_object_metrics_check_parts_count(
        self, s3_client: S3ClientWrapper, bucket_container_resolver: BucketContainerResolver, parts_count: int
    ):
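        # Each part is 10 MB, comfortably above the 5 MB S3 minimum part size.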
        parts = []
        object_size_10_mb = 10 * 1024 * 1024
        original_size = object_size_10_mb * parts_count

        with reporter.step("Create bucket"):
            bucket = s3_client.create_bucket()

        with reporter.step("Generate original object and split it into parts"):
            original_file = generate_file(original_size)
            file_parts = split_file(original_file, parts_count)
            object_key = s3_helper.object_key_from_file_path(original_file)

        with reporter.step("Create multipart upload and upload parts"):
            upload_id = s3_client.create_multipart_upload(bucket, object_key)
            for part_id, file_path in enumerate(file_parts, start=1):
                etag = s3_client.upload_part(bucket, object_key, upload_id, part_id, file_path)
                parts.append((part_id, etag))

        with reporter.step("Check all parts are visible in bucket"):
            got_parts = s3_client.list_parts(bucket, object_key, upload_id)
            assert len(got_parts) == len(file_parts), f"Expected {parts_count} parts, got:\n{got_parts}"

        with reporter.step("Complete multipart upload"):
            s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)

        with reporter.step("Get the container id backing the bucket"):
            cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)

        with reporter.step("Check the object count metric equals the number of parts of the multipart object"):
            # Plus 1 because CompleteMultipartUpload creates an additional object;
            # multiplied by 2 because the default location constraint is REP 2.
            expected_user_metric = (parts_count + 1) * 2
            check_metrics_counter(
                self.cluster.cluster_nodes,
                counter_exp=expected_user_metric,
                command="frostfs_node_engine_container_objects_total",
                type="user",
                cid=cid,
            )