Commit 81b8a103 authored by Douglas

Improved test tool selection and ZBlk format handling, plus refactoring

- The tests now accept a parameter selecting the tool to run with: wendelin+pandas, wendelin+numpy, numpy in memory, or numpy with memmaps.
- Fixed the selection of the ZBlk format.
- Tests moved to performance_test_server.py (performance_test_client to be added next).
parent 88bcb9a1
...@@ -13,12 +13,6 @@ import cProfile ...@@ -13,12 +13,6 @@ import cProfile
import pstats import pstats
def bigger_than_memory(self):
message_list = []
message_list.extend(bigger_than_memory_write(self))
message_list.extend(bigger_than_memory_read(self))
return message_list
def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **kwargs): def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **kwargs):
message_list = [] message_list = []
message_list.append('Write start: ' + get_process_memory_usage()) message_list.append('Write start: ' + get_process_memory_usage())
...@@ -28,6 +22,7 @@ def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **k ...@@ -28,6 +22,7 @@ def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **k
array_memory_size = 251*MB array_memory_size = 251*MB
array_number_of_items = array_memory_size // array_schema.itemsize array_number_of_items = array_memory_size // array_schema.itemsize
message_list.append(set_zblock_format(zblk_format))
big_array = create_zbigarray(memory_size=array_memory_size, dtype=array_schema) big_array = create_zbigarray(memory_size=array_memory_size, dtype=array_schema)
...@@ -40,18 +35,22 @@ def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **k ...@@ -40,18 +35,22 @@ def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **k
big_index = np.arange(array_number_of_items) big_index = np.arange(array_number_of_items)
big_array = store_arrays(root, [big_array, 'big_array'])[0] big_array = store_arrays(root, [big_array, 'big_array'])[0]
message_list.append(set_zblock_format(big_array, zblk_format)) check_zblock_format(zblk_format)
check_zblock_format(big_array, zblk_format)
message_list.append('Populating array with %s rows.' % array_number_of_items) message_list.append('Populating array with %s rows.' % array_number_of_items)
response = populate_array(self, big_array, big_index, array_schema) response = populate_array(self, big_array, big_index, array_schema)
message_list.extend(response['messages']) message_list.extend(response['messages'])
for message in response['messages']:
if 'ZBigArray' in message:
return json.dumps({'messages': message_list, 'result': message})
message_list.append('Write end: ' + get_process_memory_usage()) message_list.append('Write end: ' + get_process_memory_usage())
return message_list return json.dumps(message_list)
def bigger_than_memory_read(self, out_of_core_index=False, **kwargs): def bigger_than_memory_read(self, out_of_core_index=False, tool='wendelin.numpy'):
message_list = [] message_list = []
message_list.append('Reading with format %s' % tool)
message_list.append('Read start ' + get_process_memory_usage()) message_list.append('Read start ' + get_process_memory_usage())
root = self.getPhysicalRoot() root = self.getPhysicalRoot()
...@@ -67,13 +66,19 @@ def bigger_than_memory_read(self, out_of_core_index=False, **kwargs): ...@@ -67,13 +66,19 @@ def bigger_than_memory_read(self, out_of_core_index=False, **kwargs):
number_of_items = len(root.big_array) number_of_items = len(root.big_array)
item_size = root.big_array.dtype.itemsize item_size = root.big_array.dtype.itemsize
message_list.append('Processing %s items with %s bytes each' % (number_of_items, item_size)) message_list.append('Processing %s items with %s bytes each' % (number_of_items, item_size))
message_list.append(len(root.big_array._p_jar.db().storage.load(root.big_array._p_oid, '')[0])+42)
big_index = root.big_index[:] if out_of_core_index else np.arange(number_of_items) big_index = root.big_index[:] if out_of_core_index else np.arange(number_of_items)
messages = process_data(root.big_array[:], big_index[:], columns) messages = process_data(self, root.big_array, big_index, columns, tool)
message_list.extend(messages) message_list.extend(messages)
message_list.append('Read end: ' + get_process_memory_usage()) message_list.append('Read end: ' + get_process_memory_usage())
return message_list
for message in messages:
for tool in ['wendelin.pandas', 'wendelin.numpy', 'numpy.memory', 'numpy.memmap']:
if tool in message:
return json.dumps({'messages': message_list, 'result': message})
return json.dumps(message_list)
@contextmanager @contextmanager
def timer(name, message_list): def timer(name, message_list):
...@@ -103,8 +108,8 @@ def get_field_names_and_schema(): ...@@ -103,8 +108,8 @@ def get_field_names_and_schema():
np.dtype, np.dtype,
['i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', ['i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8',
'i8', 'i8', 'i8', 'f8', 'i8', 'i8', 'f8', 'f8', 'f8', 'i8', 'i8', 'i8', 'f8', 'i8', 'i8', 'f8', 'f8', 'f8',
'a50', 'a50', 'a50', 'a50'] # 'a50', 'a50', 'a50', 'a50']
#'f8', 'f8', 'f8', 'f8'] 'f8', 'f8', 'f8', 'f8']
) )
}) })
return field_names, array_schema return field_names, array_schema
...@@ -117,20 +122,17 @@ def create_zbigarray(memory_size=None, dtype=None): ...@@ -117,20 +122,17 @@ def create_zbigarray(memory_size=None, dtype=None):
# else: # else:
# raise Exception('You need to provide memory_size and dtype, %s - %s' % (memory_size, dtype)) # raise Exception('You need to provide memory_size and dtype, %s - %s' % (memory_size, dtype))
def set_zblock_format(array, zblk_format): def set_zblock_format(zblk_format):
'''Set the array to write with the provide zblock format. '''Set the array to write with the provide zblock format.
Formats are: Formats are:
- 0 (low-overhead access time and high-overhead storage size) - 0 (low-overhead access timed high-overhead storage size)
- 1 (low overhead storage size and high-overhead access time) - 1 (low overhead storage size and high-overhead access time)
''' '''
if zblk_format in ('0', '1'): file_zodb.ZBlk_fmt_write = zblk_format_string = 'ZBlk%s' % zblk_format
array._fileh.ZBlk_fmt_write = zblk_format_string = 'ZBlk%s' % zblk_format return 'Writting with format: %s' % zblk_format_string
return 'Writting with format: %s' % zblk_format_string
else:
raise 'Unknown zblk_format provided. Choose between 0 and 1. See docs strings for more information.'
def check_zblock_format(array, zblk_format): def check_zblock_format(zblk_format):
if not array._fileh.ZBlk_fmt_write == 'ZBlk%s' % zblk_format: if not file_zodb.ZBlk_fmt_write == 'ZBlk%s' % zblk_format:
raise 'Zblk format didnt match' raise 'Zblk format didnt match'
else: else:
return True return True
...@@ -151,7 +153,7 @@ def populate_array(root, big_array, big_index, schema): ...@@ -151,7 +153,7 @@ def populate_array(root, big_array, big_index, schema):
with timer('Time to prepare data for assignment', message_list): with timer('Time to prepare data for assignment', message_list):
data = root.stock_offset(my_offset=0, my_limit=1) data = root.stock_offset(my_offset=0, my_limit=1)
columns = data._names columns = data._names
row = tuple([filter_item(item, normalize=False) for item in data.tuples()[0]]) row = tuple([filter_item(item, normalize=True) for item in data.tuples()[0]])
max_memory_usage = 100*MB max_memory_usage = 100*MB
message_list.append('%s megabytes of memory can be used per chunk.' % (max_memory_usage//MB)) message_list.append('%s megabytes of memory can be used per chunk.' % (max_memory_usage//MB))
...@@ -166,11 +168,9 @@ def populate_array(root, big_array, big_index, schema): ...@@ -166,11 +168,9 @@ def populate_array(root, big_array, big_index, schema):
for i in range(chunks): for i in range(chunks):
big_array[chunk_begin:chunksize*(i+1)] = row big_array[chunk_begin:chunksize*(i+1)] = row
chunk_begin = chunksize + 1 chunk_begin = chunksize + 1
with timer('Commit each ZBigArray chunk', message_list): transaction.commit()
transaction.commit()
return {'messages': message_list, 'columns': columns} return {'messages': message_list, 'columns': columns}
# method to cast Zope's DateTime objects, falsy-values and strings to floats
def filter_item(item, normalize=True): def filter_item(item, normalize=True):
'''Typecast item to numeric values if it is a: DateTime or falsy to help '''Typecast item to numeric values if it is a: DateTime or falsy to help
pandas/numpy deal with them. If normalize is True it will typecast strings pandas/numpy deal with them. If normalize is True it will typecast strings
...@@ -185,26 +185,53 @@ def filter_item(item, normalize=True): ...@@ -185,26 +185,53 @@ def filter_item(item, normalize=True):
else: else:
return item return item
def process_data(big_array, big_index, columns): def process_data(root, big_array, big_index, columns, tool):
'''Process all data in big_array. Currently, does a complete sum of the '''Process all data in big_array. Currently, does a complete sum of the
quantity column both with wendein.core/numpy and pandas. quantity column both with wendein.core/numpy and pandas.
''' '''
message_list = [] message_list = []
result = None result = None
if tool == 'wendelin.pandas':
try:
with timer(tool, message_list):
df = pd.DataFrame.from_records(big_array[:], index=big_index)
result = df.quantity.sum()
if result:
message_list.append('Pandas result: %s' % result)
except MemoryError:
message_list.append('MemoryError while creating pandas.Dataframe!')
if tool == 'wendelin.numpy':
with timer(tool, message_list):
result = big_array[:]['quantity'].sum()
message_list.append('NumPy result: %s' % result)
# common setup for in memory and memmap
if tool == 'numpy.memory' or tool == 'numpy.memmap':
data = root.stock_offset(my_offset=0, my_limit=1)
columns = data._names
row = tuple([filter_item(item, normalize=True) for item in data.tuples()[0]])
_, schema = get_field_names_and_schema()
array = np.ndarray((1430394,), schema)
array[:] = row
# When benchmarking, only runs either pandas or numpy, one process creates if tool == 'numpy.memory':
# a cache for the other and this gives deceiving results. with timer(tool, message_list):
# TODO: add parameter to choose wether numpy or pandas should be used result = array['quantity'].sum()
try: message_list.append('numpy in memory result: %s' % result)
with timer('pandas sum', message_list):
df = pd.DataFrame.from_records(big_array, index=big_index) if tool == 'numpy.memmap':
result = df.quantity.sum() import os.path as path
message_list.append('Pandas result: %s' % result) if result else None filename = path.join('/tmp', 'numpy.dat')
except MemoryError: write_fp = np.memmap(filename, dtype=schema, mode='w+', shape=(1430394,))
message_list.append('MemoryError while creating pandas.Dataframe!') with timer('time to write numpy memmap', message_list):
write_fp[:] = row
with timer('numpy sum', message_list): write_fp.flush()
result = big_array['quantity'].sum() with timer(tool, message_list):
message_list.append('NumPy result: %s' % result) read_fp = np.memmap(filename, dtype=schema, mode='r', shape=(1430394,))
array = np.ndarray((1430394,), schema, buffer=read_fp)
result = array['quantity'].sum()
message_list.append('numpy memmap result: %s' % result)
return message_list return message_list
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment