import json import logging import os from datetime import datetime from time import sleep from typing import Literal, Optional, Union from frostfs_testlib import reporter from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.shell import CommandOptions from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.utils import string_utils # TODO: Refactor this code to use shell instead of _cmd_run from frostfs_testlib.utils.cli_utils import _configure_aws_cli from frostfs_testlib.utils.file_utils import TestFile logger = logging.getLogger("NeoLogger") command_options = CommandOptions(timeout=480) class AwsCliClient(S3ClientWrapper): __repr_name__: str = "AWS CLI" # Flags that we use for all S3 commands: disable SSL verification (as we use self-signed # certificate in devenv) and disable automatic pagination in CLI output common_flags = "--no-verify-ssl --no-paginate" s3gate_endpoint: str @reporter.step("Configure S3 client (aws cli)") def __init__( self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default", region: str = "us-east-1" ) -> None: self.s3gate_endpoint = s3gate_endpoint self.iam_endpoint = None self.access_key_id: str = access_key_id self.secret_access_key: str = secret_access_key self.profile = profile self.region = region self.local_shell = LocalShell() try: _configure_aws_cli(f"aws configure --profile {profile}", access_key_id, secret_access_key, region) self.local_shell.exec(f"aws configure set max_attempts {MAX_REQUEST_ATTEMPTS} --profile {profile}") self.local_shell.exec( f"aws configure set retry_mode {RETRY_MODE} --profile {profile}", ) except Exception as err: raise RuntimeError("Error while configuring AwsCliClient") from err @reporter.step("Set S3 endpoint to {s3gate_endpoint}") def set_endpoint(self, s3gate_endpoint: str): self.s3gate_endpoint = s3gate_endpoint @reporter.step("Set IAM endpoint to {iam_endpoint}") def set_iam_endpoint(self, iam_endpoint: str): self.iam_endpoint = iam_endpoint @reporter.step("Create bucket S3") def create_bucket( self, bucket: Optional[str] = None, object_lock_enabled_for_bucket: Optional[bool] = None, acl: Optional[str] = None, grant_write: Optional[str] = None, grant_read: Optional[str] = None, grant_full_control: Optional[str] = None, location_constraint: Optional[str] = None, ) -> str: if bucket is None: bucket = string_utils.unique_name("bucket-") if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' if object_lock_enabled_for_bucket is None: object_lock = "" elif object_lock_enabled_for_bucket: object_lock = " --object-lock-enabled-for-bucket" else: object_lock = " --no-object-lock-enabled-for-bucket" cmd = ( f"aws {self.common_flags} s3api create-bucket --bucket {bucket} " f"{object_lock} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if acl: cmd += f" --acl {acl}" if grant_full_control: cmd += f" --grant-full-control {grant_full_control}" if grant_write: cmd += f" --grant-write {grant_write}" if grant_read: cmd += f" --grant-read {grant_read}" if location_constraint: cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}" self.local_shell.exec(cmd) return bucket @reporter.step("List buckets S3") def list_buckets(self) -> list[str]: cmd = f"aws {self.common_flags} s3api list-buckets --endpoint {self.s3gate_endpoint} --profile {self.profile}" output = self.local_shell.exec(cmd).stdout buckets_json = self._to_json(output) return [bucket["Name"] for bucket in buckets_json["Buckets"]] @reporter.step("Delete bucket S3") def delete_bucket(self, bucket: str) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}" self.local_shell.exec(cmd, command_options) @reporter.step("Head bucket S3") def head_bucket(self, bucket: str) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = f"aws {self.common_flags} s3api head-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}" self.local_shell.exec(cmd) @reporter.step("Put bucket versioning status") def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api put-bucket-versioning --bucket {bucket} " f"--versioning-configuration Status={status.value} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Get bucket versioning status") def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-versioning --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("Status") @reporter.step("Put bucket tagging") def put_bucket_tagging(self, bucket: str, tags: list) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]} cmd = ( f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} " f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Get bucket tagging") def get_bucket_tagging(self, bucket: str) -> list: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("TagSet") @reporter.step("Get bucket acl") def get_bucket_acl(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout return self._to_json(output) @reporter.step("Get bucket location") def get_bucket_location(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("LocationConstraint") @reporter.step("List objects S3") def list_objects( self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None ) -> Union[dict, list[str]]: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} " if page_size: cmd = cmd.replace("--no-paginate", "") cmd += f" --page-size {page_size} " if prefix: cmd += f" --prefix {prefix}" if self.profile: cmd += f" --profile {self.profile} " output = self.local_shell.exec(cmd).stdout response = self._to_json(output) obj_list = [obj["Key"] for obj in response.get("Contents", [])] logger.info(f"Found s3 objects: {obj_list}") return response if full_output else obj_list @reporter.step("List objects S3 v2") def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) obj_list = [obj["Key"] for obj in response.get("Contents", [])] logger.info(f"Found s3 objects: {obj_list}") return response if full_output else obj_list @reporter.step("List objects versions S3") def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response if full_output else response.get("Versions", []) @reporter.step("List objects delete markers S3") def list_delete_markers(self, bucket: str, full_output: bool = False) -> list: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response if full_output else response.get("DeleteMarkers", []) @reporter.step("Copy object S3") def copy_object( self, source_bucket: str, source_key: str, bucket: Optional[str] = None, key: Optional[str] = None, acl: Optional[str] = None, metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None, metadata: Optional[dict] = None, tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None, tagging: Optional[str] = None, ) -> str: if bucket is None: bucket = source_bucket if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' if key is None: key = string_utils.unique_name("copy-object-") copy_source = f"{source_bucket}/{source_key}" cmd = ( f"aws {self.common_flags} s3api copy-object --copy-source {copy_source} " f"--bucket {bucket} --key {key} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if acl: cmd += f" --acl {acl}" if metadata_directive: cmd += f" --metadata-directive {metadata_directive}" if metadata: cmd += " --metadata " for meta_key, value in metadata.items(): cmd += f" {meta_key}={value}" if tagging_directive: cmd += f" --tagging-directive {tagging_directive}" if tagging: cmd += f" --tagging {tagging}" self.local_shell.exec(cmd, command_options) return key @reporter.step("Put object S3") def put_object( self, bucket: str, filepath: str, key: Optional[str] = None, metadata: Optional[dict] = None, tagging: Optional[str] = None, acl: Optional[str] = None, object_lock_mode: Optional[str] = None, object_lock_retain_until_date: Optional[datetime] = None, object_lock_legal_hold_status: Optional[str] = None, grant_full_control: Optional[str] = None, grant_read: Optional[str] = None, ) -> str: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' if key is None: key = os.path.basename(filepath) cmd = ( f"aws {self.common_flags} s3api put-object --bucket {bucket} --key {key} " f"--body {filepath} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if metadata: cmd += " --metadata" for key, value in metadata.items(): cmd += f" {key}={value}" if tagging: cmd += f" --tagging '{tagging}'" if acl: cmd += f" --acl {acl}" if object_lock_mode: cmd += f" --object-lock-mode {object_lock_mode}" if object_lock_retain_until_date: cmd += f' --object-lock-retain-until-date "{object_lock_retain_until_date}"' if object_lock_legal_hold_status: cmd += f" --object-lock-legal-hold-status {object_lock_legal_hold_status}" if grant_full_control: cmd += f" --grant-full-control '{grant_full_control}'" if grant_read: cmd += f" --grant-read {grant_read}" output = self.local_shell.exec(cmd, command_options).stdout response = self._to_json(output) return response.get("VersionId") @reporter.step("Head object S3") def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api head-object --bucket {bucket} --key {key} " f"{version} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Get object S3") def get_object( self, bucket: str, key: str, version_id: Optional[str] = None, object_range: Optional[tuple[int, int]] = None, full_output: bool = False, ) -> dict | TestFile: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-"))) version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api get-object --bucket {bucket} --key {key} " f"{version} {test_file} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if object_range: cmd += f" --range bytes={object_range[0]}-{object_range[1]}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response if full_output else test_file @reporter.step("Get object ACL") def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api get-object-acl --bucket {bucket} --key {key} " f"{version} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("Grants") @reporter.step("Put object ACL") def put_object_acl( self, bucket: str, key: str, acl: Optional[str] = None, grant_write: Optional[str] = None, grant_read: Optional[str] = None, ) -> list: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api put-object-acl --bucket {bucket} --key {key} " f" --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if acl: cmd += f" --acl {acl}" if grant_write: cmd += f" --grant-write {grant_write}" if grant_read: cmd += f" --grant-read {grant_read}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("Grants") @reporter.step("Put bucket ACL") def put_bucket_acl( self, bucket: str, acl: Optional[str] = None, grant_write: Optional[str] = None, grant_read: Optional[str] = None, ) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f" --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if acl: cmd += f" --acl {acl}" if grant_write: cmd += f" --grant-write {grant_write}" if grant_read: cmd += f" --grant-read {grant_read}" self.local_shell.exec(cmd) @reporter.step("Delete objects S3") def delete_objects(self, bucket: str, keys: list[str]) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json") delete_structure = json.dumps(_make_objs_dict(keys)) with open(file_path, "w") as out_file: out_file.write(delete_structure) logger.info(f"Input file for delete-objects: {delete_structure}") cmd = ( f"aws {self.common_flags} s3api delete-objects --bucket {bucket} " f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout response = self._to_json(output) return response @reporter.step("Delete object S3") def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api delete-object --bucket {bucket} " f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout return self._to_json(output) @reporter.step("Delete object versions S3") def delete_object_versions(self, bucket: str, object_versions: list) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' # Build deletion list in S3 format delete_list = { "Objects": [ { "Key": object_version["Key"], "VersionId": object_version["VersionId"], } for object_version in object_versions ] } file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json") delete_structure = json.dumps(delete_list) with open(file_path, "w") as out_file: out_file.write(delete_structure) logger.info(f"Input file for delete-objects: {delete_structure}") cmd = ( f"aws {self.common_flags} s3api delete-objects --bucket {bucket} " f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout return self._to_json(output) @reporter.step("Delete object versions S3 without delete markers") def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' # Delete objects without creating delete markers for object_version in object_versions: self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]) @reporter.step("Get object attributes") def get_object_attributes( self, bucket: str, key: str, attributes: list[str], version_id: str = "", max_parts: int = 0, part_number: int = 0, full_output: bool = True, ) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' attrs = ",".join(attributes) version = f" --version-id {version_id}" if version_id else "" parts = f"--max-parts {max_parts}" if max_parts else "" part_number_str = f"--part-number-marker {part_number}" if part_number else "" cmd = ( f"aws {self.common_flags} s3api get-object-attributes --bucket {bucket} " f"--key {key} {version} {parts} {part_number_str} --object-attributes {attrs} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) for attr in attributes: assert attr in response, f"Expected attribute {attr} in {response}" if full_output: return response else: return response.get(attributes[0]) @reporter.step("Get bucket policy") def get_bucket_policy(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("Policy") @reporter.step("Delete bucket policy") def delete_bucket_policy(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api delete-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Put bucket policy") def put_bucket_policy(self, bucket: str, policy: dict) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' # Leaving it as is was in test repo. Double dumps to escape resulting string # Example: # policy = {"a": 1} # json.dumps(policy) => {"a": 1} # json.dumps(json.dumps(policy)) => "{\"a\": 1}" # TODO: update this dumped_policy = json.dumps(json.dumps(policy)) cmd = ( f"aws {self.common_flags} s3api put-bucket-policy --bucket {bucket} " f"--policy {dumped_policy} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Get bucket cors") def get_bucket_cors(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("CORSRules") @reporter.step("Put bucket cors") def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api put-bucket-cors --bucket {bucket} " f"--cors-configuration '{json.dumps(cors_configuration)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Delete bucket cors") def delete_bucket_cors(self, bucket: str) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Delete bucket tagging") def delete_bucket_tagging(self, bucket: str) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api delete-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Put object retention") def put_object_retention( self, bucket: str, key: str, retention: dict, version_id: Optional[str] = None, bypass_governance_retention: Optional[bool] = None, ) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api put-object-retention --bucket {bucket} --key {key} " f"{version} --retention '{json.dumps(retention, indent=4, sort_keys=True, default=str)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) if bypass_governance_retention is not None: cmd += " --bypass-governance-retention" self.local_shell.exec(cmd) @reporter.step("Put object legal hold") def put_object_legal_hold( self, bucket: str, key: str, legal_hold_status: Literal["ON", "OFF"], version_id: Optional[str] = None, ) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" legal_hold = json.dumps({"Status": legal_hold_status}) cmd = ( f"aws {self.common_flags} s3api put-object-legal-hold --bucket {bucket} --key {key} " f"{version} --legal-hold '{legal_hold}' --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Put object tagging") def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] tagging = {"TagSet": tags} version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api put-object-tagging --bucket {bucket} --key {key} " f"{version} --tagging '{json.dumps(tagging)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Get object tagging") def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api get-object-tagging --bucket {bucket} --key {key} " f"{version} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("TagSet") @reporter.step("Delete object tagging") def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' version = f" --version-id {version_id}" if version_id else "" cmd = ( f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} " f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Sync directory S3") def sync( self, bucket: str, dir_path: str, acl: Optional[str] = None, metadata: Optional[dict] = None, ) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) if metadata: cmd += " --metadata" for key, value in metadata.items(): cmd += f" {key}={value}" if acl: cmd += f" --acl {acl}" output = self.local_shell.exec(cmd, command_options).stdout return self._to_json(output) @reporter.step("CP directory S3") def cp( self, bucket: str, dir_path: str, acl: Optional[str] = None, metadata: Optional[dict] = None, ) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3 cp {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint} --recursive --profile {self.profile}" ) if metadata: cmd += " --metadata" for key, value in metadata.items(): cmd += f" {key}={value}" if acl: cmd += f" --acl {acl}" output = self.local_shell.exec(cmd, command_options).stdout return self._to_json(output) @reporter.step("Create multipart upload S3") def create_multipart_upload(self, bucket: str, key: str) -> str: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api create-multipart-upload --bucket {bucket} " f"--key {key} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("UploadId"), f"Expected UploadId in response:\n{response}" return response["UploadId"] @reporter.step("List multipart uploads S3") def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api list-multipart-uploads --bucket {bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("Uploads") @reporter.step("Abort multipart upload S3") def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api abort-multipart-upload --bucket {bucket} " f"--key {key} --upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) self.local_shell.exec(cmd) @reporter.step("Upload part S3") def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} " f"--upload-id {upload_id} --part-number {part_num} --body {filepath} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout response = self._to_json(output) assert response.get("ETag"), f"Expected ETag in response:\n{response}" return response["ETag"] @reporter.step("Upload copy part S3") def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} " f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout response = self._to_json(output) assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}" return response["CopyPartResult"]["ETag"] @reporter.step("List parts S3") def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api list-parts --bucket {bucket} --key {key} " f"--upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("Parts"), f"Expected Parts in response:\n{response}" return response["Parts"] @reporter.step("Complete multipart upload S3") def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' file_path = os.path.join(os.getcwd(), ASSETS_DIR, "parts.json") parts_dict = {"Parts": [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]} with open(file_path, "w") as out_file: out_file.write(json.dumps(parts_dict)) logger.info(f"Input file for complete-multipart-upload: {json.dumps(parts_dict)}") cmd = ( f"aws {self.common_flags} s3api complete-multipart-upload --bucket {bucket} " f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Put object lock configuration") def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {bucket} " f"--object-lock-configuration '{json.dumps(configuration)}' --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout return self._to_json(output) @reporter.step("Get object lock configuration") def get_object_lock_configuration(self, bucket: str): if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response.get("ObjectLockConfiguration") @reporter.step("Put bucket lifecycle configuration") def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} " f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Get bucket lifecycle configuration") def get_bucket_lifecycle_configuration(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Delete bucket lifecycle configuration") def delete_bucket_lifecycle(self, bucket: str) -> dict: if bucket.startswith("-") or " " in bucket: bucket = f'"{bucket}"' cmd = ( f"aws {self.common_flags} s3api delete-bucket-lifecycle --bucket {bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @staticmethod def _to_json(output: str) -> dict: json_output = {} if "{" not in output and "}" not in output: logger.warning(f"Could not parse json from output {output}") return json_output json_output = json.loads(output[output.index("{") :]) return json_output @reporter.step("Create presign url for the object") def create_presign_url(self, method: str, bucket: str, key: str, expires_in: Optional[int] = 3600) -> str: # AWS CLI does not support method definition and world only in 'get_object' state by default cmd = f"aws {self.common_flags} s3 presign s3://{bucket}/{key} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" if expires_in: cmd += f" --expires-in {expires_in}" response = self.local_shell.exec(cmd).stdout return response.strip() # IAM METHODS # # Some methods don't have checks because AWS is silent in some cases (delete, attach, etc.) @reporter.step("Adds the specified user to the specified group") def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict: cmd = f"aws {self.common_flags} iam add-user-to-group --user-name {user_name} --group-name {group_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Attaches the specified managed policy to the specified IAM group") def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam attach-group-policy --group-name {group_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) sleep(S3_SYNC_WAIT_TIME * 14) return response @reporter.step("Attaches the specified managed policy to the specified user") def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam attach-user-policy --user-name {user_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) sleep(S3_SYNC_WAIT_TIME * 14) return response @reporter.step("Creates a new AWS secret access key and access key ID for the specified user") def iam_create_access_key(self, user_name: Optional[str] = None) -> dict: cmd = f"aws {self.common_flags} iam create-access-key --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" if user_name: cmd += f" --user-name {user_name}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) access_key_id = response["AccessKey"].get("AccessKeyId") secret_access_key = response["AccessKey"].get("SecretAccessKey") assert access_key_id, f"Expected AccessKeyId in response:\n{response}" assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}" return access_key_id, secret_access_key @reporter.step("Creates a new group") def iam_create_group(self, group_name: str) -> dict: cmd = f"aws {self.common_flags} iam create-group --group-name {group_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("Group"), f"Expected Group in response:\n{response}" assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}" return response @reporter.step("Creates a new managed policy for your AWS account") def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict: cmd = ( f"aws {self.common_flags} iam create-policy --endpoint {self.iam_endpoint}" f" --policy-name {policy_name} --policy-document '{json.dumps(policy_document)}'" ) if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}" return response @reporter.step("Creates a new IAM user for your AWS account") def iam_create_user(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam create-user --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("User"), f"Expected User in response:\n{response}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" return response @reporter.step("Deletes the access key pair associated with the specified IAM user") def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam delete-access-key --access-key-id {access_key_id} --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Deletes the specified IAM group") def iam_delete_group(self, group_name: str) -> dict: cmd = f"aws {self.common_flags} iam delete-group --group-name {group_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group") def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict: cmd = f"aws {self.common_flags} iam delete-group-policy --group-name {group_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Deletes the specified managed policy") def iam_delete_policy(self, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam delete-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Deletes the specified IAM user") def iam_delete_user(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam delete-user --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user") def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict: cmd = f"aws {self.common_flags} iam delete-user-policy --user-name {user_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Removes the specified managed policy from the specified IAM group") def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam detach-group-policy --group-name {group_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) sleep(S3_SYNC_WAIT_TIME * 14) return response @reporter.step("Removes the specified managed policy from the specified user") def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam detach-user-policy --user-name {user_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) sleep(S3_SYNC_WAIT_TIME * 14) return response @reporter.step("Returns a list of IAM users that are in the specified IAM group") def iam_get_group(self, group_name: str) -> dict: cmd = f"aws {self.common_flags} iam get-group --group-name {group_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "Users" in response.keys(), f"Expected Users in response:\n{response}" assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}" return response @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group") def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict: cmd = f"aws {self.common_flags} iam get-group-policy --group-name {group_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Retrieves information about the specified managed policy") def iam_get_policy(self, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam get-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}" return response @reporter.step("Retrieves information about the specified version of the specified managed policy") def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict: cmd = f"aws {self.common_flags} iam get-policy-version --policy-arn {policy_arn} --version-id {version_id} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}" assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}" return response @reporter.step("Retrieves information about the specified IAM user") def iam_get_user(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam get-user --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("User"), f"Expected User in response:\n{response}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" return response @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user") def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict: cmd = f"aws {self.common_flags} iam get-user-policy --user-name {user_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("UserName"), f"Expected User in response:\n{response}" return response @reporter.step("Returns information about the access key IDs associated with the specified IAM user") def iam_list_access_keys(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-access-keys --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Lists all managed policies that are attached to the specified IAM group") def iam_list_attached_group_policies(self, group_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-attached-group-policies --group-name {group_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "AttachedPolicies" in response.keys(), f"Expected AttachedPolicies in response:\n{response}" return response @reporter.step("Lists all managed policies that are attached to the specified IAM user") def iam_list_attached_user_policies(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-attached-user-policies --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "AttachedPolicies" in response.keys(), f"Expected AttachedPolicies in response:\n{response}" return response @reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to") def iam_list_entities_for_policy(self, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam list-entities-for-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}" assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}" return response @reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group") def iam_list_group_policies(self, group_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-group-policies --group-name {group_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "PolicyNames" in response.keys(), f"Expected PolicyNames in response:\n{response}" return response @reporter.step("Lists the IAM groups") def iam_list_groups(self) -> dict: cmd = f"aws {self.common_flags} iam list-groups --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "Groups" in response.keys(), f"Expected Groups in response:\n{response}" return response @reporter.step("Lists the IAM groups that the specified IAM user belongs to") def iam_list_groups_for_user(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-groups-for-user --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "Groups" in response.keys(), f"Expected Groups in response:\n{response}" return response @reporter.step("Lists all the managed policies that are available in your AWS account") def iam_list_policies(self) -> dict: cmd = f"aws {self.common_flags} iam list-policies --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "Policies" in response.keys(), f"Expected Policies in response:\n{response}" return response @reporter.step("Lists information about the versions of the specified managed policy") def iam_list_policy_versions(self, policy_arn: str) -> dict: cmd = f"aws {self.common_flags} iam list-policy-versions --policy-arn {policy_arn} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("Versions"), f"Expected Versions in response:\n{response}" return response @reporter.step("Lists the names of the inline policies embedded in the specified IAM user") def iam_list_user_policies(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-user-policies --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "PolicyNames" in response.keys(), f"Expected PolicyNames in response:\n{response}" return response @reporter.step("Lists the IAM users") def iam_list_users(self) -> dict: cmd = f"aws {self.common_flags} iam list-users --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert "Users" in response.keys(), f"Expected Users in response:\n{response}" return response @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group") def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict: cmd = ( f"aws {self.common_flags} iam put-group-policy --endpoint {self.iam_endpoint}" f" --group-name {group_name} --policy-name {policy_name} --policy-document '{json.dumps(policy_document)}'" ) if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) sleep(S3_SYNC_WAIT_TIME * 14) return response @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user") def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict: cmd = ( f"aws {self.common_flags} iam put-user-policy --endpoint {self.iam_endpoint}" f" --user-name {user_name} --policy-name {policy_name} --policy-document '{json.dumps(policy_document)}'" ) if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) sleep(S3_SYNC_WAIT_TIME * 14) return response @reporter.step("Removes the specified user from the specified group") def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict: cmd = ( f"aws {self.common_flags} iam remove-user-from-group --endpoint {self.iam_endpoint}" f" --group-name {group_name} --user-name {user_name}" ) if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Updates the name and/or the path of the specified IAM group") def iam_update_group(self, group_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict: cmd = f"aws {self.common_flags} iam update-group --group-name {group_name} --endpoint {self.iam_endpoint}" if new_name: cmd += f" --new-group-name {new_name}" if new_path: cmd += f" --new-path {new_path}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Updates the name and/or the path of the specified IAM user") def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict: cmd = f"aws {self.common_flags} iam update-user --user-name {user_name} --endpoint {self.iam_endpoint}" if new_name: cmd += f" --new-user-name {new_name}" if new_path: cmd += f" --new-path {new_path}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Adds one or more tags to an IAM user") def iam_tag_user(self, user_name: str, tags: list) -> dict: tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] cmd = ( f"aws {self.common_flags} iam tag-user --user-name {user_name} --tags '{json.dumps(tags_json)}' --endpoint {self.iam_endpoint}" ) if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("List tags of IAM user") def iam_list_user_tags(self, user_name: str) -> dict: cmd = f"aws {self.common_flags} iam list-user-tags --user-name {user_name} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Removes the specified tags from the user") def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: tag_keys_joined = " ".join(tag_keys) cmd = f"aws {self.common_flags} iam untag-user --user-name {user_name} --tag-keys {tag_keys_joined} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response # MFA METHODS @reporter.step("Creates a new virtual MFA device") def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple: cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\ --outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber") assert serial_number, f"Expected SerialNumber in response:\n{response}" return serial_number, False @reporter.step("Deactivates the specified MFA device and removes it from association with the user name") def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict: cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Deletes a virtual MFA device") def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict: cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Enables the specified MFA device and associates it with the specified IAM user") def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict: cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\ --authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) return response @reporter.step("Lists the MFA devices for an IAM user") def iam_list_virtual_mfa_devices(self) -> dict: cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}" return response @reporter.step("Get session token for user") def sts_get_session_token( self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None ) -> tuple: cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}" if duration_seconds: cmd += f" --duration-seconds {duration_seconds}" if serial_number: cmd += f" --serial-number {serial_number}" if token_code: cmd += f" --token-code {token_code}" if self.profile: cmd += f" --profile {self.profile}" output = self.local_shell.exec(cmd).stdout response = self._to_json(output) access_key = response.get("Credentials", {}).get("AccessKeyId") secret_access_key = response.get("Credentials", {}).get("SecretAccessKey") session_token = response.get("Credentials", {}).get("SessionToken") assert access_key, f"Expected AccessKeyId in response:\n{response}" assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}" assert session_token, f"Expected SessionToken in response:\n{response}" return access_key, secret_access_key, session_token