Commit 7cb60051 authored by Jim Fulton's avatar Jim Fulton

use a "threadless" multi-processing queue

To avoid spew about threads being left behind by tests.  The normal
multi-processing.Queue uses a thread to send data and the machinery
for stopping this thread seems to be buggy (or inscrutable) on Linux,
causing test runner spew that made output hard to interpret.
parent 42516476
......@@ -19,11 +19,13 @@ import random
import sys
import time
import errno
import multiprocessing
import socket
import subprocess
import logging
import tempfile
import six
from six.moves.queue import Empty
import ZODB.tests.util
import zope.testing.setupstack
from ZEO._compat import StringIO
......@@ -112,7 +114,6 @@ def runner(config, qin, qout, timeout=None,
try:
import threading
from six.moves.queue import Empty
if ZEO4_SERVER:
from .ZEO4 import runzeo
......@@ -141,7 +142,7 @@ def runner(config, qin, qout, timeout=None,
thread.start()
try:
qin.get(timeout=timeout)
qin.get(timeout=timeout) # wait for shutdown
except Empty:
pass
server.server.close()
......@@ -156,10 +157,6 @@ def runner(config, qin, qout, timeout=None,
pass
qout.put(thread.is_alive())
qin.get(timeout=11) # ack
if hasattr(qout, 'close'):
qout.close()
qout.cancel_join_thread()
except Exception:
logger.exception("In server thread")
......@@ -172,7 +169,6 @@ def runner(config, qin, qout, timeout=None,
def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop')
dirty = qout.get(timeout=stop_timeout)
qin.put('ack')
if dirty:
print("WARNING SERVER DIDN'T STOP CLEANLY", file=sys.stderr)
......@@ -187,9 +183,6 @@ def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
thread.join(stop_timeout)
os.remove(config)
if hasattr(qin, 'close'):
qin.close()
qin.cancel_join_thread()
def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs', protocol=None, blob_dir=None,
......@@ -238,7 +231,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
from six.moves.queue import Queue
else:
from multiprocessing import Process as Thread
from multiprocessing import Queue
Queue = ThreadlessQueue
qin = Queue()
qout = Queue()
......@@ -421,3 +414,17 @@ def debug_logging(logger='ZEO', stream='stderr', level=logging.DEBUG):
def whine(*message):
print(*message, file=sys.stderr)
sys.stderr.flush()
class ThreadlessQueue(object):
def __init__(self):
self.cin, self.cout = multiprocessing.Pipe(False)
def put(self, v):
self.cout.send(v)
def get(self, timeout=None):
if self.cin.poll(timeout):
return self.cin.recv()
else:
raise Empty()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment