frostfs-testlib/src/frostfs_testlib/s3/aws_cli_client.py

1326 lines
53 KiB
Python
Raw Normal View History

import json
import logging
import os
import uuid
from datetime import datetime
from time import sleep
from typing import Literal, Optional, Union
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell
# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli
# Module logger, obtained from the stdlib logging registry under the name "NeoLogger".
logger = logging.getLogger("NeoLogger")
# Shell options with timeout=480, passed to selected aws cli invocations in this module
# (presumably seconds — confirm against CommandOptions).
command_options = CommandOptions(timeout=480)
class AwsCliClient(S3ClientWrapper):
__repr_name__: str = "AWS CLI"
# Flags that we use for all S3 commands: disable SSL verification (as we use self-signed
# certificate in devenv) and disable automatic pagination in CLI output
common_flags = "--no-verify-ssl --no-paginate"
s3gate_endpoint: str
@reporter.step("Configure S3 client (aws cli)")
def __init__(
    self,
    access_key_id: str,
    secret_access_key: str,
    s3gate_endpoint: str,
    profile: str = "default",
    region: str = "us-east-1",
) -> None:
    """Remember the connection settings and configure the aws cli profile.

    Args:
        access_key_id: access key id written into the aws cli profile.
        secret_access_key: secret key written into the aws cli profile.
        s3gate_endpoint: endpoint passed to every S3 command.
        profile: name of the aws cli profile to configure and use.
        region: region written into the aws cli profile.

    Raises:
        RuntimeError: if any configuration command fails (original error is chained).
    """
    self.local_shell = LocalShell()
    self.s3gate_endpoint = s3gate_endpoint
    # IAM endpoint is unknown at construction time; see set_iam_endpoint.
    self.iam_endpoint = None
    self.access_key_id: str = access_key_id
    self.secret_access_key: str = secret_access_key
    self.profile = profile
    self.region = region

    try:
        _configure_aws_cli(f"aws configure --profile {profile}", access_key_id, secret_access_key, region)
        retry_settings = (
            f"aws configure set max_attempts {MAX_REQUEST_ATTEMPTS} --profile {profile}",
            f"aws configure set retry_mode {RETRY_MODE} --profile {profile}",
        )
        for configure_cmd in retry_settings:
            self.local_shell.exec(configure_cmd)
    except Exception as err:
        raise RuntimeError("Error while configuring AwsCliClient") from err
2023-11-23 12:53:03 +00:00
@reporter.step("Set S3 endpoint to {s3gate_endpoint}")
def set_endpoint(self, s3gate_endpoint: str) -> None:
    """Replace the S3 endpoint used by subsequent commands."""
    self.s3gate_endpoint = s3gate_endpoint
2023-11-23 12:53:03 +00:00
@reporter.step("Set IAM endpoint to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str) -> None:
    """Replace the IAM endpoint used by subsequent `aws iam` commands."""
    self.iam_endpoint = iam_endpoint
@reporter.step("Create bucket S3")
def create_bucket(
    self,
    bucket: Optional[str] = None,
    object_lock_enabled_for_bucket: Optional[bool] = None,
    acl: Optional[str] = None,
    grant_write: Optional[str] = None,
    grant_read: Optional[str] = None,
    grant_full_control: Optional[str] = None,
    location_constraint: Optional[str] = None,
) -> str:
    """Create a bucket with `aws s3api create-bucket` and return its name.

    A random UUID is used as the name when ``bucket`` is None. Optional
    arguments are appended to the command only when they are truthy;
    ``object_lock_enabled_for_bucket`` adds the enabling flag for True, the
    ``--no-`` flag for False and nothing for None. The method sleeps for
    ``S3_SYNC_WAIT_TIME`` after the command completes.
    """
    bucket = str(uuid.uuid4()) if bucket is None else bucket

    object_lock = ""
    if object_lock_enabled_for_bucket is not None:
        if object_lock_enabled_for_bucket:
            object_lock = " --object-lock-enabled-for-bucket"
        else:
            object_lock = " --no-object-lock-enabled-for-bucket"

    base = (
        f"aws {self.common_flags} s3api create-bucket --bucket {bucket} "
        f"{object_lock} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )

    # Collect optional fragments in the same order the flags were originally appended.
    extras = []
    if acl:
        extras.append(f" --acl {acl}")
    if grant_full_control:
        extras.append(f" --grant-full-control {grant_full_control}")
    if grant_write:
        extras.append(f" --grant-write {grant_write}")
    if grant_read:
        extras.append(f" --grant-read {grant_read}")
    if location_constraint:
        extras.append(f" --create-bucket-configuration LocationConstraint={location_constraint}")

    self.local_shell.exec(base + "".join(extras))
    sleep(S3_SYNC_WAIT_TIME)
    return bucket
@reporter.step("List buckets S3")
def list_buckets(self) -> list[str]:
    """Return the names of all buckets reported by `aws s3api list-buckets`."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-buckets --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    parsed = self._to_json(stdout)
    return [entry["Name"] for entry in parsed["Buckets"]]
@reporter.step("Delete bucket S3")
def delete_bucket(self, bucket: str) -> None:
    """Delete ``bucket`` with `aws s3api delete-bucket`, then sleep for ``S3_SYNC_WAIT_TIME``."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}",
        command_options,
    )
    sleep(S3_SYNC_WAIT_TIME)
@reporter.step("Head bucket S3")
def head_bucket(self, bucket: str) -> None:
    """Run `aws s3api head-bucket` for ``bucket``; the command output is discarded."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api head-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Put bucket versioning status")
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
    """Set the versioning configuration of ``bucket`` to ``status.value``."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api put-bucket-versioning --bucket {bucket} "
        f"--versioning-configuration Status={status.value} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Get bucket versioning status")
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
    """Return the ``Status`` field of `aws s3api get-bucket-versioning` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-bucket-versioning --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("Status")
@reporter.step("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
    """Apply ``tags`` (an iterable of ``(key, value)`` pairs) to ``bucket``."""
    tag_set = []
    for tag_key, tag_value in tags:
        tag_set.append({"Key": tag_key, "Value": tag_value})
    tags_json = {"TagSet": tag_set}

    self.local_shell.exec(
        f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
        f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list:
    """Return the ``TagSet`` field of `aws s3api get-bucket-tagging` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("TagSet")
@reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list:
    """Return the ``Grants`` field of `aws s3api get-bucket-acl` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("Grants")
@reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict:
    """Return the ``LocationConstraint`` field of `aws s3api get-bucket-location` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("LocationConstraint")
@reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
    """List objects in ``bucket`` with `aws s3api list-objects`.

    Returns the whole parsed response when ``full_output`` is true, otherwise
    only the object keys found under ``Contents``.
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-objects --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    listing = self._to_json(stdout)

    obj_list = [entry["Key"] for entry in listing.get("Contents", [])]
    logger.info(f"Found s3 objects: {obj_list}")

    if full_output:
        return listing
    return obj_list
@reporter.step("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
    """List objects in ``bucket`` with `aws s3api list-objects-v2`.

    Returns the whole parsed response when ``full_output`` is true, otherwise
    only the object keys found under ``Contents``.
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    listing = self._to_json(stdout)

    obj_list = [entry["Key"] for entry in listing.get("Contents", [])]
    logger.info(f"Found s3 objects: {obj_list}")

    if full_output:
        return listing
    return obj_list
@reporter.step("List objects versions S3")
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
    """Run `aws s3api list-object-versions` for ``bucket``.

    Returns the whole parsed response when ``full_output`` is true, otherwise
    the ``Versions`` entry (an empty list when it is missing).
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    listing = self._to_json(stdout)
    if full_output:
        return listing
    return listing.get("Versions", [])
@reporter.step("List objects delete markers S3")
def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
    """Run `aws s3api list-object-versions` for ``bucket``.

    Returns the whole parsed response when ``full_output`` is true, otherwise
    the ``DeleteMarkers`` entry (an empty list when it is missing).
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    listing = self._to_json(stdout)
    if full_output:
        return listing
    return listing.get("DeleteMarkers", [])
@reporter.step("Copy object S3")
def copy_object(
    self,
    source_bucket: str,
    source_key: str,
    bucket: Optional[str] = None,
    key: Optional[str] = None,
    acl: Optional[str] = None,
    metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None,
    metadata: Optional[dict] = None,
    tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None,
    tagging: Optional[str] = None,
) -> str:
    """Copy ``source_bucket/source_key`` with `aws s3api copy-object` and return the target key.

    The target bucket defaults to ``source_bucket``. When ``key`` is None the
    target key is built from the current working directory joined with a
    random UUID. Optional arguments are appended only when truthy.
    """
    bucket = source_bucket if bucket is None else bucket
    if key is None:
        key = os.path.join(os.getcwd(), str(uuid.uuid4()))
    copy_source = f"{source_bucket}/{source_key}"

    pieces = [
        f"aws {self.common_flags} s3api copy-object --copy-source {copy_source} "
        f"--bucket {bucket} --key {key} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ]
    if acl:
        pieces.append(f" --acl {acl}")
    if metadata_directive:
        pieces.append(f" --metadata-directive {metadata_directive}")
    if metadata:
        pieces.append(" --metadata ")
        pieces.extend(f" {meta_key}={value}" for meta_key, value in metadata.items())
    if tagging_directive:
        pieces.append(f" --tagging-directive {tagging_directive}")
    if tagging:
        pieces.append(f" --tagging {tagging}")

    self.local_shell.exec("".join(pieces), command_options)
    return key
@reporter.step("Put object S3")
def put_object(
    self,
    bucket: str,
    filepath: str,
    key: Optional[str] = None,
    metadata: Optional[dict] = None,
    tagging: Optional[str] = None,
    acl: Optional[str] = None,
    object_lock_mode: Optional[str] = None,
    object_lock_retain_until_date: Optional[datetime] = None,
    object_lock_legal_hold_status: Optional[str] = None,
    grant_full_control: Optional[str] = None,
    grant_read: Optional[str] = None,
) -> str:
    """Upload ``filepath`` with `aws s3api put-object` and return the ``VersionId`` from the response.

    The object key defaults to the base name of ``filepath``. Optional
    arguments are appended to the command only when truthy. The returned
    value is None when the response carries no ``VersionId``.
    """
    if key is None:
        key = os.path.basename(filepath)

    pieces = [
        f"aws {self.common_flags} s3api put-object --bucket {bucket} --key {key} "
        f"--body {filepath} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ]
    if metadata:
        pieces.append(" --metadata")
        # Distinct loop names so the ``key`` parameter is not rebound.
        pieces.extend(f" {meta_key}={meta_value}" for meta_key, meta_value in metadata.items())
    if tagging:
        pieces.append(f" --tagging '{tagging}'")
    if acl:
        pieces.append(f" --acl {acl}")
    if object_lock_mode:
        pieces.append(f" --object-lock-mode {object_lock_mode}")
    if object_lock_retain_until_date:
        pieces.append(f' --object-lock-retain-until-date "{object_lock_retain_until_date}"')
    if object_lock_legal_hold_status:
        pieces.append(f" --object-lock-legal-hold-status {object_lock_legal_hold_status}")
    if grant_full_control:
        pieces.append(f" --grant-full-control '{grant_full_control}'")
    if grant_read:
        pieces.append(f" --grant-read {grant_read}")

    stdout = self.local_shell.exec("".join(pieces), command_options).stdout
    return self._to_json(stdout).get("VersionId")
@reporter.step("Head object S3")
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
    """Return the parsed output of `aws s3api head-object` for the given object (and version, if set)."""
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api head-object --bucket {bucket} --key {key} "
        f"{version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout)
@reporter.step("Get object S3")
def get_object(
    self,
    bucket: str,
    key: str,
    version_id: Optional[str] = None,
    object_range: Optional[tuple[int, int]] = None,
    full_output: bool = False,
) -> Union[dict, str]:
    """Download an object with `aws s3api get-object`.

    The object is saved to a randomly named file under ``ASSETS_DIR`` in the
    current working directory. ``object_range`` is passed as an inclusive
    ``bytes=first-last`` range. Returns the parsed command output when
    ``full_output`` is true, otherwise the path of the downloaded file.
    """
    file_path = os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4()))
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    command = (
        f"aws {self.common_flags} s3api get-object --bucket {bucket} --key {key} "
        f"{version} {file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
    if object_range:
        command += f" --range bytes={object_range[0]}-{object_range[1]}"

    # The output is parsed even when only the path is returned, as before.
    parsed = self._to_json(self.local_shell.exec(command).stdout)
    if full_output:
        return parsed
    return file_path
@reporter.step("Get object ACL")
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
    """Return the ``Grants`` field of `aws s3api get-object-acl` (None when absent)."""
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-object-acl --bucket {bucket} --key {key} "
        f"{version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("Grants")
@reporter.step("Put object ACL")
def put_object_acl(
    self,
    bucket: str,
    key: str,
    acl: Optional[str] = None,
    grant_write: Optional[str] = None,
    grant_read: Optional[str] = None,
) -> list:
    """Run `aws s3api put-object-acl` and return the ``Grants`` field of its output (None when absent).

    ``acl``, ``grant_write`` and ``grant_read`` are appended only when truthy.
    """
    pieces = [
        f"aws {self.common_flags} s3api put-object-acl --bucket {bucket} --key {key} "
        f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ]
    if acl:
        pieces.append(f" --acl {acl}")
    if grant_write:
        pieces.append(f" --grant-write {grant_write}")
    if grant_read:
        pieces.append(f" --grant-read {grant_read}")

    stdout = self.local_shell.exec("".join(pieces)).stdout
    return self._to_json(stdout).get("Grants")
@reporter.step("Put bucket ACL")
def put_bucket_acl(
    self,
    bucket: str,
    acl: Optional[str] = None,
    grant_write: Optional[str] = None,
    grant_read: Optional[str] = None,
) -> None:
    """Run `aws s3api put-bucket-acl` for ``bucket``.

    ``acl``, ``grant_write`` and ``grant_read`` are appended only when truthy.
    """
    pieces = [
        f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
        f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ]
    if acl:
        pieces.append(f" --acl {acl}")
    if grant_write:
        pieces.append(f" --grant-write {grant_write}")
    if grant_read:
        pieces.append(f" --grant-read {grant_read}")

    self.local_shell.exec("".join(pieces))
@reporter.step("Delete objects S3")
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
    """Delete several objects with one `aws s3api delete-objects` call.

    The request built by ``_make_objs_dict(keys)`` is written to
    ``delete.json`` under ``ASSETS_DIR`` and passed via ``file://``. Returns
    the parsed command output after sleeping for ``S3_SYNC_WAIT_TIME``.
    """
    delete_structure = json.dumps(_make_objs_dict(keys))
    file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json")
    with open(file_path, "w") as request_file:
        request_file.write(delete_structure)
    logger.info(f"Input file for delete-objects: {delete_structure}")

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-objects --bucket {bucket} "
        f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}",
        command_options,
    ).stdout
    parsed = self._to_json(stdout)
    sleep(S3_SYNC_WAIT_TIME)
    return parsed
@reporter.step("Delete object S3")
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
    """Delete one object (or one version of it) with `aws s3api delete-object`.

    Sleeps for ``S3_SYNC_WAIT_TIME`` after the command and returns its parsed output.
    """
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-object --bucket {bucket} "
        f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}",
        command_options,
    ).stdout
    sleep(S3_SYNC_WAIT_TIME)
    return self._to_json(stdout)
@reporter.step("Delete object versions S3")
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
    """Delete specific object versions with one `aws s3api delete-objects` call.

    Each entry of ``object_versions`` must provide ``Key`` and ``VersionId``.
    The request is written to ``delete.json`` under ``ASSETS_DIR`` and passed
    via ``file://``. Sleeps for ``S3_SYNC_WAIT_TIME`` after the command and
    returns its parsed output.
    """
    # Build deletion list in S3 format
    targets = []
    for object_version in object_versions:
        targets.append(
            {
                "Key": object_version["Key"],
                "VersionId": object_version["VersionId"],
            }
        )
    delete_structure = json.dumps({"Objects": targets})

    file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json")
    with open(file_path, "w") as request_file:
        request_file.write(delete_structure)
    logger.info(f"Input file for delete-objects: {delete_structure}")

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-objects --bucket {bucket} "
        f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}",
        command_options,
    ).stdout
    sleep(S3_SYNC_WAIT_TIME)
    return self._to_json(stdout)
@reporter.step("Delete object versions S3 without delete markers")
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
    """Delete each listed version individually through ``delete_object``.

    Each entry of ``object_versions`` must provide ``Key`` and ``VersionId``.
    """
    # Delete objects without creating delete markers
    for entry in object_versions:
        object_key = entry["Key"]
        object_version_id = entry["VersionId"]
        self.delete_object(bucket=bucket, key=object_key, version_id=object_version_id)
@reporter.step("Get object attributes")
def get_object_attributes(
    self,
    bucket: str,
    key: str,
    attributes: list[str],
    version_id: str = "",
    max_parts: int = 0,
    part_number: int = 0,
    full_output: bool = True,
) -> dict:
    """Query object attributes with `aws s3api get-object-attributes`.

    ``version_id``, ``max_parts`` and ``part_number`` are added to the command
    only when truthy. Every requested attribute must be present in the
    response (checked with ``assert``). Returns the whole parsed response
    when ``full_output`` is true, otherwise the value of the first requested
    attribute.
    """
    attrs = ",".join(attributes)

    version = ""
    if version_id:
        version = f" --version-id {version_id}"
    parts = ""
    if max_parts:
        parts = f"--max-parts {max_parts}"
    part_number_str = ""
    if part_number:
        part_number_str = f"--part-number-marker {part_number}"

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-object-attributes --bucket {bucket} "
        f"--key {key} {version} {parts} {part_number_str} --object-attributes {attrs} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    response = self._to_json(stdout)

    for attr in attributes:
        assert attr in response, f"Expected attribute {attr} in {response}"

    return response if full_output else response.get(attributes[0])
@reporter.step("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> dict:
    """Return the ``Policy`` field of `aws s3api get-bucket-policy` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("Policy")
@reporter.step("Put bucket policy")
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
    """Attach ``policy`` to ``bucket`` with `aws s3api put-bucket-policy`."""
    # The policy is serialized twice on purpose: the second dumps wraps the JSON
    # text in double quotes and escapes the inner ones, e.g.
    #   json.dumps({"a": 1})             => {"a": 1}
    #   json.dumps(json.dumps({"a": 1})) => "{\"a\": 1}"
    # Kept as it was in the test repo.
    # TODO: update this
    serialized_once = json.dumps(policy)
    dumped_policy = json.dumps(serialized_once)
    self.local_shell.exec(
        f"aws {self.common_flags} s3api put-bucket-policy --bucket {bucket} "
        f"--policy {dumped_policy} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict:
    """Return the ``CORSRules`` field of `aws s3api get-bucket-cors` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("CORSRules")
@reporter.step("Put bucket cors")
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
    """Set the CORS configuration of ``bucket``; the dict is passed as inline JSON."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api put-bucket-cors --bucket {bucket} "
        f"--cors-configuration '{json.dumps(cors_configuration)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None:
    """Remove the CORS configuration of ``bucket`` with `aws s3api delete-bucket-cors`."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None:
    """Remove all tags of ``bucket`` with `aws s3api delete-bucket-tagging`."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-bucket-tagging --bucket {bucket} "
        f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Put object retention")
def put_object_retention(
    self,
    bucket: str,
    key: str,
    retention: dict,
    version_id: Optional[str] = None,
    bypass_governance_retention: Optional[bool] = None,
) -> None:
    """Set the retention configuration of an object with `aws s3api put-object-retention`.

    Args:
        bucket: bucket holding the object.
        key: object key.
        retention: retention configuration; serialized to JSON with
            ``default=str`` so non-JSON values (e.g. datetimes) are stringified.
        version_id: object version to target; omitted from the command when falsy.
        bypass_governance_retention: True adds ``--bypass-governance-retention``,
            False adds ``--no-bypass-governance-retention``, None adds neither flag.
    """
    version = f" --version-id {version_id}" if version_id else ""
    cmd = (
        f"aws {self.common_flags} s3api put-object-retention --bucket {bucket} --key {key} "
        f"{version} --retention '{json.dumps(retention, indent=4, sort_keys=True, default=str)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
    # Previously any non-None value (including False) enabled the bypass.
    if bypass_governance_retention is True:
        cmd += " --bypass-governance-retention"
    elif bypass_governance_retention is False:
        cmd += " --no-bypass-governance-retention"
    self.local_shell.exec(cmd)
@reporter.step("Put object legal hold")
def put_object_legal_hold(
    self,
    bucket: str,
    key: str,
    legal_hold_status: Literal["ON", "OFF"],
    version_id: Optional[str] = None,
) -> None:
    """Set the legal hold status of an object (or one version of it)."""
    version = ""
    if version_id:
        version = f" --version-id {version_id}"
    legal_hold = json.dumps({"Status": legal_hold_status})

    self.local_shell.exec(
        f"aws {self.common_flags} s3api put-object-legal-hold --bucket {bucket} --key {key} "
        f"{version} --legal-hold '{legal_hold}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Put object tagging")
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = '') -> None:
    """Apply ``tags`` (an iterable of ``(key, value)`` pairs) to an object (or one version of it)."""
    tagging = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    self.local_shell.exec(
        f"aws {self.common_flags} s3api put-object-tagging --bucket {bucket} --key {key} "
        f"{version} --tagging '{json.dumps(tagging)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Get object tagging")
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
    """Return the ``TagSet`` field of `aws s3api get-object-tagging` (None when absent)."""
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-object-tagging --bucket {bucket} --key {key} "
        f"{version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("TagSet")
@reporter.step("Delete object tagging")
def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None:
    """Remove all tags of an object (or one version of it) with `aws s3api delete-object-tagging`."""
    version = ""
    if version_id:
        version = f" --version-id {version_id}"

    self.local_shell.exec(
        f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} "
        f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Sync directory S3")
def sync(
    self,
    bucket: str,
    dir_path: str,
    acl: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> dict:
    """Upload ``dir_path`` to ``bucket`` with `aws s3 sync` and return the parsed command output.

    ``metadata`` pairs and ``acl`` are appended to the command only when truthy.
    """
    pieces = [
        f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ]
    if metadata:
        pieces.append(" --metadata")
        pieces.extend(f" {key}={value}" for key, value in metadata.items())
    if acl:
        pieces.append(f" --acl {acl}")

    stdout = self.local_shell.exec("".join(pieces), command_options).stdout
    return self._to_json(stdout)
@reporter.step("CP directory S3")
def cp(
    self,
    bucket: str,
    dir_path: str,
    acl: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> dict:
    """Upload ``dir_path`` to ``bucket`` with `aws s3 cp --recursive` and return the parsed command output.

    ``metadata`` pairs and ``acl`` are appended to the command only when truthy.
    """
    pieces = [
        f"aws {self.common_flags} s3 cp {dir_path} s3://{bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --recursive --profile {self.profile}"
    ]
    if metadata:
        pieces.append(" --metadata")
        pieces.extend(f" {key}={value}" for key, value in metadata.items())
    if acl:
        pieces.append(f" --acl {acl}")

    stdout = self.local_shell.exec("".join(pieces), command_options).stdout
    return self._to_json(stdout)
@reporter.step("Create multipart upload S3")
def create_multipart_upload(self, bucket: str, key: str) -> str:
    """Start a multipart upload and return its ``UploadId``.

    An ``assert`` fails when the response has no (truthy) ``UploadId``.
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api create-multipart-upload --bucket {bucket} "
        f"--key {key} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    response = self._to_json(stdout)

    assert response.get("UploadId"), f"Expected UploadId in response:\n{response}"
    return response["UploadId"]
@reporter.step("List multipart uploads S3")
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
    """Return the ``Uploads`` field of `aws s3api list-multipart-uploads` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-multipart-uploads --bucket {bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("Uploads")
@reporter.step("Abort multipart upload S3")
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
    """Abort the multipart upload identified by ``upload_id``."""
    self.local_shell.exec(
        f"aws {self.common_flags} s3api abort-multipart-upload --bucket {bucket} "
        f"--key {key} --upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    )
@reporter.step("Upload part S3")
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
    """Upload ``filepath`` as part ``part_num`` of a multipart upload and return the part's ``ETag``.

    An ``assert`` fails when the response has no (truthy) ``ETag``.
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
        f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}",
        command_options,
    ).stdout
    response = self._to_json(stdout)

    assert response.get("ETag"), f"Expected ETag in response:\n{response}"
    return response["ETag"]
@reporter.step("Upload copy part S3")
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
    """Upload part ``part_num`` of a multipart upload by copying ``copy_source``.

    Returns:
        The ``ETag`` found under ``CopyPartResult`` in the command output.

    Raises:
        AssertionError: if the response has no ``CopyPartResult`` or no ``ETag`` in it.
    """
    cmd = (
        f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
        f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    )
    output = self.local_shell.exec(cmd, command_options).stdout
    response = self._to_json(output)
    # Default to a dict: the former list default has no .get(), so a missing
    # CopyPartResult raised AttributeError instead of the assertion below.
    assert response.get("CopyPartResult", {}).get("ETag"), f"Expected ETag in response:\n{response}"
    return response["CopyPartResult"]["ETag"]
@reporter.step("List parts S3")
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
    """Return the ``Parts`` of the multipart upload identified by ``upload_id``.

    An ``assert`` fails when the response has no (truthy) ``Parts`` entry.
    """
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api list-parts --bucket {bucket} --key {key} "
        f"--upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    response = self._to_json(stdout)

    assert response.get("Parts"), f"Expected Parts in response:\n{response}"
    return response["Parts"]
@reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
    """Complete a multipart upload from ``parts``, an iterable of ``(part_number, etag)`` pairs.

    The part list is written to ``parts.json`` under ``ASSETS_DIR`` and passed
    via ``file://``. The parsed command output is returned.
    """
    part_entries = []
    for part_num, etag in parts:
        part_entries.append({"ETag": etag, "PartNumber": part_num})
    parts_dict = {"Parts": part_entries}

    file_path = os.path.join(os.getcwd(), ASSETS_DIR, "parts.json")
    with open(file_path, "w") as parts_file:
        parts_file.write(json.dumps(parts_dict))
    logger.info(f"Input file for complete-multipart-upload: {json.dumps(parts_dict)}")

    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api complete-multipart-upload --bucket {bucket} "
        f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout)
@reporter.step("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
    """Set the object lock configuration of ``bucket`` and return the parsed command output."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {bucket} "
        f"--object-lock-configuration '{json.dumps(configuration)}' --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout)
@reporter.step("Get object lock configuration")
def get_object_lock_configuration(self, bucket: str):
    """Return the ``ObjectLockConfiguration`` field of `aws s3api get-object-lock-configuration` (None when absent)."""
    stdout = self.local_shell.exec(
        f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    ).stdout
    return self._to_json(stdout).get("ObjectLockConfiguration")
@staticmethod
def _to_json(output: str) -> dict:
json_output = {}
if "{" not in output and "}" not in output:
logger.warning(f"Could not parse json from output {output}")
return json_output
json_output = json.loads(output[output.index("{") :])
return json_output
2023-11-23 12:53:03 +00:00
# IAM METHODS #
# Some methods don't have checks because AWS is silent in some cases (delete, attach, etc.)
@reporter.step("Adds the specified user to the specified group")
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
    """Run `aws iam add-user-to-group` and return the parsed command output.

    ``--profile`` is appended only when ``self.profile`` is truthy.
    """
    command = f"aws {self.common_flags} iam add-user-to-group --user-name {user_name} --group-name {group_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        command += f" --profile {self.profile}"
    return self._to_json(self.local_shell.exec(command).stdout)
@reporter.step("Attaches the specified managed policy to the specified IAM group")
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
    """Run `aws iam attach-group-policy` and return the parsed command output.

    ``--profile`` is appended only when ``self.profile`` is truthy. The method
    sleeps for ``S3_SYNC_WAIT_TIME * 10`` before returning.
    """
    command = f"aws {self.common_flags} iam attach-group-policy --group-name {group_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
    if self.profile:
        command += f" --profile {self.profile}"
    parsed = self._to_json(self.local_shell.exec(command).stdout)
    sleep(S3_SYNC_WAIT_TIME * 10)
    return parsed
@reporter.step("Attaches the specified managed policy to the specified user")
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
    """Run `aws iam attach-user-policy` and return the parsed command output.

    ``--profile`` is appended only when ``self.profile`` is truthy. The method
    sleeps for ``S3_SYNC_WAIT_TIME * 10`` before returning.
    """
    command = f"aws {self.common_flags} iam attach-user-policy --user-name {user_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
    if self.profile:
        command += f" --profile {self.profile}"
    parsed = self._to_json(self.local_shell.exec(command).stdout)
    sleep(S3_SYNC_WAIT_TIME * 10)
    return parsed
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
def iam_create_access_key(self, user_name: Optional[str] = None) -> tuple[str, str]:
    """Create an access key with `aws iam create-access-key`.

    Args:
        user_name: user to create the key for; ``--user-name`` is omitted when falsy.

    Returns:
        An ``(access_key_id, secret_access_key)`` tuple taken from the
        ``AccessKey`` entry of the command output.

    Raises:
        AssertionError: if the response lacks ``AccessKeyId`` or ``SecretAccessKey``.
    """
    cmd = f"aws {self.common_flags} iam create-access-key --endpoint {self.iam_endpoint}"
    if self.profile:
        cmd += f" --profile {self.profile}"
    if user_name:
        cmd += f" --user-name {user_name}"

    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)

    # Tolerate a missing "AccessKey" entry here so the assertions below report
    # the full response instead of a bare KeyError.
    access_key = response.get("AccessKey") or {}
    access_key_id = access_key.get("AccessKeyId")
    secret_access_key = access_key.get("SecretAccessKey")
    assert access_key_id, f"Expected AccessKeyId in response:\n{response}"
    assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"

    return access_key_id, secret_access_key
@reporter.step("Creates a new group")
def iam_create_group(self, group_name: str) -> dict:
    """Run `aws iam create-group` and return the parsed command output.

    ``--profile`` is appended only when ``self.profile`` is truthy. Asserts
    that the response contains a ``Group`` whose ``GroupName`` equals ``group_name``.
    """
    command = f"aws {self.common_flags} iam create-group --group-name {group_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        command += f" --profile {self.profile}"
    response = self._to_json(self.local_shell.exec(command).stdout)

    assert response.get("Group"), f"Expected Group in response:\n{response}"
    created_name = response["Group"].get("GroupName")
    assert created_name == group_name, f"GroupName should be equal to {group_name}"

    return response
@reporter.step("Creates a new managed policy for your AWS account")
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
    """Run `aws iam create-policy` and return the parsed command output.

    ``policy_document`` is passed as inline JSON. ``--profile`` is appended
    only when ``self.profile`` is truthy. Asserts that the response contains a
    ``Policy`` whose ``PolicyName`` equals ``policy_name``.
    """
    command = (
        f"aws {self.common_flags} iam create-policy --endpoint {self.iam_endpoint}"
        f" --policy-name {policy_name} --policy-document '{json.dumps(policy_document)}'"
    )
    if self.profile:
        command += f" --profile {self.profile}"
    response = self._to_json(self.local_shell.exec(command).stdout)

    assert response.get("Policy"), f"Expected Policy in response:\n{response}"
    created_name = response["Policy"].get("PolicyName")
    assert created_name == policy_name, f"PolicyName should be equal to {policy_name}"

    return response
@reporter.step("Creates a new IAM user for your AWS account")
def iam_create_user(self, user_name: str) -> dict:
    """Run `aws iam create-user` and return the parsed command output.

    ``--profile`` is appended only when ``self.profile`` is truthy. Asserts
    that the response contains a ``User`` whose ``UserName`` equals ``user_name``.
    """
    command = f"aws {self.common_flags} iam create-user --user-name {user_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        command += f" --profile {self.profile}"
    response = self._to_json(self.local_shell.exec(command).stdout)

    assert response.get("User"), f"Expected User in response:\n{response}"
    created_name = response["User"].get("UserName")
    assert created_name == user_name, f"UserName should be equal to {user_name}"

    return response
@reporter.step("Deletes the access key pair associated with the specified IAM user")
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
    """Run `aws iam delete-access-key` and return the parsed command output.

    ``--profile`` is appended only when ``self.profile`` is truthy.
    """
    command = f"aws {self.common_flags} iam delete-access-key --access-key-id {access_key_id} --user-name {user_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        command += f" --profile {self.profile}"
    return self._to_json(self.local_shell.exec(command).stdout)
@reporter.step("Deletes the specified IAM group")
def iam_delete_group(self, group_name: str) -> dict:
    """Delete an IAM group by running ``aws iam delete-group``.

    Args:
        group_name: Value passed as ``--group-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam delete-group"
        f" --group-name {group_name} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )
    stdout = self.local_shell.exec(command).stdout
    return self._to_json(stdout)
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
    """Delete a group's inline policy by running ``aws iam delete-group-policy``.

    Args:
        group_name: Value passed as ``--group-name``.
        policy_name: Value passed as ``--policy-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    command = (
        f"aws {self.common_flags} iam delete-group-policy"
        f" --group-name {group_name} --policy-name {policy_name}"
        f" --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        command = f"{command} --profile {self.profile}"
    return self._to_json(self.local_shell.exec(command).stdout)
@reporter.step("Deletes the specified managed policy")
def iam_delete_policy(self, policy_arn: str) -> dict:
    """Delete a managed policy by running ``aws iam delete-policy``.

    Args:
        policy_arn: Value passed as ``--policy-arn``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam delete-policy"
        f" --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )
    stdout = self.local_shell.exec(command).stdout
    return self._to_json(stdout)
@reporter.step("Deletes the specified IAM user")
def iam_delete_user(self, user_name: str) -> dict:
    """Delete an IAM user by running ``aws iam delete-user``.

    Args:
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    command = f"aws {self.common_flags} iam delete-user --user-name {user_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        command = f"{command} --profile {self.profile}"
    return self._to_json(self.local_shell.exec(command).stdout)
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
    """Delete a user's inline policy by running ``aws iam delete-user-policy``.

    Args:
        user_name: Value passed as ``--user-name``.
        policy_name: Value passed as ``--policy-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam delete-user-policy"
        f" --user-name {user_name} --policy-name {policy_name}"
        f" --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )
    stdout = self.local_shell.exec(command).stdout
    return self._to_json(stdout)
@reporter.step("Removes the specified managed policy from the specified IAM group")
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
    """Detach a managed policy from a group via ``aws iam detach-group-policy``.

    After the response is parsed, the method sleeps for
    ``S3_SYNC_WAIT_TIME * 10`` before returning.

    Args:
        group_name: Value passed as ``--group-name``.
        policy_arn: Value passed as ``--policy-arn``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    command = (
        f"aws {self.common_flags} iam detach-group-policy"
        f" --group-name {group_name} --policy-arn {policy_arn}"
        f" --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)
    # Presumably gives the backend time to apply the change - confirm.
    sleep(S3_SYNC_WAIT_TIME * 10)
    return result
@reporter.step("Removes the specified managed policy from the specified user")
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
    """Detach a managed policy from a user via ``aws iam detach-user-policy``.

    After the response is parsed, the method sleeps for
    ``S3_SYNC_WAIT_TIME * 10`` before returning.

    Args:
        user_name: Value passed as ``--user-name``.
        policy_arn: Value passed as ``--policy-arn``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam detach-user-policy"
        f" --user-name {user_name} --policy-arn {policy_arn}"
        f" --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )

    result = self._to_json(self.local_shell.exec(command).stdout)
    # Presumably gives the backend time to apply the change - confirm.
    sleep(S3_SYNC_WAIT_TIME * 10)
    return result
@reporter.step("Returns a list of IAM users that are in the specified IAM group")
def iam_get_group(self, group_name: str) -> dict:
    """Fetch a group and its users by running ``aws iam get-group``.

    Args:
        group_name: Value passed as ``--group-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no ``Users`` key (an empty list is
            accepted), has no ``Group`` entry, or the group's ``GroupName``
            differs from ``group_name``.
    """
    cmd = f"aws {self.common_flags} iam get-group --group-name {group_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        cmd += f" --profile {self.profile}"

    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)

    assert "Users" in response.keys(), f"Expected Users in response:\n{response}"
    # Check for the Group entry explicitly so a missing key fails with a clear
    # assertion message instead of an AttributeError on None.
    assert response.get("Group"), f"Expected Group in response:\n{response}"
    assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
    return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
    """Fetch a group's inline policy by running ``aws iam get-group-policy``.

    Args:
        group_name: Value passed as ``--group-name``.
        policy_name: Value passed as ``--policy-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    command = (
        f"aws {self.common_flags} iam get-group-policy"
        f" --group-name {group_name} --policy-name {policy_name}"
        f" --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        command = f"{command} --profile {self.profile}"
    return self._to_json(self.local_shell.exec(command).stdout)
@reporter.step("Retrieves information about the specified managed policy")
def iam_get_policy(self, policy_arn: str) -> dict:
    """Fetch a managed policy by running ``aws iam get-policy``.

    Args:
        policy_arn: Value passed as ``--policy-arn``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no ``Policy`` entry or its ``Arn``
            differs from ``policy_arn``.
    """
    cmd = f"aws {self.common_flags} iam get-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
    if self.profile:
        cmd += f" --profile {self.profile}"

    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)

    assert response.get("Policy"), f"Expected Policy in response:\n{response}"
    # Only the ARN is known to this method, so that is the field verified;
    # comparing against a policy name is impossible here (no such argument).
    assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}"
    return response
@reporter.step("Retrieves information about the specified version of the specified managed policy")
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
    """Fetch one policy version by running ``aws iam get-policy-version``.

    Args:
        policy_arn: Value passed as ``--policy-arn``.
        version_id: Value passed as ``--version-id``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no ``PolicyVersion`` entry or its
            ``VersionId`` differs from ``version_id``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam get-policy-version"
        f" --policy-arn {policy_arn} --version-id {version_id}"
        f" --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )

    result = self._to_json(self.local_shell.exec(command).stdout)

    assert result.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{result}"
    assert result["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"
    return result
@reporter.step("Retrieves information about the specified IAM user")
def iam_get_user(self, user_name: str) -> dict:
    """Fetch an IAM user by running ``aws iam get-user``.

    Args:
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no ``User`` entry or its
            ``UserName`` differs from ``user_name``.
    """
    command = f"aws {self.common_flags} iam get-user --user-name {user_name} --endpoint {self.iam_endpoint}"
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)

    assert result.get("User"), f"Expected User in response:\n{result}"
    assert result["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
    return result
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
    """Fetch a user's inline policy by running ``aws iam get-user-policy``.

    Args:
        user_name: Value passed as ``--user-name``.
        policy_name: Value passed as ``--policy-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no (or an empty) ``UserName``.
    """
    cmd = (
        f"aws {self.common_flags} iam get-user-policy"
        f" --user-name {user_name} --policy-name {policy_name}"
        f" --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        cmd += f" --profile {self.profile}"

    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)

    # The message names the key that is actually checked (UserName, not User).
    assert response.get("UserName"), f"Expected UserName in response:\n{response}"
    return response
@reporter.step("Returns information about the access key IDs associated with the specified IAM user")
def iam_list_access_keys(self, user_name: str) -> dict:
    """List a user's access keys by running ``aws iam list-access-keys``.

    Args:
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam list-access-keys"
        f" --user-name {user_name} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )
    stdout = self.local_shell.exec(command).stdout
    return self._to_json(stdout)
@reporter.step("Lists all managed policies that are attached to the specified IAM group")
def iam_list_attached_group_policies(self, group_name: str) -> dict:
    """List a group's attached policies via ``aws iam list-attached-group-policies``.

    Args:
        group_name: Value passed as ``--group-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``AttachedPolicies`` is missing or empty.
    """
    command = (
        f"aws {self.common_flags} iam list-attached-group-policies"
        f" --group-name {group_name} --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{result}"
    return result
@reporter.step("Lists all managed policies that are attached to the specified IAM user")
def iam_list_attached_user_policies(self, user_name: str) -> dict:
    """List a user's attached policies via ``aws iam list-attached-user-policies``.

    Args:
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``AttachedPolicies`` is missing or empty.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam list-attached-user-policies"
        f" --user-name {user_name} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{result}"
    return result
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
    """List a policy's attachments via ``aws iam list-entities-for-policy``.

    Args:
        policy_arn: Value passed as ``--policy-arn``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``PolicyGroups`` or ``PolicyUsers`` is missing or
            empty (``PolicyGroups`` is checked first).
    """
    command = (
        f"aws {self.common_flags} iam list-entities-for-policy"
        f" --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)

    assert result.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{result}"
    assert result.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{result}"
    return result
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
def iam_list_group_policies(self, group_name: str) -> dict:
    """List a group's inline policy names via ``aws iam list-group-policies``.

    Args:
        group_name: Value passed as ``--group-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``PolicyNames`` is missing or empty.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam list-group-policies"
        f" --group-name {group_name} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("PolicyNames"), f"Expected PolicyNames in response:\n{result}"
    return result
@reporter.step("Lists the IAM groups")
def iam_list_groups(self) -> dict:
    """List IAM groups by running ``aws iam list-groups``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``Groups`` is missing or empty.
    """
    command = f"aws {self.common_flags} iam list-groups --endpoint {self.iam_endpoint}"
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("Groups"), f"Expected Groups in response:\n{result}"
    return result
@reporter.step("Lists the IAM groups that the specified IAM user belongs to")
def iam_list_groups_for_user(self, user_name: str) -> dict:
    """List a user's groups by running ``aws iam list-groups-for-user``.

    Args:
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``Groups`` is missing or empty.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam list-groups-for-user"
        f" --user-name {user_name} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("Groups"), f"Expected Groups in response:\n{result}"
    return result
@reporter.step("Lists all the managed policies that are available in your AWS account")
def iam_list_policies(self) -> dict:
    """List managed policies by running ``aws iam list-policies``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no ``Policies`` key (an empty
            list is accepted).
    """
    command = f"aws {self.common_flags} iam list-policies --endpoint {self.iam_endpoint}"
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert "Policies" in result.keys(), f"Expected Policies in response:\n{result}"
    return result
@reporter.step("Lists information about the versions of the specified managed policy")
def iam_list_policy_versions(self, policy_arn: str) -> dict:
    """List a policy's versions by running ``aws iam list-policy-versions``.

    Args:
        policy_arn: Value passed as ``--policy-arn``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``Versions`` is missing or empty.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam list-policy-versions"
        f" --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
        f"{profile_option}"
    )

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("Versions"), f"Expected Versions in response:\n{result}"
    return result
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
def iam_list_user_policies(self, user_name: str) -> dict:
    """List a user's inline policy names via ``aws iam list-user-policies``.

    Args:
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If ``PolicyNames`` is missing or empty.
    """
    command = (
        f"aws {self.common_flags} iam list-user-policies"
        f" --user-name {user_name} --endpoint {self.iam_endpoint}"
    )
    if self.profile:
        command = f"{command} --profile {self.profile}"

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert result.get("PolicyNames"), f"Expected PolicyNames in response:\n{result}"
    return result
@reporter.step("Lists the IAM users")
def iam_list_users(self) -> dict:
    """List IAM users by running ``aws iam list-users``.

    Returns:
        The command's stdout converted by ``self._to_json``.

    Raises:
        AssertionError: If the response has no ``Users`` key (an empty list
            is accepted).
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = f"aws {self.common_flags} iam list-users --endpoint {self.iam_endpoint}" + profile_option

    result = self._to_json(self.local_shell.exec(command).stdout)
    assert "Users" in result.keys(), f"Expected Users in response:\n{result}"
    return result
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
    """Add or update a group's inline policy via ``aws iam put-group-policy``.

    After the response is parsed, the method sleeps for
    ``S3_SYNC_WAIT_TIME * 10`` before returning.

    Args:
        group_name: Value passed as ``--group-name``.
        policy_name: Value passed as ``--policy-name``.
        policy_document: Policy body; serialized with ``json.dumps`` and passed
            as ``--policy-document``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    import shlex

    # shlex.quote keeps the JSON a single shell argument even when the document
    # itself contains a single quote, which hand-written '...' wrapping breaks on.
    document_arg = shlex.quote(json.dumps(policy_document))
    cmd = (
        f"aws {self.common_flags} iam put-group-policy --endpoint {self.iam_endpoint}"
        f" --group-name {group_name} --policy-name {policy_name} --policy-document {document_arg}"
    )
    if self.profile:
        cmd += f" --profile {self.profile}"

    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)
    # Presumably gives the backend time to apply the change - confirm.
    sleep(S3_SYNC_WAIT_TIME * 10)
    return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
    """Add or update a user's inline policy via ``aws iam put-user-policy``.

    After the response is parsed, the method sleeps for
    ``S3_SYNC_WAIT_TIME * 10`` before returning.

    Args:
        user_name: Value passed as ``--user-name``.
        policy_name: Value passed as ``--policy-name``.
        policy_document: Policy body; serialized with ``json.dumps`` and passed
            as ``--policy-document``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    import shlex

    # shlex.quote keeps the JSON a single shell argument even when the document
    # itself contains a single quote, which hand-written '...' wrapping breaks on.
    document_arg = shlex.quote(json.dumps(policy_document))
    cmd = (
        f"aws {self.common_flags} iam put-user-policy --endpoint {self.iam_endpoint}"
        f" --user-name {user_name} --policy-name {policy_name} --policy-document {document_arg}"
    )
    if self.profile:
        cmd += f" --profile {self.profile}"

    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)
    # Presumably gives the backend time to apply the change - confirm.
    sleep(S3_SYNC_WAIT_TIME * 10)
    return response
@reporter.step("Removes the specified user from the specified group")
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
    """Remove a user from a group via ``aws iam remove-user-from-group``.

    Args:
        group_name: Value passed as ``--group-name``.
        user_name: Value passed as ``--user-name``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    profile_option = f" --profile {self.profile}" if self.profile else ""
    command = (
        f"aws {self.common_flags} iam remove-user-from-group"
        f" --endpoint {self.iam_endpoint}"
        f" --group-name {group_name} --user-name {user_name}"
        f"{profile_option}"
    )
    stdout = self.local_shell.exec(command).stdout
    return self._to_json(stdout)
@reporter.step("Updates the name and/or the path of the specified IAM group")
def iam_update_group(self, group_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
    """Rename and/or re-path a group by running ``aws iam update-group``.

    Args:
        group_name: Value passed as ``--group-name``.
        new_name: If truthy, passed as ``--new-group-name``.
        new_path: If truthy, passed as ``--new-path``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    # Optional flags are collected in the order they appear on the command line.
    pieces = [f"aws {self.common_flags} iam update-group --group-name {group_name} --endpoint {self.iam_endpoint}"]
    if new_name:
        pieces.append(f" --new-group-name {new_name}")
    if new_path:
        pieces.append(f" --new-path {new_path}")
    if self.profile:
        pieces.append(f" --profile {self.profile}")

    command = "".join(pieces)
    return self._to_json(self.local_shell.exec(command).stdout)
@reporter.step("Updates the name and/or the path of the specified IAM user")
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
    """Rename and/or re-path a user by running ``aws iam update-user``.

    Args:
        user_name: Value passed as ``--user-name``.
        new_name: If truthy, passed as ``--new-user-name``.
        new_path: If truthy, passed as ``--new-path``.

    Returns:
        The command's stdout converted by ``self._to_json``.
    """
    # Optional flags are collected in the order they appear on the command line.
    pieces = [f"aws {self.common_flags} iam update-user --user-name {user_name} --endpoint {self.iam_endpoint}"]
    if new_name:
        pieces.append(f" --new-user-name {new_name}")
    if new_path:
        pieces.append(f" --new-path {new_path}")
    if self.profile:
        pieces.append(f" --profile {self.profile}")

    command = "".join(pieces)
    return self._to_json(self.local_shell.exec(command).stdout)