[#304] Improve logging Boto3 IAM methods #304
1 changed files with 135 additions and 39 deletions
|
@ -68,6 +68,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
self.access_key_id: str = access_key_id
|
||||
self.secret_access_key: str = secret_access_key
|
||||
self.s3gate_endpoint: str = ""
|
||||
self.iam_endpoint: str = ""
|
||||
self.boto3_iam_client: S3Client = None
|
||||
self.set_endpoint(s3gate_endpoint)
|
||||
|
||||
|
@ -90,11 +91,16 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Set endpoint IAM to {iam_endpoint}")
|
||||
def set_iam_endpoint(self, iam_endpoint: str):
|
||||
if self.iam_endpoint == iam_endpoint:
|
||||
return
|
||||
|
||||
self.iam_endpoint = iam_endpoint
|
||||
|
||||
self.boto3_iam_client = self.session.client(
|
||||
service_name="iam",
|
||||
aws_access_key_id=self.access_key_id,
|
||||
aws_secret_access_key=self.secret_access_key,
|
||||
endpoint_url=iam_endpoint,
|
||||
endpoint_url=self.iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
|
||||
|
@ -687,25 +693,36 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
# Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.)
|
||||
|
||||
@reporter.step("Adds the specified user to the specified group")
|
||||
@report_error
|
||||
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
|
||||
response = self.boto3_iam_client.add_user_to_group(UserName=user_name, GroupName=group_name)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.add_user_to_group(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Add User to Group", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Attaches the specified managed policy to the specified IAM group")
|
||||
@report_error
|
||||
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.attach_group_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Attach Group Policy", response, params)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
return response
|
||||
|
||||
@reporter.step("Attaches the specified managed policy to the specified user")
|
||||
@report_error
|
||||
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.attach_user_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Attach User Policy", response, params)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
return response
|
||||
|
||||
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
|
||||
@report_error
|
||||
def iam_create_access_key(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.create_access_key(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Create Access Key", response, {"UserName": user_name})
|
||||
|
||||
access_key_id = response["AccessKey"].get("AccessKeyId")
|
||||
secret_access_key = response["AccessKey"].get("SecretAccessKey")
|
||||
|
@ -715,138 +732,190 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
return access_key_id, secret_access_key
|
||||
|
||||
@reporter.step("Creates a new group")
|
||||
@report_error
|
||||
def iam_create_group(self, group_name: str) -> dict:
|
||||
response = self.boto3_iam_client.create_group(GroupName=group_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Create Group", response, {"GroupName": group_name})
|
||||
|
||||
assert response.get("Group"), f"Expected Group in response:\n{response}"
|
||||
assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Creates a new managed policy for your AWS account")
|
||||
@report_error
|
||||
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
|
||||
response = self.boto3_iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_document))
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
params["PolicyDocument"] = json.dumps(policy_document)
|
||||
response = self.boto3_iam_client.create_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Create Policy", response, params)
|
||||
|
||||
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
|
||||
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Creates a new IAM user for your AWS account")
|
||||
@report_error
|
||||
def iam_create_user(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.create_user(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Create User", response, {"UserName": user_name})
|
||||
|
||||
assert response.get("User"), f"Expected User in response:\n{response}"
|
||||
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes the access key pair associated with the specified IAM user")
|
||||
@report_error
|
||||
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_access_key(AccessKeyId=access_key_id, UserName=user_name)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.delete_access_key(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Delete Access Key", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes the specified IAM group")
|
||||
@report_error
|
||||
def iam_delete_group(self, group_name: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_group(GroupName=group_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Delete Group", response, {"GroupName": group_name})
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
|
||||
@report_error
|
||||
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_group_policy(GroupName=group_name, PolicyName=policy_name)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.delete_group_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Delete Group Policy", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes the specified managed policy")
|
||||
@report_error
|
||||
def iam_delete_policy(self, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn)
|
||||
log_command_execution(self.iam_endpoint, "IAM Delete Policy", response, {"PolicyArn": policy_arn})
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes the specified IAM user")
|
||||
@report_error
|
||||
def iam_delete_user(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_user(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Delete User", response, {"UserName": user_name})
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
|
||||
@report_error
|
||||
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_user_policy(UserName=user_name, PolicyName=policy_name)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.delete_user_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Delete User Policy", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Removes the specified managed policy from the specified IAM group")
|
||||
@report_error
|
||||
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.detach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.detach_group_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Detach Group Policy", response, params)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
return response
|
||||
|
||||
@reporter.step("Removes the specified managed policy from the specified user")
|
||||
@report_error
|
||||
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.detach_user_policy(UserName=user_name, PolicyArn=policy_arn)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.detach_user_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Detach User Policy", response, params)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
return response
|
||||
|
||||
@reporter.step("Returns a list of IAM users that are in the specified IAM group")
|
||||
@report_error
|
||||
def iam_get_group(self, group_name: str) -> dict:
|
||||
response = self.boto3_iam_client.get_group(GroupName=group_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Get Group", response, {"GroupName": group_name})
|
||||
assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
|
||||
@report_error
|
||||
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
|
||||
response = self.boto3_iam_client.get_group_policy(GroupName=group_name, PolicyName=policy_name)
|
||||
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.get_group_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Get Group Policy", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Retrieves information about the specified managed policy")
|
||||
@report_error
|
||||
def iam_get_policy(self, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn)
|
||||
log_command_execution(self.iam_endpoint, "IAM Get Policy", response, {"PolicyArn": policy_arn})
|
||||
|
||||
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
|
||||
assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Retrieves information about the specified version of the specified managed policy")
|
||||
@report_error
|
||||
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
|
||||
response = self.boto3_iam_client.get_policy_version(PolicyArn=policy_arn, VersionId=version_id)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.get_policy_version(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Get Policy Version", response, params)
|
||||
|
||||
assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
|
||||
assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Retrieves information about the specified IAM user")
|
||||
@report_error
|
||||
def iam_get_user(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.get_user(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM Get User", response, {"UserName": user_name})
|
||||
|
||||
assert response.get("User"), f"Expected User in response:\n{response}"
|
||||
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
|
||||
@report_error
|
||||
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
|
||||
response = self.boto3_iam_client.get_user_policy(UserName=user_name, PolicyName=policy_name)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.get_user_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Get User Policy", response, params)
|
||||
assert response.get("UserName"), f"Expected UserName in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Returns information about the access key IDs associated with the specified IAM user")
|
||||
@report_error
|
||||
def iam_list_access_keys(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_access_keys(UserName=user_name)
|
||||
|
||||
log_command_execution(self.iam_endpoint, "IAM List Access Keys", response, {"UserName": user_name})
|
||||
return response
|
||||
|
||||
@reporter.step("Lists all managed policies that are attached to the specified IAM group")
|
||||
@report_error
|
||||
def iam_list_attached_group_policies(self, group_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM List Attached Group Policies", response, {"GroupName": group_name})
|
||||
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists all managed policies that are attached to the specified IAM user")
|
||||
@report_error
|
||||
def iam_list_attached_user_policies(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM List Attached User Policies", response, {"UserName": user_name})
|
||||
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
|
||||
@report_error
|
||||
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn)
|
||||
log_command_execution(self.iam_endpoint, "IAM List Entities For Policy", response, {"PolicyArn": policy_arn})
|
||||
|
||||
assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}"
|
||||
assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}"
|
||||
|
@ -854,98 +923,125 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
return response
|
||||
|
||||
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
|
||||
@report_error
|
||||
def iam_list_group_policies(self, group_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_group_policies(GroupName=group_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM List Group Policies", response, {"GroupName": group_name})
|
||||
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the IAM groups")
|
||||
@report_error
|
||||
def iam_list_groups(self) -> dict:
|
||||
response = self.boto3_iam_client.list_groups()
|
||||
log_command_execution(self.iam_endpoint, "IAM List Groups", response)
|
||||
assert response.get("Groups"), f"Expected Groups in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the IAM groups that the specified IAM user belongs to")
|
||||
@report_error
|
||||
def iam_list_groups_for_user(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_groups_for_user(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM List Groups For User", response, {"UserName": user_name})
|
||||
assert response.get("Groups"), f"Expected Groups in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists all the managed policies that are available in your AWS account")
|
||||
@report_error
|
||||
def iam_list_policies(self) -> dict:
|
||||
response = self.boto3_iam_client.list_policies()
|
||||
log_command_execution(self.iam_endpoint, "IAM List Policies", response)
|
||||
assert response.get("Policies"), f"Expected Policies in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists information about the versions of the specified managed policy")
|
||||
@report_error
|
||||
def iam_list_policy_versions(self, policy_arn: str) -> dict:
|
||||
response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn)
|
||||
log_command_execution(self.iam_endpoint, "IAM List Policy Versions", response, {"PolicyArn": policy_arn})
|
||||
assert response.get("Versions"), f"Expected Versions in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
|
||||
@report_error
|
||||
def iam_list_user_policies(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_user_policies(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM List User Policies", response, {"UserName": user_name})
|
||||
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the IAM users")
|
||||
@report_error
|
||||
def iam_list_users(self) -> dict:
|
||||
response = self.boto3_iam_client.list_users()
|
||||
log_command_execution(self.iam_endpoint, "IAM List Users", response)
|
||||
assert response.get("Users"), f"Expected Users in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
|
||||
@report_error
|
||||
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
|
||||
response = self.boto3_iam_client.put_group_policy(
|
||||
GroupName=group_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
|
||||
)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
params["PolicyDocument"] = json.dumps(policy_document)
|
||||
response = self.boto3_iam_client.put_group_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Put Group Policy", response, params)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
return response
|
||||
|
||||
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
|
||||
@report_error
|
||||
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
|
||||
response = self.boto3_iam_client.put_user_policy(
|
||||
UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
|
||||
)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
params["PolicyDocument"] = json.dumps(policy_document)
|
||||
response = self.boto3_iam_client.put_user_policy(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Put User Policy", response, params)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
return response
|
||||
|
||||
@reporter.step("Removes the specified user from the specified group")
|
||||
@report_error
|
||||
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.remove_user_from_group(GroupName=group_name, UserName=user_name)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.remove_user_from_group(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Remove User From Group", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Updates the name and/or the path of the specified IAM group")
|
||||
@report_error
|
||||
def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
|
||||
response = self.boto3_iam_client.update_group(GroupName=group_name, NewGroupName=new_name, NewPath="/")
|
||||
|
||||
params = {"GroupName": group_name, "NewGroupName": new_name, "NewPath": "/"}
|
||||
response = self.boto3_iam_client.update_group(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Update Group", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Updates the name and/or the path of the specified IAM user")
|
||||
@report_error
|
||||
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
|
||||
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/")
|
||||
params = {"UserName": user_name, "NewUserName": new_name, "NewPath": "/"}
|
||||
response = self.boto3_iam_client.update_user(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Update User", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("Adds one or more tags to an IAM user")
|
||||
@report_error
|
||||
def iam_tag_user(self, user_name: str, tags: list) -> dict:
|
||||
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
||||
response = self.boto3_iam_client.tag_user(UserName=user_name, Tags=tags_json)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
params["Tags"] = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
||||
response = self.boto3_iam_client.tag_user(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Tag User", response, params)
|
||||
return response
|
||||
|
||||
@reporter.step("List tags of IAM user")
|
||||
@report_error
|
||||
def iam_list_user_tags(self, user_name: str) -> dict:
|
||||
response = self.boto3_iam_client.list_user_tags(UserName=user_name)
|
||||
log_command_execution(self.iam_endpoint, "IAM List User Tags", response, {"UserName": user_name})
|
||||
return response
|
||||
|
||||
@reporter.step("Removes the specified tags from the user")
|
||||
@report_error
|
||||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
||||
response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys)
|
||||
params = self._convert_to_s3_params(locals().items())
|
||||
response = self.boto3_iam_client.untag_user(**params)
|
||||
log_command_execution(self.iam_endpoint, "IAM Untag User", response, params)
|
||||
return response
|
||||
|
|
Loading…
Reference in a new issue