Commit 4a2d32ed authored by Jim Fulton's avatar Jim Fulton

Fixed bug:

- Unix domain sockets didn't work for ZEO (since the addition of IPv6
  support).

https://bugs.launchpad.net/zodb/+bug/663259
parent ee542828
......@@ -17,6 +17,9 @@ Bugs Fixed
https://bugs.launchpad.net/zodb/+bug/665452
- Unix domain sockets didn't work for ZEO (since the addition of IPv6
support). https://bugs.launchpad.net/zodb/+bug/663259
- Removed a missfeature that can cause performance problems when using
an external garbage collector with ZEO. When objects were deleted
from a storage, invalidations were sent to clients. This makes no
......
......@@ -33,6 +33,11 @@ class ZEOConfig:
"""Class to generate ZEO configuration file. """
def __init__(self, addr):
if isinstance(addr, str):
self.logpath = addr+'.log'
else:
self.logpath = 'server-%s.log' % addr[1]
addr = '%s:%s' % addr
self.address = addr
self.read_only = None
self.invalidation_queue_size = None
......@@ -46,7 +51,7 @@ class ZEOConfig:
def dump(self, f):
print >> f, "<zeo>"
print >> f, "address %s:%s" % self.address
print >> f, "address " + self.address
if self.read_only is not None:
print >> f, "read-only", self.read_only and "true" or "false"
if self.invalidation_queue_size is not None:
......@@ -69,10 +74,10 @@ class ZEOConfig:
<eventlog>
level %s
<logfile>
path server-%s.log
path %s
</logfile>
</eventlog>
""" % (self.loglevel, self.address[1])
""" % (self.loglevel, self.logpath)
def __str__(self):
f = StringIO.StringIO()
......@@ -110,8 +115,15 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
if port is None:
raise AssertionError("The port wasn't specified")
if isinstance(port, int):
addr = 'localhost', port
adminaddr = 'localhost', port+1
else:
addr = port
adminaddr = port+'-test'
if zeo_conf is None or isinstance(zeo_conf, dict):
z = ZEOConfig(('localhost', port))
z = ZEOConfig(addr)
if zeo_conf:
z.__dict__.update(zeo_conf)
zeo_conf = z
......@@ -149,14 +161,18 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
else:
pid = subprocess.Popen(args, env=d, close_fds=True).pid
adminaddr = ('localhost', port + 1)
# We need to wait until the server starts, but not forever.
# 30 seconds is a somewhat arbitrary upper bound. A BDBStorage
# takes a long time to open -- more than 10 seconds on occasion.
for i in range(120):
time.sleep(0.25)
for i in range(300):
time.sleep(0.1)
try:
if isinstance(adminaddr, str) and not os.path.exists(adminaddr):
continue
logger.debug('connect %s', i)
if isinstance(adminaddr, str):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(adminaddr)
ack = s.recv(1024)
......@@ -170,7 +186,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
else:
logging.debug('boo hoo')
raise
return ('localhost', port), adminaddr, pid, tmpfile
return addr, adminaddr, pid, tmpfile
if sys.platform[:3].lower() == "win":
......@@ -187,6 +203,9 @@ def shutdown_zeo_server(adminaddr):
# only requires two iterations, but do a third for pure
# superstition.
for i in range(3):
if isinstance(adminaddr, str):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.3)
try:
......
......@@ -1489,8 +1489,20 @@ def runzeo_logrotate_on_sigusr2():
>>> _ = p.wait()
"""
def unix_domain_sockets():
"""Make sure unix domain sockets work
>>> addr, _ = start_server(port='./sock')
>>> c = ZEO.connection(addr)
>>> c.root.x = 1
>>> transaction.commit()
>>> c.close()
"""
if sys.platform.startswith('win'):
del runzeo_logrotate_on_sigusr2
del unix_domain_sockets
if sys.version_info >= (2, 6):
import multiprocessing
......
......@@ -67,6 +67,9 @@ class ZEOTestServer(asyncore.dispatcher):
# Count down to zero, the number of connects
self._count = 1
self._label ='%d @ %s' % (os.getpid(), addr)
if isinstance(addr, str):
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# Some ZEO tests attempt a quick start of the server using the same
# port so we have to set the reuse flag.
......@@ -168,17 +171,15 @@ def main():
zo = ZEO.runzeo.ZEOOptions()
zo.realize(["-C", configfile])
zeo_port = int(zo.address[1])
addr = zo.address
if zo.auth_protocol == "plaintext":
__import__('ZEO.tests.auth_plaintext')
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
# The rest of the args are hostname, portnum
test_port = zeo_port + 1
test_addr = ('localhost', test_port)
addr = ('localhost', zeo_port)
if isinstance(addr, tuple):
test_addr = addr[0], addr[1]+1
else:
test_addr = addr + '-test'
log(label, 'creating the storage server')
storage = zo.storages[0].open()
mon_addr = None
......
......@@ -437,12 +437,13 @@ class ConnectThread(threading.Thread):
return 0
def _expand_addrlist(self):
for domain, (host, port) in self.addrlist:
for domain, addr in self.addrlist:
# AF_INET really means either IPv4 or IPv6, possibly
# indirected by DNS. By design, DNS lookup is deferred
# until connections get established, so that DNS
# reconfiguration can affect failover
if domain == socket.AF_INET:
host, port = addr
for (family, socktype, proto, cannoname, sockaddr
) in socket.getaddrinfo(host or 'localhost', port):
# for IPv6, drop flowinfo, and restrict addresses
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment