##############################################################################
#
# Copyright (c) 2020 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import codecs
import csv
import multiprocessing
import os
import json
import xmlrpc.client as xmlrpclib
import urllib.parse as urllib_parse
import ssl
import base64
import io

import requests
import PyPDF2

from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass

setUpModule, _CloudOooTestCase = makeModuleSetUpAndTestCaseClass(
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))


class CloudOooTestCase(_CloudOooTestCase):
  """Base test case giving each test an XML-RPC proxy to cloudooo.

  `setUp` stores the published URL in `self.url` and the proxy in
  `self.server`.
  """
  # Cloudooo needs a lot of time before being available.
  instance_max_retry = 30

  def setUp(self):
    """Read the `cloudooo` URL from the connection parameters and connect.
    """
    # The connection parameters are published as a JSON document under "_".
    self.url = json.loads(
        self.computer_partition.getConnectionParameterDict()["_"])['cloudooo']
    # XXX ignore certificate errors
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    self.server = xmlrpclib.ServerProxy(
        self.url,
        context=ssl_context,
        allow_none=True,
    )
    # Calling the proxy with 'close' does not issue a remote call: it returns
    # the proxy's own close method, which is what we register as cleanup.
    self.addCleanup(self.server('close'))


def normalizeFontName(font_name):
  """Return `font_name` stripped from its PDF specific decorations.

  * When the name contains a `+`, everything up to and including the first
    `+` is dropped (this is how subset fonts are tagged, e.g.
    `/ABCDEF+LiberationSans`).
  * Otherwise, a leading `/` is dropped.
  * Otherwise, the name is returned unchanged.
  """
  if '+' in font_name:
    # Only split on the first `+`, so that a font name which itself contains
    # a `+` is not truncated.
    return font_name.split('+', 1)[1]
  if font_name.startswith('/'):
    return font_name[1:]
  # Already a bare name. Return it as is rather than an implicit None, which
  # would make distinct fonts indistinguishable once collected in a set.
  return font_name


def getReferencedFonts(pdf_file_reader):
  """Return the set of normalized names of fonts referenced in this pdf.

  Only the `/Resources` of each page are visited.
  """
  fonts = set()

  def collectFonts(obj):
    """Recursively visit PDF objects and collect referenced fonts in `fonts`
    """
    # Only mapping-like objects are visited; anything else is a leaf.
    if hasattr(obj, 'keys'):
      if '/BaseFont' in obj:
        fonts.add(obj['/BaseFont'])
      for k in obj.keys():
        collectFonts(obj[k])

  for page in pdf_file_reader.pages:
    collectFonts(page.getObject()['/Resources'])

  return {normalizeFontName(font) for font in fonts}


class HTMLtoPDFConversionFontTestMixin:
  """Mix-In class to test how fonts are selected during
  HTML to PDF conversions.

  This needs to be mixed with a test case defining:

  * pdf_producer : the name of /Producer in PDF metadata
  * expected_font_mapping : a mapping of resulting font name in pdf,
    keyed by font-family in the input html
  * _convert_html_to_pdf: a method to convert html to pdf
  """
  def _convert_html_to_pdf(self, src_html):
    # type: (str) -> bytes
    """Convert the HTML source to pdf bytes.

    Must be overridden by the test case this class is mixed with.
    """
    # Fail loudly instead of returning None, which would only show up later
    # as an obscure PDF parsing error.
    raise NotImplementedError

  def test(self):
    """Convert one HTML document per font family and compare the fonts
    referenced by each resulting PDF with `expected_font_mapping`.
    """
    actual_font_mapping = {}
    for font in self.expected_font_mapping:
      src_html = f'''
      <style>
          p {{ font-family: "{font}"; font-size: 20pt; }}
      </style>
      <p>the quick brown fox jumps over the lazy dog.</p>
      <p>THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.</p>
      '''

      pdf_data = self._convert_html_to_pdf(src_html)
      pdf_reader = PyPDF2.PdfFileReader(io.BytesIO(pdf_data))
      self.assertEqual(
          self.pdf_producer,
          pdf_reader.getDocumentInfo()['/Producer'])
      fonts_in_pdf = getReferencedFonts(pdf_reader)

      # A single font is recorded as a plain string, so that expectations
      # can be written as `'family': 'FontName'`; otherwise the whole set
      # is recorded.
      if len(fonts_in_pdf) == 1:
        actual_font_mapping[font] = fonts_in_pdf.pop()
      else:
        actual_font_mapping[font] = fonts_in_pdf

    # Compare everything at once, with an untruncated diff, to show all
    # mismatching fonts in a single failure.
    self.maxDiff = None
    self.assertEqual(self.expected_font_mapping, actual_font_mapping)


class TestWkhtmlToPDF(HTMLtoPDFConversionFontTestMixin, CloudOooTestCase):
  """Font selection test for HTML to PDF conversions whose PDF producer
  is `Qt 4.8.7`.
  """
  __partition_reference__ = 'wk'
  pdf_producer = 'Qt 4.8.7'
  expected_font_mapping = {
      'Arial': 'LiberationSans',
      'Arial Black': 'LiberationSans',
      'Avant Garde': 'LiberationSans',
      'Bookman': 'LiberationSans',
      'Carlito': 'Carlito',
      'Comic Sans MS': 'LiberationSans',
      'Courier New': 'LiberationSans',
      'DejaVu Sans': 'DejaVuSans',
      'DejaVu Sans Condensed': 'LiberationSans',
      'DejaVu Sans ExtraLight': 'LiberationSans',
      'DejaVu Sans Mono': 'DejaVuSansMono',
      'DejaVu Serif': 'DejaVuSerif',
      'DejaVu Serif Condensed': 'LiberationSans',
      'Garamond': 'LiberationSans',
      'Gentium Basic': 'GentiumBasic',
      'Gentium Book Basic': 'GentiumBookBasic',
      'Georgia': 'LiberationSans',
      'Helvetica': 'LiberationSans',
      'IPAex Gothic': 'LiberationSans',
      'IPAex Mincho': 'LiberationSans',
      'Impact': 'LiberationSans',
      'Liberation Mono': 'LiberationMono',
      'Liberation Sans': 'LiberationSans',
      'Liberation Sans Narrow': 'LiberationSansNarrow',
      'Liberation Serif': 'LiberationSerif',
      'Linux LibertineG': 'LiberationSans',
      'OpenSymbol': {'DejaVuSans', 'OpenSymbol'},
      'Palatino': 'LiberationSans',
      'Roboto Black': 'LiberationSans',
      'Roboto Condensed Light': 'LiberationSans',
      'Roboto Condensed Regular': 'LiberationSans',
      'Roboto Light': 'LiberationSans',
      'Roboto Medium': 'LiberationSans',
      'Roboto Thin': 'LiberationSans',
      'Times New Roman': 'LiberationSans',
      'Trebuchet MS': 'LiberationSans',
      'Verdana': 'LiberationSans',
      'ZZZdefault fonts when no match': 'LiberationSans'
  }

  def _convert_html_to_pdf(self, src_html):
    """Convert the HTML source to pdf bytes, passing an explicit
    `encoding` conversion option.
    """
    # The payload travels as base64 text in both directions.
    return base64.decodebytes(
        self.server.convertFile(
            base64.encodebytes(src_html.encode()).decode(),
            'html',
            'pdf',
            False,
            False,
            {
                'encoding': 'utf-8'
            },
        ).encode())


