From d466b7bd09b94159526164430dde2f3d9bbce1ca Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Wed, 14 Jul 2021 15:40:39 +0530 Subject: [PATCH 1/6] rgw/sts: adding tests for testing assumerolewithwebidentity using 'sub' and 'azp' fields in the web token. Signed-off-by: Pritha Srivastava --- s3tests.conf.SAMPLE | 4 + s3tests_boto3/functional/__init__.py | 8 ++ s3tests_boto3/functional/test_sts.py | 110 ++++++++++++++++++++++++++- 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 127a79f..b26bdca 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -98,6 +98,10 @@ token= aud= +sub= + +azp= + thumbprint= KC_REALM= diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 24db114..7a95670 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -294,6 +294,8 @@ def check_webidentity(): config.webidentity_aud = cfg.get('webidentity', "aud") config.webidentity_token = cfg.get('webidentity', "token") config.webidentity_realm = cfg.get('webidentity', "KC_REALM") + config.webidentity_sub = cfg.get('webidentity', "sub") + config.webidentity_azp = cfg.get('webidentity', "azp") def get_client(client_config=None): if client_config == None: @@ -575,6 +577,12 @@ def get_thumbprint(): def get_aud(): return config.webidentity_aud +def get_sub(): + return config.webidentity_sub + +def get_azp(): + return config.webidentity_azp + def get_token(): return config.webidentity_token diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index bbfb99a..6b2eba0 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -47,7 +47,9 @@ from . import( get_realm_name, check_webidentity, get_iam_access_key, - get_iam_secret_key + get_iam_secret_key, + get_sub, + get_azp ) log = logging.getLogger(__name__) @@ -395,6 +397,7 @@ def test_assume_role_allow_head_nonexistent(): @attr(operation='check') @attr(assertion='assuming role through web token') @attr('webidentity_test') +@attr('token_claims_trust_policy_test') def test_assume_role_with_web_identity(): check_webidentity() iam_client=get_iam_client() @@ -1313,3 +1316,108 @@ def test_session_policy_bucket_policy_deny(): oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token using sub in trust policy') +@attr('webidentity_test') +@attr('token_claims_trust_policy_test') +def test_assume_role_with_web_identity_with_sub(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + sub=get_sub() + token=get_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":sub\":\""+sub+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token using azp in trust policy') +@attr('webidentity_test') +@attr('token_claims_trust_policy_test') +def test_assume_role_with_web_identity_with_azp(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + azp=get_azp() + token=get_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":azp\":\""+azp+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + \ No newline at end of file From 64068d7bf922ee349c9e47b103b17e5ded0a5450 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Thu, 15 Jul 2021 13:33:08 +0530 Subject: [PATCH 2/6] rgw/sts: adding test to check for aws:RequestTag in the condition element of a role's trust policy. Signed-off-by: Pritha Srivastava --- s3tests.conf.SAMPLE | 2 + s3tests_boto3/functional/__init__.py | 4 ++ s3tests_boto3/functional/test_sts.py | 56 +++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index b26bdca..c0c5fd9 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -102,6 +102,8 @@ sub= azp= +user_token=] + thumbprint= KC_REALM= diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 7a95670..18579e2 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -296,6 +296,7 @@ def check_webidentity(): config.webidentity_realm = cfg.get('webidentity', "KC_REALM") config.webidentity_sub = cfg.get('webidentity', "sub") config.webidentity_azp = cfg.get('webidentity', "azp") + config.webidentity_user_token = cfg.get('webidentity', "user_token") def get_client(client_config=None): if client_config == None: @@ -594,3 +595,6 @@ def get_iam_access_key(): def get_iam_secret_key(): return config.iam_secret_key + +def get_user_token(): + return config.webidentity_user_token diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index 6b2eba0..d9b9a8f 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -49,7 +49,8 @@ from . import( get_iam_access_key, get_iam_secret_key, get_sub, - get_azp + get_azp, + get_user_token ) log = logging.getLogger(__name__) @@ -1420,4 +1421,55 @@ def test_assume_role_with_web_identity_with_azp(): oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) - \ No newline at end of file + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token using aws:RequestTag in trust policy') +@attr('webidentity_test') +@attr('token_request_tag_trust_policy_test') +def test_assume_role_with_web_identity_with_request_tag(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + From 86fecf83b90b247a26e0f69203b7c3844b090af2 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Thu, 15 Jul 2021 14:15:54 +0530 Subject: [PATCH 3/6] rgw/sts: adding test for aws:PrincipalTag in role permission policy. Signed-off-by: Pritha Srivastava --- s3tests_boto3/functional/test_sts.py | 155 +++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index d9b9a8f..71e5d93 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -1473,3 +1473,158 @@ def test_assume_role_with_web_identity_with_request_tag(): OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') +@attr('webidentity_test') +@attr('token_principal_tag_role_policy_test') +def test_assume_role_with_web_identity_with_principal_tag(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:PrincipalTag/Department\":\"Engineering\"}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') +@attr('webidentity_test') +@attr('token_principal_tag_role_policy_test') +def test_assume_role_with_web_identity_for_all_values(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\",\"Marketing\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') +@attr('webidentity_test') +@attr('token_principal_tag_role_policy_test') +def test_assume_role_with_web_identity_for_all_values_deny(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + #ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + try: + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + except ClientError as e: + s3bucket_error = e.response.get("Error", {}).get("Code") + eq(s3bucket_error,'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + From bf43a4a10a2eea15ae0c0c06fe9f16c1893d4115 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Thu, 15 Jul 2021 15:40:04 +0530 Subject: [PATCH 4/6] rgw/sts: adding test for aws:TagKeys that can be used in the condition element of the role's trust policy and the role's permission policy. Signed-off-by: Pritha Srivastava --- s3tests_boto3/functional/test_sts.py | 102 +++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index 71e5d93..378b0c4 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -1628,3 +1628,105 @@ def test_assume_role_with_web_identity_for_all_values_deny(): OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:TagKeys in trust policy') +@attr('webidentity_test') +@attr('token_tag_keys_test') +def test_assume_role_with_web_identity_tag_keys_trust_policy(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":\"Department\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAnyValue:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:TagKeys in role permission policy') +@attr('webidentity_test') +@attr('token_tag_keys_test') +def test_assume_role_with_web_identity_tag_keys_role_policy(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":[\"Department\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + From 5dcc3dd689486ac759092a2bd3307c469bf0c7e1 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Fri, 16 Jul 2021 13:17:54 +0530 Subject: [PATCH 5/6] rgw/sts: test for s3:ResourceTag in role's permission policy Signed-off-by: Pritha Srivastava --- s3tests_boto3/functional/test_sts.py | 370 +++++++++++++++++++++++++++ 1 file changed, 370 insertions(+) diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index 378b0c4..fb02aee 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -133,6 +133,20 @@ def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist): oidc_arn = None return (oidc_arn, oidc_error) +def get_s3_resource_using_iam_creds(): + iam_access_key = get_iam_access_key() + iam_secret_key = get_iam_secret_key() + default_endpoint = get_config_endpoint() + + s3_res_iam_creds = boto3.resource('s3', + aws_access_key_id = iam_access_key, + aws_secret_access_key = iam_secret_key, + endpoint_url=default_endpoint, + region_name='', + ) + + return s3_res_iam_creds + @attr(resource='get session token') @attr(method='get') @attr(operation='check') @@ -1427,6 +1441,7 @@ def test_assume_role_with_web_identity_with_azp(): @attr(operation='check') @attr(assertion='assuming role using web token using aws:RequestTag in trust policy') @attr('webidentity_test') +@attr('abac_test') @attr('token_request_tag_trust_policy_test') def test_assume_role_with_web_identity_with_request_tag(): check_webidentity() @@ -1478,6 +1493,7 @@ def test_assume_role_with_web_identity_with_request_tag(): @attr(operation='check') @attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') @attr('webidentity_test') +@attr('abac_test') @attr('token_principal_tag_role_policy_test') def test_assume_role_with_web_identity_with_principal_tag(): check_webidentity() @@ -1529,6 +1545,7 @@ def test_assume_role_with_web_identity_with_principal_tag(): @attr(operation='check') @attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') @attr('webidentity_test') +@attr('abac_test') @attr('token_principal_tag_role_policy_test') def test_assume_role_with_web_identity_for_all_values(): check_webidentity() @@ -1580,6 +1597,7 @@ def test_assume_role_with_web_identity_for_all_values(): @attr(operation='check') @attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') @attr('webidentity_test') +@attr('abac_test') @attr('token_principal_tag_role_policy_test') def test_assume_role_with_web_identity_for_all_values_deny(): check_webidentity() @@ -1633,6 +1651,7 @@ def test_assume_role_with_web_identity_for_all_values_deny(): @attr(operation='check') @attr(assertion='assuming role using web token with aws:TagKeys in trust policy') @attr('webidentity_test') +@attr('abac_test') @attr('token_tag_keys_test') def test_assume_role_with_web_identity_tag_keys_trust_policy(): check_webidentity() @@ -1684,6 +1703,7 @@ def test_assume_role_with_web_identity_tag_keys_trust_policy(): @attr(operation='check') @attr(assertion='assuming role using web token with aws:TagKeys in role permission policy') @attr('webidentity_test') +@attr('abac_test') @attr('token_tag_keys_test') def test_assume_role_with_web_identity_tag_keys_role_policy(): check_webidentity() @@ -1730,3 +1750,353 @@ def test_assume_role_with_web_identity_tag_keys_role_policy(): OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='assuming role using web token with s3:ResourceTag in role permission policy') +@attr('webidentity_test') +@attr('abac_test') +@attr('token_resource_tags_test') +def test_assume_role_with_web_identity_resource_tag(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + s3_res_iam_creds = get_s3_resource_using_iam_creds() + + s3_client_iam_creds = s3_res_iam_creds.meta.client + + bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) + Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]}) + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='assuming role using web token with s3:ResourceTag with missing tags on bucket') +@attr('webidentity_test') +@attr('abac_test') +@attr('token_resource_tags_test') +def test_assume_role_with_web_identity_resource_tag_deny(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + s3_res_iam_creds = get_s3_resource_using_iam_creds() + + s3_client_iam_creds = s3_res_iam_creds.meta.client + + bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error,'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='assuming role using web token with s3:ResourceTag with wrong resource tag in policy') +@attr('webidentity_test') +@attr('abac_test') +@attr('token_resource_tags_test') +def test_assume_role_with_web_identity_wrong_resource_tag_deny(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + s3_res_iam_creds = get_s3_resource_using_iam_creds() + + s3_client_iam_creds = s3_res_iam_creds.meta.client + + bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) + Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'WrongResourcetag'}]}) + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error,'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='assuming role using web token with s3:ResourceTag matching aws:PrincipalTag in role permission policy') +@attr('webidentity_test') +@attr('abac_test') +@attr('token_resource_tags_test') +def test_assume_role_with_web_identity_resource_tag_princ_tag(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + s3_res_iam_creds = get_s3_resource_using_iam_creds() + + s3_client_iam_creds = s3_res_iam_creds.meta.client + + bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) + Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]}) + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + tags = 'Department=Engineering&Department=Marketing' + key = "test-1.txt" + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags) + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key=key) + eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='assuming role using web token with s3:ResourceTag used to test copy object') +@attr('webidentity_test') +@attr('abac_test') +@attr('token_resource_tags_test') +def test_assume_role_with_web_identity_resource_tag_copy_obj(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + s3_res_iam_creds = get_s3_resource_using_iam_creds() + + s3_client_iam_creds = s3_res_iam_creds.meta.client + + #create two buckets and add same tags to both + bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) + Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]}) + + copy_bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=copy_bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + bucket_tagging = s3_res_iam_creds.BucketTagging(copy_bucket_name) + Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]}) + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + tags = 'Department=Engineering' + key = "test-1.txt" + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags) + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + #copy to same bucket + copy_source = { + 'Bucket': bucket_name, + 'Key': 'test-1.txt' + } + + s3_client.copy(copy_source, bucket_name, "test-2.txt") + + s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key="test-2.txt") + eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) + + #copy to another bucket + copy_source = { + 'Bucket': bucket_name, + 'Key': 'test-1.txt' + } + + s3_client.copy(copy_source, copy_bucket_name, "test-1.txt") + + s3_get_obj = s3_client.get_object(Bucket=copy_bucket_name, Key="test-1.txt") + eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + From 71266fede99e585875128106c5d319ddbd08d1b4 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Fri, 23 Jul 2021 12:08:07 +0530 Subject: [PATCH 6/6] rgw/sts: test to use role tag as iam:ResourceTag in role's trust policy. Signed-off-by: Pritha Srivastava --- s3tests_boto3/functional/test_sts.py | 73 +++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index fb02aee..b47f2a6 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -55,12 +55,14 @@ from . import( log = logging.getLogger(__name__) -def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary): +def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary,tag_list=None): role_err=None if rolename is None: rolename=get_parameter_name() + if tag_list is None: + tag_list = [] try: - role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,) + role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,Tags=tag_list) except ClientError as e: role_err = e.response['Code'] return (role_err,role_response,rolename) @@ -2100,3 +2102,70 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj(): OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='assuming role using web token with iam:ResourceTag in role trust policy') +@attr('webidentity_test') +@attr('abac_test') +@attr('token_role_tags_test') +def test_assume_role_with_web_identity_role_resource_tag(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + s3_res_iam_creds = get_s3_resource_using_iam_creds() + + s3_client_iam_creds = s3_res_iam_creds.meta.client + + bucket_name = get_new_bucket_name() + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) + Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]}) + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + #iam:ResourceTag refers to the tag attached to role, hence the role is allowed to be assumed only when it has a tag matching the policy. + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"iam:ResourceTag/Department\":\"Engineering\"}}}]}" + tags_list = [ + {'Key':'Department','Value':'Engineering'}, + {'Key':'Department','Value':'Marketing'} + ] + + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None,tags_list) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + )