Commit 28914ad2 authored by Jim Fulton's avatar Jim Fulton

Use uvloop in the single-threaded server

To accomplish this, it was necessary to rearrange the tests so that
tests that ran servers in threads rather than using multiprocessing
into their own layer.  This is due to a bug currently in uvloop that
prevents using running uvloop servers in a process and in subprocesses
created with multiprocessing:

https://github.com/MagicStack/uvloop/issues/39

To run the tests with uvloop installed, it's necessary to use the
``-j`` option to run layers in separate processes.
parent 6e13bbe8
...@@ -2,8 +2,13 @@ from .._compat import PY3 ...@@ -2,8 +2,13 @@ from .._compat import PY3
if PY3: if PY3:
import asyncio import asyncio
try:
from uvloop import new_event_loop
except ImportError:
from asyncio import new_event_loop
else: else:
import trollius as asyncio import trollius as asyncio
from trollius import new_event_loop
import json import json
import logging import logging
...@@ -223,7 +228,7 @@ class Acceptor(object): ...@@ -223,7 +228,7 @@ class Acceptor(object):
self.storage_server = storage_server self.storage_server = storage_server
self.addr = addr self.addr = addr
self.ssl_context = ssl self.ssl_context = ssl
self.event_loop = loop = asyncio.new_event_loop() self.event_loop = loop = new_event_loop()
if isinstance(addr, tuple): if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1], cr = loop.create_server(self.factory, addr[0], addr[1],
......
...@@ -10,7 +10,7 @@ ZEO includes a script that provides a nagios monitor plugin: ...@@ -10,7 +10,7 @@ ZEO includes a script that provides a nagios monitor plugin:
In it's simplest form, the script just checks if it can get status: In it's simplest form, the script just checks if it can get status:
>>> import ZEO >>> import ZEO
>>> addr, stop = ZEO.server('test.fs') >>> addr, stop = ZEO.server('test.fs', threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port >>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr]) >>> nagios([saddr])
...@@ -39,7 +39,7 @@ The monitor will optionally output server metric data. There are 2 ...@@ -39,7 +39,7 @@ The monitor will optionally output server metric data. There are 2
kinds of metrics it can output, level and rate metric. If we use the kinds of metrics it can output, level and rate metric. If we use the
-m/--output-metrics option, we'll just get rate metrics: -m/--output-metrics option, we'll just get rate metrics:
>>> addr, stop = ZEO.server('test.fs') >>> addr, stop = ZEO.server('test.fs', threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port >>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m']) >>> nagios([saddr, '-m'])
OK|active_txns=0 OK|active_txns=0
...@@ -115,7 +115,7 @@ profixes metrics with a storage id. ...@@ -115,7 +115,7 @@ profixes metrics with a storage id.
... </mappingstorage> ... </mappingstorage>
... <mappingstorage second> ... <mappingstorage second>
... </mappingstorage> ... </mappingstorage>
... """) ... """, threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port >>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m', '-sstatus']) >>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'first'|first:active_txns=0 Empty storage u'first'|first:active_txns=0
......
...@@ -71,7 +71,7 @@ is generated before the cache is dropped or the message is logged. ...@@ -71,7 +71,7 @@ is generated before the cache is dropped or the message is logged.
Now, we'll restart the server on the original address: Now, we'll restart the server on the original address:
>>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1), >>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1),
... addr=addr, keep=1, threaded=True) ... addr=addr, keep=1)
>>> wait_connected(db.storage) >>> wait_connected(db.storage)
......
...@@ -4,7 +4,7 @@ We'll start by setting up a server and connecting to it: ...@@ -4,7 +4,7 @@ We'll start by setting up a server and connecting to it:
>>> import ZEO, transaction >>> import ZEO, transaction
>>> addr, stop = ZEO.server(path='test.fs') >>> addr, stop = ZEO.server(path='test.fs', threaded=False)
>>> conn = ZEO.connection(addr) >>> conn = ZEO.connection(addr)
>>> client = conn.db().storage >>> client = conn.db().storage
>>> client.is_connected() >>> client.is_connected()
...@@ -24,7 +24,7 @@ And wait for the connectin to notice it's disconnected: ...@@ -24,7 +24,7 @@ And wait for the connectin to notice it's disconnected:
Now, we'll restart the server: Now, we'll restart the server:
>>> addr, stop = ZEO.server(path='test.fs') >>> addr, stop = ZEO.server(path='test.fs', threaded=False)
Update with another client: Update with another client:
......
...@@ -19,6 +19,7 @@ from zope.testing import setupstack ...@@ -19,6 +19,7 @@ from zope.testing import setupstack
from ZODB.config import storageFromString from ZODB.config import storageFromString
from .forker import start_zeo_server from .forker import start_zeo_server
from .threaded import threaded_server_tests
class ZEOConfigTestBase(setupstack.TestCase): class ZEOConfigTestBase(setupstack.TestCase):
...@@ -120,4 +121,6 @@ class ZEOConfigTest(ZEOConfigTestBase): ...@@ -120,4 +121,6 @@ class ZEOConfigTest(ZEOConfigTestBase):
blob_cache_size_check=50) blob_cache_size_check=50)
def test_suite(): def test_suite():
return unittest.makeSuite(ZEOConfigTest) suite = unittest.makeSuite(ZEOConfigTest)
suite.layer = threaded_server_tests
return suite
...@@ -1609,15 +1609,6 @@ def test_suite(): ...@@ -1609,15 +1609,6 @@ def test_suite():
globs={'print_function': print_function}, globs={'print_function': print_function},
), ),
) )
if not forker.ZEO4_SERVER:
zeo.addTest(
doctest.DocFileSuite(
'dynamic_server_ports.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
),
)
zeo.addTest(PackableStorage.IExternalGC_suite( zeo.addTest(PackableStorage.IExternalGC_suite(
lambda : lambda :
ServerManagingClientStorageForIExternalGCTest( ServerManagingClientStorageForIExternalGCTest(
...@@ -1655,6 +1646,17 @@ def test_suite(): ...@@ -1655,6 +1646,17 @@ def test_suite():
suite.addTest(ZODB.tests.testblob.storage_reusable_suite( suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'ClientStorageSharedBlobs', create_storage_shared)) 'ClientStorageSharedBlobs', create_storage_shared))
if not forker.ZEO4_SERVER:
from .threaded import threaded_server_tests
dynamic_server_ports_suite = doctest.DocFileSuite(
'dynamic_server_ports.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
)
dynamic_server_ports_suite.layer = threaded_server_tests
suite.addTest(dynamic_server_ports_suite)
return suite return suite
......
...@@ -497,7 +497,7 @@ def test_prefetch(self): ...@@ -497,7 +497,7 @@ def test_prefetch(self):
>>> count = 999 >>> count = 999
>>> import ZEO >>> import ZEO
>>> addr, stop = ZEO.server() >>> addr, stop = ZEO.server(threaded=False)
>>> conn = ZEO.connection(addr) >>> conn = ZEO.connection(addr)
>>> root = conn.root() >>> root = conn.root()
>>> cls = root.__class__ >>> cls = root.__class__
......
...@@ -9,6 +9,7 @@ import unittest ...@@ -9,6 +9,7 @@ import unittest
import ZEO.StorageServer import ZEO.StorageServer
from . import forker from . import forker
from .threaded import threaded_server_tests
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL") @unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientAuthTests(setupstack.TestCase): class ClientAuthTests(setupstack.TestCase):
...@@ -54,5 +55,7 @@ class ClientAuthTests(setupstack.TestCase): ...@@ -54,5 +55,7 @@ class ClientAuthTests(setupstack.TestCase):
stop() stop()
def test_suite(): def test_suite():
return unittest.makeSuite(ClientAuthTests) suite = unittest.makeSuite(ClientAuthTests)
suite.layer = threaded_server_tests
return suite
...@@ -111,7 +111,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase): ...@@ -111,7 +111,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
def test_client_side(self): def test_client_side(self):
# First, traditional: # First, traditional:
addr, stop = ZEO.server('data.fs') addr, stop = ZEO.server('data.fs', threaded=False)
db = ZEO.DB(addr) db = ZEO.DB(addr)
with db.transaction() as conn: with db.transaction() as conn:
conn.root.l = Length(0) conn.root.l = Length(0)
...@@ -130,6 +130,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase): ...@@ -130,6 +130,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
addr2, stop = ZEO.server( addr2, stop = ZEO.server(
storage_conf='<mappingstorage>\n</mappingstorage>\n', storage_conf='<mappingstorage>\n</mappingstorage>\n',
zeo_conf=dict(client_conflict_resolution=True), zeo_conf=dict(client_conflict_resolution=True),
threaded=False,
) )
db = ZEO.DB(addr2) db = ZEO.DB(addr2)
......
...@@ -11,6 +11,7 @@ from .. import runzeo ...@@ -11,6 +11,7 @@ from .. import runzeo
from .testConfig import ZEOConfigTestBase from .testConfig import ZEOConfigTestBase
from . import forker from . import forker
from .threaded import threaded_server_tests
here = os.path.dirname(__file__) here = os.path.dirname(__file__)
server_cert = os.path.join(here, 'server.pem') server_cert = os.path.join(here, 'server.pem')
...@@ -121,6 +122,7 @@ class SSLConfigTest(ZEOConfigTestBase): ...@@ -121,6 +122,7 @@ class SSLConfigTest(ZEOConfigTestBase):
@mock.patch(('asyncio' if PY3 else 'trollius') + '.set_event_loop') @mock.patch(('asyncio' if PY3 else 'trollius') + '.set_event_loop')
@mock.patch(('asyncio' if PY3 else 'trollius') + '.new_event_loop') @mock.patch(('asyncio' if PY3 else 'trollius') + '.new_event_loop')
@mock.patch('ZEO.asyncio.client.new_event_loop') @mock.patch('ZEO.asyncio.client.new_event_loop')
@mock.patch('ZEO.asyncio.server.new_event_loop')
class SSLConfigTestMockiavellian(ZEOConfigTestBase): class SSLConfigTestMockiavellian(ZEOConfigTestBase):
@mock.patch('ssl.create_default_context') @mock.patch('ssl.create_default_context')
...@@ -335,10 +337,12 @@ pwfunc = lambda : '1234' ...@@ -335,10 +337,12 @@ pwfunc = lambda : '1234'
def test_suite(): def test_suite():
return unittest.TestSuite(( suite = unittest.TestSuite((
unittest.makeSuite(SSLConfigTest), unittest.makeSuite(SSLConfigTest),
unittest.makeSuite(SSLConfigTestMockiavellian), unittest.makeSuite(SSLConfigTestMockiavellian),
)) ))
suite.layer = threaded_server_tests
return suite
# Helpers for other tests: # Helpers for other tests:
......
"""Test layer for threaded-server tests
uvloop currently has a bug,
https://github.com/MagicStack/uvloop/issues/39, that causes failure if
multiprocessing and threaded servers are mixed in the same
application, so we isolate the few threaded tests in their own layer.
"""
import ZODB.tests.util
threaded_server_tests = ZODB.tests.util.MininalTestLayer(
'threaded_server_tests')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment