Merge pull request #484 from yuvalif/fix-sts-error-handling

better error handling in the STS tests
This commit is contained in:
Casey Bodley 2023-02-20 11:48:41 -05:00 committed by GitHub
commit 43b957792b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 11 deletions

View file

@ -52,6 +52,15 @@ You can run only the boto3 tests with::
This section contains some basic tests for the AssumeRole, GetSessionToken and AssumeRoleWithWebIdentity API's. The test file is located under ``s3tests_boto3/functional``.
To run the STS tests, the vstart cluster should be started with the following parameter (in addition to any parameters already used with it)::
vstart.sh -o rgw_sts_key=abcdefghijklmnop -o rgw_s3_auth_use_sts=true
Note that the ``rgw_sts_key`` can be set to anything that is 128 bits in length.
After the cluster is up the following command should be executed::
radosgw-admin caps add --tenant=testx --uid="9876543210abcdef0123456789abcdef0123456789abcdef0123456789abcdef" --caps="roles=*"
You can run only the sts tests (all the three API's) with::
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_sts.py

View file

@ -56,6 +56,7 @@ log = logging.getLogger(__name__)
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary,tag_list=None):
role_err=None
role_response = None
if rolename is None:
rolename=get_parameter_name()
if tag_list is None:
@ -68,6 +69,7 @@ def create_role(iam_client,path,rolename,policy_document,description,sessiondura
def put_role_policy(iam_client,rolename,policyname,role_policy):
role_err=None
role_response = None
if policyname is None:
policyname=get_parameter_name()
try:
@ -78,6 +80,7 @@ def put_role_policy(iam_client,rolename,policyname,role_policy):
def put_user_policy(iam_client,username,policyname,policy_document):
role_err=None
role_response = None
if policyname is None:
policyname=get_parameter_name()
try:
@ -222,11 +225,17 @@ def test_assume_role_allow():
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
if role_response:
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
else:
assert False, role_error
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
if response:
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
else:
assert False, role_err
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
@ -256,11 +265,17 @@ def test_assume_role_deny():
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
if role_response:
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
else:
assert False, role_error
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
if response:
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
else:
assert False, role_err
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
@ -290,11 +305,17 @@ def test_assume_role_creds_expiry():
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
if role_response:
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+''
else:
assert False, role_error
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
if response:
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
else:
assert False, role_err
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
@ -329,12 +350,18 @@ def test_assume_role_deny_head_nonexistent():
policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
if role_response:
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name
else:
assert False, role_error
# allow GetObject but deny ListBucket
role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":"s3:GetObject","Principal":"*","Resource":"arn:aws:s3:::*"}}'
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
if response:
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
else:
assert False, role_err
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
@ -367,12 +394,18 @@ def test_assume_role_allow_head_nonexistent():
policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
if role_response:
assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name
else:
assert False, role_error
# allow GetObject and ListBucket
role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":["s3:GetObject","s3:ListBucket"],"Principal":"*","Resource":"arn:aws:s3:::*"}}'
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
if response:
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
else:
assert False, role_err
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
@ -418,7 +451,10 @@ def test_assume_role_with_web_identity():
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
if response:
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
else:
assert False, role_err
resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200