Compare commits

...

3 commits

Author SHA1 Message Date
gal salomon
de402d0e00 missing arrow/parquet import
Signed-off-by: gal salomon <gal.salomon@gmail.com>
2022-02-12 09:36:21 +02:00
gal salomon
e7a0e54942 rename attribute s3select_parquet s3select
Signed-off-by: gal salomon <gal.salomon@gmail.com>
2022-02-10 09:12:03 +02:00
gal salomon
717cb97d0c adding parquet test. the test creates several columns table using pyarrow. each column contains random numbers. the test verified s3select query results
Signed-off-by: gal salomon <gal.salomon@gmail.com>
2022-02-10 06:49:48 +02:00

View file

@ -12,6 +12,10 @@ from . import (
get_client
)
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import logging
logging.basicConfig(level=logging.INFO)
@ -218,6 +222,70 @@ def upload_csv_object(bucket_name,new_key,obj):
response = c2.get_object(Bucket=bucket_name, Key=new_key)
eq(response['Body'].read().decode('utf-8'), obj, 's3select error[ downloaded object not equal to uploaded objecy')
def parquet_generator(bucket_name,parquet_obj_name,parquet_size):
#purpose: creating parquet object with several columns, each column contains random numbers
#the s3select query results(counts of where clause) is verified against arrays comparision results.
a=[]
for i in range(parquet_size):
a.append(int(random.randint(1,10000)))
b=[]
for i in range(parquet_size):
b.append(int(random.randint(1,10000)))
c=[]
for i in range(parquet_size):
c.append(int(random.randint(1,10000)))
d=[]
for i in range(parquet_size):
d.append(int(random.randint(1,10000)))
# count where a = b, validation against the s3select query result
count_a_b_eq=0
for i in range(parquet_size):
if(a[i] == b[i]):
count_a_b_eq=count_a_b_eq+1
# count where c = d, validation against the s3select query result
count_c_d_eq=0
for i in range(parquet_size):
if(c[i] == d[i]):
count_c_d_eq=count_c_d_eq+1
#create dataframe. the actual columns to reside on parquet object
df3 = pd.DataFrame({'a': a,
'b': b,
'c': c,
'd': d}
)
# creating parquet object
table = pa.Table.from_pandas(df3,preserve_index=False)
obj = pa.BufferOutputStream()
pq.write_table(table, obj)##, compression="snappy")
## load file-content into s3-storage
client = get_client()
client.create_bucket(Bucket=bucket_name)
client.put_object(Bucket=bucket_name, Key=parquet_obj_name, Body=obj.getvalue().to_pybytes())
# following queries verify query result against the count on arrays that reside on parquet object
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object;") ).replace(",","")
s3select_assert_result( parquet_size , int( res_s3select ))
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where a = b;") ).replace(",","")
s3select_assert_result( count_a_b_eq , int( res_s3select ))
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where c = d;") ).replace(",","")
s3select_assert_result( count_c_d_eq , int( res_s3select ))
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
s3 = get_client()
@ -1274,3 +1342,11 @@ def test_output_serial_expressions():
s3select_assert_result( res_s3select_quot, res_s3select_final )
@attr('s3select')
def test_parquet():
parquet_obj_name = "4col.parquet"
size = 1000000
parquet_generator("test",parquet_obj_name,size)