forked from TrueCloudLab/frostfs-testlib
[#307] added methods for testing MFA
This commit is contained in:
parent
451de5e07e
commit
a1953684b8
4 changed files with 195 additions and 2 deletions
|
@ -41,6 +41,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
self.boto3_iam_client: S3Client = None
|
||||
self.iam_endpoint: str = ""
|
||||
|
||||
self.boto3_sts_client: S3Client = None
|
||||
|
||||
self.access_key_id: str = access_key_id
|
||||
self.secret_access_key: str = secret_access_key
|
||||
self.profile = profile
|
||||
|
@ -87,6 +89,14 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
endpoint_url=self.iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
# since the STS does not have an enpoint, IAM is used
|
||||
self.boto3_sts_client = self.session.client(
|
||||
service_name="sts",
|
||||
aws_access_key_id=self.access_key_id,
|
||||
aws_secret_access_key=self.secret_access_key,
|
||||
endpoint_url=iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
|
||||
def _to_s3_param(self, param: str) -> str:
|
||||
replacement_map = {
|
||||
|
@ -1265,3 +1275,66 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
endpoint=self.iam_endpoint,
|
||||
profile=self.profile,
|
||||
)
|
||||
|
||||
# MFA methods
|
||||
@reporter.step("Creates a new virtual MFA device")
|
||||
def iam_create_virtual_mfa_device(
|
||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
||||
) -> tuple:
|
||||
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
|
||||
|
||||
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
||||
base32StringSeed = response.get("VirtualMFADevice", {}).get("Base32StringSeed")
|
||||
assert serial_number, f"Expected SerialNumber in response:\n{response}"
|
||||
assert base32StringSeed, f"Expected Base32StringSeed in response:\n{response}"
|
||||
|
||||
return serial_number, base32StringSeed
|
||||
|
||||
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
response = self.boto3_iam_client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes a virtual MFA device")
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_virtual_mfa_device(SerialNumber=serial_number)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
response = self.boto3_iam_client.enable_mfa_device(
|
||||
UserName=user_name,
|
||||
SerialNumber=serial_number,
|
||||
AuthenticationCode1=authentication_code1,
|
||||
AuthenticationCode2=authentication_code2,
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the MFA devices for an IAM user")
|
||||
def iam_list_virtual_mfa_devices(self) -> dict:
|
||||
response = self.boto3_iam_client.list_virtual_mfa_devices()
|
||||
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Get session token for user")
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
|
||||
) -> tuple:
|
||||
response = self.boto3_sts_client.get_session_token(
|
||||
DurationSeconds=duration_seconds,
|
||||
SerialNumber=serial_number,
|
||||
TokenCode=token_code,
|
||||
)
|
||||
|
||||
access_key = response.get("Credentials", {}).get("AccessKeyId")
|
||||
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
|
||||
session_token = response.get("Credentials", {}).get("SessionToken")
|
||||
assert access_key, f"Expected AccessKeyId in response:\n{response}"
|
||||
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
|
||||
assert session_token, f"Expected SessionToken in response:\n{response}"
|
||||
|
||||
return access_key, secret_access_key, session_token
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue