Commit ec3443d7 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Initial code review. More to come.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13957 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7e275281
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Kevin Deldycke <kevin_AT_nexedi_DOT_com>
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
# Bartek Gorny <bg@erp5.pl>
# Jean-Paul Smets <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
......@@ -26,25 +27,25 @@
#
##############################################################################
import os
import sys
import cStringIO
from xml.dom.minidom import parseString
import os, sys, cStringIO
import zipfile
from xml.dom.minidom import parseString
from cgi import FieldStorage
from zLOG import LOG
from zExceptions import BadRequest
from Testing import ZopeTestCase
from DateTime import DateTime
from AccessControl.SecurityManagement import newSecurityManager
from Products.CMFCore.utils import getToolByName
from Products.ERP5Type.Utils import convertToUpperCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.ERP5Type.Cache import clearCache
from Products.ERP5OOo.Document.OOoDocument import ConversionError
from Products.ERP5.Document.File import _unpackData
from zLOG import LOG
if __name__ == '__main__':
execfile(os.path.join(sys.path[0], 'framework.py'))
......@@ -53,28 +54,20 @@ if __name__ == '__main__':
os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log')
os.environ['EVENT_LOG_SEVERITY'] = '-300'
ooodoc_coordinates = ('127.0.0.1', 8008)
# Define the conversion server host
conversion_server_host = ('127.0.0.1', 8008)
testrun = ()
def shout(msg):
def printAndLog(msg):
"""
A utility function to print a message
to the standard output and to the LOG
at the same time
"""
msg = str(msg)
ZopeTestCase._print('\n ' + msg)
LOG('Testing... ', 0, msg)
def unpackData(data):
"""
Unpack Pdata into string
"""
if isinstance(data, str):
return data
else:
data_list = []
while data is not None:
data_list.append(data.data)
data = data.next
return ''.join(data_list)
class FileUploadTest(file):
__allow_access_to_unprotected_subobjects__=1
......@@ -85,7 +78,7 @@ class FileUploadTest(file):
self.headers = {}
def makeFilePath(name):
return os.getenv('INSTANCE_HOME') + '/../Products/ERP5OOo/tests/data/' + name
return os.getenv('INSTANCE_HOME') + '/../Products/ERP5OOo/tests/test_document/' + name
def makeFileUpload(name):
path = makeFilePath(name)
......@@ -97,7 +90,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
# pseudo constants
RUN_ALL_TEST = 1
RUN_ALL_TEST = 0
QUIET = 0
##################################
......@@ -108,7 +101,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
Return the title of the current test set.
"""
return "ERP5 DMS - ingestion"
return "ERP5 DMS - Ingestion"
def getBusinessTemplateList(self):
"""
......@@ -121,48 +114,33 @@ class TestIngestion(ERP5TypeTestCase):
Initialize the ERP5 site.
"""
self.login()
self.datetime = DateTime()
self.portal = self.getPortal()
self.datetime = DateTime()
self.portal = self.getPortal()
self.portal_categories = self.getCategoryTool()
self.portal_catalog = self.getCatalogTool()
self.createCategories()
self.createPreferences()
self.portal_catalog = self.getCatalogTool()
self.createDefaultCategoryList()
self.setSystemPreference()
self.createTools()
self.unpackData()
def unpackData(self):
"""
Unpack the content of testIngestion_docs.zip
"""
join = os.path.join
base_path = join(os.getenv('INSTANCE_HOME'), '..', 'Products', 'ERP5OOo', 'tests')
zf = zipfile.ZipFile(join(base_path, 'testIngestion_docs.zip'))
data_dir = join(base_path, 'data')
if not os.path.isdir(data_dir):
os.mkdir(data_dir)
for name in zf.namelist():
fname = join(data_dir, name)
if not os.path.exists(fname):
try:
f = open(fname, 'w')
f.write(zf.read(name))
finally:
f.close()
def createTools(self):
"""
Set up contribution tool and content type registry
NOTE: portal_contributions is not created yet
by ERP5Site bootstrap. This is why me must create it
here. We also delete it and recreate it in case
it was saved by a previous --save run of the test.
XXX - what about mimetype registry ?
"""
# XXX portal_contributions is not created in bootstrap
# so we have to create it here
# before we delete in case it was created before and --saved
# Delete and create portal_contributions
try:
self.portal._delObject('portal_contributions')
except AttributeError:
pass
addTool = self.portal.manage_addProduct['ERP5'].manage_addTool
addTool('ERP5 Contribution Tool', None)
# the same for portal_mailin
# Delete and create portal_mailin
try:
self.portal._delObject('portal_mailin')
except AttributeError:
......@@ -172,11 +150,12 @@ class TestIngestion(ERP5TypeTestCase):
mailin = self.portal.portal_mailin
mailin.edit_configuration('Document_ingestEmail')
def createPreferences(self):
def setSystemPreference(self):
default_pref = self.portal.portal_preferences.default_site_preference
default_pref.setPreferredOoodocServerAddress(ooodoc_coordinates[0])
default_pref.setPreferredOoodocServerPortNumber(ooodoc_coordinates[1])
default_pref.setPreferredDocumentFileNameRegularExpression("(?P<reference>[A-Z]{3,6})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})")
default_pref.setPreferredOoodocServerAddress(conversion_server_host[0])
default_pref.setPreferredOoodocServerPortNumber(conversion_server_host[1])
default_pref.setPreferredDocumentFileNameRegularExpression(
"(?P<reference>[A-Z]{3,6})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})")
default_pref.enable()
......@@ -188,15 +167,20 @@ class TestIngestion(ERP5TypeTestCase):
"""
Create a new manager user and login.
"""
user_name = 'bartek'
user_name = 'dms_user'
user_folder = self.portal.acl_users
user_folder._doAddUser(user_name, '', ['Manager', 'Owner', 'Assignor'], [])
user = user_folder.getUserById(user_name).__of__(user_folder)
newSecurityManager(None, user)
def createCategories(self):
def createDefaultCategoryList(self):
"""
Create some categories for testing.
Create some categories for testing. DMS security
is based on group, site, function, publication_section
and projects.
NOTE (XXX): some parts of this method could be either
moved to Category Tool or to ERP5 Test Case.
"""
self.category_list = [
# Role categories
......@@ -215,6 +199,12 @@ class TestIngestion(ERP5TypeTestCase):
,{'path' : 'group/anybody'
,'title': 'Anybody'
}
,{'path' : 'publication_section/cop'
,'title': 'COPs'
}
,{'path' : 'publication_section/cop/one'
,'title': 'COP one'
}
]
# Create categories
......@@ -258,147 +248,191 @@ class TestIngestion(ERP5TypeTestCase):
Get a list of categories with same base categories.
"""
categories = []
if base_category != None:
if base_category is not None:
for category in self.category_list:
if category["path"].split('/')[0] == base_category:
categories.append(category)
return categories
def getDocument(self, id):
dm = self.portal.document_module
doc = getattr(dm, id)
return doc
"""
Returns a document with given ID in the
document module.
"""
document_module = self.portal.document_module
return getattr(document_module, id)
def checkObjectCatalogged(self, portal_type, reference):
def checkIsObjectCatalogged(self, portal_type, **kw):
"""
make sure this object is already in the catalog
Make sure that a document with given portal type
and kw properties is already present in the catalog.
Typical use of this method consists in providing
an id or reference.
"""
res = self.portal_catalog(portal_type=portal_type, reference=reference)
res = self.portal_catalog(portal_type=portal_type, **kw.copy())
self.assertEquals(len(res), 1)
self.assertEquals(res[0].getReference(), reference)
for key, value in kw.items():
self.assertEquals(res[0].getProperty(key), value)
def createDocument(self, portal_type, id):
def newEmptyCataloggedDocument(self, portal_type, id):
"""
create an empty document of given portal type
it has id as given and reference like document_[id]
immediately catalogged and verified in two ways
Create an empty document of given portal type
and given ID.
Documents are immediately catalogged and verified
both form catalog point of view and from their
presence in the document module.
"""
dm = self.portal.getDefaultModule(portal_type)
doc = getattr(dm, id, None)
if doc is not None:
dm.manage_delObjects([id,])
reference = 'document_' + id
doc = dm.newContent(portal_type=portal_type, id=id, reference=reference)
#doctext._getServerCoordinate = getOoodCoordinate()
doc.reindexObject(); get_transaction().commit(); self.tic()
self.checkObjectCatalogged(portal_type, reference)
self.assert_(hasattr(dm, id))
document_module = self.portal.getDefaultModule(portal_type)
document = getattr(document_module, id, None)
if document is not None:
document_module.manage_delObjects([id,])
document = document_module.newContent(portal_type=portal_type, id=id)
document.reindexObject()
get_transaction().commit()
self.tic()
self.checkIsObjectCatalogged(portal_type, id=id, parent_uid=document_module.getUid())
self.assert_(hasattr(document_module, id))
return document
def ingestFormatList(self, doc_id, formats_from, portal_type=None):
def ingestFormatList(self, document_id, format_list, portal_type=None):
"""
method for bulk ingesting files of various formats
we take them one by one based on naming convention
ingest, convert
check that a magic word is in every of them
(unless it is Image or File)
Upload in document document_id all test files which match
any of the formats in format_list.
portal_type can be specified to force the use of
the default module for a given portal type instead
of the document module.
For every file, this checks is the word "magic"
is present in both SearchableText and asText.
"""
if portal_type is None:
dm = self.portal.document_module
document_module = self.portal.document_module
else:
dm = self.portal.getDefaultModule(portal_type)
context = getattr(dm, doc_id)
for rev, format in enumerate(formats_from):
document_module = self.portal.getDefaultModule(portal_type)
context = getattr(document_module, document_id)
for revision, format in enumerate(format_list):
filename = 'TEST-en-002.' + format
f = makeFileUpload(filename)
context.edit(file=f)
context.convertToBaseFormat()
context.reindexObject(); get_transaction().commit(); self.tic()
context.reindexObject()
get_transaction().commit()
self.tic()
self.failUnless(context.hasFile())
if context.getPortalType() in ('Image', 'File', 'PDF'): # these are not subject to conversion
if context.getPortalType() in ('Image', 'File',):
# File and images do not support conversion to text in DMS
self.assertEquals(context.getExternalProcessingState(), 'uploaded')
else:
self.assertEquals(context.getExternalProcessingState(), 'converted') # this is how we know if it was ok or not
self.assert_('magic' in context.SearchableText())
self.assert_('magic' in str(context.asText()))
def checkDocumentExportList(self, doc_id, format, targets):
def checkDocumentExportList(self, document_id, format, asserted_target_list):
"""
given the docs id
make sure targets are in
the objects target format list
Upload document ID document_id with
a test file of given format and assert that the document
can be converted to any of the formats in asserted_target_list
"""
context = self.getDocument(doc_id)
context = self.getDocument(document_id)
filename = 'TEST-en-002.' + format
f = makeFileUpload(filename)
context.edit(file=f)
context.convertToBaseFormat()
context.reindexObject(); get_transaction().commit(); self.tic()
clearCache()
target_list = [x[1] for x in context.getTargetFormatItemList()]
for target in targets:
context.reindexObject()
get_transaction().commit()
self.tic()
clearCache() # We call clear cache to be sure that
# the target list is updated
target_list = context.getTargetFormatList()
for target in asserted_target_list:
self.assert_(target in target_list)
def contributeFileList(self, with_portal_type=False):
"""
some file types fail, but generally it shows that if the content type
is detected correctly, everything goes smooth
the sdc issue requires further investigation
"""
ext2type = (
('ppt' , 'Presentation')
,('doc' , 'Text')
#,('sdc' , 'Spreadsheet') - XXX fails for totally mysterious reasons
#,('sxc' , 'Drawing') - not with this content_type_registry
,('pdf' , 'PDF')
,('jpg' , 'Image')
,('py' , 'File')
Tries to a create new content through portal_contributions
for every possible file type. If with_portal_type is set
to true, portal_type is specified when calling newContent
on portal_contributions.
"""
extension_to_type = (
('ppt', 'Presentation')
,('doc', 'Text')
,('sdc', 'Spreadsheet')
,('sxc', 'Drawing')
,('pdf', 'PDF')
,('jpg', 'Image')
,('py', 'File')
)
for ext, typ in ext2type:
shout(ext)
filename = 'TEST-en-002.' + ext
for extension, portal_type in extension_to_type:
printAndLog(extension)
filename = 'TEST-en-002.' + extension
file = makeFileUpload(filename)
shout(file.filename)
printAndLog(file.filename)
if with_portal_type:
ob = self.portal.portal_contributions.newContent(portal_type=typ, file=file)
ob = self.portal.portal_contributions.newContent(portal_type=portal_type, file=file)
else:
ob = self.portal.portal_contributions.newContent(file=file)
get_transaction().commit()
self.tic()
self.assertEquals(ob.getPortalType(), typ)
self.assertEquals(ob.getPortalType(), portal_type)
self.assertEquals(ob.getReference(), 'TEST')
ob.reindexObject(); get_transaction().commit(); self.tic()
if ob.getPortalType() in ('Image', 'File', 'PDF'): # these are not subject to conversion
if ob.getPortalType() in ('Image', 'File', 'PDF'):
# Image, File and PDF are not converted to a base format
self.assertEquals(ob.getExternalProcessingState(), 'uploaded')
else:
self.assertEquals(ob.getExternalProcessingState(), 'converted') # this is how we know if it was ok or not
# We check if conversion has succeeded by looking
# at the external_processing workflow
self.assertEquals(ob.getExternalProcessingState(), 'converted')
self.assert_('magic' in ob.SearchableText())
def setDiscoveryScript(self, object_id, script_id, args, code):
def newPythonScript(self, object_id, script_id, argument_list, code):
"""
Creates a new python script with given argument_list
and source code.
"""
context = self.getDocument(object_id)
factory = context.manage_addProduct['PythonScripts'].manage_addPythonScript
factory(id=script_id)
script = getattr(context, script_id)
script.ZPythonScript_edit(args, code)
script.ZPythonScript_edit(argument_list, code)
def setDiscoveryOrder(self, order, id='one'):
def setDiscoveryOrder(self, order, id):
"""
Creates and sets appropriate discovery order list script
Creates a script to define the metadata discovery order
for Text documents.
"""
script_code = "return %s" % str(order)
self.setDiscoveryScript(id, 'Text_getPreferredDocumentMetadataDiscoveryOrderList', '', script_code)
self.newPythonScript(id, 'Text_getPreferredDocumentMetadataDiscoveryOrderList', '', script_code)
def discoverMetadata(self):
context = self.getDocument('one')
shout(context.Text_getPreferredDocumentMetadataDiscoveryOrderList())
context._backup_input = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james')
shout(context.getSourceReference())
def discoverMetadata(self, document_id='one'):
"""
Sets input parameters and on the document ID document_id
and discover metadata. For reindexing
"""
context = self.getDocument(document_id)
printAndLog(context.Text_getPreferredDocumentMetadataDiscoveryOrderList())
context._backup_input = dict(reference='INPUT', language='in',
version='004', short_title='from_input',
contributor='person_module/james')
printAndLog(context.getSourceReference())
context.discoverMetadata(context.getSourceReference())
# we don't need user_login here because
# for testing we overwrite Text_getPropertyDictFromUserLogin
context.reindexObject(); get_transaction().commit(); self.tic()
context.reindexObject()
get_transaction().commit()
self.tic()
def checkMetadataOrder(self, expected):
context = self.getDocument('one')
for k, v in expected.items():
def checkMetadataOrder(self, expected_metadata, document_id='one'):
"""
Asserts that metadata of document ID document_id
is the same as expected_metadata
"""
context = self.getDocument(document_id)
for k, v in expected_metadata.items():
self.assertEquals(context.getProperty(k), v)
##################################
......@@ -408,46 +442,9 @@ class TestIngestion(ERP5TypeTestCase):
def stepTic(self, sequence=None, sequence_list=None, **kw):
self.tic()
def stepCheckPreferences(self, sequence=None, sequence_list=None, **kw):
"""
make sure preferences are set up properly and accessible
"""
self.assertEquals(self.portal.portal_preferences.getPreferredOoodocServerAddress(), ooodoc_coordinates[0])
self.assertEquals(self.portal.portal_preferences.getPreferredOoodocServerPortNumber(), ooodoc_coordinates[1])
self.assertEquals(self.portal.portal_preferences.default_site_preference.getPreferredDocumentFileNameRegularExpression(), "(?P<reference>[A-Z]{3,6})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})")
def stepCheckContentTypeRegistry(self, sequence=None, sequence_list=None, **kw):
"""
check if we successfully imported registry
and that it has all the entries we need
"""
reg = self.portal.content_type_registry
correct_type_mapping = {
'doc' : 'Text',
'txt' : 'Text',
'odt' : 'Text',
'sxw' : 'Text',
'rtf' : 'Text',
'gif' : 'Image',
'jpg' : 'Image',
'png' : 'Image',
'bmp' : 'Image',
'pdf' : 'PDF',
'xls' : 'Spreadsheet',
'ods' : 'Spreadsheet',
'sdc' : 'Spreadsheet',
'ppt' : 'Presentation',
'odp' : 'Presentation',
'sxi' : 'Presentation',
'xxx' : 'File',
}
for type, portal_type in correct_type_mapping.items():
file_name = 'aaa.' + type
self.assertEquals(reg.findTypeName(file_name, None, None), portal_type)
def stepCreatePerson(self, sequence=None, sequence_list=None, **kw):
"""
Create a person (if not exists).
Create a person with ID "john" if it does not exists already
"""
portal_type = 'Person'
id = 'john'
......@@ -463,69 +460,67 @@ class TestIngestion(ERP5TypeTestCase):
def stepCreateTextDocument(self, sequence=None, sequence_list=None, **kw):
"""
create an empty Text document 'one'
for further testing
(first delete if exists)
Create an empty Text document with ID 'one'
This document will be used in most tests.
"""
self.createDocument('Text', 'one')
self.newEmptyCataloggedDocument('Text', 'one')
def stepCreateSpreadsheetDocument(self, sequence=None, sequence_list=None, **kw):
"""
create an empty Spreadsheet document 'two'
for further testing
(first delete if exists)
Create an empty Spreadsheet document with ID 'two'
This document will be used in most tests.
"""
self.createDocument('Spreadsheet', 'two')
self.newEmptyCataloggedDocument('Spreadsheet', 'two')
def stepCreatePresentationDocument(self, sequence=None, sequence_list=None, **kw):
"""
create an empty Presentation document 'three'
for further testing
(first delete if exists)
Create an empty Presentation document with ID 'three'
This document will be used in most tests.
"""
self.createDocument('Presentation', 'three')
self.newEmptyCataloggedDocument('Presentation', 'three')
def stepCreateDrawingDocument(self, sequence=None, sequence_list=None, **kw):
"""
create an empty Drawing document 'four'
for further testing
(first delete if exists)
Create an empty Drawing document with ID 'four'
This document will be used in most tests.
"""
self.createDocument('Drawing', 'four')
self.newEmptyCataloggedDocument('Drawing', 'four')
def stepCreatePDFDocument(self, sequence=None, sequence_list=None, **kw):
"""
create an empty PDF document 'five'
for further testing
(first delete if exists)
Create an empty PDF document with ID 'five'
This document will be used in most tests.
"""
self.createDocument('PDF', 'five')
self.newEmptyCataloggedDocument('PDF', 'five')
def stepCreateImageDocument(self, sequence=None, sequence_list=None, **kw):
"""
create an empty Image document 'six'
for further testing
(first delete if exists)
Create an empty Image document with ID 'six'
This document will be used in most tests.
"""
self.createDocument('Image', 'six')
self.newEmptyCataloggedDocument('Image', 'six')
def stepCheckEmptyState(self, sequence=None, sequence_list=None, **kw):
"""
check if the document is in "empty" processing state
Check if the document is in "empty" processing state
(ie. no file upload has been done yet)
"""
context = self.getDocument('one')
return self.assertEquals(context.getExternalProcessingState(), 'empty')
def stepCheckUploadedState(self, sequence=None, sequence_list=None, **kw):
"""
check if the document is in "uploaded" processing state
Check if the document is in "uploaded" processing state
(ie. a file upload has been done)
"""
context = self.getDocument('one')
return self.assertEquals(context.getExternalProcessingState(), 'uploaded')
def stepCheckConvertedState(self, sequence=None, sequence_list=None, **kw):
"""
check if the document is in "converted" processing state
Check if the document is in "converted" processing state
(ie. a file upload has been done and the document has
been converted)
"""
context = self.getDocument('one')
return self.assertEquals(context.getExternalProcessingState(), 'converted')
......@@ -535,33 +530,55 @@ class TestIngestion(ERP5TypeTestCase):
Upload a file directly from the form
check if it has the data and source_reference
"""
doc = self.getDocument('one')
document = self.getDocument('one')
# Revision is 0 before upload (revisions are strings)
self.assertEquals(document.getRevision(), '0')
f = makeFileUpload('TEST-en-002.doc')
doc.edit(file=f)
self.assert_(doc.hasFile())
#self.assertEquals(doc.getSourceReference(), 'TEST-en-002.doc')
self.assertEquals(doc.getRevision(), '')
doc.reindexObject(); get_transaction().commit(); self.tic()
document.edit(file=f)
self.assert_(document.hasFile())
# XXX - source_reference set to file name ? is this a default feature or generic ?
#self.assertEquals(document.getSourceReference(), 'TEST-en-002.doc')
# Revision is 1 after upload (revisions are strings)
self.assertEquals(document.getRevision(), '1')
document.reindexObject()
get_transaction().commit()
self.tic()
def stepDialogUpload(self, sequence=None, sequence_list=None, **kw):
"""
upload a file using dialog
should increase revision
Upload a file using the dialog script Document_uploadFile
and make sure this increases the revision
"""
context = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
revision = context.getRevision()
context.Document_uploadFile(file=f)
self.assertEquals(context.getRevision(), '001')
context.reindexObject(); get_transaction().commit(); self.tic()
self.assertEquals(context.getRevision(), str(int(revision) + 1))
context.reindexObject()
get_transaction().commit()
self.tic()
def stepDiscoverFromFilename(self, sequence=None, sequence_list=None, **kw):
"""
upload file using dialog
this should trigger metadata discovery and we should have
basic coordinates immediately, from first stage
Upload a file using the dialog script Document_uploadFile.
This should trigger metadata discovery and we should have
basic coordinates immediately, from first stage.
"""
context = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
file_name = 'TEST-en-002.doc'
# First make sure the regular expressions work
property_dict = context.getPropertyDictFromFileName(file_name)
self.assertEquals(property_dict['reference'], 'TEST')
self.assertEquals(property_dict['language'], 'en')
self.assertEquals(property_dict['version'], '002')
# Then make sure content discover works
# XXX - This part must be extended
property_dict = context.getPropertyDictFromContent()
self.assertEquals(property_dict['title'], 'title')
self.assertEquals(property_dict['description'], 'comments')
self.assertEquals(property_dict['subject_list'], ['keywords'])
# Then make sure metadata discovery works
f = makeFileUpload(file_name)
context.Document_uploadFile(file=f)
self.assertEquals(context.getReference(), 'TEST')
self.assertEquals(context.getLanguage(), 'en')
......@@ -569,21 +586,25 @@ class TestIngestion(ERP5TypeTestCase):
def stepCheckConvertedContent(self, sequence=None, sequence_list=None, **kw):
"""
check if the input file was successfully converted
and that it includes what it should
Check that the input file was successfully converted
and that its SearchableText and asText contain
the word "magic"
"""
self.tic()
context = self.getDocument('one')
self.assert_(context.hasOOFile())
self.assert_(context.hasBaseData())
self.assert_('magic' in context.SearchableText())
self.assert_('magic' in str(context.asText()))
def stepSetDiscoveryScripts(self, sequence=None, sequence_list=None, **kw):
def stepSetSimulatedDiscoveryScript(self, sequence=None, sequence_list=None, **kw):
"""
Create Text_getPropertyDictFrom[source] scripts
to simulate custom site's configuration
"""
self.setDiscoveryScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'contributor':'person_module/john'}")
self.setDiscoveryScript('one', 'Text_getPropertyDictFromContent', '', "return {'short_title':'short'}")
self.newPythonScript('one', 'Text_getPropertyDictFromUserLogin',
'user_name=None', "return {'contributor':'person_module/john'}")
self.newPythonScript('one', 'Text_getPropertyDictFromContent', '',
"return {'short_title':'short'}")
def stepTestMetadataSetting(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -595,18 +616,21 @@ class TestIngestion(ERP5TypeTestCase):
context.Document_uploadFile(file=f)
get_transaction().commit()
self.tic()
# Then make sure content discover works
property_dict = context.getPropertyDictFromUserLogin()
self.assertEquals(property_dict['contributor'], 'person_module/john')
# reference from filename (the rest was checked some other place)
self.assertEquals(context.getReference(), 'TEST')
# short_title from content
self.assertEquals(context.getShortTitle(), 'short')
# title from metadata inside the document
self.assertEquals(context.getTitle(), 'title')
# contributors from user
self.assertEquals(context.getContributor(), 'person_module/john')
# title from metadata inside the doc
self.assertEquals(context.getTitle(), 'title')
def stepEditMetadata(self, sequence=None, sequence_list=None, **kw):
"""
we change metadata in a doc which has ODF
we change metadata in a document which has ODF
"""
context = self.getDocument('one')
kw = dict(title='another title',
......@@ -627,7 +651,7 @@ class TestIngestion(ERP5TypeTestCase):
context = self.getDocument('one')
newcontent = context.getBaseData()
cs = cStringIO.StringIO()
cs.write(unpackData(newcontent))
cs.write(_unpackData(newcontent))
z = zipfile.ZipFile(cs)
s = z.read('meta.xml')
xmlob = parseString(s)
......@@ -643,55 +667,55 @@ class TestIngestion(ERP5TypeTestCase):
ingest all supported text formats
make sure they are converted
"""
formats_from = ['rtf', 'doc', 'txt', 'sxw', 'sdw']
self.ingestFormatList('one', formats_from)
format_list = ['rtf', 'doc', 'txt', 'sxw', 'sdw']
self.ingestFormatList('one', format_list)
def stepIngestSpreadsheetFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported spreadsheet formats
make sure they are converted
"""
formats_from = ['xls', 'sxc', 'sdc']
self.ingestFormatList('two', formats_from)
format_list = ['xls', 'sxc', 'sdc']
self.ingestFormatList('two', format_list)
def stepIngestPresentationFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported presentation formats
make sure they are converted
"""
formats_from = ['ppt', 'sxi', 'sdd']
self.ingestFormatList('three', formats_from)
format_list = ['ppt', 'sxi', 'sdd']
self.ingestFormatList('three', format_list)
def stepIngestPDFFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported PDF formats
make sure they are converted
"""
formats_from = ['pdf']
self.ingestFormatList('five', formats_from)
format_list = ['pdf']
self.ingestFormatList('five', format_list)
def stepIngestDrawingFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported presentation formats
make sure they are converted
"""
formats_from = ['sxd', 'sda']
self.ingestFormatList('four', formats_from)
format_list = ['sxd', 'sda']
self.ingestFormatList('four', format_list)
def stepIngestPDFFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported pdf formats
make sure they are converted
"""
formats_from = ['pdf']
self.ingestFormatList('five', formats_from)
format_list = ['pdf']
self.ingestFormatList('five', format_list)
def stepIngestImageFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported image formats
"""
formats_from = ['jpg', 'gif', 'bmp', 'png']
self.ingestFormatList('six', formats_from, 'Image')
format_list = ['jpg', 'gif', 'bmp', 'png']
self.ingestFormatList('six', format_list, 'Image')
def stepCheckTextDocumentExportList(self, sequence=None, sequence_list=None, **kw):
self.checkDocumentExportList('one', 'doc', ['pdf', 'doc', 'rtf', 'html-writer', 'txt'])
......@@ -710,12 +734,12 @@ class TestIngestion(ERP5TypeTestCase):
see if it is possible to make a jpg
test conversion to html
"""
doc = self.getDocument('five')
document = self.getDocument('five')
f = makeFileUpload('TEST-en-002.pdf')
doc.edit(file=f)
res = doc.convert('jpg')
document.edit(file=f)
res = document.convert('jpg')
self.failUnless(res)
res_html = doc.asHTML()
res_html = document.asHTML()
self.failUnless('magic' in res_html)
def stepExportImage(self, sequence=None, sequence_list=None, **kw):
......@@ -724,7 +748,7 @@ class TestIngestion(ERP5TypeTestCase):
of REQUEST and RESPONSE, and the rest of the implementation is way down
in Zope core
"""
shout('not implemented')
printAndLog('not implemented')
def stepCheckHasSnapshot(self, sequence=None, sequence_list=None, **kw):
context = self.getDocument('one')
......@@ -761,7 +785,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
self.contributeFileList(with_portal_type=False)
def stepSetDiscoveryScriptsForOrdering(self, sequence=None, sequence_list=None, **kw):
def stepSetSimulatedDiscoveryScriptForOrdering(self, sequence=None, sequence_list=None, **kw):
"""
set scripts which are supposed to overwrite each other's metadata
desing is the following:
......@@ -772,53 +796,53 @@ class TestIngestion(ERP5TypeTestCase):
contributor john jack james
short_title from_content from_input
"""
self.setDiscoveryScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'reference':'USER', 'language':'us', 'contributor':'person_module/john'}")
self.setDiscoveryScript('one', 'Text_getPropertyDictFromContent', '', "return {'reference':'CONT', 'version':'003', 'contributor':'person_module/jack', 'short_title':'from_content'}")
self.newPythonScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'reference':'USER', 'language':'us', 'contributor':'person_module/john'}")
self.newPythonScript('one', 'Text_getPropertyDictFromContent', '', "return {'reference':'CONT', 'version':'003', 'contributor':'person_module/jack', 'short_title':'from_content'}")
def stepCheckMetadataSettingOrderFICU(self, sequence=None, sequence_list=None, **kw):
"""
This is the default
"""
expected = dict(reference='TEST', language='en', version='002', short_title='from_input', contributor='person_module/james')
expected_metadata = dict(reference='TEST', language='en', version='002', short_title='from_input', contributor='person_module/james')
self.setDiscoveryOrder(['file_name', 'input', 'content', 'user_login'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
self.checkMetadataOrder(expected_metadata)
def stepCheckMetadataSettingOrderCUFI(self, sequence=None, sequence_list=None, **kw):
"""
Content - User - Filename - Input
"""
expected = dict(reference='CONT', language='us', version='003', short_title='from_content', contributor='person_module/jack')
expected_metadata = dict(reference='CONT', language='us', version='003', short_title='from_content', contributor='person_module/jack')
self.setDiscoveryOrder(['content', 'user_login', 'file_name', 'input'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
self.checkMetadataOrder(expected_metadata)
def stepCheckMetadataSettingOrderUIFC(self, sequence=None, sequence_list=None, **kw):
"""
User - Input - Filename - Content
"""
expected = dict(reference='USER', language='us', version='004', short_title='from_input', contributor='person_module/john')
expected_metadata = dict(reference='USER', language='us', version='004', short_title='from_input', contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'input', 'file_name', 'content'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
self.checkMetadataOrder(expected_metadata)
def stepCheckMetadataSettingOrderICUF(self, sequence=None, sequence_list=None, **kw):
"""
Input - Content - User - Filename
"""
expected = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james')
expected_metadata = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james')
self.setDiscoveryOrder(['input', 'content', 'user_login', 'file_name'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
self.checkMetadataOrder(expected_metadata)
def stepCheckMetadataSettingOrderUFCI(self, sequence=None, sequence_list=None, **kw):
"""
User - Filename - Content - Input
"""
expected = dict(reference='USER', language='us', version='002', short_title='from_content', contributor='person_module/john')
expected_metadata = dict(reference='USER', language='us', version='002', short_title='from_content', contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'file_name', 'content', 'input'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
self.checkMetadataOrder(expected_metadata)
def stepReceiveEmailFromUnknown(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -853,9 +877,9 @@ class TestIngestion(ERP5TypeTestCase):
"""
res = self.portal_catalog(reference='MAIL')
self.assertEquals(len(res), 1) # check if it is there
doc = res[0].getObject()
document = res[0].getObject()
john_is_owner = 0
for role in doc.get_local_roles():
for role in document.get_local_roles():
if role[0] == 'john_doe' and 'Owner' in role[1]:
john_is_owner = 1
break
......@@ -866,31 +890,61 @@ class TestIngestion(ERP5TypeTestCase):
## Tests
##################################
def test_01_checkBasics(self, quiet=QUIET, run=RUN_ALL_TEST):
if testrun and 1 not in testrun:return
def test_01_PreferenceSetup(self, quiet=QUIET, run=1): # RUN_ALL_TEST)
"""
Make sure that preferences are set up properly and accessible
"""
if not run: return
if not quiet: shout('test_01_checkBasics')
sequence_list = SequenceList()
step_list = [ 'stepCheckPreferences'
,'stepCheckContentTypeRegistry'
]
sequence_string = ' '.join(step_list)
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
if not quiet: printAndLog('test_01_PreferenceSetup')
preference_tool = self.portal.portal_preferences
self.assertEquals(preference_tool.getPreferredOoodocServerAddress(), conversion_server_host[0])
self.assertEquals(preference_tool.getPreferredOoodocServerPortNumber(), conversion_server_host[1])
self.assertEquals(preference_tool.getPreferredDocumentFileNameRegularExpression(),
"(?P<reference>[A-Z]{3,6})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})")
def test_02_TextDoc(self, quiet=QUIET, run=RUN_ALL_TEST):
def test_02_FileExtensionRegistry(self, quiet=QUIET, run=1): # RUN_ALL_TEST
"""
check if we successfully imported registry
and that it has all the entries we need
"""
if not run: return
if not quiet: printAndLog('test_02_FileExtensionRegistry')
reg = self.portal.content_type_registry
correct_type_mapping = {
'doc' : 'Text',
'txt' : 'Text',
'odt' : 'Text',
'sxw' : 'Text',
'rtf' : 'Text',
'gif' : 'Image',
'jpg' : 'Image',
'png' : 'Image',
'bmp' : 'Image',
'pdf' : 'PDF',
'xls' : 'Spreadsheet',
'ods' : 'Spreadsheet',
'sdc' : 'Spreadsheet',
'ppt' : 'Presentation',
'odp' : 'Presentation',
'sxi' : 'Presentation',
'xxx' : 'File',
}
for type, portal_type in correct_type_mapping.items():
file_name = 'aaa.' + type
self.assertEquals(reg.findTypeName(file_name, None, None), portal_type)
def test_03_TextDoc(self, quiet=QUIET, run=1):
"""h
Test basic behaviour of a document:
- create empty doc
- create empty document
- upload a file directly
- upload a file using upload dialog
- make sure revision was increased
- check that it was properly converted
- check if coordinates were extracted from file name
"""
if testrun and 2 not in testrun:return
if not run: return
if not quiet: shout('test_02_TextDoc')
if not quiet: printAndLog('test_03_TextDoc')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepCheckEmptyState'
......@@ -905,29 +959,24 @@ class TestIngestion(ERP5TypeTestCase):
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_03_MetadataExtraction(self, quiet=QUIET, run=RUN_ALL_TEST):
def test_04_MetadataExtraction(self, quiet=QUIET, run=1):
"""
Test metadata extraction from various sources:
- from filename (doublecheck)
- from user (by overwriting type-based method)
- from content (same way)
- from file name (doublecheck)
- from user (by overwriting type-based method
and simulating the result)
- from content (by overwriting type-based method
and simulating the result)
- from file metadata
We try to verity that all this works
(order will be tested later)
XXX Metadata of document (title, subject, description)
are retrieved and set upon conversion, and they
are not taken into account in the procedure
so they overwrite what was set before content and
are overwritten by what was after
XXX why do we use keywords to set subject_list???
"""
if testrun and 3 not in testrun:return
NOTE: metadata of document (title, subject, description)
are no longer retrieved and set upon conversion
"""
if not run: return
if not quiet: shout('test_03_MetadataExtraction')
if not quiet: printAndLog('test_04_MetadataExtraction')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepSetDiscoveryScripts'
,'stepSetSimulatedDiscoveryScript'
,'stepTestMetadataSetting'
]
sequence_string = ' '.join(step_list)
......@@ -940,9 +989,8 @@ class TestIngestion(ERP5TypeTestCase):
Edit metadata on the object
Download ODF, make sure it is changed
"""
if testrun and 4 not in testrun:return
if not run: return
if not quiet: shout('test_04_MetadataEditing')
if not quiet: printAndLog('test_04_MetadataEditing')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepDialogUpload'
......@@ -964,9 +1012,8 @@ class TestIngestion(ERP5TypeTestCase):
implement _convertToBase (e.g. Image)
Verify that you can not upload file of the wrong format.
"""
if testrun and 5 not in testrun:return
if not run: return
if not quiet: shout('test_05_FormatIngestion')
if not quiet: printAndLog('test_05_FormatIngestion')
sequence_list = SequenceList()
step_list = ['stepCreateTextDocument'
,'stepIngestTextFormats'
......@@ -992,9 +1039,8 @@ class TestIngestion(ERP5TypeTestCase):
actual generation is tested in oood tests
PDF and Image should be tested here
"""
if testrun and 6 not in testrun:return
if not run: return
if not quiet: shout('test_06_FormatGeneration')
if not quiet: printAndLog('test_06_FormatGeneration')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepCheckTextDocumentExportList'
......@@ -1019,9 +1065,8 @@ class TestIngestion(ERP5TypeTestCase):
try to generate it again, remove and
generate once more
"""
if testrun and 7 not in testrun:return
if not run: return
if not quiet: shout('test_07_SnapshotGeneration')
if not quiet: printAndLog('test_07_SnapshotGeneration')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepDialogUpload'
......@@ -1052,9 +1097,8 @@ class TestIngestion(ERP5TypeTestCase):
- the files were converted
- metadata was read
"""
if testrun and 9 not in testrun:return
if not run: return
if not quiet: shout('test_09_Contribute')
if not quiet: printAndLog('test_09_Contribute')
sequence_list = SequenceList()
step_list = ['stepContributeFileListWithNoType'
,'stepContributeFileListWithType'
......@@ -1066,35 +1110,34 @@ class TestIngestion(ERP5TypeTestCase):
def test_10_MetadataSettingPreferenceOrder(self, quiet=QUIET, run=RUN_ALL_TEST):
"""
Set some metadata discovery scripts
Contribute a doc, let it get metadata using default setup
Contribute a document, let it get metadata using default setup
(default is FUC)
check that the right ones are there
change preference order, check again
"""
if testrun and 10 not in testrun:return
if not run: return
if not quiet: shout('test_10_MetadataSettingPreferenceOrder')
if not quiet: printAndLog('test_10_MetadataSettingPreferenceOrder')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepSetSimulatedDiscoveryScriptForOrdering'
,'stepCheckMetadataSettingOrderFICU'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepSetSimulatedDiscoveryScriptForOrdering'
,'stepCheckMetadataSettingOrderCUFI'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepSetSimulatedDiscoveryScriptForOrdering'
,'stepCheckMetadataSettingOrderUIFC'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepSetSimulatedDiscoveryScriptForOrdering'
,'stepCheckMetadataSettingOrderICUF'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepSetSimulatedDiscoveryScriptForOrdering'
,'stepCheckMetadataSettingOrderUFCI'
]
sequence_string = ' '.join(step_list)
......@@ -1107,9 +1150,8 @@ class TestIngestion(ERP5TypeTestCase):
Check that document objects are created and appropriate data are set
(owner, and anything discovered from user and mail body)
"""
if testrun and 11 not in testrun:return
if not run: return
if not quiet: shout('test_09_Contribute')
if not quiet: printAndLog('test_09_Contribute')
sequence_list = SequenceList()
step_list = [ 'stepReceiveEmailFromUnknown'
,'stepCreatePerson'
......@@ -1130,5 +1172,8 @@ else:
suite.addTest(unittest.makeSuite(TestIngestion))
return suite
# vim: filetype=python syntax=python shiftwidth=2
# Missing tests
"""
property_dict = context.getPropertyDictFromUserLogin()
property_dict = context.getPropertyDictFromInput()
"""
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment