[#304] Improve logging Boto3 IAM methods
Some checks failed
DCO action / DCO (pull_request) Has been cancelled

Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
This commit is contained in:
k.sosnovskikh 2024-10-18 13:25:12 +03:00
parent 738cfacbb7
commit 5fa58a55c0

View file

@ -68,6 +68,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
self.access_key_id: str = access_key_id self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key self.secret_access_key: str = secret_access_key
self.s3gate_endpoint: str = "" self.s3gate_endpoint: str = ""
self.iam_endpoint: str = ""
self.boto3_iam_client: S3Client = None self.boto3_iam_client: S3Client = None
self.set_endpoint(s3gate_endpoint) self.set_endpoint(s3gate_endpoint)
@ -90,11 +91,16 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("Set endpoint IAM to {iam_endpoint}") @reporter.step("Set endpoint IAM to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str): def set_iam_endpoint(self, iam_endpoint: str):
if self.iam_endpoint == iam_endpoint:
return
self.iam_endpoint = iam_endpoint
self.boto3_iam_client = self.session.client( self.boto3_iam_client = self.session.client(
service_name="iam", service_name="iam",
aws_access_key_id=self.access_key_id, aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key, aws_secret_access_key=self.secret_access_key,
endpoint_url=iam_endpoint, endpoint_url=self.iam_endpoint,
verify=False, verify=False,
) )
@ -687,25 +693,36 @@ class Boto3ClientWrapper(S3ClientWrapper):
# Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.) # Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.)
@reporter.step("Adds the specified user to the specified group") @reporter.step("Adds the specified user to the specified group")
@report_error
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict: def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
response = self.boto3_iam_client.add_user_to_group(UserName=user_name, GroupName=group_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.add_user_to_group(**params)
log_command_execution(self.iam_endpoint, "IAM Add User to Group", response, params)
return response return response
@reporter.step("Attaches the specified managed policy to the specified IAM group") @reporter.step("Attaches the specified managed policy to the specified IAM group")
@report_error
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict: def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.attach_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Attach Group Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Attaches the specified managed policy to the specified user") @reporter.step("Attaches the specified managed policy to the specified user")
@report_error
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict: def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.attach_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Attach User Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user") @reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
@report_error
def iam_create_access_key(self, user_name: str) -> dict: def iam_create_access_key(self, user_name: str) -> dict:
response = self.boto3_iam_client.create_access_key(UserName=user_name) response = self.boto3_iam_client.create_access_key(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Create Access Key", response, {"UserName": user_name})
access_key_id = response["AccessKey"].get("AccessKeyId") access_key_id = response["AccessKey"].get("AccessKeyId")
secret_access_key = response["AccessKey"].get("SecretAccessKey") secret_access_key = response["AccessKey"].get("SecretAccessKey")
@ -715,138 +732,190 @@ class Boto3ClientWrapper(S3ClientWrapper):
return access_key_id, secret_access_key return access_key_id, secret_access_key
@reporter.step("Creates a new group") @reporter.step("Creates a new group")
@report_error
def iam_create_group(self, group_name: str) -> dict: def iam_create_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.create_group(GroupName=group_name) response = self.boto3_iam_client.create_group(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM Create Group", response, {"GroupName": group_name})
assert response.get("Group"), f"Expected Group in response:\n{response}" assert response.get("Group"), f"Expected Group in response:\n{response}"
assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}" assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response return response
@reporter.step("Creates a new managed policy for your AWS account") @reporter.step("Creates a new managed policy for your AWS account")
@report_error
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict: def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)) params = self._convert_to_s3_params(locals().items())
params["PolicyDocument"] = json.dumps(policy_document)
response = self.boto3_iam_client.create_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Create Policy", response, params)
assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}" assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
return response return response
@reporter.step("Creates a new IAM user for your AWS account") @reporter.step("Creates a new IAM user for your AWS account")
@report_error
def iam_create_user(self, user_name: str) -> dict: def iam_create_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.create_user(UserName=user_name) response = self.boto3_iam_client.create_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Create User", response, {"UserName": user_name})
assert response.get("User"), f"Expected User in response:\n{response}" assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response return response
@reporter.step("Deletes the access key pair associated with the specified IAM user") @reporter.step("Deletes the access key pair associated with the specified IAM user")
@report_error
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict: def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
response = self.boto3_iam_client.delete_access_key(AccessKeyId=access_key_id, UserName=user_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.delete_access_key(**params)
log_command_execution(self.iam_endpoint, "IAM Delete Access Key", response, params)
return response return response
@reporter.step("Deletes the specified IAM group") @reporter.step("Deletes the specified IAM group")
@report_error
def iam_delete_group(self, group_name: str) -> dict: def iam_delete_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.delete_group(GroupName=group_name) response = self.boto3_iam_client.delete_group(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM Delete Group", response, {"GroupName": group_name})
return response return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group") @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
@report_error
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict: def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.delete_group_policy(GroupName=group_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.delete_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Delete Group Policy", response, params)
return response return response
@reporter.step("Deletes the specified managed policy") @reporter.step("Deletes the specified managed policy")
@report_error
def iam_delete_policy(self, policy_arn: str) -> dict: def iam_delete_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn) response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM Delete Policy", response, {"PolicyArn": policy_arn})
return response return response
@reporter.step("Deletes the specified IAM user") @reporter.step("Deletes the specified IAM user")
@report_error
def iam_delete_user(self, user_name: str) -> dict: def iam_delete_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.delete_user(UserName=user_name) response = self.boto3_iam_client.delete_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Delete User", response, {"UserName": user_name})
return response return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user") @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
@report_error
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict: def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.delete_user_policy(UserName=user_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.delete_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Delete User Policy", response, params)
return response return response
@reporter.step("Removes the specified managed policy from the specified IAM group") @reporter.step("Removes the specified managed policy from the specified IAM group")
@report_error
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict: def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.detach_group_policy(GroupName=group_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.detach_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Detach Group Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Removes the specified managed policy from the specified user") @reporter.step("Removes the specified managed policy from the specified user")
@report_error
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict: def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.detach_user_policy(UserName=user_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.detach_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Detach User Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Returns a list of IAM users that are in the specified IAM group") @reporter.step("Returns a list of IAM users that are in the specified IAM group")
@report_error
def iam_get_group(self, group_name: str) -> dict: def iam_get_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.get_group(GroupName=group_name) response = self.boto3_iam_client.get_group(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM Get Group", response, {"GroupName": group_name})
assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}" assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group") @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
@report_error
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict: def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.get_group_policy(GroupName=group_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.get_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Get Group Policy", response, params)
return response return response
@reporter.step("Retrieves information about the specified managed policy") @reporter.step("Retrieves information about the specified managed policy")
@report_error
def iam_get_policy(self, policy_arn: str) -> dict: def iam_get_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn) response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM Get Policy", response, {"PolicyArn": policy_arn})
assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}" assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}"
return response return response
@reporter.step("Retrieves information about the specified version of the specified managed policy") @reporter.step("Retrieves information about the specified version of the specified managed policy")
@report_error
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict: def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
response = self.boto3_iam_client.get_policy_version(PolicyArn=policy_arn, VersionId=version_id) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.get_policy_version(**params)
log_command_execution(self.iam_endpoint, "IAM Get Policy Version", response, params)
assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}" assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}" assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"
return response return response
@reporter.step("Retrieves information about the specified IAM user") @reporter.step("Retrieves information about the specified IAM user")
@report_error
def iam_get_user(self, user_name: str) -> dict: def iam_get_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.get_user(UserName=user_name) response = self.boto3_iam_client.get_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Get User", response, {"UserName": user_name})
assert response.get("User"), f"Expected User in response:\n{response}" assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user") @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
@report_error
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict: def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.get_user_policy(UserName=user_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.get_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Get User Policy", response, params)
assert response.get("UserName"), f"Expected UserName in response:\n{response}" assert response.get("UserName"), f"Expected UserName in response:\n{response}"
return response return response
@reporter.step("Returns information about the access key IDs associated with the specified IAM user") @reporter.step("Returns information about the access key IDs associated with the specified IAM user")
@report_error
def iam_list_access_keys(self, user_name: str) -> dict: def iam_list_access_keys(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_access_keys(UserName=user_name) response = self.boto3_iam_client.list_access_keys(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List Access Keys", response, {"UserName": user_name})
return response return response
@reporter.step("Lists all managed policies that are attached to the specified IAM group") @reporter.step("Lists all managed policies that are attached to the specified IAM group")
@report_error
def iam_list_attached_group_policies(self, group_name: str) -> dict: def iam_list_attached_group_policies(self, group_name: str) -> dict:
response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name) response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM List Attached Group Policies", response, {"GroupName": group_name})
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}" assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response return response
@reporter.step("Lists all managed policies that are attached to the specified IAM user") @reporter.step("Lists all managed policies that are attached to the specified IAM user")
@report_error
def iam_list_attached_user_policies(self, user_name: str) -> dict: def iam_list_attached_user_policies(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name) response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List Attached User Policies", response, {"UserName": user_name})
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}" assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response return response
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to") @reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
@report_error
def iam_list_entities_for_policy(self, policy_arn: str) -> dict: def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn) response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM List Entities For Policy", response, {"PolicyArn": policy_arn})
assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}" assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}"
assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}" assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}"
@ -854,98 +923,125 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response return response
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group") @reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
@report_error
def iam_list_group_policies(self, group_name: str) -> dict: def iam_list_group_policies(self, group_name: str) -> dict:
response = self.boto3_iam_client.list_group_policies(GroupName=group_name) response = self.boto3_iam_client.list_group_policies(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM List Group Policies", response, {"GroupName": group_name})
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}" assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response return response
@reporter.step("Lists the IAM groups") @reporter.step("Lists the IAM groups")
@report_error
def iam_list_groups(self) -> dict: def iam_list_groups(self) -> dict:
response = self.boto3_iam_client.list_groups() response = self.boto3_iam_client.list_groups()
log_command_execution(self.iam_endpoint, "IAM List Groups", response)
assert response.get("Groups"), f"Expected Groups in response:\n{response}" assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response return response
@reporter.step("Lists the IAM groups that the specified IAM user belongs to") @reporter.step("Lists the IAM groups that the specified IAM user belongs to")
@report_error
def iam_list_groups_for_user(self, user_name: str) -> dict: def iam_list_groups_for_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_groups_for_user(UserName=user_name) response = self.boto3_iam_client.list_groups_for_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List Groups For User", response, {"UserName": user_name})
assert response.get("Groups"), f"Expected Groups in response:\n{response}" assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response return response
@reporter.step("Lists all the managed policies that are available in your AWS account") @reporter.step("Lists all the managed policies that are available in your AWS account")
@report_error
def iam_list_policies(self) -> dict: def iam_list_policies(self) -> dict:
response = self.boto3_iam_client.list_policies() response = self.boto3_iam_client.list_policies()
log_command_execution(self.iam_endpoint, "IAM List Policies", response)
assert response.get("Policies"), f"Expected Policies in response:\n{response}" assert response.get("Policies"), f"Expected Policies in response:\n{response}"
return response return response
@reporter.step("Lists information about the versions of the specified managed policy") @reporter.step("Lists information about the versions of the specified managed policy")
@report_error
def iam_list_policy_versions(self, policy_arn: str) -> dict: def iam_list_policy_versions(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn) response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM List Policy Versions", response, {"PolicyArn": policy_arn})
assert response.get("Versions"), f"Expected Versions in response:\n{response}" assert response.get("Versions"), f"Expected Versions in response:\n{response}"
return response return response
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user") @reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
@report_error
def iam_list_user_policies(self, user_name: str) -> dict: def iam_list_user_policies(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_user_policies(UserName=user_name) response = self.boto3_iam_client.list_user_policies(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List User Policies", response, {"UserName": user_name})
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}" assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response return response
@reporter.step("Lists the IAM users") @reporter.step("Lists the IAM users")
@report_error
def iam_list_users(self) -> dict: def iam_list_users(self) -> dict:
response = self.boto3_iam_client.list_users() response = self.boto3_iam_client.list_users()
log_command_execution(self.iam_endpoint, "IAM List Users", response)
assert response.get("Users"), f"Expected Users in response:\n{response}" assert response.get("Users"), f"Expected Users in response:\n{response}"
return response return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group") @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
@report_error
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict: def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.put_group_policy( params = self._convert_to_s3_params(locals().items())
GroupName=group_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document) params["PolicyDocument"] = json.dumps(policy_document)
) response = self.boto3_iam_client.put_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Put Group Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user") @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
@report_error
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict: def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.put_user_policy( params = self._convert_to_s3_params(locals().items())
UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document) params["PolicyDocument"] = json.dumps(policy_document)
) response = self.boto3_iam_client.put_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Put User Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Removes the specified user from the specified group") @reporter.step("Removes the specified user from the specified group")
@report_error
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict: def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
response = self.boto3_iam_client.remove_user_from_group(GroupName=group_name, UserName=user_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.remove_user_from_group(**params)
log_command_execution(self.iam_endpoint, "IAM Remove User From Group", response, params)
return response return response
@reporter.step("Updates the name and/or the path of the specified IAM group") @reporter.step("Updates the name and/or the path of the specified IAM group")
@report_error
def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict: def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_group(GroupName=group_name, NewGroupName=new_name, NewPath="/") params = {"GroupName": group_name, "NewGroupName": new_name, "NewPath": "/"}
response = self.boto3_iam_client.update_group(**params)
log_command_execution(self.iam_endpoint, "IAM Update Group", response, params)
return response return response
@reporter.step("Updates the name and/or the path of the specified IAM user") @reporter.step("Updates the name and/or the path of the specified IAM user")
@report_error
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict: def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/") params = {"UserName": user_name, "NewUserName": new_name, "NewPath": "/"}
response = self.boto3_iam_client.update_user(**params)
log_command_execution(self.iam_endpoint, "IAM Update User", response, params)
return response return response
@reporter.step("Adds one or more tags to an IAM user") @reporter.step("Adds one or more tags to an IAM user")
@report_error
def iam_tag_user(self, user_name: str, tags: list) -> dict: def iam_tag_user(self, user_name: str, tags: list) -> dict:
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.tag_user(UserName=user_name, Tags=tags_json) params["Tags"] = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
response = self.boto3_iam_client.tag_user(**params)
log_command_execution(self.iam_endpoint, "IAM Tag User", response, params)
return response return response
@reporter.step("List tags of IAM user") @reporter.step("List tags of IAM user")
@report_error
def iam_list_user_tags(self, user_name: str) -> dict: def iam_list_user_tags(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_user_tags(UserName=user_name) response = self.boto3_iam_client.list_user_tags(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List User Tags", response, {"UserName": user_name})
return response return response
@reporter.step("Removes the specified tags from the user") @reporter.step("Removes the specified tags from the user")
@report_error
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.untag_user(**params)
log_command_execution(self.iam_endpoint, "IAM Untag User", response, params)
return response return response