From c4173bf804df551faf51b703f7e3c39ed1bce28e Mon Sep 17 00:00:00 2001 From: Kirill Sosnovskikh Date: Wed, 23 Oct 2024 14:08:54 +0300 Subject: [PATCH] [#311] Add AWS CLI command to report from Boto3 request Signed-off-by: Kirill Sosnovskikh --- src/frostfs_testlib/s3/boto3_client.py | 1000 +++++++++++++++--------- src/frostfs_testlib/utils/cli_utils.py | 72 +- 2 files changed, 672 insertions(+), 400 deletions(-) diff --git a/src/frostfs_testlib/s3/boto3_client.py b/src/frostfs_testlib/s3/boto3_client.py index a99b866..91d8c5a 100644 --- a/src/frostfs_testlib/s3/boto3_client.py +++ b/src/frostfs_testlib/s3/boto3_client.py @@ -1,8 +1,8 @@ import json import logging import os +from collections.abc import Callable from datetime import datetime -from functools import wraps from time import sleep from typing import Literal, Optional, Union @@ -28,48 +28,32 @@ logger = logging.getLogger("NeoLogger") urllib3.disable_warnings() -def report_error(func): - @wraps(func) - def deco(*a, **kw): - try: - return func(*a, **kw) - except ClientError as err: - url = None - params = {"args": a, "kwargs": kw} - - if isinstance(a[0], Boto3ClientWrapper): - client: Boto3ClientWrapper = a[0] - url = client.s3gate_endpoint - params = {"args": a[1:], "kwargs": kw} - - log_command_execution(url, f"Failed {err.operation_name}", err.response, params) - raise - - return deco - - class Boto3ClientWrapper(S3ClientWrapper): __repr_name__: str = "Boto3 client" @reporter.step("Configure S3 client (boto3)") - @report_error def __init__( self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default", region: str = "us-east-1" ) -> None: self.boto3_client: S3Client = None - self.session = boto3.Session() + self.s3gate_endpoint: str = "" + + self.boto3_iam_client: S3Client = None + self.iam_endpoint: str = "" + + self.access_key_id: str = access_key_id + self.secret_access_key: str = secret_access_key + self.profile = profile self.region = region + + self.session = boto3.Session() self.config = Config( retries={ "max_attempts": MAX_REQUEST_ATTEMPTS, "mode": RETRY_MODE, } ) - self.access_key_id: str = access_key_id - self.secret_access_key: str = secret_access_key - self.s3gate_endpoint: str = "" - self.iam_endpoint: str = "" - self.boto3_iam_client: S3Client = None + self.set_endpoint(s3gate_endpoint) @reporter.step("Set endpoint S3 to {s3gate_endpoint}") @@ -116,13 +100,24 @@ class Boto3ClientWrapper(S3ClientWrapper): return result def _convert_to_s3_params(self, scope: dict, exclude: Optional[list[str]] = None) -> dict: - if not exclude: - exclude = ["self"] - return {self._to_s3_param(param): value for param, value in scope if param not in exclude and value is not None} + exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"] + return {self._to_s3_param(param): value for param, value in scope.items() if param not in exclude and value is not None} + + def _exec_request(self, method: Callable, params: Optional[dict] = None, **kwargs): + if not params: + params = {} + + try: + result = method(**params) + except ClientError as err: + log_command_execution(method.__name__, err.response, params, **kwargs) + raise + + log_command_execution(method.__name__, result, params, **kwargs) + return result # BUCKET METHODS # @reporter.step("Create bucket S3") - @report_error def create_bucket( self, bucket: Optional[str] = None, @@ -151,81 +146,98 @@ class Boto3ClientWrapper(S3ClientWrapper): if location_constraint: params.update({"CreateBucketConfiguration": {"LocationConstraint": location_constraint}}) - s3_bucket = self.boto3_client.create_bucket(**params) - log_command_execution(self.s3gate_endpoint, f"Created S3 bucket {bucket}", s3_bucket, params) + self._exec_request(self.boto3_client.create_bucket, params, endpoint=self.s3gate_endpoint, profile=self.profile) return bucket @reporter.step("List buckets S3") - @report_error def list_buckets(self) -> list[str]: - found_buckets = [] - - response = self.boto3_client.list_buckets() - log_command_execution(self.s3gate_endpoint, "S3 List buckets result", response) - - for bucket in response["Buckets"]: - found_buckets.append(bucket["Name"]) - - return found_buckets + response = self._exec_request( + self.boto3_client.list_buckets, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) + return [bucket["Name"] for bucket in response["Buckets"]] @reporter.step("Delete bucket S3") - @report_error def delete_bucket(self, bucket: str) -> None: - response = self.boto3_client.delete_bucket(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 Delete bucket result", response, {"Bucket": bucket}) + self._exec_request( + self.boto3_client.delete_bucket, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Head bucket S3") - @report_error def head_bucket(self, bucket: str) -> None: - response = self.boto3_client.head_bucket(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 Head bucket result", response, {"Bucket": bucket}) + self._exec_request( + self.boto3_client.head_bucket, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put bucket versioning status") - @report_error def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None: params = {"Bucket": bucket, "VersioningConfiguration": {"Status": status.value}} - response = self.boto3_client.put_bucket_versioning(**params) - log_command_execution(self.s3gate_endpoint, "S3 Set bucket versioning to", response, params) + self._exec_request( + self.boto3_client.put_bucket_versioning, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get bucket versioning status") - @report_error def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]: - response = self.boto3_client.get_bucket_versioning(Bucket=bucket) - status = response.get("Status") - log_command_execution(self.s3gate_endpoint, "S3 Got bucket versioning status", response, {"Bucket": bucket}) - return status + response = self._exec_request( + self.boto3_client.get_bucket_versioning, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) + return response.get("Status") @reporter.step("Put bucket tagging") - @report_error def put_bucket_tagging(self, bucket: str, tags: list) -> None: tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] tagging = {"TagSet": tags} - params = self._convert_to_s3_params(locals().items(), exclude=["self", "tags"]) - response = self.boto3_client.put_bucket_tagging(**params) - log_command_execution(self.s3gate_endpoint, "S3 Put bucket tagging", response, params) + params = self._convert_to_s3_params(locals(), exclude=["tags"]) + self._exec_request( + self.boto3_client.put_bucket_tagging, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get bucket tagging") - @report_error def get_bucket_tagging(self, bucket: str) -> list: - response = self.boto3_client.get_bucket_tagging(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 Get bucket tagging", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_bucket_tagging, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("TagSet") @reporter.step("Get bucket acl") - @report_error def get_bucket_acl(self, bucket: str) -> list: - response = self.boto3_client.get_bucket_acl(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 Get bucket acl", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_bucket_acl, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("Grants") @reporter.step("Delete bucket tagging") - @report_error def delete_bucket_tagging(self, bucket: str) -> None: - response = self.boto3_client.delete_bucket_tagging(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 Delete bucket tagging", response, {"Bucket": bucket}) + self._exec_request( + self.boto3_client.delete_bucket_tagging, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put bucket ACL") - @report_error def put_bucket_acl( self, bucket: str, @@ -233,141 +245,181 @@ class Boto3ClientWrapper(S3ClientWrapper): grant_write: Optional[str] = None, grant_read: Optional[str] = None, ) -> None: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.put_bucket_acl(**params) - log_command_execution(self.s3gate_endpoint, "S3 ACL bucket result", response, params) + params = self._convert_to_s3_params(locals()) + self._exec_request( + self.boto3_client.put_bucket_acl, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put object lock configuration") - @report_error def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict: params = {"Bucket": bucket, "ObjectLockConfiguration": configuration} - response = self.boto3_client.put_object_lock_configuration(**params) - log_command_execution(self.s3gate_endpoint, "S3 put_object_lock_configuration result", response, params) - return response + return self._exec_request( + self.boto3_client.put_object_lock_configuration, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get object lock configuration") - @report_error def get_object_lock_configuration(self, bucket: str) -> dict: - response = self.boto3_client.get_object_lock_configuration(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 get_object_lock_configuration result", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_object_lock_configuration, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("ObjectLockConfiguration") @reporter.step("Get bucket policy") - @report_error def get_bucket_policy(self, bucket: str) -> str: - response = self.boto3_client.get_bucket_policy(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 get_bucket_policy result", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_bucket_policy, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("Policy") @reporter.step("Delete bucket policy") - @report_error def delete_bucket_policy(self, bucket: str) -> str: - response = self.boto3_client.delete_bucket_policy(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_policy result", response, {"Bucket": bucket}) - return response + return self._exec_request( + self.boto3_client.delete_bucket_policy, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put bucket policy") - @report_error def put_bucket_policy(self, bucket: str, policy: dict) -> None: params = {"Bucket": bucket, "Policy": json.dumps(policy)} - response = self.boto3_client.put_bucket_policy(**params) - log_command_execution(self.s3gate_endpoint, "S3 put_bucket_policy result", response, params) - return response + return self._exec_request( + self.boto3_client.put_bucket_policy, + params, + # Overriding option for AWS CLI + policy=policy, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get bucket cors") - @report_error def get_bucket_cors(self, bucket: str) -> dict: - response = self.boto3_client.get_bucket_cors(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 get_bucket_cors result", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_bucket_cors, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("CORSRules") @reporter.step("Get bucket location") - @report_error def get_bucket_location(self, bucket: str) -> str: - response = self.boto3_client.get_bucket_location(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 get_bucket_location result", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_bucket_location, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("LocationConstraint") @reporter.step("Put bucket cors") - @report_error def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.put_bucket_cors(**params) - log_command_execution(self.s3gate_endpoint, "S3 put_bucket_cors result", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_client.put_bucket_cors, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Delete bucket cors") - @report_error def delete_bucket_cors(self, bucket: str) -> None: - response = self.boto3_client.delete_bucket_cors(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_cors result", response, {"Bucket": bucket}) + self._exec_request( + self.boto3_client.delete_bucket_cors, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put bucket lifecycle configuration") - @report_error def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict: - response = self.boto3_client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle_configuration) - log_command_execution(self.s3gate_endpoint, "S3 put_bucket_lifecycle_configuration result", response, {"Bucket": bucket}) - return response + params = self._convert_to_s3_params(locals(), exclude=["dumped_configuration"]) + return self._exec_request( + self.boto3_client.put_bucket_lifecycle_configuration, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get bucket lifecycle configuration") - @report_error def get_bucket_lifecycle_configuration(self, bucket: str) -> dict: - response = self.boto3_client.get_bucket_lifecycle_configuration(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 get_bucket_lifecycle_configuration result", response, {"Bucket": bucket}) + response = self._exec_request( + self.boto3_client.get_bucket_lifecycle_configuration, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return {"Rules": response.get("Rules")} @reporter.step("Delete bucket lifecycle configuration") - @report_error def delete_bucket_lifecycle(self, bucket: str) -> dict: - response = self.boto3_client.delete_bucket_lifecycle(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_lifecycle result", response, {"Bucket": bucket}) - return response + return self._exec_request( + self.boto3_client.delete_bucket_lifecycle, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) # END OF BUCKET METHODS # # OBJECT METHODS # @reporter.step("List objects S3 v2") - @report_error def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.list_objects_v2(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 v2 List objects result", response, params) - + response = self._exec_request( + self.boto3_client.list_objects_v2, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) obj_list = [obj["Key"] for obj in response.get("Contents", [])] logger.info(f"Found s3 objects: {obj_list}") - return response if full_output else obj_list @reporter.step("List objects S3") - @report_error def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.list_objects(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 List objects result", response, params) - + response = self._exec_request( + self.boto3_client.list_objects, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) obj_list = [obj["Key"] for obj in response.get("Contents", [])] logger.info(f"Found s3 objects: {obj_list}") - return response if full_output else obj_list @reporter.step("List objects versions S3") - @report_error def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.list_object_versions(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 List objects versions result", response, params) + response = self._exec_request( + self.boto3_client.list_object_versions, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response if full_output else response.get("Versions", []) @reporter.step("List objects delete markers S3") - @report_error def list_delete_markers(self, bucket: str, full_output: bool = False) -> list: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.list_object_versions(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 List objects delete markers result", response, params) + response = self._exec_request( + self.boto3_client.list_object_versions, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response if full_output else response.get("DeleteMarkers", []) @reporter.step("Put object S3") - @report_error def put_object( self, bucket: str, @@ -388,40 +440,53 @@ class Boto3ClientWrapper(S3ClientWrapper): with open(filepath, "rb") as put_file: body = put_file.read() - params = self._convert_to_s3_params(locals().items(), exclude=["self", "filepath", "put_file", "body"]) - response = self.boto3_client.put_object(Body=body, **params) - log_command_execution(self.s3gate_endpoint, "S3 Put object result", response, params) + params = self._convert_to_s3_params(locals(), exclude=["filepath", "put_file"]) + response = self._exec_request( + self.boto3_client.put_object, + params, + body=filepath, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("VersionId") @reporter.step("Head object S3") - @report_error def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.head_object(**params) - log_command_execution(self.s3gate_endpoint, "S3 Head object result", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_client.head_object, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Delete object S3") - @report_error def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.delete_object(**params) - log_command_execution(self.s3gate_endpoint, "S3 Delete object result", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_client.delete_object, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Delete objects S3") - @report_error def delete_objects(self, bucket: str, keys: list[str]) -> dict: params = {"Bucket": bucket, "Delete": _make_objs_dict(keys)} - response = self.boto3_client.delete_objects(**params) - log_command_execution(self.s3gate_endpoint, "S3 Delete objects result", response, params) + response = self._exec_request( + self.boto3_client.delete_objects, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) + assert ( "Errors" not in response ), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}' + return response @reporter.step("Delete object versions S3") - @report_error def delete_object_versions(self, bucket: str, object_versions: list) -> dict: # Build deletion list in S3 format delete_list = { @@ -434,21 +499,26 @@ class Boto3ClientWrapper(S3ClientWrapper): ] } params = {"Bucket": bucket, "Delete": delete_list} - response = self.boto3_client.delete_objects(**params) - log_command_execution(self.s3gate_endpoint, "S3 Delete objects result", response, params) - return response + return self._exec_request( + self.boto3_client.delete_objects, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Delete object versions S3 without delete markers") - @report_error def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None: # Delete objects without creating delete markers for object_version in object_versions: params = {"Bucket": bucket, "Key": object_version["Key"], "VersionId": object_version["VersionId"]} - response = self.boto3_client.delete_object(**params) - log_command_execution(self.s3gate_endpoint, "S3 Delete object result", response, params) + self._exec_request( + self.boto3_client.delete_object, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put object ACL") - @report_error def put_object_acl( self, bucket: str, @@ -457,21 +527,27 @@ class Boto3ClientWrapper(S3ClientWrapper): grant_write: Optional[str] = None, grant_read: Optional[str] = None, ) -> list: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.put_object_acl(**params) - log_command_execution(self.s3gate_endpoint, "S3 put object ACL", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_client.put_object_acl, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("Grants") @reporter.step("Get object ACL") - @report_error def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.get_object_acl(**params) - log_command_execution(self.s3gate_endpoint, "S3 ACL objects result", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_client.get_object_acl, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("Grants") @reporter.step("Copy object S3") - @report_error def copy_object( self, source_bucket: str, @@ -486,17 +562,22 @@ class Boto3ClientWrapper(S3ClientWrapper): ) -> str: if bucket is None: bucket = source_bucket + if key is None: key = string_utils.unique_name("copy-object-") - copy_source = f"{source_bucket}/{source_key}" - params = self._convert_to_s3_params(locals().items(), exclude=["self", "source_bucket", "source_key"]) - response = self.boto3_client.copy_object(**params) - log_command_execution(self.s3gate_endpoint, "S3 Copy objects result", response, params) + copy_source = f"{source_bucket}/{source_key}" + params = self._convert_to_s3_params(locals(), exclude=["source_bucket", "source_key"]) + + self._exec_request( + self.boto3_client.copy_object, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return key @reporter.step("Get object S3") - @report_error def get_object( self, bucket: str, @@ -509,12 +590,15 @@ class Boto3ClientWrapper(S3ClientWrapper): if object_range: range_str = f"bytes={object_range[0]}-{object_range[1]}" - params = self._convert_to_s3_params( - {**locals(), **{"Range": range_str}}.items(), - exclude=["self", "object_range", "full_output", "range_str"], + params = locals() + params.update({"Range": f"bytes={object_range[0]}-{object_range[1]}"} if object_range else {}) + params = self._convert_to_s3_params(params, exclude=["object_range", "full_output", "range_str"]) + response = self._exec_request( + self.boto3_client.get_object, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, ) - response = self.boto3_client.get_object(**params) - log_command_execution(self.s3gate_endpoint, "S3 Get objects result", response, params) if full_output: return response @@ -528,78 +612,93 @@ class Boto3ClientWrapper(S3ClientWrapper): return test_file @reporter.step("Create multipart upload S3") - @report_error def create_multipart_upload(self, bucket: str, key: str) -> str: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.create_multipart_upload(**params) - log_command_execution(self.s3gate_endpoint, "S3 Created multipart upload", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_client.create_multipart_upload, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) assert response.get("UploadId"), f"Expected UploadId in response:\n{response}" - return response["UploadId"] @reporter.step("List multipart uploads S3") - @report_error def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]: - response = self.boto3_client.list_multipart_uploads(Bucket=bucket) - log_command_execution(self.s3gate_endpoint, "S3 List multipart upload", response, {"Bucket": bucket}) - + response = self._exec_request( + self.boto3_client.list_multipart_uploads, + params={"Bucket": bucket}, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("Uploads") @reporter.step("Abort multipart upload S3") - @report_error def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.abort_multipart_upload(**params) - log_command_execution(self.s3gate_endpoint, "S3 Abort multipart upload", response, params) + params = self._convert_to_s3_params(locals()) + self._exec_request( + self.boto3_client.abort_multipart_upload, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Upload part S3") - @report_error def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str: with open(filepath, "rb") as put_file: body = put_file.read() - params = self._convert_to_s3_params(locals().items(), exclude=["self", "put_file", "part_num", "filepath", "body"]) + params = self._convert_to_s3_params(locals(), exclude=["put_file", "part_num", "filepath"]) params["PartNumber"] = part_num - response = self.boto3_client.upload_part(Body=body, **params) - log_command_execution(self.s3gate_endpoint, "S3 Upload part", response, params) - assert response.get("ETag"), f"Expected ETag in response:\n{response}" + response = self._exec_request( + self.boto3_client.upload_part, + params, + body=filepath, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) + assert response.get("ETag"), f"Expected ETag in response:\n{response}" return response["ETag"] @reporter.step("Upload copy part S3") - @report_error def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str: - params = self._convert_to_s3_params(locals().items(), exclude=["self", "put_file", "part_num", "filepath"]) + params = self._convert_to_s3_params(locals(), exclude=["put_file", "part_num", "filepath"]) params["PartNumber"] = part_num - response = self.boto3_client.upload_part_copy(**params) - log_command_execution(self.s3gate_endpoint, "S3 Upload copy part", response, params) + response = self._exec_request( + self.boto3_client.upload_part_copy, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}" - return response["CopyPartResult"]["ETag"] @reporter.step("List parts S3") - @report_error def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.list_parts(**params) - log_command_execution(self.s3gate_endpoint, "S3 List part", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_client.list_parts, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) assert response.get("Parts"), f"Expected Parts in response:\n{response}" - return response["Parts"] @reporter.step("Complete multipart upload S3") - @report_error def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts] - params = self._convert_to_s3_params(locals().items(), exclude=["self", "parts"]) + params = self._convert_to_s3_params(locals(), exclude=["parts"]) params["MultipartUpload"] = {"Parts": parts} - response = self.boto3_client.complete_multipart_upload(**params) - log_command_execution(self.s3gate_endpoint, "S3 Complete multipart upload", response, params) - - return response + return self._exec_request( + self.boto3_client.complete_multipart_upload, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put object retention") - @report_error def put_object_retention( self, bucket: str, @@ -608,12 +707,15 @@ class Boto3ClientWrapper(S3ClientWrapper): version_id: Optional[str] = None, bypass_governance_retention: Optional[bool] = None, ) -> None: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.put_object_retention(**params) - log_command_execution(self.s3gate_endpoint, "S3 Put object retention ", response, params) + params = self._convert_to_s3_params(locals()) + self._exec_request( + self.boto3_client.put_object_retention, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put object legal hold") - @report_error def put_object_legal_hold( self, bucket: str, @@ -622,36 +724,48 @@ class Boto3ClientWrapper(S3ClientWrapper): version_id: Optional[str] = None, ) -> None: legal_hold = {"Status": legal_hold_status} - params = self._convert_to_s3_params(locals().items(), exclude=["self", "legal_hold_status"]) - response = self.boto3_client.put_object_legal_hold(**params) - log_command_execution(self.s3gate_endpoint, "S3 Put object legal hold ", response, params) + params = self._convert_to_s3_params(locals(), exclude=["legal_hold_status"]) + self._exec_request( + self.boto3_client.put_object_legal_hold, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Put object tagging") - @report_error def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None: tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] tagging = {"TagSet": tags} - params = self._convert_to_s3_params(locals().items(), exclude=["self", "tags"]) - response = self.boto3_client.put_object_tagging(**params) - log_command_execution(self.s3gate_endpoint, "S3 Put object tagging", response, params) + params = self._convert_to_s3_params(locals(), exclude=["tags"]) + self._exec_request( + self.boto3_client.put_object_tagging, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get object tagging") - @report_error def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.get_object_tagging(**params) - log_command_execution(self.s3gate_endpoint, "S3 Get object tagging", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_client.get_object_tagging, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) return response.get("TagSet") @reporter.step("Delete object tagging") - @report_error def delete_object_tagging(self, bucket: str, key: str) -> None: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_client.delete_object_tagging(**params) - log_command_execution(self.s3gate_endpoint, "S3 Delete object tagging", response, params) + params = self._convert_to_s3_params(locals()) + self._exec_request( + self.boto3_client.delete_object_tagging, + params, + endpoint=self.s3gate_endpoint, + profile=self.profile, + ) @reporter.step("Get object attributes") - @report_error def get_object_attributes( self, bucket: str, @@ -666,7 +780,6 @@ class Boto3ClientWrapper(S3ClientWrapper): return {} @reporter.step("Sync directory S3") - @report_error def sync( self, bucket: str, @@ -677,7 +790,6 @@ class Boto3ClientWrapper(S3ClientWrapper): raise NotImplementedError("Sync is not supported for boto3 client") @reporter.step("CP directory S3") - @report_error def cp( self, bucket: str, @@ -693,36 +805,47 @@ class Boto3ClientWrapper(S3ClientWrapper): # Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.) @reporter.step("Adds the specified user to the specified group") - @report_error def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.add_user_to_group(**params) - log_command_execution(self.iam_endpoint, "IAM Add User to Group", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.add_user_to_group, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Attaches the specified managed policy to the specified IAM group") - @report_error def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.attach_group_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Attach Group Policy", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_iam_client.attach_group_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) sleep(S3_SYNC_WAIT_TIME * 10) return response @reporter.step("Attaches the specified managed policy to the specified user") - @report_error def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.attach_user_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Attach User Policy", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_iam_client.attach_user_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) sleep(S3_SYNC_WAIT_TIME * 10) return response @reporter.step("Creates a new AWS secret access key and access key ID for the specified user") - @report_error def iam_create_access_key(self, user_name: str) -> dict: - response = self.boto3_iam_client.create_access_key(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM Create Access Key", response, {"UserName": user_name}) + response = self._exec_request( + self.boto3_iam_client.create_access_key, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) access_key_id = response["AccessKey"].get("AccessKeyId") secret_access_key = response["AccessKey"].get("SecretAccessKey") @@ -732,10 +855,13 @@ class Boto3ClientWrapper(S3ClientWrapper): return access_key_id, secret_access_key @reporter.step("Creates a new group") - @report_error def iam_create_group(self, group_name: str) -> dict: - response = self.boto3_iam_client.create_group(GroupName=group_name) - log_command_execution(self.iam_endpoint, "IAM Create Group", response, {"GroupName": group_name}) + response = self._exec_request( + self.boto3_iam_client.create_group, + params={"GroupName": group_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Group"), f"Expected Group in response:\n{response}" assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}" @@ -743,12 +869,17 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Creates a new managed policy for your AWS account") - @report_error def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict: - params = self._convert_to_s3_params(locals().items()) + params = self._convert_to_s3_params(locals()) params["PolicyDocument"] = json.dumps(policy_document) - response = self.boto3_iam_client.create_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Create Policy", response, params) + response = self._exec_request( + self.boto3_iam_client.create_policy, + params, + # Overriding option for AWS CLI + policy_document=policy_document, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}" @@ -756,10 +887,13 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Creates a new IAM user for your AWS account") - @report_error def iam_create_user(self, user_name: str) -> dict: - response = self.boto3_iam_client.create_user(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM Create User", response, {"UserName": user_name}) + response = self._exec_request( + self.boto3_iam_client.create_user, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("User"), f"Expected User in response:\n{response}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" @@ -767,89 +901,115 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Deletes the access key pair associated with the specified IAM user") - @report_error def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.delete_access_key(**params) - log_command_execution(self.iam_endpoint, "IAM Delete Access Key", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.delete_access_key, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Deletes the specified IAM group") - @report_error def iam_delete_group(self, group_name: str) -> dict: - response = self.boto3_iam_client.delete_group(GroupName=group_name) - log_command_execution(self.iam_endpoint, "IAM Delete Group", response, {"GroupName": group_name}) - return response + return self._exec_request( + self.boto3_iam_client.delete_group, + params={"GroupName": group_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group") - @report_error def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.delete_group_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Delete Group Policy", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.delete_group_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Deletes the specified managed policy") - @report_error def iam_delete_policy(self, policy_arn: str) -> dict: - response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn) - log_command_execution(self.iam_endpoint, "IAM Delete Policy", response, {"PolicyArn": policy_arn}) - return response + return self._exec_request( + self.boto3_iam_client.delete_policy, + params={"PolicyArn": policy_arn}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Deletes the specified IAM user") - @report_error def iam_delete_user(self, user_name: str) -> dict: - response = self.boto3_iam_client.delete_user(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM Delete User", response, {"UserName": user_name}) - return response + return self._exec_request( + self.boto3_iam_client.delete_user, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user") - @report_error def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.delete_user_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Delete User Policy", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.delete_user_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Removes the specified managed policy from the specified IAM group") - @report_error def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.detach_group_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Detach Group Policy", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_iam_client.detach_group_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) sleep(S3_SYNC_WAIT_TIME * 10) return response @reporter.step("Removes the specified managed policy from the specified user") - @report_error def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.detach_user_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Detach User Policy", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_iam_client.detach_user_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) sleep(S3_SYNC_WAIT_TIME * 10) return response @reporter.step("Returns a list of IAM users that are in the specified IAM group") - @report_error def iam_get_group(self, group_name: str) -> dict: - response = self.boto3_iam_client.get_group(GroupName=group_name) - log_command_execution(self.iam_endpoint, "IAM Get Group", response, {"GroupName": group_name}) + response = self._exec_request( + self.boto3_iam_client.get_group, + params={"GroupName": group_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}" return response @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group") - @report_error def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.get_group_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Get Group Policy", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.get_group_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Retrieves information about the specified managed policy") - @report_error def iam_get_policy(self, policy_arn: str) -> dict: - response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn) - log_command_execution(self.iam_endpoint, "IAM Get Policy", response, {"PolicyArn": policy_arn}) + response = self._exec_request( + self.boto3_iam_client.get_policy, + params={"PolicyArn": policy_arn}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}" @@ -857,11 +1017,14 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Retrieves information about the specified version of the specified managed policy") - @report_error def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.get_policy_version(**params) - log_command_execution(self.iam_endpoint, "IAM Get Policy Version", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_iam_client.get_policy_version, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}" assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}" @@ -869,10 +1032,13 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Retrieves information about the specified IAM user") - @report_error def iam_get_user(self, user_name: str) -> dict: - response = self.boto3_iam_client.get_user(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM Get User", response, {"UserName": user_name}) + response = self._exec_request( + self.boto3_iam_client.get_user, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("User"), f"Expected User in response:\n{response}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" @@ -880,42 +1046,56 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user") - @report_error def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.get_user_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Get User Policy", response, params) + params = self._convert_to_s3_params(locals()) + response = self._exec_request( + self.boto3_iam_client.get_user_policy, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("UserName"), f"Expected UserName in response:\n{response}" return response @reporter.step("Returns information about the access key IDs associated with the specified IAM user") - @report_error def iam_list_access_keys(self, user_name: str) -> dict: - response = self.boto3_iam_client.list_access_keys(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM List Access Keys", response, {"UserName": user_name}) - return response + return self._exec_request( + self.boto3_iam_client.list_access_keys, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Lists all managed policies that are attached to the specified IAM group") - @report_error def iam_list_attached_group_policies(self, group_name: str) -> dict: - response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name) - log_command_execution(self.iam_endpoint, "IAM List Attached Group Policies", response, {"GroupName": group_name}) + response = self._exec_request( + self.boto3_iam_client.list_attached_group_policies, + params={"GroupName": group_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}" return response @reporter.step("Lists all managed policies that are attached to the specified IAM user") - @report_error def iam_list_attached_user_policies(self, user_name: str) -> dict: - response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM List Attached User Policies", response, {"UserName": user_name}) + response = self._exec_request( + self.boto3_iam_client.list_attached_user_policies, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}" return response @reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to") - @report_error def iam_list_entities_for_policy(self, policy_arn: str) -> dict: - response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn) - log_command_execution(self.iam_endpoint, "IAM List Entities For Policy", response, {"PolicyArn": policy_arn}) + response = self._exec_request( + self.boto3_iam_client.list_entities_for_policy, + params={"PolicyArn": policy_arn}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}" assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}" @@ -923,125 +1103,165 @@ class Boto3ClientWrapper(S3ClientWrapper): return response @reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group") - @report_error def iam_list_group_policies(self, group_name: str) -> dict: - response = self.boto3_iam_client.list_group_policies(GroupName=group_name) - log_command_execution(self.iam_endpoint, "IAM List Group Policies", response, {"GroupName": group_name}) + response = self._exec_request( + self.boto3_iam_client.list_group_policies, + params={"GroupName": group_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}" return response @reporter.step("Lists the IAM groups") - @report_error def iam_list_groups(self) -> dict: - response = self.boto3_iam_client.list_groups() - log_command_execution(self.iam_endpoint, "IAM List Groups", response) + response = self._exec_request( + self.boto3_iam_client.list_groups, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Groups"), f"Expected Groups in response:\n{response}" return response @reporter.step("Lists the IAM groups that the specified IAM user belongs to") - @report_error def iam_list_groups_for_user(self, user_name: str) -> dict: - response = self.boto3_iam_client.list_groups_for_user(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM List Groups For User", response, {"UserName": user_name}) + response = self._exec_request( + self.boto3_iam_client.list_groups_for_user, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Groups"), f"Expected Groups in response:\n{response}" return response @reporter.step("Lists all the managed policies that are available in your AWS account") - @report_error def iam_list_policies(self) -> dict: - response = self.boto3_iam_client.list_policies() - log_command_execution(self.iam_endpoint, "IAM List Policies", response) + response = self._exec_request( + self.boto3_iam_client.list_policies, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Policies"), f"Expected Policies in response:\n{response}" return response @reporter.step("Lists information about the versions of the specified managed policy") - @report_error def iam_list_policy_versions(self, policy_arn: str) -> dict: - response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn) - log_command_execution(self.iam_endpoint, "IAM List Policy Versions", response, {"PolicyArn": policy_arn}) + response = self._exec_request( + self.boto3_iam_client.list_policy_versions, + params={"PolicyArn": policy_arn}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Versions"), f"Expected Versions in response:\n{response}" return response @reporter.step("Lists the names of the inline policies embedded in the specified IAM user") - @report_error def iam_list_user_policies(self, user_name: str) -> dict: - response = self.boto3_iam_client.list_user_policies(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM List User Policies", response, {"UserName": user_name}) + response = self._exec_request( + self.boto3_iam_client.list_user_policies, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}" return response @reporter.step("Lists the IAM users") - @report_error def iam_list_users(self) -> dict: - response = self.boto3_iam_client.list_users() - log_command_execution(self.iam_endpoint, "IAM List Users", response) + response = self._exec_request( + self.boto3_iam_client.list_users, + endpoint=self.iam_endpoint, + profile=self.profile, + ) assert response.get("Users"), f"Expected Users in response:\n{response}" return response @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group") - @report_error def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict: - params = self._convert_to_s3_params(locals().items()) + params = self._convert_to_s3_params(locals()) params["PolicyDocument"] = json.dumps(policy_document) - response = self.boto3_iam_client.put_group_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Put Group Policy", response, params) + response = self._exec_request( + self.boto3_iam_client.put_group_policy, + params, + # Overriding option for AWS CLI + policy_document=policy_document, + endpoint=self.iam_endpoint, + profile=self.profile, + ) sleep(S3_SYNC_WAIT_TIME * 10) return response @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user") - @report_error def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict: - params = self._convert_to_s3_params(locals().items()) + params = self._convert_to_s3_params(locals()) params["PolicyDocument"] = json.dumps(policy_document) - response = self.boto3_iam_client.put_user_policy(**params) - log_command_execution(self.iam_endpoint, "IAM Put User Policy", response, params) + response = self._exec_request( + self.boto3_iam_client.put_user_policy, + params, + # Overriding option for AWS CLI + policy_document=policy_document, + endpoint=self.iam_endpoint, + profile=self.profile, + ) sleep(S3_SYNC_WAIT_TIME * 10) return response @reporter.step("Removes the specified user from the specified group") - @report_error def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.remove_user_from_group(**params) - log_command_execution(self.iam_endpoint, "IAM Remove User From Group", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.remove_user_from_group, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Updates the name and/or the path of the specified IAM group") - @report_error def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict: params = {"GroupName": group_name, "NewGroupName": new_name, "NewPath": "/"} - response = self.boto3_iam_client.update_group(**params) - log_command_execution(self.iam_endpoint, "IAM Update Group", response, params) - return response + return self._exec_request( + self.boto3_iam_client.update_group, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Updates the name and/or the path of the specified IAM user") - @report_error def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict: params = {"UserName": user_name, "NewUserName": new_name, "NewPath": "/"} - response = self.boto3_iam_client.update_user(**params) - log_command_execution(self.iam_endpoint, "IAM Update User", response, params) - return response + return self._exec_request( + self.boto3_iam_client.update_user, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Adds one or more tags to an IAM user") - @report_error def iam_tag_user(self, user_name: str, tags: list) -> dict: - params = self._convert_to_s3_params(locals().items()) + params = self._convert_to_s3_params(locals()) params["Tags"] = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] - response = self.boto3_iam_client.tag_user(**params) - log_command_execution(self.iam_endpoint, "IAM Tag User", response, params) - return response + return self._exec_request( + self.boto3_iam_client.tag_user, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("List tags of IAM user") - @report_error def iam_list_user_tags(self, user_name: str) -> dict: - response = self.boto3_iam_client.list_user_tags(UserName=user_name) - log_command_execution(self.iam_endpoint, "IAM List User Tags", response, {"UserName": user_name}) - return response + return self._exec_request( + self.boto3_iam_client.list_user_tags, + params={"UserName": user_name}, + endpoint=self.iam_endpoint, + profile=self.profile, + ) @reporter.step("Removes the specified tags from the user") - @report_error def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: - params = self._convert_to_s3_params(locals().items()) - response = self.boto3_iam_client.untag_user(**params) - log_command_execution(self.iam_endpoint, "IAM Untag User", response, params) - return response + params = self._convert_to_s3_params(locals()) + return self._exec_request( + self.boto3_iam_client.untag_user, + params, + endpoint=self.iam_endpoint, + profile=self.profile, + ) diff --git a/src/frostfs_testlib/utils/cli_utils.py b/src/frostfs_testlib/utils/cli_utils.py index 8e019ea..32e4346 100644 --- a/src/frostfs_testlib/utils/cli_utils.py +++ b/src/frostfs_testlib/utils/cli_utils.py @@ -9,13 +9,12 @@ import csv import json import logging import re -import subprocess import sys from contextlib import suppress from datetime import datetime from io import StringIO from textwrap import shorten -from typing import Dict, List, Optional, TypedDict, Union +from typing import Any, Optional, Union import pexpect @@ -75,22 +74,75 @@ def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: date reporter.attach(command_attachment, "Command execution") -def log_command_execution(url: str, cmd: str, output: Union[str, dict], params: Optional[dict] = None) -> None: +def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[dict] = None, **kwargs) -> None: logger.info(f"{cmd}: {output}") - with suppress(Exception): - json_output = json.dumps(output, indent=4, sort_keys=True) - output = json_output + if not params: + params = {} + + output_params = params try: - json_params = json.dumps(params, indent=4, sort_keys=True) + json_params = json.dumps(params, indent=4, sort_keys=True, default=str) except TypeError as err: logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}") else: - params = json_params + output_params = json_params - command_attachment = f"COMMAND: '{cmd}'\n" f"URL: {url}\n" f"PARAMS:\n{params}\n" f"OUTPUT:\n{output}\n" - reporter.attach(command_attachment, "Command execution") + output = json.dumps(output, indent=4, sort_keys=True, default=str) + + command_execution = f"COMMAND: '{cmd}'\n" f"URL: {kwargs['endpoint']}\n" f"PARAMS:\n{output_params}\n" f"OUTPUT:\n{output}\n" + aws_command = _convert_request_to_aws_cli_command(cmd, params, **kwargs) + + reporter.attach(command_execution, "Command execution") + reporter.attach(aws_command, "AWS CLI Command") + + +def _convert_request_to_aws_cli_command(command: str, params: dict, **kwargs) -> str: + overriden_names = [_convert_json_name_to_aws_cli(name) for name in kwargs.keys()] + command = command.replace("_", "-") + options = [] + + for name, value in params.items(): + name = _convert_json_name_to_aws_cli(name) + + # To override parameters for AWS CLI + if name in overriden_names: + continue + + if option := _create_option(name, value): + options.append(option) + + for name, value in kwargs.items(): + name = _convert_json_name_to_aws_cli(name) + if option := _create_option(name, value): + options.append(option) + + options = " ".join(options) + api = "s3api" if "s3" in kwargs["endpoint"] else "iam" + return f"aws --no-verify-ssl --no-paginate {api} {command} {options}" + + +def _convert_json_name_to_aws_cli(name: str) -> str: + specific_names = {"CORSConfiguration": "cors-configuration"} + + if aws_cli_name := specific_names.get(name): + return aws_cli_name + return re.sub(r"([a-z])([A-Z])", r"\1 \2", name).lower().replace(" ", "-").replace("_", "-") + + +def _create_option(name: str, value: Any) -> str | None: + if isinstance(value, bool) and value: + return f"--{name}" + + if isinstance(value, dict): + value = json.dumps(value, indent=4, sort_keys=True, default=str) + return f"--{name} '{value}'" + + if value: + return f"--{name} {value}" + + return None def parse_netmap_output(output: str) -> list[NodeNetmapInfo]: