Compare commits

...

8 commits

7 changed files with 378 additions and 16 deletions

View file

@ -92,7 +92,7 @@ class TestEACLContainer(ClusterTestBase):
cluster=self.cluster,
)
with reporter.step(f"Check {not_deny_role_wallet} has full access to eACL public container"):
with reporter.step(f"Check {not_deny_role_str} has full access to eACL public container"):
check_full_access_to_container(
not_deny_role_wallet,
cid,

View file

@ -23,7 +23,7 @@ from frostfs_testlib.resources.common import (
)
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE, DEFAULT_EC_PLACEMENT_RULE
from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage import get_service_registry
@ -210,6 +210,7 @@ def object_size(
return complex_object_size
@pytest.fixture(scope="session")
def rep_placement_policy() -> PlacementPolicy:
return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE)
@ -222,10 +223,7 @@ def ec_placement_policy() -> PlacementPolicy:
# By default we want all tests to be executed with both storage policies.
# This can be overriden in choosen tests if needed.
@pytest.fixture(
scope="session",
params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)],
)
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep)])
def placement_policy(
rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest
) -> PlacementPolicy:
@ -234,6 +232,7 @@ def placement_policy(
return ec_placement_policy
@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
cluster = Cluster(hosting)

View file

@ -387,6 +387,7 @@ class TestMaintenanceMode(ClusterTestBase):
expected_status == node_snapshot.node_status
), f"{node_under_test} status should be {expected_status}, but was {node_snapshot.node_status}. See netmap:\n{netmap}"
@pytest.mark.skip(reason="The test is not stable for 0.39")
@allure.title("Test of basic node operations in maintenance mode")
def test_maintenance_mode(
self,
@ -456,6 +457,7 @@ class TestMaintenanceMode(ClusterTestBase):
os.remove(file_path)
@pytest.mark.skip(reason="The test is not stable for 0.39")
@pytest.mark.sanity
@allure.title("MAINTENANCE and OFFLINE mode transitions")
def test_mode_transitions(
@ -558,6 +560,7 @@ class TestMaintenanceMode(ClusterTestBase):
with reporter.step("Check node status is 'maintenance'"):
self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)
@pytest.mark.skip(reason="The test is not stable for 0.39")
@allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
def test_maintenance_globally_forbidden(
self,

View file

@ -0,0 +1,97 @@
import math
import re
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.steps.cli.container import create_container, delete_container
from frostfs_testlib.steps.cli.object import delete_object, get_object_nodes, put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.file_utils import generate_file
@pytest.mark.container
class TestContainerMetrics(ClusterTestBase):
@wait_for_success(interval=10)
def check_sum_counter_metrics_in_nodes(
self, cluster_nodes: list[ClusterNode], cid: str, phy_exp: int, logic_exp: int, user_exp: int
):
counter_phy = 0
counter_logic = 0
counter_user = 0
for cluster_node in cluster_nodes:
metric_result = cluster_node.metrics.storage.get_metric_container(f"container_objects_total", cid)
counter_phy += self.get_count_metric_type_from_stdout(metric_result.stdout, "phy")
counter_logic += self.get_count_metric_type_from_stdout(metric_result.stdout, "logic")
counter_user += self.get_count_metric_type_from_stdout(metric_result.stdout, "user")
assert counter_phy == phy_exp, f"Expected metric Phy={phy_exp}, Actual: {counter_phy} in nodes: {cluster_nodes}"
assert (
counter_logic == logic_exp
), f"Expected metric logic={logic_exp}, Actual: {counter_logic} in nodes: {cluster_nodes}"
assert (
counter_user == user_exp
), f"Expected metric User={user_exp}, Actual: {counter_user} in nodes: {cluster_nodes}"
@staticmethod
def get_count_metric_type_from_stdout(metric_result_stdout: str, metric_type: str):
result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout)
return sum(map(int, result))
@allure.title("Container metrics (obj_size={object_size})")
def test_container_metrics(
self, object_size: ObjectSize, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster
):
file_path = generate_file(object_size.value)
placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
copies = 2
object_chunks = 0
head_object = 1
link_object = 0
if object_size.value > max_object_size:
object_chunks = math.ceil(object_size.value / max_object_size)
link_object = 1
with reporter.step(f"Create container with policy {placement_policy}"):
cid = create_container(
default_wallet,
rule=placement_policy,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with reporter.step("Put object to random node"):
storage_object_id = put_object_to_random_node(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
cluster=cluster,
)
with reporter.step("Check metric appears in node where the object is located"):
object_nodes = get_object_nodes(
cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0]
)
count_metrics_exp = (object_chunks + head_object + link_object) * copies
self.check_sum_counter_metrics_in_nodes(
object_nodes, cid, phy_exp=count_metrics_exp, logic_exp=count_metrics_exp, user_exp=copies
)
with reporter.step("Delete file, wait until gc remove object"):
delete_object(default_wallet, cid, storage_object_id, self.shell, self.cluster.default_rpc_endpoint)
count_metrics_exp = len(object_nodes)
self.check_sum_counter_metrics_in_nodes(
object_nodes, cid, phy_exp=count_metrics_exp, logic_exp=count_metrics_exp, user_exp=0
)
with reporter.step("Check metrics(Phy, Logic, User) in each nodes"):
# Phy and Logic metrics are 4, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4
self.check_sum_counter_metrics_in_nodes(cluster.cluster_nodes, cid, phy_exp=4, logic_exp=4, user_exp=0)
with reporter.step("Delete container"):
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)

View file

@ -0,0 +1,225 @@
import random
import re
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.steps.cli.container import create_container, delete_container, get_container, list_containers
from frostfs_testlib.steps.cli.object import get_object, head_object, put_object, search_object
from frostfs_testlib.steps.cli.tree import get_tree_list
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.file_utils import generate_file
class TestGRPCMetrics(ClusterTestBase):
@pytest.fixture
def disable_policer(self, cluster_state_controller: ClusterStateController):
config_manager = cluster_state_controller.manager(ConfigStateManager)
config_manager.set_on_all_nodes(StorageNode, {"policer:unsafe_disable": "true"})
yield
cluster_state_controller.manager(ConfigStateManager).revert_all()
@wait_for_success(interval=10)
def check_metrics_in_node(self, cluster_node: ClusterNode, counter_exp: int, **metrics_greps: str):
counter_act = self.get_metrics_value(cluster_node, **metrics_greps)
assert counter_act == counter_exp, f"Expected: {counter_exp}, Actual: {counter_act} in node: {cluster_node}"
def get_metrics_value(self, node: ClusterNode, **metrics_greps: str):
try:
command_result = node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
metrics_counter = self.calc_metrics_count_from_stdout(command_result.stdout)
except RuntimeError as e:
metrics_counter = 0
return metrics_counter
@staticmethod
def calc_metrics_count_from_stdout(metric_result_stdout: str):
result = re.findall(r"}\s(\d+)", metric_result_stdout)
return sum(map(int, result))
@allure.title("GRPC metrics container operations")
def test_grpc_metrics_container_operations(self, default_wallet: WalletInfo, cluster: Cluster):
placement_policy = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
with reporter.step("Select random node"):
node = random.choice(cluster.cluster_nodes)
with reporter.step("Get current gRPC metrics for method 'Put'"):
metrics_counter_put = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ContainerService", method="Put"
)
with reporter.step(f"Create container with policy {placement_policy}"):
cid = create_container(default_wallet, self.shell, node.storage_node.get_rpc_endpoint(), placement_policy)
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by 1'"):
metrics_counter_put += 1
self.check_metrics_in_node(
node, metrics_counter_put, command="grpc_server_handled_total", service="ContainerService", method="Put"
)
with reporter.step("Get current gRPC metrics for method 'Get'"):
metrics_counter_get = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ContainerService", method="Get"
)
with reporter.step(f"Get container"):
get_container(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
metrics_counter_get += 1
self.check_metrics_in_node(
node, metrics_counter_get, command="grpc_server_handled_total", service="ContainerService", method="Get"
)
with reporter.step("Get current gRPC metrics for method 'List'"):
metrics_counter_list = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ContainerService", method="List"
)
with reporter.step(f"Get container list"):
list_containers(default_wallet, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=List, 'the counter should increase by 1'"):
metrics_counter_list += 1
self.check_metrics_in_node(
node,
metrics_counter_list,
command="grpc_server_handled_total",
service="ContainerService",
method="List",
)
with reporter.step("Delete container"):
delete_container(default_wallet, cid, self.shell, self.cluster.default_rpc_endpoint)
@allure.title("GRPC metrics object operations")
def test_grpc_metrics_object_operations(
self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, disable_policer
):
file_path = generate_file(simple_object_size.value)
placement_policy = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
with reporter.step("Select random node"):
node = random.choice(cluster.cluster_nodes)
with reporter.step(f"Create container with policy {placement_policy}"):
cid = create_container(default_wallet, self.shell, node.storage_node.get_rpc_endpoint(), placement_policy)
with reporter.step("Get current gRPC metrics for method 'Put'"):
metrics_counter_put = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ObjectService", method="Put"
)
with reporter.step("Put object to selected node"):
oid = put_object(default_wallet, file_path, cid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by 1'"):
metrics_counter_put += 1
self.check_metrics_in_node(
node, metrics_counter_put, command="grpc_server_handled_total", service="ObjectService", method="Put"
)
with reporter.step("Get current gRPC metrics for method 'Get'"):
metrics_counter_get = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ObjectService", method="Get"
)
with reporter.step(f"Get object"):
get_object(default_wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
metrics_counter_get += 1
self.check_metrics_in_node(
node, metrics_counter_get, command="grpc_server_handled_total", service="ObjectService", method="Get"
)
with reporter.step("Get current gRPC metrics for method 'Search'"):
metrics_counter_search = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ObjectService", method="Search"
)
with reporter.step(f"Search object"):
search_object(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Search, 'the counter should increase by 1'"):
metrics_counter_search += 1
self.check_metrics_in_node(
node,
metrics_counter_search,
command="grpc_server_handled_total",
service="ObjectService",
method="Search",
)
with reporter.step("Get current gRPC metrics for method 'Head'"):
metrics_counter_head = self.get_metrics_value(
node, command="grpc_server_handled_total", service="ObjectService", method="Head"
)
with reporter.step(f"Head object"):
head_object(default_wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Head, 'the counter should increase by 1'"):
metrics_counter_head += 1
self.check_metrics_in_node(
node, metrics_counter_head, command="grpc_server_handled_total", service="ObjectService", method="Head"
)
with reporter.step("Delete container"):
delete_container(default_wallet, cid, self.shell, self.cluster.default_rpc_endpoint)
@allure.title("GRPC metrics Tree healthcheck")
def test_grpc_metrics_tree_service(self, cluster: Cluster, healthcheck: Healthcheck):
with reporter.step("Select random node"):
node = random.choice(cluster.cluster_nodes)
with reporter.step("Get current gRPC metrics for Healthcheck"):
metrics_counter = self.get_metrics_value(
node, command="grpc_server_handled_total", service="TreeService", method="Healthcheck"
)
with reporter.step("Query Tree healthcheck status"):
healthcheck.tree_healthcheck(node)
with reporter.step(f"Check gRPC metrics for Healthcheck, 'the counter should increase'"):
metrics_counter_new = self.get_metrics_value(
node, command="grpc_server_handled_total", service="TreeService", method="Healthcheck"
)
assert metrics_counter_new > metrics_counter, "the metrics has not increased"
@allure.title("GRPC metrics Tree list")
def test_grpc_metrics_tree_list(self, default_wallet: WalletInfo, cluster: Cluster):
placement_policy = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
with reporter.step("Select random node"):
node = random.choice(cluster.cluster_nodes)
with reporter.step(f"Create container with policy {placement_policy}"):
cid = create_container(default_wallet, self.shell, node.storage_node.get_rpc_endpoint(), placement_policy)
with reporter.step("Get current gRPC metrics for Tree List"):
metrics_counter = self.get_metrics_value(
node, command="grpc_server_handled_total", service="TreeService", method="TreeList"
)
with reporter.step("Query Tree List"):
get_tree_list(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics for Tree List, 'the counter should increase by 1'"):
metrics_counter += 1
self.check_metrics_in_node(
node, metrics_counter, command="grpc_server_handled_total", service="TreeService", method="TreeList"
)
with reporter.step("Delete container"):
delete_container(default_wallet, cid, self.shell, self.cluster.default_rpc_endpoint)

View file

@ -17,15 +17,23 @@ PART_SIZE = 5 * 1024 * 1024
class TestS3GateMultipart(ClusterTestBase):
NO_SUCH_UPLOAD = "The upload ID may be invalid, or the upload may have been aborted or completed."
@allure.title("Object Multipart API (s3_client={s3_client})")
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
def test_s3_object_multipart(self, s3_client: S3ClientWrapper, bucket: str):
@allure.title("Object Multipart API (s3_client={s3_client}, bucket versioning = {versioning_status})")
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED, VersioningStatus.UNDEFINED], indirect=True)
def test_s3_object_multipart(
self, s3_client: S3ClientWrapper, bucket: str, default_wallet: WalletInfo, versioning_status: str
):
parts_count = 5
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
object_key = s3_helper.object_key_from_file_path(file_name_large)
part_files = split_file(file_name_large, parts_count)
parts = []
with reporter.step(f"Get related container_id for bucket"):
for cluster_node in self.cluster.cluster_nodes:
container_id = search_container_by_name(bucket, cluster_node)
if container_id:
break
with reporter.step("Upload first part"):
upload_id = s3_client.create_multipart_upload(bucket, object_key)
uploads = s3_client.list_multipart_uploads(bucket)
@ -39,7 +47,11 @@ class TestS3GateMultipart(ClusterTestBase):
etag = s3_client.upload_part(bucket, object_key, upload_id, part_id, file_path)
parts.append((part_id, etag))
got_parts = s3_client.list_parts(bucket, object_key, upload_id)
s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
response = s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
version_id = None
if versioning_status == VersioningStatus.ENABLED:
version_id = response["VersionId"]
assert len(got_parts) == len(part_files), f"Expected {parts_count} parts, got\n{got_parts}"
with reporter.step("Check upload list is empty"):
@ -50,6 +62,21 @@ class TestS3GateMultipart(ClusterTestBase):
got_object = s3_client.get_object(bucket, object_key)
assert get_file_hash(got_object) == get_file_hash(file_name_large)
if version_id:
with reporter.step("Delete the object version"):
s3_client.delete_object(bucket, object_key, version_id)
else:
with reporter.step("Delete the object"):
s3_client.delete_object(bucket, object_key)
with reporter.step("List objects in the bucket, expect to be empty"):
objects_list = s3_client.list_objects(bucket)
assert not objects_list, f"Expected empty bucket, got {objects_list}"
with reporter.step("List objects in the container via rpc, expect to be empty"):
objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"
@allure.title("Abort Multipart Upload (s3_client={s3_client})")
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
def test_s3_abort_multipart(

View file

@ -1,7 +1,9 @@
import json
import os
import allure
import pytest
from botocore.exceptions import ClientError
from frostfs_testlib import reporter
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.cli.container import search_container_by_name
@ -87,23 +89,24 @@ class TestS3GatePolicy(ClusterTestBase):
@allure.title("Bucket policy (s3_client={s3_client})")
def test_s3_bucket_policy(self, s3_client: S3ClientWrapper):
with reporter.step("Create bucket with default policy"):
with reporter.step("Create bucket"):
bucket = s3_client.create_bucket()
s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
with reporter.step("GetBucketPolicy"):
s3_client.get_bucket_policy(bucket)
with pytest.raises((RuntimeError, ClientError)):
s3_client.get_bucket_policy(bucket)
with reporter.step("Put new policy"):
custom_policy = f"file://{os.getcwd()}/pytest_tests/resources/files/bucket_policy.json"
custom_policy = {
"Version": "2008-10-17",
"Version": "2012-10-17",
"Id": "aaaa-bbbb-cccc-dddd",
"Statement": [
{
"Sid": "AddPerm",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Principal": "*",
"Action": ["s3:GetObject"],
"Resource": [f"arn:aws:s3:::{bucket}/*"],
}
@ -112,8 +115,16 @@ class TestS3GatePolicy(ClusterTestBase):
s3_client.put_bucket_policy(bucket, custom_policy)
with reporter.step("GetBucketPolicy"):
policy_1 = s3_client.get_bucket_policy(bucket)
print(policy_1)
returned_policy = json.loads(s3_client.get_bucket_policy(bucket))
assert returned_policy == custom_policy, "Wrong policy was received"
with reporter.step("Delete the policy"):
s3_client.delete_bucket_policy(bucket)
with reporter.step("GetBucketPolicy"):
with pytest.raises((RuntimeError, ClientError)):
s3_client.get_bucket_policy(bucket)
@allure.title("Bucket CORS (s3_client={s3_client})")
def test_s3_cors(self, s3_client: S3ClientWrapper):