forked from TrueCloudLab/frostfs-testlib
[] create autotests for MFA
Signed-off-by: Roman Chernykh <r.chernykh@yadro.com>
This commit is contained in:
parent
0caca54e36
commit
577ec5ce27
3 changed files with 190 additions and 0 deletions
|
@ -1256,3 +1256,92 @@ class AwsCliClient(S3ClientWrapper):
|
|||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
# MFA METHODS
|
||||
@reporter.step("Creates a new virtual MFA device")
|
||||
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
|
||||
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
|
||||
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
||||
assert serial_number, f"Expected SerialNumber in response:\n{response}"
|
||||
|
||||
return serial_number, False
|
||||
|
||||
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes a virtual MFA device")
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\
|
||||
--authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the MFA devices for an IAM user")
|
||||
def iam_list_virtual_mfa_devices(self, user_name: Optional[str] = None) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}"
|
||||
if user_name:
|
||||
cmd += f" --user-name {user_name}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
assert response.get("MFADevices"), f"Expected MFADevices in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Get session token to user")
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
|
||||
) -> dict:
|
||||
cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}"
|
||||
if duration_seconds:
|
||||
cmd += f" --duration-seconds {duration_seconds}"
|
||||
if serial_number:
|
||||
cmd += f" --serial-number {serial_number}"
|
||||
if token_code:
|
||||
cmd += f" --token-code {token_code}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
access_key = response.get("Credentials", {}).get("AccessKeyId")
|
||||
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
|
||||
session_token = response.get("Credentials", {}).get("SessionToken")
|
||||
assert access_key, f"Expected AccessKeyId in response:\n{response}"
|
||||
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
|
||||
assert session_token, f"Expected SessionToken in response:\n{response}"
|
||||
|
||||
return access_key, secret_access_key, session_token
|
||||
|
|
|
@ -69,6 +69,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
self.secret_access_key: str = secret_access_key
|
||||
self.s3gate_endpoint: str = ""
|
||||
self.boto3_iam_client: S3Client = None
|
||||
self.boto3_sts_client: S3Client = None
|
||||
self.set_endpoint(s3gate_endpoint)
|
||||
|
||||
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
|
||||
|
@ -97,6 +98,14 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
endpoint_url=iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
# since the STS does not have an enpoint, IAM is used
|
||||
self.boto3_sts_client = self.session.client(
|
||||
service_name="sts",
|
||||
aws_access_key_id=self.access_key_id,
|
||||
aws_secret_access_key=self.secret_access_key,
|
||||
endpoint_url=iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
|
||||
def _to_s3_param(self, param: str) -> str:
|
||||
replacement_map = {
|
||||
|
@ -928,3 +937,66 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
||||
response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys)
|
||||
return response
|
||||
|
||||
# MFA methods
|
||||
@reporter.step("Creates a new virtual MFA device")
|
||||
def iam_create_virtual_mfa_device(
|
||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
||||
) -> dict:
|
||||
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
|
||||
|
||||
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
||||
base32StringSeed = response.get("VirtualMFADevice", {}).get("Base32StringSeed")
|
||||
assert serial_number, f"Expected SerialNumber in response:\n{response}"
|
||||
assert base32StringSeed, f"Expected Base32StringSeed in response:\n{response}"
|
||||
|
||||
return serial_number, base32StringSeed
|
||||
|
||||
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
response = self.boto3_iam_client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes a virtual MFA device")
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_virtual_mfa_device(SerialNumber=serial_number)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
response = self.boto3_iam_client.enable_mfa_device(
|
||||
UserName=user_name,
|
||||
SerialNumber=serial_number,
|
||||
AuthenticationCode1=authentication_code1,
|
||||
AuthenticationCode2=authentication_code2,
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the MFA devices for an IAM user")
|
||||
def iam_list_virtual_mfa_devices(self, user_name: Optional[str] = None) -> dict:
|
||||
response = self.boto3_iam_client.list_virtual_mfa_devices(UserName="/")
|
||||
assert response.get("MFADevices"), f"Expected MFADevices in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Get session token to user")
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
|
||||
) -> dict:
|
||||
response = self.boto3_sts_client.get_session_token(
|
||||
DurationSeconds=duration_seconds,
|
||||
SerialNumber=serial_number,
|
||||
TokenCode=token_code,
|
||||
)
|
||||
|
||||
access_key = response.get("Credentials", {}).get("AccessKeyId")
|
||||
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
|
||||
session_token = response.get("Credentials", {}).get("SessionToken")
|
||||
assert access_key, f"Expected AccessKeyId in response:\n{response}"
|
||||
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
|
||||
assert session_token, f"Expected SessionToken in response:\n{response}"
|
||||
|
||||
return access_key, secret_access_key, session_token
|
||||
|
|
|
@ -562,3 +562,32 @@ class S3ClientWrapper(HumanReadableABC):
|
|||
@abstractmethod
|
||||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
||||
"""Removes the specified tags from the user"""
|
||||
|
||||
# MFA methods
|
||||
@abstractmethod
|
||||
def iam_create_virtual_mfa_device(
|
||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
||||
) -> dict:
|
||||
"""Creates a new virtual MFA device"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
"""Deactivates the specified MFA device and removes it from association with the user name"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
"""Deletes a virtual MFA device"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
"""Enables the specified MFA device and associates it with the specified IAM user"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_list_virtual_mfa_devices(self, user_name: Optional[str] = None) -> dict:
|
||||
"""Lists the MFA devices for an IAM user"""
|
||||
|
||||
@abstractmethod
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
|
||||
) -> dict:
|
||||
"""Get session token to user"""
|
||||
|
|
Loading…
Add table
Reference in a new issue