Commit 660a7b1e authored by Guido van Rossum

Major refactoring of the ConnectThread, to support the
read_only_fallback option of ClientStorage.

Unfortunately, this still doesn't work completely; as shown by some
disabled tests that I'll check in shortly, the reconnect feature is
still broken.  But it's weekend time, and I need a checkpoint to keep
my own sanity.
parent 703cf4f3
......@@ -22,6 +22,8 @@ import types
import ThreadedAsync
import zLOG
from ZODB.POSException import ReadOnlyError
from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
......@@ -86,14 +88,14 @@ class ConnectionManager:
self.thread_lock.acquire()
try:
t = self.thread
if t is not None:
t.stop()
finally:
self.thread_lock.release()
if t is not None:
log("CM.close(): stopping and joining thread")
t.stop()
t.join(30)
if t.isAlive():
log("ConnectionManager.close(): self.thread.join() timed out")
log("CM.close(): self.thread.join() timed out")
if self.connection:
self.connection.close()
if self.trigger is not None:
......@@ -112,7 +114,9 @@ class ConnectionManager:
# XXX need each connection started with async==0 to have a
# callback
log("CM.set_async(%s)" % repr(map))
if not self.closed and self.trigger is None:
log("CM.set_async(): first call")
self.trigger = trigger()
self.thr_async = 1 # XXX needs to be set on the Connection
......@@ -146,7 +150,7 @@ class ConnectionManager:
try:
t = self.thread
if t is None:
log("starting thread to connect to server")
log("CM.connect(): starting ConnectThread")
self.thread = t = ConnectThread(self, self.client,
self.addrlist,
self.tmin, self.tmax)
......@@ -156,14 +160,14 @@ class ConnectionManager:
if sync:
t.join(30)
while t.isAlive():
log("ConnectionManager.connect(sync=1): "
"self.thread.join() timed out")
log("CM.connect(sync=1): thread join timed out")
t.join(30)
def connect_done(self, c):
log("connect_done()")
def connect_done(self, conn, preferred):
log("CM.connect_done(preferred=%s)" % preferred)
self.connected = 1
self.connection = c
self.connection = conn
if preferred:
self.thread_lock.acquire()
try:
self.thread = None
......@@ -171,6 +175,7 @@ class ConnectionManager:
self.thread_lock.release()
def notify_closed(self):
log("CM.notify_closed()")
self.connected = 0
self.connection = None
self.client.notifyDisconnected()
......@@ -192,14 +197,24 @@ else: # Unix
class ConnectThread(threading.Thread):
"""Thread that tries to connect to server given one or more addresses.
The thread is passed a ConnectionManager and the manager's client
as arguments. It calls notifyConnected() on the client when a
socket connects. If notifyConnected() returns without raising an
exception, the thread is done; it calls connect_done() on the
manager and exits.
as arguments. It calls testConnection() on the client when a
socket connects; that should return a tuple (stub, score) where
stub is an RPC stub, and score is 1 or 0 depending on whether this
is a preferred or a fallback connection. It may also raise an
exception, in which case the connection is abandoned.
The thread will continue to run, attempting connections, until a
successful notifyConnected() or stop() is called.
preferred stub is seen or until all sockets have been tried.
As soon as testConnection() returns a preferred stub, or after all
sockets have been tried and at least one fallback stub has been
seen, notifyConnected(stub) is called on the client and
connect_done() on the manager. If this was a preferred stub, the
thread then exits; otherwise, it keeps trying until it gets a
preferred stub, and then reconnects the client using that stub.
"""
__super_init = threading.Thread.__init__
......@@ -216,189 +231,248 @@ class ConnectThread(threading.Thread):
self.tmax = tmax
self.stopped = 0
self.one_attempt = threading.Event()
self.fallback = None
# A ConnectThread keeps track of whether it has finished a
# call to attempt_connects(). This allows the
# ConnectionManager to make an attempt to connect right away,
# but not block for too long if the server isn't immediately
# available.
# call to try_connecting(). This allows the ConnectionManager
# to make an attempt to connect right away, but not block for
# too long if the server isn't immediately available.
def stop(self):
# Ask the thread to shut down.  This only sets a flag: run() tests it
# before each new round of connection attempts, and try_connecting()
# tests it on every pass of its select() loop.
self.stopped = 1
# Every method from run() to the end is used internally by the Thread.
def run(self):
delay = self.tmin
while not self.stopped:
success = self.attempt_connects()
success = self.try_connecting()
if not self.one_attempt.isSet():
self.one_attempt.set()
if success:
if success > 0:
break
time.sleep(delay)
delay *= 2
if delay > self.tmax:
delay = self.tmax
log("thread exiting: %s" % self.getName())
def close_sockets(self):
for s in self.sockets.keys():
s.close()
delay = min(delay*2, self.tmax)
log("CT: exiting thread: %s" % self.getName())
def attempt_connects(self):
def try_connecting(self):
"""Try connecting to all self.addrlist addresses.
If at least one succeeds, pick a success arbitrarily, close all other
successes (if any), and return true. If none succeed, return false.
Return 1 if a preferred connection was found; 0 if no
connection was found; and -1 if a fallback connection was
found.
"""
self.sockets = {} # {open socket: connection address}
log("CT: attempting to connect on %d sockets" % len(self.addrlist))
log("attempting connection on %d sockets" % len(self.addrlist))
ok = 0
# Create socket wrappers
wrappers = {} # keys are active wrappers
for domain, addr in self.addrlist:
if __debug__:
log("attempt connection to %s" % repr(addr),
level=zLOG.DEBUG)
try:
s = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("Failed to create socket with domain=%s: %s" % (
domain, err), level=zLOG.ERROR)
continue
s.setblocking(0)
self.sockets[s] = addr
# XXX can still block for a while if addr requires DNS
if self.try_connect(s):
ok = 1
break
wrap = ConnectWrapper(domain, addr, self.mgr, self.client)
wrap.connect_procedure()
if wrap.state == "notified":
for wrap in wrappers.keys():
wrap.close()
return 1
if wrap.state != "closed":
wrappers[wrap] = wrap
# next wait until they actually connect
while not ok and self.sockets:
# Next wait until they all actually connect (or fail)
# XXX If a socket never connects, nor fails, we'd wait forever!
while wrappers:
if self.stopped:
self.close_sockets()
for wrap in wrappers.keys():
wrap.close()
return 0
# Select connecting wrappers
connecting = [wrap
for wrap in wrappers.keys()
if wrap.state == "connecting"]
if not connecting:
break
try:
sockets = self.sockets.keys()
r, w, x = select.select([], sockets, sockets, 1.0)
except select.error:
r, w, x = select.select([], connecting, connecting, 1.0)
except select.error, msg:
log("CT: select failed; msg=%s" % str(msg),
level=zLOG.WARNING) # XXX Is this the right level?
continue
for s in x:
del self.sockets[s]
s.close()
for s in w:
if self.try_connect(s):
ok = 1
break
if ok:
del self.sockets[s] # don't close the newly connected socket
self.close_sockets()
# Exceptable wrappers are in trouble; close these suckers
for wrap in x:
del wrappers[wrap]
wrap.close()
# Writable sockets are connected
for wrap in w:
wrap.connect_procedure()
if wrap.state == "notified":
del wrappers[wrap] # Don't close this one
for wrap in wrappers.keys():
wrap.close()
return 1
if self.fallback:
(c, stub) = self.fallback
self.fallback = None
try:
self.client.notifyConnected(stub)
except:
log("error in notifyConnected (%r)" % addr,
level=zLOG.ERROR, error=sys.exc_info())
c.close()
return 0
if wrap.state == "closed":
del wrappers[wrap]
# If we've got wrappers left at this point, they're fallback
# connections. Try notifying them until one succeeds.
for wrap in wrappers.keys():
assert wrap.state == "tested" and wrap.preferred == 0
if self.mgr.connected:
wrap.close()
else:
self.mgr.connect_done(c)
return 1
wrap.notify_client()
if wrap.state == "notified":
del wrappers[wrap] # Don't close this one
for wrap in wrappers.keys():
wrap.close()
return -1
assert wrap.state == "closed"
del wrappers[wrap]
# Alas, no luck.
assert not wrappers
return 0
def try_connect(self, s):
"""Call s.connect_ex(addr); return true iff connection succeeds.
We have to handle several possible return values from
connect_ex(). If the socket is connected and the initial ZEO
setup works, we're done. Report success by raising an
exception. Yes, this is odd, but we need to bail out of the
select() loop in the caller and an exception is a principled
way to do the abort.
_USING_WINSOCK = sys.platform.startswith("win")
If the socket connects and the initial ZEO setup
(notifyConnected()) fails or the connect_ex() returns an
error, we close the socket, remove it from self.sockets, and
proceed with the other sockets.
class ConnectWrapper:
"""An object that handles the connection procedure for one socket.
If connect_ex() returns EINPROGRESS, we need to try again later.
This is a little state machine with states:
closed
opened
connecting
connected
tested
notified
"""
addr = self.sockets[s]
def __init__(self, domain, addr, mgr, client):
"""Store arguments and create non-blocking socket.

