Update multipart upload abort test #31
7 changed files with 105 additions and 41 deletions
|
@ -490,7 +490,7 @@ class AwsCliClient:
|
||||||
f"--endpoint-url {self.s3gate_endpoint} --recursive"
|
f"--endpoint-url {self.s3gate_endpoint} --recursive"
|
||||||
)
|
)
|
||||||
if Metadata:
|
if Metadata:
|
||||||
cmd += f" --metadata"
|
cmd += " --metadata"
|
||||||
for key, value in Metadata.items():
|
for key, value in Metadata.items():
|
||||||
cmd += f" {key}={value}"
|
cmd += f" {key}={value}"
|
||||||
if ACL:
|
if ACL:
|
||||||
|
|
|
@ -217,6 +217,34 @@ def list_containers(
|
||||||
return result.stdout.split()
|
return result.stdout.split()
|
||||||
|
|
||||||
|
|
||||||
|
@allure.step("List Objects in container")
|
||||||
|
def list_objects(
|
||||||
|
wallet: str,
|
||||||
|
shell: Shell,
|
||||||
|
container_id: str,
|
||||||
|
endpoint: str,
|
||||||
|
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||||
|
) -> list[str]:
|
||||||
|
"""
|
||||||
|
A wrapper for `frostfs-cli container list-objects` call. It returns all the
|
||||||
|
available objects in container.
|
||||||
|
Args:
|
||||||
|
wallet (str): a wallet on whose behalf we list the containers objects
|
||||||
|
shell: executor for cli command
|
||||||
|
container_id: cid of container
|
||||||
|
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
|
||||||
|
timeout: Timeout for the operation.
|
||||||
|
Returns:
|
||||||
|
(list): list of containers
|
||||||
|
"""
|
||||||
|
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, WALLET_CONFIG)
|
||||||
|
result = cli.container.list_objects(
|
||||||
|
rpc_endpoint=endpoint, wallet=wallet, cid=container_id, timeout=timeout
|
||||||
|
)
|
||||||
|
logger.info(f"Container objects: \n{result}")
|
||||||
|
return result.stdout.split()
|
||||||
|
|
||||||
|
|
||||||
@allure.step("Get Container")
|
@allure.step("Get Container")
|
||||||
def get_container(
|
def get_container(
|
||||||
wallet: str,
|
wallet: str,
|
||||||
|
@ -326,6 +354,6 @@ def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str
|
||||||
list_cids = list_containers(wallet, shell, endpoint)
|
list_cids = list_containers(wallet, shell, endpoint)
|
||||||
for cid in list_cids:
|
for cid in list_cids:
|
||||||
cont_info = get_container(wallet, cid, shell, endpoint, True)
|
cont_info = get_container(wallet, cid, shell, endpoint, True)
|
||||||
if cont_info.get("attributes").get("Name", None) == name:
|
if cont_info.get("attributes", {}).get("Name", None) == name:
|
||||||
return cid
|
return cid
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -43,7 +43,7 @@ def try_to_get_objects_and_expect_error(s3_client, bucket: str, object_keys: lis
|
||||||
), f"Expected error in exception {err}"
|
), f"Expected error in exception {err}"
|
||||||
|
|
||||||
|
|
||||||
@allure.step("Set versioning enable for bucket")
|
@allure.step("Set versioning status to '{status}' for bucket '{bucket}'")
|
||||||
def set_bucket_versioning(s3_client, bucket: str, status: s3_gate_bucket.VersioningStatus):
|
def set_bucket_versioning(s3_client, bucket: str, status: s3_gate_bucket.VersioningStatus):
|
||||||
s3_gate_bucket.get_bucket_versioning_status(s3_client, bucket)
|
s3_gate_bucket.get_bucket_versioning_status(s3_client, bucket)
|
||||||
s3_gate_bucket.set_bucket_versioning(s3_client, bucket, status=status)
|
s3_gate_bucket.set_bucket_versioning(s3_client, bucket, status=status)
|
||||||
|
|
|
@ -18,6 +18,7 @@ from pytest_tests.helpers.aws_cli_client import AwsCliClient
|
||||||
from pytest_tests.helpers.cli_helpers import _cmd_run, _configure_aws_cli, _run_with_passwd
|
from pytest_tests.helpers.cli_helpers import _cmd_run, _configure_aws_cli, _run_with_passwd
|
||||||
from pytest_tests.helpers.cluster import Cluster
|
from pytest_tests.helpers.cluster import Cluster
|
||||||
from pytest_tests.helpers.container import list_containers
|
from pytest_tests.helpers.container import list_containers
|
||||||
|
from pytest_tests.helpers.s3_helper import set_bucket_versioning
|
||||||
from pytest_tests.resources.common import FROSTFS_AUTHMATE_EXEC
|
from pytest_tests.resources.common import FROSTFS_AUTHMATE_EXEC
|
||||||
from pytest_tests.steps import s3_gate_bucket, s3_gate_object
|
from pytest_tests.steps import s3_gate_bucket, s3_gate_object
|
||||||
from pytest_tests.steps.cluster_test_base import ClusterTestBase
|
from pytest_tests.steps.cluster_test_base import ClusterTestBase
|
||||||
|
@ -46,14 +47,13 @@ class TestS3GateBase(ClusterTestBase):
|
||||||
def s3_client(
|
def s3_client(
|
||||||
self, default_wallet, client_shell: Shell, request: FixtureRequest, cluster: Cluster
|
self, default_wallet, client_shell: Shell, request: FixtureRequest, cluster: Cluster
|
||||||
) -> Any:
|
) -> Any:
|
||||||
wallet = default_wallet
|
|
||||||
s3_bearer_rules_file = f"{os.getcwd()}/pytest_tests/resources/files/s3_bearer_rules.json"
|
s3_bearer_rules_file = f"{os.getcwd()}/pytest_tests/resources/files/s3_bearer_rules.json"
|
||||||
policy = None if isinstance(request.param, str) else request.param[1]
|
policy = None if isinstance(request.param, str) else request.param[1]
|
||||||
(cid, bucket, access_key_id, secret_access_key, owner_private_key,) = init_s3_credentials(
|
(cid, bucket, access_key_id, secret_access_key, owner_private_key,) = init_s3_credentials(
|
||||||
wallet, cluster, s3_bearer_rules_file=s3_bearer_rules_file, policy=policy
|
default_wallet, cluster, s3_bearer_rules_file=s3_bearer_rules_file, policy=policy
|
||||||
)
|
)
|
||||||
containers_list = list_containers(
|
containers_list = list_containers(
|
||||||
wallet, shell=client_shell, endpoint=self.cluster.default_rpc_endpoint
|
default_wallet, shell=client_shell, endpoint=self.cluster.default_rpc_endpoint
|
||||||
)
|
)
|
||||||
assert cid in containers_list, f"Expected cid {cid} in {containers_list}"
|
assert cid in containers_list, f"Expected cid {cid} in {containers_list}"
|
||||||
|
|
||||||
|
@ -66,12 +66,20 @@ class TestS3GateBase(ClusterTestBase):
|
||||||
access_key_id, secret_access_key, cluster.default_s3_gate_endpoint
|
access_key_id, secret_access_key, cluster.default_s3_gate_endpoint
|
||||||
)
|
)
|
||||||
TestS3GateBase.s3_client = client
|
TestS3GateBase.s3_client = client
|
||||||
TestS3GateBase.wallet = wallet
|
TestS3GateBase.wallet = default_wallet
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@allure.title("Create/delete bucket")
|
@allure.title("Create/delete bucket")
|
||||||
def bucket(self):
|
def bucket(self, request: FixtureRequest):
|
||||||
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client)
|
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client)
|
||||||
|
|
||||||
|
versioning_status: Optional[s3_gate_bucket.VersioningStatus] = None
|
||||||
|
if "param" in request.__dict__:
|
||||||
|
versioning_status = request.param
|
||||||
|
|
||||||
|
if versioning_status:
|
||||||
|
set_bucket_versioning(self.s3_client, bucket, versioning_status)
|
||||||
|
|
||||||
yield bucket
|
yield bucket
|
||||||
self.delete_all_object_in_bucket(bucket)
|
self.delete_all_object_in_bucket(bucket)
|
||||||
|
|
||||||
|
|
|
@ -375,7 +375,7 @@ def list_multipart_uploads_s3(s3_client, bucket_name: str) -> Optional[list[dict
|
||||||
|
|
||||||
|
|
||||||
@allure.step("Abort multipart upload S3")
|
@allure.step("Abort multipart upload S3")
|
||||||
def abort_multipart_uploads_s3(s3_client, bucket_name: str, object_key: str, upload_id: str):
|
def abort_multipart_upload_s3(s3_client, bucket_name: str, object_key: str, upload_id: str):
|
||||||
try:
|
try:
|
||||||
response = s3_client.abort_multipart_upload(
|
response = s3_client.abort_multipart_upload(
|
||||||
Bucket=bucket_name, Key=object_key, UploadId=upload_id
|
Bucket=bucket_name, Key=object_key, UploadId=upload_id
|
||||||
|
|
|
@ -281,7 +281,7 @@ class TestS3Gate(TestS3GateBase):
|
||||||
uploads[0].get("UploadId") == upload_id
|
uploads[0].get("UploadId") == upload_id
|
||||||
), f"Expected correct UploadId {upload_id} in upload {uploads}"
|
), f"Expected correct UploadId {upload_id} in upload {uploads}"
|
||||||
|
|
||||||
s3_gate_object.abort_multipart_uploads_s3(self.s3_client, bucket, object_key, upload_id)
|
s3_gate_object.abort_multipart_upload_s3(self.s3_client, bucket, object_key, upload_id)
|
||||||
uploads = s3_gate_object.list_multipart_uploads_s3(self.s3_client, bucket)
|
uploads = s3_gate_object.list_multipart_uploads_s3(self.s3_client, bucket)
|
||||||
assert not uploads, f"Expected there is no uploads in bucket {bucket}"
|
assert not uploads, f"Expected there is no uploads in bucket {bucket}"
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
import allure
|
import allure
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
import pytest_tests.helpers.container as container
|
||||||
from pytest_tests.helpers.file_helper import generate_file, get_file_hash, split_file
|
from pytest_tests.helpers.file_helper import generate_file, get_file_hash, split_file
|
||||||
from pytest_tests.helpers.s3_helper import (
|
from pytest_tests.helpers.s3_helper import check_objects_in_bucket, object_key_from_file_path
|
||||||
check_objects_in_bucket,
|
|
||||||
object_key_from_file_path,
|
|
||||||
set_bucket_versioning,
|
|
||||||
)
|
|
||||||
from pytest_tests.steps import s3_gate_bucket, s3_gate_object
|
from pytest_tests.steps import s3_gate_bucket, s3_gate_object
|
||||||
from pytest_tests.steps.s3_gate_base import TestS3GateBase
|
from pytest_tests.steps.s3_gate_base import TestS3GateBase
|
||||||
|
|
||||||
|
@ -22,10 +21,13 @@ def pytest_generate_tests(metafunc):
|
||||||
@pytest.mark.s3_gate
|
@pytest.mark.s3_gate
|
||||||
@pytest.mark.s3_gate_multipart
|
@pytest.mark.s3_gate_multipart
|
||||||
class TestS3GateMultipart(TestS3GateBase):
|
class TestS3GateMultipart(TestS3GateBase):
|
||||||
|
NO_SUCH_UPLOAD = (
|
||||||
|
"The upload ID may be invalid, or the upload may have been aborted or completed."
|
||||||
|
)
|
||||||
|
|
||||||
@allure.title("Test S3 Object Multipart API")
|
@allure.title("Test S3 Object Multipart API")
|
||||||
def test_s3_object_multipart(self):
|
@pytest.mark.parametrize("bucket", [s3_gate_bucket.VersioningStatus.ENABLED], indirect=True)
|
||||||
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client)
|
def test_s3_object_multipart(self, bucket):
|
||||||
set_bucket_versioning(self.s3_client, bucket, s3_gate_bucket.VersioningStatus.ENABLED)
|
|
||||||
parts_count = 5
|
parts_count = 5
|
||||||
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
||||||
object_key = object_key_from_file_path(file_name_large)
|
object_key = object_key_from_file_path(file_name_large)
|
||||||
|
@ -66,37 +68,63 @@ class TestS3GateMultipart(TestS3GateBase):
|
||||||
got_object = s3_gate_object.get_object_s3(self.s3_client, bucket, object_key)
|
got_object = s3_gate_object.get_object_s3(self.s3_client, bucket, object_key)
|
||||||
assert get_file_hash(got_object) == get_file_hash(file_name_large)
|
assert get_file_hash(got_object) == get_file_hash(file_name_large)
|
||||||
|
|
||||||
@allure.title("Test S3 Multipart abord")
|
@allure.title("Test S3 Multipart abort")
|
||||||
def test_s3_abort_multipart(self):
|
@pytest.mark.parametrize("bucket", [s3_gate_bucket.VersioningStatus.ENABLED], indirect=True)
|
||||||
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client)
|
def test_s3_abort_multipart(
|
||||||
set_bucket_versioning(self.s3_client, bucket, s3_gate_bucket.VersioningStatus.ENABLED)
|
self, bucket: str, simple_object_size: int, complex_object_size: int
|
||||||
parts_count = 5
|
):
|
||||||
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
complex_file = generate_file(complex_object_size)
|
||||||
object_key = object_key_from_file_path(file_name_large)
|
simple_file = generate_file(simple_object_size)
|
||||||
part_files = split_file(file_name_large, parts_count)
|
to_upload = [complex_file, complex_file, simple_file]
|
||||||
parts = []
|
files_count = len(to_upload)
|
||||||
|
upload_key = "multipart_abort"
|
||||||
|
|
||||||
with allure.step("Upload first part"):
|
with allure.step(f"Get related container_id for bucket '{bucket}'"):
|
||||||
|
container_id = container.search_container_by_name(
|
||||||
|
self.wallet, bucket, self.shell, self.cluster.default_rpc_endpoint
|
||||||
|
)
|
||||||
|
|
||||||
|
with allure.step("Create multipart upload"):
|
||||||
upload_id = s3_gate_object.create_multipart_upload_s3(
|
upload_id = s3_gate_object.create_multipart_upload_s3(
|
||||||
self.s3_client, bucket, object_key
|
self.s3_client, bucket, upload_key
|
||||||
)
|
)
|
||||||
uploads = s3_gate_object.list_multipart_uploads_s3(self.s3_client, bucket)
|
|
||||||
etag = s3_gate_object.upload_part_s3(
|
with allure.step(f"Upload {files_count} files to multipart upload"):
|
||||||
self.s3_client, bucket, object_key, upload_id, 1, part_files[0]
|
for i, file in enumerate(to_upload, 1):
|
||||||
|
s3_gate_object.upload_part_s3(
|
||||||
|
self.s3_client, bucket, upload_key, upload_id, i, file
|
||||||
)
|
)
|
||||||
parts.append((1, etag))
|
|
||||||
got_parts = s3_gate_object.list_parts_s3(self.s3_client, bucket, object_key, upload_id)
|
with allure.step(f"Check that we have {files_count} files in bucket"):
|
||||||
assert len(got_parts) == 1, f"Expected {1} parts, got\n{got_parts}"
|
parts = s3_gate_object.list_parts_s3(self.s3_client, bucket, upload_key, upload_id)
|
||||||
|
assert len(parts) == files_count, f"Expected {files_count} parts, got\n{parts}"
|
||||||
|
|
||||||
|
with allure.step(f"Check that we have {files_count} files in container '{container_id}'"):
|
||||||
|
objects = container.list_objects(
|
||||||
|
self.wallet, self.shell, container_id, self.cluster.default_rpc_endpoint
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
len(objects) == files_count
|
||||||
|
), f"Expected {files_count} objects in container, got\n{objects}"
|
||||||
|
|
||||||
with allure.step("Abort multipart upload"):
|
with allure.step("Abort multipart upload"):
|
||||||
s3_gate_object.abort_multipart_uploads_s3(self.s3_client, bucket, object_key, upload_id)
|
s3_gate_object.abort_multipart_upload_s3(self.s3_client, bucket, upload_key, upload_id)
|
||||||
uploads = s3_gate_object.list_multipart_uploads_s3(self.s3_client, bucket)
|
uploads = s3_gate_object.list_multipart_uploads_s3(self.s3_client, bucket)
|
||||||
assert not uploads, f"Expected there is no uploads in bucket {bucket}"
|
assert not uploads, f"Expected no uploads in bucket {bucket}"
|
||||||
|
|
||||||
|
with allure.step("Check that we have no files in bucket since upload was aborted"):
|
||||||
|
with pytest.raises(Exception, match=self.NO_SUCH_UPLOAD):
|
||||||
|
s3_gate_object.list_parts_s3(self.s3_client, bucket, upload_key, upload_id)
|
||||||
|
|
||||||
|
with allure.step("Check that we have no files in container since upload was aborted"):
|
||||||
|
objects = container.list_objects(
|
||||||
|
self.wallet, self.shell, container_id, self.cluster.default_rpc_endpoint
|
||||||
|
)
|
||||||
|
assert len(objects) == 0, f"Expected no objects in container, got\n{objects}"
|
||||||
|
|
||||||
@allure.title("Test S3 Upload Part Copy")
|
@allure.title("Test S3 Upload Part Copy")
|
||||||
def test_s3_multipart_copy(self):
|
@pytest.mark.parametrize("bucket", [s3_gate_bucket.VersioningStatus.ENABLED], indirect=True)
|
||||||
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client)
|
def test_s3_multipart_copy(self, bucket):
|
||||||
set_bucket_versioning(self.s3_client, bucket, s3_gate_bucket.VersioningStatus.ENABLED)
|
|
||||||
parts_count = 3
|
parts_count = 3
|
||||||
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
file_name_large = generate_file(PART_SIZE * parts_count) # 5Mb - min part
|
||||||
object_key = object_key_from_file_path(file_name_large)
|
object_key = object_key_from_file_path(file_name_large)
|
||||||
|
@ -104,7 +132,7 @@ class TestS3GateMultipart(TestS3GateBase):
|
||||||
parts = []
|
parts = []
|
||||||
objs = []
|
objs = []
|
||||||
|
|
||||||
with allure.step(f"Put {parts_count} objec in bucket"):
|
with allure.step(f"Put {parts_count} objects in bucket"):
|
||||||
for part in part_files:
|
for part in part_files:
|
||||||
s3_gate_object.put_object_s3(self.s3_client, bucket, part)
|
s3_gate_object.put_object_s3(self.s3_client, bucket, part)
|
||||||
objs.append(object_key_from_file_path(part))
|
objs.append(object_key_from_file_path(part))
|
||||||
|
|
Loading…
Reference in a new issue