From 86fecf83b90b247a26e0f69203b7c3844b090af2 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Thu, 15 Jul 2021 14:15:54 +0530 Subject: [PATCH] rgw/sts: adding test for aws:PrincipalTag in role permission policy. Signed-off-by: Pritha Srivastava --- s3tests_boto3/functional/test_sts.py | 155 +++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index d9b9a8f..71e5d93 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -1473,3 +1473,158 @@ def test_assume_role_with_web_identity_with_request_tag(): OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') +@attr('webidentity_test') +@attr('token_principal_tag_role_policy_test') +def test_assume_role_with_web_identity_with_principal_tag(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:PrincipalTag/Department\":\"Engineering\"}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') +@attr('webidentity_test') +@attr('token_principal_tag_role_policy_test') +def test_assume_role_with_web_identity_for_all_values(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\",\"Marketing\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy') +@attr('webidentity_test') +@attr('token_principal_tag_role_policy_test') +def test_assume_role_with_web_identity_for_all_values_deny(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + user_token=get_user_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + #ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + try: + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + except ClientError as e: + s3bucket_error = e.response.get("Error", {}).get("Code") + eq(s3bucket_error,'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) +