From d466b7bd09b94159526164430dde2f3d9bbce1ca Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Wed, 14 Jul 2021 15:40:39 +0530 Subject: [PATCH] rgw/sts: adding tests for testing assumerolewithwebidentity using 'sub' and 'azp' fields in the web token. Signed-off-by: Pritha Srivastava --- s3tests.conf.SAMPLE | 4 + s3tests_boto3/functional/__init__.py | 8 ++ s3tests_boto3/functional/test_sts.py | 110 ++++++++++++++++++++++++++- 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 127a79f..b26bdca 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -98,6 +98,10 @@ token= aud= +sub= + +azp= + thumbprint= KC_REALM= diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 24db114..7a95670 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -294,6 +294,8 @@ def check_webidentity(): config.webidentity_aud = cfg.get('webidentity', "aud") config.webidentity_token = cfg.get('webidentity', "token") config.webidentity_realm = cfg.get('webidentity', "KC_REALM") + config.webidentity_sub = cfg.get('webidentity', "sub") + config.webidentity_azp = cfg.get('webidentity', "azp") def get_client(client_config=None): if client_config == None: @@ -575,6 +577,12 @@ def get_thumbprint(): def get_aud(): return config.webidentity_aud +def get_sub(): + return config.webidentity_sub + +def get_azp(): + return config.webidentity_azp + def get_token(): return config.webidentity_token diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index bbfb99a..6b2eba0 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -47,7 +47,9 @@ from . import( get_realm_name, check_webidentity, get_iam_access_key, - get_iam_secret_key + get_iam_secret_key, + get_sub, + get_azp ) log = logging.getLogger(__name__) @@ -395,6 +397,7 @@ def test_assume_role_allow_head_nonexistent(): @attr(operation='check') @attr(assertion='assuming role through web token') @attr('webidentity_test') +@attr('token_claims_trust_policy_test') def test_assume_role_with_web_identity(): check_webidentity() iam_client=get_iam_client() @@ -1313,3 +1316,108 @@ def test_session_policy_bucket_policy_deny(): oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token using sub in trust policy') +@attr('webidentity_test') +@attr('token_claims_trust_policy_test') +def test_assume_role_with_web_identity_with_sub(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + sub=get_sub() + token=get_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":sub\":\""+sub+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token using azp in trust policy') +@attr('webidentity_test') +@attr('token_claims_trust_policy_test') +def test_assume_role_with_web_identity_with_azp(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + azp=get_azp() + token=get_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":azp\":\""+azp+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + \ No newline at end of file