import boto3 import botocore.session from botocore.exceptions import ClientError from botocore.exceptions import ParamValidationError from nose.tools import eq_ as eq from nose.plugins.attrib import attr from nose.plugins.skip import SkipTest import isodate import email.utils import datetime import threading import re import pytz from collections import OrderedDict import requests import json import base64 import hmac import hashlib import xml.etree.ElementTree as ET import time import operator import nose import os import string import random import socket import ssl import logging from collections import namedtuple from email.header import decode_header from . import( get_iam_client, get_sts_client, get_client, get_alt_user_id, get_config_endpoint, get_new_bucket_name, get_parameter_name, get_main_aws_access_key, get_main_aws_secret_key, get_thumbprint, get_aud, get_token, get_realm_name, check_webidentity, get_iam_access_key, get_iam_secret_key ) log = logging.getLogger(__name__) def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary): role_err=None if rolename is None: rolename=get_parameter_name() try: role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,) except ClientError as e: role_err = e.response['Code'] return (role_err,role_response,rolename) def put_role_policy(iam_client,rolename,policyname,role_policy): role_err=None if policyname is None: policyname=get_parameter_name() try: role_response = iam_client.put_role_policy(RoleName=rolename,PolicyName=policyname,PolicyDocument=role_policy) except ClientError as e: role_err = e.response['Code'] return (role_err,role_response) def put_user_policy(iam_client,username,policyname,policy_document): role_err=None if policyname is None: policyname=get_parameter_name() try: role_response = iam_client.put_user_policy(UserName=username,PolicyName=policyname,PolicyDocument=policy_document) except ClientError as e: role_err = e.response['Code'] return (role_err,role_response) def get_s3_client_using_iam_creds(): iam_access_key = get_iam_access_key() iam_secret_key = get_iam_secret_key() default_endpoint = get_config_endpoint() s3_client_iam_creds = boto3.client('s3', aws_access_key_id = iam_access_key, aws_secret_access_key = iam_secret_key, endpoint_url=default_endpoint, region_name='', ) return s3_client_iam_creds def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist): oidc_arn = None oidc_error = None clientids = [] if clientidlist is None: clientidlist=clientids try: oidc_response = iam_client.create_open_id_connect_provider( Url=url, ClientIDList=clientidlist, ThumbprintList=thumbprintlist, ) oidc_arn = oidc_response['OpenIDConnectProviderArn'] print (oidc_arn) except ClientError as e: oidc_error = e.response['Code'] print (oidc_error) try: oidc_error = None print (url) if url.startswith('http://'): url = url[len('http://'):] elif url.startswith('https://'): url = url[len('https://'):] elif url.startswith('www.'): url = url[len('www.'):] oidc_arn = 'arn:aws:iam:::oidc-provider/{}'.format(url) print (url) print (oidc_arn) oidc_response = iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) except ClientError as e: oidc_arn = None return (oidc_arn, oidc_error) @attr(resource='get session token') @attr(method='get') @attr(operation='check') @attr(assertion='s3 ops only accessible by temporary credentials') @attr('test_of_sts') def test_get_session_token(): iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}" (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) response=sts_client.get_session_token() eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client=boto3.client('s3', aws_access_key_id = response['Credentials']['AccessKeyId'], aws_secret_access_key = response['Credentials']['SecretAccessKey'], aws_session_token = response['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) finish=s3_client.delete_bucket(Bucket=bucket_name) @attr(resource='get session token') @attr(method='get') @attr(operation='check') @attr(assertion='s3 ops denied by permanent credentials') @attr('test_of_sts') def test_get_session_token_permanent_creds_denied(): s3bucket_error=None iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() s3_main_access_key=get_main_aws_access_key() s3_main_secret_key=get_main_aws_secret_key() user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}" (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) response=sts_client.get_session_token() eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client=boto3.client('s3', aws_access_key_id = s3_main_access_key, aws_secret_access_key = s3_main_secret_key, aws_session_token = response['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name = get_new_bucket_name() try: s3bucket = s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error,'AccessDenied') @attr(resource='assume role') @attr(method='get') @attr(operation='check') @attr(assertion='role policy allows all s3 ops') @attr('test_of_sts') def test_assume_role_allow(): iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) bkt = s3_client.delete_bucket(Bucket=bucket_name) eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) @attr(resource='assume role') @attr(method='get') @attr(operation='check') @attr(assertion='role policy denies all s3 ops') @attr('test_of_sts') def test_assume_role_deny(): s3bucket_error=None iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name = get_new_bucket_name() try: s3bucket = s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error,'AccessDenied') @attr(resource='assume role') @attr(method='get') @attr(operation='check') @attr(assertion='creds expire so all s3 ops fails') @attr('test_of_sts') def test_assume_role_creds_expiry(): iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) time.sleep(900) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name = get_new_bucket_name() try: s3bucket = s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error,'AccessDenied') @attr(resource='assume role') @attr(method='head') @attr(operation='check') @attr(assertion='HEAD fails with 403 when role policy denies s3:ListBucket') @attr('test_of_sts') def test_assume_role_deny_head_nonexistent(): # create a bucket with the normal s3 client bucket_name = get_new_bucket_name() get_client().create_bucket(Bucket=bucket_name) iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name) # allow GetObject but deny ListBucket role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":"s3:GetObject","Principal":"*","Resource":"arn:aws:s3:::*"}}' (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='') status=200 try: s3_client.head_object(Bucket=bucket_name, Key='nonexistent') except ClientError as e: status = e.response['ResponseMetadata']['HTTPStatusCode'] eq(status,403) @attr(resource='assume role') @attr(method='head') @attr(operation='check') @attr(assertion='HEAD fails with 404 when role policy allows s3:ListBucket') @attr('test_of_sts') def test_assume_role_allow_head_nonexistent(): # create a bucket with the normal s3 client bucket_name = get_new_bucket_name() get_client().create_bucket(Bucket=bucket_name) iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name) # allow GetObject and ListBucket role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":["s3:GetObject","s3:ListBucket"],"Principal":"*","Resource":"arn:aws:s3:::*"}}' (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='') status=200 try: s3_client.head_object(Bucket=bucket_name, Key='nonexistent') except ClientError as e: status = e.response['ResponseMetadata']['HTTPStatusCode'] eq(status,404) @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check') @attr(assertion='assuming role through web token') @attr('webidentity_test') def test_assume_role_with_web_identity(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( Url='http://localhost:8080/auth/realms/{}'.format(realm), ThumbprintList=[ thumbprint, ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) bkt = s3_client.delete_bucket(Bucket=bucket_name) eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) ''' @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check') @attr(assertion='assume_role_with_web_token creds expire') @attr('webidentity_test') def test_assume_role_with_web_identity_invalid_webtoken(): resp_error=None iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( Url='http://localhost:8080/auth/realms/{}'.format(realm), ThumbprintList=[ thumbprint, ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) resp="" try: resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken='abcdef') except InvalidIdentityTokenException as e: log.debug('{}'.format(resp)) log.debug('{}'.format(e.response.get("Error", {}).get("Code"))) log.debug('{}'.format(e)) resp_error = e.response.get("Error", {}).get("Code") eq(resp_error,'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) ''' ####################### # Session Policy Tests ####################### @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check') @attr(assertion='checking session policy working for two different buckets') @attr('webidentity_test') @attr('session_policy') def test_session_policy_check_on_different_buckets(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"arn:aws:s3:::test2\",\"arn:aws:s3:::test2/*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_name_1 = 'test1' try: s3bucket = s3_client.create_bucket(Bucket=bucket_name_1) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error, 'AccessDenied') bucket_name_2 = 'test2' try: s3bucket = s3_client.create_bucket(Bucket=bucket_name_2) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error, 'AccessDenied') bucket_body = 'please-write-something' #body.encode(encoding='utf_8') try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") eq(s3_put_obj_error,'NoSuchBucket') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking session policy working for same bucket') @attr('webidentity_test') @attr('session_policy') def test_session_policy_check_on_same_bucket(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client_iam_creds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check') @attr(assertion='checking put_obj op denial') @attr('webidentity_test') @attr('session_policy') def test_session_policy_check_put_obj_denial(): check_webidentity() iam_client=get_iam_client() iam_access_key=get_iam_access_key() iam_secret_key=get_iam_secret_key() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client_iam_creds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") eq(s3_put_obj_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check') @attr(assertion='checking put_obj working by swapping policies') @attr('webidentity_test') @attr('session_policy') def test_swapping_role_policy_and_session_policy(): check_webidentity() iam_client=get_iam_client() iam_access_key=get_iam_access_key() iam_secret_key=get_iam_secret_key() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client_iam_creds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking put_obj working by setting different permissions to role and session policy') @attr('webidentity_test') @attr('session_policy') def test_session_policy_check_different_op_permissions(): check_webidentity() iam_client=get_iam_client() iam_access_key=get_iam_access_key() iam_secret_key=get_iam_secret_key() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client_iam_creds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") eq(s3_put_obj_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking op behaviour with deny effect') @attr('webidentity_test') @attr('session_policy') def test_session_policy_check_with_deny_effect(): check_webidentity() iam_client=get_iam_client() iam_access_key=get_iam_access_key() iam_secret_key=get_iam_secret_key() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client_iam_creds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") eq(s3_put_obj_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking put_obj working with deny and allow on same op') @attr('webidentity_test') @attr('session_policy') def test_session_policy_check_with_deny_on_same_op(): check_webidentity() iam_client=get_iam_client() iam_access_key=get_iam_access_key() iam_secret_key=get_iam_secret_key() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3_client_iam_creds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") eq(s3_put_obj_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking op when bucket policy has role arn') @attr('webidentity_test') @attr('session_policy') def test_session_policy_bucket_policy_role_arn(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3client_iamcreds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolearn = "arn:aws:iam:::role/" + general_role_name bucket_policy = json.dumps( { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"AWS": "{}".format(rolearn)}, "Action": ["s3:GetObject","s3:PutObject"], "Resource": [ "{}".format(resource1), "{}".format(resource2) ] }] }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) try: obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3object_error = e.response.get("Error", {}).get("Code") eq(s3object_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='get') @attr(operation='check') @attr(assertion='checking op when bucket policy has session arn') @attr('webidentity_test') @attr('session_policy') def test_session_policy_bucket_policy_session_arn(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3client_iamcreds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name bucket_policy = json.dumps( { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"AWS": "{}".format(rolesessionarn)}, "Action": ["s3:GetObject","s3:PutObject"], "Resource": [ "{}".format(resource1), "{}".format(resource2) ] }] }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking copy object op with role, session and bucket policy') @attr('webidentity_test') @attr('session_policy') def test_session_policy_copy_object(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3client_iamcreds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name print (rolesessionarn) bucket_policy = json.dumps( { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"AWS": "{}".format(rolesessionarn)}, "Action": ["s3:GetObject","s3:PutObject"], "Resource": [ "{}".format(resource1), "{}".format(resource2) ] }] }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) copy_source = { 'Bucket': bucket_name_1, 'Key': 'test-1.txt' } s3_client.copy(copy_source, bucket_name_1, "test-2.txt") s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-2.txt") eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking op is denied when no role policy') @attr('webidentity_test') @attr('session_policy') def test_session_policy_no_bucket_role_policy(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') s3client_iamcreds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\",\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3putobj_error = e.response.get("Error", {}).get("Code") eq(s3putobj_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn ) @attr(resource='assume role with web identity') @attr(method='put') @attr(operation='check') @attr(assertion='checking op is denied when resource policy denies') @attr('webidentity_test') @attr('session_policy') def test_session_policy_bucket_policy_deny(): check_webidentity() iam_client=get_iam_client() sts_client=get_sts_client() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() thumbprint=get_thumbprint() aud=get_aud() token=get_token() realm=get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) s3client_iamcreds = get_s3_client_using_iam_creds() bucket_name_1 = 'test1' s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name bucket_policy = json.dumps( { "Version": "2012-10-17", "Statement": [{ "Effect": "Deny", "Principal": {"AWS": "{}".format(rolesessionarn)}, "Action": ["s3:GetObject","s3:PutObject"], "Resource": [ "{}".format(resource1), "{}".format(resource2) ] }] }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], aws_session_token = resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', ) bucket_body = 'this is a test file' try: s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3putobj_error = e.response.get("Error", {}).get("Code") eq(s3putobj_error, 'AccessDenied') oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn )