forked from TrueCloudLab/s3-tests
Compare commits
3 commits
master...parquet_s3
| Author | SHA1 | Date |
|---|---|---|
| | de402d0e00 | |
| | e7a0e54942 | |
| | 717cb97d0c | |
1 changed file with 76 additions and 0 deletions
@@ -12,6 +12,10 @@ from . import (
     get_client
     )
 
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+
 import logging
 logging.basicConfig(level=logging.INFO)
 
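The three new imports are only used to build a Parquet object in memory (no temp file) before uploading it. Below is a minimal sketch of that round trip; the toy data and the read-back check are illustrative and not part of the change:

```python
# Sketch only: toy data, illustrating how pandas + pyarrow produce an in-memory
# Parquet payload suitable for put_object(Body=...), as parquet_generator does below.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
table = pa.Table.from_pandas(df, preserve_index=False)  # pandas -> Arrow table
buf = pa.BufferOutputStream()                           # in-memory sink, no temp file
pq.write_table(table, buf)                              # serialize to Parquet
parquet_bytes = buf.getvalue().to_pybytes()             # raw bytes for the S3 upload

# reading it back confirms the buffer is a valid Parquet object
roundtrip = pq.read_table(pa.BufferReader(parquet_bytes)).to_pandas()
assert roundtrip.equals(df)
```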
@@ -218,6 +222,70 @@ def upload_csv_object(bucket_name,new_key,obj):
     response = c2.get_object(Bucket=bucket_name, Key=new_key)
     eq(response['Body'].read().decode('utf-8'), obj, 's3select error[ downloaded object not equal to uploaded objecy')
 
+def parquet_generator(bucket_name,parquet_obj_name,parquet_size):
+
+    # purpose: create a parquet object with several columns; each column contains random numbers.
+    # the s3select query results (the counts per where clause) are verified against array-comparison results.
+
+    a=[]
+    for i in range(parquet_size):
+        a.append(int(random.randint(1,10000)))
+
+    b=[]
+    for i in range(parquet_size):
+        b.append(int(random.randint(1,10000)))
+
+    c=[]
+    for i in range(parquet_size):
+        c.append(int(random.randint(1,10000)))
+
+    d=[]
+    for i in range(parquet_size):
+        d.append(int(random.randint(1,10000)))
+
+    # count where a = b, for validation against the s3select query result
+    count_a_b_eq=0
+    for i in range(parquet_size):
+        if(a[i] == b[i]):
+            count_a_b_eq=count_a_b_eq+1
+
+    # count where c = d, for validation against the s3select query result
+    count_c_d_eq=0
+    for i in range(parquet_size):
+        if(c[i] == d[i]):
+            count_c_d_eq=count_c_d_eq+1
+
+
+    # create dataframe: the actual columns that will reside in the parquet object
+    df3 = pd.DataFrame({'a': a,
+                        'b': b,
+                        'c': c,
+                        'd': d}
+                       )
+
+    # create the parquet object in an in-memory buffer
+    table = pa.Table.from_pandas(df3,preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)  ##, compression="snappy")
+
+    # load the buffer content into s3 storage
+    client = get_client()
+    client.create_bucket(Bucket=bucket_name)
+    client.put_object(Bucket=bucket_name, Key=parquet_obj_name, Body=obj.getvalue().to_pybytes())
+
+    # the following queries verify the s3select results against the counts computed above on the source arrays
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object;") ).replace(",","")
+
+    s3select_assert_result( parquet_size , int( res_s3select ))
+
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where a = b;") ).replace(",","")
+
+    s3select_assert_result( count_a_b_eq , int( res_s3select ))
+
+    res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where c = d;") ).replace(",","")
+
+    s3select_assert_result( count_c_d_eq , int( res_s3select ))
+
 def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
 
     s3 = get_client()
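run_s3select is a pre-existing helper in this file and its internals are not part of this diff. As an assumption about what a Parquet s3select request looks like through plain boto3, the sketch below uses select_object_content with Parquet InputSerialization; whether the endpoint accepts Parquet input depends on the server (this branch targets Ceph RGW's s3select engine), and the test's helper may wrap the call differently.

```python
# Assumption / sketch, not taken from the diff: issuing the same count query
# directly with boto3. select_object_content returns an event stream whose
# 'Records' events carry the CSV-encoded result.
import boto3

def select_count(s3_client, bucket, key, expression):
    resp = s3_client.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType='SQL',
        Expression=expression,
        InputSerialization={'Parquet': {}},    # treat the object as Parquet
        OutputSerialization={'CSV': {}},       # results come back as CSV text
    )
    result = ''
    for event in resp['Payload']:
        if 'Records' in event:
            result += event['Records']['Payload'].decode('utf-8')
    return int(result.strip().rstrip(','))     # e.g. "1000000,\n" -> 1000000

# usage (hypothetical endpoint and object names):
# s3 = boto3.client('s3', endpoint_url='http://localhost:8000')
# select_count(s3, 'test', '4col.parquet', 'select count(0) from s3object where a = b;')
```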
@@ -1274,3 +1342,11 @@ def test_output_serial_expressions():
 
     s3select_assert_result( res_s3select_quot, res_s3select_final )
 
+
+@attr('s3select')
+def test_parquet():
+
+    parquet_obj_name = "4col.parquet"
+    size = 1000000
+
+    parquet_generator("test",parquet_obj_name,size)
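A note on scale: test_parquet uses parquet_size = 1000000, so parquet_generator runs several million-iteration Python loops before the upload. A vectorized equivalent of the column generation and match counting is sketched below; numpy is an extra assumption here, since the diff itself only needs pandas and pyarrow.

```python
# Sketch: vectorized equivalent of the per-element loops in parquet_generator.
import numpy as np
import pandas as pd

def random_columns(size, low=1, high=10000, seed=None):
    rng = np.random.default_rng(seed)
    # integers() is exclusive on the upper bound, so high + 1 matches random.randint(1, 10000)
    df = pd.DataFrame({name: rng.integers(low, high + 1, size=size) for name in 'abcd'})
    count_a_b_eq = int((df['a'] == df['b']).sum())  # rows where a == b
    count_c_d_eq = int((df['c'] == df['d']).sum())  # rows where c == d
    return df, count_a_b_eq, count_c_d_eq
```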