rgw/sts: adding test to check for aws:RequestTag

in the condition element of a role's trust policy.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
Pritha Srivastava 2021-07-15 13:33:08 +05:30
parent d466b7bd09
commit 64068d7bf9
3 changed files with 60 additions and 2 deletions


@ -102,6 +102,8 @@ sub=<obtained after introspecting token>
azp=<obtained after introspecting token> azp=<obtained after introspecting token>
user_token=<access token for a user, with attribute Department=[Engineering, Marketing>]
thumbprint=<obtained from x509 certificate> thumbprint=<obtained from x509 certificate>
KC_REALM=<name of the realm> KC_REALM=<name of the realm>
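Review bot: The placeholder on the added `user_token` line closes its brackets in the wrong order (`Marketing>]`); it should read `user_token=<access token for a user, with attribute Department=[Engineering, Marketing]>`. Because this value is a bearer access token rather than an identifier like `sub` or `azp`, the sample should also say that the filled-in config must stay out of version control and shared logs, for example `# user_token is a live credential: keep the real config file private`. The sample does not state how long such a token stays valid, so an expired token is a plausible cause of failures in the new test and is worth a line here too.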


@ -296,6 +296,7 @@ def check_webidentity():
config.webidentity_realm = cfg.get('webidentity', "KC_REALM") config.webidentity_realm = cfg.get('webidentity', "KC_REALM")
config.webidentity_sub = cfg.get('webidentity', "sub") config.webidentity_sub = cfg.get('webidentity', "sub")
config.webidentity_azp = cfg.get('webidentity', "azp") config.webidentity_azp = cfg.get('webidentity', "azp")
config.webidentity_user_token = cfg.get('webidentity', "user_token")
def get_client(client_config=None): def get_client(client_config=None):
if client_config == None: if client_config == None:
@ -594,3 +595,6 @@ def get_iam_access_key():
def get_iam_secret_key(): def get_iam_secret_key():
return config.iam_secret_key return config.iam_secret_key
def get_user_token():
return config.webidentity_user_token
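Review bot: The added `cfg.get('webidentity', "user_token")` line in check_webidentity() makes `user_token` a required key for every test that calls check_webidentity(), not only for the new request-tag test. If `cfg` is a standard configparser object (its construction is outside the hunk), an existing config without the new key would raise there and take the sub and azp tests down with it. Reading it as optional, `config.webidentity_user_token = cfg.get('webidentity', "user_token", fallback=None)`, and skipping or failing in the new test when it is `None`, keeps the older tests independent. check_webidentity() starts above the hunk, so any existing error handling around these reads is not visible here.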


@ -49,7 +49,8 @@ from . import(
get_iam_access_key, get_iam_access_key,
get_iam_secret_key, get_iam_secret_key,
get_sub, get_sub,
get_azp get_azp,
get_user_token
) )
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -1420,4 +1421,55 @@ def test_assume_role_with_web_identity_with_azp():
oidc_remove=iam_client.delete_open_id_connect_provider( oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
) )
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using aws:RequestTag in trust policy')
@attr('webidentity_test')
@attr('token_request_tag_trust_policy_test')
def test_assume_role_with_web_identity_with_request_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
bkt = s3_client.delete_bucket(Bucket=bucket_name)
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
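Review bot: test_assume_role_with_web_identity_with_request_tag() asserts only the allowed path, so it would still pass if the server ignored the `Condition` block of the trust policy altogether. A second role whose trust policy requires a `Department` value that the token does not carry, with an assertion that `assume_role_with_web_identity` is refused, is what shows the `aws:RequestTag` condition is actually enforced. Separately, the role and its `s3:*` inline policy are never deleted and the provider is removed only on the success path, so a failed `eq` leaves a broadly privileged role on the test cluster; a `try`/`finally` around the assertions would cover that. The sketch below assumes botocore's `ClientError` is importable in this file, which the import hunk does not show.

```python
# … unchanged: decorators, client setup, provider creation, positive-case role, policy and S3 checks …
    # Negative case: same provider and token, but a tag value the token does not carry
    # ("Finance" is a constructed example value).
    deny_trust_policy = policy_document.replace('"Engineering"', '"Finance"')
    (deny_error, deny_role, deny_role_name) = create_role(iam_client, '/', None, deny_trust_policy, None, None, None)
    refused = False
    try:
        sts_client.assume_role_with_web_identity(
            RoleArn=deny_role['Role']['Arn'],
            RoleSessionName=role_session_name,
            WebIdentityToken=user_token,
        )
    except ClientError:
        refused = True
    eq(refused, True)
# … unchanged: delete_open_id_connect_provider call …
```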