diff --git a/README.rst b/README.rst index 96e0bbe..c36a8a7 100644 --- a/README.rst +++ b/README.rst @@ -58,13 +58,16 @@ You can run only the boto3 tests with:: STS compatibility tests ======================== -This section contains some basic tests for the AssumeRole and GetSessionToken API's. The test file is located under ``s3tests_boto3/functional``. +This section contains some basic tests for the AssumeRole, GetSessionToken and AssumeRoleWithWebIdentity API's. The test file is located under ``s3tests_boto3/functional``. -You can run only the sts tests with:: +You can run only the sts tests (all the three API's) with:: S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts -You can filter tests based on the attributes. There is a attribute named ``sts_test`` to run specifically the sts tests as below:: +You can filter tests based on the attributes. There is a attribute named ``test_of_sts`` to run AssumeRole and GetSessionToken tests and ``webidentity_test`` to run the AssumeRoleWithWebIdentity tests. If you want to execute only ``test_of_sts`` tests you can apply that filter as below:: - S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'sts_test' s3tests_boto3.functional.test_sts + S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'test_of_sts' s3tests_boto3.functional.test_sts +For running ``webidentity_test`` you'll need have Keycloak running. + +In order to run any STS test you'll need to add "iam" section to the config file. For further reference on how your config file should look check ``s3tests.conf.SAMPLE``. diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 39c8694..828ea90 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -69,6 +69,7 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab # tenant email set in vstart.sh email = tenanteduser@example.com +#following section needs to be added for all sts-tests [iam] #used for iam operations in sts-tests #user_id from vstart.sh @@ -82,3 +83,15 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn #display_name from vstart.sh display_name = youruseridhere + +#following section needs to be added when you want to run Assume Role With Webidentity test +[webidentity] +#used for assume role with web identity test in sts-tests +#all parameters will be obtained from ceph/qa/tasks/keycloak.py +token= + +aud= + +thumbprint= + +KC_REALM= diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 8883152..92bc3d5 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -158,8 +158,6 @@ def setup(): raise RuntimeError('Your config file is missing the "s3 alt" section!') if not cfg.has_section("s3 tenant"): raise RuntimeError('Your config file is missing the "s3 tenant" section!') - if not cfg.has_section("iam"): - raise RuntimeError('Your config file is missing the "iam" section!') global prefix @@ -207,12 +205,6 @@ def setup(): config.tenant_user_id = cfg.get('s3 tenant',"user_id") config.tenant_email = cfg.get('s3 tenant',"email") - config.iam_access_key = cfg.get('iam',"access_key") - config.iam_secret_key = cfg.get('iam',"secret_key") - config.iam_display_name = cfg.get('iam',"display_name") - config.iam_user_id = cfg.get('iam',"user_id") - #config.iam_email = cfg.get('iam',"email") - # vars from the fixtures section try: template = cfg.get('fixtures', "bucket prefix") @@ -233,6 +225,24 @@ def teardown(): nuke_prefixed_buckets(prefix=prefix, client=alt_client) nuke_prefixed_buckets(prefix=prefix, client=tenant_client) +def check_webidentity(): + cfg = configparser.RawConfigParser() + try: + path = os.environ['S3TEST_CONF'] + except KeyError: + raise RuntimeError( + 'To run tests, point environment ' + + 'variable S3TEST_CONF to a config file.', + ) + cfg.read(path) + if not cfg.has_section("webidentity"): + raise RuntimeError('Your config file is missing the "webidentity" section!') + + config.webidentity_thumbprint = cfg.get('webidentity', "thumbprint") + config.webidentity_aud = cfg.get('webidentity', "aud") + config.webidentity_token = cfg.get('webidentity', "token") + config.webidentity_realm = cfg.get('webidentity', "KC_REALM") + def get_client(client_config=None): if client_config == None: client_config = Config(signature_version='s3v4') @@ -268,6 +278,24 @@ def get_sts_client(client_config=None): return client def get_iam_client(client_config=None): + cfg = configparser.RawConfigParser() + try: + path = os.environ['S3TEST_CONF'] + except KeyError: + raise RuntimeError( + 'To run tests, point environment ' + + 'variable S3TEST_CONF to a config file.', + ) + cfg.read(path) + if not cfg.has_section("iam"): + raise RuntimeError('Your config file is missing the "iam" section!') + + config.iam_access_key = cfg.get('iam',"access_key") + config.iam_secret_key = cfg.get('iam',"secret_key") + config.iam_display_name = cfg.get('iam',"display_name") + config.iam_user_id = cfg.get('iam',"user_id") + config.iam_email = cfg.get('iam',"email") + if client_config == None: client_config = Config(signature_version='s3v4') @@ -474,3 +502,15 @@ def get_tenant_user_id(): def get_tenant_email(): return config.tenant_email + +def get_thumbprint(): + return config.webidentity_thumbprint + +def get_aud(): + return config.webidentity_aud + +def get_token(): + return config.webidentity_token + +def get_realm_name(): + return config.webidentity_realm diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index 4799f14..fb4ab11 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -41,8 +41,15 @@ from . import( get_parameter_name, get_main_aws_access_key, get_main_aws_secret_key, + get_thumbprint, + get_aud, + get_token, + get_realm_name, + check_webidentity ) +log = logging.getLogger(__name__) + def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary): role_err=None if rolename is None: @@ -77,17 +84,20 @@ def put_user_policy(iam_client,username,policyname,policy_document): @attr(method='get') @attr(operation='check') @attr(assertion='s3 ops only accessible by temporary credentials') -@attr('sts_test') +@attr('test_of_sts') def test_get_session_token(): iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() + user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}" (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + response=sts_client.get_session_token() eq(response['ResponseMetadata']['HTTPStatusCode'],200) + s3_client=boto3.client('s3', aws_access_key_id = response['Credentials']['AccessKeyId'], aws_secret_access_key = response['Credentials']['SecretAccessKey'], @@ -104,7 +114,7 @@ def test_get_session_token(): @attr(method='get') @attr(operation='check') @attr(assertion='s3 ops denied by permanent credentials') -@attr('sts_test') +@attr('test_of_sts') def test_get_session_token_permanent_creds_denied(): s3bucket_error=None iam_client=get_iam_client() @@ -113,11 +123,14 @@ def test_get_session_token_permanent_creds_denied(): default_endpoint=get_config_endpoint() s3_main_access_key=get_main_aws_access_key() s3_main_secret_key=get_main_aws_secret_key() + user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}" (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + response=sts_client.get_session_token() eq(response['ResponseMetadata']['HTTPStatusCode'],200) + s3_client=boto3.client('s3', aws_access_key_id = s3_main_access_key, aws_secret_access_key = s3_main_secret_key, @@ -136,21 +149,25 @@ def test_get_session_token_permanent_creds_denied(): @attr(method='get') @attr(operation='check') @attr(assertion='role policy allows all s3 ops') -@attr('sts_test') +@attr('test_of_sts') def test_assume_role_allow(): iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) + resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], @@ -168,7 +185,7 @@ def test_assume_role_allow(): @attr(method='get') @attr(operation='check') @attr(assertion='role policy denies all s3 ops') -@attr('sts_test') +@attr('test_of_sts') def test_assume_role_deny(): s3bucket_error=None iam_client=get_iam_client() @@ -176,14 +193,18 @@ def test_assume_role_deny(): sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) + resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], @@ -202,22 +223,26 @@ def test_assume_role_deny(): @attr(method='get') @attr(operation='check') @attr(assertion='creds expire so all s3 ops fails') -@attr('sts_test') +@attr('test_of_sts') def test_assume_role_creds_expiry(): iam_client=get_iam_client() sts_client=get_sts_client() sts_user_id=get_alt_user_id() default_endpoint=get_config_endpoint() role_session_name=get_parameter_name() + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) eq(response['ResponseMetadata']['HTTPStatusCode'],200) + resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900) eq(resp['ResponseMetadata']['HTTPStatusCode'],200) time.sleep(900) + s3_client = boto3.client('s3', aws_access_key_id = resp['Credentials']['AccessKeyId'], aws_secret_access_key = resp['Credentials']['SecretAccessKey'], @@ -231,3 +256,101 @@ def test_assume_role_creds_expiry(): except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") eq(s3bucket_error,'AccessDenied') + +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assuming role through web token') +@attr('webidentity_test') +def test_assume_role_with_web_identity(): + check_webidentity() + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + eq(resp['ResponseMetadata']['HTTPStatusCode'],200) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + bucket_name = get_new_bucket_name() + s3bucket = s3_client.create_bucket(Bucket=bucket_name) + eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200) + bkt = s3_client.delete_bucket(Bucket=bucket_name) + eq(bkt['ResponseMetadata']['HTTPStatusCode'],204) + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) + +''' +@attr(resource='assume role with web identity') +@attr(method='get') +@attr(operation='check') +@attr(assertion='assume_role_with_web_token creds expire') +@attr('webidentity_test') +def test_assume_role_with_web_identity_invalid_webtoken(): + resp_error=None + iam_client=get_iam_client() + sts_client=get_sts_client() + default_endpoint=get_config_endpoint() + role_session_name=get_parameter_name() + thumbprint=get_thumbprint() + aud=get_aud() + token=get_token() + realm=get_realm_name() + + oidc_response = iam_client.create_open_id_connect_provider( + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], + ) + + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" + (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'') + + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" + (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + eq(response['ResponseMetadata']['HTTPStatusCode'],200) + + resp="" + try: + resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken='abcdef') + except InvalidIdentityTokenException as e: + log.debug('{}'.format(resp)) + log.debug('{}'.format(e.response.get("Error", {}).get("Code"))) + log.debug('{}'.format(e)) + resp_error = e.response.get("Error", {}).get("Code") + eq(resp_error,'AccessDenied') + + oidc_remove=iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + ) +'''