[#307] added methods for testing MFA #307

Merged
r.chernykh merged 1 commit from r.chernykh/frostfs-testlib:feature--10059-MFA into master 2024-11-18 07:08:44 +00:00
4 changed files with 195 additions and 2 deletions
Showing only changes of commit 2089578f94 - Show all commits

View file

@ -1440,3 +1440,90 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output) response = self._to_json(output)
return response return response
# MFA METHODS
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple:
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
return serial_number, False
r.chernykh marked this conversation as resolved Outdated

Why did you return tuple [serial_number, False]?
if the second argument is required than change function typing to tuple[str, bool]

Why did you return tuple [serial_number, False]? if the second argument is required than change function typing to tuple[str, bool]
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\
--authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
r.chernykh marked this conversation as resolved Outdated

fix typing dict -> tuple

fix typing `dict` -> `tuple`
cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}"
if duration_seconds:
cmd += f" --duration-seconds {duration_seconds}"
if serial_number:
cmd += f" --serial-number {serial_number}"
if token_code:
cmd += f" --token-code {token_code}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

View file

@ -41,6 +41,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
self.boto3_iam_client: S3Client = None self.boto3_iam_client: S3Client = None
self.iam_endpoint: str = "" self.iam_endpoint: str = ""
self.boto3_sts_client: S3Client = None
self.access_key_id: str = access_key_id self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key self.secret_access_key: str = secret_access_key
self.profile = profile self.profile = profile
@ -87,6 +89,14 @@ class Boto3ClientWrapper(S3ClientWrapper):
endpoint_url=self.iam_endpoint, endpoint_url=self.iam_endpoint,
verify=False, verify=False,
) )
# since the STS does not have an enpoint, IAM is used
self.boto3_sts_client = self.session.client(
service_name="sts",
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
endpoint_url=iam_endpoint,
verify=False,
)
def _to_s3_param(self, param: str) -> str: def _to_s3_param(self, param: str) -> str:
replacement_map = { replacement_map = {
@ -1265,3 +1275,66 @@ class Boto3ClientWrapper(S3ClientWrapper):
endpoint=self.iam_endpoint, endpoint=self.iam_endpoint,
profile=self.profile, profile=self.profile,
) )
# MFA methods
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
base32StringSeed = response.get("VirtualMFADevice", {}).get("Base32StringSeed")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
assert base32StringSeed, f"Expected Base32StringSeed in response:\n{response}"
return serial_number, base32StringSeed
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
response = self.boto3_iam_client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
response = self.boto3_iam_client.delete_virtual_mfa_device(SerialNumber=serial_number)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
response = self.boto3_iam_client.enable_mfa_device(
UserName=user_name,
SerialNumber=serial_number,
AuthenticationCode1=authentication_code1,
AuthenticationCode2=authentication_code2,
)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
response = self.boto3_iam_client.list_virtual_mfa_devices()
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
) -> tuple:
response = self.boto3_sts_client.get_session_token(
DurationSeconds=duration_seconds,
SerialNumber=serial_number,
TokenCode=token_code,
)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

View file

@ -578,3 +578,32 @@ class S3ClientWrapper(HumanReadableABC):
@abstractmethod @abstractmethod
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
"""Removes the specified tags from the user""" """Removes the specified tags from the user"""
# MFA methods
@abstractmethod
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
"""Creates a new virtual MFA device"""
@abstractmethod
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
"""Deactivates the specified MFA device and removes it from association with the user name"""
@abstractmethod
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
"""Deletes a virtual MFA device"""
@abstractmethod
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
"""Enables the specified MFA device and associates it with the specified IAM user"""
@abstractmethod
def iam_list_virtual_mfa_devices(self) -> dict:
"""Lists the MFA devices for an IAM user"""
@abstractmethod
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
"""Get session token for user"""

View file

@ -45,7 +45,7 @@ def ensure_directory_opener(path, flags):
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps # TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument # Use object_size dt in future as argument
@reporter.step("Generate file") @reporter.step("Generate file")
def generate_file(size: int) -> TestFile: def generate_file(size: int, file_name: Optional[str] = None) -> TestFile:
"""Generates a binary file with the specified size in bytes. """Generates a binary file with the specified size in bytes.
Args: Args:
@ -54,7 +54,11 @@ def generate_file(size: int) -> TestFile:
Returns: Returns:
The path to the generated file. The path to the generated file.
""" """
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
if file_name is None:
file_name = string_utils.unique_name("object-")
test_file = TestFile(os.path.join(ASSETS_DIR, file_name))
with open(test_file, "wb", opener=ensure_directory_opener) as file: with open(test_file, "wb", opener=ensure_directory_opener) as file:
file.write(os.urandom(size)) file.write(os.urandom(size))
logger.info(f"File with size {size} bytes has been generated: {test_file}") logger.info(f"File with size {size} bytes has been generated: {test_file}")