class TestLibreoffice(HTMLtoPDFConversionFontTestMixin, CloudOooTestCase):
  """Font selection test for HTML to PDF conversions whose PDF producer
  is `LibreOffice 5.2`.
  """
  __partition_reference__ = 'lo'
  pdf_producer = 'LibreOffice 5.2'
  expected_font_mapping = {
      'Arial': 'LiberationSans',
      'Arial Black': 'DejaVuSans',
      'Avant Garde': 'DejaVuSans',
      'Bookman': 'DejaVuSans',
      'Carlito': 'Carlito',
      'Comic Sans MS': 'DejaVuSans',
      'Courier New': 'LiberationMono',
      'DejaVu Sans': 'DejaVuSans',
      'DejaVu Sans Condensed': 'DejaVuSansCondensed',
      'DejaVu Sans ExtraLight': 'DejaVuSans',
      'DejaVu Sans Mono': 'DejaVuSansMono',
      'DejaVu Serif': 'DejaVuSerif',
      'DejaVu Serif Condensed': 'DejaVuSerifCondensed',
      'Garamond': 'DejaVuSerif',
      'Gentium Basic': 'GentiumBasic',
      'Gentium Book Basic': 'GentiumBookBasic',
      'Georgia': 'DejaVuSerif',
      'Helvetica': 'LiberationSans',
      'IPAex Gothic': 'IPAexGothic',
      'IPAex Mincho': 'IPAexMincho',
      'Impact': 'DejaVuSans',
      'Liberation Mono': 'LiberationMono',
      'Liberation Sans': 'LiberationSans',
      'Liberation Sans Narrow': 'LiberationSansNarrow',
      'Liberation Serif': 'LiberationSerif',
      'Linux LibertineG': 'LinuxLibertineG',
      'OpenSymbol': 'OpenSymbol',
      'Palatino': 'DejaVuSerif',
      'Roboto Black': 'Roboto-Black',
      'Roboto Condensed Light': 'RobotoCondensed-Light',
      'Roboto Condensed Regular': 'DejaVuSans',
      'Roboto Light': 'Roboto-Light',
      'Roboto Medium': 'Roboto-Medium',
      'Roboto Thin': 'Roboto-Thin',
      'Times New Roman': 'LiberationSerif',
      'Trebuchet MS': 'DejaVuSans',
      'Verdana': 'DejaVuSans',
      'ZZZdefault fonts when no match': 'DejaVuSans'
  }

  def _convert_html_to_pdf(self, src_html):
    """Convert the HTML source to pdf bytes, without conversion options.
    """
    # The payload travels as base64 text in both directions.
    return base64.decodebytes(
        self.server.convertFile(
            base64.encodebytes(src_html.encode()).decode(),
            'html',
            'pdf',
        ).encode())


class TestLibreOfficeTextConversion(CloudOooTestCase):
  """Test conversions to plain text.
  """
  __partition_reference__ = 'txt'

  def test_html_to_text(self):
    """Non-ASCII HTML is converted to UTF-8 text starting with a BOM.
    """
    self.assertEqual(
        base64.decodebytes(
            self.server.convertFile(
                base64.encodebytes(
                    '<html>héhé</html>'.encode()).decode(),
                'html',
                'txt',
            ).encode()),
        codecs.BOM_UTF8 + b'h\xc3\xa9h\xc3\xa9\n',
    )


class TestLibreOfficeCluster(CloudOooTestCase):
  """Test an instance requested with `backend-count` set to 4.
  """
  __partition_reference__ = 'lc'

  @classmethod
  def getInstanceParameterDict(cls):
    """Return the instance parameters, requesting 4 backends.
    """
    return {'backend-count': 4}

  def test_multiple_conversions(self):
    """Run 100 conversions from 5 worker processes, then check in the
    haproxy statistics that every backend served requests without errors.
    """
    # make this function global so that it can be pickled and used by
    # multiprocessing
    global _convert_html_to_text

    def _convert_html_to_text(src_html):
      return base64.decodebytes(
          self.server.convertFile(
              base64.encodebytes(src_html.encode()).decode(),
              'html',
              'txt',
          ).encode())

    with multiprocessing.Pool(5) as pool:
      converted = pool.map(
          _convert_html_to_text,
          ['<html><body>hello</body></html>'] * 100)

    self.assertEqual(converted, [codecs.BOM_UTF8 + b'hello\n'] * 100)

    # haproxy stats are exposed
    res = requests.get(
        urllib_parse.urljoin(self.url, '/haproxy;csv'),
        verify=False,
    )
    reader = csv.DictReader(io.StringIO(res.text))
    line_list = list(reader)

    # requests have been balanced
    total_hrsp_2xx = {
        line['svname']: int(line['hrsp_2xx'])
        for line in line_list
    }
    self.assertEqual(total_hrsp_2xx['FRONTEND'], 100)
    self.assertEqual(total_hrsp_2xx['BACKEND'], 100)
    for backend in 'cloudooo_1', 'cloudooo_2', 'cloudooo_3', 'cloudooo_4':
      # ideally there should be 25% of requests on each backend, because we use
      # round robin scheduling, but it can happen that some backend take longer
      # to start, so we are tolerant here and just check that each backend
      # process at least one request.
      self.assertGreater(total_hrsp_2xx[backend], 0)

    # no errors
    total_eresp = {
        # an empty `eresp` column is counted as 0
        line['svname']: int(line['eresp'] or 0)
        for line in line_list
    }
    self.assertEqual(
        total_eresp, {
            'FRONTEND': 0,
            'cloudooo_1': 0,
            'cloudooo_2': 0,
            'cloudooo_3': 0,
            'cloudooo_4': 0,
            'BACKEND': 0,
        })