Commit 99b91c84 authored by Kirill Smelkov's avatar Kirill Smelkov

RAMArray

RAMArray is compatible to ZBigArray in API and semantic, but stores its
data in RAM only. It is useful in situations where ZBigArray compatible
data type is needed, but the amount of data is small and the data itself
is needed only temporarily - e.g. in a simulation.

Please see details in individual patches.

Original merge request by @klaus (nexedi/wendelin.core!8).

/cc @Tyagov
/reviewed-on nexedi/wendelin.core!9
parents 318efce0 fc9b69d8
...@@ -692,8 +692,9 @@ class ArrayRef(object): ...@@ -692,8 +692,9 @@ class ArrayRef(object):
# 2) or it is a VMA created from under BigArray which will be # 2) or it is a VMA created from under BigArray which will be
# treated as top-level too, and corrected for in the end. # treated as top-level too, and corrected for in the end.
basetype = type(base) basetype = type(base)
if basetype.__module__ + "." + basetype.__name__ == "_bigfile.VMA": basepath = basetype.__module__ + "." + basetype.__name__
#if isinstance(base, _bigfile.VMA): XXX _bigfile does not expose VMA if basepath in ("_bigfile.VMA", "wendelin.bigarray.array_ram._VMA"):
#if isinstance(base, (_bigfile.VMA, array_ram._VMA)): XXX _bigfile does not expose VMA
bigvma = base bigvma = base
break break
......
# -*- coding: utf-8 -*-
# Wendelin.bigarray | RAM Array
# Copyright (C) 2014-2018 Nexedi SA and Contributors.
# Klaus Wölfel <klaus@nexedi.com>
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Module array_ram provides RAMArray that mimics ZBigArray, but keeps data in RAM.
RAMArray mimics ZBigArray API and semantic, but keeps data in RAM.
RAMArray should be used for temporary objects only - its data is not
persisted in any way.
"""
from wendelin import bigarray
import mmap, os, threading, tempfile, errno
import numpy as np
# RAMArray mimics ZBigArray API and semantic, but keeps data in RAM.
class RAMArray(bigarray.BigArray):
def __init__(self, shape, dtype, order='C'):
# the whole functionality of RAMArray is in _RAMFileH
super(RAMArray, self).__init__(shape, dtype, _RAMFileH(), order)
# _RAMFileH mimics _ZBigFileH with data kept in RAM in /dev/shm.
#
# ( we have to use mmap from a file in /dev/shm, not e.g. plain ndarray, because
# BigArray append semantic is to keep aliasing the data from previously-created
# views, and since ndarray.resize copies data, that property would not be preserved. )
class _RAMFileH(object):
# we mmap data as read/write by default.
# tests can overwrite this to be e.g. only PROT_READ to catch incorrect modifications.
_prot = mmap.PROT_READ | mmap.PROT_WRITE
def __init__(self):
# create temporary file in dev/shm and unlink it.
# ._fh keeps opened file descriptor to it.
fh, path = tempfile.mkstemp(dir="/dev/shm", prefix="ramfile.")
os.unlink(path)
self._fh = fh
# mmap(2) allows mmaping past the end, but python's mmap does not.
# we workaround it with explicitly growing file as needed.
# however we need to protect against races between concurrent .mmap() calls.
# ._mmapmu is used for this.
self._mmapmu = threading.Lock()
def mmap(self, pgoffset, pglen):
offset = pgoffset * bigarray.pagesize
length = pglen * bigarray.pagesize
with self._mmapmu:
# grow file, if needed, to cover mmaped range
needsize = offset + length
st = os.fstat(self._fh)
if st.st_size < needsize:
try:
os.ftruncate(self._fh, needsize)
except OverflowError as e:
# OverflowError: Python int too large to convert to C long
raise MemoryError(e)
# create requested mmap
try:
return _VMA(self._fh, pgoffset, pglen, bigarray.pagesize, self._prot)
# ENOMEM -> MemoryError (similarly to BigFile)
except mmap.error as e:
if e.errno == errno.ENOMEM:
raise MemoryError(e)
raise
def __del__(self):
os.close(self._fh)
# _VMA mimics PyVMA.
#
# it is just mmap.mmap, but, similarly to PyVMA, allows to set .pyuser and
# exposes other PyVMA compatible attributes.
class _VMA(mmap.mmap):
__slots__ = ['_pgoffset', '_pglen', '_pagesize', 'pyuser']
def __new__(cls, fh, pgoffset, pglen, pagesize, prot):
vma = mmap.mmap.__new__(cls,
fh,
length = pglen * pagesize,
flags = mmap.MAP_SHARED,
prot = prot,
offset = pgoffset * pagesize)
vma._pgoffset = pgoffset
vma._pglen = pglen
vma._pagesize = pagesize
return vma
def pagesize(self):
return self._pagesize
def filerange(self):
return (self._pgoffset, self._pglen)
@property
def addr_start(self):
# find out address where we are mmapped
a = np.ndarray(shape=(len(self),), dtype=np.uint8, buffer=self)
adata = a.__array_interface__.get('data')
assert adata is not None, "TODO __array_interface__.data = None"
assert isinstance(adata, tuple), "TODO __array_interface__.data is %r" % (adata,)
# adata is (data, readonly)
return adata[0]
@property
def addr_stop(self):
return self.addr_start + len(self)
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from wendelin.bigarray import BigArray, ArrayRef, _flatbytev from wendelin.bigarray import BigArray, ArrayRef, _flatbytev
from wendelin.bigarray.array_ram import _RAMFileH
from wendelin.bigfile import BigFile from wendelin.bigfile import BigFile
from wendelin.lib.mem import memcpy from wendelin.lib.mem import memcpy
from wendelin.lib.calc import mul from wendelin.lib.calc import mul
...@@ -28,7 +29,8 @@ from numpy import ndarray, dtype, int64, int32, uint32, int16, uint8, all, zeros ...@@ -28,7 +29,8 @@ from numpy import ndarray, dtype, int64, int32, uint32, int16, uint8, all, zeros
from numpy.lib.stride_tricks import as_strided from numpy.lib.stride_tricks import as_strided
import numpy import numpy
from pytest import raises import os, mmap
from pytest import raises, fixture
# Synthetic bigfile that just loads zeros, and ignores writes (= a-la /dev/zero) # Synthetic bigfile that just loads zeros, and ignores writes (= a-la /dev/zero)
...@@ -59,19 +61,49 @@ class BigFile_Data(BigFile): ...@@ -59,19 +61,49 @@ class BigFile_Data(BigFile):
memcpy(self.datab[self.blksize * blk : self.blksize * (blk+1)], buf) memcpy(self.datab[self.blksize * blk : self.blksize * (blk+1)], buf)
# synthetic bigfile that only loads data from numpy array PS = 2*1024*1024 # FIXME hardcoded, TODO -> ram.pagesize
class BigFile_Data_RO(BigFile_Data):
def storeblk(self, blk, buf):
raise RuntimeError('tests should not try to change test data')
# tBigFile provides .fopen() to fileh_open a big file handle via ^^^ BigFile_*
class tBigFile:
def fopen(self, data=None, readonly=False):
if data is None:
bigf = BigFile_Zero(PS)
else:
bigf = BigFile_Data(data, PS)
if readonly:
def _(self, blk, buf):
raise RuntimeError('tests should not try to change test data')
bigf.storeblk = _
return bigf.fileh_open()
# tRAM provides .fopen() to open a file handle via _RAMFileH.
class tRAM:
def fopen(self, data=None, readonly=False):
fh = _RAMFileH()
if data is not None:
fh2 = os.dup(fh._fh) # fdopen takes ownershipf of fd and closes it
with os.fdopen(fh2, 'wb') as f:
f.write(data)
if readonly:
fh._prot = mmap.PROT_READ
return fh
# testbig is fixture that provides .fopen(...) to open a big file handle from
# ^^^ BigFile_* or correspondingly from RAM.
@fixture(scope="module", params=[tBigFile, tRAM])
def testbig(request):
cls = request.param
yield cls()
PS = 2*1024*1024 # FIXME hardcoded, TODO -> ram.pagesize
# make sure we don't let dtype with object to be used with BigArray # make sure we don't let dtype with object to be used with BigArray
def test_bigarray_noobject(): def test_bigarray_noobject(testbig):
Z = BigFile_Zero(PS) Zh = testbig.fopen()
Zh = Z.fileh_open()
# NOTE str & unicode are fixed-size types - if size is not explicitly given # NOTE str & unicode are fixed-size types - if size is not explicitly given
# it will become S0 or U0 # it will become S0 or U0
...@@ -81,9 +113,8 @@ def test_bigarray_noobject(): ...@@ -81,9 +113,8 @@ def test_bigarray_noobject():
# basic ndarray-compatibility attributes of BigArray # basic ndarray-compatibility attributes of BigArray
def test_bigarray_basic(): def test_bigarray_basic(testbig):
Z = BigFile_Zero(PS) Zh = testbig.fopen()
Zh = Z.fileh_open()
A = BigArray((10,3), int32, Zh) A = BigArray((10,3), int32, Zh)
...@@ -143,9 +174,8 @@ class DoubleCheck(DoubleGet): ...@@ -143,9 +174,8 @@ class DoubleCheck(DoubleGet):
# getitem/setitem (1d case) # getitem/setitem (1d case)
def test_bigarray_indexing_1d(): def test_bigarray_indexing_1d(testbig):
Z = BigFile_Zero(PS) Zh = testbig.fopen()
Zh = Z.fileh_open()
A = BigArray((10*PS,), uint8, Zh) A = BigArray((10*PS,), uint8, Zh)
...@@ -303,12 +333,11 @@ def test_bigarray_indexing_1d(): ...@@ -303,12 +333,11 @@ def test_bigarray_indexing_1d():
# indexing where accessed element overlaps edge between pages # indexing where accessed element overlaps edge between pages
def test_bigarray_indexing_pageedge(): def test_bigarray_indexing_pageedge(testbig):
shape = (10, PS-1) shape = (10, PS-1)
data = arange(mul(shape), dtype=uint32).view(uint8) # NOTE 4 times bigger than uint8 data = arange(mul(shape), dtype=uint32).view(uint8) # NOTE 4 times bigger than uint8
f = BigFile_Data_RO(data, PS) fh = testbig.fopen(data, readonly=True)
fh = f.fileh_open()
A = BigArray(shape, uint8, fh) # bigarray with test data and shape A = BigArray(shape, uint8, fh) # bigarray with test data and shape
A_ = data[:mul(shape)].reshape(shape) # ndarray ----//---- A_ = data[:mul(shape)].reshape(shape) # ndarray ----//----
...@@ -325,8 +354,7 @@ def test_bigarray_indexing_pageedge(): ...@@ -325,8 +354,7 @@ def test_bigarray_indexing_pageedge():
shape = (10, PS+1) shape = (10, PS+1)
f = BigFile_Data_RO(data, PS) fh = testbig.fopen(data, readonly=True)
fh = f.fileh_open()
A = BigArray(shape, uint8, fh) A = BigArray(shape, uint8, fh)
A_ = data[:mul(shape)].reshape(shape) A_ = data[:mul(shape)].reshape(shape)
...@@ -371,7 +399,7 @@ def idx_to_test(shape, idx_prefix=()): ...@@ -371,7 +399,7 @@ def idx_to_test(shape, idx_prefix=()):
# getitem/setitem (Nd case) # getitem/setitem (Nd case)
def test_bigarray_indexing_Nd(): def test_bigarray_indexing_Nd(testbig):
# shape of tested array - all primes, total size for uint32 ~ 7 2M pages # shape of tested array - all primes, total size for uint32 ~ 7 2M pages
# XXX even less dimensions (to speed up tests)? # XXX even less dimensions (to speed up tests)?
shape = tuple(reversed( (17,23,101,103) )) shape = tuple(reversed( (17,23,101,103) ))
...@@ -381,8 +409,7 @@ def test_bigarray_indexing_Nd(): ...@@ -381,8 +409,7 @@ def test_bigarray_indexing_Nd():
# (else data slice will be smaller than buf) # (else data slice will be smaller than buf)
data = arange(mul(shape) + PS, dtype=uint32) data = arange(mul(shape) + PS, dtype=uint32)
f = BigFile_Data_RO(data, PS) fh = testbig.fopen(data, readonly=True)
fh = f.fileh_open()
for order in ('C', 'F'): for order in ('C', 'F'):
A = BigArray(shape, uint32, fh, order=order) # bigarray with test data and shape A = BigArray(shape, uint32, fh, order=order) # bigarray with test data and shape
...@@ -440,10 +467,9 @@ def test_bigarray_indexing_Nd(): ...@@ -440,10 +467,9 @@ def test_bigarray_indexing_Nd():
""" """
def test_bigarray_resize(): def test_bigarray_resize(testbig):
data = zeros(8*PS, dtype=uint32) data = zeros(8*PS, dtype=uint32)
f = BigFile_Data(data, PS) fh = testbig.fopen(data)
fh = f.fileh_open()
# set first part & ensure it is set correctly # set first part & ensure it is set correctly
A = BigArray((10,3), uint32, fh) A = BigArray((10,3), uint32, fh)
...@@ -507,11 +533,10 @@ def arange32_f(start, stop, dtype=None): ...@@ -507,11 +533,10 @@ def arange32_f(start, stop, dtype=None):
return arange(start*3*2, stop*3*2, dtype=dtype).reshape(2,3,(stop-start), order='F') return arange(start*3*2, stop*3*2, dtype=dtype).reshape(2,3,(stop-start), order='F')
#return arange(start*3*2, stop*3*2, dtype=dtype).reshape(2,3,(stop-start)) #return arange(start*3*2, stop*3*2, dtype=dtype).reshape(2,3,(stop-start))
def test_bigarray_append(): def test_bigarray_append(testbig):
for order in ('C', 'F'): for order in ('C', 'F'):
data = zeros(8*PS, dtype=uint32) data = zeros(8*PS, dtype=uint32)
f = BigFile_Data(data, PS) fh = testbig.fopen(data)
fh = f.fileh_open()
arange32 = {'C': arange32_c, 'F': arange32_f} [order] arange32 = {'C': arange32_c, 'F': arange32_f} [order]
...@@ -552,9 +577,8 @@ def test_bigarray_append(): ...@@ -552,9 +577,8 @@ def test_bigarray_append():
def test_bigarray_list(): def test_bigarray_list(testbig):
Z = BigFile_Zero(PS) Zh = testbig.fopen()
Zh = Z.fileh_open()
A = BigArray((10,), uint8, Zh) A = BigArray((10,), uint8, Zh)
# the IndexError for out-of-bound scalar access should allow, though # the IndexError for out-of-bound scalar access should allow, though
...@@ -564,9 +588,8 @@ def test_bigarray_list(): ...@@ -564,9 +588,8 @@ def test_bigarray_list():
assert l == [0]*10 assert l == [0]*10
def test_bigarray_to_ndarray(): def test_bigarray_to_ndarray(testbig):
Z = BigFile_Zero(PS) Zh = testbig.fopen()
Zh = Z.fileh_open()
A = BigArray((10,), uint8, Zh) A = BigArray((10,), uint8, Zh)
# without IndexError on out-of-bound scalar access, the following # without IndexError on out-of-bound scalar access, the following
...@@ -594,7 +617,7 @@ def test_bigarray_to_ndarray(): ...@@ -594,7 +617,7 @@ def test_bigarray_to_ndarray():
def test_arrayref(): def test_arrayref(testbig):
# test data - all items are unique - so we can check array by content # test data - all items are unique - so we can check array by content
data = zeros(PS, dtype=uint8) data = zeros(PS, dtype=uint8)
data32 = data.view(uint32) data32 = data.view(uint32)
...@@ -683,8 +706,7 @@ def test_arrayref(): ...@@ -683,8 +706,7 @@ def test_arrayref():
# data_ is the same as data but shifted to exercise vma and vma->broot offsets calculation. # data_ is the same as data but shifted to exercise vma and vma->broot offsets calculation.
data_ = zeros(8*PS, dtype=uint8) data_ = zeros(8*PS, dtype=uint8)
data_[2*PS-1:][:PS] = data data_[2*PS-1:][:PS] = data
f = BigFile_Data_RO(data_, PS) fh = testbig.fopen(data_, readonly=True)
fh = f.fileh_open()
A = BigArray(data_.shape, data_.dtype, fh) A = BigArray(data_.shape, data_.dtype, fh)
assert array_equal(A[2*PS-1:][:PS], data) assert array_equal(A[2*PS-1:][:PS], data)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment