Compare commits

...

6 commits

6 changed files with 68 additions and 47 deletions

View file

@@ -7,7 +7,7 @@ import allure
 import pytest
 from frostfs_testlib import reporter
 from frostfs_testlib.healthcheck.interfaces import Healthcheck
-from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
+from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE, PUBLIC_ACL
 from frostfs_testlib.steps.cli.container import create_container
 from frostfs_testlib.steps.cli.object import (
     get_object,
@@ -88,6 +88,7 @@ class TestFailoverNetwork(ClusterTestBase):
             endpoint=self.cluster.default_rpc_endpoint,
             rule=placement_rule,
             await_mode=True,
+            basic_acl=EACL_PUBLIC_READ_WRITE,
         )

         storage_objects = []
@@ -210,11 +211,9 @@ class TestFailoverNetwork(ClusterTestBase):
         with reporter.step("Search nodes with object"):
             nodes_with_object = get_object_nodes(
                 cluster=self.cluster,
-                wallet=default_wallet,
                 cid=storage_object.cid,
                 oid=storage_object.oid,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
+                alive_node=self.cluster.cluster_nodes[0],
             )

         with reporter.step("Get data interface to node"):
@@ -270,11 +269,9 @@ class TestFailoverNetwork(ClusterTestBase):
         with reporter.step("Search nodes with object"):
             nodes_with_object = get_object_nodes(
                 cluster=self.cluster,
-                wallet=default_wallet,
                 cid=storage_object.cid,
                 oid=storage_object.oid,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
+                alive_node=self.cluster.cluster_nodes[0],
             )

         with reporter.step("Get internal interface to node"):

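Note on the recurring API change above (it repeats in the files below): get_object_nodes no longer takes wallet, shell, and endpoint, but resolves them from a single alive_node. A minimal sketch of the new call shape, assuming cluster and storage_object fixtures like those in the test, not the testlib source:

    from frostfs_testlib.steps.cli.object import get_object_nodes

    # Any reachable node works as alive_node; the helper derives the wallet,
    # shell, and RPC endpoint from it instead of taking them separately.
    nodes_with_object = get_object_nodes(
        cluster=cluster,                      # assumed cluster fixture
        cid=storage_object.cid,               # container holding the object
        oid=storage_object.oid,               # object to locate
        alive_node=cluster.cluster_nodes[0],  # entry point for the placement query
    )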
View file

@@ -156,12 +156,7 @@ class TestFailoverServer(ClusterTestBase):
     ) -> tuple[StorageObjectInfo, list[ClusterNode]]:
         object_info = container.generate_object(simple_object_size.value)
         object_nodes = get_object_nodes(
-            cluster=self.cluster,
-            wallet=default_wallet,
-            cid=object_info.cid,
-            oid=object_info.oid,
-            shell=self.shell,
-            endpoint=self.cluster.default_rpc_endpoint,
+            cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
         )
         return object_info, object_nodes

@@ -313,12 +308,7 @@ class TestFailoverServer(ClusterTestBase):
         )
         with reporter.step("Search nodes with object"):
             object_nodes = get_object_nodes(
-                cluster=self.cluster,
-                wallet=default_wallet,
-                cid=cid_1,
-                oid=oid,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
+                cluster=self.cluster, cid=cid_1, oid=oid, alive_node=self.cluster.cluster_nodes[0]
             )
         with reporter.step("Turn off random node with object"):
             cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")

View file

@@ -20,6 +20,7 @@ from frostfs_testlib.steps.node_management import (
     wait_for_node_to_be_ready,
 )
 from frostfs_testlib.steps.s3 import s3_helper
+from frostfs_testlib.steps.s3.s3_helper import search_nodes_with_bucket
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
 from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
 from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
@@ -126,6 +127,7 @@ class TestFailoverStorage(ClusterTestBase):
     def test_unhealthy_tree(
         self,
         s3_client: S3ClientWrapper,
+        default_wallet: str,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        after_run_return_all_stopped_services,
@@ -155,11 +157,22 @@ class TestFailoverStorage(ClusterTestBase):
             put_object = s3_client.put_object(bucket, file_path)
             s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

-        with reporter.step("Turn off all storage nodes except default"):
-            for node in self.cluster.cluster_nodes[1:]:
+        node_bucket = search_nodes_with_bucket(
+            cluster=self.cluster,
+            bucket_name=bucket,
+            wallet=default_wallet,
+            shell=self.shell,
+            endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(),
+        )[0]
+
+        with reporter.step("Turn off all storage nodes except bucket node"):
+            for node in [node_to_stop for node_to_stop in self.cluster.cluster_nodes if node_to_stop != node_bucket]:
                 with reporter.step(f"Stop storage service on node: {node}"):
                     cluster_state_controller.stop_service_of_type(node, StorageNode)

+        with reporter.step("Change s3 endpoint to bucket node"):
+            s3_client.set_endpoint(node_bucket.s3_gate.get_endpoint())
+
         with reporter.step("Check that object is available"):
             s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

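The reworked test_unhealthy_tree above pins S3 traffic to the node that actually hosts the bucket: locate the hosting node, stop storage services everywhere else, then repoint the client. A condensed sketch of that flow, with fixture names assumed from the diff:

    from frostfs_testlib.steps.s3.s3_helper import search_nodes_with_bucket

    # Find the node holding the bucket's container (first match wins).
    node_bucket = search_nodes_with_bucket(
        cluster=cluster,
        bucket_name=bucket,
        wallet=default_wallet,
        shell=shell,
        endpoint=cluster.storage_nodes[0].get_rpc_endpoint(),
    )[0]

    # Stop storage on every other node, then talk to the surviving S3 gate.
    for node in cluster.cluster_nodes:
        if node != node_bucket:
            cluster_state_controller.stop_service_of_type(node, StorageNode)
    s3_client.set_endpoint(node_bucket.s3_gate.get_endpoint())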
View file

@@ -16,15 +16,23 @@ PART_SIZE = 5 * 1024 * 1024
 class TestS3GateMultipart(ClusterTestBase):
     NO_SUCH_UPLOAD = "The upload ID may be invalid, or the upload may have been aborted or completed."

-    @allure.title("Object Multipart API (s3_client={s3_client})")
-    @pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
-    def test_s3_object_multipart(self, s3_client: S3ClientWrapper, bucket: str):
+    @allure.title("Object Multipart API (s3_client={s3_client}, bucket versioning = {versioning_status})")
+    @pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED, VersioningStatus.UNDEFINED], indirect=True)
+    def test_s3_object_multipart(
+        self, s3_client: S3ClientWrapper, bucket: str, default_wallet: str, versioning_status: str
+    ):
         parts_count = 5
         file_name_large = generate_file(PART_SIZE * parts_count)  # 5Mb - min part
         object_key = s3_helper.object_key_from_file_path(file_name_large)
         part_files = split_file(file_name_large, parts_count)
         parts = []

+        with reporter.step("Get related container_id for bucket"):
+            for cluster_node in self.cluster.cluster_nodes:
+                container_id = search_container_by_name(bucket, cluster_node)
+                if container_id:
+                    break
+
         with reporter.step("Upload first part"):
             upload_id = s3_client.create_multipart_upload(bucket, object_key)
             uploads = s3_client.list_multipart_uploads(bucket)
@@ -38,7 +46,11 @@ class TestS3GateMultipart(ClusterTestBase):
                 etag = s3_client.upload_part(bucket, object_key, upload_id, part_id, file_path)
                 parts.append((part_id, etag))
             got_parts = s3_client.list_parts(bucket, object_key, upload_id)
-            s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
+            response = s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
+            version_id = None
+            if versioning_status == VersioningStatus.ENABLED:
+                version_id = response["VersionId"]
             assert len(got_parts) == len(part_files), f"Expected {parts_count} parts, got\n{got_parts}"

         with reporter.step("Check upload list is empty"):
@@ -49,6 +61,21 @@ class TestS3GateMultipart(ClusterTestBase):
             got_object = s3_client.get_object(bucket, object_key)
             assert get_file_hash(got_object) == get_file_hash(file_name_large)

+        if version_id:
+            with reporter.step("Delete the object version"):
+                s3_client.delete_object(bucket, object_key, version_id)
+        else:
+            with reporter.step("Delete the object"):
+                s3_client.delete_object(bucket, object_key)
+
+        with reporter.step("List objects in the bucket, expect to be empty"):
+            objects_list = s3_client.list_objects(bucket)
+            assert not objects_list, f"Expected empty bucket, got {objects_list}"
+
+        with reporter.step("List objects in the container via rpc, expect to be empty"):
+            objects = list_objects(default_wallet, self.shell, container_id, self.cluster.default_rpc_endpoint)
+            assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"
+
     @allure.title("Abort Multipart Upload (s3_client={s3_client})")
     @pytest.mark.parametrize("versioning_status", [VersioningStatus.ENABLED], indirect=True)
     def test_s3_abort_multipart(

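Because the test is now parametrized with VersioningStatus.UNDEFINED as well, cleanup must branch on versioning: complete_multipart_upload returns a VersionId only for versioned buckets (per the S3 CompleteMultipartUpload API), and deleting that exact version requires passing it to delete_object. A hedged sketch of the branch, mirroring the diff:

    # VersionId appears in the CompleteMultipartUpload response only when
    # bucket versioning is enabled.
    response = s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts)
    version_id = response["VersionId"] if versioning_status == VersioningStatus.ENABLED else None

    if version_id:
        s3_client.delete_object(bucket, object_key, version_id)  # remove that exact version
    else:
        s3_client.delete_object(bucket, object_key)  # plain delete on an unversioned bucket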
View file

@@ -51,12 +51,10 @@ class TestS3GatePolicy(ClusterTestBase):
             assert bucket_loc_2 == "rep-3"

         with reporter.step("Check object policy"):
-            cid_1 = search_container_by_name(
-                default_wallet,
-                bucket_1,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
-            )
+            for cluster_node in self.cluster.cluster_nodes:
+                cid_1 = search_container_by_name(name=bucket_1, node=cluster_node)
+                if cid_1:
+                    break
             copies_1 = get_simple_object_copies(
                 wallet=default_wallet,
                 cid=cid_1,
@@ -65,12 +63,10 @@ class TestS3GatePolicy(ClusterTestBase):
                 nodes=self.cluster.storage_nodes,
             )
             assert copies_1 == 1
-            cid_2 = search_container_by_name(
-                default_wallet,
-                bucket_2,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
-            )
+            for cluster_node in self.cluster.cluster_nodes:
+                cid_2 = search_container_by_name(name=bucket_2, node=cluster_node)
+                if cid_2:
+                    break
             copies_2 = get_simple_object_copies(
                 wallet=default_wallet,
                 cid=cid_2,

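Both hunks above replace a single-endpoint search_container_by_name call with a loop that asks each cluster node in turn and stops at the first resolved container ID. If the pattern keeps spreading, it could be folded into a small helper; the function below is a hypothetical sketch, not part of frostfs_testlib:

    def resolve_container_id(cluster, bucket_name: str) -> str | None:
        # Hypothetical helper: ask every node until one maps the bucket
        # name to a container ID, as the inlined loops in the diff do.
        for cluster_node in cluster.cluster_nodes:
            container_id = search_container_by_name(bucket_name, cluster_node)
            if container_id:
                return container_id
        return None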
View file

@@ -7,6 +7,7 @@ from frostfs_testlib import reporter
 from frostfs_testlib.cli import FrostfsCli
 from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
 from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
+from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
 from frostfs_testlib.steps.cli.container import create_container, delete_container
 from frostfs_testlib.steps.cli.object import delete_object, get_object, get_object_nodes, put_object
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
@@ -52,21 +53,18 @@ class TestControlShard(ClusterTestBase):
     def oid_cid_node(self, default_wallet: str) -> tuple[str, str, ClusterNode]:
         with reporter.step("Create container, and put object"):
             cid = create_container(
-                wallet=default_wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, rule="REP 1 CBF 1"
+                wallet=default_wallet,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                rule="REP 1 CBF 1",
+                basic_acl=EACL_PUBLIC_READ_WRITE,
             )
             file = generate_file(5242880)
             oid = put_object(
                 wallet=default_wallet, path=file, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
             )

         with reporter.step("Search node with object"):
-            nodes = get_object_nodes(
-                cluster=self.cluster,
-                wallet=default_wallet,
-                cid=cid,
-                oid=oid,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
-            )
+            nodes = get_object_nodes(cluster=self.cluster, cid=cid, oid=oid, alive_node=self.cluster.cluster_nodes[0])

         yield oid, cid, nodes[0]
@@ -136,7 +134,7 @@ class TestControlShard(ClusterTestBase):
         revert_all_shards_mode: None,
     ):
         oid, cid, node = oid_cid_node
-        object_path, object_name = self.get_object_path_and_name_file(oid, cid, node)
+        object_path, object_name = self.get_object_path_and_name_file(*oid_cid_node)
         with reporter.step("Block read file"):
             node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")
         with reporter.step("Get object, expect 6 errors"):
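The last hunk leans on two small idioms: the fixture tuple is splatted straight into the helper with *oid_cid_node, and read failures are injected by dropping the blob's filesystem read bit. A brief sketch of both, with names taken from the diff:

    # *oid_cid_node is identical to passing the elements positionally:
    oid, cid, node = oid_cid_node
    object_path, object_name = self.get_object_path_and_name_file(*oid_cid_node)

    # Strip read permission so subsequent GETs against this node error out.
    node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")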