forked from TrueCloudLab/frostfs-testcases
Compare commits
4 commits
e65d19f056
...
0cbf319835
Author | SHA1 | Date | |
---|---|---|---|
0cbf319835 | |||
cc440f9c12 | |||
5ec844417a | |||
d08dbfa07d |
10 changed files with 468 additions and 487 deletions
|
@ -1,5 +1,5 @@
|
|||
import itertools
|
||||
import logging
|
||||
import os.path
|
||||
import random
|
||||
|
||||
import allure
|
||||
|
@ -17,7 +17,6 @@ from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.testing.parallel import parallel
|
||||
from frostfs_testlib.testing.test_control import wait_for_success
|
||||
from frostfs_testlib.utils.failover_utils import wait_object_replication
|
||||
from frostfs_testlib.utils.file_utils import get_file_hash
|
||||
from pytest import FixtureRequest
|
||||
|
||||
|
@ -35,7 +34,7 @@ class TestFailoverServer(ClusterTestBase):
|
|||
def wait_node_in_map(self, *args, **kwargs):
|
||||
check_node_in_map(*args, **kwargs)
|
||||
|
||||
@reporter.step("Create {count_containers} containers and {count_files} objects")
|
||||
@allure.title("[Test] Create containers")
|
||||
@pytest.fixture
|
||||
def containers(
|
||||
self,
|
||||
|
@ -45,22 +44,24 @@ class TestFailoverServer(ClusterTestBase):
|
|||
|
||||
placement_rule = "REP 2 CBF 2 SELECT 2 FROM *"
|
||||
|
||||
containers = []
|
||||
containers_count = request.param
|
||||
results = parallel(
|
||||
[create_container for _ in range(containers_count)],
|
||||
wallet=default_wallet,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
rule=placement_rule,
|
||||
basic_acl=PUBLIC_ACL,
|
||||
)
|
||||
|
||||
for _ in range(request.param):
|
||||
cont_id = create_container(
|
||||
default_wallet,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
rule=placement_rule,
|
||||
basic_acl=PUBLIC_ACL,
|
||||
)
|
||||
storage_cont_info = StorageContainerInfo(cont_id, default_wallet)
|
||||
containers.append(StorageContainer(storage_cont_info, self.shell, self.cluster))
|
||||
containers = [
|
||||
StorageContainer(StorageContainerInfo(result.result(), default_wallet), self.shell, self.cluster)
|
||||
for result in results
|
||||
]
|
||||
|
||||
return containers
|
||||
|
||||
@reporter.step("Creation container")
|
||||
@allure.title("[Test] Create container")
|
||||
@pytest.fixture()
|
||||
def container(self, default_wallet: WalletInfo) -> StorageContainer:
|
||||
select = len(self.cluster.cluster_nodes)
|
||||
|
@ -75,7 +76,7 @@ class TestFailoverServer(ClusterTestBase):
|
|||
storage_cont_info = StorageContainerInfo(cont_id, default_wallet)
|
||||
return StorageContainer(storage_cont_info, self.shell, self.cluster)
|
||||
|
||||
@reporter.step("Create object and delete after test")
|
||||
@allure.title("[Class] Create objects")
|
||||
@pytest.fixture(scope="class")
|
||||
def storage_objects(
|
||||
self,
|
||||
|
@ -83,69 +84,48 @@ class TestFailoverServer(ClusterTestBase):
|
|||
containers: list[StorageContainer],
|
||||
simple_object_size: ObjectSize,
|
||||
complex_object_size: ObjectSize,
|
||||
) -> StorageObjectInfo:
|
||||
count_object = request.param
|
||||
object_sizes = [simple_object_size, complex_object_size]
|
||||
object_list: list[StorageObjectInfo] = []
|
||||
for cont in containers:
|
||||
for _ in range(count_object):
|
||||
object_list.append(cont.generate_object(size=random.choice(object_sizes).value))
|
||||
|
||||
for storage_object in object_list:
|
||||
os.remove(storage_object.file_path)
|
||||
|
||||
yield object_list
|
||||
|
||||
@reporter.step("Upload object with nodes and compare")
|
||||
def get_corrupted_objects_list(
|
||||
self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
|
||||
) -> list[StorageObjectInfo]:
|
||||
corrupted_objects = []
|
||||
errors_get = []
|
||||
for node in nodes:
|
||||
for storage_object in storage_objects:
|
||||
try:
|
||||
got_file_path = get_object(
|
||||
storage_object.wallet,
|
||||
storage_object.cid,
|
||||
storage_object.oid,
|
||||
endpoint=node.get_rpc_endpoint(),
|
||||
shell=self.shell,
|
||||
timeout="60s",
|
||||
)
|
||||
if storage_object.file_hash != get_file_hash(got_file_path):
|
||||
corrupted_objects.append(storage_object)
|
||||
os.remove(got_file_path)
|
||||
except RuntimeError:
|
||||
errors_get.append(storage_object.oid)
|
||||
object_count = request.param
|
||||
sizes_samples = [simple_object_size, complex_object_size]
|
||||
samples_count = len(sizes_samples)
|
||||
assert object_count >= samples_count, f"Object count is too low, must be >= {samples_count}"
|
||||
|
||||
assert len(errors_get) == 0, f"Get failed - {errors_get}"
|
||||
return corrupted_objects
|
||||
sizes_weights = [2, 1]
|
||||
sizes = sizes_samples + random.choices(sizes_samples, weights=sizes_weights, k=object_count - samples_count)
|
||||
|
||||
def check_objects_replication(
|
||||
self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode]
|
||||
) -> None:
|
||||
for storage_object in storage_objects:
|
||||
wait_object_replication(
|
||||
storage_object.cid,
|
||||
storage_object.oid,
|
||||
2,
|
||||
shell=self.shell,
|
||||
nodes=storage_nodes,
|
||||
sleep_interval=45,
|
||||
attempts=60,
|
||||
)
|
||||
results = parallel(
|
||||
[container.generate_object for _ in sizes for container in containers],
|
||||
size=itertools.cycle([size.value for size in sizes]),
|
||||
)
|
||||
|
||||
return [result.result() for result in results]
|
||||
|
||||
@allure.title("[Test] Create objects and get nodes with object")
|
||||
@pytest.fixture()
|
||||
def object_and_nodes(
|
||||
self, simple_object_size: ObjectSize, container: StorageContainer
|
||||
) -> tuple[StorageObjectInfo, list[ClusterNode]]:
|
||||
object_info = container.generate_object(simple_object_size.value)
|
||||
object_nodes = get_object_nodes(
|
||||
cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
|
||||
)
|
||||
object_nodes = get_object_nodes(self.cluster, object_info.cid, object_info.oid, self.cluster.cluster_nodes[0])
|
||||
return object_info, object_nodes
|
||||
|
||||
def _verify_object(self, storage_object: StorageObjectInfo, node: StorageNode):
|
||||
with reporter.step(f"Verify object {storage_object.oid} from node {node}"):
|
||||
file_path = get_object(
|
||||
storage_object.wallet,
|
||||
storage_object.cid,
|
||||
storage_object.oid,
|
||||
endpoint=node.get_rpc_endpoint(),
|
||||
shell=self.shell,
|
||||
timeout="60s",
|
||||
)
|
||||
|
||||
assert storage_object.file_hash == get_file_hash(file_path)
|
||||
|
||||
@reporter.step("Verify objects")
|
||||
def verify_objects(self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]) -> None:
|
||||
parallel(self._verify_object, storage_objects * len(nodes), node=itertools.cycle(nodes))
|
||||
|
||||
@allure.title("Full shutdown node")
|
||||
@pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
|
||||
def test_complete_node_shutdown(
|
||||
|
@ -154,22 +134,21 @@ class TestFailoverServer(ClusterTestBase):
|
|||
node_under_test: ClusterNode,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
):
|
||||
with reporter.step(f"Remove {node_under_test} from the list of nodes"):
|
||||
with reporter.step(f"Remove one node from the list of nodes"):
|
||||
alive_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
|
||||
|
||||
storage_nodes = [cluster.storage_node for cluster in alive_nodes]
|
||||
|
||||
with reporter.step("Tick epoch and wait for 2 blocks"):
|
||||
self.tick_epochs(1, storage_nodes[0], wait_block=2)
|
||||
with reporter.step("Tick 2 epochs and wait for 2 blocks"):
|
||||
self.tick_epochs(2, storage_nodes[0], wait_block=2)
|
||||
|
||||
with reporter.step(f"Stop node"):
|
||||
cluster_state_controller.stop_node_host(node=node_under_test, mode="hard")
|
||||
cluster_state_controller.stop_node_host(node_under_test, "hard")
|
||||
|
||||
with reporter.step("Verify that there are no corrupted objects"):
|
||||
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||
assert not corrupted_objects_list
|
||||
self.verify_objects(storage_nodes, storage_objects)
|
||||
|
||||
with reporter.step(f"check {node_under_test.storage_node} in map"):
|
||||
with reporter.step(f"Check node still in map"):
|
||||
self.wait_node_in_map(node_under_test.storage_node, self.shell, alive_node=storage_nodes[0])
|
||||
|
||||
count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4
|
||||
|
@ -177,12 +156,11 @@ class TestFailoverServer(ClusterTestBase):
|
|||
with reporter.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
|
||||
self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)
|
||||
|
||||
with reporter.step(f"Check {node_under_test} in not map"):
|
||||
with reporter.step(f"Check node in not map after {count_tick_epoch} epochs"):
|
||||
self.wait_node_not_in_map(node_under_test.storage_node, self.shell, alive_node=storage_nodes[0])
|
||||
|
||||
with reporter.step(f"Verify that there are no corrupted objects after {count_tick_epoch} epoch"):
|
||||
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||
assert not corrupted_objects_list
|
||||
with reporter.step(f"Verify that there are no corrupted objects after {count_tick_epoch} epochs"):
|
||||
self.verify_objects(storage_nodes, storage_objects)
|
||||
|
||||
@allure.title("Temporarily disable a node")
|
||||
@pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
|
||||
|
@ -192,27 +170,26 @@ class TestFailoverServer(ClusterTestBase):
|
|||
node_under_test: ClusterNode,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
):
|
||||
with reporter.step(f"Remove {node_under_test} from the list of nodes"):
|
||||
with reporter.step(f"Remove one node from the list"):
|
||||
storage_nodes = list(set(self.cluster.storage_nodes) - {node_under_test.storage_node})
|
||||
|
||||
with reporter.step("Tick epoch and wait for 2 blocks"):
|
||||
self.tick_epochs(1, storage_nodes[0], wait_block=2)
|
||||
with reporter.step("Tick 2 epochs and wait for 2 blocks"):
|
||||
self.tick_epochs(2, storage_nodes[0], wait_block=2)
|
||||
|
||||
with reporter.step(f"Stop node"):
|
||||
cluster_state_controller.stop_node_host(node_under_test, "hard")
|
||||
|
||||
with reporter.step("Verify that there are no corrupted objects"):
|
||||
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||
assert not corrupted_objects_list
|
||||
self.verify_objects(storage_nodes, storage_objects)
|
||||
|
||||
with reporter.step(f"Check {node_under_test} in map"):
|
||||
with reporter.step(f"Check node still in map"):
|
||||
self.wait_node_in_map(node_under_test.storage_node, self.shell, alive_node=storage_nodes[0])
|
||||
|
||||
cluster_state_controller.start_node_host(node_under_test)
|
||||
with reporter.step(f"Start node"):
|
||||
cluster_state_controller.start_node_host(node_under_test)
|
||||
|
||||
with reporter.step("Verify that there are no corrupted objects"):
|
||||
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||
assert not corrupted_objects_list
|
||||
self.verify_objects(storage_nodes, storage_objects)
|
||||
|
||||
@allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
|
||||
def test_not_enough_nodes_in_container_rep_3(
|
||||
|
@ -237,7 +214,7 @@ class TestFailoverServer(ClusterTestBase):
|
|||
with reporter.step(f"Get object from node with object"):
|
||||
get_object(default_wallet, object_info.cid, object_info.oid, self.shell, endpoint_with_object)
|
||||
|
||||
with reporter.step(f"Put operation to node with object, expect error"):
|
||||
with reporter.step(f"[Negative] Put operation to node with object"):
|
||||
with pytest.raises(RuntimeError):
|
||||
put_object(default_wallet, simple_file, object_info.cid, self.shell, endpoint_with_object)
|
||||
|
||||
|
@ -277,8 +254,7 @@ class TestFailoverServer(ClusterTestBase):
|
|||
oid_2 = put_object(default_wallet, simple_file, cid, self.shell, alive_endpoint_with_object)
|
||||
|
||||
with reporter.step("Get object from alive node with object"):
|
||||
get_file = get_object(default_wallet, cid, oid_2, self.shell, alive_endpoint_with_object)
|
||||
os.remove(get_file)
|
||||
get_object(default_wallet, cid, oid_2, self.shell, alive_endpoint_with_object)
|
||||
|
||||
with reporter.step("Create container on alive node"):
|
||||
create_container(
|
||||
|
|
|
@ -1,47 +1,21 @@
|
|||
import math
|
||||
import re
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.steps.cli.container import create_container, delete_container
|
||||
from frostfs_testlib.steps.cli.object import delete_object, get_object_nodes, put_object_to_random_node
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.steps.cli.container import create_container
|
||||
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.metrics import check_metrics_counter
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.testing.test_control import wait_for_success
|
||||
from frostfs_testlib.utils.file_utils import generate_file
|
||||
|
||||
|
||||
@pytest.mark.container
|
||||
class TestContainerMetrics(ClusterTestBase):
|
||||
@wait_for_success(interval=10)
|
||||
def check_sum_counter_metrics_in_nodes(
|
||||
self, cluster_nodes: list[ClusterNode], cid: str, phy_exp: int, logic_exp: int, user_exp: int
|
||||
):
|
||||
counter_phy = 0
|
||||
counter_logic = 0
|
||||
counter_user = 0
|
||||
for cluster_node in cluster_nodes:
|
||||
metric_result = cluster_node.metrics.storage.get_metric_container(f"container_objects_total", cid)
|
||||
counter_phy += self.get_count_metric_type_from_stdout(metric_result.stdout, "phy")
|
||||
counter_logic += self.get_count_metric_type_from_stdout(metric_result.stdout, "logic")
|
||||
counter_user += self.get_count_metric_type_from_stdout(metric_result.stdout, "user")
|
||||
|
||||
assert counter_phy == phy_exp, f"Expected metric Phy={phy_exp}, Actual: {counter_phy} in nodes: {cluster_nodes}"
|
||||
assert (
|
||||
counter_logic == logic_exp
|
||||
), f"Expected metric logic={logic_exp}, Actual: {counter_logic} in nodes: {cluster_nodes}"
|
||||
assert (
|
||||
counter_user == user_exp
|
||||
), f"Expected metric User={user_exp}, Actual: {counter_user} in nodes: {cluster_nodes}"
|
||||
|
||||
@staticmethod
|
||||
def get_count_metric_type_from_stdout(metric_result_stdout: str, metric_type: str):
|
||||
result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout)
|
||||
return sum(map(int, result))
|
||||
|
||||
@allure.title("Container metrics (obj_size={object_size})")
|
||||
def test_container_metrics(
|
||||
self, object_size: ObjectSize, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster
|
||||
|
@ -57,15 +31,10 @@ class TestContainerMetrics(ClusterTestBase):
|
|||
link_object = 1
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_policy}"):
|
||||
cid = create_container(
|
||||
default_wallet,
|
||||
rule=placement_policy,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
)
|
||||
cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)
|
||||
|
||||
with reporter.step("Put object to random node"):
|
||||
storage_object_id = put_object_to_random_node(
|
||||
oid = put_object_to_random_node(
|
||||
wallet=default_wallet,
|
||||
path=file_path,
|
||||
cid=cid,
|
||||
|
@ -73,25 +42,46 @@ class TestContainerMetrics(ClusterTestBase):
|
|||
cluster=cluster,
|
||||
)
|
||||
|
||||
with reporter.step("Get object nodes"):
|
||||
object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
|
||||
object_nodes = [
|
||||
cluster_node
|
||||
for cluster_node in cluster.cluster_nodes
|
||||
if cluster_node.storage_node in object_storage_nodes
|
||||
]
|
||||
|
||||
with reporter.step("Check metric appears in node where the object is located"):
|
||||
object_nodes = get_object_nodes(
|
||||
cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0]
|
||||
count_metrics = (object_chunks + head_object + link_object) * copies
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="phy"
|
||||
)
|
||||
count_metrics_exp = (object_chunks + head_object + link_object) * copies
|
||||
self.check_sum_counter_metrics_in_nodes(
|
||||
object_nodes, cid, phy_exp=count_metrics_exp, logic_exp=count_metrics_exp, user_exp=copies
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="logic"
|
||||
)
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=copies, command="container_objects_total", cid=cid, type="user"
|
||||
)
|
||||
|
||||
with reporter.step("Delete file, wait until gc remove object"):
|
||||
delete_object(default_wallet, cid, storage_object_id, self.shell, self.cluster.default_rpc_endpoint)
|
||||
count_metrics_exp = len(object_nodes)
|
||||
self.check_sum_counter_metrics_in_nodes(
|
||||
object_nodes, cid, phy_exp=count_metrics_exp, logic_exp=count_metrics_exp, user_exp=0
|
||||
delete_object(default_wallet, cid, oid, self.shell, cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step(f"Check container metrics 'the counter should equal {len(object_nodes)}' in object nodes"):
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="phy"
|
||||
)
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="logic"
|
||||
)
|
||||
check_metrics_counter(object_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user")
|
||||
|
||||
with reporter.step("Check metrics(Phy, Logic, User) in each nodes"):
|
||||
# Phy and Logic metrics are 4, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4
|
||||
self.check_sum_counter_metrics_in_nodes(cluster.cluster_nodes, cid, phy_exp=4, logic_exp=4, user_exp=0)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
check_metrics_counter(
|
||||
cluster.cluster_nodes, counter_exp=4, command="container_objects_total", cid=cid, type="phy"
|
||||
)
|
||||
check_metrics_counter(
|
||||
cluster.cluster_nodes, counter_exp=4, command="container_objects_total", cid=cid, type="logic"
|
||||
)
|
||||
check_metrics_counter(
|
||||
cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user"
|
||||
)
|
||||
|
|
|
@ -2,9 +2,12 @@ import random
|
|||
import re
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.steps.cli.container import create_container, delete_container
|
||||
from frostfs_testlib.steps.cli.object import delete_object, get_object_nodes, put_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.cli.container import create_container
|
||||
from frostfs_testlib.steps.cli.object import delete_object, put_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
|
@ -40,10 +43,9 @@ class TestGarbageCollectorMetrics(ClusterTestBase):
|
|||
with reporter.step("Get current garbage collector metrics for each nodes"):
|
||||
metrics_counter = {}
|
||||
for node in cluster.cluster_nodes:
|
||||
command_result = node.metrics.storage.get_metrics_search_by_greps(
|
||||
command="frostfs_node_garbage_collector_marked_for_removal_objects_total"
|
||||
metrics_counter[node] = get_metrics_value(
|
||||
node, command="frostfs_node_garbage_collector_marked_for_removal_objects_total"
|
||||
)
|
||||
metrics_counter[node] = self.calc_metrics_count_from_stdout(command_result.stdout)
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_policy}"):
|
||||
cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)
|
||||
|
@ -60,23 +62,29 @@ class TestGarbageCollectorMetrics(ClusterTestBase):
|
|||
)
|
||||
|
||||
with reporter.step("Get object nodes"):
|
||||
object_nodes = get_object_nodes(cluster, cid, oid, cluster.cluster_nodes[0])
|
||||
object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
|
||||
object_nodes = [
|
||||
cluster_node
|
||||
for cluster_node in cluster.cluster_nodes
|
||||
if cluster_node.storage_node in object_storage_nodes
|
||||
]
|
||||
|
||||
with reporter.step("Tick Epoch"):
|
||||
self.tick_epochs(epochs_to_tick=2, wait_block=2)
|
||||
|
||||
with reporter.step(f"Check garbage collector metrics 'the counter should increase by {metrics_step}'"):
|
||||
with reporter.step(
|
||||
f"Check garbage collector metrics 'the counter should increase by {metrics_step}' in object nodes"
|
||||
):
|
||||
for node in object_nodes:
|
||||
metrics_counter[node] += metrics_step
|
||||
|
||||
for node, counter in metrics_counter.items():
|
||||
self.check_metrics_in_node(
|
||||
node, counter, command="frostfs_node_garbage_collector_marked_for_removal_objects_total"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=counter,
|
||||
command="frostfs_node_garbage_collector_marked_for_removal_objects_total",
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint)
|
||||
|
||||
@allure.title("Garbage collector delete object")
|
||||
def test_garbage_collector_metrics_deleted_objects(
|
||||
self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
|
||||
|
@ -89,10 +97,7 @@ class TestGarbageCollectorMetrics(ClusterTestBase):
|
|||
node = random.choice(cluster.cluster_nodes)
|
||||
|
||||
with reporter.step("Get current garbage collector metrics for selected node"):
|
||||
command_result = node.metrics.storage.get_metrics_search_by_greps(
|
||||
command="frostfs_node_garbage_collector_deleted_objects_total"
|
||||
)
|
||||
metrics_counter = self.calc_metrics_count_from_stdout(command_result.stdout)
|
||||
metrics_counter = get_metrics_value(node, command="frostfs_node_garbage_collector_deleted_objects_total")
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_policy}"):
|
||||
cid = create_container(default_wallet, self.shell, node.storage_node.get_rpc_endpoint(), placement_policy)
|
||||
|
@ -105,9 +110,6 @@ class TestGarbageCollectorMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check garbage collector metrics 'the counter should increase by {metrics_step}'"):
|
||||
metrics_counter += metrics_step
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter, command="frostfs_node_garbage_collector_deleted_objects_total"
|
||||
check_metrics_counter(
|
||||
[node], counter_exp=metrics_counter, command="frostfs_node_garbage_collector_deleted_objects_total"
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint)
|
||||
|
|
|
@ -1,21 +1,20 @@
|
|||
import random
|
||||
import re
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
||||
from frostfs_testlib.steps.cli.container import create_container, delete_container, get_container, list_containers
|
||||
from frostfs_testlib.steps.cli.container import create_container, get_container, list_containers
|
||||
from frostfs_testlib.steps.cli.object import get_object, head_object, put_object, search_object
|
||||
from frostfs_testlib.steps.cli.tree import get_tree_list
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.testing.test_control import wait_for_success
|
||||
from frostfs_testlib.utils.file_utils import generate_file
|
||||
|
||||
|
||||
|
@ -27,25 +26,6 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
yield
|
||||
cluster_state_controller.manager(ConfigStateManager).revert_all()
|
||||
|
||||
@wait_for_success(interval=10)
|
||||
def check_metrics_in_node(self, cluster_node: ClusterNode, counter_exp: int, **metrics_greps: str):
|
||||
counter_act = self.get_metrics_value(cluster_node, **metrics_greps)
|
||||
assert counter_act == counter_exp, f"Expected: {counter_exp}, Actual: {counter_act} in node: {cluster_node}"
|
||||
|
||||
def get_metrics_value(self, node: ClusterNode, **metrics_greps: str):
|
||||
try:
|
||||
command_result = node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
|
||||
metrics_counter = self.calc_metrics_count_from_stdout(command_result.stdout)
|
||||
except RuntimeError as e:
|
||||
metrics_counter = 0
|
||||
|
||||
return metrics_counter
|
||||
|
||||
@staticmethod
|
||||
def calc_metrics_count_from_stdout(metric_result_stdout: str):
|
||||
result = re.findall(r"}\s(\d+)", metric_result_stdout)
|
||||
return sum(map(int, result))
|
||||
|
||||
@allure.title("GRPC metrics container operations")
|
||||
def test_grpc_metrics_container_operations(self, default_wallet: WalletInfo, cluster: Cluster):
|
||||
placement_policy = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
|
||||
|
@ -54,7 +34,7 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
node = random.choice(cluster.cluster_nodes)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'Put'"):
|
||||
metrics_counter_put = self.get_metrics_value(
|
||||
metrics_counter_put = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ContainerService", method="Put"
|
||||
)
|
||||
|
||||
|
@ -63,12 +43,16 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by 1'"):
|
||||
metrics_counter_put += 1
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter_put, command="grpc_server_handled_total", service="ContainerService", method="Put"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_put,
|
||||
command="grpc_server_handled_total",
|
||||
service="ContainerService",
|
||||
method="Put",
|
||||
)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'Get'"):
|
||||
metrics_counter_get = self.get_metrics_value(
|
||||
metrics_counter_get = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ContainerService", method="Get"
|
||||
)
|
||||
|
||||
|
@ -77,12 +61,16 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
|
||||
metrics_counter_get += 1
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter_get, command="grpc_server_handled_total", service="ContainerService", method="Get"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_get,
|
||||
command="grpc_server_handled_total",
|
||||
service="ContainerService",
|
||||
method="Get",
|
||||
)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'List'"):
|
||||
metrics_counter_list = self.get_metrics_value(
|
||||
metrics_counter_list = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ContainerService", method="List"
|
||||
)
|
||||
|
||||
|
@ -91,17 +79,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method=List, 'the counter should increase by 1'"):
|
||||
metrics_counter_list += 1
|
||||
self.check_metrics_in_node(
|
||||
node,
|
||||
metrics_counter_list,
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_list,
|
||||
command="grpc_server_handled_total",
|
||||
service="ContainerService",
|
||||
method="List",
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
@allure.title("GRPC metrics object operations")
|
||||
def test_grpc_metrics_object_operations(
|
||||
self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, disable_policer
|
||||
|
@ -116,7 +101,7 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
cid = create_container(default_wallet, self.shell, node.storage_node.get_rpc_endpoint(), placement_policy)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'Put'"):
|
||||
metrics_counter_put = self.get_metrics_value(
|
||||
metrics_counter_put = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ObjectService", method="Put"
|
||||
)
|
||||
|
||||
|
@ -125,12 +110,16 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by 1'"):
|
||||
metrics_counter_put += 1
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter_put, command="grpc_server_handled_total", service="ObjectService", method="Put"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_put,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
method="Put",
|
||||
)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'Get'"):
|
||||
metrics_counter_get = self.get_metrics_value(
|
||||
metrics_counter_get = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ObjectService", method="Get"
|
||||
)
|
||||
|
||||
|
@ -139,12 +128,16 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
|
||||
metrics_counter_get += 1
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter_get, command="grpc_server_handled_total", service="ObjectService", method="Get"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_get,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
method="Get",
|
||||
)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'Search'"):
|
||||
metrics_counter_search = self.get_metrics_value(
|
||||
metrics_counter_search = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ObjectService", method="Search"
|
||||
)
|
||||
|
||||
|
@ -153,16 +146,16 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method=Search, 'the counter should increase by 1'"):
|
||||
metrics_counter_search += 1
|
||||
self.check_metrics_in_node(
|
||||
node,
|
||||
metrics_counter_search,
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_search,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
method="Search",
|
||||
)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for method 'Head'"):
|
||||
metrics_counter_head = self.get_metrics_value(
|
||||
metrics_counter_head = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="ObjectService", method="Head"
|
||||
)
|
||||
|
||||
|
@ -171,20 +164,21 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics method=Head, 'the counter should increase by 1'"):
|
||||
metrics_counter_head += 1
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter_head, command="grpc_server_handled_total", service="ObjectService", method="Head"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter_head,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
method="Head",
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
@allure.title("GRPC metrics Tree healthcheck")
|
||||
def test_grpc_metrics_tree_service(self, cluster: Cluster, healthcheck: Healthcheck):
|
||||
with reporter.step("Select random node"):
|
||||
node = random.choice(cluster.cluster_nodes)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for Healthcheck"):
|
||||
metrics_counter = self.get_metrics_value(
|
||||
metrics_counter = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="TreeService", method="Healthcheck"
|
||||
)
|
||||
|
||||
|
@ -192,10 +186,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
healthcheck.tree_healthcheck(node)
|
||||
|
||||
with reporter.step(f"Check gRPC metrics for Healthcheck, 'the counter should increase'"):
|
||||
metrics_counter_new = self.get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="TreeService", method="Healthcheck"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
">",
|
||||
metrics_counter,
|
||||
command="grpc_server_handled_total",
|
||||
service="TreeService",
|
||||
method="Healthcheck",
|
||||
)
|
||||
assert metrics_counter_new > metrics_counter, "the metrics has not increased"
|
||||
|
||||
@allure.title("GRPC metrics Tree list")
|
||||
def test_grpc_metrics_tree_list(self, default_wallet: WalletInfo, cluster: Cluster):
|
||||
|
@ -208,7 +206,7 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
cid = create_container(default_wallet, self.shell, node.storage_node.get_rpc_endpoint(), placement_policy)
|
||||
|
||||
with reporter.step("Get current gRPC metrics for Tree List"):
|
||||
metrics_counter = self.get_metrics_value(
|
||||
metrics_counter = get_metrics_value(
|
||||
node, command="grpc_server_handled_total", service="TreeService", method="TreeList"
|
||||
)
|
||||
|
||||
|
@ -217,9 +215,10 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check gRPC metrics for Tree List, 'the counter should increase by 1'"):
|
||||
metrics_counter += 1
|
||||
self.check_metrics_in_node(
|
||||
node, metrics_counter, command="grpc_server_handled_total", service="TreeService", method="TreeList"
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
counter_exp=metrics_counter,
|
||||
command="grpc_server_handled_total",
|
||||
service="TreeService",
|
||||
method="TreeList",
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
import random
|
||||
import re
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.steps.metrics import get_metrics_value
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||
|
@ -16,59 +16,57 @@ from frostfs_testlib.testing.test_control import wait_for_success
|
|||
|
||||
class TestLogsMetrics(ClusterTestBase):
|
||||
@pytest.fixture
|
||||
def restart_storage_service(self, cluster_state_controller: ClusterStateController) -> str:
|
||||
def restart_storage_service(self, cluster_state_controller: ClusterStateController) -> datetime:
|
||||
config_manager = cluster_state_controller.manager(ConfigStateManager)
|
||||
config_manager.csc.stop_services_of_type(StorageNode)
|
||||
restart_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
||||
restart_time = datetime.now(timezone.utc)
|
||||
config_manager.csc.start_services_of_type(StorageNode)
|
||||
yield restart_time
|
||||
|
||||
cluster_state_controller.manager(ConfigStateManager).revert_all()
|
||||
|
||||
@wait_for_success(interval=10)
|
||||
def check_metrics_in_node(self, cluster_node: ClusterNode, counter_exp: int, **metrics_greps: str):
|
||||
counter_act = self.get_metrics_value(cluster_node, **metrics_greps)
|
||||
assert counter_act == counter_exp, f"Expected: {counter_exp}, Actual: {counter_act} in node: {cluster_node}"
|
||||
|
||||
def get_metrics_value(self, node: ClusterNode, **metrics_greps: str):
|
||||
try:
|
||||
command_result = node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
|
||||
metrics_counter = self.calc_metrics_count_from_stdout(command_result.stdout)
|
||||
except RuntimeError as e:
|
||||
metrics_counter = 0
|
||||
|
||||
return metrics_counter
|
||||
def check_metrics_in_node(
|
||||
self, cluster_node: ClusterNode, restart_time: datetime, log_priority: str = None, **metrics_greps
|
||||
):
|
||||
counter_logs = self.get_count_logs_by_level(
|
||||
cluster_node, metrics_greps.get("level"), restart_time, log_priority
|
||||
)
|
||||
counter_metrics = get_metrics_value(cluster_node, **metrics_greps)
|
||||
assert (
|
||||
counter_logs == counter_metrics
|
||||
), f"counter_logs: {counter_logs}, counter_metrics: {counter_metrics} in node: {cluster_node}"
|
||||
|
||||
@staticmethod
|
||||
def calc_metrics_count_from_stdout(metric_result_stdout: str):
|
||||
result = re.findall(r"}\s(\d+)", metric_result_stdout)
|
||||
return sum(map(int, result))
|
||||
|
||||
@staticmethod
|
||||
def get_count_logs_by_level(shell: Shell, log_level: str, after_time: str):
|
||||
def get_count_logs_by_level(cluster_node: ClusterNode, log_level: str, after_time: datetime, log_priority: str):
|
||||
count_logs = 0
|
||||
try:
|
||||
logs = shell.exec(f"journalctl -u frostfs-storage --grep='{log_level}' --since '{after_time}'")
|
||||
result = re.findall(rf"Z\s+{log_level}\s+", logs.stdout)
|
||||
logs = cluster_node.host.get_filtered_logs(
|
||||
log_level, unit="frostfs-storage", since=after_time, priority=log_priority
|
||||
)
|
||||
result = re.findall(rf"\s+{log_level}\s+", logs)
|
||||
count_logs += len(result)
|
||||
except RuntimeError as e:
|
||||
...
|
||||
return count_logs
|
||||
|
||||
@allure.title("Metrics for the log counter")
|
||||
def test_log_counter_metrics(self, cluster: Cluster, restart_storage_service: str):
|
||||
def test_log_counter_metrics(self, cluster: Cluster, restart_storage_service: datetime):
|
||||
restart_time = restart_storage_service
|
||||
with reporter.step("Select random node"):
|
||||
node = random.choice(cluster.cluster_nodes)
|
||||
|
||||
with reporter.step("Get count logs from journalctl with level 'info'"):
|
||||
count_logs_info = self.get_count_logs_by_level(node.host.get_shell(), "info", restart_time)
|
||||
|
||||
with reporter.step(f"Check metrics count logs with level 'info'"):
|
||||
self.check_metrics_in_node(node, count_logs_info, command="frostfs_node_logger_entry_count", level="info")
|
||||
|
||||
with reporter.step("Get count logs from journalctl with level 'error'"):
|
||||
count_logs_error = self.get_count_logs_by_level(node.host.get_shell(), "error", restart_time)
|
||||
self.check_metrics_in_node(
|
||||
node,
|
||||
restart_time,
|
||||
log_priority="6..6",
|
||||
command="frostfs_node_logger_entry_count",
|
||||
level="info",
|
||||
dropped="false",
|
||||
)
|
||||
|
||||
with reporter.step(f"Check metrics count logs with level 'error'"):
|
||||
self.check_metrics_in_node(node, count_logs_error, command="frostfs_node_logger_entry_count", level="error")
|
||||
self.check_metrics_in_node(
|
||||
node, restart_time, command="frostfs_node_logger_entry_count", level="error", dropped="false"
|
||||
)
|
||||
|
|
|
@ -5,92 +5,19 @@ import allure
|
|||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.steps.cli.container import create_container, delete_container, search_nodes_with_container
|
||||
from frostfs_testlib.steps.cli.object import (
|
||||
delete_object,
|
||||
get_object_nodes,
|
||||
lock_object,
|
||||
put_object,
|
||||
put_object_to_random_node,
|
||||
)
|
||||
from frostfs_testlib.steps.cli.object import delete_object, lock_object, put_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.testing.test_control import wait_for_success
|
||||
from frostfs_testlib.utils.file_utils import generate_file
|
||||
|
||||
|
||||
class TestObjectMetrics(ClusterTestBase):
|
||||
@wait_for_success(interval=10)
|
||||
def check_metrics_by_type(
|
||||
self, cluster_nodes: list[ClusterNode], metric_command: str, grep_by: str, metric_type: str, counter_exp: int
|
||||
):
|
||||
counter_act = 0
|
||||
for cluster_node in cluster_nodes:
|
||||
try:
|
||||
metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps(
|
||||
command=metric_command, grep_by=grep_by
|
||||
)
|
||||
counter_act += self.calc_metrics_count_from_stdout(metric_result.stdout, metric_type)
|
||||
except RuntimeError as e:
|
||||
...
|
||||
assert (
|
||||
counter_act == counter_exp
|
||||
), f"Expected metric {metric_type}={counter_exp}, Actual: {counter_act} in nodes: {cluster_nodes}"
|
||||
|
||||
@staticmethod
|
||||
def calc_metrics_count_from_stdout(metric_result_stdout: str, metric_type: str):
|
||||
result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout)
|
||||
return sum(map(int, result))
|
||||
|
||||
@wait_for_success(interval=10)
|
||||
def check_object_metrics_total_and_container(
|
||||
self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_total: int, objects_metric_container: int
|
||||
):
|
||||
self.check_metrics_by_type(
|
||||
cluster_nodes,
|
||||
"frostfs_node_engine_objects_total",
|
||||
grep_by="user",
|
||||
metric_type="user",
|
||||
counter_exp=objects_metric_total,
|
||||
)
|
||||
|
||||
objects_metric_container_act = 0
|
||||
for node in cluster_nodes:
|
||||
try:
|
||||
metrics_container = node.metrics.storage.get_metrics_search_by_greps(
|
||||
command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
|
||||
)
|
||||
objects_metric_container_act += self.calc_metrics_count_from_stdout(
|
||||
metrics_container.stdout, metric_type="user"
|
||||
)
|
||||
except RuntimeError as e:
|
||||
...
|
||||
assert (
|
||||
objects_metric_container_act == objects_metric_container
|
||||
), f"Expected {objects_metric_container} objects in container"
|
||||
|
||||
@wait_for_success(max_wait_time=120, interval=10)
|
||||
def check_object_metrics_container(
|
||||
self, cluster_nodes: list[ClusterNode], cid: str, objects_metric_container_exp: int
|
||||
):
|
||||
objects_metric_container_act = 0
|
||||
for node in cluster_nodes:
|
||||
try:
|
||||
metrics_container = node.metrics.storage.get_metrics_search_by_greps(
|
||||
command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
|
||||
)
|
||||
objects_metric_container_act += self.calc_metrics_count_from_stdout(
|
||||
metrics_container.stdout, metric_type="user"
|
||||
)
|
||||
except RuntimeError as e:
|
||||
...
|
||||
assert (
|
||||
objects_metric_container_act == objects_metric_container_exp
|
||||
), f"Expected {objects_metric_container_exp} objects in container"
|
||||
|
||||
@allure.title("Object metrics of removed container")
|
||||
@allure.title("Object metrics of removed container (obj_size={object_size})")
|
||||
def test_object_metrics_removed_container(
|
||||
self, object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster
|
||||
):
|
||||
|
@ -99,28 +26,26 @@ class TestObjectMetrics(ClusterTestBase):
|
|||
copies = 2
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_policy}"):
|
||||
cid = create_container(
|
||||
default_wallet,
|
||||
rule=placement_policy,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
)
|
||||
cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)
|
||||
|
||||
with reporter.step("Put object to random node"):
|
||||
storage_object_id = put_object_to_random_node(
|
||||
wallet=default_wallet,
|
||||
path=file_path,
|
||||
cid=cid,
|
||||
shell=self.shell,
|
||||
cluster=cluster,
|
||||
)
|
||||
oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, cluster)
|
||||
|
||||
with reporter.step("Check metric appears in node where the object is located"):
|
||||
object_nodes = get_object_nodes(
|
||||
cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0]
|
||||
)
|
||||
object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
|
||||
object_nodes = [
|
||||
cluster_node
|
||||
for cluster_node in cluster.cluster_nodes
|
||||
if cluster_node.storage_node in object_storage_nodes
|
||||
]
|
||||
|
||||
self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", copies)
|
||||
check_metrics_counter(
|
||||
object_nodes,
|
||||
counter_exp=copies,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
|
@ -129,13 +54,22 @@ class TestObjectMetrics(ClusterTestBase):
|
|||
self.tick_epochs(epochs_to_tick=2, wait_block=2)
|
||||
|
||||
with reporter.step("Check metrics of removed containers doesn't appear in the storage node"):
|
||||
self.check_metrics_by_type(object_nodes, "frostfs_node_engine_container_objects_total", cid, "user", 0)
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=0, command="frostfs_node_engine_container_objects_total", cid=cid, type="user"
|
||||
)
|
||||
check_metrics_counter(
|
||||
object_nodes, counter_exp=0, command="frostfs_node_engine_container_size_byte", cid=cid
|
||||
)
|
||||
|
||||
for node in object_nodes:
|
||||
with pytest.raises(RuntimeError):
|
||||
node.metrics.storage.get_metric_container(f"frostfs_node_engine_container_size_byte", cid)
|
||||
all_metrics = node.metrics.storage.get_metrics_search_by_greps(
|
||||
command="frostfs_node_engine_container_size_byte"
|
||||
)
|
||||
assert (
|
||||
cid not in all_metrics.stdout
|
||||
), "metrics of removed containers shouldn't appear in the storage node"
|
||||
|
||||
@allure.title("Object metrics, locked object, (policy={placement_policy})")
|
||||
@allure.title("Object metrics, locked object (obj_size={object_size}, policy={placement_policy})")
|
||||
@pytest.mark.parametrize(
|
||||
"placement_policy", ["REP 1 IN X CBF 1 SELECT 1 FROM * AS X", "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"]
|
||||
)
|
||||
|
@ -146,12 +80,7 @@ class TestObjectMetrics(ClusterTestBase):
|
|||
metric_step = int(re.search(r"REP\s(\d+)", placement_policy).group(1))
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_policy}"):
|
||||
cid = create_container(
|
||||
wallet=default_wallet,
|
||||
rule=placement_policy,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
)
|
||||
cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)
|
||||
|
||||
with reporter.step("Search container nodes"):
|
||||
container_nodes = search_nodes_with_container(
|
||||
|
@ -165,34 +94,51 @@ class TestObjectMetrics(ClusterTestBase):
|
|||
with reporter.step("Get current metrics for metric_type=user"):
|
||||
objects_metric_counter = 0
|
||||
for node in container_nodes:
|
||||
metric_objects_total = node.metrics.storage.get_metrics_search_by_greps(
|
||||
command="frostfs_node_engine_objects_total", type="user"
|
||||
)
|
||||
objects_metric_counter += self.calc_metrics_count_from_stdout(
|
||||
metric_objects_total.stdout, metric_type="user"
|
||||
objects_metric_counter += get_metrics_value(
|
||||
node, command="frostfs_node_engine_objects_total", type="user"
|
||||
)
|
||||
|
||||
with reporter.step("Put object to container node"):
|
||||
oid = put_object(
|
||||
wallet=default_wallet,
|
||||
path=file_path,
|
||||
cid=cid,
|
||||
shell=self.shell,
|
||||
endpoint=container_nodes[0].storage_node.get_rpc_endpoint(),
|
||||
default_wallet, file_path, cid, self.shell, container_nodes[0].storage_node.get_rpc_endpoint()
|
||||
)
|
||||
|
||||
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
||||
objects_metric_counter += metric_step
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=metric_step,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Delete object"):
|
||||
delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
||||
objects_metric_counter -= metric_step
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=0,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Put object and lock it"):
|
||||
with reporter.step("Put object and lock it to next epoch"):
|
||||
oid = put_object(
|
||||
default_wallet, file_path, cid, self.shell, container_nodes[0].storage_node.get_rpc_endpoint()
|
||||
)
|
||||
|
@ -208,18 +154,47 @@ class TestObjectMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
||||
objects_metric_counter += metric_step
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=metric_step,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step(f"Wait until remove locking 'the counter doesn't change'"):
|
||||
self.tick_epochs(epochs_to_tick=2)
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Delete object"):
|
||||
delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
||||
objects_metric_counter -= metric_step
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=0,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Put object with expire_at"):
|
||||
current_epoch = self.get_epoch()
|
||||
|
@ -234,23 +209,43 @@ class TestObjectMetrics(ClusterTestBase):
|
|||
|
||||
with reporter.step(f"Check metric user 'the counter should increase by {metric_step}'"):
|
||||
objects_metric_counter += metric_step
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, metric_step)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=metric_step,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Tick Epoch"):
|
||||
self.tick_epochs(epochs_to_tick=2)
|
||||
|
||||
with reporter.step(f"Check metric user 'the counter should decrease by {metric_step}'"):
|
||||
objects_metric_counter -= metric_step
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, 0)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=objects_metric_counter,
|
||||
command="frostfs_node_engine_objects_total",
|
||||
type="user",
|
||||
)
|
||||
check_metrics_counter(
|
||||
container_nodes,
|
||||
counter_exp=0,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
cid=cid,
|
||||
type="user",
|
||||
)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
|
||||
@allure.title("Object metrics, stop the node")
|
||||
@allure.title("Object metrics, stop the node (obj_size={object_size})")
|
||||
def test_object_metrics_stop_node(
|
||||
self,
|
||||
object_size: ObjectSize,
|
||||
max_object_size: int,
|
||||
default_wallet: WalletInfo,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
):
|
||||
|
@ -259,60 +254,70 @@ class TestObjectMetrics(ClusterTestBase):
        copies = 2

        with reporter.step(f"Create container with policy {placement_policy}"):
            cid = create_container(
                wallet=default_wallet,
                rule=placement_policy,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            cid = create_container(default_wallet, self.shell, self.cluster.default_rpc_endpoint, placement_policy)

        with reporter.step("Search container nodes"):
            container_nodes = search_nodes_with_container(
                wallet=default_wallet,
        with reporter.step(f"Check object metrics in container 'should be zero'"):
            check_metrics_counter(
                self.cluster.cluster_nodes,
                counter_exp=0,
                command="frostfs_node_engine_container_objects_total",
                type="user",
                cid=cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=self.cluster,
            )

        with reporter.step("Get current metrics for container nodes"):
            objects_metric_counter = 0
            for node in container_nodes:
                metric_objects_total = node.metrics.storage.get_metrics_search_by_greps(
                    command="frostfs_node_engine_objects_total", type="user"
                )
                objects_metric_counter += self.calc_metrics_count_from_stdout(
                    metric_objects_total.stdout, metric_type="user"
with reporter.step("Get current metrics for each nodes"):
|
||||
objects_metric_counter: dict[ClusterNode:int] = {}
|
||||
for node in self.cluster.cluster_nodes:
|
||||
objects_metric_counter[node] = get_metrics_value(
|
||||
node, command="frostfs_node_engine_objects_total", type="user"
|
||||
)
|
||||

        with reporter.step("Put object to container node"):
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                endpoint=container_nodes[0].storage_node.get_rpc_endpoint(),
            )
        with reporter.step("Put object"):
            oid = put_object(default_wallet, file_path, cid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step(f"Check metric in container nodes 'the counter should increase by {copies}'"):
            objects_metric_counter += copies
            self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies)
        with reporter.step("Get object nodes"):
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, self.cluster.storage_nodes)
            object_nodes = [
                cluster_node
                for cluster_node in self.cluster.cluster_nodes
                if cluster_node.storage_node in object_storage_nodes
            ]

        with reporter.step(f"Check metrics in object nodes 'the counter should increase by {copies}'"):
            counter_exp = sum(objects_metric_counter[node] for node in object_nodes) + copies
            check_metrics_counter(
                object_nodes, counter_exp=counter_exp, command="frostfs_node_engine_objects_total", type="user"
            )
            check_metrics_counter(
                object_nodes,
                counter_exp=copies,
                command="frostfs_node_engine_container_objects_total",
                type="user",
                cid=cid,
            )
with reporter.step(f"Select node to stop"):
|
||||
node_to_stop = container_nodes[0]
|
||||
alive_nodes = [node for node in container_nodes if node != node_to_stop]
|
||||
node_to_stop = random.choice(object_nodes)
|
||||
alive_nodes = set(object_nodes).difference({node_to_stop})
|
||||
|
||||
with reporter.step(f"Stop the node, wait until the object is replicated to another node"):
|
||||
cluster_state_controller.stop_node_host(node_to_stop, "hard")
|
||||
objects_metric_counter[node_to_stop] += 1
|
||||
|
||||
with reporter.step(f"Check metric in alive nodes 'the counter should increase by 1'"):
|
||||
self.check_object_metrics_container(alive_nodes, cid, copies)
|
||||
with reporter.step(f"Check metric in alive nodes 'the counter should increase'"):
|
||||
counter_exp = sum(objects_metric_counter[node] for node in alive_nodes)
|
||||
check_metrics_counter(
|
||||
alive_nodes, ">=", counter_exp, command="frostfs_node_engine_objects_total", type="user"
|
||||
)
|
||||
|
||||
with reporter.step("Start node"):
|
||||
cluster_state_controller.start_node_host(node_to_stop)
|
||||
|
||||
with reporter.step(f"Check metric in container nodes 'the counter doesn't change'"):
|
||||
self.check_object_metrics_total_and_container(container_nodes, cid, objects_metric_counter, copies)
|
||||
|
||||
with reporter.step("Delete container"):
|
||||
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
with reporter.step(f"Check metric in restarted node, 'the counter doesn't change'"):
|
||||
check_metrics_counter(
|
||||
object_nodes,
|
||||
counter_exp=copies,
|
||||
command="frostfs_node_engine_container_objects_total",
|
||||
type="user",
|
||||
cid=cid,
|
||||
)
|
||||
|
|
|
@ -4,11 +4,13 @@ import re
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container, delete_container
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import get_object, put_object
from frostfs_testlib.steps.metrics import check_metrics_counter
from frostfs_testlib.steps.node_management import node_shard_list, node_shard_set_mode
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers import ShardsWatcher
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
@ -45,28 +47,13 @@ class TestShardMetrics(ClusterTestBase):
        watcher.set_shard_mode(shard["shard_id"], mode="read-write")
        watcher.await_for_all_shards_status(status="read-write")

    @wait_for_success(interval=10)
    def check_metrics_in_node(self, cluster_node: ClusterNode, counter_exp: int, **metrics_greps: str):
        counter_act = 0
        try:
            metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
            counter_act += self.calc_metrics_count_from_stdout(metric_result.stdout)
        except RuntimeError as e:
            ...
        assert counter_act == counter_exp, f"Expected: {counter_exp}, Actual: {counter_act} in node: {cluster_node}"

    @staticmethod
    def calc_metrics_count_from_stdout(metric_result_stdout: str):
        result = re.findall(r"}\s(\d+)", metric_result_stdout)
        return sum(map(int, result))

    @staticmethod
    def get_error_count_from_logs(shell: Shell, object_path: str, object_name: str):
    def get_error_count_from_logs(cluster_node: ClusterNode, object_path: str, object_name: str):
        error_count = 0
        try:
            logs = shell.exec(f"journalctl -u frostfs-storage --grep='error count' --no-pager")
            logs = cluster_node.host.get_filtered_logs("error count", unit="frostfs-storage")
            # search error logs for current object
            for error_line in logs.stdout.split("\n"):
            for error_line in logs.split("\n"):
                if object_path in error_line and object_name in error_line:
                    result = re.findall(r'"error\scount":\s(\d+)', error_line)
                    error_count += sum(map(int, result))
@ -106,17 +93,21 @@ class TestShardMetrics(ClusterTestBase):
            node_shard_set_mode(node.storage_node, shard1, "read-only")

        with reporter.step(f"Check shard metrics, 'the mode will change to 'READ_ONLY'"):
            self.check_metrics_in_node(
                node, metrics_counter, command="frostfs_node_engine_mode_info", mode="READ_ONLY", shard_id=shard1
            check_metrics_counter(
                [node],
                counter_exp=metrics_counter,
                command="frostfs_node_engine_mode_info",
                mode="READ_ONLY",
                shard_id=shard1,
            )

        with reporter.step("Shard2 set to mode 'degraded-read-only'"):
            node_shard_set_mode(node.storage_node, shard2, "degraded-read-only")

        with reporter.step(f"Check shard metrics, 'the mode will change to 'DEGRADED_READ_ONLY'"):
            self.check_metrics_in_node(
                node,
                metrics_counter,
            check_metrics_counter(
                [node],
                counter_exp=metrics_counter,
                command="frostfs_node_engine_mode_info",
                mode="DEGRADED_READ_ONLY",
                shard_id=shard2,
@ -126,9 +117,14 @@ class TestShardMetrics(ClusterTestBase):
            for shard in [shard1, shard2]:
                node_shard_set_mode(node.storage_node, shard, "read-write")

        with reporter.step(f"Check shard metrics, 'the mode will change to 'READ_WRITE'"):
            self.check_metrics_in_node(
                node, metrics_counter, command="frostfs_node_engine_mode_info", mode="READ_WRITE", shard_id=shard
        with reporter.step(f"Check shard metrics, 'the mode will change to 'READ_WRITE'"):
            for shard in [shard1, shard2]:
                check_metrics_counter(
                    [node],
                    counter_exp=metrics_counter,
                    command="frostfs_node_engine_mode_info",
                    mode="READ_WRITE",
                    shard_id=shard,
                )

    @allure.title("Metric for error count on shard")
@ -141,17 +137,22 @@ class TestShardMetrics(ClusterTestBase):
            cid = create_container(
                wallet=default_wallet,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                endpoint=cluster.default_rpc_endpoint,
                rule="REP 1 CBF 1",
                basic_acl=EACL_PUBLIC_READ_WRITE,
            )

        with reporter.step("Put object"):
            oid = put_object(default_wallet, file_path, cid, self.shell, self.cluster.default_rpc_endpoint)
            oid = put_object(default_wallet, file_path, cid, self.shell, cluster.default_rpc_endpoint)

        with reporter.step("Get object nodes"):
            object_nodes = get_object_nodes(cluster, cid, oid, cluster.cluster_nodes[0])
            node = object_nodes[0]
            object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
            object_nodes = [
                cluster_node
                for cluster_node in cluster.cluster_nodes
                if cluster_node.storage_node in object_storage_nodes
            ]
            node = random.choice(object_nodes)

        with reporter.step("Search object in system."):
            object_path, object_name = self.get_object_path_and_name_file(oid, cid, node)
@ -160,7 +161,7 @@ class TestShardMetrics(ClusterTestBase):
            node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")

        with reporter.step("Get object, expect error"):
            with pytest.raises(RuntimeError):
            with pytest.raises(RuntimeError, match=OBJECT_NOT_FOUND):
                get_object(
                    wallet=default_wallet,
                    cid=cid,
@ -170,10 +171,7 @@ class TestShardMetrics(ClusterTestBase):
                )

        with reporter.step(f"Get shard error count from logs"):
            counter = self.get_error_count_from_logs(node.host.get_shell(), object_path, object_name)
            counter = self.get_error_count_from_logs(node, object_path, object_name)

        with reporter.step(f"Check shard error metrics"):
            self.check_metrics_in_node(node, counter, command="frostfs_node_engine_errors_total")

        with reporter.step("Delete container"):
            delete_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint)
            check_metrics_counter([node], counter_exp=counter, command="frostfs_node_engine_errors_total")
@ -62,10 +62,10 @@ class TestS3GateBucket:

            s3_client.head_bucket(bucket_1)

        with reporter.step(f"Delete empty bucket {bucket_2}"):
        with reporter.step("Delete empty bucket_2"):
            s3_client.delete_bucket(bucket_2)

        with reporter.step(f"Check bucket {bucket_2} deleted"):
        with reporter.step("Check bucket_2 is deleted"):
            with pytest.raises(Exception, match=r".*Not Found.*"):
                s3_client.head_bucket(bucket_2)
@ -73,14 +73,14 @@ class TestS3GateBucket:
        assert bucket_1 in buckets, f"Expected bucket {bucket_1} is in the list"
        assert bucket_2 not in buckets, f"Expected bucket {bucket_2} is not in the list"

        with reporter.step(f"Delete object from {bucket_1}"):
        with reporter.step("Delete object from bucket_1"):
            s3_client.delete_object(bucket_1, file_name, version_id)
            s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=[])

        with reporter.step(f"Delete bucket {bucket_1}"):
        with reporter.step("Delete bucket_1"):
            s3_client.delete_bucket(bucket_1)

        with reporter.step(f"Check bucket {bucket_1} deleted"):
        with reporter.step("Check bucket_1 deleted"):
            with pytest.raises(Exception, match=r".*Not Found.*"):
                s3_client.head_bucket(bucket_1)
@ -7,6 +7,7 @@ from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash, split_file

PART_SIZE = 5 * 1024 * 1024
@ -68,11 +69,11 @@ class TestS3GateMultipart(ClusterTestBase):
        with reporter.step("Delete the object"):
            s3_client.delete_object(bucket, object_key, version_id)

        with reporter.step("List objects in the bucket, expect to be empty"):
        with reporter.step("There should be no objects in bucket"):
            objects_list = s3_client.list_objects(bucket)
            assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with reporter.step("List objects in the container via rpc, expect to be empty"):
        with reporter.step("There should be no objects in container"):
            objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
            assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"
@ -92,7 +93,7 @@ class TestS3GateMultipart(ClusterTestBase):
        files_count = len(to_upload)
        upload_key = "multipart_abort"

        with reporter.step(f"Get related container_id for bucket '{bucket}'"):
        with reporter.step("Get related container_id for bucket"):
            for cluster_node in self.cluster.cluster_nodes:
                container_id = search_container_by_name(bucket, cluster_node)
                if container_id:
@ -101,15 +102,15 @@ class TestS3GateMultipart(ClusterTestBase):
        with reporter.step("Create multipart upload"):
            upload_id = s3_client.create_multipart_upload(bucket, upload_key)

        with reporter.step(f"Upload {files_count} files to multipart upload"):
        with reporter.step(f"Upload {files_count} parts to multipart upload"):
            for i, file in enumerate(to_upload, 1):
                s3_client.upload_part(bucket, upload_key, upload_id, i, file)

        with reporter.step(f"Check that we have {files_count} files in bucket"):
        with reporter.step(f"There should be {files_count} objects in bucket"):
            parts = s3_client.list_parts(bucket, upload_key, upload_id)
            assert len(parts) == files_count, f"Expected {files_count} parts, got\n{parts}"

        with reporter.step(f"Check that we have {files_count} files in container '{container_id}'"):
        with reporter.step(f"There should be {files_count} objects in container"):
            objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
            assert len(objects) == files_count, f"Expected {files_count} objects in container, got\n{objects}"
@ -118,13 +119,18 @@ class TestS3GateMultipart(ClusterTestBase):
            uploads = s3_client.list_multipart_uploads(bucket)
            assert not uploads, f"Expected no uploads in bucket {bucket}"

        with reporter.step("Check that we have no files in bucket since upload was aborted"):
        with reporter.step("There should be no objects in bucket"):
            with pytest.raises(Exception, match=self.NO_SUCH_UPLOAD):
                s3_client.list_parts(bucket, upload_key, upload_id)

        with reporter.step("Check that we have no files in container since upload was aborted"):
            objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
            assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"
        with reporter.step("There should be no objects in container"):

            @wait_for_success(120, 10)
            def check_no_objects():
                objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
                assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"

            check_no_objects()

    @allure.title("Upload Part Copy (s3_client={s3_client})")
    @pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
@ -159,6 +165,6 @@ class TestS3GateMultipart(ClusterTestBase):
            s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
            assert len(got_parts) == len(part_files), f"Expected {parts_count} parts, got\n{got_parts}"

        with reporter.step("Check we can get whole object from bucket"):
        with reporter.step("Get whole object from bucket"):
            got_object = s3_client.get_object(bucket, object_key)
            assert get_file_hash(got_object) == get_file_hash(file_name_large)
@ -884,17 +884,24 @@ class TestS3GateObject:
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        temp_directory,
        simple_object_size: ObjectSize,
    ):
        path = "/".join(["".join(random.choices(string.ascii_letters, k=3)) for _ in range(10)])
        file_path_1 = TestFile(os.path.join(temp_directory, path, "test_file_1"))
        generate_file_with_content(simple_object_size.value, file_path=file_path_1)
        file_name = s3_helper.object_key_from_file_path(file_path_1)
        key_characters_sample = string.ascii_letters + string.digits + "._-"

        with reporter.step("Put object"):
            s3_client.put_object(bucket, file_path_1)
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name])
            test_file = generate_file(simple_object_size.value)
            obj_key = (
                "/"
                + "/".join(["".join(random.choices(key_characters_sample, k=5)) for _ in range(10)])
                + "/test_file_1"
            )
            s3_client.put_object(bucket, test_file, obj_key)

        with reporter.step("Check object can be downloaded"):
            s3_client.get_object(bucket, obj_key)

        with reporter.step("Check object listing"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, [obj_key])

    @allure.title("Delete non-existing object from empty bucket (s3_client={s3_client})")
    def test_s3_delete_non_existing_object(self, s3_client: S3ClientWrapper, bucket: str):