forked from TrueCloudLab/s3-tests
Merge pull request #402 from galsalomon66/progress-stats
s3select-tests: new-s3select-responses, output-serialization, presto alignments
This commit is contained in:
commit
9a6a1e9f19
1 changed files with 139 additions and 13 deletions
|
@ -3,6 +3,7 @@ import random
|
|||
import string
|
||||
import re
|
||||
from nose.plugins.attrib import attr
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
import uuid
|
||||
from nose.tools import eq_ as eq
|
||||
|
@ -218,7 +219,46 @@ def upload_csv_object(bucket_name,new_key,obj):
|
|||
eq(response['Body'].read().decode('utf-8'), obj, 's3select error[ downloaded object not equal to uploaded objecy')
|
||||
|
||||
|
||||
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
|
||||
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
|
||||
|
||||
s3 = get_client()
|
||||
result = ""
|
||||
try:
|
||||
r = s3.select_object_content(
|
||||
Bucket=bucket,
|
||||
Key=key,
|
||||
ExpressionType='SQL',
|
||||
InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
|
||||
OutputSerialization = {"CSV": {}},
|
||||
Expression=query,
|
||||
RequestProgress = {"Enabled": progress})
|
||||
|
||||
except ClientError as c:
|
||||
result += str(c)
|
||||
return result
|
||||
|
||||
if progress == False:
|
||||
for event in r['Payload']:
|
||||
if 'Records' in event:
|
||||
records = event['Records']['Payload'].decode('utf-8')
|
||||
result += records
|
||||
else:
|
||||
result = []
|
||||
for event in r['Payload']:
|
||||
if 'Records' in event:
|
||||
records = event['Records']
|
||||
result.append(records.copy())
|
||||
if 'Progress' in event:
|
||||
progress = event['Progress']
|
||||
result.append(progress.copy())
|
||||
if 'Stats' in event:
|
||||
stats = event['Stats']
|
||||
result.append(stats.copy())
|
||||
if 'End' in event:
|
||||
end = event['End']
|
||||
result.append(end.copy())
|
||||
return result
|
||||
def run_s3select_output(bucket,key,query, quot_field, op_column_delim = ",", op_row_delim = "\n", column_delim=",", op_quot_char = '"', op_esc_char = '\\', row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
|
||||
|
||||
s3 = get_client()
|
||||
|
||||
|
@ -227,7 +267,7 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
|
|||
Key=key,
|
||||
ExpressionType='SQL',
|
||||
InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
|
||||
OutputSerialization = {"CSV": {}},
|
||||
OutputSerialization = {"CSV": {"RecordDelimiter" : op_row_delim, "FieldDelimiter" : op_column_delim, "QuoteCharacter" : op_quot_char, "QuoteEscapeCharacter" : op_esc_char, "QuoteFields" : quot_field}},
|
||||
Expression=query,)
|
||||
|
||||
result = ""
|
||||
|
@ -266,7 +306,7 @@ def create_list_of_int(column_pos,obj,field_split=",",row_split="\n"):
|
|||
col_num+=1
|
||||
|
||||
return list_of_int
|
||||
|
||||
|
||||
@attr('s3select')
|
||||
def test_count_operation():
|
||||
csv_obj_name = get_random_string()
|
||||
|
@ -466,7 +506,7 @@ def test_lowerupper_expressions():
|
|||
@attr('s3select')
|
||||
def test_in_expressions():
|
||||
|
||||
# purpose of test: engine is process correctly several projections containing aggregation-functions
|
||||
# purpose of test: engine is process correctly several projections containing aggregation-functions
|
||||
csv_obj = create_random_csv_object(10000,10)
|
||||
|
||||
csv_obj_name = get_random_string()
|
||||
|
@ -605,6 +645,12 @@ def test_like_expressions():
|
|||
|
||||
s3select_assert_result( res_s3select_like, res_s3select )
|
||||
|
||||
res_s3select_like = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _1 like "%aeio%" like;')).replace("\n","")
|
||||
|
||||
find_like = res_s3select_like.find("s3select-Syntax-Error")
|
||||
|
||||
assert int(find_like) >= 0
|
||||
|
||||
res_s3select_like = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select (_1 like "cbcd%") from s3object;')).replace("\n","")
|
||||
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select (substring(_1,1,4) = "cbcd") from s3object;')).replace("\n","")
|
||||
|
@ -794,20 +840,17 @@ def test_datetime():
|
|||
|
||||
s3select_assert_result( res_s3select_date_time, res_s3select_substring)
|
||||
|
||||
res_s3select_date_time = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(0) from s3object where date_diff(month,to_timestamp(_1),date_add(month,2,to_timestamp(_1)) ) = 2;') )
|
||||
res_s3select_date_time_to_string = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select cast(to_string(to_timestamp(_1), \'x\') as int) from s3object;') )
|
||||
|
||||
res_s3select_count = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(0) from s3object;') )
|
||||
res_s3select_date_time_extract = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select extract(timezone_hour from to_timestamp(_1)) from s3object;') )
|
||||
|
||||
s3select_assert_result( res_s3select_date_time, res_s3select_count)
|
||||
s3select_assert_result( res_s3select_date_time_to_string, res_s3select_date_time_extract )
|
||||
|
||||
res_s3select_date_time = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(0) from s3object where date_diff(year,to_timestamp(_1),date_add(day, 366 ,to_timestamp(_1))) = 1 ;') )
|
||||
res_s3select_date_time_to_timestamp = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select extract(month from to_timestamp(_1)) from s3object where extract(month from to_timestamp(_1)) = 5;') )
|
||||
|
||||
s3select_assert_result( res_s3select_date_time, res_s3select_count)
|
||||
res_s3select_substring = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select cast(substring(_1, 5, 2) as int) from s3object where _1 like \'____05%\';') )
|
||||
|
||||
# validate that utcnow is integrate correctly with other date-time functions
|
||||
res_s3select_date_time_utcnow = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(0) from s3object where date_diff(hour,utcnow(),date_add(day,1,utcnow())) = 24 ;') )
|
||||
|
||||
s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)
|
||||
s3select_assert_result( res_s3select_date_time_to_timestamp, res_s3select_substring)
|
||||
|
||||
@attr('s3select')
|
||||
def test_true_false_datetime():
|
||||
|
@ -838,6 +881,7 @@ def test_true_false_datetime():
|
|||
|
||||
s3select_assert_result( res_s3select_date_time, res_s3select_count)
|
||||
|
||||
# validate that utcnow is integrate correctly with other date-time functions
|
||||
res_s3select_date_time_utcnow = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(0) from s3object where (date_diff(hour,utcnow(),date_add(day,1,utcnow())) = 24) = true ;') )
|
||||
|
||||
s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)
|
||||
|
@ -939,6 +983,10 @@ def test_schema_definition():
|
|||
|
||||
assert res_multiple_defintion.find("alias {c11} or column not exist in schema") > 0
|
||||
|
||||
find_processing_error = res_multiple_defintion.find("s3select-ProcessingTime-Error")
|
||||
|
||||
assert int(find_processing_error) >= 0
|
||||
|
||||
# alias-name is identical to column-name
|
||||
res_multiple_defintion = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select int(c1)+int(c2) as c4,c4 from s3object;",csv_header_info="USE") ).replace("\n","")
|
||||
|
||||
|
@ -1162,3 +1210,81 @@ def test_bool_cast_expressions():
|
|||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from s3object where cast(_1 as int) != 0 ;')).replace("\n","")
|
||||
|
||||
s3select_assert_result( res_s3select_cast, res_s3select )
|
||||
|
||||
@attr('s3select')
|
||||
def test_progress_expressions():
|
||||
|
||||
csv_obj = create_random_csv_object(1000000,10)
|
||||
|
||||
csv_obj_name = get_random_string()
|
||||
bucket_name = "test"
|
||||
upload_csv_object(bucket_name,csv_obj_name,csv_obj)
|
||||
|
||||
obj_size = len(csv_obj.encode('utf-8'))
|
||||
|
||||
res_s3select_response = run_s3select(bucket_name,csv_obj_name,"select sum(int(_1)) from s3object;",progress = True)
|
||||
records_payload_size = len(remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select sum(int(_1)) from s3object;')).replace("\n",""))
|
||||
|
||||
total_response = len(res_s3select_response)
|
||||
|
||||
# To do: Validate bytes processed after supporting compressed data
|
||||
s3select_assert_result(obj_size, res_s3select_response[total_response-3]['Details']['BytesScanned'])
|
||||
s3select_assert_result(records_payload_size, res_s3select_response[total_response-3]['Details']['BytesReturned'])
|
||||
|
||||
# stats response payload validation
|
||||
s3select_assert_result(obj_size, res_s3select_response[total_response-2]['Details']['BytesScanned'])
|
||||
s3select_assert_result(records_payload_size, res_s3select_response[total_response-2]['Details']['BytesReturned'])
|
||||
|
||||
# end response
|
||||
s3select_assert_result({}, res_s3select_response[total_response-1])
|
||||
|
||||
@attr('s3select')
|
||||
def test_output_serial_expressions():
|
||||
|
||||
csv_obj = create_random_csv_object(10000,10)
|
||||
|
||||
csv_obj_name = get_random_string()
|
||||
bucket_name = "test"
|
||||
upload_csv_object(bucket_name,csv_obj_name,csv_obj)
|
||||
|
||||
res_s3select_1 = remove_xml_tags_from_result( run_s3select_output(bucket_name,csv_obj_name,"select _1, _2 from s3object where nullif(_1,_2) is null ;", "ALWAYS") ).replace("\n","")
|
||||
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select _1, _2 from s3object where _1 = _2 ;") ).replace("\n","")
|
||||
|
||||
res_s3select_list = res_s3select.split(',')
|
||||
|
||||
res_s3select_list.pop()
|
||||
|
||||
res_s3select_final = (','.join('"' + item + '"' for item in res_s3select_list))
|
||||
|
||||
res_s3select_final += ','
|
||||
|
||||
s3select_assert_result( res_s3select_1, res_s3select_final)
|
||||
|
||||
res_s3select_in = remove_xml_tags_from_result( run_s3select_output(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(int(_2)));', "ASNEEDED", '$', '#')).replace("\n","")
|
||||
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where int(_1) = int(_2);')).replace("\n","")
|
||||
|
||||
res_s3select_list = res_s3select.split(',')
|
||||
|
||||
res_s3select_list.pop()
|
||||
|
||||
res_s3select_final = ('#'.join(item + '$' for item in res_s3select_list))
|
||||
|
||||
res_s3select_final += '#'
|
||||
|
||||
s3select_assert_result( res_s3select_in, res_s3select_final )
|
||||
|
||||
res_s3select_quot = remove_xml_tags_from_result( run_s3select_output(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(int(_2)));', "ALWAYS", '$', '#')).replace("\n","")
|
||||
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where int(_1) = int(_2);')).replace("\n","")
|
||||
|
||||
res_s3select_list = res_s3select.split(',')
|
||||
|
||||
res_s3select_list.pop()
|
||||
|
||||
res_s3select_final = ('#'.join('"' + item + '"' + '$' for item in res_s3select_list))
|
||||
|
||||
res_s3select_final += '#'
|
||||
|
||||
s3select_assert_result( res_s3select_quot, res_s3select_final )
|
||||
|
|
Loading…
Reference in a new issue