BucketPolicy: add test for sse-c in conditions

Ref. https://github.com/ceph/ceph/pull/58689

Signed-off-by: Seena Fallah <seenafallah@gmail.com>
(cherry picked from commit 87b496f25f)
This commit is contained in:
Seena Fallah 2024-07-19 23:07:03 +02:00 committed by Casey Bodley
parent abd28c3aef
commit 0887ec43fd

View file

@ -10077,6 +10077,44 @@ def test_encryption_sse_c_post_object_authenticated_request():
body = _get_body(response) body = _get_body(response)
assert body == 'bar' assert body == 'bar'
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_enforced_with_bucket_policy():
bucket_name = get_new_bucket()
client = get_client()
deny_incorrect_algo = {
"StringNotEquals": {
"s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
}
}
deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption-customer-algorithm": "true"
}
}
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s1).add_statement(s2).to_json()
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar')
client.put_object(
Bucket=bucket_name, Key='foo', Body='bar',
SSECustomerAlgorithm='AES256',
SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
)
@pytest.mark.encryption @pytest.mark.encryption
@pytest.mark.fails_on_dbstore @pytest.mark.fails_on_dbstore
def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'): def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
@ -10102,10 +10140,6 @@ def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
assert body == data assert body == data
@pytest.mark.encryption @pytest.mark.encryption
@pytest.mark.fails_on_dbstore @pytest.mark.fails_on_dbstore
def test_sse_kms_method_head(): def test_sse_kms_method_head():