Commit 20316083 authored by Douglas's avatar Douglas

big refactoring to how to test works

- new context manager to measure code's performance
- now creates and fill arrays by memory size and not number of rows anymore
- only 1 row from mariadb is broadcasted to the entire array (doesnt spend a lot of time waiting mysql anymore)
- removed useless profile functions (just from this repository)
- added docstrings
parent cd432f09
...@@ -23,21 +23,23 @@ def bigger_than_memory_write(self, out_of_core_index=False): ...@@ -23,21 +23,23 @@ def bigger_than_memory_write(self, out_of_core_index=False):
message_list = [] message_list = []
message_list.append('Write start: ' + get_process_memory_usage()) message_list.append('Write start: ' + get_process_memory_usage())
#ram_nbytes = psutil.virtual_memory().total
#big_array_len = (1*ram_nbytes) // big_array_dtype.itemsize
_, array_schema = get_field_names_and_schema() _, array_schema = get_field_names_and_schema()
float_dtype = dtype(float64)
big_array = create_zbigarray((360000,), dtype=array_schema) array_memory_size = 1*GB
big_index = create_zbigarray((1, 360000), dtype(float64)) if out_of_core_index else np.arange(360000) array_number_of_items = array_memory_size // array_schema.itemsize
big_array = create_zbigarray(memory_size=array_memory_size, dtype=array_schema)
root = self.getPhysicalRoot() root = self.getPhysicalRoot()
if out_of_core_index: if out_of_core_index:
big_index = create_zbigarray(memory_size=array_number_of_items, dtype=float_dtype)
big_array, big_index = store_arrays(root, [big_array, 'big_array'], [big_index, 'big_index']) big_array, big_index = store_arrays(root, [big_array, 'big_array'], [big_index, 'big_index'])
else: else:
big_index = np.arange(array_number_of_items)
big_array = store_arrays(root, [big_array, 'big_array'])[0] big_array = store_arrays(root, [big_array, 'big_array'])[0]
response = populate_array(self, 30000, 12, big_array, big_index, array_schema) response = populate_array(self, big_array, big_index, array_schema)
message_list.extend(response['messages']) message_list.extend(response['messages'])
message_list.append('Write end: ' + get_process_memory_usage()) message_list.append('Write end: ' + get_process_memory_usage())
...@@ -57,42 +59,35 @@ def bigger_than_memory_read(self, out_of_core_index=False): ...@@ -57,42 +59,35 @@ def bigger_than_memory_read(self, out_of_core_index=False):
ob = root.big_index ob = root.big_index
message_list.append(len(ob._p_jar.db().storage.load(ob._p_oid, '')[0])+42) message_list.append(len(ob._p_jar.db().storage.load(ob._p_oid, '')[0])+42)
big_index = root.big_index[:] if out_of_core_index else np.arange(360000) number_of_items = len(root.big_array)
item_size = root.big_array.dtype.itemsize
message_list.append('Processing %s items with %s bytes each' % (number_of_items, item_size))
response = process_data(root.big_array[:], big_index[:], columns) big_index = root.big_index[:] if out_of_core_index else np.arange(number_of_items)
message_list.extend(response['messages'])
message_list.append(response['result']) messages = process_data(root.big_array[:], big_index[:], columns)
message_list.extend(messages)
message_list.append('Read end: ' + get_process_memory_usage()) message_list.append('Read end: ' + get_process_memory_usage())
return message_list return message_list
def bigger_than_memory_profile(self):
profiles = [bigger_than_memory_read_profile, bigger_than_memory_write_profile]
return [profile(self) for profile in profiles]
def bigger_than_memory_read_profile(self):
profile_path = '/tmp/profile_read'
cProfile.runctx('bigger_than_memory_read(self)', globals(), locals(), profile_path)
return "To see the profile start a console on server and: import pstats; pstats.Stats('%s').print_stats()" % profile_path
def bigger_than_memory_write_profile(self):
profile_path = '/tmp/profile_write'
cProfile.runctx('bigger_than_memory_write(self)', globals(), locals(), profile_path)
return "To see the profile start a console on server and: import pstats; pstats.Stats('%s').print_stats()" % profile_path
@contextmanager @contextmanager
def timer(name, message_list): def timer(name, message_list):
'''A context manager to time function calls and add the result to
message_list.
'''
start_time = time() start_time = time()
yield yield
elapsed_time = time() - start_time elapsed_time = time() - start_time
message_list.append('[%s] finished in % seconds' % (name, elapsed_time)) message_list.append('[%s] finished in % seconds' % (name, elapsed_time))
def get_process_memory_usage(): def get_process_memory_usage():
'''Return the runnign process' memory usage.'''
process = psutil.Process(os.getpid()) process = psutil.Process(os.getpid())
m = process.memory_info() m = process.memory_info()
return 'VIRT: %i MB\tRSS: %iMB' % (m.vms//MB, m.rss//MB) return 'VIRT: %i MB\tRSS: %iMB' % (m.vms//MB, m.rss//MB)
def get_field_names_and_schema(): def get_field_names_and_schema():
'''Returns the array's dtype and its field names.'''
field_names = ['f%s' % number if not number == 13 else 'quantity' for number in range(23)] field_names = ['f%s' % number if not number == 13 else 'quantity' for number in range(23)]
array_schema = np.dtype({ array_schema = np.dtype({
'names' : field_names, 'names' : field_names,
...@@ -100,58 +95,51 @@ def get_field_names_and_schema(): ...@@ -100,58 +95,51 @@ def get_field_names_and_schema():
np.dtype, np.dtype,
['i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', ['i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8',
'i8', 'i8', 'i8', 'f8', 'i8', 'i8', 'f8', 'f8', 'f8', 'i8', 'i8', 'i8', 'f8', 'i8', 'i8', 'f8', 'f8', 'f8',
'a50', 'a50', 'a50', 'a50'] # 'a50', 'a50', 'a50', 'a50']
#'f8', 'f8', 'f8', 'f8'] 'f8', 'f8', 'f8', 'f8']
) )
}) })
return field_names, array_schema return field_names, array_schema
def create_zbigarray(dimension, dtype): def create_zbigarray(memory_size=None, dtype=None):
return ZBigArray(dimension, dtype) '''Create a ZBigArray with memory_size bytes with the defined dtype.'''
if memory_size and dtype:
element_number = memory_size // dtype.itemsize
return ZBigArray((element_number,), dtype)
else:
raise Exception('You need to provide memory_size and dtype')
def store_arrays(root, *arrays_filenames): def store_arrays(root, *arrays_filenames):
'''Store all [array, filename] pair in arrays_filenames in root and return an
ordered list with all store arrays.
'''
for array, filename in arrays_filenames: for array, filename in arrays_filenames:
setattr(root, filename, None) setattr(root, filename, None)
setattr(root, filename, array) setattr(root, filename, array)
transaction.commit() transaction.commit()
return [getattr(root, filename) for _, filename in arrays_filenames] return [getattr(root, filename) for _, filename in arrays_filenames]
def populate_array(root, chunk_size, number_of_chunks, big_array, big_index, schema): def populate_array(root, big_array, big_index, schema):
offset = 0 '''Fill big_array with copies of a MySQL row until it is completely full'''
start_populate_wendelin = mysql_time = time()
# ZSQL Method to fetch data from stocks table, chunk by chunk, and put it in
# the ZBigArray. Implemented using DTML's sqlvar instruction, LIMIT andd OFFSET.
data = root.stock_offset(my_offset=0, my_limit=chunk_size)
mysql_time = time() - mysql_time
columns = data._names
tuples = data.tuples()
counter = 0
message_list = [] message_list = []
message_list.append('Running with %s chunks of %s rows.' % (number_of_chunks, chunk_size)) data = root.stock_offset(my_offset=0, my_limit=1)
columns = data._names
chunk_byte_size = sys.getsizeof(tuples) with timer('Time to prepare data for assignment', message_list):
message_list.append('Memory (in bytes) for each chunk: %s' % chunk_byte_size) row = tuple([filter_item(item, normalize=True) for item in data.tuples()[0]])
for i in range(number_of_chunks): with timer('Time to fill ZBigArray', message_list):
for index, row in enumerate(tuples): big_array[:] = row
tuples[index] = tuple([filter_item(item) for item in row])
if len(tuples) < chunk_size:
chunk_end = chunk_size*(i) + len(tuples)
else:
chunk_end = chunk_size*(i+1)
big_array[offset:chunk_end] = np.array(tuples, schema)
offset += chunk_size
temp_mysql_time = time()
data = root.stock_offset(my_offset=offset, my_limit=chunk_size)
mysql_time += time()-temp_mysql_time
tuples = data.tuples()
total_time = time() - start_populate_wendelin
message_list.append('%s seconds to load & write %s records into wendelin array. %s seconds on MySQL loads and %s writting to disk.' % (total_time, chunk_end, mysql_time, total_time-mysql_time))
transaction.commit() transaction.commit()
message_list.append('Total size of out-of-core array: %s megabytes.' % (big_array.nbytes // MB))
return {'messages': message_list, 'columns': columns} return {'messages': message_list, 'columns': columns}
def filter_item(item): # method to cast Zope's DateTime objects, falsy-values and strings to floats
if not item or isinstance(item, type(None)): #or isinstance(item, (str, unicode)): def filter_item(item, normalize=True):
'''Typecast item to numeric values if it is a: DateTime or falsy to help
pandas/numpy deal with them. If normalize is True it will typecast strings
and unicodes to floats too, thus reducing that size dramatically.
'''
if not item or isinstance(item, type(None)):
return 0
if normalize and isinstance(item, (str, unicode)):
return 0 return 0
elif isinstance(item, DateTime): elif isinstance(item, DateTime):
return 0 return 0
...@@ -159,16 +147,22 @@ def filter_item(item): ...@@ -159,16 +147,22 @@ def filter_item(item):
return item return item
def process_data(big_array, big_index, columns): def process_data(big_array, big_index, columns):
'''Process all data in big_array. Currently, does a complete sum of the
quantity column both with wendein.core/numpy and pandas.
'''
message_list = [] message_list = []
result = None result = None
with timer('pandas sum', message_list): try:
df = pd.DataFrame.from_records(big_array, index=big_index) with timer('pandas sum', message_list):
result = df.quantity.sum() df = pd.DataFrame.from_records(big_array, index=big_index)
result = df.quantity.sum()
message_list.append('Pandas result: %s' % result) if result else None
except MemoryError:
message_list.append('MemoryError while creating pandas.Dataframe!')
with timer('numpy sum', message_list): with timer('numpy sum', message_list):
big_array['quantity'].sum() result = big_array['quantity'].sum()
message_list.append('NumPy result: %s' % result)
# message_list.append('%s seconds to read the wendelin array with %s rows.' % (finish-start, len(big_array)))
# message_list.append('Total size of out-of-core array: %s megabytes.' % (big_array.nbytes // MB)) return message_list
return { 'messages': message_list, 'result': result } \ No newline at end of file
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment