Modify the run_s3select routine to handle the different statuses (progress, stats, end)

Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
This commit is contained in:
galsalomon66 2023-04-09 15:02:24 +03:00
parent 89bbe654ca
commit bc2a3b0b70

View file

@ -275,6 +275,7 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
s3 = get_client()
result = ""
result_status = {}
try:
r = s3.select_object_content(
Bucket=bucket,
@ -302,14 +303,18 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
result.append(records.copy())
if 'Progress' in event:
progress = event['Progress']
result.append(progress.copy())
result_status['Progress'] = event['Progress']
if 'Stats' in event:
stats = event['Stats']
result.append(stats.copy())
result_status['Stats'] = event['Stats']
if 'End' in event:
end = event['End']
result.append(end.copy())
return result
result_status['End'] = event['End']
if progress == False:
return result
else:
return result,result_status
def run_s3select_output(bucket,key,query, quot_field, op_column_delim = ",", op_row_delim = "\n", column_delim=",", op_quot_char = '"', op_esc_char = '\\', row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
@ -1550,21 +1555,26 @@ def test_progress_expressions():
obj_size = len(csv_obj.encode('utf-8'))
res_s3select_response = run_s3select(bucket_name,csv_obj_name,"select sum(int(_1)) from s3object;",progress = True)
records_payload_size = len(remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select sum(int(_1)) from s3object;')).replace("\n",""))
result_status = {}
result_size = 0
total_response = len(res_s3select_response)
res_s3select_response,result_status = run_s3select(bucket_name,csv_obj_name,"select sum(int(_1)) from s3object;",progress = True)
for rec in res_s3select_response:
result_size += len(rec['Payload'])
records_payload_size = result_size
# To do: Validate bytes processed after supporting compressed data
s3select_assert_result(obj_size, res_s3select_response[total_response-3]['Details']['BytesScanned'])
s3select_assert_result(records_payload_size, res_s3select_response[total_response-3]['Details']['BytesReturned'])
s3select_assert_result(obj_size, result_status['Progress']['Details']['BytesScanned'])
s3select_assert_result(records_payload_size, result_status['Progress']['Details']['BytesReturned'])
# stats response payload validation
s3select_assert_result(obj_size, res_s3select_response[total_response-2]['Details']['BytesScanned'])
s3select_assert_result(records_payload_size, res_s3select_response[total_response-2]['Details']['BytesReturned'])
s3select_assert_result(obj_size, result_status['Stats']['Details']['BytesScanned'])
s3select_assert_result(records_payload_size, result_status['Stats']['Details']['BytesReturned'])
# end response
s3select_assert_result({}, res_s3select_response[total_response-1])
s3select_assert_result({}, result_status['End'])
@pytest.mark.s3select
def test_output_serial_expressions():