add handling for EventStreamError exception

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
(cherry picked from commit 77f1334571)
This commit is contained in:
Gal Salomon 2024-04-17 18:20:37 +03:00 committed by Casey Bodley
parent f4e362dc3c
commit 565e5c8b9b

View file

@@ -4,6 +4,7 @@ import string
import re
import json
from botocore.exceptions import ClientError
from botocore.exceptions import EventStreamError
import uuid
@@ -276,6 +277,7 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
s3 = get_client()
result = ""
result_status = {}
try:
r = s3.select_object_content(
Bucket=bucket,
@@ -291,26 +293,34 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
return result
if progress == False:
for event in r['Payload']:
if 'Records' in event:
records = event['Records']['Payload'].decode('utf-8')
result += records
else:
result = []
max_progress_scanned = 0
for event in r['Payload']:
if 'Records' in event:
records = event['Records']
result.append(records.copy())
if 'Progress' in event:
if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned):
max_progress_scanned = event['Progress']['Details']['BytesScanned']
result_status['Progress'] = event['Progress']
if 'Stats' in event:
result_status['Stats'] = event['Stats']
if 'End' in event:
result_status['End'] = event['End']
try:
for event in r['Payload']:
if 'Records' in event:
records = event['Records']['Payload'].decode('utf-8')
result += records
except EventStreamError as c:
result = str(c)
return result
else:
result = []
max_progress_scanned = 0
for event in r['Payload']:
if 'Records' in event:
records = event['Records']
result.append(records.copy())
if 'Progress' in event:
if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned):
max_progress_scanned = event['Progress']['Details']['BytesScanned']
result_status['Progress'] = event['Progress']
if 'Stats' in event:
result_status['Stats'] = event['Stats']
if 'End' in event:
result_status['End'] = event['End']
if progress == False:
return result
@@ -1141,8 +1151,6 @@ def test_alias():
@pytest.mark.s3select
def test_alias_cyclic_refernce():
## TEMP : RGW may return error-status that it is not handled by this test
return
number_of_rows = 10000
# purpose of test is to validate the s3select-engine is able to detect a cyclic reference to alias.
@@ -1296,8 +1304,6 @@ def test_csv_definition():
@pytest.mark.s3select
def test_schema_definition():
## TEMP : RGW may return error-status that it is not handled by this test
return
number_of_rows = 10000
# purpose of test is to validate functionality using csv header info
@@ -1313,7 +1319,6 @@ def test_schema_definition():
# using the scheme on first line, query is using the attach schema
res_use = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select c1,c3 from s3object;",csv_header_info="USE") ).replace("\n","")
# result of both queries should be the same
s3select_assert_result( res_ignore, res_use)