domain is passed to socket.socket(); addr is the address later
handed to connect_ex().  mgr and client are kept for the
connection-test and notification steps.

On success the wrapper ends up in state "opened".  If the socket
cannot be created, the error is logged and the wrapper is left in
state "closed" (close() also drops the mgr and client references).
"""
self.domain = domain
self.addr = addr
self.mgr = mgr
self.client = client
# These attributes are part of the interface
# (state starts out as "closed" until a socket has been created)
self.state = "closed"
self.sock = None
self.conn = None
self.stub = None
self.preferred = 0
log("CW: attempt to connect to %s" % repr(addr))
try:
self.sock = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("CW: can't create socket, domain=%s: %s" % (domain, err),
level=zLOG.ERROR)
# No socket was created; close() resets the wrapper to "closed".
self.close()
return
# Non-blocking, so that connect_ex() can return immediately and the
# caller can wait for completion with select().
self.sock.setblocking(0)
self.state = "opened"
def connect_procedure(self):
"""Call sock.connect_ex(addr) and interpret result."""
if self.state in ("opened", "connecting"):
try:
e = s.connect_ex(addr)
err = self.sock.connect_ex(self.addr)
except socket.error, msg:
log("failed to connect to %s: %s" % (addr, msg),
log("CW: connect_ex(%r) failed: %s" % (self.addr, msg),
level=zLOG.ERROR)
self.close()
return
log("CW: connect_ex(%s) returned %s" %
(self.addr, errno.errorcode.get(err) or str(err)))
if err in _CONNECT_IN_PROGRESS:
self.state = "connecting"
return
if err not in _CONNECT_OK:
log("CW: error connecting to %s: %s" %
(self.addr, errno.errorcode.get(err) or str(err)),
level=zLOG.WARNING)
self.close()
return
if err == 0 and _USING_WINSOCK:
self.winsock_check_connected()
else:
log("connect_ex(%s) == %s" % (addr, e))
if e in _CONNECT_IN_PROGRESS:
return 0
elif e in _CONNECT_OK:
# special cases to deal with winsock oddities
if sys.platform.startswith("win") and e == 0:
# It appears that winsock isn't behaving as
# expected on Win2k. It's possible for connect_ex()
# to return 0, but the connection to have failed.
# In particular, in situations where I expect to
# get a Connection refused (10061), I'm seeing
# connect_ex() return 0. OTOH, it looks like
# select() is a more reliable indicator on
# Windows.
r, w, x = select.select([s], [s], [s], 0.1)
self.state = "connected"
if self.state == "connected":
self.test_connection()
def winsock_check_connected(self):
"""Deal with winsock oddities.
XXX How much of this is superstition?
It appears that winsock isn't behaving as expected on Win2k.
It's possible for connect_ex() to return 0, but the connection
to have failed. In particular, in situations where I expect
to get a Connection refused (10061), I'm seeing connect_ex()
return 0. OTOH, it looks like select() is a more reliable
indicator on Windows.
"""
# XXX Why not use 0.0 as timeout?
r, w, x = select.select([self.sock], [self.sock], [self.sock], 0.1)
if not (r or w or x):
return 0
if x:
# see comment at the end of the function
s.close()
del self.socket[s]
c = self.test_connection(s, addr)
if c:
log("connected to %s" % repr(addr), level=zLOG.DEBUG)
return 1
self.state = "connecting"
elif x:
self.close()
else:
log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
level=zLOG.DEBUG)
# Any execution that doesn't raise Connected() or return
# because of CONNECT_IN_PROGRESS is an error. Make sure the
# socket is closed and remove it from the dict of pending
# sockets.
s.close()
del self.sockets[s]
return 0
self.state = "connected"
def test_connection(self):
"""Establish and test a connection at the zrpc level.
def test_connection(self, s, addr):
# Establish a connection at the zrpc level and call the
# client's notifyConnected(), giving the zrpc application a
# chance to do app-level check of whether the connection is
# okay.
c = ManagedConnection(s, addr, self.client, self.mgr)
Call the client's testConnection(), giving the client a chance
to do app-level check of the connection.
"""
self.conn = ManagedConnection(self.sock, self.addr,
self.client, self.mgr)
try:
(stub, preferred) = self.client.testConnection(c)
(self.stub, self.preferred) = self.client.testConnection(self.conn)
self.state = "tested"
except ReadOnlyError:
log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr))
self.close()
return
except:
log("error in testConnection (%r)" % (addr,),
log("CW: error in testConnection (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
c.close()
# Closing the ZRPC connection will eventually close the
# socket, somewhere in asyncore.
return 0
if preferred:
self.close()
return
if self.preferred:
self.notify_client()
def notify_client(self):
"""Call the client's notifyConnected().
If this succeeds, call the manager's connect_done().
If the client is already connected, we assume it's a fallback
connection, the new stub must be a preferred stub, and we
first disconnect the client.
"""
if self.mgr.connected:
assert self.preferred
log("CW: reconnecting client to preferred stub")
self.mgr.notify_closed()
try:
self.client.notifyConnected(stub)
self.client.notifyConnected(self.stub)
except:
log("error in notifyConnected (%r)" % (addr,),
log("CW: error in notifyConnected (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
c.close()
return 0
else:
self.mgr.connect_done(c)
return 1
if self.fallback is None:
self.fallback = (c, stub)
return 0
self.close()
return
self.state = "notified"
self.mgr.connect_done(self.conn, self.preferred)
def close(self):
"""Close the socket and reset everything."""
# Mark the wrapper dead first; callers such as try_connecting()
# inspect self.state to decide whether to keep or drop a wrapper.
self.state = "closed"
# Drop the references to the stub, the manager and the client.
self.stub = self.mgr = self.client = None
self.preferred = 0
if self.conn is not None:
# Closing the ZRPC connection will eventually close the
# socket, somewhere in asyncore.
# XXX Why do we care? --Guido
self.conn.close()
self.conn = None
# The raw socket is closed here as well, whether or not a ZRPC
# connection had been wrapped around it.
if self.sock is not None:
self.sock.close()
self.sock = None
def fileno(self):
# Return the file descriptor of the wrapped socket.  This lets the
# wrapper object itself be passed to select.select(), which is how
# try_connecting() waits on its list of connecting wrappers.
# Only usable while self.sock is set; close() resets it to None.
return self.sock.fileno()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment