Compare commits
8 commits
master
...
add-test-s
Author | SHA1 | Date | |
---|---|---|---|
550662d5a6 | |||
6025d97f0d | |||
97a39810ca | |||
bc14897919 | |||
ae3e1a848d | |||
4db9d8a4a6 | |||
b5ce51a72e | |||
9eac53965a |
7 changed files with 332 additions and 16 deletions
|
@ -92,7 +92,7 @@ class TestEACLContainer(ClusterTestBase):
|
||||||
cluster=self.cluster,
|
cluster=self.cluster,
|
||||||
)
|
)
|
||||||
|
|
||||||
with reporter.step(f"Check {not_deny_role_wallet} has full access to eACL public container"):
|
with reporter.step(f"Check {not_deny_role_str} has full access to eACL public container"):
|
||||||
check_full_access_to_container(
|
check_full_access_to_container(
|
||||||
not_deny_role_wallet,
|
not_deny_role_wallet,
|
||||||
cid,
|
cid,
|
||||||
|
|
|
@ -23,7 +23,7 @@ from frostfs_testlib.resources.common import (
|
||||||
)
|
)
|
||||||
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
|
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
|
||||||
from frostfs_testlib.shell import LocalShell, Shell
|
from frostfs_testlib.shell import LocalShell, Shell
|
||||||
from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE, DEFAULT_EC_PLACEMENT_RULE
|
from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE
|
||||||
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
|
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
|
||||||
from frostfs_testlib.steps.s3 import s3_helper
|
from frostfs_testlib.steps.s3 import s3_helper
|
||||||
from frostfs_testlib.storage import get_service_registry
|
from frostfs_testlib.storage import get_service_registry
|
||||||
|
@ -210,6 +210,7 @@ def object_size(
|
||||||
|
|
||||||
return complex_object_size
|
return complex_object_size
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def rep_placement_policy() -> PlacementPolicy:
|
def rep_placement_policy() -> PlacementPolicy:
|
||||||
return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE)
|
return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE)
|
||||||
|
@ -222,10 +223,7 @@ def ec_placement_policy() -> PlacementPolicy:
|
||||||
|
|
||||||
# By default we want all tests to be executed with both storage policies.
|
# By default we want all tests to be executed with both storage policies.
|
||||||
# This can be overriden in choosen tests if needed.
|
# This can be overriden in choosen tests if needed.
|
||||||
@pytest.fixture(
|
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep)])
|
||||||
scope="session",
|
|
||||||
params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)],
|
|
||||||
)
|
|
||||||
def placement_policy(
|
def placement_policy(
|
||||||
rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest
|
rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest
|
||||||
) -> PlacementPolicy:
|
) -> PlacementPolicy:
|
||||||
|
@ -234,6 +232,7 @@ def placement_policy(
|
||||||
|
|
||||||
return ec_placement_policy
|
return ec_placement_policy
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
|
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
|
||||||
cluster = Cluster(hosting)
|
cluster = Cluster(hosting)
|
||||||
|
|
|
@ -387,6 +387,7 @@ class TestMaintenanceMode(ClusterTestBase):
|
||||||
expected_status == node_snapshot.node_status
|
expected_status == node_snapshot.node_status
|
||||||
), f"{node_under_test} status should be {expected_status}, but was {node_snapshot.node_status}. See netmap:\n{netmap}"
|
), f"{node_under_test} status should be {expected_status}, but was {node_snapshot.node_status}. See netmap:\n{netmap}"
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="The test is not stable for 0.39")
|
||||||
@allure.title("Test of basic node operations in maintenance mode")
|
@allure.title("Test of basic node operations in maintenance mode")
|
||||||
def test_maintenance_mode(
|
def test_maintenance_mode(
|
||||||
self,
|
self,
|
||||||
|
@ -456,6 +457,7 @@ class TestMaintenanceMode(ClusterTestBase):
|
||||||
|
|
||||||
os.remove(file_path)
|
os.remove(file_path)
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="The test is not stable for 0.39")
|
||||||
@pytest.mark.sanity
|
@pytest.mark.sanity
|
||||||
@allure.title("MAINTENANCE and OFFLINE mode transitions")
|
@allure.title("MAINTENANCE and OFFLINE mode transitions")
|
||||||
def test_mode_transitions(
|
def test_mode_transitions(
|
||||||
|
@ -558,6 +560,7 @@ class TestMaintenanceMode(ClusterTestBase):
|
||||||
with reporter.step("Check node status is 'maintenance'"):
|
with reporter.step("Check node status is 'maintenance'"):
|
||||||
self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)
|
self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="The test is not stable for 0.39")
|
||||||
@allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
|
@allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
|
||||||
def test_maintenance_globally_forbidden(
|
def test_maintenance_globally_forbidden(
|
||||||
self,
|
self,
|
||||||
|
|
97
pytest_tests/testsuites/metrics/test_container_metrics.py
Normal file
97
pytest_tests/testsuites/metrics/test_container_metrics.py
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
import math
|
||||||
|
import re
|
||||||
|
|
||||||
|
import allure
|
||||||
|
import pytest
|
||||||
|
from frostfs_testlib import reporter
|
||||||
|
from frostfs_testlib.steps.cli.container import create_container, delete_container
|
||||||
|
from frostfs_testlib.steps.cli.object import delete_object, get_object_nodes, put_object_to_random_node
|
||||||
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||||
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||||
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||||
|
from frostfs_testlib.testing.test_control import wait_for_success
|
||||||
|
from frostfs_testlib.utils.file_utils import generate_file
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.container
|
||||||
|
class TestContainerMetrics(ClusterTestBase):
|
||||||
|
@wait_for_success(interval=10)
|
||||||
|
def check_sum_counter_metrics_in_nodes(
|
||||||
|
self, cluster_nodes: list[ClusterNode], cid: str, phy_exp: int, logic_exp: int, user_exp: int
|
||||||
|
):
|
||||||
|
counter_phy = 0
|
||||||
|
counter_logic = 0
|
||||||
|
counter_user = 0
|
||||||
|
for cluster_node in cluster_nodes:
|
||||||
|
metric_result = cluster_node.metrics.storage.get_metric_container(f"container_objects_total", cid)
|
||||||
|
counter_phy += self.get_count_metric_type_from_stdout(metric_result.stdout, "phy")
|
||||||
|
counter_logic += self.get_count_metric_type_from_stdout(metric_result.stdout, "logic")
|
||||||
|
counter_user += self.get_count_metric_type_from_stdout(metric_result.stdout, "user")
|
||||||
|
|
||||||
|
assert counter_phy == phy_exp, f"Expected metric Phy={phy_exp}, Actual: {counter_phy} in nodes: {cluster_nodes}"
|
||||||
|
assert (
|
||||||
|
counter_logic == logic_exp
|
||||||
|
), f"Expected metric logic={logic_exp}, Actual: {counter_logic} in nodes: {cluster_nodes}"
|
||||||
|
assert (
|
||||||
|
counter_user == user_exp
|
||||||
|
), f"Expected metric User={user_exp}, Actual: {counter_user} in nodes: {cluster_nodes}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_count_metric_type_from_stdout(metric_result_stdout: str, metric_type: str):
|
||||||
|
result = re.findall(rf'type="{metric_type}"}}\s(\d+)', metric_result_stdout)
|
||||||
|
return sum(map(int, result))
|
||||||
|
|
||||||
|
@allure.title("Container metrics (obj_size={object_size})")
|
||||||
|
def test_container_metrics(
|
||||||
|
self, object_size: ObjectSize, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster
|
||||||
|
):
|
||||||
|
file_path = generate_file(object_size.value)
|
||||||
|
placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
||||||
|
copies = 2
|
||||||
|
object_chunks = 0
|
||||||
|
head_object = 1
|
||||||
|
link_object = 0
|
||||||
|
if object_size.value > max_object_size:
|
||||||
|
object_chunks = math.ceil(object_size.value / max_object_size)
|
||||||
|
link_object = 1
|
||||||
|
|
||||||
|
with reporter.step(f"Create container with policy {placement_policy}"):
|
||||||
|
cid = create_container(
|
||||||
|
default_wallet,
|
||||||
|
rule=placement_policy,
|
||||||
|
shell=self.shell,
|
||||||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Put object to random node"):
|
||||||
|
storage_object_id = put_object_to_random_node(
|
||||||
|
wallet=default_wallet,
|
||||||
|
path=file_path,
|
||||||
|
cid=cid,
|
||||||
|
shell=self.shell,
|
||||||
|
cluster=cluster,
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Check metric appears in node where the object is located"):
|
||||||
|
object_nodes = get_object_nodes(
|
||||||
|
cluster=cluster, cid=cid, oid=storage_object_id, alive_node=cluster.cluster_nodes[0]
|
||||||
|
)
|
||||||
|
count_metrics_exp = (object_chunks + head_object + link_object) * copies
|
||||||
|
self.check_sum_counter_metrics_in_nodes(
|
||||||
|
object_nodes, cid, phy_exp=count_metrics_exp, logic_exp=count_metrics_exp, user_exp=copies
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Delete file, wait until gc remove object"):
|
||||||
|
delete_object(default_wallet, cid, storage_object_id, self.shell, self.cluster.default_rpc_endpoint)
|
||||||
|
count_metrics_exp = len(object_nodes)
|
||||||
|
self.check_sum_counter_metrics_in_nodes(
|
||||||
|
object_nodes, cid, phy_exp=count_metrics_exp, logic_exp=count_metrics_exp, user_exp=0
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Check metrics(Phy, Logic, User) in each nodes"):
|
||||||
|
# Phy and Logic metrics are 4, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4
|
||||||
|
self.check_sum_counter_metrics_in_nodes(cluster.cluster_nodes, cid, phy_exp=4, logic_exp=4, user_exp=0)
|
||||||
|
|
||||||
|
with reporter.step("Delete container"):
|
||||||
|
delete_container(default_wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
179
pytest_tests/testsuites/metrics/test_shard_metrics.py
Normal file
179
pytest_tests/testsuites/metrics/test_shard_metrics.py
Normal file
|
@ -0,0 +1,179 @@
|
||||||
|
import random
|
||||||
|
import re
|
||||||
|
|
||||||
|
import allure
|
||||||
|
import pytest
|
||||||
|
from frostfs_testlib import reporter
|
||||||
|
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
|
||||||
|
from frostfs_testlib.shell import Shell
|
||||||
|
from frostfs_testlib.steps.cli.container import create_container, delete_container
|
||||||
|
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
|
||||||
|
from frostfs_testlib.steps.node_management import node_shard_list, node_shard_set_mode
|
||||||
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
|
from frostfs_testlib.storage.controllers import ShardsWatcher
|
||||||
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||||
|
from frostfs_testlib.testing import parallel, wait_for_success
|
||||||
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||||
|
from frostfs_testlib.utils.file_utils import generate_file
|
||||||
|
|
||||||
|
|
||||||
|
class TestShardMetrics(ClusterTestBase):
|
||||||
|
@pytest.fixture()
|
||||||
|
@allure.title("Get two shards for set mode")
|
||||||
|
def two_shards_and_node(self, cluster: Cluster) -> tuple[str, str, ClusterNode]:
|
||||||
|
node = random.choice(cluster.cluster_nodes)
|
||||||
|
shards = node_shard_list(node.storage_node)
|
||||||
|
two_shards = random.sample(shards, k=2)
|
||||||
|
|
||||||
|
yield two_shards[0], two_shards[1], node
|
||||||
|
|
||||||
|
for shard in two_shards:
|
||||||
|
node_shard_set_mode(node.storage_node, shard, "read-write")
|
||||||
|
|
||||||
|
node_shard_list(node.storage_node)
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
@allure.title("Revert all shards mode")
|
||||||
|
def revert_all_shards_mode(self):
|
||||||
|
yield
|
||||||
|
parallel(self.set_shard_rw_mode, self.cluster.cluster_nodes)
|
||||||
|
|
||||||
|
def set_shard_rw_mode(self, node: ClusterNode):
|
||||||
|
watcher = ShardsWatcher(node)
|
||||||
|
shards = watcher.get_shards()
|
||||||
|
for shard in shards:
|
||||||
|
watcher.set_shard_mode(shard["shard_id"], mode="read-write")
|
||||||
|
watcher.await_for_all_shards_status(status="read-write")
|
||||||
|
|
||||||
|
@wait_for_success(interval=10)
|
||||||
|
def check_metrics_in_node(self, cluster_node: ClusterNode, counter_exp: int, **metrics_greps: str):
|
||||||
|
counter_act = 0
|
||||||
|
try:
|
||||||
|
metric_result = cluster_node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
|
||||||
|
counter_act += self.calc_metrics_count_from_stdout(metric_result.stdout)
|
||||||
|
except RuntimeError as e:
|
||||||
|
...
|
||||||
|
assert counter_act == counter_exp, f"Expected: {counter_exp}, Actual: {counter_act} in node: {cluster_node}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def calc_metrics_count_from_stdout(metric_result_stdout: str):
|
||||||
|
result = re.findall(r"}\s(\d+)", metric_result_stdout)
|
||||||
|
return sum(map(int, result))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_error_count_from_logs(shell: Shell, object_path: str, object_name: str):
|
||||||
|
error_count = 0
|
||||||
|
try:
|
||||||
|
logs = shell.exec(f"journalctl -u frostfs-storage --grep='error count' --no-pager")
|
||||||
|
# search error logs for current object
|
||||||
|
for error_line in logs.stdout.split("\n"):
|
||||||
|
if object_path in error_line and object_name in error_line:
|
||||||
|
result = re.findall(r'"error\scount":\s(\d+)', error_line)
|
||||||
|
error_count += sum(map(int, result))
|
||||||
|
except RuntimeError as e:
|
||||||
|
...
|
||||||
|
|
||||||
|
return error_count
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@wait_for_success(180, 30)
|
||||||
|
def get_object_path_and_name_file(oid: str, cid: str, node: ClusterNode) -> tuple[str, str]:
|
||||||
|
oid_path = f"{oid[0]}/{oid[1]}/{oid[2]}/{oid[3]}"
|
||||||
|
object_path = None
|
||||||
|
|
||||||
|
with reporter.step("Search object file"):
|
||||||
|
node_shell = node.storage_node.host.get_shell()
|
||||||
|
data_path = node.storage_node.get_data_directory()
|
||||||
|
all_datas = node_shell.exec(f"ls -la {data_path}/data | awk '{{ print $9 }}'").stdout.strip()
|
||||||
|
for data_dir in all_datas.replace(".", "").strip().split("\n"):
|
||||||
|
check_dir = node_shell.exec(
|
||||||
|
f" [ -d {data_path}/data/{data_dir}/data/{oid_path} ] && echo 1 || echo 0"
|
||||||
|
).stdout
|
||||||
|
if "1" in check_dir:
|
||||||
|
object_path = f"{data_path}/data/{data_dir}/data/{oid_path}"
|
||||||
|
object_name = f"{oid[4:]}.{cid}"
|
||||||
|
break
|
||||||
|
|
||||||
|
assert object_path is not None, f"{oid} object not found in directory - {data_path}/data"
|
||||||
|
return object_path, object_name
|
||||||
|
|
||||||
|
@allure.title("Metric for shard mode")
|
||||||
|
def test_shard_metrics_set_mode(self, two_shards_and_node: tuple[str, str, ClusterNode]):
|
||||||
|
metrics_counter = 1
|
||||||
|
shard1, shard2, node = two_shards_and_node
|
||||||
|
|
||||||
|
with reporter.step("Shard1 set to mode 'read-only'"):
|
||||||
|
node_shard_set_mode(node.storage_node, shard1, "read-only")
|
||||||
|
|
||||||
|
with reporter.step(f"Check shard metrics, 'the mode will change to 'READ_ONLY'"):
|
||||||
|
self.check_metrics_in_node(
|
||||||
|
node, metrics_counter, command="frostfs_node_engine_mode_info", mode="READ_ONLY", shard_id=shard1
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Shard2 set to mode 'degraded-read-only'"):
|
||||||
|
node_shard_set_mode(node.storage_node, shard2, "degraded-read-only")
|
||||||
|
|
||||||
|
with reporter.step(f"Check shard metrics, 'the mode will change to 'DEGRADED_READ_ONLY'"):
|
||||||
|
self.check_metrics_in_node(
|
||||||
|
node,
|
||||||
|
metrics_counter,
|
||||||
|
command="frostfs_node_engine_mode_info",
|
||||||
|
mode="DEGRADED_READ_ONLY",
|
||||||
|
shard_id=shard2,
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Both shards set to mode 'read-write'"):
|
||||||
|
for shard in [shard1, shard2]:
|
||||||
|
node_shard_set_mode(node.storage_node, shard, "read-write")
|
||||||
|
|
||||||
|
with reporter.step(f"Check shard metrics, 'the mode will change to 'READ_WRITE'"):
|
||||||
|
self.check_metrics_in_node(
|
||||||
|
node, metrics_counter, command="frostfs_node_engine_mode_info", mode="READ_WRITE", shard_id=shard
|
||||||
|
)
|
||||||
|
|
||||||
|
@allure.title("Metric for error count on shard")
|
||||||
|
def test_shard_metrics_error_count(
|
||||||
|
self, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster, revert_all_shards_mode
|
||||||
|
):
|
||||||
|
file_path = generate_file(round(max_object_size * 0.8))
|
||||||
|
|
||||||
|
with reporter.step(f"Create container"):
|
||||||
|
cid = create_container(
|
||||||
|
wallet=default_wallet,
|
||||||
|
shell=self.shell,
|
||||||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||||||
|
rule="REP 1 CBF 1",
|
||||||
|
basic_acl=EACL_PUBLIC_READ_WRITE,
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step("Put object"):
|
||||||
|
oid = put_object(default_wallet, file_path, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||||
|
|
||||||
|
with reporter.step("Get object nodes"):
|
||||||
|
object_nodes = get_object_nodes(cluster, cid, oid, cluster.cluster_nodes[0])
|
||||||
|
node = object_nodes[0]
|
||||||
|
|
||||||
|
with reporter.step("Search object in system."):
|
||||||
|
object_path, object_name = self.get_object_path_and_name_file(oid, cid, node)
|
||||||
|
|
||||||
|
with reporter.step("Block read file"):
|
||||||
|
node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")
|
||||||
|
|
||||||
|
with reporter.step("Get object, expect error"):
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
get_object(
|
||||||
|
wallet=default_wallet,
|
||||||
|
cid=cid,
|
||||||
|
oid=oid,
|
||||||
|
shell=self.shell,
|
||||||
|
endpoint=node.storage_node.get_rpc_endpoint(),
|
||||||
|
)
|
||||||
|
|
||||||
|
with reporter.step(f"Get shard error count from logs"):
|
||||||
|
counter = self.get_error_count_from_logs(node.host.get_shell(), object_path, object_name)
|
||||||
|
|
||||||
|
with reporter.step(f"Check shard error metrics"):
|
||||||
|
self.check_metrics_in_node(node, counter, command="frostfs_node_engine_errors_total")
|
||||||
|
|
||||||
|
with reporter.step("Delete container"):
|
||||||
|
delete_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint)
|
|
@ -17,15 +17,23 @@ PART_SIZE = 5 * 1024 * 1024
|
||||||
class TestS3GateMultipart(ClusterTestBase):
|
class TestS3GateMultipart(ClusterTestBase):
|
||||||
NO_SUCH_UPLOAD = "The upload ID may be invalid, or the upload may have been aborted or completed."
|
NO_SUCH_UPLOAD = "The upload ID may be invalid, or the upload may have been aborted or completed."
|
||||||
|
|
||||||
@allure.title("Object Multipart API (s3_client={s3_client})")
|
@allure.title("Object Multipart API (s3_client={s3_client}, bucket versioning = {versioning_status})")
|
||||||
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
|
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED, VersioningStatus.UNDEFINED], indirect=True)
|
||||||
def test_s3_object_multipart(self, s3_client: S3ClientWrapper, bucket: str):
|
def test_s3_object_multipart(
|
||||||
|
self, s3_client: S3ClientWrapper, bucket: str, default_wallet: WalletInfo, versioning_status: str
|
||||||
|
):
|
||||||
parts_count = 5
|
parts_count = 5
|
||||||
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
||||||
object_key = s3_helper.object_key_from_file_path(file_name_large)
|
object_key = s3_helper.object_key_from_file_path(file_name_large)
|
||||||
part_files = split_file(file_name_large, parts_count)
|
part_files = split_file(file_name_large, parts_count)
|
||||||
parts = []
|
parts = []
|
||||||
|
|
||||||
|
with reporter.step(f"Get related container_id for bucket"):
|
||||||
|
for cluster_node in self.cluster.cluster_nodes:
|
||||||
|
container_id = search_container_by_name(bucket, cluster_node)
|
||||||
|
if container_id:
|
||||||
|
break
|
||||||
|
|
||||||
with reporter.step("Upload first part"):
|
with reporter.step("Upload first part"):
|
||||||
upload_id = s3_client.create_multipart_upload(bucket, object_key)
|
upload_id = s3_client.create_multipart_upload(bucket, object_key)
|
||||||
uploads = s3_client.list_multipart_uploads(bucket)
|
uploads = s3_client.list_multipart_uploads(bucket)
|
||||||
|
@ -39,7 +47,11 @@ class TestS3GateMultipart(ClusterTestBase):
|
||||||
etag = s3_client.upload_part(bucket, object_key, upload_id, part_id, file_path)
|
etag = s3_client.upload_part(bucket, object_key, upload_id, part_id, file_path)
|
||||||
parts.append((part_id, etag))
|
parts.append((part_id, etag))
|
||||||
got_parts = s3_client.list_parts(bucket, object_key, upload_id)
|
got_parts = s3_client.list_parts(bucket, object_key, upload_id)
|
||||||
s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
|
response = s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
|
||||||
|
|
||||||
|
version_id = None
|
||||||
|
if versioning_status == VersioningStatus.ENABLED:
|
||||||
|
version_id = response["VersionId"]
|
||||||
assert len(got_parts) == len(part_files), f"Expected {parts_count} parts, got\n{got_parts}"
|
assert len(got_parts) == len(part_files), f"Expected {parts_count} parts, got\n{got_parts}"
|
||||||
|
|
||||||
with reporter.step("Check upload list is empty"):
|
with reporter.step("Check upload list is empty"):
|
||||||
|
@ -50,6 +62,21 @@ class TestS3GateMultipart(ClusterTestBase):
|
||||||
got_object = s3_client.get_object(bucket, object_key)
|
got_object = s3_client.get_object(bucket, object_key)
|
||||||
assert get_file_hash(got_object) == get_file_hash(file_name_large)
|
assert get_file_hash(got_object) == get_file_hash(file_name_large)
|
||||||
|
|
||||||
|
if version_id:
|
||||||
|
with reporter.step("Delete the object version"):
|
||||||
|
s3_client.delete_object(bucket, object_key, version_id)
|
||||||
|
else:
|
||||||
|
with reporter.step("Delete the object"):
|
||||||
|
s3_client.delete_object(bucket, object_key)
|
||||||
|
|
||||||
|
with reporter.step("List objects in the bucket, expect to be empty"):
|
||||||
|
objects_list = s3_client.list_objects(bucket)
|
||||||
|
assert not objects_list, f"Expected empty bucket, got {objects_list}"
|
||||||
|
|
||||||
|
with reporter.step("List objects in the container via rpc, expect to be empty"):
|
||||||
|
objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
|
||||||
|
assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"
|
||||||
|
|
||||||
@allure.title("Abort Multipart Upload (s3_client={s3_client})")
|
@allure.title("Abort Multipart Upload (s3_client={s3_client})")
|
||||||
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
|
@pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
|
||||||
def test_s3_abort_multipart(
|
def test_s3_abort_multipart(
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import allure
|
import allure
|
||||||
import pytest
|
import pytest
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
|
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
|
||||||
from frostfs_testlib.steps.cli.container import search_container_by_name
|
from frostfs_testlib.steps.cli.container import search_container_by_name
|
||||||
|
@ -87,23 +89,24 @@ class TestS3GatePolicy(ClusterTestBase):
|
||||||
|
|
||||||
@allure.title("Bucket policy (s3_client={s3_client})")
|
@allure.title("Bucket policy (s3_client={s3_client})")
|
||||||
def test_s3_bucket_policy(self, s3_client: S3ClientWrapper):
|
def test_s3_bucket_policy(self, s3_client: S3ClientWrapper):
|
||||||
with reporter.step("Create bucket with default policy"):
|
with reporter.step("Create bucket"):
|
||||||
bucket = s3_client.create_bucket()
|
bucket = s3_client.create_bucket()
|
||||||
s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
|
s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
|
||||||
|
|
||||||
with reporter.step("GetBucketPolicy"):
|
with reporter.step("GetBucketPolicy"):
|
||||||
|
with pytest.raises((RuntimeError, ClientError)):
|
||||||
s3_client.get_bucket_policy(bucket)
|
s3_client.get_bucket_policy(bucket)
|
||||||
|
|
||||||
with reporter.step("Put new policy"):
|
with reporter.step("Put new policy"):
|
||||||
custom_policy = f"file://{os.getcwd()}/pytest_tests/resources/files/bucket_policy.json"
|
custom_policy = f"file://{os.getcwd()}/pytest_tests/resources/files/bucket_policy.json"
|
||||||
custom_policy = {
|
custom_policy = {
|
||||||
"Version": "2008-10-17",
|
"Version": "2012-10-17",
|
||||||
"Id": "aaaa-bbbb-cccc-dddd",
|
"Id": "aaaa-bbbb-cccc-dddd",
|
||||||
"Statement": [
|
"Statement": [
|
||||||
{
|
{
|
||||||
"Sid": "AddPerm",
|
"Sid": "AddPerm",
|
||||||
"Effect": "Allow",
|
"Effect": "Allow",
|
||||||
"Principal": {"AWS": "*"},
|
"Principal": "*",
|
||||||
"Action": ["s3:GetObject"],
|
"Action": ["s3:GetObject"],
|
||||||
"Resource": [f"arn:aws:s3:::{bucket}/*"],
|
"Resource": [f"arn:aws:s3:::{bucket}/*"],
|
||||||
}
|
}
|
||||||
|
@ -112,8 +115,16 @@ class TestS3GatePolicy(ClusterTestBase):
|
||||||
|
|
||||||
s3_client.put_bucket_policy(bucket, custom_policy)
|
s3_client.put_bucket_policy(bucket, custom_policy)
|
||||||
with reporter.step("GetBucketPolicy"):
|
with reporter.step("GetBucketPolicy"):
|
||||||
policy_1 = s3_client.get_bucket_policy(bucket)
|
returned_policy = json.loads(s3_client.get_bucket_policy(bucket))
|
||||||
print(policy_1)
|
|
||||||
|
assert returned_policy == custom_policy, "Wrong policy was received"
|
||||||
|
|
||||||
|
with reporter.step("Delete the policy"):
|
||||||
|
s3_client.delete_bucket_policy(bucket)
|
||||||
|
|
||||||
|
with reporter.step("GetBucketPolicy"):
|
||||||
|
with pytest.raises((RuntimeError, ClientError)):
|
||||||
|
s3_client.get_bucket_policy(bucket)
|
||||||
|
|
||||||
@allure.title("Bucket CORS (s3_client={s3_client})")
|
@allure.title("Bucket CORS (s3_client={s3_client})")
|
||||||
def test_s3_cors(self, s3_client: S3ClientWrapper):
|
def test_s3_cors(self, s3_client: S3ClientWrapper):
|
||||||
|
|
Loading…
Reference in a new issue