Commit b56cd93a authored by Vincent Pelletier's avatar Vincent Pelletier

Locking is required for a multithreaded start (showcased by ZODB testMT).

Move startup code to ThreadedPoll class.
Refcount-ish based would be better, so add a TODO (idea: browse existing
connections to see if there are pending responses, preventing thread
shutdown).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2453 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f954f851
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Thread, Event, enumerate as thread_enum from threading import Thread, Event, enumerate as thread_enum
from neo.locking import Lock
import neo import neo
class _ThreadedPoll(Thread): class _ThreadedPoll(Thread):
...@@ -60,6 +61,9 @@ class ThreadedPoll(object): ...@@ -60,6 +61,9 @@ class ThreadedPoll(object):
_started = False _started = False
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
lock = Lock()
self._status_lock_acquire = lock.acquire
self._status_lock_release = lock.release
self._args = args self._args = args
self._kw = kw self._kw = kw
self.newThread() self.newThread()
...@@ -68,11 +72,32 @@ class ThreadedPoll(object): ...@@ -68,11 +72,32 @@ class ThreadedPoll(object):
self._thread = _ThreadedPoll(*self._args, **self._kw) self._thread = _ThreadedPoll(*self._args, **self._kw)
def start(self): def start(self):
if self._started: """
self.newThread() Start thread if not started or restart it if it's shutting down.
else: """
self._started = True # TODO: a refcount-based approach would be better, but more intrusive.
self._thread.start() self._status_lock_acquire()
try:
thread = self._thread
if thread.stopping():
# XXX: ideally, we should wake thread up here, to be sure not
# to wait forever.
thread.join()
if not thread.isAlive():
if self._started:
self.newThread()
else:
self._started = True
self._thread.start()
finally:
self._status_lock_release()
def stop(self):
self._status_lock_acquire()
try:
self._thread.stop()
finally:
self._status_lock_release()
def __getattr__(self, key): def __getattr__(self, key):
return getattr(self._thread, key) return getattr(self._thread, key)
......
...@@ -66,19 +66,7 @@ class Dispatcher: ...@@ -66,19 +66,7 @@ class Dispatcher:
return True return True
def needPollThread(self): def needPollThread(self):
thread = self.poll_thread self.poll_thread.start()
# If thread has been stopped, wait for it to stop
# Note: This is not, ironically, thread safe: if one thread is
# stopping poll thread while we are checking its state here, a
# race condition will occur. If safety is required, locks should
# be added to control the access to thread's "start", "stopping"
# and "stop" methods.
if thread.stopping():
# XXX: ideally, we should wake thread up here, to be sure not
# to wait forever.
thread.join()
if not thread.isAlive():
thread.start()
@giant_lock @giant_lock
@profiler_decorator @profiler_decorator
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment