forked from TrueCloudLab/s3-tests
Compare commits
3 commits
master
...
parquet_s3
Author | SHA1 | Date | |
---|---|---|---|
|
de402d0e00 | ||
|
e7a0e54942 | ||
|
717cb97d0c |
1 changed files with 76 additions and 0 deletions
|
@ -12,6 +12,10 @@ from . import (
|
|||
get_client
|
||||
)
|
||||
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
@ -218,6 +222,70 @@ def upload_csv_object(bucket_name,new_key,obj):
|
|||
response = c2.get_object(Bucket=bucket_name, Key=new_key)
|
||||
eq(response['Body'].read().decode('utf-8'), obj, 's3select error[ downloaded object not equal to uploaded objecy')
|
||||
|
||||
def parquet_generator(bucket_name,parquet_obj_name,parquet_size):
|
||||
|
||||
#purpose: creating parquet object with several columns, each column contains random numbers
|
||||
#the s3select query results(counts of where clause) is verified against arrays comparision results.
|
||||
|
||||
a=[]
|
||||
for i in range(parquet_size):
|
||||
a.append(int(random.randint(1,10000)))
|
||||
|
||||
b=[]
|
||||
for i in range(parquet_size):
|
||||
b.append(int(random.randint(1,10000)))
|
||||
|
||||
c=[]
|
||||
for i in range(parquet_size):
|
||||
c.append(int(random.randint(1,10000)))
|
||||
|
||||
d=[]
|
||||
for i in range(parquet_size):
|
||||
d.append(int(random.randint(1,10000)))
|
||||
|
||||
# count where a = b, validation against the s3select query result
|
||||
count_a_b_eq=0
|
||||
for i in range(parquet_size):
|
||||
if(a[i] == b[i]):
|
||||
count_a_b_eq=count_a_b_eq+1
|
||||
|
||||
# count where c = d, validation against the s3select query result
|
||||
count_c_d_eq=0
|
||||
for i in range(parquet_size):
|
||||
if(c[i] == d[i]):
|
||||
count_c_d_eq=count_c_d_eq+1
|
||||
|
||||
|
||||
#create dataframe. the actual columns to reside on parquet object
|
||||
df3 = pd.DataFrame({'a': a,
|
||||
'b': b,
|
||||
'c': c,
|
||||
'd': d}
|
||||
)
|
||||
|
||||
# creating parquet object
|
||||
table = pa.Table.from_pandas(df3,preserve_index=False)
|
||||
obj = pa.BufferOutputStream()
|
||||
pq.write_table(table, obj)##, compression="snappy")
|
||||
|
||||
## load file-content into s3-storage
|
||||
client = get_client()
|
||||
client.create_bucket(Bucket=bucket_name)
|
||||
client.put_object(Bucket=bucket_name, Key=parquet_obj_name, Body=obj.getvalue().to_pybytes())
|
||||
|
||||
# following queries verify query result against the count on arrays that reside on parquet object
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object;") ).replace(",","")
|
||||
|
||||
s3select_assert_result( parquet_size , int( res_s3select ))
|
||||
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where a = b;") ).replace(",","")
|
||||
|
||||
s3select_assert_result( count_a_b_eq , int( res_s3select ))
|
||||
|
||||
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,parquet_obj_name,"select count(0) from s3object where c = d;") ).replace(",","")
|
||||
|
||||
s3select_assert_result( count_c_d_eq , int( res_s3select ))
|
||||
|
||||
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
|
||||
|
||||
s3 = get_client()
|
||||
|
@ -1274,3 +1342,11 @@ def test_output_serial_expressions():
|
|||
|
||||
s3select_assert_result( res_s3select_quot, res_s3select_final )
|
||||
|
||||
@attr('s3select')
|
||||
def test_parquet():
|
||||
|
||||
parquet_obj_name = "4col.parquet"
|
||||
size = 1000000
|
||||
|
||||
parquet_generator("test",parquet_obj_name,size)
|
||||
|
||||
|
|
Loading…
Reference in a new issue