Merge pull request #487 from robbat2/rgw_post_content_length

test_post_object_upload_size_below_minimum_rgw_max_chunk_size: new testcase for size validation bug

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2023-02-20 09:28:53 -05:00 committed by GitHub
commit 2087c1ba26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2818,6 +2818,53 @@ def test_post_object_upload_size_below_minimum():
r = requests.post(url, files=payload, verify=get_config_ssl_verify()) r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400 assert r.status_code == 400
def test_post_object_upload_size_rgw_chunk_size_bug():
# Test for https://tracker.ceph.com/issues/58627
# TODO: if this value is different in Teuthology runs, this would need tuning
# https://github.com/ceph/ceph/blob/main/qa/suites/rgw/verify/striping%24/stripe-greater-than-chunk.yaml
_rgw_max_chunk_size = 4 * 2**20 # 4MiB
min_size = _rgw_max_chunk_size
max_size = _rgw_max_chunk_size * 3
# [(chunk),(small)]
test_payload_size = _rgw_max_chunk_size + 200 # extra bit to push it over the chunk boundary
# it should be valid when we run this test!
assert test_payload_size > min_size
assert test_payload_size < max_size
bucket_name = get_new_bucket()
client = get_client()
url = _get_post_url(bucket_name)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
"conditions": [\
{"bucket": bucket_name},\
["starts-with", "$key", "foo"],\
{"acl": "private"},\
["starts-with", "$Content-Type", "text/plain"],\
["content-length-range", min_size, max_size],\
]\
}
test_payload = 'x' * test_payload_size
json_policy_document = json.JSONEncoder().encode(policy_document)
bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
policy = base64.b64encode(bytes_json_policy_document)
aws_secret_access_key = get_main_aws_secret_key()
aws_access_key_id = get_main_aws_access_key()
signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', (test_payload))])
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 204
def test_post_object_empty_conditions(): def test_post_object_empty_conditions():
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()