rgw/sts: test to use role tag as iam:ResourceTag in

role's trust policy.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
This commit is contained in:
Pritha Srivastava 2021-07-23 12:08:07 +05:30
parent 5dcc3dd689
commit 71266fede9

View file

@ -55,12 +55,14 @@ from . import(
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary): def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary,tag_list=None):
role_err=None role_err=None
if rolename is None: if rolename is None:
rolename=get_parameter_name() rolename=get_parameter_name()
if tag_list is None:
tag_list = []
try: try:
role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,) role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,Tags=tag_list)
except ClientError as e: except ClientError as e:
role_err = e.response['Code'] role_err = e.response['Code']
return (role_err,role_response,rolename) return (role_err,role_response,rolename)
@ -2100,3 +2102,70 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
) )
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with iam:ResourceTag in role trust policy')
@attr('webidentity_test')
@attr('abac_test')
@attr('token_role_tags_test')
def test_assume_role_with_web_identity_role_resource_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]})
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
#iam:ResourceTag refers to the tag attached to role, hence the role is allowed to be assumed only when it has a tag matching the policy.
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"iam:ResourceTag/Department\":\"Engineering\"}}}]}"
tags_list = [
{'Key':'Department','Value':'Engineering'},
{'Key':'Department','Value':'Marketing'}
]
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None,tags_list)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)