diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index fb4ab11..78c6901 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -257,6 +257,89 @@ def test_assume_role_creds_expiry(): s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error,'AccessDenied') +@attr(resource='assume role') +@attr(method='head') +@attr(operation='check') +@attr(assertion='HEAD fails with 403 when role policy denies s3:ListBucket') +@attr('test_of_sts') +def test_assume_role_deny_head_nonexistent(): + # create a bucket with the normal s3 client + bucket_name = get_new_bucket_name() + get_client().create_bucket(Bucket=bucket_name) + + iam_client=get_iam_client() + sts_client=get_sts_client() + sts_user_id=get_alt_user_id() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name) + + # allow GetObject but deny ListBucket + role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":"s3:GetObject","Principal":"*","Resource":"arn:aws:s3:::*"}}' + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='') + status=200 + try: + s3_client.head_object(Bucket=bucket_name, Key='nonexistent') + except ClientError as e: + status = e.response['ResponseMetadata']['HTTPStatusCode'] + eq(status,403) + +@attr(resource='assume role') +@attr(method='head') +@attr(operation='check') +@attr(assertion='HEAD fails with 404 when role policy allows s3:ListBucket') +@attr('test_of_sts') +def test_assume_role_allow_head_nonexistent(): + # create a bucket with the normal s3 client + bucket_name = get_new_bucket_name() + get_client().create_bucket(Bucket=bucket_name) + + iam_client=get_iam_client() + sts_client=get_sts_client() + sts_user_id=get_alt_user_id() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name) + + # allow GetObject and ListBucket + role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":["s3:GetObject","s3:ListBucket"],"Principal":"*","Resource":"arn:aws:s3:::*"}}' + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='') + status=200 + try: + s3_client.head_object(Bucket=bucket_name, Key='nonexistent') + except ClientError as e: + status = e.response['ResponseMetadata']['HTTPStatusCode'] + eq(status,404) + + @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check')