Commit 904b2c40 authored by Gabriel Monnerat's avatar Gabriel Monnerat

continuation of r42064. Avoid usage of shell == True.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@42067 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 12bc0b89
...@@ -85,7 +85,7 @@ class OOHandler: ...@@ -85,7 +85,7 @@ class OOHandler:
for k, v in kw.iteritems(): for k, v in kw.iteritems():
command_list.append("--%s='%s'" % (k, v)) command_list.append("--%s='%s'" % (k, v))
return ' '.join(command_list) return command_list
def _startTimeout(self): def _startTimeout(self):
"""start the Monitor""" """start the Monitor"""
...@@ -98,14 +98,13 @@ class OOHandler: ...@@ -98,14 +98,13 @@ class OOHandler:
self.monitor.terminate() self.monitor.terminate()
return return
def _subprocess(self, command): def _subprocess(self, command_list):
"""Run one procedure""" """Run one procedure"""
if monitor_sleeping_time is not None: if monitor_sleeping_time is not None:
monitor_sleeping_time.touch() monitor_sleeping_time.touch()
try: try:
self._startTimeout() self._startTimeout()
process = Popen(command, process = Popen(command_list,
shell=True,
stdout=PIPE, stdout=PIPE,
stderr=PIPE, stderr=PIPE,
close_fds=True) close_fds=True)
...@@ -121,8 +120,8 @@ class OOHandler: ...@@ -121,8 +120,8 @@ class OOHandler:
if not openoffice.status(): if not openoffice.status():
xvfb.restart() xvfb.restart()
openoffice.start() openoffice.start()
command = self._getCommand(*feature_list, **kw) command_list = self._getCommand(*feature_list, **kw)
stdout, stderr = self._subprocess(command) stdout, stderr = self._subprocess(command_list)
if not stdout and len(re.findall("\w*Exception|\w*Error", stderr)) >= 1: if not stdout and len(re.findall("\w*Exception|\w*Error", stderr)) >= 1:
logger.debug(stderr) logger.debug(stderr)
self.document.restoreOriginal() self.document.restoreOriginal()
......
...@@ -83,8 +83,8 @@ class TestAllFormats(CloudoooTestCase): ...@@ -83,8 +83,8 @@ class TestAllFormats(CloudoooTestCase):
continue continue
output_file_url = '%s/test_%s.%s' % (self.tmp_url, document_type, extension[0]) output_file_url = '%s/test_%s.%s' % (self.tmp_url, document_type, extension[0])
open(output_file_url, 'w').write(decodestring(data_output)) open(output_file_url, 'w').write(decodestring(data_output))
stdout, stderr = Popen("file %s" % output_file_url, command = [file, output_file_url]
shell=True, stdout, stderr = Popen(command,
stdout=PIPE, stdout=PIPE,
stderr=PIPE).communicate() stderr=PIPE).communicate()
self.assertEquals(stdout.endswith(": empty"), False, stdout) self.assertEquals(stdout.endswith(": empty"), False, stdout)
......
...@@ -106,8 +106,9 @@ class TestFileSystemDocument(unittest.TestCase): ...@@ -106,8 +106,9 @@ class TestFileSystemDocument(unittest.TestCase):
open(path.join(self.fsdocument.directory_name, 'document2'), 'w').write('test') open(path.join(self.fsdocument.directory_name, 'document2'), 'w').write('test')
zip_file = self.fsdocument.getContent(True) zip_file = self.fsdocument.getContent(True)
open(zip_output_url, 'w').write(zip_file) open(zip_output_url, 'w').write(zip_file)
stdout, stderr = Popen("file %s" % zip_output_url, command = ["file", zip_output_url]
shell=True, stdout=PIPE).communicate() stdout, stderr = Popen(command,
stdout=PIPE).communicate()
self.assertEquals(stdout, '/tmp/ziptest.zip: Zip archive data, at least v2.0 to extract\n') self.assertEquals(stdout, '/tmp/ziptest.zip: Zip archive data, at least v2.0 to extract\n')
ziptest = ZipFile(zip_output_url, 'r') ziptest = ZipFile(zip_output_url, 'r')
self.assertEquals(len(ziptest.filelist), 2) self.assertEquals(len(ziptest.filelist), 2)
......
...@@ -47,8 +47,9 @@ class TestHighLoad(CloudoooTestCase): ...@@ -47,8 +47,9 @@ class TestHighLoad(CloudoooTestCase):
document = self.proxy.convertFile(data, source_format, destination_format) document = self.proxy.convertFile(data, source_format, destination_format)
document_output_url = os.path.join(self.tmp_url, "%s.%s" % (id, destination_format)) document_output_url = os.path.join(self.tmp_url, "%s.%s" % (id, destination_format))
open(document_output_url, 'wb').write(decodestring(document)) open(document_output_url, 'wb').write(decodestring(document))
stdout, stderr = subprocess.Popen("file -b %s" % document_output_url, command_list = ["file", "-b", document_output_url]
shell=True, stdout=subprocess.PIPE).communicate() stdout, stderr = subprocess.Popen(command_list,
stdout=subprocess.PIPE).communicate()
self.assertEquals(stdout, 'PDF document, version 1.4\n') self.assertEquals(stdout, 'PDF document, version 1.4\n')
self.assertEquals(stderr, None) self.assertEquals(stderr, None)
os.remove(document_output_url) os.remove(document_output_url)
......
...@@ -53,8 +53,8 @@ class TestOOHandler(CloudoooTestCase): ...@@ -53,8 +53,8 @@ class TestOOHandler(CloudoooTestCase):
def _assert_document_output(self, document_output_url, msg): def _assert_document_output(self, document_output_url, msg):
"""Check if the document was created correctly""" """Check if the document was created correctly"""
stdout, stderr = Popen("file -b %s" % document_output_url, command_list = ["file", "-b", document_output_url]
shell=True, stdout, stderr = Popen(command_list
stdout=PIPE).communicate() stdout=PIPE).communicate()
self.assertEquals(msg in stdout, self.assertEquals(msg in stdout,
True, True,
......
...@@ -108,11 +108,15 @@ class TestServer(CloudoooTestCase): ...@@ -108,11 +108,15 @@ class TestServer(CloudoooTestCase):
source_format, source_format,
destination_format, zip) destination_format, zip)
open(output_url, 'w').write(decodestring(output_data)) open(output_url, 'w').write(decodestring(output_data))
stdout, stderr = Popen("file -b %s" % output_url, shell=True, stdout, stderr = self._getFileType(output_url)
stdout=PIPE).communicate()
self.assertEquals(stdout, stdout_msg) self.assertEquals(stdout, stdout_msg)
self.assertEquals(stderr, None) self.assertEquals(stderr, None)
def _getFileType(self, output_url):
return Popen(["file", "-b", output_url],
stdout=PIPE,
stderr=PIPE).communicate()
def testGetAllowedExtensionListByType(self): def testGetAllowedExtensionListByType(self):
"""Call getAllowedExtensionList and verify if the returns is a list with """Call getAllowedExtensionList and verify if the returns is a list with
extension and ui_name. The request is by document type""" extension and ui_name. The request is by document type"""
...@@ -187,8 +191,7 @@ class TestServer(CloudoooTestCase): ...@@ -187,8 +191,7 @@ class TestServer(CloudoooTestCase):
True) True)
self.assertNotEquals(metadata_dict.get("Data"), None) self.assertNotEquals(metadata_dict.get("Data"), None)
open(document_output_url, 'w').write(decodestring(metadata_dict["Data"])) open(document_output_url, 'w').write(decodestring(metadata_dict["Data"]))
stdout, stderr = Popen("file -b %s" % document_output_url, shell=True, stdout, stderr = self._getFileType(document_output_url)
stdout=PIPE).communicate()
self.assertEquals(stdout, 'OpenDocument Text\n') self.assertEquals(stdout, 'OpenDocument Text\n')
self.assertEquals(metadata_dict.get("Title"), "cloudooo Test") self.assertEquals(metadata_dict.get("Title"), "cloudooo Test")
self.assertEquals(metadata_dict.get("Subject"), "Subject Test") self.assertEquals(metadata_dict.get("Subject"), "Subject Test")
...@@ -204,8 +207,7 @@ class TestServer(CloudoooTestCase): ...@@ -204,8 +207,7 @@ class TestServer(CloudoooTestCase):
odf_data = self.proxy.updateFileMetadata(encodestring(data), 'odt', odf_data = self.proxy.updateFileMetadata(encodestring(data), 'odt',
{"Title":"testSetMetadata"}) {"Title":"testSetMetadata"})
open(document_output_url, 'w').write(decodestring(odf_data)) open(document_output_url, 'w').write(decodestring(odf_data))
stdout, stderr = Popen("file -b %s" % document_output_url, shell=True, stdout, stderr = self._getFileType(document_output_url)
stdout=PIPE).communicate()
self.assertEquals(stdout, 'OpenDocument Text\n') self.assertEquals(stdout, 'OpenDocument Text\n')
metadata_dict = self.proxy.getFileMetadataItemList(odf_data, 'odt') metadata_dict = self.proxy.getFileMetadataItemList(odf_data, 'odt')
self.assertEquals(metadata_dict.get("Title"), "testSetMetadata") self.assertEquals(metadata_dict.get("Title"), "testSetMetadata")
...@@ -221,8 +223,7 @@ class TestServer(CloudoooTestCase): ...@@ -221,8 +223,7 @@ class TestServer(CloudoooTestCase):
'odt', 'odt',
{"Reference":"testSetMetadata"}) {"Reference":"testSetMetadata"})
open(document_output_url, 'w').write(decodestring(odf_data)) open(document_output_url, 'w').write(decodestring(odf_data))
stdout, stderr = Popen("file -b %s" % document_output_url, shell=True, stdout, stderr = self._getFileType(document_output_url)
stdout=PIPE).communicate()
self.assertEquals(stdout, 'OpenDocument Text\n') self.assertEquals(stdout, 'OpenDocument Text\n')
metadata_dict = self.proxy.getFileMetadataItemList(odf_data, 'odt') metadata_dict = self.proxy.getFileMetadataItemList(odf_data, 'odt')
self.assertEquals(metadata_dict.get("Reference"), "testSetMetadata") self.assertEquals(metadata_dict.get("Reference"), "testSetMetadata")
...@@ -237,8 +238,7 @@ class TestServer(CloudoooTestCase): ...@@ -237,8 +238,7 @@ class TestServer(CloudoooTestCase):
new_odf_data = self.proxy.updateFileMetadata(odf_data, 'odt', new_odf_data = self.proxy.updateFileMetadata(odf_data, 'odt',
{"Reference":"new value", "Something": "ABC"}) {"Reference":"new value", "Something": "ABC"})
open(document_output_url, 'w').write(decodestring(new_odf_data)) open(document_output_url, 'w').write(decodestring(new_odf_data))
stdout, stderr = Popen("file -b %s" % document_output_url, shell=True, stdout, stderr = self._getFileType(document_output_url)
stdout=PIPE).communicate()
self.assertEquals(stdout, 'OpenDocument Text\n') self.assertEquals(stdout, 'OpenDocument Text\n')
metadata_dict = self.proxy.getFileMetadataItemList(new_odf_data, 'odt') metadata_dict = self.proxy.getFileMetadataItemList(new_odf_data, 'odt')
self.assertEquals(metadata_dict.get("Reference"), "new value") self.assertEquals(metadata_dict.get("Reference"), "new value")
...@@ -437,8 +437,7 @@ class TestServer(CloudoooTestCase): ...@@ -437,8 +437,7 @@ class TestServer(CloudoooTestCase):
try: try:
png_path = join(self.tmp_url, "img0.png") png_path = join(self.tmp_url, "img0.png")
zipfile.extractall(self.tmp_url) zipfile.extractall(self.tmp_url)
stdout, stderr = Popen("file -b %s" % png_path, shell=True, stdout, stderr = self._getFileType(png_path)
stdout=PIPE).communicate()
self.assertTrue(stdout.startswith('PNG image data'), stdout) self.assertTrue(stdout.startswith('PNG image data'), stdout)
self.assertTrue("8-bit/color RGB" in stdout, stdout) self.assertTrue("8-bit/color RGB" in stdout, stdout)
finally: finally:
......
...@@ -73,13 +73,15 @@ class TestUnoConverter(CloudoooTestCase): ...@@ -73,13 +73,15 @@ class TestUnoConverter(CloudoooTestCase):
"--destination_format='%s'" % "doc", "--destination_format='%s'" % "doc",
"--source_format='%s'" % "odt", "--source_format='%s'" % "odt",
"--mimemapper='%s'" % mimemapper_pickled] "--mimemapper='%s'" % mimemapper_pickled]
stdout, stderr = Popen(' '.join(command), shell=True, stdout, stderr = Popen(command,
stdout=PIPE, stderr=PIPE).communicate() stdout=PIPE, stderr=PIPE).communicate()
self.assertEquals(stderr, '') self.assertEquals(stderr, '')
output_url = stdout.replace('\n', '') output_url = stdout.replace('\n', '')
self.assertTrue(exists(output_url), stdout) self.assertTrue(exists(output_url), stdout)
stdout, stderr = Popen("file %s" % output_url, shell=True, command = [file, output_url]
stdout=PIPE, stderr=PIPE).communicate() stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE).communicate()
self.assertEquals(self.file_msg_list[1] in stdout \ self.assertEquals(self.file_msg_list[1] in stdout \
or \ or \
self.file_msg_list[0] in stdout, self.file_msg_list[0] in stdout,
......
...@@ -61,8 +61,9 @@ class TestUnoMimeMapper(CloudoooTestCase): ...@@ -61,8 +61,9 @@ class TestUnoMimeMapper(CloudoooTestCase):
"'--office_binary_path=%s'" % self.office_binary_path, "'--office_binary_path=%s'" % self.office_binary_path,
"'--hostname=%s'" % self.hostname, "'--hostname=%s'" % self.hostname,
"'--port=%s'" % self.openoffice_port] "'--port=%s'" % self.openoffice_port]
stdout, stderr = Popen(' '.join(command), shell=True, stdout, stderr = Popen(command,
stdout=PIPE, stderr=PIPE).communicate() stdout=PIPE,
stderr=PIPE).communicate()
self.assertEquals(stderr, '') self.assertEquals(stderr, '')
filter_dict, type_dict = json.loads(stdout) filter_dict, type_dict = json.loads(stdout)
self.assertTrue('filter_dict' in locals()) self.assertTrue('filter_dict' in locals())
...@@ -81,8 +82,9 @@ class TestUnoMimeMapper(CloudoooTestCase): ...@@ -81,8 +82,9 @@ class TestUnoMimeMapper(CloudoooTestCase):
"handler/ooo/helper/unomimemapper.py"), "handler/ooo/helper/unomimemapper.py"),
"'--hostname=%s'" % self.hostname, "'--hostname=%s'" % self.hostname,
"'--port=%s'" % self.openoffice_port] "'--port=%s'" % self.openoffice_port]
stdout, stderr = Popen(' '.join(command), shell=True, stdout, stderr = Popen(command,
stdout=PIPE, stderr=PIPE).communicate() stdout=PIPE,
stderr=PIPE).communicate()
self.assertEquals(stderr, '') self.assertEquals(stderr, '')
filter_dict, type_dict = json.loads(stdout) filter_dict, type_dict = json.loads(stdout)
self.assertTrue('filter_dict' in locals()) self.assertTrue('filter_dict' in locals())
...@@ -106,8 +108,9 @@ class TestUnoMimeMapper(CloudoooTestCase): ...@@ -106,8 +108,9 @@ class TestUnoMimeMapper(CloudoooTestCase):
"'--office_binary_path=%s'" % self.office_binary_path, "'--office_binary_path=%s'" % self.office_binary_path,
"'--hostname=%s'" % self.hostname, "'--hostname=%s'" % self.hostname,
"'--port=%s'" % self.openoffice_port] "'--port=%s'" % self.openoffice_port]
stdout, stderr = Popen(' '.join(command), shell=True, stdout, stderr = Popen(command,
stdout=PIPE, stderr=PIPE).communicate() stdout=PIPE,
stderr=PIPE).communicate()
self.assertEquals(stdout, '') self.assertEquals(stdout, '')
self.assertTrue(stderr.endswith(error_msg), stderr) self.assertTrue(stderr.endswith(error_msg), stderr)
openoffice.start() openoffice.start()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment