mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-25 03:47:22 +00:00
rgw/sts: Addition of new STS tests for testing
session policies alongwith role's permission policy and bucket policy. Signed-off-by: Kalpesh Pandya <kapandya@redhat.com> Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
This commit is contained in:
parent
2851712901
commit
245a93326e
3 changed files with 904 additions and 1 deletions
|
@ -72,6 +72,9 @@ email = tenanteduser@example.com
|
||||||
#following section needs to be added for all sts-tests
|
#following section needs to be added for all sts-tests
|
||||||
[iam]
|
[iam]
|
||||||
#used for iam operations in sts-tests
|
#used for iam operations in sts-tests
|
||||||
|
#email from vstart.sh
|
||||||
|
email = s3@example.com
|
||||||
|
|
||||||
#user_id from vstart.sh
|
#user_id from vstart.sh
|
||||||
user_id = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
|
user_id = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
|
||||||
|
|
||||||
|
|
|
@ -224,6 +224,24 @@ def teardown():
|
||||||
nuke_prefixed_buckets(prefix=prefix)
|
nuke_prefixed_buckets(prefix=prefix)
|
||||||
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
|
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
|
||||||
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
|
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
|
||||||
|
try:
|
||||||
|
iam_client = get_iam_client()
|
||||||
|
list_roles_resp = iam_client.list_roles()
|
||||||
|
for role in list_roles_resp['Roles']:
|
||||||
|
list_policies_resp = iam_client.list_role_policies(RoleName=role['RoleName'])
|
||||||
|
for policy in list_policies_resp['PolicyNames']:
|
||||||
|
del_policy_resp = iam_client.delete_role_policy(
|
||||||
|
RoleName=role['RoleName'],
|
||||||
|
PolicyName=policy
|
||||||
|
)
|
||||||
|
del_role_resp = iam_client.delete_role(RoleName=role['RoleName'])
|
||||||
|
list_oidc_resp = iam_client.list_open_id_connect_providers()
|
||||||
|
for oidcprovider in list_oidc_resp['OpenIDConnectProviderList']:
|
||||||
|
del_oidc_resp = iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidcprovider['Arn']
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
def check_webidentity():
|
def check_webidentity():
|
||||||
cfg = configparser.RawConfigParser()
|
cfg = configparser.RawConfigParser()
|
||||||
|
@ -514,3 +532,9 @@ def get_token():
|
||||||
|
|
||||||
def get_realm_name():
|
def get_realm_name():
|
||||||
return config.webidentity_realm
|
return config.webidentity_realm
|
||||||
|
|
||||||
|
def get_iam_access_key():
|
||||||
|
return config.iam_access_key
|
||||||
|
|
||||||
|
def get_iam_secret_key():
|
||||||
|
return config.iam_secret_key
|
||||||
|
|
|
@ -45,7 +45,9 @@ from . import(
|
||||||
get_aud,
|
get_aud,
|
||||||
get_token,
|
get_token,
|
||||||
get_realm_name,
|
get_realm_name,
|
||||||
check_webidentity
|
check_webidentity,
|
||||||
|
get_iam_access_key,
|
||||||
|
get_iam_secret_key
|
||||||
)
|
)
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -80,6 +82,54 @@ def put_user_policy(iam_client,username,policyname,policy_document):
|
||||||
role_err = e.response['Code']
|
role_err = e.response['Code']
|
||||||
return (role_err,role_response)
|
return (role_err,role_response)
|
||||||
|
|
||||||
|
def get_s3_client_using_iam_creds():
|
||||||
|
iam_access_key = get_iam_access_key()
|
||||||
|
iam_secret_key = get_iam_secret_key()
|
||||||
|
default_endpoint = get_config_endpoint()
|
||||||
|
|
||||||
|
s3_client_iam_creds = boto3.client('s3',
|
||||||
|
aws_access_key_id = iam_access_key,
|
||||||
|
aws_secret_access_key = iam_secret_key,
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
|
||||||
|
return s3_client_iam_creds
|
||||||
|
|
||||||
|
def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist):
|
||||||
|
oidc_arn = None
|
||||||
|
oidc_error = None
|
||||||
|
clientids = []
|
||||||
|
if clientidlist is None:
|
||||||
|
clientidlist=clientids
|
||||||
|
try:
|
||||||
|
oidc_response = iam_client.create_open_id_connect_provider(
|
||||||
|
Url=url,
|
||||||
|
ClientIDList=clientidlist,
|
||||||
|
ThumbprintList=thumbprintlist,
|
||||||
|
)
|
||||||
|
oidc_arn = oidc_response['OpenIDConnectProviderArn']
|
||||||
|
print (oidc_arn)
|
||||||
|
except ClientError as e:
|
||||||
|
oidc_error = e.response['Code']
|
||||||
|
print (oidc_error)
|
||||||
|
try:
|
||||||
|
oidc_error = None
|
||||||
|
print (url)
|
||||||
|
if url.startswith('http://'):
|
||||||
|
url = url[len('http://'):]
|
||||||
|
elif url.startswith('https://'):
|
||||||
|
url = url[len('https://'):]
|
||||||
|
elif url.startswith('www.'):
|
||||||
|
url = url[len('www.'):]
|
||||||
|
oidc_arn = 'arn:aws:iam:::oidc-provider/{}'.format(url)
|
||||||
|
print (url)
|
||||||
|
print (oidc_arn)
|
||||||
|
oidc_response = iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn)
|
||||||
|
except ClientError as e:
|
||||||
|
oidc_arn = None
|
||||||
|
return (oidc_arn, oidc_error)
|
||||||
|
|
||||||
@attr(resource='get session token')
|
@attr(resource='get session token')
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='check')
|
@attr(operation='check')
|
||||||
|
@ -354,3 +404,829 @@ def test_assume_role_with_web_identity_invalid_webtoken():
|
||||||
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||||
)
|
)
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
#######################
|
||||||
|
# Session Policy Tests
|
||||||
|
#######################
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking session policy working for two different buckets')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_check_on_different_buckets():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"arn:aws:s3:::test2\",\"arn:aws:s3:::test2/*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
try:
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name_1)
|
||||||
|
except ClientError as e:
|
||||||
|
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3bucket_error, 'AccessDenied')
|
||||||
|
|
||||||
|
bucket_name_2 = 'test2'
|
||||||
|
try:
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name_2)
|
||||||
|
except ClientError as e:
|
||||||
|
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3bucket_error, 'AccessDenied')
|
||||||
|
|
||||||
|
bucket_body = 'please-write-something'
|
||||||
|
#body.encode(encoding='utf_8')
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3_put_obj_error,'NoSuchBucket')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking session policy working for same bucket')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_check_on_same_bucket():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client_iam_creds = get_s3_client_using_iam_creds()
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking put_obj op denial')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_check_put_obj_denial():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
iam_access_key=get_iam_access_key()
|
||||||
|
iam_secret_key=get_iam_secret_key()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client_iam_creds = get_s3_client_using_iam_creds()
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3_put_obj_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking put_obj working by swapping policies')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_swapping_role_policy_and_session_policy():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
iam_access_key=get_iam_access_key()
|
||||||
|
iam_secret_key=get_iam_secret_key()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client_iam_creds = get_s3_client_using_iam_creds()
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking put_obj working by setting different permissions to role and session policy')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_check_different_op_permissions():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
iam_access_key=get_iam_access_key()
|
||||||
|
iam_secret_key=get_iam_secret_key()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client_iam_creds = get_s3_client_using_iam_creds()
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3_put_obj_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking op behaviour with deny effect')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_check_with_deny_effect():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
iam_access_key=get_iam_access_key()
|
||||||
|
iam_secret_key=get_iam_secret_key()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client_iam_creds = get_s3_client_using_iam_creds()
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3_put_obj_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking put_obj working with deny and allow on same op')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_check_with_deny_on_same_op():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
iam_access_key=get_iam_access_key()
|
||||||
|
iam_secret_key=get_iam_secret_key()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client_iam_creds = get_s3_client_using_iam_creds()
|
||||||
|
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3_put_obj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3_put_obj_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking op when bucket policy has role arn')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_bucket_policy_role_arn():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3client_iamcreds = get_s3_client_using_iam_creds()
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
resource1 = "arn:aws:s3:::" + bucket_name_1
|
||||||
|
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
|
||||||
|
rolearn = "arn:aws:iam:::role/" + general_role_name
|
||||||
|
bucket_policy = json.dumps(
|
||||||
|
{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Principal": {"AWS": "{}".format(rolearn)},
|
||||||
|
"Action": ["s3:GetObject","s3:PutObject"],
|
||||||
|
"Resource": [
|
||||||
|
"{}".format(resource1),
|
||||||
|
"{}".format(resource2)
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
try:
|
||||||
|
obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3object_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3object_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking op when bucket policy has session arn')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_bucket_policy_session_arn():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3client_iamcreds = get_s3_client_using_iam_creds()
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
resource1 = "arn:aws:s3:::" + bucket_name_1
|
||||||
|
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
|
||||||
|
rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
|
||||||
|
bucket_policy = json.dumps(
|
||||||
|
{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Principal": {"AWS": "{}".format(rolesessionarn)},
|
||||||
|
"Action": ["s3:GetObject","s3:PutObject"],
|
||||||
|
"Resource": [
|
||||||
|
"{}".format(resource1),
|
||||||
|
"{}".format(resource2)
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
|
||||||
|
s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking copy object op with role, session and bucket policy')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_copy_object():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3client_iamcreds = get_s3_client_using_iam_creds()
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
resource1 = "arn:aws:s3:::" + bucket_name_1
|
||||||
|
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
|
||||||
|
rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
|
||||||
|
print (rolesessionarn)
|
||||||
|
bucket_policy = json.dumps(
|
||||||
|
{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Principal": {"AWS": "{}".format(rolesessionarn)},
|
||||||
|
"Action": ["s3:GetObject","s3:PutObject"],
|
||||||
|
"Resource": [
|
||||||
|
"{}".format(resource1),
|
||||||
|
"{}".format(resource2)
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
copy_source = {
|
||||||
|
'Bucket': bucket_name_1,
|
||||||
|
'Key': 'test-1.txt'
|
||||||
|
}
|
||||||
|
|
||||||
|
s3_client.copy(copy_source, bucket_name_1, "test-2.txt")
|
||||||
|
|
||||||
|
s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-2.txt")
|
||||||
|
eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking op is denied when no role policy')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_no_bucket_role_policy():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
s3client_iamcreds = get_s3_client_using_iam_creds()
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\",\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3putobj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3putobj_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='checking op is denied when resource policy denies')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
@attr('session_policy')
|
||||||
|
def test_session_policy_bucket_policy_deny():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
|
||||||
|
thumbprintlist = [thumbprint]
|
||||||
|
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
|
||||||
|
if oidc_error is not None:
|
||||||
|
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
|
||||||
|
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3client_iamcreds = get_s3_client_using_iam_creds()
|
||||||
|
bucket_name_1 = 'test1'
|
||||||
|
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
resource1 = "arn:aws:s3:::" + bucket_name_1
|
||||||
|
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
|
||||||
|
rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
|
||||||
|
bucket_policy = json.dumps(
|
||||||
|
{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [{
|
||||||
|
"Effect": "Deny",
|
||||||
|
"Principal": {"AWS": "{}".format(rolesessionarn)},
|
||||||
|
"Action": ["s3:GetObject","s3:PutObject"],
|
||||||
|
"Resource": [
|
||||||
|
"{}".format(resource1),
|
||||||
|
"{}".format(resource2)
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
|
||||||
|
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_body = 'this is a test file'
|
||||||
|
|
||||||
|
try:
|
||||||
|
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
|
||||||
|
except ClientError as e:
|
||||||
|
s3putobj_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3putobj_error, 'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_arn
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue