s3: reenable tenanted bucket policy test

the before-call hook url-encodes the ':' part of tenanted bucket names
to resolve SignatureDoesNotMatch errors

removed the list-v2 version of the test since it isn't relevant to
bucket policy test coverage

add a new test case that creates the bucket under the tenanted user,
then uses the main client to access it

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 2e41494293)
This commit is contained in:
Casey Bodley 2024-08-13 10:23:19 -04:00
parent 06a5851a92
commit cf0103e3f3
3 changed files with 38 additions and 39 deletions

View file

@ -117,6 +117,9 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
# tenant email set in vstart.sh # tenant email set in vstart.sh
email = tenanteduser@example.com email = tenanteduser@example.com
# tenant name
tenant = testx
#following section needs to be added for all sts-tests #following section needs to be added for all sts-tests
[iam] [iam]
#used for iam operations in sts-tests #used for iam operations in sts-tests

View file

@ -259,6 +259,7 @@ def setup():
config.tenant_display_name = cfg.get('s3 tenant',"display_name") config.tenant_display_name = cfg.get('s3 tenant',"display_name")
config.tenant_user_id = cfg.get('s3 tenant',"user_id") config.tenant_user_id = cfg.get('s3 tenant',"user_id")
config.tenant_email = cfg.get('s3 tenant',"email") config.tenant_email = cfg.get('s3 tenant',"email")
config.tenant_name = cfg.get('s3 tenant',"tenant")
config.iam_access_key = cfg.get('iam',"access_key") config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key") config.iam_secret_key = cfg.get('iam',"secret_key")
@ -693,6 +694,9 @@ def get_tenant_aws_secret_key():
def get_tenant_display_name(): def get_tenant_display_name():
return config.tenant_display_name return config.tenant_display_name
def get_tenant_name():
return config.tenant_name
def get_tenant_user_id(): def get_tenant_user_id():
return config.tenant_user_id return config.tenant_user_id

View file

@ -68,6 +68,7 @@ from . import (
get_alt_client, get_alt_client,
get_tenant_client, get_tenant_client,
get_tenant_iam_client, get_tenant_iam_client,
get_tenant_name,
get_tenant_user_id, get_tenant_user_id,
get_buckets_list, get_buckets_list,
get_objects_list, get_objects_list,
@ -10573,17 +10574,29 @@ def test_bucketv2_policy_acl():
client.delete_bucket_policy(Bucket=bucket_name) client.delete_bucket_policy(Bucket=bucket_name)
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read') client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
def tenanted_bucket_name(tenant):
def change_bucket_name(params, **kwargs):
old_name = params['context']['signing']['bucket']
new_name = "{}:{}".format(tenant, old_name)
params['Bucket'] = new_name
params['context']['signing']['bucket'] = new_name
# the : needs to be url-encoded for urls
new_name_url = "{}%3A{}".format(tenant, old_name)
params['url'] = params['url'].replace(old_name, new_name_url)
params['url_path'] = params['url_path'].replace(old_name, new_name_url)
return change_bucket_name
@pytest.mark.bucket_policy @pytest.mark.bucket_policy
# TODO: remove this fails_on_rgw when I fix it
@pytest.mark.fails_on_rgw
def test_bucket_policy_different_tenant(): def test_bucket_policy_different_tenant():
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
key = 'asdf' key = 'asdf'
client.put_object(Bucket=bucket_name, Key=key, Body='asdf') client.put_object(Bucket=bucket_name, Key=key, Body='asdf')
resource1 = "arn:aws:s3::*:" + bucket_name resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3::*:" + bucket_name + "/*" resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps( policy_document = json.dumps(
{ {
"Version": "2012-10-17", "Version": "2012-10-17",
@ -10600,35 +10613,22 @@ def test_bucket_policy_different_tenant():
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
# TODO: figure out how to change the bucketname # use the tenanted client to list the global tenant's bucket
def change_bucket_name(**kwargs):
kwargs['params']['url'] = "http://localhost:8000/:{bucket_name}?encoding-type=url".format(bucket_name=bucket_name)
kwargs['params']['url_path'] = "/:{bucket_name}".format(bucket_name=bucket_name)
kwargs['params']['context']['signing']['bucket'] = ":{bucket_name}".format(bucket_name=bucket_name)
print(kwargs['request_signer'])
print(kwargs)
#bucket_name = ":" + bucket_name
tenant_client = get_tenant_client() tenant_client = get_tenant_client()
tenant_client.meta.events.register('before-call.s3.ListObjects', change_bucket_name) tenant_client.meta.events.register('before-call.s3.ListObjects', tenanted_bucket_name(''))
response = tenant_client.list_objects(Bucket=bucket_name) response = tenant_client.list_objects(Bucket=bucket_name)
#alt_client = get_alt_client()
#response = alt_client.list_objects(Bucket=bucket_name)
assert len(response['Contents']) == 1 assert len(response['Contents']) == 1
@pytest.mark.bucket_policy @pytest.mark.bucket_policy
# TODO: remove this fails_on_rgw when I fix it def test_bucket_policy_tenanted_bucket():
@pytest.mark.fails_on_rgw tenant_client = get_tenant_client()
@pytest.mark.list_objects_v2 bucket_name = get_new_bucket(tenant_client)
def test_bucketv2_policy_different_tenant():
bucket_name = get_new_bucket()
client = get_client()
key = 'asdf' key = 'asdf'
client.put_object(Bucket=bucket_name, Key=key, Body='asdf') tenant_client.put_object(Bucket=bucket_name, Key=key, Body='asdf')
resource1 = "arn:aws:s3::*:" + bucket_name resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3::*:" + bucket_name + "/*" resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps( policy_document = json.dumps(
{ {
"Version": "2012-10-17", "Version": "2012-10-17",
@ -10643,23 +10643,15 @@ def test_bucketv2_policy_different_tenant():
}] }]
}) })
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) tenant_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
# TODO: figure out how to change the bucketname tenant = get_tenant_name()
def change_bucket_name(**kwargs):
kwargs['params']['url'] = "http://localhost:8000/:{bucket_name}?encoding-type=url".format(bucket_name=bucket_name)
kwargs['params']['url_path'] = "/:{bucket_name}".format(bucket_name=bucket_name)
kwargs['params']['context']['signing']['bucket'] = ":{bucket_name}".format(bucket_name=bucket_name)
print(kwargs['request_signer'])
print(kwargs)
#bucket_name = ":" + bucket_name # use the global tenant's client to list the tenanted bucket
tenant_client = get_tenant_client() client = get_client()
tenant_client.meta.events.register('before-call.s3.ListObjects', change_bucket_name) client.meta.events.register('before-call.s3.ListObjects', tenanted_bucket_name(tenant))
response = tenant_client.list_objects_v2(Bucket=bucket_name)
#alt_client = get_alt_client()
#response = alt_client.list_objects_v2(Bucket=bucket_name)
response = client.list_objects(Bucket=bucket_name)
assert len(response['Contents']) == 1 assert len(response['Contents']) == 1
@pytest.mark.bucket_policy @pytest.mark.bucket_policy