From 565e5c8b9ba8a2ef769b133083907667e9c9104d Mon Sep 17 00:00:00 2001 From: Gal Salomon Date: Wed, 17 Apr 2024 18:20:37 +0300 Subject: [PATCH] add handling for EventStreamError exception Signed-off-by: Gal Salomon (cherry picked from commit 77f1334571416e110d27f574c7f563d8c9873d9b) --- s3tests_boto3/functional/test_s3select.py | 53 +++++++++++++---------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/s3tests_boto3/functional/test_s3select.py b/s3tests_boto3/functional/test_s3select.py index ec8e1a8..b8533f9 100644 --- a/s3tests_boto3/functional/test_s3select.py +++ b/s3tests_boto3/functional/test_s3select.py @@ -4,6 +4,7 @@ import string import re import json from botocore.exceptions import ClientError +from botocore.exceptions import EventStreamError import uuid @@ -276,6 +277,7 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"', s3 = get_client() result = "" result_status = {} + try: r = s3.select_object_content( Bucket=bucket, @@ -291,26 +293,34 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"', return result if progress == False: - for event in r['Payload']: - if 'Records' in event: - records = event['Records']['Payload'].decode('utf-8') - result += records - else: - result = [] - max_progress_scanned = 0 - for event in r['Payload']: - if 'Records' in event: - records = event['Records'] - result.append(records.copy()) - if 'Progress' in event: - if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned): - max_progress_scanned = event['Progress']['Details']['BytesScanned'] - result_status['Progress'] = event['Progress'] - if 'Stats' in event: - result_status['Stats'] = event['Stats'] - if 'End' in event: - result_status['End'] = event['End'] + try: + for event in r['Payload']: + if 'Records' in event: + records = event['Records']['Payload'].decode('utf-8') + result += records + + except EventStreamError as c: + result = str(c) + return result + + else: + result = [] + max_progress_scanned = 0 + for event in r['Payload']: + if 'Records' in event: + records = event['Records'] + result.append(records.copy()) + if 'Progress' in event: + if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned): + max_progress_scanned = event['Progress']['Details']['BytesScanned'] + result_status['Progress'] = event['Progress'] + + if 'Stats' in event: + result_status['Stats'] = event['Stats'] + if 'End' in event: + result_status['End'] = event['End'] + if progress == False: return result @@ -1141,8 +1151,6 @@ def test_alias(): @pytest.mark.s3select def test_alias_cyclic_refernce(): - ## TEMP : RGW may return error-status that it is not handled by this test - return number_of_rows = 10000 # purpose of test is to validate the s3select-engine is able to detect a cyclic reference to alias. @@ -1296,8 +1304,6 @@ def test_csv_definition(): @pytest.mark.s3select def test_schema_definition(): - ## TEMP : RGW may return error-status that it is not handled by this test - return number_of_rows = 10000 # purpose of test is to validate functionality using csv header info @@ -1313,7 +1319,6 @@ def test_schema_definition(): # using the scheme on first line, query is using the attach schema res_use = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select c1,c3 from s3object;",csv_header_info="USE") ).replace("\n","") - # result of both queries should be the same s3select_assert_result( res_ignore, res_use)