mirror of
https://github.com/ceph/s3-tests.git
synced 2025-01-06 03:51:37 +00:00
rgw/sts: adding test for aws:PrincipalTag in role permission
policy.
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
(cherry picked from commit 86fecf83b9
)
This commit is contained in:
parent
95154bf0ce
commit
47fff36c36
1 changed files with 155 additions and 0 deletions
|
@ -1473,3 +1473,158 @@ def test_assume_role_with_web_identity_with_request_tag():
|
|||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('token_principal_tag_role_policy_test')
|
||||
def test_assume_role_with_web_identity_with_principal_tag():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:PrincipalTag/Department\":\"Engineering\"}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('token_principal_tag_role_policy_test')
|
||||
def test_assume_role_with_web_identity_for_all_values():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\",\"Marketing\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('token_principal_tag_role_policy_test')
|
||||
def test_assume_role_with_web_identity_for_all_values_deny():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
#ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
try:
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
except ClientError as e:
|
||||
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||
eq(s3bucket_error,'AccessDenied')
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue