############################################################################## # # Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved. # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsibility of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # guarantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## import cgi import json import multiprocessing import os import tempfile import unittest import urlparse from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from io import BytesIO import paramiko import requests from PIL import Image from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from utils import SlapOSInstanceTestCase, findFreeTCPPort debug_mode = os.environ.get('DEBUG') # for development: debugging logs and install Ctrl+C handler if debug_mode: import logging logging.basicConfig(level=logging.DEBUG) unittest.installHandler() class WebServerMixin(object): """Mixin class which provides a simple web server reachable at self.server_url """ def setUp(self): """Start a minimal web server. """ class TestHandler(BaseHTTPRequestHandler): """Request handler for our test server. The implemented server is: - submit q and you'll get a page with q as title - upload a file and the file content will be displayed in div.uploadedfile """ def log_message(self, *args, **kw): if debug_mode: BaseHTTPRequestHandler.log_message(self, *args, **kw) def do_GET(self): self.send_response(200) self.end_headers() self.wfile.write(''' <html> <title>Test page</title> <body> <form action="/" method="POST" enctype="multipart/form-data"> <input name="q" type="text"></input> <input name="f" type="file" ></input> <input type="submit" value="I'm feeling lucky"></input> </form> </body> </html>''') def do_POST(self): form = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD':'POST', 'CONTENT_TYPE':self.headers['Content-Type'],}) self.send_response(200) self.end_headers() file_data = 'no file' if form.has_key('f'): file_data = form['f'].file.read() self.wfile.write(''' <html> <title>%s</title> <div>%s</div> </html> ''' % (form['q'].value, file_data)) super(WebServerMixin, self).setUp() ip = os.environ.get('LOCAL_IPV4', '127.0.1.1') port = findFreeTCPPort(ip) server = HTTPServer((ip, port), TestHandler) self.server_process = multiprocessing.Process(target=server.serve_forever) self.server_process.start() self.server_url = 'http://%s:%s/' % (ip, port) def tearDown(self): self.server_process.terminate() self.server_process.join() super(WebServerMixin, self).tearDown() class BrowserCompatibilityMixin(WebServerMixin): """Mixin class to run validation tests on a specific browser """ desired_capabilities = NotImplemented user_agent = NotImplemented def setUp(self): super(BrowserCompatibilityMixin, self).setUp() self.driver = webdriver.Remote( command_executor=self.computer_partition.getConnectionParameterDict()['backend-url'], desired_capabilities=self.desired_capabilities) def tearDown(self): self.driver.quit() super(BrowserCompatibilityMixin, self).tearDown() def test_user_agent(self): self.assertIn( self.user_agent, self.driver.execute_script('return navigator.userAgent')) def test_simple_submit_scenario(self): self.driver.get(self.server_url) input_element = WebDriverWait(self.driver, 3).until( EC.visibility_of_element_located((By.NAME, 'q'))) input_element.send_keys(self.id()) input_element.submit() WebDriverWait(self.driver, 3).until(EC.title_is(self.id())) def test_upload_file(self): f = tempfile.NamedTemporaryFile(delete=False) f.write(self.id()) f.close() self.addCleanup(lambda: os.remove(f.name)) self.driver.get(self.server_url) self.driver.find_element_by_xpath('//input[@name="f"]').send_keys(f.name) self.driver.find_element_by_xpath('//input[@type="submit"]').click() self.assertEqual( self.id(), self.driver.find_element_by_xpath('//div').text) def test_screenshot(self): self.driver.get(self.server_url) screenshot = Image.open(BytesIO(self.driver.get_screenshot_as_png())) # just check it's not a white screen self.assertGreater(len(screenshot.getcolors(maxcolors=512)), 2) def test_window_and_screen_size(self): size = json.loads(self.driver.execute_script(''' return JSON.stringify({ 'screen.width': window.screen.width, 'screen.height': window.screen.height, 'screen.pixelDepth': window.screen.pixelDepth, 'innerWidth': window.innerWidth, 'innerHeight': window.innerHeight })''')) # Xvfb is configured like this self.assertEqual(1024, size['screen.width']) self.assertEqual(768, size['screen.height']) self.assertEqual(24, size['screen.pixelDepth']) # window size must not be 0 (wrong firefox integration report this) self.assertGreater(size['innerWidth'], 0) self.assertGreater(size['innerHeight'], 0) def test_resize_window(self): self.driver.set_window_size(800, 900) size = json.loads(self.driver.execute_script(''' return JSON.stringify({ 'outerWidth': window.outerWidth, 'outerHeight': window.outerHeight })''')) self.assertEqual(800, size['outerWidth']) self.assertEqual(900, size['outerHeight']) def test_multiple_clients(self): parameter_dict = self.computer_partition.getConnectionParameterDict() webdriver_url = parameter_dict['backend-url'] queue = multiprocessing.Queue() def _test(q, server_url): driver = webdriver.Remote( command_executor=webdriver_url, desired_capabilities=self.desired_capabilities) try: driver.get(server_url) q.put(driver.title == 'Test page') finally: driver.quit() nb_workers = 10 workers = [] for i in range(nb_workers): worker = multiprocessing.Process( target=_test, args=(queue, self.server_url)) worker.start() workers.append(worker) del worker # pylint _ = [worker.join(timeout=30) for worker in workers] # terminate workers if they are still alive after 30 seconds _ = [worker.terminate() for worker in workers if worker.is_alive()] _ = [worker.join() for worker in workers] del _ # pylint self.assertEqual( [True] * nb_workers, [queue.get() for _ in range(nb_workers)]) class SeleniumServerTestCase(SlapOSInstanceTestCase): """Test the remote driver on a minimal web server. """ @classmethod def getSoftwareURLList(cls): return (os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'software.cfg')), ) class TestBrowserSelection(WebServerMixin, SeleniumServerTestCase): """Test browser can be selected by `desiredCapabilities`` """ def test_chrome(self): parameter_dict = self.computer_partition.getConnectionParameterDict() webdriver_url = parameter_dict['backend-url'] driver = webdriver.Remote( command_executor=webdriver_url, desired_capabilities=DesiredCapabilities.CHROME) driver.get(self.server_url) self.assertEqual('Test page', driver.title) self.assertIn( 'Chrome', driver.execute_script('return navigator.userAgent')) self.assertNotIn( 'Firefox', driver.execute_script('return navigator.userAgent')) driver.quit() def test_firefox(self): parameter_dict = self.computer_partition.getConnectionParameterDict() webdriver_url = parameter_dict['backend-url'] driver = webdriver.Remote( command_executor=webdriver_url, desired_capabilities=DesiredCapabilities.FIREFOX) driver.get(self.server_url) self.assertEqual('Test page', driver.title) self.assertIn( 'Firefox', driver.execute_script('return navigator.userAgent')) driver.quit() def test_firefox_desired_version(self): parameter_dict = self.computer_partition.getConnectionParameterDict() webdriver_url = parameter_dict['backend-url'] desired_capabilities = DesiredCapabilities.FIREFOX.copy() desired_capabilities['version'] = '60.0.2esr' driver = webdriver.Remote( command_executor=webdriver_url, desired_capabilities=desired_capabilities) self.assertIn( 'Gecko/20100101 Firefox/60.0', driver.execute_script('return navigator.userAgent')) driver.quit() desired_capabilities['version'] = '52.9.0esr' driver = webdriver.Remote( command_executor=webdriver_url, desired_capabilities=desired_capabilities) self.assertIn( 'Gecko/20100101 Firefox/52.0', driver.execute_script('return navigator.userAgent')) driver.quit() class TestFrontend(WebServerMixin, SeleniumServerTestCase): """Test hub's https frontend. """ def test_admin(self): parameter_dict = self.computer_partition.getConnectionParameterDict() admin_url = parameter_dict['admin-url'] parsed = urlparse.urlparse(admin_url) self.assertEqual('admin', parsed.username) self.assertTrue(parsed.password) self.assertIn( 'Grid Console', requests.get(admin_url, verify=False).text) def test_browser_use_hub(self): parameter_dict = self.computer_partition.getConnectionParameterDict() webdriver_url = parameter_dict['url'] parsed = urlparse.urlparse(webdriver_url) self.assertEqual('selenium', parsed.username) self.assertTrue(parsed.password) driver = webdriver.Remote( command_executor=webdriver_url, desired_capabilities=DesiredCapabilities.CHROME) driver.get(self.server_url) self.assertEqual('Test page', driver.title) driver.quit() class TestSSHServer(SeleniumServerTestCase): @classmethod def getInstanceParameterDict(cls): cls.ssh_key = paramiko.RSAKey.generate(1024) return {'ssh-authorized-key': 'ssh-rsa {}'.format(cls.ssh_key.get_base64())} def test_connect(self): parameter_dict = self.computer_partition.getConnectionParameterDict() ssh_url = parameter_dict['ssh-url'] parsed = urlparse.urlparse(ssh_url) self.assertEqual('ssh', parsed.scheme) client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.client.WarningPolicy) client.connect( username=urlparse.urlparse(ssh_url).username, hostname=urlparse.urlparse(ssh_url).hostname, port=urlparse.urlparse(ssh_url).port, pkey=self.ssh_key, ) channel = client.invoke_shell() channel.settimeout(30) # openssh prints a warning 'Attempt to write login records by non-root user (aborting)' # so we received more than the lenght of the asserted message. self.assertIn("Welcome to SlapOS Selenium Server.", channel.recv(100)) class TestFirefox52(BrowserCompatibilityMixin, SeleniumServerTestCase): desired_capabilities = dict(DesiredCapabilities.FIREFOX, version='52.9.0esr') user_agent = 'Gecko/20100101 Firefox/52.0' # resizing window is not supported on firefox 52 geckodriver test_resize_window = unittest.expectedFailure( BrowserCompatibilityMixin.test_resize_window) class TestFirefox60(BrowserCompatibilityMixin, SeleniumServerTestCase): desired_capabilities = dict(DesiredCapabilities.FIREFOX, version='60.0.2esr') user_agent = 'Gecko/20100101 Firefox/60.0' class TestChrome69(BrowserCompatibilityMixin, SeleniumServerTestCase): desired_capabilities = dict(DesiredCapabilities.CHROME, version='69.0.3497.0') user_agent = 'Chrome/69.0.3497.0'