rgw/s3select: json output format for csv, json & parquet

Signed-off-by: Albin Antony <albinantony20@gmail.com>
Albin Antony 2023-11-16 15:25:53 +05:30
parent 00b9a2a291
commit 2e53973095
2 changed files with 426 additions and 194 deletions


@@ -13,3 +13,5 @@ httplib2
 lxml
 pytest
 tox
+pandas
+pyarrow
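
Both new entries back the Parquet helpers added below. A quick, illustrative check that the dependencies resolve (not part of the change):

    # illustrative sanity check that the new test dependencies import cleanly
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    print("pandas", pd.__version__, "/ pyarrow", pa.__version__)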


@@ -5,6 +5,10 @@ import re
 import json
 from botocore.exceptions import ClientError
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 import uuid
 from . import (
@@ -235,30 +239,45 @@ def create_random_json_object(rows,columns,col_delim=",",record_delim="\n",csv_s
     return result

-def csv_to_json(obj, field_split=",",row_split="\n",csv_schema=""):
-    result = "{\"root\" : ["
-    result += row_split
-    if len(csv_schema)>0 :
-        result = csv_schema + row_split
-
-    for rec in obj.split(row_split):
-        row = ""
-        num = 0
-        row += "{"
-        for col in rec.split(field_split):
-            if col == "":
-                break
-            num += 1
-            row = row + "\"c" + str(num) + "\"" + ": " "{}{}".format(col,field_split)
-        row = row[:-1]
-        row += "}"
-        row += ","
-        result += row + row_split
-    result = result[:-5]
-    result += row_split
-    result += "]" + "}"
+def create_parquet_object(parquet_size):
+    # Initialize lists with random integers
+    a = [random.randint(1, 10000) for _ in range(parquet_size)]
+    b = [random.randint(1, 10000) for _ in range(parquet_size)]
+    c = [random.randint(1, 10000) for _ in range(parquet_size)]
+    d = [random.randint(1, 10000) for _ in range(parquet_size)]
+
+    # Create DataFrame
+    df3 = pd.DataFrame({'a': a, 'b': b, 'c': c, 'd': d})
+
+    # Create Parquet object
+    table = pa.Table.from_pandas(df3, preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)
+
+    return obj.getvalue().to_pybytes()
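
As a minimal sketch (not part of the commit), the bytes returned by create_parquet_object() can be round-tripped locally with pyarrow before being uploaded:

    # illustrative round-trip: read the generated Parquet bytes back into a table
    parquet_bytes = create_parquet_object(5)
    table = pq.read_table(pa.BufferReader(parquet_bytes))
    assert table.num_rows == 5
    assert table.column_names == ['a', 'b', 'c', 'd']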
+def csv_to_json(obj, field_split=",", row_split="\n", csv_schema=""):
+    result = "{\"root\" : ["
+    rows = obj.split(row_split)
+    for rec in rows:
+        if rec.strip() == "":
+            continue
+        row = "{"
+        columns = rec.split(field_split)
+        for i, col in enumerate(columns):
+            if col.strip() == "":
+                continue
+            if col.isdigit() or (col.replace('.', '', 1).isdigit() and col.count('.') < 2):
+                row += "\"c{}\": {}, ".format(i + 1, col)
+            else:
+                row += "\"c{}\": \"{}\", ".format(i + 1, col)
+        row = row.rstrip(', ') + "},"
+        result += row + row_split
+    result = result.rstrip(',\n')
+    result += "]}"
     return result
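
For reference, a small example of the document shape the rewritten csv_to_json() produces (derived from the code above; numeric fields stay unquoted):

    # '1,2\n3,4\n' becomes a single JSON document rooted at "root":
    # {"root" : [{"c1": 1, "c2": 2},
    # {"c1": 3, "c2": 4}]}
    print(csv_to_json("1,2\n3,4\n"))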
 def upload_object(bucket_name,new_key,obj):

@@ -272,20 +291,44 @@ def upload_object(bucket_name,new_key,obj):
     response = c2.get_object(Bucket=bucket_name, Key=new_key)
     assert response['Body'].read().decode('utf-8') == obj, 's3select error: downloaded object not equal to uploaded object'
-def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
+def upload_parquet_object(bucket_name,parquet_obj_name,obj):
+    client = get_client()
+    client.create_bucket(Bucket=bucket_name)
+    client.put_object(Bucket=bucket_name, Key=parquet_obj_name, Body=obj)
+
+def run_s3select(bucket,key,query,input="CSV",output="CSV",quot_field="", op_column_delim = ",", op_row_delim = "\n",op_quot_char = '"', op_esc_char = '\\',output_fields = False,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
     s3 = get_client()
-    result = ""
     result_status = {}
+    result = ""
+
+    output_serialization = {"CSV": {}}
+    if input == "JSON":
+        input_serialization = {"JSON": {"Type": "DOCUMENT"}}
+        if(output == "JSON"):
+            output_serialization = {"JSON": {}}
+    elif(input == "CSV"):
+        input_serialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"}
+        if(output == "JSON"):
+            output_serialization = {"JSON": {}}
+        if(output_fields == True):
+            output_serialization = {"CSV": {"RecordDelimiter" : op_row_delim, "FieldDelimiter" : op_column_delim, "QuoteCharacter" : op_quot_char, "QuoteEscapeCharacter" : op_esc_char, "QuoteFields" : quot_field}}
+    elif(input == "PARQUET"):
+        input_serialization = {'Parquet': {}}
+        if(output == "JSON"):
+            output_serialization = {"JSON": {}}
+
     try:
         r = s3.select_object_content(
             Bucket=bucket,
             Key=key,
             ExpressionType='SQL',
-            InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
-            OutputSerialization = {"CSV": {}},
+            InputSerialization = input_serialization,
+            OutputSerialization = output_serialization,
             Expression=query,
             RequestProgress = {"Enabled": progress})
+        #Record delimiter optional in output serialization

     except ClientError as c:
         result += str(c)
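
The consolidated run_s3select() now picks the input/output serialization from its format arguments (the op_* delimiters and quot_field apply to CSV output when output_fields is set). Hypothetical call sites, with placeholder bucket and key names, mirroring the tests below:

    # CSV in, CSV out (defaults)
    res = run_s3select("mybucket", "obj.csv", "select count(0) from s3object;")
    # JSON in, JSON out
    res = run_s3select("mybucket", "obj.json", "select count(0) from s3object[*];", "JSON", "JSON")
    # Parquet in, JSON out
    res = run_s3select("mybucket", "obj.parquet", "select max(a) from s3object;", "PARQUET", "JSON")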
@@ -318,47 +361,6 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
     else:
         return result,result_status

-def run_s3select_output(bucket,key,query, quot_field, op_column_delim = ",", op_row_delim = "\n", column_delim=",", op_quot_char = '"', op_esc_char = '\\', row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
-    s3 = get_client()
-
-    r = s3.select_object_content(
-        Bucket=bucket,
-        Key=key,
-        ExpressionType='SQL',
-        InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
-        OutputSerialization = {"CSV": {"RecordDelimiter" : op_row_delim, "FieldDelimiter" : op_column_delim, "QuoteCharacter" : op_quot_char, "QuoteEscapeCharacter" : op_esc_char, "QuoteFields" : quot_field}},
-        Expression=query,)
-
-    result = ""
-    for event in r['Payload']:
-        if 'Records' in event:
-            records = event['Records']['Payload'].decode('utf-8')
-            result += records
-
-    return result
-
-def run_s3select_json(bucket,key,query, op_row_delim = "\n"):
-    s3 = get_client()
-
-    r = s3.select_object_content(
-        Bucket=bucket,
-        Key=key,
-        ExpressionType='SQL',
-        InputSerialization = {"JSON": {"Type": "DOCUMENT"}},
-        OutputSerialization = {"JSON": {}},
-        Expression=query,)
-    #Record delimiter optional in output serialization
-
-    result = ""
-    for event in r['Payload']:
-        if 'Records' in event:
-            records = event['Records']['Payload'].decode('utf-8')
-            result += records
-
-    return result
-
 def remove_xml_tags_from_result(obj):
     result = ""
     for rec in obj.split("\n"):
@@ -388,6 +390,13 @@ def create_list_of_int(column_pos,obj,field_split=",",row_split="\n"):
     return list_of_int

+def get_max_from_parquet_column(parquet_obj, column_name):
+    table = pq.read_table(pa.BufferReader(parquet_obj))
+    df = table.to_pandas()
+    return df[column_name].max()
+
 @pytest.mark.s3select
 def test_count_operation():
     csv_obj_name = get_random_string()

@@ -396,7 +405,7 @@ def test_count_operation():
     obj_to_load = create_random_csv_object(num_of_rows,10)
     upload_object(bucket_name,csv_obj_name,obj_to_load)
     res = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from s3object;") ).replace(",","")
     s3select_assert_result( num_of_rows, int( res ))

 @pytest.mark.s3select
@@ -407,125 +416,17 @@ def test_count_json_operation():
     num_of_rows = 1
     obj_to_load = create_random_json_object(num_of_rows,10)
     upload_object(bucket_name,json_obj_name,obj_to_load)
-    res = remove_xml_tags_from_result(run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*];"))
-    s3select_assert_result( 1, int(res))
-    res = remove_xml_tags_from_result(run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root;"))
-    s3select_assert_result( 1, int(res))
+    res = run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*];","JSON","JSON")
+    s3select_assert_result( '{"_1":1}\n', res)
+    res = run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root;","JSON","JSON")
+    s3select_assert_result( '{"_1":1}\n', res)

+    json_obj_name = get_random_string()
     obj_to_load = create_random_json_object(3,10)
     upload_object(bucket_name,json_obj_name,obj_to_load)
-    res = remove_xml_tags_from_result(run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root;"))
-    s3select_assert_result( 3, int(res))
+    res = run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root;","JSON","JSON")
+    s3select_assert_result( '{"_1":3}\n', res)
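
The rewritten assertions pin down the JSON output serialization: each result record arrives as a one-line JSON object keyed by output position (_1, _2, ...), terminated by a newline. For example, counting the three-record object above:

    # expected wire format for the JSON-output count over three records
    assert res == '{"_1":3}\n'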
-@pytest.mark.s3select
-def test_json_column_sum_min_max():
-    csv_obj = create_random_csv_object(10000,10)
-
-    json_obj = csv_to_json(csv_obj);
-
-    json_obj_name = get_random_string()
-    bucket_name = get_new_bucket_name()
-    upload_object(bucket_name,json_obj_name,json_obj)
-
-    json_obj_name_2 = get_random_string()
-    bucket_name_2 = "testbuck2"
-    upload_object(bucket_name_2,json_obj_name_2,json_obj)
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select min(_1.c1) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 1 , csv_obj )
-    res_target = min( list_int )
-    s3select_assert_result( int(res_s3select), int(res_target))
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select min(_1.c4) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 4 , csv_obj )
-    res_target = min( list_int )
-    s3select_assert_result( int(res_s3select), int(res_target))
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select avg(_1.c6) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 6 , csv_obj )
-    res_target = float(sum(list_int ))/10000
-    s3select_assert_result( float(res_s3select), float(res_target))
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select max(_1.c4) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 4 , csv_obj )
-    res_target = max( list_int )
-    s3select_assert_result( int(res_s3select), int(res_target))
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select max(_1.c7) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 7 , csv_obj )
-    res_target = max( list_int )
-    s3select_assert_result( int(res_s3select), int(res_target))
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select sum(_1.c4) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 4 , csv_obj )
-    res_target = sum( list_int )
-    s3select_assert_result( int(res_s3select), int(res_target))
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select sum(_1.c7) from s3object[*].root;") ).replace(",","")
-    list_int = create_list_of_int( 7 , csv_obj )
-    res_target = sum( list_int )
-    s3select_assert_result( int(res_s3select) , int(res_target) )
-
-    # the following queries, validates on *random* input an *accurate* relation between condition result,sum operation and count operation.
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name_2,json_obj_name_2,"select count(0),sum(_1.c1),sum(_1.c2) from s3object[*].root where (_1.c1-_1.c2) = 2;" ) )
-    count,sum1,sum2 = res_s3select.split(",")
-    s3select_assert_result( int(count)*2 , int(sum1)-int(sum2 ) )
-
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0),sum(_1.c1),sum(_1.c2) from s3object[*].root where (_1.c1-_1.c2) = 4;" ) )
-    count,sum1,sum2 = res_s3select.split(",")
-    s3select_assert_result( int(count)*4 , int(sum1)-int(sum2) )
-
-@pytest.mark.s3select
-def test_json_nullif_expressions():
-    json_obj = create_random_json_object(10000,10)
-
-    json_obj_name = get_random_string()
-    bucket_name = get_new_bucket_name()
-    upload_object(bucket_name,json_obj_name,json_obj)
-
-    res_s3select_nullif = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root where nullif(_1.c1,_1.c2) is null ;") ).replace("\n","")
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 = _1.c2 ;") ).replace("\n","")
-    s3select_assert_result( res_s3select_nullif, res_s3select)
-
-    res_s3select_nullif = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select (nullif(_1.c1,_1.c2) is null) from s3object[*].root ;") ).replace("\n","")
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select (_1.c1 = _1.c2) from s3object[*].root ;") ).replace("\n","")
-    s3select_assert_result( res_s3select_nullif, res_s3select)
-
-    res_s3select_nullif = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root where not nullif(_1.c1,_1.c2) is null ;") ).replace("\n","")
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 != _1.c2 ;") ).replace("\n","")
-    s3select_assert_result( res_s3select_nullif, res_s3select)
-
-    res_s3select_nullif = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select (nullif(_1.c1,_1.c2) is not null) from s3object[*].root ;") ).replace("\n","")
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select (_1.c1 != _1.c2) from s3object[*].root ;") ).replace("\n","")
-    s3select_assert_result( res_s3select_nullif, res_s3select)
-
-    res_s3select_nullif = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root where nullif(_1.c1,_1.c2) = _1.c1 ;") ).replace("\n","")
-    res_s3select = remove_xml_tags_from_result( run_s3select_json(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 != _1.c2 ;") ).replace("\n","")
-    s3select_assert_result( res_s3select_nullif, res_s3select)
-
 @pytest.mark.s3select
 def test_column_sum_min_max():
@@ -593,6 +494,96 @@ def test_column_sum_min_max():
     s3select_assert_result( int(count)*4 , int(sum1)-int(sum2) )

+@pytest.mark.s3select
+def test_csv_json_format_column_sum_min_max():
+    csv_obj = create_random_csv_object(10000,10)
+
+    csv_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,csv_obj_name,csv_obj)
+
+    csv_obj_name_2 = get_random_string()
+    bucket_name_2 = "testbuck2"
+    upload_object(bucket_name_2,csv_obj_name_2,csv_obj)
+
+    res_s3select = run_s3select(bucket_name,csv_obj_name,"select max(int(_1)) from s3object;","CSV","JSON")
+    list_int = create_list_of_int( 1 , csv_obj )
+    res_target = max( list_int )
+    s3select_assert_result(res_s3select, '{{"_1":{}}}\n'.format(res_target))
+
+@pytest.mark.s3select
+def test_parquet_json_format_column_sum_min_max():
+    a = [random.randint(1, 10000) for _ in range(100)]
+    df3 = pd.DataFrame({'a': a})
+    table = pa.Table.from_pandas(df3, preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)
+    parquet_obj = obj.getvalue().to_pybytes()
+
+    parquet_obj_name = "4col.parquet"
+    bucket_name = get_new_bucket_name()
+    upload_parquet_object(bucket_name,parquet_obj_name,parquet_obj)
+
+    max_value = get_max_from_parquet_column(parquet_obj, 'a')
+    res_s3select = run_s3select(bucket_name,parquet_obj_name,'select max(a) from s3object ;',"PARQUET","JSON")
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(max_value))
+
+@pytest.mark.s3select
+def test_json_column_sum_min_max():
+    csv_obj = create_random_csv_object(10,10)
+
+    json_obj = csv_to_json(csv_obj)
+
+    json_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,json_obj_name,json_obj)
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,"select min(_1.c1) from s3object[*].root;","JSON","JSON")
+    list_int = create_list_of_int( 1 , csv_obj )
+    res_target = min( list_int )
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(res_target))
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,"select min(_1.c4) from s3object[*].root;","JSON","JSON")
+    list_int = create_list_of_int( 4 , csv_obj )
+    res_target = min( list_int )
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(res_target))
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,"select max(_1.c4) from s3object[*].root;","JSON","JSON")
+    list_int = create_list_of_int( 4 , csv_obj )
+    res_target = max( list_int )
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(res_target))
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,"select max(_1.c7) from s3object[*].root;","JSON","JSON")
+    list_int = create_list_of_int( 7 , csv_obj )
+    res_target = max( list_int )
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(res_target))
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,"select sum(_1.c4) from s3object[*].root;","JSON","JSON")
+    list_int = create_list_of_int( 4 , csv_obj )
+    res_target = sum( list_int )
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(res_target))
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,"select sum(_1.c7) from s3object[*].root;","JSON","JSON")
+    list_int = create_list_of_int( 7 , csv_obj )
+    res_target = sum( list_int )
+    s3select_assert_result( res_s3select, '{{"_1":{}}}\n'.format(res_target))

 @pytest.mark.s3select
 def test_nullif_expressions():
@@ -649,6 +640,90 @@ def test_nullif_expressions():
     s3select_assert_result( res_s3select_nullif, res_s3select)

+@pytest.mark.s3select
+def test_json_nullif_expressions():
+    json_obj = create_random_json_object(10000,10)
+
+    json_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,json_obj_name,json_obj)
+
+    res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where nullif(_1.c1,_1.c2) is null ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 = _1.c2 ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_nullif, res_s3select)
+
+    res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select (nullif(_1.c1,_1.c2) is null) from s3object[*].root ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select (_1.c1 = _1.c2) from s3object[*].root ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_nullif, res_s3select)
+
+    res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where not nullif(_1.c1,_1.c2) is null ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 != _1.c2 ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_nullif, res_s3select)
+
+    res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select (nullif(_1.c1,_1.c2) is not null) from s3object[*].root ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select (_1.c1 != _1.c2) from s3object[*].root ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_nullif, res_s3select)
+
+    res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where nullif(_1.c1,_1.c2) = _1.c1 ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 != _1.c2 ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_nullif, res_s3select)
+
+@pytest.mark.s3select
+def test_parquet_nullif_expressions():
+    a = [random.randint(1, 10000) for _ in range(100)]
+    b = [random.randint(1, 10000) for _ in range(100)]
+    df3 = pd.DataFrame({'a': a,'b':b})
+    table = pa.Table.from_pandas(df3, preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)
+    parquet_obj = obj.getvalue().to_pybytes()
+
+    parquet_obj_name = "2col.parquet"
+    bucket_name = get_new_bucket_name()
+    upload_parquet_object(bucket_name,parquet_obj_name,parquet_obj)
+
+    res_s3select_nullif = run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where nullif(a, b) is null ;","PARQUET")
+    res_s3select = run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where a=b ;","PARQUET")
+    s3select_assert_result( res_s3select_nullif, res_s3select)
+
+    a = [random.uniform(1.0, 10000.0) for _ in range(100)]
+    b = [random.uniform(1.0, 10000.0) for _ in range(100)]
+    df3 = pd.DataFrame({'a': a, 'b': b})
+    table = pa.Table.from_pandas(df3, preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)
+    parquet_obj = obj.getvalue().to_pybytes()
+    upload_parquet_object(bucket_name,parquet_obj_name,parquet_obj)
+
+    res_s3select_nullif = run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where nullif(a, b) is null ;","PARQUET")
+    res_s3select = run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where a=b ;","PARQUET")
+    s3select_assert_result( res_s3select_nullif, res_s3select)

 @pytest.mark.s3select
 def test_nulliftrue_expressions():
@@ -699,6 +774,28 @@ def test_is_not_null_expressions():
     s3select_assert_result( res_s3select_null, res_s3select)

+@pytest.mark.s3select
+def test_json_is_not_null_expressions():
+    json_obj = create_random_json_object(10000,10)
+
+    json_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,json_obj_name,json_obj)
+
+    res_s3select_null = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(*) from s3object[*].root where nullif(_1.c1,_1.c2) is not null ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(*) from s3object[*].root where _1.c1 != _1.c2 ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_null, res_s3select)
+
+    res_s3select_null = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(*) from s3object[*].root where (nullif(_1.c1,_1.c1) and _1.c1 = _1.c2) is not null ;","JSON","JSON") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,"select count(*) from s3object[*].root where _1.c1 != _1.c2 ;","JSON","JSON") ).replace("\n","")
+    s3select_assert_result( res_s3select_null, res_s3select)

 @pytest.mark.s3select
 def test_lowerupper_expressions():
@@ -717,6 +814,43 @@ def test_lowerupper_expressions():
     s3select_assert_result( res_s3select, "AB12CD$$")

+@pytest.mark.s3select
+def test_json_lowerupper_expressions():
+    json_obj = create_random_json_object(1,10)
+
+    json_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,json_obj_name,json_obj)
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,'select lower("AB12cd$$") from s3object[*] ;',"JSON","JSON")
+    s3select_assert_result( res_s3select, '{"_1":ab12cd$$}\n')
+
+    res_s3select = run_s3select(bucket_name,json_obj_name,'select upper("ab12CD$$") from s3object[*] ;',"JSON","JSON")
+    s3select_assert_result( res_s3select, '{"_1":AB12CD$$}\n')
+
+@pytest.mark.s3select
+def test_parquet_lowerupper_expressions():
+    parquet_obj = create_parquet_object(1)
+
+    parquet_obj_name = "4col.parquet"
+    bucket_name = get_new_bucket_name()
+    upload_parquet_object(bucket_name,parquet_obj_name,parquet_obj)
+
+    res_s3select = run_s3select(bucket_name,parquet_obj_name,'select lower("AB12cd$$") from s3object ;',"PARQUET","JSON")
+    s3select_assert_result( res_s3select, '{"_1":ab12cd$$}\n')
+
+    res_s3select = run_s3select(bucket_name,parquet_obj_name,'select upper("ab12CD$$") from s3object ;',"PARQUET","JSON")
+    s3select_assert_result( res_s3select, '{"_1":AB12CD$$}\n')

 @pytest.mark.s3select
 def test_in_expressions():
@@ -922,6 +1056,70 @@ def test_like_expressions():
     s3select_assert_result( res_s3select_like, res_s3select )

+@pytest.mark.s3select
+def test_json_like_expressions():
+    csv_obj = create_random_csv_object_string(1000,10)
+
+    json_obj = csv_to_json(csv_obj)
+
+    json_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,json_obj_name,json_obj)
+
+    res_s3select_like = remove_xml_tags_from_result(run_s3select(bucket_name,json_obj_name,"select count(0) from s3object[*].root where _1.c1 like \"%aeio%\";","JSON","JSON")).replace("\n","")
+    res_s3select = remove_xml_tags_from_result(run_s3select(bucket_name,json_obj_name, "select count(0) from s3object[*].root where substring(_1.c1,11,4) = \"aeio\" ;","JSON","JSON")).replace("\n","")
+    s3select_assert_result( res_s3select_like, res_s3select )
+
+    res_s3select_like = run_s3select(bucket_name,json_obj_name,'select (_1.c1 like "%aeio%") from s3object[*].root ;',"JSON","JSON")
+    res_s3select = run_s3select(bucket_name,json_obj_name, 'select (substring(_1.c1,11,4) = "aeio") from s3object[*].root ;',"JSON","JSON")
+    s3select_assert_result( res_s3select_like, res_s3select )
+
+@pytest.mark.s3select
+def test_parquet_like_expressions():
+    rows = 1000
+    columns = 3
+    data = {f'col_{i+1}': [] for i in range(columns)}
+    for _ in range(rows):
+        for col in data:
+            if random.randint(0, 9) == 5:
+                data[col].append(''.join(random.choice(string.ascii_letters) for _ in range(10)) + "aeiou")
+            else:
+                data[col].append(''.join("cbcd" + random.choice(string.ascii_letters) for _ in range(10)) + "vwxyzzvwxyz")
+
+    df = pd.DataFrame(data)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)
+    parquet_obj = obj.getvalue().to_pybytes()
+
+    parquet_obj_name = "3col.parquet"
+    bucket_name = get_new_bucket_name()
+    upload_parquet_object(bucket_name,parquet_obj_name,parquet_obj)
+
+    res_s3select_like = run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where col_1 like \"%aeio%\";","PARQUET")
+    res_s3select = run_s3select(bucket_name,parquet_obj_name, "select count(0) from s3object where substring(col_1,11,4) = \"aeio\" ;","PARQUET")
+    s3select_assert_result( res_s3select_like, res_s3select )
+
+    res_s3select_like = run_s3select(bucket_name,parquet_obj_name,'select (col_1 like "%aeio%") from s3object ;',"PARQUET")
+    res_s3select = run_s3select(bucket_name,parquet_obj_name, 'select (substring(col_1,11,4) = "aeio") from s3object ;',"PARQUET")
+    s3select_assert_result( res_s3select_like, res_s3select )

 @pytest.mark.s3select
 def test_truefalselike_expressions():
@@ -1241,6 +1439,38 @@ def test_true_false_datetime():
     s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)

+@pytest.mark.s3select
+def test_json_true_false_datetime():
+    csv_obj = create_csv_object_for_datetime(10000,1)
+
+    json_obj = csv_to_json(csv_obj)
+
+    json_obj_name = get_random_string()
+    bucket_name = get_new_bucket_name()
+    upload_object(bucket_name,json_obj_name,json_obj)
+
+    res_s3select_date_time = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,'select count(0) from s3object[*].root where (extract(year from to_timestamp(_1.c1)) > 1950) = true and (extract(year from to_timestamp(_1.c1)) < 1960) = true;',"JSON","JSON") )
+    res_s3select_substring = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,'select count(0) from s3object[*].root where int(substring(_1.c1,1,4))>1950 and int(substring(_1.c1,1,4))<1960;',"JSON","JSON") )
+    s3select_assert_result( res_s3select_date_time, res_s3select_substring)
+
+    res_s3select_date_time = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,'select count(0) from s3object[*].root where (date_diff(month,to_timestamp(_1.c1),date_add(month,2,to_timestamp(_1.c1)) ) = 2) = true;',"JSON","JSON") )
+    res_s3select_count = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,'select count(0) from s3object[*].root;',"JSON","JSON") )
+    s3select_assert_result( res_s3select_date_time, res_s3select_count)
+
+    res_s3select_date_time = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,'select count(0) from s3object[*].root where (date_diff(year,to_timestamp(_1.c1),date_add(day, 366 ,to_timestamp(_1.c1))) = 1) = true ;',"JSON","JSON") )
+    s3select_assert_result( res_s3select_date_time, res_s3select_count)
+
+    # validate that utcnow is integrated correctly with other date-time functions
+    res_s3select_date_time_utcnow = remove_xml_tags_from_result( run_s3select(bucket_name,json_obj_name,'select count(0) from s3object[*].root where (date_diff(hour,utcnow(),date_add(day,1,utcnow())) = 24) = true ;',"JSON","JSON") )
+    s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)

 @pytest.mark.s3select
 def test_csv_parser():
@@ -1295,13 +1525,13 @@ def test_csv_definition():
     upload_object(bucket_name,csv_obj_name,csv_obj)

     # purpose of tests is to parse correctly input with different csv definitions
-    res = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from s3object;","|","\t") ).replace(",","")
+    res = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from s3object;",column_delim="|",row_delim="\t") ).replace(",","")
     s3select_assert_result( number_of_rows, int(res))

     # assert is according to random-csv function
     # purpose of test is validate that tokens are processed correctly
-    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select min(int(_1)),max(int(_2)),min(int(_3))+1 from s3object;","|","\t") ).replace("\n","")
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select min(int(_1)),max(int(_2)),min(int(_3))+1 from s3object;",column_delim="|",row_delim="\t") ).replace("\n","")
     min_1 = min ( create_list_of_int( 1 , csv_obj , "|","\t") )
     max_2 = max ( create_list_of_int( 2 , csv_obj , "|","\t") )
@@ -1617,7 +1847,7 @@ def test_output_serial_expressions():
     upload_object(bucket_name,csv_obj_name,csv_obj)

-    res_s3select_1 = remove_xml_tags_from_result( run_s3select_output(bucket_name,csv_obj_name,"select _1, _2 from s3object where nullif(_1,_2) is null ;", "ALWAYS") ).replace("\n",",").replace(",","")
+    res_s3select_1 = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select _1, _2 from s3object where nullif(_1,_2) is null ;", quot_field="ALWAYS") ).replace("\n",",").replace(",","")
     res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select _1, _2 from s3object where _1 = _2 ;") ).replace("\n",",")

@@ -1630,7 +1860,7 @@ def test_output_serial_expressions():
     s3select_assert_result( '""'+res_s3select_1+'""', res_s3select_final)

-    res_s3select_in = remove_xml_tags_from_result( run_s3select_output(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(int(_2)));', "ASNEEDED", '$', '#')).replace("\n","#") ## TODO why \n appears in output?
+    res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(int(_2)));', quot_field="ASNEEDED", op_column_delim='$', op_row_delim='#')).replace("\n","#") ## TODO why \n appears in output?
     res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where int(_1) = int(_2);')).replace("\n","#")

@@ -1644,7 +1874,7 @@ def test_output_serial_expressions():
     s3select_assert_result(res_s3select_in , res_s3select_final )

-    res_s3select_quot = remove_xml_tags_from_result( run_s3select_output(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(int(_2)));', "ALWAYS", '$', '#')).replace("\n","")
+    res_s3select_quot = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(int(_2)));', quot_field="ALWAYS", op_column_delim='$', op_row_delim='#')).replace("\n","")
     res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where int(_1) = int(_2);')).replace("\n","#")

     res_s3select_list = res_s3select.split('#')