Merge branch 'master' into json_format

albin-antony 2024-07-23 11:20:28 +05:30 committed by GitHub
commit d2c16dba3c
4 changed files with 252 additions and 21 deletions


@@ -7,6 +7,7 @@ markers =
     auth_common
     bucket_policy
     bucket_encryption
+    checksum
     cloud_transition
     encryption
     fails_on_aws


@@ -8,7 +8,7 @@ munch >=2.0.0
 gevent >=1.0
 isodate >=0.4.4
 requests >=2.23.0
-pytz >=2011k
+pytz
 httplib2
 lxml
 pytest


@@ -6458,6 +6458,48 @@ def test_multipart_get_part():
     assert status == 400
     assert error_code == 'InvalidPart'
 
+@pytest.mark.fails_on_dbstore
+def test_multipart_single_get_part():
+    bucket_name = get_new_bucket()
+    client = get_client()
+    key = "mymultipart"
+    part_size = 5*1024*1024
+    part_sizes = [part_size] # just one part
+    part_count = len(part_sizes)
+    total_size = sum(part_sizes)
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
+
+    # request part before complete
+    e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=1)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 404
+    assert error_code == 'NoSuchKey'
+
+    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    assert len(parts) == part_count
+
+    for part, size in zip(parts, part_sizes):
+        response = client.head_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber'])
+        assert response['PartsCount'] == part_count
+        assert response['ETag'] == '"{}"'.format(part['ETag'])
+
+        response = client.get_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber'])
+        assert response['PartsCount'] == part_count
+        assert response['ETag'] == '"{}"'.format(part['ETag'])
+        assert response['ContentLength'] == size
+        # compare contents
+        for chunk in response['Body'].iter_chunks():
+            assert chunk.decode() == data[0:len(chunk)]
+            data = data[len(chunk):]
+
+    # request PartNumber out of range
+    e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=5)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 400
+    assert error_code == 'InvalidPart'
+
 @pytest.mark.fails_on_dbstore
 def test_non_multipart_get_part():
     bucket_name = get_new_bucket()
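
The new test relies on S3's part-addressable reads: passing PartNumber to
GetObject or HeadObject returns metadata (or data) for just that part, and the
response then carries a PartsCount field. A minimal boto3 sketch of the access
pattern, assuming a client and a completed multipart object as in the test
above:

    # Probe part 1 first; PartsCount is only returned when PartNumber is given.
    response = client.head_object(Bucket=bucket_name, Key=key, PartNumber=1)
    for n in range(1, response['PartsCount'] + 1):
        part = client.get_object(Bucket=bucket_name, Key=key, PartNumber=n)
        print(n, part['ContentLength'], part['ETag'])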
@@ -13434,3 +13476,183 @@ def test_get_object_torrent():
     status, error_code = _get_status_and_error_code(e.response)
     assert status == 404
     assert error_code == 'NoSuchKey'
+
+@pytest.mark.checksum
+def test_object_checksum_sha256():
+    bucket = get_new_bucket()
+    client = get_client()
+
+    key = "myobj"
+    size = 1024
+    body = FakeWriteFile(size, 'A')
+    sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
+    response = client.put_object(Bucket=bucket, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=sha256sum)
+    assert sha256sum == response['ChecksumSHA256']
+
+    response = client.head_object(Bucket=bucket, Key=key)
+    assert 'ChecksumSHA256' not in response
+    response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
+    assert sha256sum == response['ChecksumSHA256']
+
+    e = assert_raises(ClientError, client.put_object, Bucket=bucket, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256='bad')
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 400
+    assert error_code == 'InvalidRequest'
+
+@pytest.mark.checksum
+@pytest.mark.fails_on_dbstore
+def test_multipart_checksum_sha256():
+    bucket = get_new_bucket()
+    client = get_client()
+
+    key = "mymultipart"
+    response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256')
+    assert 'SHA256' == response['ChecksumAlgorithm']
+    upload_id = response['UploadId']
+
+    size = 1024
+    body = FakeWriteFile(size, 'A')
+    part_sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part_sha256sum)
+
+    # should reject the bad request checksum
+    e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256='bad', MultipartUpload={'Parts': [
+        {'ETag': response['ETag'].strip('"'), 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 1}]})
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 400
+    assert error_code == 'InvalidRequest'
+
+    # XXXX re-trying the complete is failing in RGW due to an internal error that appears
+    # not caused by checksums:
+    # 2024-04-25T17:47:47.991-0400 7f78e3a006c0 0 req 4931907640780566174 0.011000143s s3:complete_multipart check_previously_completed() ERROR: get_obj_attrs() returned ret=-2
+    # 2024-04-25T17:47:47.991-0400 7f78e3a006c0 2 req 4931907640780566174 0.011000143s s3:complete_multipart completing
+    # 2024-04-25T17:47:47.991-0400 7f78e3a006c0 1 req 4931907640780566174 0.011000143s s3:complete_multipart ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: -2200
+    # -2200 turns into 500, InternalError
+
+    key = "mymultipart2"
+    response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256')
+    assert 'SHA256' == response['ChecksumAlgorithm']
+    upload_id = response['UploadId']
+
+    body = FakeWriteFile(size, 'A')
+    part_sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part_sha256sum)
+
+    # should reject the missing part checksum
+    e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256='bad', MultipartUpload={'Parts': [
+        {'ETag': response['ETag'].strip('"'), 'PartNumber': 1}]})
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 400
+    assert error_code == 'InvalidRequest'
+
+    key = "mymultipart3"
+    response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256')
+    assert 'SHA256' == response['ChecksumAlgorithm']
+    upload_id = response['UploadId']
+
+    body = FakeWriteFile(size, 'A')
+    part_sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part_sha256sum)
+
+    composite_sha256sum = 'Ok6Cs5b96ux6+MWQkJO7UBT5sKPBeXBLwvj/hK89smg=-1'
+    response = client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256=composite_sha256sum, MultipartUpload={'Parts': [
+        {'ETag': response['ETag'].strip('"'), 'ChecksumSHA256': response['ChecksumSHA256'], 'PartNumber': 1}]})
+    assert composite_sha256sum == response['ChecksumSHA256']
+
+    response = client.head_object(Bucket=bucket, Key=key)
+    assert 'ChecksumSHA256' not in response
+    response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
+    assert composite_sha256sum == response['ChecksumSHA256']
+
+@pytest.mark.checksum
+@pytest.mark.fails_on_dbstore
+def test_multipart_checksum_3parts():
+    bucket = get_new_bucket()
+    client = get_client()
+
+    key = "mymultipart3"
+    response = client.create_multipart_upload(Bucket=bucket, Key=key, ChecksumAlgorithm='SHA256')
+    assert 'SHA256' == response['ChecksumAlgorithm']
+    upload_id = response['UploadId']
+
+    size = 5 * 1024 * 1024 # each part but the last must be at least 5M
+    body = FakeWriteFile(size, 'A')
+    part1_sha256sum = '275VF5loJr1YYawit0XSHREhkFXYkkPKGuoK0x9VKxI='
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part1_sha256sum)
+    etag1 = response['ETag'].strip('"')
+
+    body = FakeWriteFile(size, 'B')
+    part2_sha256sum = 'mrHwOfjTL5Zwfj74F05HOQGLdUb7E5szdCbxgUSq6NM='
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=2, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part2_sha256sum)
+    etag2 = response['ETag'].strip('"')
+
+    body = FakeWriteFile(size, 'C')
+    part3_sha256sum = 'Vw7oB/nKQ5xWb3hNgbyfkvDiivl+U+/Dft48nfJfDow='
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=3, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=part3_sha256sum)
+    etag3 = response['ETag'].strip('"')
+
+    composite_sha256sum = 'uWBwpe1dxI4Vw8Gf0X9ynOdw/SS6VBzfWm9giiv1sf4=-3'
+    response = client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, ChecksumSHA256=composite_sha256sum, MultipartUpload={'Parts': [
+        {'ETag': etag1, 'ChecksumSHA256': part1_sha256sum, 'PartNumber': 1},
+        {'ETag': etag2, 'ChecksumSHA256': part2_sha256sum, 'PartNumber': 2},
+        {'ETag': etag3, 'ChecksumSHA256': part3_sha256sum, 'PartNumber': 3}]})
+    assert composite_sha256sum == response['ChecksumSHA256']
+
+    response = client.head_object(Bucket=bucket, Key=key)
+    assert 'ChecksumSHA256' not in response
+    response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
+    assert composite_sha256sum == response['ChecksumSHA256']
+
+@pytest.mark.checksum
+def test_post_object_upload_checksum():
+    megabytes = 1024 * 1024
+    min_size = 0
+    max_size = 5 * megabytes
+    test_payload_size = 2 * megabytes
+
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    url = _get_post_url(bucket_name)
+    utc = pytz.utc
+    expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+    policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
+                       "conditions": [
+                           {"bucket": bucket_name},
+                           ["starts-with", "$key", "foo_cksum_test"],
+                           {"acl": "private"},
+                           ["starts-with", "$Content-Type", "text/plain"],
+                           ["content-length-range", min_size, max_size],
+                       ]}
+
+    test_payload = b'x' * test_payload_size
+
+    json_policy_document = json.JSONEncoder().encode(policy_document)
+    bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
+    policy = base64.b64encode(bytes_json_policy_document)
+    aws_secret_access_key = get_main_aws_secret_key()
+    aws_access_key_id = get_main_aws_access_key()
+    signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
+
+    # good checksum payload (checked via upload from awscli)
+    payload = OrderedDict([("key", "foo_cksum_test.txt"), ("AWSAccessKeyId", aws_access_key_id),
+                           ("acl", "private"), ("signature", signature), ("policy", policy),
+                           ("Content-Type", "text/plain"),
+                           ('x-amz-checksum-sha256', 'aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w='),
+                           ('file', (test_payload)),])
+
+    r = requests.post(url, files=payload, verify=get_config_ssl_verify())
+    assert r.status_code == 204
+
+    # bad checksum payload
+    payload = OrderedDict([("key", "foo_cksum_test.txt"), ("AWSAccessKeyId", aws_access_key_id),
+                           ("acl", "private"), ("signature", signature), ("policy", policy),
+                           ("Content-Type", "text/plain"),
+                           ('x-amz-checksum-sha256', 'sailorjerry'),
+                           ('file', (test_payload)),])
+
+    r = requests.post(url, files=payload, verify=get_config_ssl_verify())
+    assert r.status_code == 400
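
For reference, the checksum literals asserted in these tests can be reproduced
locally, assuming FakeWriteFile(size, 'A') yields size bytes of 'A' as its name
suggests. AWS defines the composite multipart checksum as the SHA256 of the
concatenated raw per-part digests, base64-encoded and suffixed with
"-<part count>"; the POST upload uses a plain full-object SHA256. A minimal
sketch:

    import base64
    import hashlib

    # Composite checksum for the three 5 MiB parts in test_multipart_checksum_3parts.
    parts = [b'A' * (5*1024*1024), b'B' * (5*1024*1024), b'C' * (5*1024*1024)]
    digests = b''.join(hashlib.sha256(p).digest() for p in parts)
    composite = base64.b64encode(hashlib.sha256(digests).digest()).decode()
    print('%s-%d' % (composite, len(parts)))
    # expected (per the test): uWBwpe1dxI4Vw8Gf0X9ynOdw/SS6VBzfWm9giiv1sf4=-3

    # Full-object checksum for the 2 MiB POST payload in test_post_object_upload_checksum.
    payload = b'x' * (2 * 1024 * 1024)
    print(base64.b64encode(hashlib.sha256(payload).digest()).decode())
    # expected (per the test): aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w=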


@@ -4,6 +4,7 @@ import string
 import re
 import json
 from botocore.exceptions import ClientError
+from botocore.exceptions import EventStreamError
 
 import pandas as pd
 import pyarrow as pa
@@ -335,26 +336,34 @@ def run_s3select(bucket,key,query,input="CSV",output="CSV",quot_field="", op_col
         return result
 
     if progress == False:
-        for event in r['Payload']:
-            if 'Records' in event:
-                records = event['Records']['Payload'].decode('utf-8')
-                result += records
+        try:
+            for event in r['Payload']:
+                if 'Records' in event:
+                    records = event['Records']['Payload'].decode('utf-8')
+                    result += records
+        except EventStreamError as c:
+            result = str(c)
+            return result
+
     else:
         result = []
         max_progress_scanned = 0
         for event in r['Payload']:
             if 'Records' in event:
                 records = event['Records']
                 result.append(records.copy())
             if 'Progress' in event:
                 if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned):
                     max_progress_scanned = event['Progress']['Details']['BytesScanned']
                     result_status['Progress'] = event['Progress']
             if 'Stats' in event:
                 result_status['Stats'] = event['Stats']
             if 'End' in event:
                 result_status['End'] = event['End']
 
     if progress == False:
         return result
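
The try/except added here reflects how SelectObjectContent reports failures:
the call returns 200 and starts streaming before the query finishes, so a
query error arrives as an event in the stream and botocore raises it as an
EventStreamError while iterating r['Payload'], not when the call is made. A
minimal sketch of the pattern in isolation:

    from botocore.exceptions import EventStreamError

    def drain_select(response):
        # Collect Records events; a stream error surfaces mid-iteration.
        text = ''
        try:
            for event in response['Payload']:
                if 'Records' in event:
                    text += event['Records']['Payload'].decode('utf-8')
        except EventStreamError as e:
            return str(e)  # the error body arrived inside the event stream
        return text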
@@ -1559,7 +1568,6 @@ def test_schema_definition():
     # using the schema on the first line, query is using the attached schema
     res_use = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select c1,c3 from s3object;",csv_header_info="USE") ).replace("\n","")
 
     # result of both queries should be the same
     s3select_assert_result( res_ignore, res_use)