mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-27 23:31:41 +00:00
Webidentity Test addition to test_sts.py
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com> Few main changes/additions: 1. Webidentity test addition to test_sts.py. 2. A function named check_webidentity() added to __init__.py in order to check for section presence. 3. Few lines shifted from setup() to get_iam_client() to make them execute only when sts-tests run. 4. Documentation update (for sts section) 5. Changes in s3tests.conf.SAMPLE regarding sts sections
This commit is contained in:
parent
0b2d7f729d
commit
f4a052dfcf
4 changed files with 196 additions and 17 deletions
11
README.rst
11
README.rst
|
@ -58,13 +58,16 @@ You can run only the boto3 tests with::
|
||||||
STS compatibility tests
|
STS compatibility tests
|
||||||
========================
|
========================
|
||||||
|
|
||||||
This section contains some basic tests for the AssumeRole and GetSessionToken API's. The test file is located under ``s3tests_boto3/functional``.
|
This section contains some basic tests for the AssumeRole, GetSessionToken and AssumeRoleWithWebIdentity API's. The test file is located under ``s3tests_boto3/functional``.
|
||||||
|
|
||||||
You can run only the sts tests with::
|
You can run only the sts tests (all the three API's) with::
|
||||||
|
|
||||||
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
|
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
|
||||||
|
|
||||||
You can filter tests based on the attributes. There is a attribute named ``sts_test`` to run specifically the sts tests as below::
|
You can filter tests based on the attributes. There is a attribute named ``test_of_sts`` to run AssumeRole and GetSessionToken tests and ``webidentity_test`` to run the AssumeRoleWithWebIdentity tests. If you want to execute only ``test_of_sts`` tests you can apply that filter as below::
|
||||||
|
|
||||||
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'sts_test' s3tests_boto3.functional.test_sts
|
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'test_of_sts' s3tests_boto3.functional.test_sts
|
||||||
|
|
||||||
|
For running ``webidentity_test`` you'll need have Keycloak running.
|
||||||
|
|
||||||
|
In order to run any STS test you'll need to add "iam" section to the config file. For further reference on how your config file should look check ``s3tests.conf.SAMPLE``.
|
||||||
|
|
|
@ -69,6 +69,7 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
|
||||||
# tenant email set in vstart.sh
|
# tenant email set in vstart.sh
|
||||||
email = tenanteduser@example.com
|
email = tenanteduser@example.com
|
||||||
|
|
||||||
|
#following section needs to be added for all sts-tests
|
||||||
[iam]
|
[iam]
|
||||||
#used for iam operations in sts-tests
|
#used for iam operations in sts-tests
|
||||||
#user_id from vstart.sh
|
#user_id from vstart.sh
|
||||||
|
@ -82,3 +83,15 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
|
||||||
|
|
||||||
#display_name from vstart.sh
|
#display_name from vstart.sh
|
||||||
display_name = youruseridhere
|
display_name = youruseridhere
|
||||||
|
|
||||||
|
#following section needs to be added when you want to run Assume Role With Webidentity test
|
||||||
|
[webidentity]
|
||||||
|
#used for assume role with web identity test in sts-tests
|
||||||
|
#all parameters will be obtained from ceph/qa/tasks/keycloak.py
|
||||||
|
token=<access_token>
|
||||||
|
|
||||||
|
aud=<obtained after introspecting token>
|
||||||
|
|
||||||
|
thumbprint=<obtained from x509 certificate>
|
||||||
|
|
||||||
|
KC_REALM=<name of the realm>
|
||||||
|
|
|
@ -158,8 +158,6 @@ def setup():
|
||||||
raise RuntimeError('Your config file is missing the "s3 alt" section!')
|
raise RuntimeError('Your config file is missing the "s3 alt" section!')
|
||||||
if not cfg.has_section("s3 tenant"):
|
if not cfg.has_section("s3 tenant"):
|
||||||
raise RuntimeError('Your config file is missing the "s3 tenant" section!')
|
raise RuntimeError('Your config file is missing the "s3 tenant" section!')
|
||||||
if not cfg.has_section("iam"):
|
|
||||||
raise RuntimeError('Your config file is missing the "iam" section!')
|
|
||||||
|
|
||||||
global prefix
|
global prefix
|
||||||
|
|
||||||
|
@ -207,12 +205,6 @@ def setup():
|
||||||
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
|
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
|
||||||
config.tenant_email = cfg.get('s3 tenant',"email")
|
config.tenant_email = cfg.get('s3 tenant',"email")
|
||||||
|
|
||||||
config.iam_access_key = cfg.get('iam',"access_key")
|
|
||||||
config.iam_secret_key = cfg.get('iam',"secret_key")
|
|
||||||
config.iam_display_name = cfg.get('iam',"display_name")
|
|
||||||
config.iam_user_id = cfg.get('iam',"user_id")
|
|
||||||
#config.iam_email = cfg.get('iam',"email")
|
|
||||||
|
|
||||||
# vars from the fixtures section
|
# vars from the fixtures section
|
||||||
try:
|
try:
|
||||||
template = cfg.get('fixtures', "bucket prefix")
|
template = cfg.get('fixtures', "bucket prefix")
|
||||||
|
@ -233,6 +225,24 @@ def teardown():
|
||||||
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
|
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
|
||||||
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
|
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
|
||||||
|
|
||||||
|
def check_webidentity():
|
||||||
|
cfg = configparser.RawConfigParser()
|
||||||
|
try:
|
||||||
|
path = os.environ['S3TEST_CONF']
|
||||||
|
except KeyError:
|
||||||
|
raise RuntimeError(
|
||||||
|
'To run tests, point environment '
|
||||||
|
+ 'variable S3TEST_CONF to a config file.',
|
||||||
|
)
|
||||||
|
cfg.read(path)
|
||||||
|
if not cfg.has_section("webidentity"):
|
||||||
|
raise RuntimeError('Your config file is missing the "webidentity" section!')
|
||||||
|
|
||||||
|
config.webidentity_thumbprint = cfg.get('webidentity', "thumbprint")
|
||||||
|
config.webidentity_aud = cfg.get('webidentity', "aud")
|
||||||
|
config.webidentity_token = cfg.get('webidentity', "token")
|
||||||
|
config.webidentity_realm = cfg.get('webidentity', "KC_REALM")
|
||||||
|
|
||||||
def get_client(client_config=None):
|
def get_client(client_config=None):
|
||||||
if client_config == None:
|
if client_config == None:
|
||||||
client_config = Config(signature_version='s3v4')
|
client_config = Config(signature_version='s3v4')
|
||||||
|
@ -268,6 +278,24 @@ def get_sts_client(client_config=None):
|
||||||
return client
|
return client
|
||||||
|
|
||||||
def get_iam_client(client_config=None):
|
def get_iam_client(client_config=None):
|
||||||
|
cfg = configparser.RawConfigParser()
|
||||||
|
try:
|
||||||
|
path = os.environ['S3TEST_CONF']
|
||||||
|
except KeyError:
|
||||||
|
raise RuntimeError(
|
||||||
|
'To run tests, point environment '
|
||||||
|
+ 'variable S3TEST_CONF to a config file.',
|
||||||
|
)
|
||||||
|
cfg.read(path)
|
||||||
|
if not cfg.has_section("iam"):
|
||||||
|
raise RuntimeError('Your config file is missing the "iam" section!')
|
||||||
|
|
||||||
|
config.iam_access_key = cfg.get('iam',"access_key")
|
||||||
|
config.iam_secret_key = cfg.get('iam',"secret_key")
|
||||||
|
config.iam_display_name = cfg.get('iam',"display_name")
|
||||||
|
config.iam_user_id = cfg.get('iam',"user_id")
|
||||||
|
config.iam_email = cfg.get('iam',"email")
|
||||||
|
|
||||||
if client_config == None:
|
if client_config == None:
|
||||||
client_config = Config(signature_version='s3v4')
|
client_config = Config(signature_version='s3v4')
|
||||||
|
|
||||||
|
@ -474,3 +502,15 @@ def get_tenant_user_id():
|
||||||
|
|
||||||
def get_tenant_email():
|
def get_tenant_email():
|
||||||
return config.tenant_email
|
return config.tenant_email
|
||||||
|
|
||||||
|
def get_thumbprint():
|
||||||
|
return config.webidentity_thumbprint
|
||||||
|
|
||||||
|
def get_aud():
|
||||||
|
return config.webidentity_aud
|
||||||
|
|
||||||
|
def get_token():
|
||||||
|
return config.webidentity_token
|
||||||
|
|
||||||
|
def get_realm_name():
|
||||||
|
return config.webidentity_realm
|
||||||
|
|
|
@ -41,8 +41,15 @@ from . import(
|
||||||
get_parameter_name,
|
get_parameter_name,
|
||||||
get_main_aws_access_key,
|
get_main_aws_access_key,
|
||||||
get_main_aws_secret_key,
|
get_main_aws_secret_key,
|
||||||
|
get_thumbprint,
|
||||||
|
get_aud,
|
||||||
|
get_token,
|
||||||
|
get_realm_name,
|
||||||
|
check_webidentity
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
|
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
|
||||||
role_err=None
|
role_err=None
|
||||||
if rolename is None:
|
if rolename is None:
|
||||||
|
@ -77,17 +84,20 @@ def put_user_policy(iam_client,username,policyname,policy_document):
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='check')
|
@attr(operation='check')
|
||||||
@attr(assertion='s3 ops only accessible by temporary credentials')
|
@attr(assertion='s3 ops only accessible by temporary credentials')
|
||||||
@attr('sts_test')
|
@attr('test_of_sts')
|
||||||
def test_get_session_token():
|
def test_get_session_token():
|
||||||
iam_client=get_iam_client()
|
iam_client=get_iam_client()
|
||||||
sts_client=get_sts_client()
|
sts_client=get_sts_client()
|
||||||
sts_user_id=get_alt_user_id()
|
sts_user_id=get_alt_user_id()
|
||||||
default_endpoint=get_config_endpoint()
|
default_endpoint=get_config_endpoint()
|
||||||
|
|
||||||
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
|
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
|
||||||
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
|
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
|
||||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
response=sts_client.get_session_token()
|
response=sts_client.get_session_token()
|
||||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
s3_client=boto3.client('s3',
|
s3_client=boto3.client('s3',
|
||||||
aws_access_key_id = response['Credentials']['AccessKeyId'],
|
aws_access_key_id = response['Credentials']['AccessKeyId'],
|
||||||
aws_secret_access_key = response['Credentials']['SecretAccessKey'],
|
aws_secret_access_key = response['Credentials']['SecretAccessKey'],
|
||||||
|
@ -104,7 +114,7 @@ def test_get_session_token():
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='check')
|
@attr(operation='check')
|
||||||
@attr(assertion='s3 ops denied by permanent credentials')
|
@attr(assertion='s3 ops denied by permanent credentials')
|
||||||
@attr('sts_test')
|
@attr('test_of_sts')
|
||||||
def test_get_session_token_permanent_creds_denied():
|
def test_get_session_token_permanent_creds_denied():
|
||||||
s3bucket_error=None
|
s3bucket_error=None
|
||||||
iam_client=get_iam_client()
|
iam_client=get_iam_client()
|
||||||
|
@ -113,11 +123,14 @@ def test_get_session_token_permanent_creds_denied():
|
||||||
default_endpoint=get_config_endpoint()
|
default_endpoint=get_config_endpoint()
|
||||||
s3_main_access_key=get_main_aws_access_key()
|
s3_main_access_key=get_main_aws_access_key()
|
||||||
s3_main_secret_key=get_main_aws_secret_key()
|
s3_main_secret_key=get_main_aws_secret_key()
|
||||||
|
|
||||||
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
|
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
|
||||||
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
|
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
|
||||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
response=sts_client.get_session_token()
|
response=sts_client.get_session_token()
|
||||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
s3_client=boto3.client('s3',
|
s3_client=boto3.client('s3',
|
||||||
aws_access_key_id = s3_main_access_key,
|
aws_access_key_id = s3_main_access_key,
|
||||||
aws_secret_access_key = s3_main_secret_key,
|
aws_secret_access_key = s3_main_secret_key,
|
||||||
|
@ -136,21 +149,25 @@ def test_get_session_token_permanent_creds_denied():
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='check')
|
@attr(operation='check')
|
||||||
@attr(assertion='role policy allows all s3 ops')
|
@attr(assertion='role policy allows all s3 ops')
|
||||||
@attr('sts_test')
|
@attr('test_of_sts')
|
||||||
def test_assume_role_allow():
|
def test_assume_role_allow():
|
||||||
iam_client=get_iam_client()
|
iam_client=get_iam_client()
|
||||||
sts_client=get_sts_client()
|
sts_client=get_sts_client()
|
||||||
sts_user_id=get_alt_user_id()
|
sts_user_id=get_alt_user_id()
|
||||||
default_endpoint=get_config_endpoint()
|
default_endpoint=get_config_endpoint()
|
||||||
role_session_name=get_parameter_name()
|
role_session_name=get_parameter_name()
|
||||||
|
|
||||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
||||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
|
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
|
||||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
s3_client = boto3.client('s3',
|
s3_client = boto3.client('s3',
|
||||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
@ -168,7 +185,7 @@ def test_assume_role_allow():
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='check')
|
@attr(operation='check')
|
||||||
@attr(assertion='role policy denies all s3 ops')
|
@attr(assertion='role policy denies all s3 ops')
|
||||||
@attr('sts_test')
|
@attr('test_of_sts')
|
||||||
def test_assume_role_deny():
|
def test_assume_role_deny():
|
||||||
s3bucket_error=None
|
s3bucket_error=None
|
||||||
iam_client=get_iam_client()
|
iam_client=get_iam_client()
|
||||||
|
@ -176,14 +193,18 @@ def test_assume_role_deny():
|
||||||
sts_user_id=get_alt_user_id()
|
sts_user_id=get_alt_user_id()
|
||||||
default_endpoint=get_config_endpoint()
|
default_endpoint=get_config_endpoint()
|
||||||
role_session_name=get_parameter_name()
|
role_session_name=get_parameter_name()
|
||||||
|
|
||||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
||||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
|
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
|
||||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
s3_client = boto3.client('s3',
|
s3_client = boto3.client('s3',
|
||||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
@ -202,22 +223,26 @@ def test_assume_role_deny():
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='check')
|
@attr(operation='check')
|
||||||
@attr(assertion='creds expire so all s3 ops fails')
|
@attr(assertion='creds expire so all s3 ops fails')
|
||||||
@attr('sts_test')
|
@attr('test_of_sts')
|
||||||
def test_assume_role_creds_expiry():
|
def test_assume_role_creds_expiry():
|
||||||
iam_client=get_iam_client()
|
iam_client=get_iam_client()
|
||||||
sts_client=get_sts_client()
|
sts_client=get_sts_client()
|
||||||
sts_user_id=get_alt_user_id()
|
sts_user_id=get_alt_user_id()
|
||||||
default_endpoint=get_config_endpoint()
|
default_endpoint=get_config_endpoint()
|
||||||
role_session_name=get_parameter_name()
|
role_session_name=get_parameter_name()
|
||||||
|
|
||||||
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
||||||
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
|
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
|
||||||
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
time.sleep(900)
|
time.sleep(900)
|
||||||
|
|
||||||
s3_client = boto3.client('s3',
|
s3_client = boto3.client('s3',
|
||||||
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
@ -231,3 +256,101 @@ def test_assume_role_creds_expiry():
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
s3bucket_error = e.response.get("Error", {}).get("Code")
|
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||||
eq(s3bucket_error,'AccessDenied')
|
eq(s3bucket_error,'AccessDenied')
|
||||||
|
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='assuming role through web token')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
def test_assume_role_with_web_identity():
|
||||||
|
check_webidentity()
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
oidc_response = iam_client.create_open_id_connect_provider(
|
||||||
|
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||||
|
ThumbprintList=[
|
||||||
|
thumbprint,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||||
|
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||||
|
)
|
||||||
|
|
||||||
|
'''
|
||||||
|
@attr(resource='assume role with web identity')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='assume_role_with_web_token creds expire')
|
||||||
|
@attr('webidentity_test')
|
||||||
|
def test_assume_role_with_web_identity_invalid_webtoken():
|
||||||
|
resp_error=None
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
thumbprint=get_thumbprint()
|
||||||
|
aud=get_aud()
|
||||||
|
token=get_token()
|
||||||
|
realm=get_realm_name()
|
||||||
|
|
||||||
|
oidc_response = iam_client.create_open_id_connect_provider(
|
||||||
|
Url='http://localhost:8080/auth/realms/{}'.format(realm),
|
||||||
|
ThumbprintList=[
|
||||||
|
thumbprint,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
|
||||||
|
resp=""
|
||||||
|
try:
|
||||||
|
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken='abcdef')
|
||||||
|
except InvalidIdentityTokenException as e:
|
||||||
|
log.debug('{}'.format(resp))
|
||||||
|
log.debug('{}'.format(e.response.get("Error", {}).get("Code")))
|
||||||
|
log.debug('{}'.format(e))
|
||||||
|
resp_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(resp_error,'AccessDenied')
|
||||||
|
|
||||||
|
oidc_remove=iam_client.delete_open_id_connect_provider(
|
||||||
|
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
|
Loading…
Reference in a new issue