Commit 8c54b4f4 authored by Douglas's avatar Douglas

refactoring to array population and write formats

- array is now populated by in-memory size and the number of rows is calculated automatically;
- array is populated using chunks of data to save some memory;
- parameter to choose between format ZBlk0 (one zodb object) and ZBlk1 (group of zodb objects).
parent a2dd4281
......@@ -19,33 +19,38 @@ def bigger_than_memory(self):
message_list.extend(bigger_than_memory_read(self))
return message_list
def bigger_than_memory_write(self, out_of_core_index=False):
def bigger_than_memory_write(self, out_of_core_index=False, zblk_format='0', **kwargs):
message_list = []
message_list.append('Write start: ' + get_process_memory_usage())
_, array_schema = get_field_names_and_schema()
float_dtype = dtype(float64)
float_dtype = np.dtype(float64)
array_memory_size = 1*GB
array_memory_size = 251*MB
array_number_of_items = array_memory_size // array_schema.itemsize
big_array = create_zbigarray(memory_size=array_memory_size, dtype=array_schema)
root = self.getPhysicalRoot()
if out_of_core_index:
big_index = create_zbigarray(memory_size=array_number_of_items, dtype=float_dtype)
big_array, big_index = store_arrays(root, [big_array, 'big_array'], [big_index, 'big_index'])
else:
big_index = np.arange(array_number_of_items)
big_array = store_arrays(root, [big_array, 'big_array'])[0]
with timer('Create arrays instances', message_list):
if out_of_core_index:
big_index = create_zbigarray(array_number_of_items*float_dtype.itemsize, float_dtype)
big_array, big_index = store_arrays(root, [big_array, 'big_array'], [big_index, 'big_index'])
else:
big_index = np.arange(array_number_of_items)
big_array = store_arrays(root, [big_array, 'big_array'])[0]
message_list.append(set_zblock_format(big_array, zblk_format))
check_zblock_format(big_array, zblk_format)
message_list.append('Populating array with %s rows.' % array_number_of_items)
response = populate_array(self, big_array, big_index, array_schema)
message_list.extend(response['messages'])
message_list.append('Write end: ' + get_process_memory_usage())
return message_list
def bigger_than_memory_read(self, out_of_core_index=False):
def bigger_than_memory_read(self, out_of_core_index=False, **kwargs):
message_list = []
message_list.append('Read start ' + get_process_memory_usage())
......@@ -78,38 +83,57 @@ def timer(name, message_list):
start_time = time()
yield
elapsed_time = time() - start_time
message_list.append('[%s] finished in % seconds' % (name, elapsed_time))
message_list.append('[%s] finished in %s seconds' % (name, elapsed_time))
def get_process_memory_usage():
'''Return the runnign process' memory usage.'''
process = psutil.Process(os.getpid())
m = process.memory_info()
return 'VIRT: %i MB\tRSS: %iMB' % (m.vms//MB, m.rss//MB)
def get_available_memory():
return psutil.virtual_memory().available
def get_field_names_and_schema():
'''Returns the array's dtype and its field names.'''
field_names = ['f%s' % number if not number == 13 else 'quantity' for number in range(23)]
# TODO: parameterize to get normalize or not normalized schema
array_schema = np.dtype({
'names' : field_names,
'formats': map(
np.dtype,
['i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8', 'i8',
'i8', 'i8', 'i8', 'f8', 'i8', 'i8', 'f8', 'f8', 'f8',
# 'a50', 'a50', 'a50', 'a50']
'f8', 'f8', 'f8', 'f8']
'a50', 'a50', 'a50', 'a50']
#'f8', 'f8', 'f8', 'f8']
)
})
return field_names, array_schema
def create_zbigarray(memory_size=None, dtype=None):
'''Create a ZBigArray with memory_size bytes with the defined dtype.'''
if memory_size and dtype:
element_number = memory_size // dtype.itemsize
return ZBigArray((element_number,), dtype)
# if memory_size and dtype:
element_number = memory_size // dtype.itemsize
return ZBigArray((element_number,), dtype)
# else:
# raise Exception('You need to provide memory_size and dtype, %s - %s' % (memory_size, dtype))
def set_zblock_format(array, zblk_format):
'''Set the array to write with the provide zblock format.
Formats are:
- 0 (low-overhead access time and high-overhead storage size)
- 1 (low overhead storage size and high-overhead access time)
'''
if zblk_format in ('0', '1'):
array._fileh.ZBlk_fmt_write = zblk_format_string = 'ZBlk%s' % zblk_format
return 'Writting with format: %s' % zblk_format_string
else:
raise 'Unknown zblk_format provided. Choose between 0 and 1. See docs strings for more information.'
def check_zblock_format(array, zblk_format):
if not array._fileh.ZBlk_fmt_write == 'ZBlk%s' % zblk_format:
raise 'Zblk format didnt match'
else:
raise Exception('You need to provide memory_size and dtype')
return True
def store_arrays(root, *arrays_filenames):
'''Store all [array, filename] pair in arrays_filenames in root and return an
......@@ -124,13 +148,26 @@ def store_arrays(root, *arrays_filenames):
def populate_array(root, big_array, big_index, schema):
'''Fill big_array with copies of a MySQL row until it is completely full'''
message_list = []
data = root.stock_offset(my_offset=0, my_limit=1)
columns = data._names
with timer('Time to prepare data for assignment', message_list):
row = tuple([filter_item(item, normalize=True) for item in data.tuples()[0]])
with timer('Time to fill ZBigArray', message_list):
big_array[:] = row
transaction.commit()
data = root.stock_offset(my_offset=0, my_limit=1)
columns = data._names
row = tuple([filter_item(item, normalize=False) for item in data.tuples()[0]])
max_memory_usage = 100*MB
message_list.append('%s megabytes of memory can be used per chunk.' % (max_memory_usage//MB))
message_list.append('Total array size is %s megabytes.' % (big_array.nbytes//MB))
# little trick here to do a ceil-division instead of floor-division: ceil(a / b) = -(-a // b)
chunks = -(-big_array.nbytes // max_memory_usage)
chunk_begin = 0
chunksize = len(big_array) // chunks
message_list.append('Using %s chunks of %s items.' % (chunks, chunksize))
with timer('Complete assign & store ZBigArray', message_list):
for i in range(chunks):
big_array[chunk_begin:chunksize*(i+1)] = row
chunk_begin = chunksize + 1
with timer('Commit each ZBigArray chunk', message_list):
transaction.commit()
return {'messages': message_list, 'columns': columns}
# method to cast Zope's DateTime objects, falsy-values and strings to floats
......@@ -155,6 +192,9 @@ def process_data(big_array, big_index, columns):
message_list = []
result = None
# When benchmarking, only runs either pandas or numpy, one process creates
# a cache for the other and this gives deceiving results.
# TODO: add parameter to choose wether numpy or pandas should be used
try:
with timer('pandas sum', message_list):
df = pd.DataFrame.from_records(big_array, index=big_index)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment