diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 828ea90..0b7b41f 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -72,6 +72,9 @@ email = tenanteduser@example.com #following section needs to be added for all sts-tests [iam] #used for iam operations in sts-tests +#email from vstart.sh +email = s3@example.com + #user_id from vstart.sh user_id = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 92bc3d5..ba8f9a7 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -224,6 +224,24 @@ def teardown(): nuke_prefixed_buckets(prefix=prefix) nuke_prefixed_buckets(prefix=prefix, client=alt_client) nuke_prefixed_buckets(prefix=prefix, client=tenant_client) + try: + iam_client = get_iam_client() + list_roles_resp = iam_client.list_roles() + for role in list_roles_resp['Roles']: + list_policies_resp = iam_client.list_role_policies(RoleName=role['RoleName']) + for policy in list_policies_resp['PolicyNames']: + del_policy_resp = iam_client.delete_role_policy( + RoleName=role['RoleName'], + PolicyName=policy + ) + del_role_resp = iam_client.delete_role(RoleName=role['RoleName']) + list_oidc_resp = iam_client.list_open_id_connect_providers() + for oidcprovider in list_oidc_resp['OpenIDConnectProviderList']: + del_oidc_resp = iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidcprovider['Arn'] + ) + except: + pass def check_webidentity(): cfg = configparser.RawConfigParser() @@ -514,3 +532,9 @@ def get_token(): def get_realm_name(): return config.webidentity_realm + +def get_iam_access_key(): + return config.iam_access_key + +def get_iam_secret_key(): + return config.iam_secret_key diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index 78c6901..bbfb99a 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -45,7 +45,9 @@ from . import( get_aud, get_token, get_realm_name, - check_webidentity + check_webidentity, + get_iam_access_key, + get_iam_secret_key ) log = logging.getLogger(__name__) @@ -80,6 +82,54 @@ def put_user_policy(iam_client,username,policyname,policy_document): role_err = e.response['Code'] return (role_err,role_response) +def get_s3_client_using_iam_creds(): + iam_access_key = get_iam_access_key() + iam_secret_key = get_iam_secret_key() + default_endpoint = get_config_endpoint() + + s3_client_iam_creds = boto3.client('s3', + aws_access_key_id = iam_access_key, + aws_secret_access_key = iam_secret_key, + endpoint_url=default_endpoint, + region_name='', + ) + + return s3_client_iam_creds + +def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist): + oidc_arn = None + oidc_error = None + clientids = [] + if clientidlist is None: + clientidlist=clientids + try: + oidc_response = iam_client.create_open_id_connect_provider( + Url=url, + ClientIDList=clientidlist, + ThumbprintList=thumbprintlist, + ) + oidc_arn = oidc_response['OpenIDConnectProviderArn'] + print (oidc_arn) + except ClientError as e: + oidc_error = e.response['Code'] + print (oidc_error) + try: + oidc_error = None + print (url) + if url.startswith('http://'): + url = url[len('http://'):] + elif url.startswith('https://'): + url = url[len('https://'):] + elif url.startswith('www.'): + url = url[len('www.'):] + oidc_arn = 'arn:aws:iam:::oidc-provider/{}'.format(url) + print (url) + print (oidc_arn) + oidc_response = iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + except ClientError as e: + oidc_arn = None + return (oidc_arn, oidc_error) + @attr(resource='get session token') @attr(method='get') @attr(operation='check') @@ -437,3 +487,829 @@ def test_assume_role_with_web_identity_invalid_webtoken(): OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) ''' + +####################### +# Session Policy Tests +####################### + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='checking session policy working for two different buckets') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_check_on_different_buckets(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"arn:aws:s3:::test2\",\"arn:aws:s3:::test2/*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_name_1 = 'test1' + try: + s3bucket = s3_client.create_bucket(Bucket=bucket_name_1) + except ClientError as e: + s3bucket_error = e.response.get("Error", {}).get("Code") + eq(s3bucket_error, 'AccessDenied') + + bucket_name_2 = 'test2' + try: + s3bucket = s3_client.create_bucket(Bucket=bucket_name_2) + except ClientError as e: + s3bucket_error = e.response.get("Error", {}).get("Code") + eq(s3bucket_error, 'AccessDenied') + + bucket_body = 'please-write-something' + #body.encode(encoding='utf_8') + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error,'NoSuchBucket') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking session policy working for same bucket') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_check_on_same_bucket(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client_iam_creds = get_s3_client_using_iam_creds() + + bucket_name_1 = 'test1' + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='checking put_obj op denial') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_check_put_obj_denial(): + check_webidentity() + iam_client=get_iam_client() + iam_access_key=get_iam_access_key() + iam_secret_key=get_iam_secret_key() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client_iam_creds = get_s3_client_using_iam_creds() + + bucket_name_1 = 'test1' + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='checking put_obj working by swapping policies') +@attr('webidentity_test') +@attr('session_policy') +def test_swapping_role_policy_and_session_policy(): + check_webidentity() + iam_client=get_iam_client() + iam_access_key=get_iam_access_key() + iam_secret_key=get_iam_secret_key() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client_iam_creds = get_s3_client_using_iam_creds() + + bucket_name_1 = 'test1' + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking put_obj working by setting different permissions to role and session policy') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_check_different_op_permissions(): + check_webidentity() + iam_client=get_iam_client() + iam_access_key=get_iam_access_key() + iam_secret_key=get_iam_secret_key() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client_iam_creds = get_s3_client_using_iam_creds() + + bucket_name_1 = 'test1' + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking op behaviour with deny effect') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_check_with_deny_effect(): + check_webidentity() + iam_client=get_iam_client() + iam_access_key=get_iam_access_key() + iam_secret_key=get_iam_secret_key() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client_iam_creds = get_s3_client_using_iam_creds() + + bucket_name_1 = 'test1' + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking put_obj working with deny and allow on same op') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_check_with_deny_on_same_op(): + check_webidentity() + iam_client=get_iam_client() + iam_access_key=get_iam_access_key() + iam_secret_key=get_iam_secret_key() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client_iam_creds = get_s3_client_using_iam_creds() + + bucket_name_1 = 'test1' + s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3_put_obj_error = e.response.get("Error", {}).get("Code") + eq(s3_put_obj_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking op when bucket policy has role arn') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_bucket_policy_role_arn(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3client_iamcreds = get_s3_client_using_iam_creds() + bucket_name_1 = 'test1' + s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + resource1 = "arn:aws:s3:::" + bucket_name_1 + resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" + rolearn = "arn:aws:iam:::role/" + general_role_name + bucket_policy = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": "{}".format(rolearn)}, + "Action": ["s3:GetObject","s3:PutObject"], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + try: + obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3object_error = e.response.get("Error", {}).get("Code") + eq(s3object_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='checking op when bucket policy has session arn') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_bucket_policy_session_arn(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3client_iamcreds = get_s3_client_using_iam_creds() + bucket_name_1 = 'test1' + s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + resource1 = "arn:aws:s3:::" + bucket_name_1 + resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" + rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name + bucket_policy = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": "{}".format(rolesessionarn)}, + "Action": ["s3:GetObject","s3:PutObject"], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + + s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") + eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking copy object op with role, session and bucket policy') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_copy_object(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3client_iamcreds = get_s3_client_using_iam_creds() + bucket_name_1 = 'test1' + s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + resource1 = "arn:aws:s3:::" + bucket_name_1 + resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" + rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name + print (rolesessionarn) + bucket_policy = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": "{}".format(rolesessionarn)}, + "Action": ["s3:GetObject","s3:PutObject"], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200) + + copy_source = { + 'Bucket': bucket_name_1, + 'Key': 'test-1.txt' + } + + s3_client.copy(copy_source, bucket_name_1, "test-2.txt") + + s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-2.txt") + eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking op is denied when no role policy') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_no_bucket_role_policy(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + s3client_iamcreds = get_s3_client_using_iam_creds() + bucket_name_1 = 'test1' + s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\",\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3putobj_error = e.response.get("Error", {}).get("Code") + eq(s3putobj_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + ) + +@attr(resource='assume role with web identity') +@attr(method='put') +@attr(operation='check') +@attr(assertion='checking op is denied when resource policy denies') +@attr('webidentity_test') +@attr('session_policy') +def test_session_policy_bucket_policy_deny(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + url = 'http://localhost:8080/auth/realms/{}'.format(realm) + thumbprintlist = [thumbprint] + (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + if oidc_error is not None: + raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" + + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + s3client_iamcreds = get_s3_client_using_iam_creds() + bucket_name_1 = 'test1' + s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + + resource1 = "arn:aws:s3:::" + bucket_name_1 + resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" + rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name + bucket_policy = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Deny", + "Principal": {"AWS": "{}".format(rolesessionarn)}, + "Action": ["s3:GetObject","s3:PutObject"], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_body = 'this is a test file' + + try: + s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + except ClientError as e: + s3putobj_error = e.response.get("Error", {}).get("Code") + eq(s3putobj_error, 'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_arn + )