import json
import logging
import os
import uuid
from datetime import datetime
from functools import wraps
from time import sleep
from typing import Literal, Optional, Union

import boto3
import urllib3
from botocore.config import Config
from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client

from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils.cli_utils import log_command_execution

# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli

logger = logging.getLogger("NeoLogger")

# Disable warnings on self-signed certificate which the
# boto library produces on requests to S3-gate in dev-env
urllib3.disable_warnings()


def report_error(func):
    """Decorate ``func`` so that a botocore ``ClientError`` is logged and then re-raised."""

    @wraps(func)
    def deco(*a, **kw):
        try:
            return func(*a, **kw)
        except ClientError as err:
            log_command_execution("Result", str(err))
            raise

    return deco


class Boto3ClientWrapper(S3ClientWrapper):
    """``S3ClientWrapper`` implementation that talks to the gateway through boto3 S3 and IAM clients.

    Several methods build their boto3 keyword arguments from ``locals()``; in those methods
    every local variable that is not explicitly excluded becomes a request parameter, so do
    not introduce new locals before the ``locals()`` call.
    """

    __repr_name__: str = "Boto3 client"

    @reporter.step("Configure S3 client (boto3)")
    @report_error
    def __init__(
        self,
        access_key_id: str,
        secret_access_key: str,
        s3gate_endpoint: str,
        profile: str = "default",
        region: str = "us-east-1",
    ) -> None:
        """Create the boto3 session and the S3 client bound to ``s3gate_endpoint``.

        ``profile`` is accepted but not used by this implementation.
        """
        self.boto3_client: S3Client = None
        self.session = boto3.Session()
        self.region = region
        self.config = Config(
            retries={
                "max_attempts": MAX_REQUEST_ATTEMPTS,
                "mode": RETRY_MODE,
            }
        )
        self.access_key_id: str = access_key_id
        self.secret_access_key: str = secret_access_key
        # Starts empty so that the set_endpoint() call below always creates the client.
        self.s3gate_endpoint: str = ""
        # NOTE: stays None until set_iam_endpoint() is called; it then holds an IAM client
        # (the S3Client annotation is kept as in the original code).
        self.boto3_iam_client: S3Client = None
        self.set_endpoint(s3gate_endpoint)

    @reporter.step("Set endpoint S3 to {s3gate_endpoint}")
    def set_endpoint(self, s3gate_endpoint: str):
        """Re-create the S3 client for ``s3gate_endpoint``; no-op when the endpoint is unchanged."""
        if self.s3gate_endpoint == s3gate_endpoint:
            return

        self.s3gate_endpoint = s3gate_endpoint

        self.boto3_client: S3Client = self.session.client(
            service_name="s3",
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region,
            config=self.config,
            endpoint_url=s3gate_endpoint,
            verify=False,
        )

    @reporter.step("Set endpoint IAM to {iam_endpoint}")
    def set_iam_endpoint(self, iam_endpoint: str):
        """Create the IAM client for ``iam_endpoint`` using the wrapper's credentials."""
        self.boto3_iam_client = self.session.client(
            service_name="iam",
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            endpoint_url=iam_endpoint,
            verify=False,
        )

    def _to_s3_param(self, param: str):
        """Convert a snake_case argument name to the boto3 keyword, e.g. ``version_id`` -> ``VersionId``."""
        replacement_map = {
            "Acl": "ACL",
            "Cors": "CORS",
            "_": "",
        }
        result = param.title()
        for find, replace in replacement_map.items():
            result = result.replace(find, replace)
        return result

    # BUCKET METHODS #
    @reporter.step("Create bucket S3")
    @report_error
    def create_bucket(
        self,
        bucket: Optional[str] = None,
        object_lock_enabled_for_bucket: Optional[bool] = None,
        acl: Optional[str] = None,
        grant_write: Optional[str] = None,
        grant_read: Optional[str] = None,
        grant_full_control: Optional[str] = None,
        location_constraint: Optional[str] = None,
    ) -> str:
        """Create a bucket and return its name.

        A random UUID is used as the name when ``bucket`` is None. When ``acl`` is given it
        takes precedence and the ``grant_*`` arguments are ignored; otherwise every non-empty
        ``grant_*`` argument is sent. Sleeps ``S3_SYNC_WAIT_TIME`` after the request.
        """
        if bucket is None:
            bucket = str(uuid.uuid4())

        params = {"Bucket": bucket}
        if object_lock_enabled_for_bucket is not None:
            params.update({"ObjectLockEnabledForBucket": object_lock_enabled_for_bucket})

        if acl is not None:
            params.update({"ACL": acl})
        else:
            # Apply each grant independently so that several grants can be combined.
            if grant_write:
                params.update({"GrantWrite": grant_write})
            if grant_read:
                params.update({"GrantRead": grant_read})
            if grant_full_control:
                params.update({"GrantFullControl": grant_full_control})

        if location_constraint:
            params.update({"CreateBucketConfiguration": {"LocationConstraint": location_constraint}})

        s3_bucket = self.boto3_client.create_bucket(**params)
        log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
        sleep(S3_SYNC_WAIT_TIME)
        return bucket

    @reporter.step("List buckets S3")
    @report_error
    def list_buckets(self) -> list[str]:
        """Return the names of all buckets reported by the gateway."""
        response = self.boto3_client.list_buckets()
        log_command_execution("S3 List buckets result", response)

        return [bucket["Name"] for bucket in response["Buckets"]]

    @reporter.step("Delete bucket S3")
    @report_error
    def delete_bucket(self, bucket: str) -> None:
        """Delete ``bucket`` and sleep ``S3_SYNC_WAIT_TIME`` afterwards."""
        response = self.boto3_client.delete_bucket(Bucket=bucket)
        log_command_execution("S3 Delete bucket result", response)
        sleep(S3_SYNC_WAIT_TIME)

    @reporter.step("Head bucket S3")
    @report_error
    def head_bucket(self, bucket: str) -> None:
        """Issue HeadBucket for ``bucket``; a ``ClientError`` is raised by boto3 on failure."""
        response = self.boto3_client.head_bucket(Bucket=bucket)
        log_command_execution("S3 Head bucket result", response)

    @reporter.step("Put bucket versioning status")
    @report_error
    def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
        """Set the versioning status of ``bucket`` to ``status.value``."""
        response = self.boto3_client.put_bucket_versioning(
            Bucket=bucket, VersioningConfiguration={"Status": status.value}
        )
        log_command_execution("S3 Set bucket versioning to", response)

    @reporter.step("Get bucket versioning status")
    @report_error
    def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
        """Return the ``Status`` field of GetBucketVersioning (None when the response has no such field)."""
        response = self.boto3_client.get_bucket_versioning(Bucket=bucket)
        status = response.get("Status")
        log_command_execution("S3 Got bucket versioning status", response)
        return status

    @reporter.step("Put bucket tagging")
    @report_error
    def put_bucket_tagging(self, bucket: str, tags: list) -> None:
        """Replace the bucket tag set; ``tags`` is an iterable of ``(key, value)`` pairs."""
        tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
        tagging = {"TagSet": tags}
        response = self.boto3_client.put_bucket_tagging(Bucket=bucket, Tagging=tagging)
        log_command_execution("S3 Put bucket tagging", response)

    @reporter.step("Get bucket tagging")
    @report_error
    def get_bucket_tagging(self, bucket: str) -> list:
        """Return the ``TagSet`` of ``bucket``."""
        response = self.boto3_client.get_bucket_tagging(Bucket=bucket)
        log_command_execution("S3 Get bucket tagging", response)
        return response.get("TagSet")

    @reporter.step("Get bucket acl")
    @report_error
    def get_bucket_acl(self, bucket: str) -> list:
        """Return the ``Grants`` list of the bucket ACL."""
        response = self.boto3_client.get_bucket_acl(Bucket=bucket)
        log_command_execution("S3 Get bucket acl", response)
        return response.get("Grants")

    @reporter.step("Delete bucket tagging")
    @report_error
    def delete_bucket_tagging(self, bucket: str) -> None:
        """Remove the tag set of ``bucket``."""
        response = self.boto3_client.delete_bucket_tagging(Bucket=bucket)
        log_command_execution("S3 Delete bucket tagging", response)

    @reporter.step("Put bucket ACL")
    @report_error
    def put_bucket_acl(
        self,
        bucket: str,
        acl: Optional[str] = None,
        grant_write: Optional[str] = None,
        grant_read: Optional[str] = None,
    ) -> None:
        """Set the bucket ACL; every non-None argument is forwarded to boto3."""
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self"] and value is not None
        }
        response = self.boto3_client.put_bucket_acl(**params)
        log_command_execution("S3 ACL bucket result", response)

    @reporter.step("Put object lock configuration")
    @report_error
    def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
        """Apply ``configuration`` as the object lock configuration and return the raw response."""
        response = self.boto3_client.put_object_lock_configuration(Bucket=bucket, ObjectLockConfiguration=configuration)
        log_command_execution("S3 put_object_lock_configuration result", response)
        return response

    @reporter.step("Get object lock configuration")
    @report_error
    def get_object_lock_configuration(self, bucket: str) -> dict:
        """Return the ``ObjectLockConfiguration`` of ``bucket``."""
        response = self.boto3_client.get_object_lock_configuration(Bucket=bucket)
        log_command_execution("S3 get_object_lock_configuration result", response)
        return response.get("ObjectLockConfiguration")

    @reporter.step("Get bucket policy")
    @report_error
    def get_bucket_policy(self, bucket: str) -> str:
        """Return the ``Policy`` field of GetBucketPolicy."""
        response = self.boto3_client.get_bucket_policy(Bucket=bucket)
        log_command_execution("S3 get_bucket_policy result", response)
        return response.get("Policy")

    @reporter.step("Delete bucket policy")
    @report_error
    def delete_bucket_policy(self, bucket: str) -> str:
        """Delete the bucket policy and return the raw boto3 response."""
        response = self.boto3_client.delete_bucket_policy(Bucket=bucket)
        log_command_execution("S3 delete_bucket_policy result", response)
        return response

    @reporter.step("Put bucket policy")
    @report_error
    def put_bucket_policy(self, bucket: str, policy: dict) -> None:
        """Set ``policy`` (serialized with ``json.dumps``) on ``bucket``; the raw response is returned."""
        response = self.boto3_client.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
        log_command_execution("S3 put_bucket_policy result", response)
        return response

    @reporter.step("Get bucket cors")
    @report_error
    def get_bucket_cors(self, bucket: str) -> dict:
        """Return the ``CORSRules`` of ``bucket``."""
        response = self.boto3_client.get_bucket_cors(Bucket=bucket)
        log_command_execution("S3 get_bucket_cors result", response)
        return response.get("CORSRules")

    @reporter.step("Get bucket location")
    @report_error
    def get_bucket_location(self, bucket: str) -> str:
        """Return the ``LocationConstraint`` of ``bucket``."""
        response = self.boto3_client.get_bucket_location(Bucket=bucket)
        log_command_execution("S3 get_bucket_location result", response)
        return response.get("LocationConstraint")

    @reporter.step("Put bucket cors")
    @report_error
    def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
        """Set the CORS configuration of ``bucket``; the raw response is returned."""
        response = self.boto3_client.put_bucket_cors(Bucket=bucket, CORSConfiguration=cors_configuration)
        log_command_execution("S3 put_bucket_cors result", response)
        return response

    @reporter.step("Delete bucket cors")
    @report_error
    def delete_bucket_cors(self, bucket: str) -> None:
        """Remove the CORS configuration of ``bucket``."""
        response = self.boto3_client.delete_bucket_cors(Bucket=bucket)
        log_command_execution("S3 delete_bucket_cors result", response)

    # END OF BUCKET METHODS #
    # OBJECT METHODS #

    @reporter.step("List objects S3 v2")
    @report_error
    def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
        """List objects with ListObjectsV2; return the raw response or only the keys."""
        response = self.boto3_client.list_objects_v2(Bucket=bucket)
        log_command_execution("S3 v2 List objects result", response)

        obj_list = [obj["Key"] for obj in response.get("Contents", [])]
        logger.info(f"Found s3 objects: {obj_list}")

        return response if full_output else obj_list

    @reporter.step("List objects S3")
    @report_error
    def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
        """List objects with ListObjects; return the raw response or only the keys."""
        response = self.boto3_client.list_objects(Bucket=bucket)
        log_command_execution("S3 List objects result", response)

        obj_list = [obj["Key"] for obj in response.get("Contents", [])]
        logger.info(f"Found s3 objects: {obj_list}")

        return response if full_output else obj_list

    @reporter.step("List objects versions S3")
    @report_error
    def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
        """Return the raw ListObjectVersions response, or its ``Versions`` list (default ``[]``)."""
        response = self.boto3_client.list_object_versions(Bucket=bucket)
        log_command_execution("S3 List objects versions result", response)
        return response if full_output else response.get("Versions", [])

    @reporter.step("List objects delete markers S3")
    @report_error
    def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
        """Return the raw ListObjectVersions response, or its ``DeleteMarkers`` list (default ``[]``)."""
        response = self.boto3_client.list_object_versions(Bucket=bucket)
        log_command_execution("S3 List objects delete markers result", response)
        return response if full_output else response.get("DeleteMarkers", [])

    @reporter.step("Put object S3")
    @report_error
    def put_object(
        self,
        bucket: str,
        filepath: str,
        key: Optional[str] = None,
        metadata: Optional[dict] = None,
        tagging: Optional[str] = None,
        acl: Optional[str] = None,
        object_lock_mode: Optional[str] = None,
        object_lock_retain_until_date: Optional[datetime] = None,
        object_lock_legal_hold_status: Optional[str] = None,
        grant_full_control: Optional[str] = None,
        grant_read: Optional[str] = None,
    ) -> str:
        """Upload the file at ``filepath`` and return the ``VersionId`` from the response.

        The key defaults to the file's base name. The whole file is read into memory and
        sent as ``Body``.
        """
        if key is None:
            key = os.path.basename(filepath)

        with open(filepath, "rb") as put_file:
            body = put_file.read()

        # ``body`` is picked up from locals() and becomes the ``Body`` parameter.
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self", "filepath", "put_file"] and value is not None
        }
        response = self.boto3_client.put_object(**params)
        log_command_execution("S3 Put object result", response)
        return response.get("VersionId")

    @reporter.step("Head object S3")
    @report_error
    def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
        """Return the raw HeadObject response for the object (optionally a specific version)."""
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self"] and value is not None
        }
        response = self.boto3_client.head_object(**params)
        log_command_execution("S3 Head object result", response)
        return response

    @reporter.step("Delete object S3")
    @report_error
    def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
        """Delete the object, sleep ``S3_SYNC_WAIT_TIME`` and return the raw response."""
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self"] and value is not None
        }
        response = self.boto3_client.delete_object(**params)
        log_command_execution("S3 Delete object result", response)
        sleep(S3_SYNC_WAIT_TIME)
        return response

    @reporter.step("Delete objects S3")
    @report_error
    def delete_objects(self, bucket: str, keys: list[str]) -> dict:
        """Delete several objects in one request and return the raw response.

        Raises:
            AssertionError: if the response contains an ``Errors`` entry.
        """
        response = self.boto3_client.delete_objects(Bucket=bucket, Delete=_make_objs_dict(keys))
        log_command_execution("S3 Delete objects result", response)
        # ``Errors`` is a list of per-object entries, so keys and messages are collected
        # from each entry rather than by indexing the list itself.
        assert "Errors" not in response, (
            f'The following objects have not been deleted: {[err_info.get("Key") for err_info in response["Errors"]]}.\n'
            f'Error Message: {[err_info.get("Message") for err_info in response["Errors"]]}'
        )
        sleep(S3_SYNC_WAIT_TIME)
        return response

    @reporter.step("Delete object versions S3")
    @report_error
    def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
        """Delete the given versions in one DeleteObjects request and return the raw response.

        Each item of ``object_versions`` must provide ``Key`` and ``VersionId``.
        """
        # Build deletion list in S3 format
        delete_list = {
            "Objects": [
                {
                    "Key": object_version["Key"],
                    "VersionId": object_version["VersionId"],
                }
                for object_version in object_versions
            ]
        }
        response = self.boto3_client.delete_objects(Bucket=bucket, Delete=delete_list)
        log_command_execution("S3 Delete objects result", response)
        return response

    @reporter.step("Delete object versions S3 without delete markers")
    @report_error
    def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
        """Delete each given version with its own DeleteObject request."""
        # Delete objects without creating delete markers
        for object_version in object_versions:
            response = self.boto3_client.delete_object(
                Bucket=bucket, Key=object_version["Key"], VersionId=object_version["VersionId"]
            )
            log_command_execution("S3 Delete object result", response)

    @reporter.step("Put object ACL")
    @report_error
    def put_object_acl(
        self,
        bucket: str,
        key: str,
        acl: Optional[str] = None,
        grant_write: Optional[str] = None,
        grant_read: Optional[str] = None,
    ) -> list:
        """Not implemented for the boto3 client; always raises ``NotImplementedError``."""
        # pytest.skip("Method put_object_acl is not supported by boto3 client")
        raise NotImplementedError("Unsupported for boto3 client")

    @reporter.step("Get object ACL")
    @report_error
    def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
        """Return the ``Grants`` list of the object ACL."""
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self"] and value is not None
        }
        response = self.boto3_client.get_object_acl(**params)
        log_command_execution("S3 ACL objects result", response)
        return response.get("Grants")

    @reporter.step("Copy object S3")
    @report_error
    def copy_object(
        self,
        source_bucket: str,
        source_key: str,
        bucket: Optional[str] = None,
        key: Optional[str] = None,
        acl: Optional[str] = None,
        metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None,
        metadata: Optional[dict] = None,
        tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None,
        tagging: Optional[str] = None,
    ) -> str:
        """Copy ``source_bucket/source_key`` and return the destination key.

        The destination bucket defaults to ``source_bucket``.
        """
        if bucket is None:
            bucket = source_bucket
        if key is None:
            # NOTE(review): the generated key embeds the current working directory path;
            # kept as in the original code, confirm whether a bare UUID was intended.
            key = os.path.join(os.getcwd(), str(uuid.uuid4()))
        # ``copy_source`` is picked up from locals() and becomes the ``CopySource`` parameter.
        copy_source = f"{source_bucket}/{source_key}"

        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self", "source_bucket", "source_key"] and value is not None
        }
        response = self.boto3_client.copy_object(**params)
        log_command_execution("S3 Copy objects result", response)
        return key

    @reporter.step("Get object S3")
    @report_error
    def get_object(
        self,
        bucket: str,
        key: str,
        version_id: Optional[str] = None,
        object_range: Optional[tuple[int, int]] = None,
        full_output: bool = False,
    ) -> Union[dict, str]:
        """Download an object into a new file under ``<cwd>/ASSETS_DIR``.

        ``object_range`` is sent as the ``Range`` header ``bytes=<start>-<end>``. Returns the
        path of the written file, or the raw response when ``full_output`` is True (its body
        stream has already been read into the file by then).
        """
        filename = os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4()))
        range_str = None
        if object_range:
            range_str = f"bytes={object_range[0]}-{object_range[1]}"

        params = {
            self._to_s3_param(param): value
            for param, value in {**locals(), **{"Range": range_str}}.items()
            if param not in ["self", "object_range", "full_output", "range_str", "filename"] and value is not None
        }
        response = self.boto3_client.get_object(**params)
        log_command_execution("S3 Get objects result", response)

        with open(f"{filename}", "wb") as get_file:
            chunk = response["Body"].read(1024)
            while chunk:
                get_file.write(chunk)
                chunk = response["Body"].read(1024)
        return response if full_output else filename

    @reporter.step("Create multipart upload S3")
    @report_error
    def create_multipart_upload(self, bucket: str, key: str) -> str:
        """Start a multipart upload and return its ``UploadId``."""
        response = self.boto3_client.create_multipart_upload(Bucket=bucket, Key=key)
        log_command_execution("S3 Created multipart upload", response)
        assert response.get("UploadId"), f"Expected UploadId in response:\n{response}"

        return response["UploadId"]

    @reporter.step("List multipart uploads S3")
    @report_error
    def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
        """Return the ``Uploads`` list of ``bucket`` (None when the response has no such field)."""
        response = self.boto3_client.list_multipart_uploads(Bucket=bucket)
        log_command_execution("S3 List multipart upload", response)

        return response.get("Uploads")

    @reporter.step("Abort multipart upload S3")
    @report_error
    def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
        """Abort the multipart upload ``upload_id``."""
        response = self.boto3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        log_command_execution("S3 Abort multipart upload", response)

    @reporter.step("Upload part S3")
    @report_error
    def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
        """Upload the file at ``filepath`` as part ``part_num`` and return the part's ``ETag``."""
        with open(filepath, "rb") as put_file:
            body = put_file.read()

        response = self.boto3_client.upload_part(
            UploadId=upload_id,
            Bucket=bucket,
            Key=key,
            PartNumber=part_num,
            Body=body,
        )
        log_command_execution("S3 Upload part", response)
        assert response.get("ETag"), f"Expected ETag in response:\n{response}"

        return response["ETag"]

    @reporter.step("Upload copy part S3")
    @report_error
    def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
        """Upload part ``part_num`` by copying ``copy_source`` and return the part's ``ETag``."""
        response = self.boto3_client.upload_part_copy(
            UploadId=upload_id,
            Bucket=bucket,
            Key=key,
            PartNumber=part_num,
            CopySource=copy_source,
        )
        log_command_execution("S3 Upload copy part", response)
        # A dict default keeps the check an assertion failure when CopyPartResult is missing.
        assert response.get("CopyPartResult", {}).get("ETag"), f"Expected ETag in response:\n{response}"

        return response["CopyPartResult"]["ETag"]

    @reporter.step("List parts S3")
    @report_error
    def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
        """Return the ``Parts`` list of the multipart upload; asserts that it is non-empty."""
        response = self.boto3_client.list_parts(UploadId=upload_id, Bucket=bucket, Key=key)
        log_command_execution("S3 List part", response)
        assert response.get("Parts"), f"Expected Parts in response:\n{response}"

        return response["Parts"]

    @reporter.step("Complete multipart upload S3")
    @report_error
    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
        """Complete the multipart upload; ``parts`` is an iterable of ``(part_number, etag)`` pairs.

        The raw boto3 response is returned.
        """
        parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
        response = self.boto3_client.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts}
        )
        log_command_execution("S3 Complete multipart upload", response)

        return response

    @reporter.step("Put object retention")
    @report_error
    def put_object_retention(
        self,
        bucket: str,
        key: str,
        retention: dict,
        version_id: Optional[str] = None,
        bypass_governance_retention: Optional[bool] = None,
    ) -> None:
        """Set the retention configuration of an object; every non-None argument is forwarded."""
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self"] and value is not None
        }
        response = self.boto3_client.put_object_retention(**params)
        log_command_execution("S3 Put object retention ", response)

    @reporter.step("Put object legal hold")
    @report_error
    def put_object_legal_hold(
        self,
        bucket: str,
        key: str,
        legal_hold_status: Literal["ON", "OFF"],
        version_id: Optional[str] = None,
    ) -> None:
        """Set the legal hold status of an object."""
        # ``legal_hold`` is picked up from locals() and becomes the ``LegalHold`` parameter.
        legal_hold = {"Status": legal_hold_status}
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self", "legal_hold_status"] and value is not None
        }
        response = self.boto3_client.put_object_legal_hold(**params)
        log_command_execution("S3 Put object legal hold ", response)

    @reporter.step("Put object tagging")
    @report_error
    def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = '') -> None:
        """Replace the object tag set; ``tags`` is an iterable of ``(key, value)`` pairs.

        ``VersionId`` is omitted only when ``version_id`` is None; the default empty string
        is still sent, as before.
        """
        tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
        params = {"Bucket": bucket, "Key": key, "Tagging": {"TagSet": tags}}
        if version_id is not None:
            params["VersionId"] = version_id
        response = self.boto3_client.put_object_tagging(**params)
        log_command_execution("S3 Put object tagging", response)

    @reporter.step("Get object tagging")
    @report_error
    def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
        """Return the ``TagSet`` of the object."""
        params = {
            self._to_s3_param(param): value
            for param, value in locals().items()
            if param not in ["self"] and value is not None
        }
        response = self.boto3_client.get_object_tagging(**params)
        log_command_execution("S3 Get object tagging", response)
        return response.get("TagSet")

    @reporter.step("Delete object tagging")
    @report_error
    def delete_object_tagging(self, bucket: str, key: str) -> None:
        """Remove the tag set of the object."""
        response = self.boto3_client.delete_object_tagging(Bucket=bucket, Key=key)
        log_command_execution("S3 Delete object tagging", response)

    @reporter.step("Get object attributes")
    @report_error
    def get_object_attributes(
        self,
        bucket: str,
        key: str,
        attributes: list[str],
        version_id: Optional[str] = None,
        max_parts: Optional[int] = None,
        part_number: Optional[int] = None,
        full_output: bool = True,
    ) -> dict:
        """Not supported by this wrapper: logs a warning and returns an empty dict."""
        logger.warning("Method get_object_attributes is not supported by boto3 client")
        return {}

    @reporter.step("Sync directory S3")
    @report_error
    def sync(
        self,
        bucket: str,
        dir_path: str,
        acl: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> dict:
        """Not implemented for the boto3 client; always raises ``NotImplementedError``."""
        raise NotImplementedError("Sync is not supported for boto3 client")

    @reporter.step("CP directory S3")
    @report_error
    def cp(
        self,
        bucket: str,
        dir_path: str,
        acl: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> dict:
        """Not implemented for the boto3 client; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cp is not supported for boto3 client")

    # END OBJECT METHODS #

    # IAM METHODS #
    # Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.)

    @reporter.step("Adds the specified user to the specified group")
    def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
        """Add ``user_name`` to ``group_name`` and return the raw response."""
        response = self.boto3_iam_client.add_user_to_group(UserName=user_name, GroupName=group_name)
        return response

    @reporter.step("Attaches the specified managed policy to the specified IAM group")
    def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
        """Attach ``policy_arn`` to the group, then sleep ``S3_SYNC_WAIT_TIME * 10``."""
        response = self.boto3_iam_client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
        sleep(S3_SYNC_WAIT_TIME * 10)
        return response

    @reporter.step("Attaches the specified managed policy to the specified user")
    def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
        """Attach ``policy_arn`` to the user, then sleep ``S3_SYNC_WAIT_TIME * 10``."""
        response = self.boto3_iam_client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
        sleep(S3_SYNC_WAIT_TIME * 10)
        return response

    @reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
    def iam_create_access_key(self, user_name: str) -> dict:
        """Create an access key for ``user_name``.

        Returns an ``(access_key_id, secret_access_key)`` tuple (the ``dict`` annotation is
        kept unchanged from the original signature).
        """
        response = self.boto3_iam_client.create_access_key(UserName=user_name)

        access_key_id = response["AccessKey"].get("AccessKeyId")
        secret_access_key = response["AccessKey"].get("SecretAccessKey")
        assert access_key_id, f"Expected AccessKeyId in response:\n{response}"
        assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"

        return access_key_id, secret_access_key

    @reporter.step("Creates a new group")
    def iam_create_group(self, group_name: str) -> dict:
        """Create a group and assert that the response echoes ``group_name``."""
        response = self.boto3_iam_client.create_group(GroupName=group_name)
        assert response.get("Group"), f"Expected Group in response:\n{response}"
        assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"

        return response

    @reporter.step("Creates a new managed policy for your AWS account")
    def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
        """Create a managed policy from ``policy_document`` (serialized with ``json.dumps``)."""
        response = self.boto3_iam_client.create_policy(
            PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
        )
        assert response.get("Policy"), f"Expected Policy in response:\n{response}"
        assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"

        return response

    @reporter.step("Creates a new IAM user for your AWS account")
    def iam_create_user(self, user_name: str) -> dict:
        """Create a user and assert that the response echoes ``user_name``."""
        response = self.boto3_iam_client.create_user(UserName=user_name)
        assert response.get("User"), f"Expected User in response:\n{response}"
        assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"

        return response

    @reporter.step("Deletes the access key pair associated with the specified IAM user")
    def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
        """Delete the access key ``access_key_id`` of ``user_name``."""
        response = self.boto3_iam_client.delete_access_key(AccessKeyId=access_key_id, UserName=user_name)
        return response

    @reporter.step("Deletes the specified IAM group")
    def iam_delete_group(self, group_name: str) -> dict:
        """Delete the group ``group_name``."""
        response = self.boto3_iam_client.delete_group(GroupName=group_name)
        return response

    @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
    def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
        """Delete the inline policy ``policy_name`` of the group."""
        response = self.boto3_iam_client.delete_group_policy(GroupName=group_name, PolicyName=policy_name)
        return response

    @reporter.step("Deletes the specified managed policy")
    def iam_delete_policy(self, policy_arn: str) -> dict:
        """Delete the managed policy ``policy_arn``."""
        response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn)
        return response

    @reporter.step("Deletes the specified IAM user")
    def iam_delete_user(self, user_name: str) -> dict:
        """Delete the user ``user_name``."""
        response = self.boto3_iam_client.delete_user(UserName=user_name)
        return response

    @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
    def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
        """Delete the inline policy ``policy_name`` of the user."""
        response = self.boto3_iam_client.delete_user_policy(UserName=user_name, PolicyName=policy_name)
        return response

    @reporter.step("Removes the specified managed policy from the specified IAM group")
    def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
        """Detach ``policy_arn`` from the group, then sleep ``S3_SYNC_WAIT_TIME * 10``."""
        response = self.boto3_iam_client.detach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
        sleep(S3_SYNC_WAIT_TIME * 10)
        return response

    @reporter.step("Removes the specified managed policy from the specified user")
    def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
        """Detach ``policy_arn`` from the user, then sleep ``S3_SYNC_WAIT_TIME * 10``."""
        response = self.boto3_iam_client.detach_user_policy(UserName=user_name, PolicyArn=policy_arn)
        sleep(S3_SYNC_WAIT_TIME * 10)
        return response

    @reporter.step("Returns a list of IAM users that are in the specified IAM group")
    def iam_get_group(self, group_name: str) -> dict:
        """Fetch the group and assert that the response echoes ``group_name``."""
        response = self.boto3_iam_client.get_group(GroupName=group_name)
        # A dict default turns a missing ``Group`` into an assertion failure, not an AttributeError.
        assert response.get("Group", {}).get("GroupName") == group_name, f"GroupName should be equal to {group_name}"

        return response

    @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
    def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
        """Return the raw GetGroupPolicy response."""
        response = self.boto3_iam_client.get_group_policy(GroupName=group_name, PolicyName=policy_name)

        return response

    @reporter.step("Retrieves information about the specified managed policy")
    def iam_get_policy(self, policy_arn: str) -> dict:
        """Fetch the managed policy and assert that the response echoes ``policy_arn``."""
        response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn)
        assert response.get("Policy"), f"Expected Policy in response:\n{response}"
        # Only the ARN is known here, so the returned policy is matched on its Arn.
        assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}"

        return response

    @reporter.step("Retrieves information about the specified version of the specified managed policy")
    def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
        """Fetch a policy version and assert that the response echoes ``version_id``."""
        response = self.boto3_iam_client.get_policy_version(PolicyArn=policy_arn, VersionId=version_id)
        assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
        assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"

        return response

    @reporter.step("Retrieves information about the specified IAM user")
    def iam_get_user(self, user_name: str) -> dict:
        """Fetch the user and assert that the response echoes ``user_name``."""
        response = self.boto3_iam_client.get_user(UserName=user_name)
        assert response.get("User"), f"Expected User in response:\n{response}"
        assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"

        return response

    @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
    def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
        """Return the raw GetUserPolicy response; asserts that it contains ``UserName``."""
        response = self.boto3_iam_client.get_user_policy(UserName=user_name, PolicyName=policy_name)
        assert response.get("UserName"), f"Expected UserName in response:\n{response}"

        return response

    @reporter.step("Returns information about the access key IDs associated with the specified IAM user")
    def iam_list_access_keys(self, user_name: str) -> dict:
        """Return the raw ListAccessKeys response."""
        response = self.boto3_iam_client.list_access_keys(UserName=user_name)

        return response

    @reporter.step("Lists all managed policies that are attached to the specified IAM group")
    def iam_list_attached_group_policies(self, group_name: str) -> dict:
        """Return the raw response; asserts that ``AttachedPolicies`` is non-empty."""
        response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name)
        assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"

        return response

    @reporter.step("Lists all managed policies that are attached to the specified IAM user")
    def iam_list_attached_user_policies(self, user_name: str) -> dict:
        """Return the raw response; asserts that ``AttachedPolicies`` is non-empty."""
        response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name)
        assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"

        return response

    @reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
    def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
        """Return the raw response; asserts that both ``PolicyGroups`` and ``PolicyUsers`` are non-empty."""
        response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn)

        assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}"
        assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}"

        return response

    @reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
    def iam_list_group_policies(self, group_name: str) -> dict:
        """Return the raw response; asserts that ``PolicyNames`` is non-empty."""
        response = self.boto3_iam_client.list_group_policies(GroupName=group_name)
        assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"

        return response

    @reporter.step("Lists the IAM groups")
    def iam_list_groups(self) -> dict:
        """Return the raw response; asserts that ``Groups`` is non-empty."""
        response = self.boto3_iam_client.list_groups()
        assert response.get("Groups"), f"Expected Groups in response:\n{response}"

        return response

    @reporter.step("Lists the IAM groups that the specified IAM user belongs to")
    def iam_list_groups_for_user(self, user_name: str) -> dict:
        """Return the raw response; asserts that ``Groups`` is non-empty."""
        response = self.boto3_iam_client.list_groups_for_user(UserName=user_name)
        assert response.get("Groups"), f"Expected Groups in response:\n{response}"

        return response

    @reporter.step("Lists all the managed policies that are available in your AWS account")
    def iam_list_policies(self) -> dict:
        """Return the raw response; asserts that ``Policies`` is non-empty."""
        response = self.boto3_iam_client.list_policies()
        assert response.get("Policies"), f"Expected Policies in response:\n{response}"

        return response

    @reporter.step("Lists information about the versions of the specified managed policy")
    def iam_list_policy_versions(self, policy_arn: str) -> dict:
        """Return the raw response; asserts that ``Versions`` is non-empty."""
        response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn)
        assert response.get("Versions"), f"Expected Versions in response:\n{response}"

        return response

    @reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
    def iam_list_user_policies(self, user_name: str) -> dict:
        """Return the raw response; asserts that ``PolicyNames`` is non-empty."""
        response = self.boto3_iam_client.list_user_policies(UserName=user_name)
        assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"

        return response

    @reporter.step("Lists the IAM users")
    def iam_list_users(self) -> dict:
        """Return the raw response; asserts that ``Users`` is non-empty."""
        response = self.boto3_iam_client.list_users()
        assert response.get("Users"), f"Expected Users in response:\n{response}"

        return response

    @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
    def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
        """Put an inline group policy (document serialized with ``json.dumps``), then sleep ``S3_SYNC_WAIT_TIME * 10``."""
        response = self.boto3_iam_client.put_group_policy(
            GroupName=group_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
        )
        sleep(S3_SYNC_WAIT_TIME * 10)
        return response

    @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
    def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
        """Put an inline user policy (document serialized with ``json.dumps``), then sleep ``S3_SYNC_WAIT_TIME * 10``."""
        response = self.boto3_iam_client.put_user_policy(
            UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
        )
        sleep(S3_SYNC_WAIT_TIME * 10)
        return response

    @reporter.step("Removes the specified user from the specified group")
    def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
        """Remove ``user_name`` from ``group_name``."""
        response = self.boto3_iam_client.remove_user_from_group(GroupName=group_name, UserName=user_name)
        return response

    @reporter.step("Updates the name and/or the path of the specified IAM group")
    def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
        """Rename the group and set its path to ``new_path`` ("/" when ``new_path`` is None)."""
        path = "/" if new_path is None else new_path
        response = self.boto3_iam_client.update_group(GroupName=group_name, NewGroupName=new_name, NewPath=path)

        return response

    @reporter.step("Updates the name and/or the path of the specified IAM user")
    def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
        """Rename the user and set its path to ``new_path`` ("/" when ``new_path`` is None)."""
        path = "/" if new_path is None else new_path
        response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath=path)
        return response