s3-tests/s3tests_boto3/functional/test_sts.py
Casey Bodley 8186dd7561 pytest: replace nose eq() with assert ==
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit f5d0bc9be3)
2023-01-25 18:03:50 -05:00

2318 lines
103 KiB
Python

import boto3
import botocore.session
from botocore.exceptions import ClientError
from botocore.exceptions import ParamValidationError
from nose.plugins.attrib import attr
import pytest
import isodate
import email.utils
import datetime
import threading
import re
import pytz
from collections import OrderedDict
import requests
import json
import base64
import hmac
import hashlib
import xml.etree.ElementTree as ET
import time
import operator
import nose
import os
import string
import random
import socket
import ssl
import logging
from collections import namedtuple
from email.header import decode_header
from . import(
configfile,
setup_teardown,
get_iam_client,
get_sts_client,
get_client,
get_alt_user_id,
get_config_endpoint,
get_new_bucket_name,
get_parameter_name,
get_main_aws_access_key,
get_main_aws_secret_key,
get_thumbprint,
get_aud,
get_token,
get_realm_name,
check_webidentity,
get_iam_access_key,
get_iam_secret_key,
get_sub,
get_azp,
get_user_token
)
log = logging.getLogger(__name__)
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary,tag_list=None):
role_err=None
if rolename is None:
rolename=get_parameter_name()
if tag_list is None:
tag_list = []
try:
role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,Tags=tag_list)
except ClientError as e:
role_err = e.response['Code']
return (role_err,role_response,rolename)
def put_role_policy(iam_client,rolename,policyname,role_policy):
role_err=None
if policyname is None:
policyname=get_parameter_name()
try:
role_response = iam_client.put_role_policy(RoleName=rolename,PolicyName=policyname,PolicyDocument=role_policy)
except ClientError as e:
role_err = e.response['Code']
return (role_err,role_response)
def put_user_policy(iam_client,username,policyname,policy_document):
role_err=None
if policyname is None:
policyname=get_parameter_name()
try:
role_response = iam_client.put_user_policy(UserName=username,PolicyName=policyname,PolicyDocument=policy_document)
except ClientError as e:
role_err = e.response['Code']
return (role_err,role_response,policyname)
def get_s3_client_using_iam_creds():
iam_access_key = get_iam_access_key()
iam_secret_key = get_iam_secret_key()
default_endpoint = get_config_endpoint()
s3_client_iam_creds = boto3.client('s3',
aws_access_key_id = iam_access_key,
aws_secret_access_key = iam_secret_key,
endpoint_url=default_endpoint,
region_name='',
)
return s3_client_iam_creds
def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist):
oidc_arn = None
oidc_error = None
clientids = []
if clientidlist is None:
clientidlist=clientids
try:
oidc_response = iam_client.create_open_id_connect_provider(
Url=url,
ClientIDList=clientidlist,
ThumbprintList=thumbprintlist,
)
oidc_arn = oidc_response['OpenIDConnectProviderArn']
print (oidc_arn)
except ClientError as e:
oidc_error = e.response['Code']
print (oidc_error)
try:
oidc_error = None
print (url)
if url.startswith('http://'):
url = url[len('http://'):]
elif url.startswith('https://'):
url = url[len('https://'):]
elif url.startswith('www.'):
url = url[len('www.'):]
oidc_arn = 'arn:aws:iam:::oidc-provider/{}'.format(url)
print (url)
print (oidc_arn)
oidc_response = iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn)
except ClientError as e:
oidc_arn = None
return (oidc_arn, oidc_error)
def get_s3_resource_using_iam_creds():
iam_access_key = get_iam_access_key()
iam_secret_key = get_iam_secret_key()
default_endpoint = get_config_endpoint()
s3_res_iam_creds = boto3.resource('s3',
aws_access_key_id = iam_access_key,
aws_secret_access_key = iam_secret_key,
endpoint_url=default_endpoint,
region_name='',
)
return s3_res_iam_creds
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops only accessible by temporary credentials')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_get_session_token():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
(resp_err,resp,policy_name)=put_user_policy(iam_client,sts_user_id,None,user_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
response=sts_client.get_session_token()
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client=boto3.client('s3',
aws_access_key_id = response['Credentials']['AccessKeyId'],
aws_secret_access_key = response['Credentials']['SecretAccessKey'],
aws_session_token = response['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
finish=s3_client.delete_bucket(Bucket=bucket_name)
finally: # clean up user policy even if create_bucket/delete_bucket fails
iam_client.delete_user_policy(UserName=sts_user_id,PolicyName=policy_name)
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops denied by permanent credentials')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_get_session_token_permanent_creds_denied():
s3bucket_error=None
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
s3_main_access_key=get_main_aws_access_key()
s3_main_secret_key=get_main_aws_secret_key()
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
(resp_err,resp,policy_name)=put_user_policy(iam_client,sts_user_id,None,user_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
response=sts_client.get_session_token()
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client=boto3.client('s3',
aws_access_key_id = s3_main_access_key,
aws_secret_access_key = s3_main_secret_key,
aws_session_token = response['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
iam_client.delete_user_policy(UserName=sts_user_id,PolicyName=policy_name)
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy allows all s3 ops')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_allow():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy denies all s3 ops')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_deny():
s3bucket_error=None
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='creds expire so all s3 ops fails')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_creds_expiry():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
time.sleep(900)
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
@attr(resource='assume role')
@attr(method='head')
@attr(operation='check')
@attr(assertion='HEAD fails with 403 when role policy denies s3:ListBucket')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_deny_head_nonexistent():
# create a bucket with the normal s3 client
bucket_name = get_new_bucket_name()
get_client().create_bucket(Bucket=bucket_name)
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name
# allow GetObject but deny ListBucket
role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":"s3:GetObject","Principal":"*","Resource":"arn:aws:s3:::*"}}'
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='')
status=200
try:
s3_client.head_object(Bucket=bucket_name, Key='nonexistent')
except ClientError as e:
status = e.response['ResponseMetadata']['HTTPStatusCode']
assert status == 403
@attr(resource='assume role')
@attr(method='head')
@attr(operation='check')
@attr(assertion='HEAD fails with 404 when role policy allows s3:ListBucket')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_allow_head_nonexistent():
# create a bucket with the normal s3 client
bucket_name = get_new_bucket_name()
get_client().create_bucket(Bucket=bucket_name)
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name
# allow GetObject and ListBucket
role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":["s3:GetObject","s3:ListBucket"],"Principal":"*","Resource":"arn:aws:s3:::*"}}'
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='')
status=200
try:
s3_client.head_object(Bucket=bucket_name, Key='nonexistent')
except ClientError as e:
status = e.response['ResponseMetadata']['HTTPStatusCode']
assert status == 404
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role through web token')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
'''
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assume_role_with_web_token creds expire')
@attr('webidentity_test')
@pytest.mark.webidentity_test
def test_assume_role_with_web_identity_invalid_webtoken():
resp_error=None
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=""
try:
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken='abcdef')
except InvalidIdentityTokenException as e:
log.debug('{}'.format(resp))
log.debug('{}'.format(e.response.get("Error", {}).get("Code")))
log.debug('{}'.format(e))
resp_error = e.response.get("Error", {}).get("Code")
assert resp_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
'''
#######################
# Session Policy Tests
#######################
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking session policy working for two different buckets')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_on_different_buckets():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"arn:aws:s3:::test2\",\"arn:aws:s3:::test2/*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name_1 = 'test1'
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name_1)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
bucket_name_2 = 'test2'
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name_2)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
bucket_body = 'please-write-something'
#body.encode(encoding='utf_8')
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'NoSuchBucket'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking session policy working for same bucket')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_on_same_bucket():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_iam_creds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking put_obj op denial')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_put_obj_denial():
check_webidentity()
iam_client=get_iam_client()
iam_access_key=get_iam_access_key()
iam_secret_key=get_iam_secret_key()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_iam_creds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking put_obj working by swapping policies')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_swapping_role_policy_and_session_policy():
check_webidentity()
iam_client=get_iam_client()
iam_access_key=get_iam_access_key()
iam_secret_key=get_iam_secret_key()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_iam_creds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking put_obj working by setting different permissions to role and session policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_different_op_permissions():
check_webidentity()
iam_client=get_iam_client()
iam_access_key=get_iam_access_key()
iam_secret_key=get_iam_secret_key()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_iam_creds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op behaviour with deny effect')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_with_deny_effect():
check_webidentity()
iam_client=get_iam_client()
iam_access_key=get_iam_access_key()
iam_secret_key=get_iam_secret_key()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_iam_creds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking put_obj working with deny and allow on same op')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_with_deny_on_same_op():
check_webidentity()
iam_client=get_iam_client()
iam_access_key=get_iam_access_key()
iam_secret_key=get_iam_secret_key()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_iam_creds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op when bucket policy has role arn')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_role_arn():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3client_iamcreds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
resource1 = "arn:aws:s3:::" + bucket_name_1
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
rolearn = "arn:aws:iam:::role/" + general_role_name
bucket_policy = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "{}".format(rolearn)},
"Action": ["s3:GetObject","s3:PutObject"],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
try:
obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3object_error = e.response.get("Error", {}).get("Code")
assert s3object_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking op when bucket policy has session arn')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_session_arn():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3client_iamcreds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
resource1 = "arn:aws:s3:::" + bucket_name_1
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
bucket_policy = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "{}".format(rolesessionarn)},
"Action": ["s3:GetObject","s3:PutObject"],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking copy object op with role, session and bucket policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_copy_object():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3client_iamcreds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
resource1 = "arn:aws:s3:::" + bucket_name_1
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
print (rolesessionarn)
bucket_policy = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "{}".format(rolesessionarn)},
"Action": ["s3:GetObject","s3:PutObject"],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
copy_source = {
'Bucket': bucket_name_1,
'Key': 'test-1.txt'
}
s3_client.copy(copy_source, bucket_name_1, "test-2.txt")
s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-2.txt")
assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op is denied when no role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_no_bucket_role_policy():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
s3client_iamcreds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\",\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3putobj_error = e.response.get("Error", {}).get("Code")
assert s3putobj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op is denied when resource policy denies')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_deny():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
aud=get_aud()
token=get_token()
realm=get_realm_name()
url = 'http://localhost:8080/auth/realms/{}'.format(realm)
thumbprintlist = [thumbprint]
(oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
if oidc_error is not None:
raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3client_iamcreds = get_s3_client_using_iam_creds()
bucket_name_1 = 'test1'
s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
resource1 = "arn:aws:s3:::" + bucket_name_1
resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
bucket_policy = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": {"AWS": "{}".format(rolesessionarn)},
"Action": ["s3:GetObject","s3:PutObject"],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3putobj_error = e.response.get("Error", {}).get("Code")
assert s3putobj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using sub in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_sub():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
sub=get_sub()
token=get_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":sub\":\""+sub+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using azp in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_azp():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
azp=get_azp()
token=get_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":azp\":\""+azp+"\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using aws:RequestTag in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_request_tag_trust_policy_test')
@pytest.mark.token_request_tag_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_request_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_principal_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:PrincipalTag/Department\":\"Engineering\"}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_for_all_values():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\",\"Marketing\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_for_all_values_deny():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
#ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_tag_keys_test')
@pytest.mark.token_tag_keys_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_tag_keys_trust_policy():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":\"Department\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAnyValue:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_tag_keys_test')
@pytest.mark.token_tag_keys_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_tag_keys_role_policy():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":[\"Department\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]})
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag with missing tags on bucket')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_deny():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag with wrong resource tag in policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_wrong_resource_tag_deny():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'WrongResourcetag'}]})
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
try:
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
except ClientError as e:
s3_put_obj_error = e.response.get("Error", {}).get("Code")
assert s3_put_obj_error == 'AccessDenied'
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag matching aws:PrincipalTag in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_princ_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
tags = 'Department=Engineering&Department=Marketing'
key = "test-1.txt"
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags)
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key=key)
assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag used to test copy object')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_copy_obj():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
#create two buckets and add same tags to both
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
copy_bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=copy_bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bucket_tagging = s3_res_iam_creds.BucketTagging(copy_bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
tags = 'Department=Engineering'
key = "test-1.txt"
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags)
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
#copy to same bucket
copy_source = {
'Bucket': bucket_name,
'Key': 'test-1.txt'
}
s3_client.copy(copy_source, bucket_name, "test-2.txt")
s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key="test-2.txt")
assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200
#copy to another bucket
copy_source = {
'Bucket': bucket_name,
'Key': 'test-1.txt'
}
s3_client.copy(copy_source, copy_bucket_name, "test-1.txt")
s3_get_obj = s3_client.get_object(Bucket=copy_bucket_name, Key="test-1.txt")
assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with iam:ResourceTag in role trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_role_tags_test')
@pytest.mark.token_role_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_role_resource_tag():
check_webidentity()
iam_client=get_iam_client()
sts_client=get_sts_client()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
thumbprint=get_thumbprint()
user_token=get_user_token()
realm=get_realm_name()
s3_res_iam_creds = get_s3_resource_using_iam_creds()
s3_client_iam_creds = s3_res_iam_creds.meta.client
bucket_name = get_new_bucket_name()
s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200
bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]})
oidc_response = iam_client.create_open_id_connect_provider(
Url='http://localhost:8080/auth/realms/{}'.format(realm),
ThumbprintList=[
thumbprint,
],
)
#iam:ResourceTag refers to the tag attached to role, hence the role is allowed to be assumed only when it has a tag matching the policy.
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"iam:ResourceTag/Department\":\"Engineering\"}}}]}"
tags_list = [
{'Key':'Department','Value':'Engineering'},
{'Key':'Department','Value':'Marketing'}
]
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None,tags_list)
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)