diff --git a/pytest.ini b/pytest.ini index 73d1563..1a7d9a8 100644 --- a/pytest.ini +++ b/pytest.ini @@ -7,6 +7,7 @@ markers = auth_common bucket_policy bucket_encryption + checksum cloud_transition encryption fails_on_aws diff --git a/requirements.txt b/requirements.txt index a750192..b84990c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ munch >=2.0.0 gevent >=1.0 isodate >=0.4.4 requests >=2.23.0 -pytz >=2011k +pytz httplib2 lxml pytest diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 47cc525..98b3cdd 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -6458,6 +6458,48 @@ def test_multipart_get_part(): assert status == 400 assert error_code == 'InvalidPart' +@pytest.mark.fails_on_dbstore +def test_multipart_single_get_part(): + bucket_name = get_new_bucket() + client = get_client() + key = "mymultipart" + + part_size = 5*1024*1024 + part_sizes = [part_size] # just one part + part_count = len(part_sizes) + total_size = sum(part_sizes) + + (upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size) + + # request part before complete + e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=1) + status, error_code = _get_status_and_error_code(e.response) + assert status == 404 + assert error_code == 'NoSuchKey' + + client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + assert len(parts) == part_count + + for part, size in zip(parts, part_sizes): + response = client.head_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber']) + assert response['PartsCount'] == part_count + assert response['ETag'] == '"{}"'.format(part['ETag']) + + response = client.get_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber']) + assert response['PartsCount'] == part_count + assert response['ETag'] == '"{}"'.format(part['ETag']) + assert response['ContentLength'] == size + # compare contents + for chunk in response['Body'].iter_chunks(): + assert chunk.decode() == data[0:len(chunk)] + data = data[len(chunk):] + + # request PartNumber out of range + e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=5) + status, error_code = _get_status_and_error_code(e.response) + assert status == 400 + assert error_code == 'InvalidPart' + @pytest.mark.fails_on_dbstore def test_non_multipart_get_part(): bucket_name = get_new_bucket() @@ -13434,3 +13476,183 @@ def test_get_object_torrent(): status, error_code = _get_status_and_error_code(e.response) assert status == 404 assert error_code == 'NoSuchKey' + +@pytest.mark.checksum +def test_object_checksum_sha256(): + bucket = get_new_bucket() + client = get_client() + + key = "myobj" + size = 1024 + body = FakeWriteFile(size, 'A') + sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0=' + response = client.put_object(Bucket=bucket, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=sha256sum) + assert sha256sum == response['ChecksumSHA256'] + + response = client.head_object(Bucket=bucket, Key=key) + assert 'ChecksumSHA256' not in response + response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED') + assert sha256sum == response['ChecksumSHA256'] + + e = assert_raises(ClientError, client.put_object, Bucket=bucket, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256='bad') + status, error_code = _get_status_and_error_code(e.response) + assert status == 400 + assert error_code == 'InvalidRequest' + +@pytest.mark.checksum +@pytest.mark.fails_on_dbstore +def test_multipart_checksum_sha256(): + bucket = get_new_bucket() + client = get_client() + + key = "mymultipart" + response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256') + assert 'SHA256' == response['ChecksumAlgorithm'] + upload_id = response['UploadId'] + + size = 1024 + body = FakeWriteFile(size, 'A') + part_sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0=' + response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part_sha256sum) + + # should reject the bad request checksum + e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256='bad', MultipartUpload={'Parts': [ + {'ETag': response['ETag'].strip('"'), 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 1}]}) + status, error_code = _get_status_and_error_code(e.response) + assert status == 400 + assert error_code == 'InvalidRequest' + + # XXXX re-trying the complete is failing in RGW due to an internal error that appears not caused + # checksums; + # 2024-04-25T17:47:47.991-0400 7f78e3a006c0 0 req 4931907640780566174 0.011000143s s3:complete_multipart check_previously_completed() ERROR: get_obj_attrs() returned ret=-2 + # 2024-04-25T17:47:47.991-0400 7f78e3a006c0 2 req 4931907640780566174 0.011000143s s3:complete_multipart completing + # 2024-04-25T17:47:47.991-0400 7f78e3a006c0 1 req 4931907640780566174 0.011000143s s3:complete_multipart ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: -2200 + # -2200 turns into 500, InternalError + + key = "mymultipart2" + response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256') + assert 'SHA256' == response['ChecksumAlgorithm'] + upload_id = response['UploadId'] + + body = FakeWriteFile(size, 'A') + part_sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0=' + response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part_sha256sum) + + # should reject the missing part checksum + e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256='bad', MultipartUpload={'Parts': [ + {'ETag': response['ETag'].strip('"'), 'PartNumber': 1}]}) + status, error_code = _get_status_and_error_code(e.response) + assert status == 400 + assert error_code == 'InvalidRequest' + + key = "mymultipart3" + response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256') + assert 'SHA256' == response['ChecksumAlgorithm'] + upload_id = response['UploadId'] + + body = FakeWriteFile(size, 'A') + part_sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0=' + response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part_sha256sum) + + composite_sha256sum = 'Ok6Cs5b96ux6+MWQkJO7UBT5sKPBeXBLwvj/hK89smg=-1' + response = client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256=composite_sha256sum, MultipartUpload={'Parts': [ + {'ETag': response['ETag'].strip('"'), 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 1}]}) + assert composite_sha256sum == response['ChecksumSHA256'] + + response = client.head_object(Bucket=bucket, Key=key) + assert 'ChecksumSHA256' not in response + response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED') + assert composite_sha256sum == response['ChecksumSHA256'] + +@pytest.mark.checksum +@pytest.mark.fails_on_dbstore +def test_multipart_checksum_3parts(): + bucket = get_new_bucket() + client = get_client() + + key = "mymultipart3" + response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256') + assert 'SHA256' == response['ChecksumAlgorithm'] + upload_id = response['UploadId'] + + size = 5 * 1024 * 1024 # each part but the last must be at least 5M + body = FakeWriteFile(size, 'A') + part1_sha256sum = '275VF5loJr1YYawit0XSHREhkFXYkkPKGuoK0x9VKxI=' + response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part1_sha256sum) + etag1 = response['ETag'].strip('"') + + body = FakeWriteFile(size, 'B') + part2_sha256sum = 'mrHwOfjTL5Zwfj74F05HOQGLdUb7E5szdCbxgUSq6NM=' + response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=2, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part2_sha256sum) + etag2 = response['ETag'].strip('"') + + body = FakeWriteFile(size, 'C') + part3_sha256sum = 'Vw7oB/nKQ5xWb3hNgbyfkvDiivl+U+/Dft48nfJfDow=' + response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=3, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part3_sha256sum) + etag3 = response['ETag'].strip('"') + + composite_sha256sum = 'uWBwpe1dxI4Vw8Gf0X9ynOdw/SS6VBzfWm9giiv1sf4=-3' + response = client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256=composite_sha256sum, MultipartUpload={'Parts': [ + {'ETag': etag1, 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 1}, + {'ETag': etag2, 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 2}, + {'ETag': etag3, 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 3}]}) + assert composite_sha256sum == response['ChecksumSHA256'] + + response = client.head_object(Bucket=bucket, Key=key) + assert 'ChecksumSHA256' not in response + response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED') + assert composite_sha256sum == response['ChecksumSHA256'] + +@pytest.mark.checksum +def test_post_object_upload_checksum(): + megabytes = 1024 * 1024 + min_size = 0 + max_size = 5 * megabytes + test_payload_size = 2 * megabytes + + bucket_name = get_new_bucket() + client = get_client() + + url = _get_post_url(bucket_name) + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket_name},\ + ["starts-with", "$key", "foo_cksum_test"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", min_size, max_size],\ + ]\ + } + + test_payload = b'x' * test_payload_size + + json_policy_document = json.JSONEncoder().encode(policy_document) + bytes_json_policy_document = bytes(json_policy_document, 'utf-8') + policy = base64.b64encode(bytes_json_policy_document) + aws_secret_access_key = get_main_aws_secret_key() + aws_access_key_id = get_main_aws_access_key() + + signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest()) + + # good checksum payload (checked via upload from awscli) + payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),\ + ('x-amz-checksum-sha256', 'aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w='),\ + ('file', (test_payload)),]) + + r = requests.post(url, files=payload, verify=get_config_ssl_verify()) + assert r.status_code == 204 + + # bad checksum payload + payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),\ + ('x-amz-checksum-sha256', 'sailorjerry'),\ + ('file', (test_payload)),]) + + r = requests.post(url, files=payload, verify=get_config_ssl_verify()) + assert r.status_code == 400 diff --git a/s3tests_boto3/functional/test_s3select.py b/s3tests_boto3/functional/test_s3select.py index 7ea8128..88cd111 100644 --- a/s3tests_boto3/functional/test_s3select.py +++ b/s3tests_boto3/functional/test_s3select.py @@ -4,6 +4,7 @@ import string import re import json from botocore.exceptions import ClientError +from botocore.exceptions import EventStreamError import pandas as pd import pyarrow as pa @@ -335,26 +336,34 @@ def run_s3select(bucket,key,query,input="CSV",output="CSV",quot_field="", op_col return result if progress == False: - for event in r['Payload']: - if 'Records' in event: - records = event['Records']['Payload'].decode('utf-8') - result += records - else: - result = [] - max_progress_scanned = 0 - for event in r['Payload']: - if 'Records' in event: - records = event['Records'] - result.append(records.copy()) - if 'Progress' in event: - if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned): - max_progress_scanned = event['Progress']['Details']['BytesScanned'] - result_status['Progress'] = event['Progress'] - if 'Stats' in event: - result_status['Stats'] = event['Stats'] - if 'End' in event: - result_status['End'] = event['End'] + try: + for event in r['Payload']: + if 'Records' in event: + records = event['Records']['Payload'].decode('utf-8') + result += records + + except EventStreamError as c: + result = str(c) + return result + + else: + result = [] + max_progress_scanned = 0 + for event in r['Payload']: + if 'Records' in event: + records = event['Records'] + result.append(records.copy()) + if 'Progress' in event: + if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned): + max_progress_scanned = event['Progress']['Details']['BytesScanned'] + result_status['Progress'] = event['Progress'] + + if 'Stats' in event: + result_status['Stats'] = event['Stats'] + if 'End' in event: + result_status['End'] = event['End'] + if progress == False: return result @@ -1559,7 +1568,6 @@ def test_schema_definition(): # using the scheme on first line, query is using the attach schema res_use = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select c1,c3 from s3object;",csv_header_info="USE") ).replace("\n","") - # result of both queries should be the same s3select_assert_result( res_ignore, res_use)