s3-tests/s3tests_boto3/functional/test_sts.py
root daf9062a22 STS issue fix (https://tracker.ceph.com/issues/47588)
This is the fix for the issue reported (https://tracker.ceph.com/issues/47588). The issue was with the argument which was passed to the function. After removing that argument (as it's already an optional argument) the issue is fixed.

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
2020-10-06 02:02:44 +05:30

233 lines
10 KiB
Python

import boto3
import botocore.session
from botocore.exceptions import ClientError
from botocore.exceptions import ParamValidationError
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
import isodate
import email.utils
import datetime
import threading
import re
import pytz
from collections import OrderedDict
import requests
import json
import base64
import hmac
import hashlib
import xml.etree.ElementTree as ET
import time
import operator
import nose
import os
import string
import random
import socket
import ssl
import logging
from collections import namedtuple
from email.header import decode_header
from . import(
get_iam_client,
get_sts_client,
get_client,
get_alt_user_id,
get_config_endpoint,
get_new_bucket_name,
get_parameter_name,
get_main_aws_access_key,
get_main_aws_secret_key,
)
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
role_err=None
if rolename is None:
rolename=get_parameter_name()
try:
role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,)
except ClientError as e:
role_err = e.response['Code']
return (role_err,role_response,rolename)
def put_role_policy(iam_client,rolename,policyname,role_policy):
role_err=None
if policyname is None:
policyname=get_parameter_name()
try:
role_response = iam_client.put_role_policy(RoleName=rolename,PolicyName=policyname,PolicyDocument=role_policy)
except ClientError as e:
role_err = e.response['Code']
return (role_err,role_response)
def put_user_policy(iam_client,username,policyname,policy_document):
role_err=None
if policyname is None:
policyname=get_parameter_name()
try:
role_response = iam_client.put_user_policy(UserName=username,PolicyName=policyname,PolicyDocument=policy_document)
except ClientError as e:
role_err = e.response['Code']
return (role_err,role_response)
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops only accessible by temporary credentials')
@attr('sts_test')
def test_get_session_token():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
response=sts_client.get_session_token()
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
s3_client=boto3.client('s3',
aws_access_key_id = response['Credentials']['AccessKeyId'],
aws_secret_access_key = response['Credentials']['SecretAccessKey'],
aws_session_token = response['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
finish=s3_client.delete_bucket(Bucket=bucket_name)
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops denied by permanent credentials')
@attr('sts_test')
def test_get_session_token_permanent_creds_denied():
s3bucket_error=None
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
s3_main_access_key=get_main_aws_access_key()
s3_main_secret_key=get_main_aws_secret_key()
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
response=sts_client.get_session_token()
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
s3_client=boto3.client('s3',
aws_access_key_id = s3_main_access_key,
aws_secret_access_key = s3_main_secret_key,
aws_session_token = response['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
eq(s3bucket_error,'AccessDenied')
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy allows all s3 ops')
@attr('sts_test')
def test_assume_role_allow():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
bkt = s3_client.delete_bucket(Bucket=bucket_name)
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy denies all s3 ops')
@attr('sts_test')
def test_assume_role_deny():
s3bucket_error=None
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
eq(s3bucket_error,'AccessDenied')
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='creds expire so all s3 ops fails')
@attr('sts_test')
def test_assume_role_creds_expiry():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
time.sleep(900)
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='',
)
bucket_name = get_new_bucket_name()
try:
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
eq(s3bucket_error,'AccessDenied')