diff --git a/pytest_tests/helpers/s3_helper.py b/pytest_tests/helpers/s3_helper.py index 0c9ece9..c6146e9 100644 --- a/pytest_tests/helpers/s3_helper.py +++ b/pytest_tests/helpers/s3_helper.py @@ -43,3 +43,7 @@ def set_bucket_versioning(s3_client, bucket: str, status: s3_gate_bucket.Version s3_gate_bucket.set_bucket_versioning(s3_client, bucket, status=status) bucket_status = s3_gate_bucket.get_bucket_versioning_status(s3_client, bucket) assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}" + + +def object_key_from_file_path(full_path: str) -> str: + return os.path.basename(full_path) diff --git a/pytest_tests/steps/aws_cli_client.py b/pytest_tests/steps/aws_cli_client.py index 2a50790..a00043b 100644 --- a/pytest_tests/steps/aws_cli_client.py +++ b/pytest_tests/steps/aws_cli_client.py @@ -28,7 +28,12 @@ class AwsCliClient: logger.info(f"Executing command: {cmd}") _configure_aws_cli(cmd, self.access_key_id, self.secret_access_key) - def create_bucket(self, Bucket: str, ObjectLockEnabledForBucket: bool = None): + def create_bucket( + self, + Bucket: str, + ObjectLockEnabledForBucket: Optional[bool] = None, + ACL: Optional[str] = None, + ): if ObjectLockEnabledForBucket is None: object_lock = "" elif ObjectLockEnabledForBucket: @@ -39,6 +44,8 @@ class AwsCliClient: f"aws {self.common_flags} s3api create-bucket --bucket {Bucket} " f"{object_lock} --endpoint {S3_GATE}" ) + if ACL: + cmd += f" --acl {ACL}" _cmd_run(cmd, REGULAR_TIMEOUT) def list_buckets(self) -> dict: @@ -46,6 +53,14 @@ class AwsCliClient: output = _cmd_run(cmd) return self._to_json(output) + def get_bucket_acl(self, Bucket: str) -> dict: + cmd = ( + f"aws {self.common_flags} s3api get-bucket-acl --bucket {Bucket} " + f"--endpoint {S3_GATE}" + ) + output = _cmd_run(cmd, REGULAR_TIMEOUT) + return self._to_json(output) + def get_bucket_versioning(self, Bucket: str) -> dict: cmd = ( f"aws {self.common_flags} s3api get-bucket-versioning --bucket {Bucket} " @@ -196,6 +211,47 @@ class AwsCliClient: output = _cmd_run(cmd, REGULAR_TIMEOUT) return self._to_json(output) + def put_object_acl( + self, + Bucket: str, + Key: str, + ACL: Optional[str] = None, + GrantWrite: Optional[str] = None, + GrantRead: Optional[str] = None, + ) -> dict: + cmd = ( + f"aws {self.common_flags} s3api put-object-acl --bucket {Bucket} --key {Key} " + f" --endpoint {S3_GATE}" + ) + if ACL: + cmd += f" --acl {ACL}" + if GrantWrite: + cmd += f" --grant-write {GrantWrite}" + if GrantRead: + cmd += f" --grant-read {GrantRead}" + output = _cmd_run(cmd, REGULAR_TIMEOUT) + return self._to_json(output) + + def put_bucket_acl( + self, + Bucket: str, + ACL: Optional[str] = None, + GrantWrite: Optional[str] = None, + GrantRead: Optional[str] = None, + ) -> dict: + cmd = ( + f"aws {self.common_flags} s3api put-bucket-acl --bucket {Bucket} " + f" --endpoint {S3_GATE}" + ) + if ACL: + cmd += f" --acl {ACL}" + if GrantWrite: + cmd += f" --grant-write {GrantWrite}" + if GrantRead: + cmd += f" --grant-read {GrantRead}" + output = _cmd_run(cmd, REGULAR_TIMEOUT) + return self._to_json(output) + def delete_objects(self, Bucket: str, Delete: dict) -> dict: file_path = f"{os.getcwd()}/{ASSETS_DIR}/delete.json" with open(file_path, "w") as out_file: diff --git a/pytest_tests/steps/s3_gate_bucket.py b/pytest_tests/steps/s3_gate_bucket.py index 87e1f82..1dcad0b 100644 --- a/pytest_tests/steps/s3_gate_bucket.py +++ b/pytest_tests/steps/s3_gate_bucket.py @@ -17,6 +17,8 @@ from cli_helpers import _run_with_passwd, log_command_execution from common import NEOFS_ENDPOINT, S3_GATE, S3_GATE_WALLET_PASS, S3_GATE_WALLET_PATH from data_formatters import get_wallet_public_key +from steps.aws_cli_client import AwsCliClient + ########################################################## # Disabling warnings on self-signed certificate which the # boto library produces on requests to S3-gate in dev-env. @@ -102,13 +104,17 @@ def config_s3_client(access_key_id: str, secret_access_key: str): @allure.step("Create bucket S3") -def create_bucket_s3(s3_client, object_lock_enabled_for_bucket: Optional[bool] = None): +def create_bucket_s3( + s3_client, object_lock_enabled_for_bucket: Optional[bool] = None, acl: Optional[str] = None +) -> str: bucket_name = str(uuid.uuid4()) try: params = {"Bucket": bucket_name} if object_lock_enabled_for_bucket is not None: params.update({"ObjectLockEnabledForBucket": object_lock_enabled_for_bucket}) + if acl is not None: + params.update({"ACL": acl}) s3_bucket = s3_client.create_bucket(**params) log_command_execution(f"Created S3 bucket {bucket_name}", s3_bucket) sleep(S3_SYNC_WAIT_TIME) @@ -206,6 +212,17 @@ def put_bucket_tagging(s3_client, bucket_name: str, tags: list): raise Exception(f"Got error during put bucket tagging: {err}") from err +@allure.step("Get bucket acl") +def get_bucket_acl(s3_client, bucket_name: str) -> list: + try: + response = s3_client.get_bucket_acl(Bucket=bucket_name) + log_command_execution("S3 Get bucket acl", response) + return response.get("Grants") + + except ClientError as err: + raise Exception(f"Got error during get bucket tagging: {err}") from err + + @allure.step("Get bucket tagging") def get_bucket_tagging(s3_client, bucket_name: str) -> list: try: @@ -225,3 +242,31 @@ def delete_bucket_tagging(s3_client, bucket_name: str) -> None: except ClientError as err: raise Exception(f"Got error during delete bucket tagging: {err}") from err + + +@allure.step("Put bucket ACL") +def put_bucket_acl_s3( + s3_client, + bucket: str, + acl: Optional[str] = None, + grant_write: Optional[str] = None, + grant_read: Optional[str] = None, +) -> list: + params = {"Bucket": bucket} + if acl: + params.update({"ACL": acl}) + elif grant_write or grant_read: + if grant_write: + params.update({"GrantWrite": grant_write}) + elif grant_read: + params.update({"GrantRead": grant_read}) + try: + response = s3_client.put_bucket_acl(**params) + log_command_execution("S3 ACL bucket result", response) + return response.get("Grants") + + except ClientError as err: + raise Exception( + f'Error Message: {err.response["Error"]["Message"]}\n' + f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}' + ) from err diff --git a/pytest_tests/steps/s3_gate_object.py b/pytest_tests/steps/s3_gate_object.py index b77fbec..3f603f2 100644 --- a/pytest_tests/steps/s3_gate_object.py +++ b/pytest_tests/steps/s3_gate_object.py @@ -8,6 +8,7 @@ from time import sleep from typing import Optional import allure +import pytest import urllib3 from botocore.exceptions import ClientError from cli_helpers import log_command_execution @@ -188,8 +189,41 @@ def delete_object_versions_s3(s3_client, bucket: str, object_versions: list): ) from err +@allure.step("Put object ACL") +def put_object_acl_s3( + s3_client, + bucket: str, + object_key: str, + acl: Optional[str] = None, + grant_write: Optional[str] = None, + grant_read: Optional[str] = None, +) -> list: + if not isinstance(s3_client, AwsCliClient): + pytest.skip("Method put_object_acl is not supported by boto3 client") + params = {"Bucket": bucket, "Key": object_key} + if acl: + params.update({"ACL": acl}) + elif grant_write or grant_read: + if grant_write: + params.update({"GrantWrite": grant_write}) + elif grant_read: + params.update({"GrantRead": grant_read}) + try: + response = s3_client.put_object_acl(**params) + log_command_execution("S3 ACL objects result", response) + return response.get("Grants") + + except ClientError as err: + raise Exception( + f'Error Message: {err.response["Error"]["Message"]}\n' + f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}' + ) from err + + @allure.step("Get object ACL") -def get_object_acl_s3(s3_client, bucket: str, object_key: str, version_id: Optional[str] = None): +def get_object_acl_s3( + s3_client, bucket: str, object_key: str, version_id: Optional[str] = None +) -> list: params = {"Bucket": bucket, "Key": object_key} try: if version_id: @@ -208,7 +242,7 @@ def get_object_acl_s3(s3_client, bucket: str, object_key: str, version_id: Optio @allure.step("Copy object S3") def copy_object_s3( s3_client, bucket: str, object_key: str, bucket_dst: Optional[str] = None, **kwargs -): +) -> str: filename = f"{os.getcwd()}/{uuid.uuid4()}" try: params = { diff --git a/pytest_tests/testsuites/services/s3_gate/test_s3_ACL.py b/pytest_tests/testsuites/services/s3_gate/test_s3_ACL.py new file mode 100644 index 0000000..808468a --- /dev/null +++ b/pytest_tests/testsuites/services/s3_gate/test_s3_ACL.py @@ -0,0 +1,98 @@ +import os + +import allure +import pytest +from python_keywords.utility_keywords import generate_file +from s3_helper import object_key_from_file_path + +from steps import s3_gate_bucket, s3_gate_object +from steps.s3_gate_base import TestS3GateBase + + +def pytest_generate_tests(metafunc): + if "s3_client" in metafunc.fixturenames: + metafunc.parametrize("s3_client", ["aws cli", "boto3"], indirect=True) + + +@pytest.mark.s3_gate +class TestS3GateACL(TestS3GateBase): + @allure.title("Test S3: Object ACL") + def test_s3_object_ACL(self): + file_path = generate_file() + file_name = object_key_from_file_path(file_path) + + bucket = s3_gate_bucket.create_bucket_s3(self.s3_client, True, acl="public-read-write") + objects_list = s3_gate_object.list_objects_s3(self.s3_client, bucket) + assert not objects_list, f"Expected empty bucket, got {objects_list}" + + with allure.step("Put object into bucket, Check ACL is empty"): + s3_gate_object.put_object_s3(self.s3_client, bucket, file_path) + obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name) + assert obj_acl == [], f"Expected ACL is empty, got {obj_acl}" + + with allure.step("Put object ACL = public-read"): + s3_gate_object.put_object_acl_s3(self.s3_client, bucket, file_name, "public-read") + obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name) + obj_permission = [permission.get("Permission") for permission in obj_acl] + assert obj_permission == [ + "FULL_CONTROL", + "FULL_CONTROL", + ], "Permission for all groups is FULL_CONTROL" + + with allure.step("Put object ACL = private"): + s3_gate_object.put_object_acl_s3(self.s3_client, bucket, file_name, "private") + obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name) + obj_permission = [permission.get("Permission") for permission in obj_acl] + assert obj_permission == [ + "FULL_CONTROL", + ], "Permission for Canonical User is FULL_CONTROL" + + with allure.step( + "Put object with grant-read uri=http://acs.amazonaws.com/groups/global/AllUsers" + ): + s3_gate_object.put_object_acl_s3( + self.s3_client, + bucket, + file_name, + grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers", + ) + obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name) + obj_permission = [permission.get("Permission") for permission in obj_acl] + assert obj_permission == [ + "FULL_CONTROL", + "FULL_CONTROL", + ], "Permission for all groups is FULL_CONTROL" + + @allure.title("Test S3: Bucket ACL") + def test_s3_bucket_ACL(self): + with allure.step("Create bucket with ACL = public-read-write"): + bucket = s3_gate_bucket.create_bucket_s3(self.s3_client, True, acl="public-read-write") + bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket) + bucket_permission = [permission.get("Permission") for permission in bucket_acl] + assert bucket_permission == [ + "FULL_CONTROL", + "FULL_CONTROL", + ], "Permission for all groups is FULL_CONTROL" + + with allure.step("Change bucket ACL to private"): + s3_gate_bucket.put_bucket_acl_s3(self.s3_client, bucket, acl="private") + bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket) + bucket_permission = [permission.get("Permission") for permission in bucket_acl] + assert bucket_permission == [ + "FULL_CONTROL" + ], "Permission for CanonicalUser is FULL_CONTROL" + + with allure.step( + "Change bucket acl to --grant-write uri=http://acs.amazonaws.com/groups/global/AllUsers" + ): + s3_gate_bucket.put_bucket_acl_s3( + self.s3_client, + bucket, + grant_write="uri=http://acs.amazonaws.com/groups/global/AllUsers", + ) + bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket) + bucket_permission = [permission.get("Permission") for permission in bucket_acl] + assert bucket_permission == [ + "FULL_CONTROL", + "FULL_CONTROL", + ], "Permission for all groups is FULL_CONTROL"