##############################################################################
#
# Copyright (c) 2002-2015 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################

from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from DateTime import DateTime
import msgpack
import numpy as np
import string
import random

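# build a random reference of the form 'test_' followed by 32 alphanumeric characters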
def getRandomString():
  return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \
    for n in xrange(32)])
    
def chunks(l, n):
  """Yield successive n-sized chunks from l."""
  for i in xrange(0, len(l), n):
    yield l[i:i+n]

class Test(ERP5TypeTestCase):
  """
  Wendelin Test
  """

  def getTitle(self):
    return "Wendelin Test"

  def afterSetUp(self):
    """
    This is run before anything else; it is used to set up the environment
    """
    # here, you can create the categories and objects your test will depend on
    pass
  
  def test_0_import(self):
    """
    Test that we can import certain libraries; a failure to do so should be
    a test step failure rather than a global test failure.
    """
    import scipy
    import sklearn
    import pandas
    
  def test_01_IngestionFromFluentd(self):
    """
    Test ingestion using a POST Request containing a msgpack encoded message
    simulating input from fluentd
    """
    now = DateTime()
    portal = self.portal
    request = portal.REQUEST
    
    reference = getRandomString()
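    # build a CSV-like payload: integers 0..100000, ten comma-separated values
    # per line, with lines joined by newlines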
    number_string_list = []
    for my_list in list(chunks(range(0, 100001), 10)):
      number_string_list.append(','.join([str(x) for x in my_list]))
    real_data = '\n'.join(number_string_list)

    # create ingestion policy
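    # (script_id points to the handler script that is expected to unpack and
    # store the payload when ingest() is called)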
    ingestion_policy = portal.portal_ingestion_policies.newContent( \
      portal_type ='Ingestion Policy',
      reference = reference,
      version = '001',
      script_id = 'ERP5Site_handleDefaultFluentdIngestion')
    ingestion_policy.validate()
    
    # create sensor
    sensor = portal.sensor_module.newContent( \
                            portal_type='Sensor', 
                            reference = reference)
    sensor.validate()

    # create new Data Stream for test purposes
    data_stream = portal.data_stream_module.newContent( \
                   portal_type='Data Stream', \
                   reference=reference)
    data_stream.validate()
    
    # create Data Supply
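    # (the supply line links the sensor as source section and the data stream
    # as destination section for the given resource)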
    resource = portal.restrictedTraverse('data_product_module/wendelin_4')
    data_supply_kw = {'reference': reference,
                      'version': '001',
                      'start_date': now,
                      'stop_date': now + 365}
    data_supply_line_kw = {'resource_value': resource,
                           'source_section_value': sensor,
                           'destination_section_value': data_stream}
    data_supply = ingestion_policy.PortalIngestionPolicy_addDataSupply( \
                                      data_supply_kw, \
                                      data_supply_line_kw)
    self.tic()
    
    # simulate fluentd by setting proper values in REQUEST
    request.method = 'POST'
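    # fluentd delivers records msgpack-encoded; [0, real_data] stands in for a
    # (timestamp, payload) pair, with 0 used as a dummy timestamp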
    data_chunk = msgpack.packb([0, real_data], use_bin_type=True)
    request.set('reference', reference)
    request.set('data_chunk', data_chunk)
    ingestion_policy.ingest()
    
    # the ingestion handler script saves the incoming data into the Data Stream;
    # the stored stream content must match the original input exactly
    data_stream_data = data_stream.getData()
    self.assertEqual(real_data, data_stream_data)
    
    # try sample transformation
    reference = 'test-data-array-%s' % reference
    data_array = portal.data_array_module.newContent(
                                            portal_type='Data Array',
                                            reference = reference,
                                            version = '001')
    data_array.validate()
    self.tic()
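    # run the transformation over the stream in fixed-size chunks;
    # DataStream_copyCSVToDataArray is expected to parse the CSV text and copy
    # the values into the Data Array identified by data_array_reference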
    
    data_stream.DataStream_transform(\
        chunk_length = 10450, \
        transform_script_id = 'DataStream_copyCSVToDataArray',
        data_array_reference = reference)

    self.tic()
    
    # test some numpy operations
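    # (getArray() returns the persistent ZBigArray, which can be passed to
    # numpy functions directly)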
    zarray = data_array.getArray()
    np.average(zarray)
    
    # test that extracted array contains same values as input CSV
    self.assertNotEqual(None, zarray)
    self.assertEqual(99999.0, np.amax(zarray, axis=0))
    self.assertEqual(0.0, np.amin(zarray, axis=0))
    # note: the shape assertion below has been observed to fail; not yet investigated
    self.assertEqual((99999,), zarray.shape)
    
  def test_02_Examples(self):
    """
      Test we can use python scientific libraries by using directly created
      Wendelin examples.
    """
    portal = self.portal
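    # the game_of_life* scripts are Wendelin example scripts; the out-of-core
    # variants rely on wendelin.core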
    portal.game_of_life()
    # XXX: for now the following ones are disabled, as wendelin.core is not
    # available in the testnodes framework
    portal.game_of_life_out_of_core()
    portal.game_of_life_out_of_core_activities()
    
  def test_03_DataArray(self):
    """
      Test persistently saving a ZBig Array to a Data Array.
    """
    from wendelin.bigarray.array_zodb import ZBigArray
    
    data_array = self.portal.data_array_module.newContent( \
                   portal_type = 'Data Array')
    self.assertEqual(None, data_array.getArray())
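    # initArray allocates a persistent ZBigArray with the given shape and dtype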
    data_array.initArray((3, 3), np.uint8)
    self.tic()
    
    # test that the array was stored and that getArray() returns a ZBigArray instance
    persistent_zbig_array = data_array.getArray()
    self.assertEqual(ZBigArray, persistent_zbig_array.__class__)
    
    # try to resize its numpy "view" and check that the persistent array is not
    # resized as well, as these are different objects
    pure_numpy_array = persistent_zbig_array[:,:] # ZBigArray -> ndarray view of it
    pure_numpy_array = np.resize(pure_numpy_array, (4, 4))
    self.assertNotEquals(pure_numpy_array.shape, persistent_zbig_array.shape)
    
    # test copying numpy -> wendelin, but first resize the persistent array
    # (by initialising a new one)
    data_array.initArray((4, 4), np.uint8)
    persistent_zbig_array = data_array.getArray()
    new_array = np.arange(1,17).reshape((4,4))
    persistent_zbig_array[:,:] = new_array
    self.assertEquals(new_array.shape, persistent_zbig_array.shape)

    # (enable only when a new wendelin.core is released, as it can kill the system)
    self.assertTrue(np.array_equal(new_array, persistent_zbig_array))
    
    # test set element in zbig array
    persistent_zbig_array[:2, 2] = 0
    self.assertFalse(np.array_equal(new_array, persistent_zbig_array))

    # resize ZBigArray (enable only when a new wendelin.core is released, as it can kill the system)
    persistent_zbig_array = np.resize(persistent_zbig_array, (100,100))
    self.assertNotEquals(pure_numpy_array.shape, persistent_zbig_array.shape)