mirror of
https://github.com/ceph/s3-tests.git
synced 2024-12-23 03:41:35 +00:00
Merge pull request #403 from pritha-srivastava/wip-rgw-sts-abac
rgw sts abac tests
This commit is contained in:
commit
4a89a9a5b2
3 changed files with 877 additions and 3 deletions
|
@ -98,6 +98,12 @@ token=<access_token>
|
|||
|
||||
aud=<obtained after introspecting token>
|
||||
|
||||
sub=<obtained after introspecting token>
|
||||
|
||||
azp=<obtained after introspecting token>
|
||||
|
||||
user_token=<access token for a user, with attribute Department=[Engineering, Marketing>]
|
||||
|
||||
thumbprint=<obtained from x509 certificate>
|
||||
|
||||
KC_REALM=<name of the realm>
|
||||
|
|
|
@ -294,6 +294,9 @@ def check_webidentity():
|
|||
config.webidentity_aud = cfg.get('webidentity', "aud")
|
||||
config.webidentity_token = cfg.get('webidentity', "token")
|
||||
config.webidentity_realm = cfg.get('webidentity', "KC_REALM")
|
||||
config.webidentity_sub = cfg.get('webidentity', "sub")
|
||||
config.webidentity_azp = cfg.get('webidentity', "azp")
|
||||
config.webidentity_user_token = cfg.get('webidentity', "user_token")
|
||||
|
||||
def get_client(client_config=None):
|
||||
if client_config == None:
|
||||
|
@ -575,6 +578,12 @@ def get_thumbprint():
|
|||
def get_aud():
|
||||
return config.webidentity_aud
|
||||
|
||||
def get_sub():
|
||||
return config.webidentity_sub
|
||||
|
||||
def get_azp():
|
||||
return config.webidentity_azp
|
||||
|
||||
def get_token():
|
||||
return config.webidentity_token
|
||||
|
||||
|
@ -586,3 +595,6 @@ def get_iam_access_key():
|
|||
|
||||
def get_iam_secret_key():
|
||||
return config.iam_secret_key
|
||||
|
||||
def get_user_token():
|
||||
return config.webidentity_user_token
|
||||
|
|
|
@ -47,17 +47,22 @@ from . import(
|
|||
get_realm_name,
|
||||
check_webidentity,
|
||||
get_iam_access_key,
|
||||
get_iam_secret_key
|
||||
get_iam_secret_key,
|
||||
get_sub,
|
||||
get_azp,
|
||||
get_user_token
|
||||
)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
|
||||
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary,tag_list=None):
|
||||
role_err=None
|
||||
if rolename is None:
|
||||
rolename=get_parameter_name()
|
||||
if tag_list is None:
|
||||
tag_list = []
|
||||
try:
|
||||
role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,)
|
||||
role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,Tags=tag_list)
|
||||
except ClientError as e:
|
||||
role_err = e.response['Code']
|
||||
return (role_err,role_response,rolename)
|
||||
|
@ -130,6 +135,20 @@ def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist):
|
|||
oidc_arn = None
|
||||
return (oidc_arn, oidc_error)
|
||||
|
||||
def get_s3_resource_using_iam_creds():
|
||||
iam_access_key = get_iam_access_key()
|
||||
iam_secret_key = get_iam_secret_key()
|
||||
default_endpoint = get_config_endpoint()
|
||||
|
||||
s3_res_iam_creds = boto3.resource('s3',
|
||||
aws_access_key_id = iam_access_key,
|
||||
aws_secret_access_key = iam_secret_key,
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
return s3_res_iam_creds
|
||||
|
||||
@attr(resource='get session token')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
|
@ -395,6 +414,7 @@ def test_assume_role_allow_head_nonexistent():
|
|||
@attr(operation='check')
|
||||
@attr(assertion='assuming role through web token')
|
||||
@attr('webidentity_test')
|
||||
@attr('token_claims_trust_policy_test')
|
||||
def test_assume_role_with_web_identity():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
|
@ -1313,3 +1333,839 @@ def test_session_policy_bucket_policy_deny():
|
|||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_arn
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token using sub in trust policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('token_claims_trust_policy_test')
|
||||
def test_assume_role_with_web_identity_with_sub():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
sub=get_sub()
|
||||
token=get_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":sub\":\""+sub+"\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token using azp in trust policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('token_claims_trust_policy_test')
|
||||
def test_assume_role_with_web_identity_with_azp():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
azp=get_azp()
|
||||
token=get_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":azp\":\""+azp+"\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token using aws:RequestTag in trust policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_request_tag_trust_policy_test')
|
||||
def test_assume_role_with_web_identity_with_request_tag():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_principal_tag_role_policy_test')
|
||||
def test_assume_role_with_web_identity_with_principal_tag():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:PrincipalTag/Department\":\"Engineering\"}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_principal_tag_role_policy_test')
|
||||
def test_assume_role_with_web_identity_for_all_values():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\",\"Marketing\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_principal_tag_role_policy_test')
|
||||
def test_assume_role_with_web_identity_for_all_values_deny():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
#ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
try:
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
except ClientError as e:
|
||||
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||
eq(s3bucket_error,'AccessDenied')
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:TagKeys in trust policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_tag_keys_test')
|
||||
def test_assume_role_with_web_identity_tag_keys_trust_policy():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":\"Department\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAnyValue:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='get')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with aws:TagKeys in role permission policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_tag_keys_test')
|
||||
def test_assume_role_with_web_identity_tag_keys_role_policy():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":[\"Department\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='put')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with s3:ResourceTag in role permission policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_resource_tags_test')
|
||||
def test_assume_role_with_web_identity_resource_tag():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
s3_res_iam_creds = get_s3_resource_using_iam_creds()
|
||||
|
||||
s3_client_iam_creds = s3_res_iam_creds.meta.client
|
||||
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
|
||||
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]})
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
bucket_body = 'this is a test file'
|
||||
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
|
||||
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='put')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with s3:ResourceTag with missing tags on bucket')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_resource_tags_test')
|
||||
def test_assume_role_with_web_identity_resource_tag_deny():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
s3_res_iam_creds = get_s3_resource_using_iam_creds()
|
||||
|
||||
s3_client_iam_creds = s3_res_iam_creds.meta.client
|
||||
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
bucket_body = 'this is a test file'
|
||||
try:
|
||||
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
|
||||
except ClientError as e:
|
||||
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||
eq(s3_put_obj_error,'AccessDenied')
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='put')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with s3:ResourceTag with wrong resource tag in policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_resource_tags_test')
|
||||
def test_assume_role_with_web_identity_wrong_resource_tag_deny():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
s3_res_iam_creds = get_s3_resource_using_iam_creds()
|
||||
|
||||
s3_client_iam_creds = s3_res_iam_creds.meta.client
|
||||
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
|
||||
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'WrongResourcetag'}]})
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
bucket_body = 'this is a test file'
|
||||
try:
|
||||
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
|
||||
except ClientError as e:
|
||||
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||
eq(s3_put_obj_error,'AccessDenied')
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='put')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with s3:ResourceTag matching aws:PrincipalTag in role permission policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_resource_tags_test')
|
||||
def test_assume_role_with_web_identity_resource_tag_princ_tag():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
s3_res_iam_creds = get_s3_resource_using_iam_creds()
|
||||
|
||||
s3_client_iam_creds = s3_res_iam_creds.meta.client
|
||||
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
|
||||
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
bucket_body = 'this is a test file'
|
||||
tags = 'Department=Engineering&Department=Marketing'
|
||||
key = "test-1.txt"
|
||||
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags)
|
||||
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key=key)
|
||||
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='put')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with s3:ResourceTag used to test copy object')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_resource_tags_test')
|
||||
def test_assume_role_with_web_identity_resource_tag_copy_obj():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
s3_res_iam_creds = get_s3_resource_using_iam_creds()
|
||||
|
||||
s3_client_iam_creds = s3_res_iam_creds.meta.client
|
||||
|
||||
#create two buckets and add same tags to both
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
|
||||
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
|
||||
|
||||
copy_bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=copy_bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
bucket_tagging = s3_res_iam_creds.BucketTagging(copy_bucket_name)
|
||||
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
bucket_body = 'this is a test file'
|
||||
tags = 'Department=Engineering'
|
||||
key = "test-1.txt"
|
||||
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags)
|
||||
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
#copy to same bucket
|
||||
copy_source = {
|
||||
'Bucket': bucket_name,
|
||||
'Key': 'test-1.txt'
|
||||
}
|
||||
|
||||
s3_client.copy(copy_source, bucket_name, "test-2.txt")
|
||||
|
||||
s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key="test-2.txt")
|
||||
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
#copy to another bucket
|
||||
copy_source = {
|
||||
'Bucket': bucket_name,
|
||||
'Key': 'test-1.txt'
|
||||
}
|
||||
|
||||
s3_client.copy(copy_source, copy_bucket_name, "test-1.txt")
|
||||
|
||||
s3_get_obj = s3_client.get_object(Bucket=copy_bucket_name, Key="test-1.txt")
|
||||
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
||||
@attr(resource='assume role with web identity')
|
||||
@attr(method='put')
|
||||
@attr(operation='check')
|
||||
@attr(assertion='assuming role using web token with iam:ResourceTag in role trust policy')
|
||||
@attr('webidentity_test')
|
||||
@attr('abac_test')
|
||||
@attr('token_role_tags_test')
|
||||
def test_assume_role_with_web_identity_role_resource_tag():
|
||||
check_webidentity()
|
||||
iam_client=get_iam_client()
|
||||
sts_client=get_sts_client()
|
||||
default_endpoint=get_config_endpoint()
|
||||
role_session_name=get_parameter_name()
|
||||
thumbprint=get_thumbprint()
|
||||
user_token=get_user_token()
|
||||
realm=get_realm_name()
|
||||
|
||||
s3_res_iam_creds = get_s3_resource_using_iam_creds()
|
||||
|
||||
s3_client_iam_creds = s3_res_iam_creds.meta.client
|
||||
|
||||
bucket_name = get_new_bucket_name()
|
||||
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
|
||||
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
|
||||
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]})
|
||||
|
||||
oidc_response = iam_client.create_open_id_connect_provider(
|
||||
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||
ThumbprintList=[
|
||||
thumbprint,
|
||||
],
|
||||
)
|
||||
|
||||
#iam:ResourceTag refers to the tag attached to role, hence the role is allowed to be assumed only when it has a tag matching the policy.
|
||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"iam:ResourceTag/Department\":\"Engineering\"}}}]}"
|
||||
tags_list = [
|
||||
{'Key':'Department','Value':'Engineering'},
|
||||
{'Key':'Department','Value':'Marketing'}
|
||||
]
|
||||
|
||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None,tags_list)
|
||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||
|
||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
|
||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
|
||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
s3_client = boto3.client('s3',
|
||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||
aws_session_token = resp['Credentials']['SessionToken'],
|
||||
endpoint_url=default_endpoint,
|
||||
region_name='',
|
||||
)
|
||||
|
||||
bucket_body = 'this is a test file'
|
||||
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
|
||||
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||
|
||||
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue