Commit 81f586c4 authored by Jeremy Hylton

Merge ZEO2-branch to trunk.

parent d09403de
@@ -10,20 +10,23 @@ ClientStorage
Creating a ClientStorage
At a minimum, a client storage requires an argument (named
connection) giving connection information. This argument should be
a string, specifying a unix-domain socket file name, or a tuple
consisting of a host and port. The host should be a string host
name or IP number. The port should be a numeric port number.
The ClientStorage requires at least one argument, the address or
addresses of the server(s) to use. It accepts several other
optional keyword arguments.
The ClientStorage constructor provides a number of additional
options (arguments). The full list of arguments is:
The address argument can be one of:
- a tuple containing hostname and port number
- a string specifying the path to a Unix domain socket
- a sequence of the previous two
connection -- Connection information.
If a sequence of addresses is specified, the client will use the
first server from the list that it can connect to.
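The fallback behavior can be sketched in plain Python. This is an illustration only; the `connect` callable and the addresses are hypothetical, not part of the ZEO API, and in real use you simply pass the whole sequence to ClientStorage:

```python
def first_reachable(addrs, connect):
    """Return (addr, connection) for the first address that accepts a
    connection, mirroring how the client walks its address list."""
    last_error = None
    for addr in addrs:
        try:
            return addr, connect(addr)
        except OSError as e:
            last_error = e
    # No server accepted a connection; re-raise the last failure.
    raise last_error or OSError("no server reachable")
```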
This argument is either a string containing a socket file name
or a tuple consisting of a string host name or ip number and an
integer port.
The ClientStorage constructor provides a number of additional
options (arguments). The full list of arguments is:
storage -- The name of the storage to connect to.
@@ -33,7 +36,9 @@ ClientStorage
default name for both the server and client is '1'.
cache_size -- The number of bytes to allow for the client cache.
The default is 20,000,000.
The default is 20,000,000. A large cache can significantly
increase the performance of a ZEO system. For applications that
have a large database, the default size may be too small.
For more information on client caches, see ClientCache.txt.
@@ -54,10 +59,6 @@ ClientStorage
For more information on client cache files, see ClientCache.txt.
debug -- If this is provided, it should be a non-empty string. It
indicates that client should log tracing and debugging
information, using zLOG.
var -- The directory in which persistent cache files should be
written. If this option is provided, it is unnecessary to
set INSTANCE_HOME in __builtins__.
@@ -82,6 +83,13 @@ ClientStorage
The default is 300 seconds.
wait_for_server_on_startup -- Indicate whether the ClientStorage
should block waiting for a storage server connection, or whether
it should proceed, satisfying reads from the client cache.
wait -- Indicate whether the ClientStorage should block waiting
for a storage server connection, or whether it should proceed,
satisfying reads from the client cache.
read_only -- Open a read-only connection to the server. If the
client attempts to commit a transaction, it will get a
ReadOnlyError exception.
Each storage served by a ZEO server can be configured as either
read-write or read-only.
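A minimal sketch of the read-only guard described above. The `ReadOnlyError` class here stands in for ZODB's POSException.ReadOnlyError, and the class and method names are illustrative only:

```python
class ReadOnlyError(Exception):
    """Stand-in for ZODB.POSException.ReadOnlyError."""

class ReadOnlyGuardSketch:
    def __init__(self, read_only=0):
        self._is_read_only = read_only

    def isReadOnly(self):
        return self._is_read_only

    def store(self, oid, data):
        # Write methods refuse to run on a read-only connection.
        if self._is_read_only:
            raise ReadOnlyError()
        return "stored"
```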
Zope Enterprize Objects
ZEO 1.0 requires Python 2.0 when used without Zope. It depends on
versions of asyncore and cPickle that were first released with
Python 2.0.
Installation
Put the ZEO package in a directory on your Python path. On a Unix
system, you can use the site-packages directory of your Python lib
directory. The ZEO package is the directory named ZEO that contains
an __init__.py file.
ZEO 2.0 requires Python 2.1 or higher when used without Zope. If
you use Python 2.1, we recommend the latest minor release (2.1.3 as
of this writing) because it includes a few bug fixes that affect
ZEO.
Starting (and configuring) the ZEO Server
ZEO is packaged with distutils. To install it, run this command
from the top-level ZEO directory::
python setup.py install
The setup script will install the ZEO package in your Python
site-packages directory.
You can test ZEO before installing it with the test script::
To start the storage server, run the start.py script contained in
the ZEO package. You can run the script from the package
directory or copy it to a directory on your path.
python test.py -v
Run the script with the -h option for a full list of options. The
ZEO 2.0a1 release contains 87 unit tests on Unix.
Starting (and configuring) the ZEO Server
Specify the port number when you run the script::
To start the storage server, go to your Zope install directory and
run::
python ZEO/start.py -p port_number
python lib/python/ZEO/start.py -p port_number
Or run start.py without arguments to see options. The options are
documented in start.txt.
This runs the storage server under zdaemon. zdaemon automatically
restarts programs that exit unexpectedly.
The server and the client don't have to be on the same machine.
If the server and client *are* on the same machine, then you can
use a Unix domain socket::
If they are on the same machine, then you can use a Unix domain
socket::
python ZEO/start.py -U filename
python lib/python/ZEO/start.py -U filename
The start script provides a number of options not documented here.
See doc/start.txt for more information.
Running a ZEO client
In your application, create a ClientStorage, rather than, say, a
FileStorage:
import ZODB, ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(('',port_number))
db=ZODB.DB(Storage)
import ZODB
from ZEO.ClientStorage import ClientStorage
Storage = ClientStorage(('', port_number))
db = ZODB.DB(Storage)
You can specify a host name (rather than '') if you want. The port
number is, of course, the port number used to start the storage
@@ -43,38 +57,24 @@ Zope Enterprize Objects
You can also give the name of a Unix domain socket file::
import ZODB, ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(filename)
db=ZODB.DB(Storage)
import ZODB
from ZEO.ClientStorage import ClientStorage
Storage = ClientStorage(filename)
db = ZODB.DB(Storage)
There are a number of configuration options available for the
ClientStorage. See ClientStorage.txt for details.
If you want a persistent client cache which retains cache contents
across ClientStorage restarts, you need to define the environment
variable, ZEO_CLIENT, to a unique name for the client. This is
needed so that unique cache name files can be computed. Otherwise,
the client cache is stored in temporary files which are removed when
variable, ZEO_CLIENT, or set the client keyword argument to the
constructor to a unique name for the client. This is needed so
that unique cache name files can be computed. Otherwise, the
client cache is stored in temporary files which are removed when
the ClientStorage shuts down.
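The cache file names are derived from the storage name and the client name, which is why the client name must be unique. A sketch of the naming scheme used by ClientCache (the client name and var directory here are illustrative):

```python
import os

def cache_file_names(storage='1', client='zope1', var='/tmp'):
    # ClientCache keeps two files per client and flips between them,
    # so each (storage, client) pair gets files numbered 0 and 1.
    return [os.path.join(var, 'c%s-%s-%s.zec' % (storage, client, i))
            for i in (0, 1)]
```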
Dependencies on other modules
- The module ThreadedAsync must be on the Python path.
- The zdaemon module is necessary if you want to run your
storage server as a daemon that automatically restarts itself
if there is a fatal error.
- The zLOG module provides a handy logging capability.
If you are using a version of Python before Python 2:
- ZServer should be in the Python path, or you should copy the
version of asyncore.py from ZServer (from Zope 2.2 or CVS) to
your Python path, or you should copy a version of a asyncore
from the medusa CVS tree to your Python path. A recent change
in asyncore is required.
- The version of cPickle from Zope, or from the python.org CVS
tree must be used. It has a hook to provide control over which
"global objects" (e.g. classes) may be pickled.
ZEO depends on other modules that are distributed with
StandaloneZODB and with Zope. You can download StandaloneZODB
from http://www.zope.org/Products/StandaloneZODB.
@@ -2,30 +2,38 @@ Zope Enterprise Objects
Installation
ZEO 1.0 requires Zope 2.2 or higher.
ZEO 2.0 requires Zope 2.4 or higher and Python 2.1 or higher.
If you use Python 2.1, we recommend the latest minor release
(2.1.3 as of this writing) because it includes a few bug fixes
that affect ZEO.
Put this package (the ZEO directory, without any wrapping directory
Put the package (the ZEO directory, without any wrapping directory
included in a distribution) in your Zope lib/python.
If you are using Python 1.5.2, the lib/python/ZODB directory must
contain a cPickle.so (Unix) or cPickle.pyd (Windows) file. In
many cases, the Zope installation process will not place this file
in the right location. You may need to copy it from lib/python to
lib/python/ZODB.
The setup.py script in the top-level ZEO directory can also be
used. Run "python setup.py install --home=ZOPE" where ZOPE is the
top-level Zope directory.
You can test ZEO before installing it with the test script::
python test.py -v
Run the script with the -h option for a full list of options. The
ZEO 2.0a1 release contains 87 unit tests on Unix.
Starting (and configuring) the ZEO Server
To start the storage server, go to your Zope install directory and::
To start the storage server, go to your Zope install directory and
run::
python lib/python/ZEO/start.py -p port_number
(Run start without arguments to see options.)
This runs the storage server under zdaemon. zdaemon automatically
restarts programs that exit unexpectedly.
Of course, the server and the client don't have to be on the same
machine.
If the server and client *are* on the same machine, then you can use
a Unix domain socket::
The server and the client don't have to be on the same machine.
If they are on the same machine, then you can use a Unix domain
socket::
python lib/python/ZEO/start.py -U filename
@@ -38,10 +46,8 @@ Zope Enterprise Objects
custom_zodb.py, in your Zope install directory, so that Zope uses a
ClientStorage::
import ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(('',port_number))
(See the misc/custom_zodb.py for an example.)
from ZEO.ClientStorage import ClientStorage
Storage = ClientStorage(('', port_number))
You can specify a host name (rather than '') if you want. The port
number is, of course, the port number used to start the storage
@@ -49,19 +55,20 @@ Zope Enterprise Objects
You can also give the name of a Unix domain socket file::
import ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(filename)
from ZEO.ClientStorage import ClientStorage
Storage = ClientStorage(filename)
There are a number of configuration options available for the
ClientStorage. See doc/ClientStorage.txt for details.
If you want a persistent client cache which retains cache contents
across ClientStorage restarts, you need to define the environment
variable, ZEO_CLIENT, to a unique name for the client. This is
needed so that unique cache name files can be computed. Otherwise,
the client cache is stored in temporary files which are removed when
variable, ZEO_CLIENT, or set the client keyword argument to the
constructor to a unique name for the client. This is needed so
that unique cache name files can be computed. Otherwise, the
client cache is stored in temporary files which are removed when
the ClientStorage shuts down. For example, to start two Zope
processes with unique caches, use something like:
processes with unique caches, use something like::
python z2.py -P8700 ZEO_CLIENT=8700
python z2.py -P8800 ZEO_CLIENT=8800
@@ -74,9 +81,8 @@ Zope Enterprise Objects
different clients have different software installed, the correct
state of the database is ambiguous.
Starting in Zope 2.2, Zope will not modify the Zope database
during product installation if the environment variable ZEO_CLIENT
is set.
Zope will not modify the Zope database during product installation
if the environment variable ZEO_CLIENT is set.
Normally, Zope ZEO clients should be run with ZEO_CLIENT set so
that product initialization is not performed.
@@ -12,7 +12,7 @@
#
##############################################################################
"""Implement a client cache
The cache is managed as two files, var/c0.zec and var/c1.zec.
Each cache file is a sequence of records of the form:
@@ -73,145 +73,183 @@ file 0 and file 1.
"""
__version__ = "$Revision: 1.22 $"[11:-2]
__version__ = "$Revision: 1.23 $"[11:-2]
import os, tempfile
import os
import sys
import tempfile
from struct import pack, unpack
from thread import allocate_lock
import zLOG
magic='ZEC0'
import zLOG
from ZEO.ICache import ICache
def LOG(msg, level=zLOG.BLATHER):
def log(msg, level=zLOG.INFO):
zLOG.LOG("ZEC", level, msg)
magic='ZEC0'
class ClientCache:
__implements__ = ICache
def __init__(self, storage='', size=20000000, client=None, var=None):
# Allocate locks:
l=allocate_lock()
self._acquire=l.acquire
self._release=l.release
L = allocate_lock()
self._acquire = L.acquire
self._release = L.release
if client:
# Create a persistent cache
if var is None:
try: var=CLIENT_HOME
try:
var = CLIENT_HOME
except:
try: var=os.path.join(INSTANCE_HOME,'var')
except: var=os.getcwd()
try:
var = os.path.join(INSTANCE_HOME, 'var')
except:
var = os.getcwd()
# Get the list of cache file names
self._p=p=map(lambda i, p=storage, var=var, c=client:
os.path.join(var,'c%s-%s-%s.zec' % (p, c, i)),
(0,1))
self._p = p = map(lambda i, p=storage, var=var, c=client:
os.path.join(var, 'c%s-%s-%s.zec' % (p, c, i)),
(0, 1))
# get the list of cache files
self._f=f=[None, None]
self._f = f = [None, None]
# initialize cache serial numbers
s=['\0\0\0\0\0\0\0\0', '\0\0\0\0\0\0\0\0']
for i in 0,1:
for i in 0, 1:
if os.path.exists(p[i]):
fi=open(p[i],'r+b')
if fi.read(4)==magic: # Minimal sanity
fi.seek(0,2)
fi = open(p[i],'r+b')
if fi.read(4) == magic: # Minimal sanity
fi.seek(0, 2)
if fi.tell() > 30:
fi.seek(22)
s[i]=fi.read(8)
s[i] = fi.read(8)
# If we found a non-zero serial, then use the file
if s[i] != '\0\0\0\0\0\0\0\0': f[i]=fi
fi=None
if s[i] != '\0\0\0\0\0\0\0\0':
f[i] = fi
fi = None
# Whoever has the larger serial is the current
if s[1] > s[0]: current=1
elif s[0] > s[1]: current=0
if s[1] > s[0]:
current = 1
elif s[0] > s[1]:
current = 0
else:
if f[0] is None:
# We started, open the first cache file
f[0]=open(p[0], 'w+b')
f[0] = open(p[0], 'w+b')
f[0].write(magic)
current=0
f[1]=None
current = 0
f[1] = None
else:
self._f = f = [tempfile.TemporaryFile(suffix='.zec'), None]
# self._p file names 'None' signifies unnamed temp files.
# self._p file name 'None' signifies an unnamed temp file.
self._p = p = [None, None]
f[0].write(magic)
current=0
current = 0
self._limit=size/2
self._current=current
log("cache opened. current = %s" % current)
def close(self):
try:
self._f[self._current].close()
except (os.error, ValueError):
pass
self._limit = size / 2
self._current = current
def open(self):
# XXX open is overloaded to perform two tasks for
# optimization reasons
self._acquire()
try:
self._index=index={}
self._get=index.get
serial={}
f=self._f
current=self._current
self._get = index.get
serial = {}
f = self._f
current = self._current
if f[not current] is not None:
read_index(index, serial, f[not current], not current)
self._pos=read_index(index, serial, f[current], current)
self._pos = read_index(index, serial, f[current], current)
return serial.items()
finally: self._release()
finally:
self._release()
def close(self):
for f in self._f:
if f is not None:
# In 2.1 on Windows, the TemporaryFileWrapper doesn't allow
# closing a file more than once.
try:
f.close()
except OSError:
pass
def verify(self, verifyFunc):
"""Call the verifyFunc on every object in the cache.
verifyFunc(oid, serialno, version)
"""
for oid, (s, vs) in self.open():
verifyFunc(oid, s, vs)
def invalidate(self, oid, version):
self._acquire()
try:
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
p = self._get(oid, None)
if p is None:
return None
f = self._f[p < 0]
ap = abs(p)
f.seek(ap)
h=f.read(8)
if h != oid: return
f.seek(8,1) # Dang, we shouldn't have to do this. Bad Solaris & NT
h = f.read(8)
if h != oid:
return
f.seek(8, 1) # Dang, we shouldn't have to do this. Bad Solaris & NT
if version:
f.write('n')
else:
del self._index[oid]
f.write('i')
finally: self._release()
finally:
self._release()
def load(self, oid, version):
self._acquire()
try:
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
p = self._get(oid, None)
if p is None:
return None
f = self._f[p < 0]
ap = abs(p)
seek = f.seek
read = f.read
seek(ap)
h=read(27)
h = read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else: tlen=-1
else:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
del self._index[oid]
return None
if h[8]=='n':
if version: return None
if h[8]=='n':
if version:
return None
if not dlen:
del self._index[oid]
return None
return None
if not vlen or not version:
if dlen: return read(dlen), h[19:]
else: return None
if dlen:
return read(dlen), h[19:]
else:
return None
if dlen: seek(dlen, 1)
v=read(vlen)
if dlen:
seek(dlen, 1)
v = read(vlen)
if version != v:
if dlen:
seek(-dlen-vlen, 1)
@@ -219,24 +257,25 @@ class ClientCache:
else:
return None
dlen=unpack(">i", read(4))[0]
dlen = unpack(">i", read(4))[0]
return read(dlen), read(8)
finally: self._release()
finally:
self._release()
def update(self, oid, serial, version, data):
self._acquire()
try:
if version:
# We need to find and include non-version data
p=self._get(oid, None)
p = self._get(oid, None)
if p is None:
return self._store(oid, '', '', version, data, serial)
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
f = self._f[p < 0]
ap = abs(p)
seek = f.seek
read = f.read
seek(ap)
h=read(27)
h = read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
@@ -246,50 +285,56 @@ class ClientCache:
return self._store(oid, '', '', version, data, serial)
if dlen:
p=read(dlen)
s=h[19:]
else:
p = read(dlen)
s = h[19:]
else:
return self._store(oid, '', '', version, data, serial)
self._store(oid, p, s, version, data, serial)
else:
# Simple case, just store new data:
self._store(oid, data, serial, '', None, None)
finally: self._release()
finally:
self._release()
def modifiedInVersion(self, oid):
self._acquire()
try:
p=self._get(oid, None)
if p is None: return None
f=self._f[p < 0]
ap=abs(p)
seek=f.seek
read=f.read
p = self._get(oid, None)
if p is None:
return None
f = self._f[p < 0]
ap = abs(p)
seek = f.seek
read = f.read
seek(ap)
h=read(27)
h = read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else: tlen=-1
else:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
del self._index[oid]
return None
if h[8]=='n': return None
if h[8] == 'n':
return None
if not vlen: return ''
if not vlen:
return ''
seek(dlen, 1)
return read(vlen)
finally: self._release()
finally:
self._release()
def checkSize(self, size):
self._acquire()
try:
# Make sure we aren't going to exceed the target size.
# If we are, then flip the cache.
if self._pos+size > self._limit:
current=not self._current
self._current=current
if self._pos + size > self._limit:
current = not self._current
self._current = current
if self._p[current] is not None:
# Persistent cache file:
# Note that due to permission madness, waaa,
@@ -297,113 +342,113 @@ class ClientCache:
# we open the new one. Waaaaaaaaaa.
if self._f[current] is not None:
self._f[current].close()
try: os.remove(self._p[current])
except: pass
self._f[current]=open(self._p[current],'w+b')
try:
os.remove(self._p[current])
except:
pass
self._f[current] = open(self._p[current],'w+b')
else:
# Temporary cache file:
if self._f[current] is not None:
self._f[current].close()
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos=pos=4
finally: self._release()
self._pos = pos = 4
finally:
self._release()
def store(self, oid, p, s, version, pv, sv):
self._acquire()
try: self._store(oid, p, s, version, pv, sv)
finally: self._release()
try:
self._store(oid, p, s, version, pv, sv)
finally:
self._release()
def _store(self, oid, p, s, version, pv, sv):
if not s:
p=''
s='\0\0\0\0\0\0\0\0'
tlen=31+len(p)
p = ''
s = '\0\0\0\0\0\0\0\0'
tlen = 31 + len(p)
if version:
tlen=tlen+len(version)+12+len(pv)
vlen=len(version)
tlen = tlen + len(version) + 12 + len(pv)
vlen = len(version)
else:
vlen=0
vlen = 0
pos=self._pos
current=self._current
f=self._f[current]
f.seek(pos)
stlen=pack(">I",tlen)
write=f.write
write(oid+'v'+stlen+pack(">HI", vlen, len(p))+s)
if p: write(p)
stlen = pack(">I", tlen)
# accumulate various data to write into a list
l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
if p:
l.append(p)
if version:
write(version)
write(pack(">I", len(pv)))
write(pv)
write(sv)
write(stlen)
if current: self._index[oid]=-pos
else: self._index[oid]=pos
l.extend([version,
pack(">I", len(pv)),
pv, sv])
l.append(stlen)
f = self._f[self._current]
f.seek(self._pos)
f.write("".join(l))
if self._current:
self._index[oid] = - self._pos
else:
self._index[oid] = self._pos
self._pos=pos+tlen
self._pos += tlen
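The sign convention used for index entries above (and again in read_index) can be sketched on its own; a record in cache file 1 is stored as a negative offset:

```python
def encode_pos(pos, current):
    # A record in file 1 (current true) is stored as -pos; file 0 as +pos.
    return -pos if current else pos

def decode_pos(p):
    # The file selector is (p < 0), matching f = self._f[p < 0] above;
    # abs(p) is the byte offset of the record within that file.
    return int(p < 0), abs(p)
```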
def read_index(index, serial, f, current):
LOG("read_index(%s)" % f.name)
seek=f.seek
read=f.read
pos=4
seek = f.seek
read = f.read
pos = 4
seek(0, 2)
size = f.tell()
while 1:
seek(pos)
h=read(27)
f.seek(pos)
h = read(27)
if len(h)==27 and h[8] in 'vni':
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
break
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen + dlen > tlen:
break
oid=h[:8]
oid = h[:8]
if h[8]=='v' and vlen:
seek(dlen+vlen, 1)
vdlen=read(4)
if len(vdlen) != 4: break
vdlen=unpack(">i", vdlen)[0]
if vlen+dlen+42+vdlen > tlen: break
vdlen = read(4)
if len(vdlen) != 4:
break
vdlen = unpack(">i", vdlen)[0]
if vlen+dlen+42+vdlen > tlen:
break
seek(vdlen, 1)
vs=read(8)
if read(4) != h[9:13]: break
else: vs=None
vs = read(8)
if read(4) != h[9:13]:
break
else:
vs = None
if h[8] in 'vn':
if current: index[oid]=-pos
else: index[oid]=pos
serial[oid]=h[-8:], vs
if current:
index[oid] = -pos
else:
index[oid] = pos
serial[oid] = h[-8:], vs
else:
if serial.has_key(oid):
# We have a record for this oid, but it was invalidated!
del serial[oid]
del index[oid]
pos=pos+tlen
pos = pos + tlen
f.seek(pos)
try: f.truncate()
except: pass
return pos
try:
f.truncate()
except:
pass
def main(files):
for file in files:
print file
index = {}
serial = {}
read_index(index, serial, open(file), 0)
print index.keys()
if __name__ == "__main__":
import sys
main(sys.argv[1:])
return pos
@@ -13,185 +13,166 @@
##############################################################################
"""Network ZODB storage client
"""
__version__='$Revision: 1.40 $'[11:-2]
__version__='$Revision: 1.41 $'[11:-2]
import struct, time, os, socket, string
import tempfile, thread
from struct import pack, unpack
from types import TupleType
import cPickle
import os
import tempfile
import threading
import time
import Invalidator, ExtensionClass
import ThreadedAsync, Sync, zrpc, ClientCache
from ZEO import ClientCache, ServerStub
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.Exceptions import Disconnected
from ZEO.zrpc.client import ConnectionManager
from ZODB import POSException, BaseStorage
from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
from zLOG import LOG, PROBLEM, INFO, BLATHER
from ZEO.logger import zLogger
log = zLogger("ZEO Client")
def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
LOG(subsys, type, msg)
try:
from ZODB.ConflictResolution import ResolvedSerial
except:
ResolvedSerial='rs'
except ImportError:
ResolvedSerial = 'rs'
class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage"""
class UnrecognizedResult(ClientStorageError):
"""A server call returned an unrecognized result
"""
class ClientDisconnected(ClientStorageError):
"""The database storage is disconnected from the storage.
"""
class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
_connected=_async=0
__begin='tpc_begin_sync'
def __init__(self, connection, storage='1', cache_size=20000000,
name='', client='', debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
wait_for_server_on_startup=1):
# Decide whether to use non-temporary files
client=client or os.environ.get('ZEO_CLIENT','')
self._connection=connection
self._storage=storage
self._debug=debug
self._wait_for_server_on_startup=wait_for_server_on_startup
self._info={'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
}
if debug:
debug_log = log
else:
debug_log = None
self._call=zrpc.asyncRPC(connection, debug=debug_log,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
name = name or str(connection)
self.closed = 0
self._tfile=tempfile.TemporaryFile()
self._oids=[]
self._serials=[]
self._seriald={}
"""A server call returned an unrecognized result"""
ClientStorage.inheritedAttribute('__init__')(self, name)
class ClientDisconnected(ClientStorageError, Disconnected):
"""The database storage is disconnected from the storage."""
self.__lock_acquire=self._lock_acquire
self._cache=ClientCache.ClientCache(
storage, cache_size, client=client, var=var)
def get_timestamp(prev_ts=None):
t = time.time()
t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
if prev_ts is not None:
t = t.laterThan(prev_ts)
return t
class DisconnectedServerStub:
"""Raise ClientDisconnected on all attribute access."""
ThreadedAsync.register_loop_callback(self.becomeAsync)
def __getattr__(self, attr):
raise ClientDisconnected()
# IMPORTANT: Note that we aren't fully "there" yet.
# In particular, we don't actually connect to the server
# until we have a controlling database set with registerDB
# below.
disconnected_stub = DisconnectedServerStub()
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB.
"""
# Among other things, we know that our data methods won't get
# called until after this call.
class ClientStorage:
self.invalidator = Invalidator.Invalidator(db.invalidate,
self._cache.invalidate)
def __init__(self, addr, storage='1', cache_size=20000000,
name='', client='', var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
wait=0, read_only=0):
def out_of_band_hook(
code, args,
get_hook={
'b': (self.invalidator.begin, 0),
'i': (self.invalidator.invalidate, 1),
'e': (self.invalidator.end, 0),
'I': (self.invalidator.Invalidate, 1),
'U': (self._commit_lock_release, 0),
's': (self._serials.append, 1),
'S': (self._info.update, 1),
}.get):
self._server = disconnected_stub
self._is_read_only = read_only
self._storage = storage
hook = get_hook(code, None)
if hook is None: return
hook, flag = hook
if flag: hook(args)
else: hook()
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0}
self._call.setOutOfBand(out_of_band_hook)
self._tbuf = TransactionBuffer()
self._db = None
self._oids = []
# _serials: stores (oid, serialno) as returned by server
# _seriald: _check_serials() moves from _serials to _seriald,
# which maps oid to serialno
self._serials = []
self._seriald = {}
# Now that we have our callback system in place, we can
# try to connect
self._basic_init(name or str(addr))
self._startup()
# Decide whether to use non-temporary files
client = client or os.environ.get('ZEO_CLIENT', '')
self._cache = ClientCache.ClientCache(storage, cache_size,
client=client, var=var)
self._cache.open() # XXX open now? or later?
self._rpc_mgr = ConnectionManager(addr, self,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
# XXX What if we can only get a read-only connection and we
# want a read-write connection? Looks like the current code
# will block forever. (Future feature)
if wait:
self._rpc_mgr.connect(sync=1)
else:
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
def _startup(self):
def _basic_init(self, name):
"""Handle initialization activites of BaseStorage"""
if not self._call.connect(not self._wait_for_server_on_startup):
# XXX does anything depend on attr being __name__
self.__name__ = name
# If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect.
# A ClientStorage only allows one client to commit at a time.
# A client enters the commit state by finding tpc_tid set to
# None and updating it to the new transaction's id. The
# tpc_tid variable is protected by tpc_cond.
self.tpc_cond = threading.Condition()
self._transaction = None
log.problem("Failed to connect to storage")
self._cache.open()
thread.start_new_thread(self._call.connect,(0,))
# Prevent multiple new_oid calls from going out. The _oids
# variable should only be modified while holding the
# oid_cond.
self.oid_cond = threading.Condition()
# If the connect succeeds then this work will be done by
# notifyConnected
commit_lock = threading.Lock()
self._commit_lock_acquire = commit_lock.acquire
self._commit_lock_release = commit_lock.release
def notifyConnected(self, s):
log.info("Connected to storage")
self._lock_acquire()
try:
# We let the connection keep coming up now that
# we have the storage lock. This way, we know no calls
# will be made while in the process of coming up.
t = self._ts = get_timestamp()
self._serial = `t`
self._oid='\0\0\0\0\0\0\0\0'
self._call.finishConnect(s)
def close(self):
if self._tbuf is not None:
self._tbuf.close()
if self._cache is not None:
self._cache.close()
self._rpc_mgr.close()
if self.closed:
return
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB."""
log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
self._db = db
self._connected=1
self._oids=[]
def is_connected(self):
if self._server is disconnected_stub:
return 0
else:
return 1
# we do synchronous commits until we are sure that
# we have and are ready for a main loop.
def notifyConnected(self, c):
log2(INFO, "Connected to storage via %s" % repr(c))
# Hm. This is a little silly. If self._async, then
# we will really never do a synchronous commit.
# See below.
self.__begin='tpc_begin_sync'
self._call.message_output(str(self._storage))
# check the protocol version here?
### This seems silly. We should get the info asynchronously.
# self._info.update(self._call('get_info'))
stub = ServerStub.StorageServer(c)
cached=self._cache.open()
### This is a little expensive for large caches
if cached:
self._call.sendMessage('beginZeoVerify')
for oid, (s, vs) in cached:
self._call.sendMessage('zeoVerify', oid, s, vs)
self._call.sendMessage('endZeoVerify')
self._oids = []
finally: self._lock_release()
# XXX Why is this synchronous? If it were async, verification
# would start faster.
stub.register(str(self._storage), self._is_read_only)
self._info.update(stub.get_info())
self.verify_cache(stub)
if self._async:
import asyncore
self.becomeAsync(asyncore.socket_map)
# Don't make the server available to clients until after
# validating the cache
self._server = stub
def verify_cache(self, server):
server.beginZeoVerify()
self._cache.verify(server.zeoVerify)
server.endZeoVerify()
### Is there a race condition between notifyConnected and
### notifyDisconnected? In particular, what if we get
@@ -201,370 +182,334 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
### notifyDisconnected had to get the instance lock. There's
### nothing to gain by getting the instance lock.
### Note that we *don't* have to worry about getting connected
### in the middle of notifyDisconnected, because *it's*
### responsible for starting the thread that makes the connection.
def notifyDisconnected(self):
log2(PROBLEM, "Disconnected from storage")
self._server = disconnected_stub
def notifyDisconnected(self, ignored):
log.problem("Disconnected from storage")
self._connected=0
self._transaction=None
thread.start_new_thread(self._call.connect,(0,))
if self._transaction is not None:
try:
self._commit_lock_release()
except:
pass
def __len__(self):
return self._info['length']
def becomeAsync(self, map):
self._lock_acquire()
try:
self._async=1
if self._connected:
self._call.setLoop(map, getWakeup())
self.__begin='tpc_begin'
finally: self._lock_release()
def getName(self):
return "%s (%s)" % (self.__name__, "XXX")
def __len__(self): return self._info['length']
def getSize(self):
return self._info['size']
def supportsUndo(self):
return self._info['supportsUndo']
def close(self):
self._lock_acquire()
try:
log.info("close")
self._call.closeIntensionally()
try:
self._tfile.close()
except os.error:
# On Windows, this can fail if it is called more than
# once, because it tries to delete the file each
# time.
pass
self._cache.close()
if self.invalidator is not None:
self.invalidator.close()
self.invalidator = None
self.closed = 1
finally: self._lock_release()
def supportsVersions(self):
    return self._info['supportsVersions']

def supportsTransactionalUndo(self):
    try:
        return self._info['supportsTransactionalUndo']
    except KeyError:
        return 0

def isReadOnly(self):
    return self._is_read_only

def _check_trans(self, trans, exc=None):
    if self._transaction is not trans:
        if exc is None:
            return 0
        else:
            raise exc(self._transaction, trans)
    return 1
def getName(self):
return "%s (%s)" % (
self.__name__,
self._connected and 'connected' or 'disconnected')
def _check_tid(self, tid, exc=None):
if self.tpc_tid != tid:
if exc is None:
return 0
else:
raise exc(self.tpc_tid, tid)
return 1
def abortVersion(self, src, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction,
POSException.StorageTransactionError)
oids = self._server.abortVersion(src, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, src)
return oids
def commitVersion(self, src, dest, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction,
POSException.StorageTransactionError)
oids = self._server.commitVersion(src, dest, self._serial)
if dest:
# just invalidate our version data
for oid in oids:
self._tbuf.invalidate(oid, src)
else:
# dest is '', so invalidate version and non-version
for oid in oids:
self._tbuf.invalidate(oid, dest)
return oids
def history(self, oid, version, length=1):
    return self._server.history(oid, version, length)

def loadSerial(self, oid, serial):
    return self._server.loadSerial(oid, serial)

def load(self, oid, version, _stuff=None):
    p = self._cache.load(oid, version)
    if p:
        return p
    if self._server is None:
        raise ClientDisconnected()
    p, s, v, pv, sv = self._server.zeoLoad(oid)
    self._cache.checkSize(0)
    self._cache.store(oid, p, s, v, pv, sv)
    if v and version and v == version:
        return pv, sv
    else:
        if s:
            return p, s
        raise KeyError, oid # no non-version data for this
def modifiedInVersion(self, oid):
    v = self._cache.modifiedInVersion(oid)
    if v is not None:
        return v
    return self._server.modifiedInVersion(oid)
def new_oid(self, last=None):
    if self._is_read_only:
        raise POSException.ReadOnlyError()
    # avoid multiple oid requests to server at the same time
    self.oid_cond.acquire()
    if not self._oids:
        self._oids = self._server.new_oids()
        self._oids.reverse()
        self.oid_cond.notifyAll()
    oid = self._oids.pop()
    self.oid_cond.release()
    return oid
def pack(self, t=None, rf=None, wait=0, days=0):
    if self._is_read_only:
        raise POSException.ReadOnlyError()
    # rf argument ignored; server will provide its own implementation
    if t is None:
        t = time.time()
    t = t - (days * 86400)
    return self._server.pack(t, wait)
def _check_serials(self):
if self._serials:
l = len(self._serials)
r = self._serials[:l]
del self._serials[:l]
for oid, s in r:
if isinstance(s, Exception):
raise s
self._seriald[oid] = s
return r
def store(self, oid, serial, data, version, transaction):
    if self._is_read_only:
        raise POSException.ReadOnlyError()
    self._check_trans(transaction, POSException.StorageTransactionError)
    self._server.storea(oid, serial, data, version, self._serial)
    self._tbuf.store(oid, version, data)
    return self._check_serials()
def tpc_vote(self, transaction):
    if transaction is not self._transaction:
        return
    self._server.vote(self._serial)
    return self._check_serials()
def tpc_abort(self, transaction):
    if transaction is not self._transaction:
        return
    try:
        self._server.tpc_abort(self._serial)
        self._tbuf.clear()
        self._seriald.clear()
        del self._serials[:]
    finally:
        self._transaction = None
        self.tpc_cond.notify()
        self.tpc_cond.release()
def tpc_begin(self, transaction, tid=None, status=' '):
self.tpc_cond.acquire()
while self._transaction is not None:
if self._transaction == transaction:
# Our tpc_cond lock is re-entrant. It is allowable for a
# client to call two tpc_begins in a row with the same
# transaction, and the second of these must be ignored. Our
# locking is safe because the acquire() above gives us a
# second lock on tpc_cond, and the following release() brings
# us back to owning just the one tpc_cond lock (acquired
# during the first of two consecutive tpc_begins).
self.tpc_cond.release()
return
self.tpc_cond.wait()
if self._server is None:
    self.tpc_cond.release()
    self._transaction = None
    raise ClientDisconnected()
if tid is None:
self._ts = get_timestamp(self._ts)
id = `self._ts`
else:
self._ts = TimeStamp(tid)
id = tid
self._transaction = transaction
try:
    r = self._server.tpc_begin(id,
                               transaction.user,
                               transaction.description,
                               transaction._extension,
                               tid, status)
except:
    # Client may have disconnected during the tpc_begin().
    # Then notifyDisconnected() will have released the lock.
    if self._server is not disconnected_stub:
        self._transaction = None
        self.tpc_cond.release()
    raise
self._serial = id
self._seriald.clear()
del self._serials[:]
def tpc_finish(self, transaction, f=None):
    if transaction is not self._transaction:
        return
    try:
        if f is not None:
            f()
        self._server.tpc_finish(self._serial)
        r = self._check_serials()
        assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
        self._update_cache()
    finally:
        self._transaction = None
        self.tpc_cond.notify()
        self.tpc_cond.release()

def _update_cache(self):
    # Iterate over the objects in the transaction buffer and
    # update or invalidate the cache.
    self._cache.checkSize(self._tbuf.get_size())
    try:
self._tbuf.begin_iterate()
except ValueError, msg:
raise ClientStorageError, (
"Unexpected error reading temporary file in "
"client storage: %s" % msg)
while 1:
try:
t = self._tbuf.next()
except ValueError, msg:
raise ClientStorageError, (
"Unexpected error reading temporary file in "
"client storage: %s" % msg)
if t is None:
break
oid, v, p = t
if p is None: # an invalidation
s = None
else:
s = self._seriald[oid]
if s == ResolvedSerial or s is None:
self._cache.invalidate(oid, v)
else:
self._cache.update(oid, s, v, p)
self._tbuf.clear()
def transactionalUndo(self, trans_id, trans):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(trans, POSException.StorageTransactionError)
oids = self._server.transactionalUndo(trans_id, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, '')
return oids
def undo(self, transaction_id):
if self._is_read_only:
raise POSException.ReadOnlyError()
# XXX what are the sync issues here?
oids = self._server.undo(transaction_id)
for oid in oids:
self._cache.invalidate(oid, '')
return oids
def undoInfo(self, first=0, last=-20, specification=None):
    return self._server.undoInfo(first, last, specification)

def undoLog(self, first, last, filter=None):
    if filter is not None:
        return () # can't pass a filter to server
    return self._server.undoLog(first, last) # Eek!
def versionEmpty(self, version):
    return self._server.versionEmpty(version)

def versions(self, max=None):
    return self._server.versions(max)
# below are methods invoked by the StorageServer
def serialnos(self, args):
self._serials.extend(args)
def info(self, dict):
self._info.update(dict)
def begin(self):
self._tfile = tempfile.TemporaryFile(suffix=".inv")
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
def invalidate(self, args):
# Queue an invalidate for the end of the transaction
if self._pickler is None:
return
self._pickler.dump(args)
def end(self):
if self._pickler is None:
return
self._pickler.dump((0,0))
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
f = self._tfile
self._tfile = None
while 1:
oid, version = unpick.load()
if not oid:
break
self._cache.invalidate(oid, version=version)
self._db.invalidate(oid, version=version)
f.close()
def Invalidate(self, args):
for oid, version in args:
self._cache.invalidate(oid, version=version)
try:
self._db.invalidate(oid, version=version)
except AttributeError, msg:
log2(PROBLEM,
"Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
repr(version),
msg))
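The begin()/invalidate()/end() methods above queue invalidations in a temp-file pickle stream terminated by a (0, 0) sentinel, then drain the stream when verification ends. A minimal modern-Python sketch of that queueing pattern (standalone illustration, not the shipped Python 2 code):

```python
import pickle
import tempfile

class InvalidationQueue(object):
    """Buffer (oid, version) pairs during cache verification and
    drain them once verification ends (sentinel-terminated)."""

    def __init__(self):
        self._file = tempfile.TemporaryFile(suffix=".inv")
        self._pickler = pickle.Pickler(self._file, 1)
        self._pickler.fast = 1  # don't use the memo

    def invalidate(self, args):
        # queue one (oid, version) pair until verification ends
        self._pickler.dump(args)

    def end(self):
        self._pickler.dump((0, 0))  # sentinel: a false oid ends the stream
        self._file.seek(0)
        unpick = pickle.Unpickler(self._file)
        out = []
        while 1:
            oid, version = unpick.load()
            if not oid:
                break
            out.append((oid, version))
        return out

q = InvalidationQueue()
q.invalidate((b'oid1', ''))
q.invalidate((b'oid2', 'v'))
flushed = q.end()
```

The sentinel is unambiguous because a real oid is a non-empty 8-byte string, so a false oid can only be the terminator.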
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Stub for interface exported by ClientStorage"""
class ClientStorage:
def __init__(self, rpc):
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('begin')
# XXX must rename the two invalidate messages. I can never
# remember which is which
def invalidate(self, args):
self.rpc.callAsync('invalidate', args)
def Invalidate(self, args):
self.rpc.callAsync('Invalidate', args)
def endVerify(self):
self.rpc.callAsync('end')
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
def info(self, arg):
self.rpc.callAsync('info', arg)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Log a transaction's commit info during two-phase commit.
A storage server allows multiple clients to commit transactions, but
must serialize them as they actually execute at the server. The
concurrent commits are achieved by logging actions up until the
tpc_vote(). At that point, the entire transaction is committed on the
real storage.
"""
import cPickle
import tempfile
class CommitLog:
def __init__(self):
self.file = tempfile.TemporaryFile(suffix=".log")
self.pickler = cPickle.Pickler(self.file, 1)
self.pickler.fast = 1
self.stores = 0
self.read = 0
def tpc_begin(self, t, tid, status):
self.t = t
self.tid = tid
self.status = status
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
self.stores += 1
def get_loader(self):
self.read = 1
self.file.seek(0)
return self.stores, cPickle.Unpickler(self.file)
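CommitLog buffers store() records in a temp file so that many clients can run two-phase commit concurrently up to the vote, at which point the records are replayed against the real storage. A modern-Python sketch of the same buffer-and-replay round trip (names mirror the class above; the shipped code is Python 2):

```python
import pickle
import tempfile

class BufferedCommitLog(object):
    """Append store() records to a temp file, replay them at vote time."""

    def __init__(self):
        self.file = tempfile.TemporaryFile(suffix=".log")
        self.pickler = pickle.Pickler(self.file, 1)
        self.pickler.fast = 1  # records are independent; skip the memo
        self.stores = 0

    def store(self, oid, serial, data, version):
        # nothing touches the real storage yet; just log the record
        self.pickler.dump((oid, serial, data, version))
        self.stores += 1

    def get_loader(self):
        # rewind and hand back a reader for every buffered record
        self.file.seek(0)
        return self.stores, pickle.Unpickler(self.file)

log = BufferedCommitLog()
log.store(b'oid1', b's1', b'data1', '')
log.store(b'oid2', b's2', b'data2', 'a-version')
n, loader = log.get_loader()
records = [loader.load() for _ in range(n)]
```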
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Exceptions for ZEO."""
class Disconnected(Exception):
"""Exception raised when a ZEO client is disconnected from the
ZEO server."""
try:
from Interface import Base
except ImportError:
class Base:
# a dummy interface for use when Zope's is unavailable
pass
class ICache(Base):
"""ZEO client cache.
__init__(storage, size, client, var)
All arguments optional.
storage -- name of storage
size -- max size of cache in bytes
client -- a string; if specified, cache is persistent.
var -- var directory to store cache files in
"""
def open():
"""Returns a sequence of object info tuples.
An object info tuple is a pair containing an object id and a
pair of serialnos, a non-version serialno and a version serialno:
oid, (serial, ver_serial)
This method builds an index of the cache and returns a
sequence used for cache validation.
"""
def close():
"""Closes the cache."""
def verify(func):
"""Call func on every object in cache.
func is called with three arguments
func(oid, serial, ver_serial)
"""
def invalidate(oid, version):
"""Remove object from cache."""
def load(oid, version):
"""Load object from cache.
Return None if object not in cache.
Return data, serialno if object is in cache.
"""
def store(oid, p, s, version, pv, sv):
"""Store a new object in the cache."""
def update(oid, serial, version, data):
"""Update an object already in the cache.
XXX This method is called to update objects that were modified by
a transaction. It's likely that it is already in the cache,
and it may be possible for the implementation to operate more
efficiently.
"""
def modifiedInVersion(oid):
"""Return the version an object is modified in.
'' signifies the trunk.
Returns None if the object is not in the cache.
"""
def checkSize(size):
"""Check if adding size bytes would exceed cache limit.
This method is often called just before store or update. The
size is a hint about the amount of data that is about to be
stored. The cache may want to evict some data to make space.
"""
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Stub for interface exposed by StorageServer"""
class StorageServer:
def __init__(self, rpc):
self.rpc = rpc
def register(self, storage_name, read_only):
self.rpc.call('register', storage_name, read_only)
def get_info(self):
return self.rpc.call('get_info')
def get_size_info(self):
return self.rpc.call('get_size_info')
def beginZeoVerify(self):
self.rpc.callAsync('beginZeoVerify')
def zeoVerify(self, oid, s, sv):
self.rpc.callAsync('zeoVerify', oid, s, sv)
def endZeoVerify(self):
self.rpc.callAsync('endZeoVerify')
def new_oids(self, n=None):
if n is None:
return self.rpc.call('new_oids')
else:
return self.rpc.call('new_oids', n)
def pack(self, t, wait=None):
if wait is None:
self.rpc.call('pack', t)
else:
self.rpc.call('pack', t, wait)
def zeoLoad(self, oid):
return self.rpc.call('zeoLoad', oid)
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
def tpc_begin(self, id, user, descr, ext, tid, status):
return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
def vote(self, trans_id):
return self.rpc.call('vote', trans_id)
def tpc_finish(self, id):
return self.rpc.call('tpc_finish', id)
def tpc_abort(self, id):
self.rpc.callAsync('tpc_abort', id)
def abortVersion(self, src, id):
return self.rpc.call('abortVersion', src, id)
def commitVersion(self, src, dest, id):
return self.rpc.call('commitVersion', src, dest, id)
def history(self, oid, version, length=None):
    if length is None:
        return self.rpc.call('history', oid, version)
    else:
        return self.rpc.call('history', oid, version, length)
def load(self, oid, version):
return self.rpc.call('load', oid, version)
def loadSerial(self, oid, serial):
return self.rpc.call('loadSerial', oid, serial)
def modifiedInVersion(self, oid):
return self.rpc.call('modifiedInVersion', oid)
def new_oid(self, last=None):
if last is None:
return self.rpc.call('new_oid')
else:
return self.rpc.call('new_oid', last)
def store(self, oid, serial, data, version, trans):
return self.rpc.call('store', oid, serial, data, version, trans)
def transactionalUndo(self, trans_id, trans):
return self.rpc.call('transactionalUndo', trans_id, trans)
def undo(self, trans_id):
return self.rpc.call('undo', trans_id)
def undoLog(self, first, last):
# XXX filter not allowed across RPC
return self.rpc.call('undoLog', first, last)
def undoInfo(self, first, last, spec):
return self.rpc.call('undoInfo', first, last, spec)
def versionEmpty(self, vers):
return self.rpc.call('versionEmpty', vers)
def versions(self, max=None):
if max is None:
return self.rpc.call('versions')
else:
return self.rpc.call('versions', max)
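Each stub method above is mechanical: it forwards to rpc.call for a synchronous result or rpc.callAsync for fire-and-forget. A small sketch with a hypothetical fake rpc object (not the real zrpc connection) shows the pattern:

```python
class FakeRPC(object):
    """Hypothetical stand-in for a zrpc connection; records calls."""

    def __init__(self):
        self.calls = []

    def call(self, name, *args):
        # synchronous: block and return the server's answer
        self.calls.append(('sync', name, args))
        return ('result-of', name)

    def callAsync(self, name, *args):
        # asynchronous: queue the message and return immediately
        self.calls.append(('async', name, args))

class StorageServerStub(object):
    def __init__(self, rpc):
        self.rpc = rpc

    def get_info(self):
        return self.rpc.call('get_info')

    def tpc_abort(self, id):
        self.rpc.callAsync('tpc_abort', id)

rpc = FakeRPC()
stub = StorageServerStub(rpc)
info = stub.get_info()
stub.tpc_abort('tid-1')
```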
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Network ZODB storage server
__version__ = "$Revision: 1.36 $"[11:-2]
This server acts as a front-end for one or more real storages, like
file storage or Berkeley storage.
import asyncore, socket, string, sys, os
XXX Need some basic access control-- a declaration of the methods
exported for invocation by the server.
"""
import asyncore
import cPickle
import os
import sys
import threading

from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.connection import ManagedServerConnection, Delay

import zLOG
from ZODB.POSException import StorageError, StorageTransactionError, \
     TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
from ZODB.utils import U64
from ZODB.Transaction import Transaction
from ZODB.TmpStore import TmpStore
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
# to create them a tad faster.
pickler = cPickle.Pickler()
pickler.fast = 1 # Don't use the memo
dump = pickler.dump

def log(message, level=zLOG.INFO, label="ZEO Server:%s" % os.getpid(),
        error=None):
    zLOG.LOG(label, level, message, error=error)

# a version of log that includes the storage name
def slog(storage, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
    name = getattr(storage, '__name__', None)
    if name is None:
        name = str(storage)
    zLOG.LOG("ZEO Server:%s:%s" % (pid, name), level, msg, error=error)

class StorageServerError(StorageError):
    pass
class StorageServer:

    def __init__(self, addr, storages, read_only=0):
        # XXX should read_only be a per-storage option? not yet...
        self.addr = addr
        self.storages = storages
        for s in storages.values():
            s._waiting = []
def register_connection(self, connection, storage_id):
storage=self.__storages.get(storage_id, None)
if storage is None:
log.error("Unknown storage_id: %s" % storage_id)
connection.close()
return None, None
connections=self.__get_connections(storage_id, None)
if connections is None:
self.__connections[storage_id]=connections=[]
connections.append(connection)
return storage, storage_id
def unregister_connection(self, connection, storage_id):
connections=self.__get_connections(storage_id, None)
if connections:
n=[]
for c in connections:
if c is not connection:
n.append(c)
self.__connections[storage_id]=n
def invalidate(self, connection, storage_id, invalidated=(), info=0,
dump=dump):
for c in self.__connections[storage_id]:
if invalidated and c is not connection:
c.message_output('I'+dump(invalidated, 1))
if info:
c.message_output('S'+dump(info, 1))
def writable(self): return 0
def handle_read(self): pass
def readable(self): return 1
def handle_connect (self): pass
def handle_accept(self):
try:
r = self.accept()
if r is None:
return
sock, addr = r
except socket.error, err:
log.warning("accept() failed: %s" % err)
else:
ZEOConnection(self, sock, addr)
def status(self):
"""Log status information about connections and storages"""
lines = []
for storage_id, connections in self.__connections.items():
s = "Storage %s has %d connections" % (storage_id,
len(connections))
lines.append(s)
for c in connections:
lines.append("%s readable=%s writeable=%s" % (
c, c.readable(), c.writable()))
lines.append("\t" + c.stats())
log.info(string.join(lines, "\n"))
return _noreturn
storage_methods={}
for n in (
'get_info', 'abortVersion', 'commitVersion',
'history', 'load', 'loadSerial',
'modifiedInVersion', 'new_oid', 'new_oids', 'pack', 'store',
'storea', 'tpc_abort', 'tpc_begin', 'tpc_begin_sync',
'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions',
'transactionalUndo',
'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
'status'
):
storage_methods[n]=1
storage_method=storage_methods.has_key
def find_global(module, name,
global_dict=globals(), silly=('__doc__',)):
try: m=__import__(module, global_dict, global_dict, silly)
except:
raise StorageServerError, (
"Couldn\'t import global module %s" % module)
try:
r=getattr(m, name)
except:
raise StorageServerError, (
"Couldn\'t find global %s in module %s" % (name, module))
safe=getattr(r, '__no_side_effects__', 0)
if safe: return r
raise StorageServerError, 'Unsafe global, %s.%s' % (module, name)
_noreturn=[]
class ZEOConnection(SizedMessageAsyncConnection):
_transaction=None
__storage=__storage_id=None
def __init__(self, server, sock, addr):
self.__server=server
self.status = server.status
self.__invalidated=[]
self.__closed=None
if __debug__:
debug = log
else:
debug = None
if __debug__:
# store some detailed statistics about method calls
self._last_method = None
self._t_begin = None
self._t_end = None
self._ncalls = 0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
self.logaddr = repr(addr) # form of addr suitable for logging
log.info('Connect %s %s' % (id(self), self.logaddr))
        self.read_only = read_only
        self.connections = {}
        self.dispatcher = Dispatcher(addr, factory=self.newConnection,
                                     reuse_addr=1)

    def newConnection(self, sock, addr):
        c = ManagedServerConnection(sock, addr, ZEOStorage(self), self)
        log("new connection %s: %s" % (addr, `c`))
        return c

    def register(self, storage_id, proxy):
        """Register a connection's use with a particular storage.

        This information is needed to handle invalidation.
        """
        l = self.connections.get(storage_id)
        if l is None:
            l = self.connections[storage_id] = []
        l.append(proxy)

    def invalidate(self, conn, storage_id, invalidated=(), info=0):
        for p in self.connections.get(storage_id, ()):
            if invalidated and p is not conn:
                p.client.Invalidate(invalidated)
            else:
                p.client.info(info)
def close_server(self):
# Close the dispatcher so that there are no new connections.
self.dispatcher.close()
for storage in self.storages.values():
storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
for s in asyncore.socket_map.values():
try:
s.close()
except:
pass
def close(self, conn):
removed = 0
for sid, cl in self.connections.items():
if conn.obj in cl:
cl.remove(conn.obj)
removed = 1
class ZEOStorage:
def __init__(self, server):
self.server = server
self.client = None
self.__storage = None
self.__storage_id = "uninitialized"
self._transaction = None
def notifyConnected(self, conn):
self.client = ClientStub.ClientStorage(conn)
def __repr__(self):
    tid = self._transaction and repr(self._transaction.id)
    if self.__storage:
        stid = self.__storage._transaction and \
               repr(self.__storage._transaction.id)
    else:
        stid = None
    name = self.__class__.__name__
    return "<%s %X trans=%s s_trans=%s>" % (name, id(self), tid, stid)
def _log(self, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
zLOG.LOG("ZEO Server:%s:%s" % (pid, self.__storage_id),
level, msg, error=error)
def setup_delegation(self):
"""Delegate several methods to the storage"""
self.undoInfo = self.__storage.undoInfo
self.undoLog = self.__storage.undoLog
self.versionEmpty = self.__storage.versionEmpty
self.versions = self.__storage.versions
self.history = self.__storage.history
self.load = self.__storage.load
self.loadSerial = self.__storage.loadSerial
def _check_tid(self, tid, exc=None):
caller = sys._getframe().f_back.f_code.co_name
if self._transaction is None:
self._log("no current transaction: %s()" % caller,
zLOG.PROBLEM)
if exc is not None:
raise exc(None, tid)
else:
tmp = `message`
log.trace("message_input %s" % tmp)
if self.__storage is None:
if __debug__:
log.blather("register connection to %s from %s" % (message,
self.logaddr))
# This is the first communication from the client
self.__storage, self.__storage_id = (
self.__server.register_connection(self, message))
# Send info back asynchronously, so client need not ask
self.message_output('S'+dump(self.get_info(), 1))
return
try:
# Unpickle carefully.
unpickler=Unpickler(StringIO(message))
unpickler.find_global=find_global
args=unpickler.load()
name, args = args[0], args[1:]
if __debug__:
self._last_method = name
self._ncalls = self._ncalls + 1
log.debug("call %s%s from %s" % (name, format_msg(args),
self.logaddr))
if not storage_method(name):
log.warning("Invalid method name: %s" % name)
if __debug__:
self._t_end = time.time()
raise 'Invalid Method Name', name
if hasattr(self, name):
r=apply(getattr(self, name), args)
return 0
if self._transaction.id != tid:
self._log("%s(%s) invalid; current transaction = %s" % \
(caller, repr(tid), repr(self._transaction.id)),
zLOG.PROBLEM)
if exc is not None:
raise exc(self._transaction.id, tid)
else:
r=apply(getattr(self.__storage, name), args)
if r is _noreturn:
if __debug__:
log.debug("no return to %s" % self.logaddr)
self._t_end = time.time()
return
except (UndoError, VersionCommitError), err:
if __debug__:
log.debug("return error %s to %s" % (err, self.logaddr))
self._t_end = time.time()
# These are normal usage errors. No need to log them.
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
except:
if __debug__:
self._t_end = time.time()
log.error("error", error=sys.exc_info())
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
return 0
return 1
if __debug__:
log.debug("return %s to %s" % (format_msg(r), self.logaddr))
self._t_end = time.time()
r=dump(r,1)
self.message_output('R'+r)
def register(self, storage_id, read_only):
"""Select the storage that this client will use
def return_error(self, err_type, err_value, type=type, dump=dump):
if type(err_value) is not type(self):
err_value = err_type, err_value
This method must be the first one called by the client.
"""
self._log("register(%s, %s)" % (storage_id, read_only))
storage = self.server.storages.get(storage_id)
if storage is None:
self._log("unknown storage_id: %s" % storage_id)
raise ValueError, "unknown storage: %s" % storage_id
if __debug__:
log.trace("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1)
except:
# Ugh, must be an unpicklable exception
r=StorageServerError("Couldn't pickle error %s" % `err_value`)
dump('',1) # clear pickler
r=dump(r,1)
if not read_only and (self.server.read_only or storage.isReadOnly()):
raise ReadOnlyError()
self.message_output('E'+r)
self.__storage_id = storage_id
self.__storage = storage
self.setup_delegation()
self.server.register(storage_id, self)
self._log("registered storage %s: %s" % (storage_id, storage))
def get_info(self):
storage=self.__storage
info = {
'length': len(storage),
'size': storage.getSize(),
'name': storage.getName(),
}
for feature in ('supportsUndo',
'supportsVersions',
'supportsTransactionalUndo',):
if hasattr(storage, feature):
info[feature] = getattr(storage, feature)()
else:
info[feature] = 0
return info
return {'length': len(self.__storage),
'size': self.__storage.getSize(),
'name': self.__storage.getName(),
'supportsUndo': self.__storage.supportsUndo(),
'supportsVersions': self.__storage.supportsVersions(),
'supportsTransactionalUndo':
self.__storage.supportsTransactionalUndo(),
}
def get_size_info(self):
storage=self.__storage
return {
'length': len(storage),
'size': storage.getSize(),
}
return {'length': len(self.__storage),
'size': self.__storage.getSize(),
}
def zeoLoad(self, oid):
if __debug__:
log.blather("zeoLoad(%s) %s" % (U64(oid), self.logaddr))
storage=self.__storage
v=storage.modifiedInVersion(oid)
if v: pv, sv = storage.load(oid, v)
else: pv=sv=None
v = self.__storage.modifiedInVersion(oid)
if v:
pv, sv = self.__storage.load(oid, v)
else:
pv = sv = None
try:
p, s = storage.load(oid,'')
p, s = self.__storage.load(oid, '')
except KeyError:
if sv:
# Created in version, no non-version data
p=s=None
p = s = None
else:
raise
return p, s, v, pv, sv
def beginZeoVerify(self):
if __debug__:
log.blather("beginZeoVerify() %s" % self.logaddr)
self.message_output('bN.')
return _noreturn
def zeoVerify(self, oid, s, sv,
dump=dump):
try: p, os, v, pv, osv = self.zeoLoad(oid)
except: return _noreturn
p=pv=None # free the pickles
self.client.beginVerify()
def zeoVerify(self, oid, s, sv):
try:
p, os, v, pv, osv = self.zeoLoad(oid)
except: # except what?
return None
if os != s:
self.message_output('i'+dump((oid, ''),1))
self.client.invalidate((oid, ''))
elif osv != sv:
self.message_output('i'+dump((oid, v),1))
return _noreturn
self.client.invalidate((oid, v))
def endZeoVerify(self):
if __debug__:
log.blather("endZeoVerify() %s" % self.logaddr)
self.message_output('eN.')
return _noreturn
self.client.endVerify()
def new_oids(self, n=100):
new_oid=self.__storage.new_oid
if n < 0: n=1
r=range(n)
for i in r: r[i]=new_oid()
return r
def modifiedInVersion(self, oid):
return self.__storage.modifiedInVersion(oid)
def pack(self, t, wait=0):
start_new_thread(self._pack, (t,wait))
if wait: return _noreturn
t = threading.Thread(target=self._pack, args=(t, wait))
t.start()
def _pack(self, t, wait=0):
try:
log.blather('pack begin')
self.__storage.pack(t, referencesf)
log.blather('pack end')
except:
log.error(
'Pack failed for %s' % self.__storage_id,
error=sys.exc_info())
self._log('Pack failed for %s' % self.__storage_id,
zLOG.ERROR,
error=sys.exc_info())
if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
self.__server._pack_trigger.pull_trigger()
raise
else:
if wait:
self.message_output('RN.')
self.__server._pack_trigger.pull_trigger()
else:
# XXX Why don't we broadcast on wait?
if not wait:
# Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (),
self.get_size_info())
self.server.invalidate(0, self.__storage_id, (),
self.get_size_info())
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if n < 0:
n = 1
return [self.__storage.new_oid() for i in range(n)]
def undo(self, transaction_id):
oids = self.__storage.undo(transaction_id)
if oids:
self.server.invalidate(self, self.__storage_id,
map(lambda oid: (oid, None, ''), oids))
return oids
return ()
def tpc_begin(self, id, user, description, ext, tid, status):
if self._transaction is not None:
if self._transaction.id == id:
self._log("duplicate tpc_begin(%s)" % repr(id))
return
else:
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
if self.__storage._transaction is None:
self.strategy = ImmediateCommitStrategy(self.__storage,
self.client)
else:
self.strategy = DelayedCommitStrategy(self.__storage,
self.wait)
t = Transaction()
t.id = id
t.user = user
t.description = description
t._extension = ext
self.strategy.tpc_begin(t, tid, status)
self._transaction = t
def tpc_finish(self, id):
if not self._check_tid(id):
return
invalidated = self.strategy.tpc_finish()
if invalidated:
self.server.invalidate(self, self.__storage_id,
invalidated, self.get_size_info())
if not self._handle_waiting():
self._transaction = None
self.strategy = None
def tpc_abort(self, id):
if not self._check_tid(id):
return
self.strategy.tpc_abort()
if not self._handle_waiting():
self._transaction = None
self.strategy = None
# XXX handle new serialnos
def storea(self, oid, serial, data, version, id):
self._check_tid(id, exc=StorageTransactionError)
self.strategy.store(oid, serial, data, version)
def vote(self, id):
self._check_tid(id, exc=StorageTransactionError)
return self.strategy.tpc_vote()
def abortVersion(self, src, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
oids=self.__storage.abortVersion(src, t)
a=self.__invalidated.append
for oid in oids: a((oid,src))
return oids
self._check_tid(id, exc=StorageTransactionError)
return self.strategy.abortVersion(src)
def commitVersion(self, src, dest, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
oids=self.__storage.commitVersion(src, dest, t)
a=self.__invalidated.append
for oid in oids:
a((oid,dest))
if dest: a((oid,src))
return oids
self._check_tid(id, exc=StorageTransactionError)
return self.strategy.commitVersion(src, dest)
def transactionalUndo(self, trans_id, id):
self._check_tid(id, exc=StorageTransactionError)
return self.strategy.transactionalUndo(trans_id)
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
# finishes a transaction and finds another instance in the
# _waiting list.
# XXX It might be better to have a mechanism to explicitly send
# the finishing transaction's reply before restarting the waiting
# transaction. If the restart takes a long time, the previous
# client will be blocked until it finishes.
def wait(self):
if self.__storage._transaction:
d = Delay()
self.__storage._waiting.append((d, self))
self._log("Transaction block waiting for storage. "
"%d clients waiting." % len(self.__storage._waiting))
return d
else:
self.restart()
def _handle_waiting(self):
while self.__storage._waiting:
delay, zeo_storage = self.__storage._waiting.pop(0)
if self._restart(zeo_storage, delay):
break
def storea(self, oid, serial, data, version, id,
dump=dump):
if __debug__:
log.blather("storea(%s, [%d], %s) %s" % (U64(oid), len(data),
U64(id), self.logaddr))
def _restart(self, zeo_storage, delay):
# call the restart() method on the appropriate server
try:
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, serial, data, version, t)
except TransactionError, v:
# This is a normal transaction error, such as a conflict
# error or a version lock error. It doesn't need to be
# logged.
newserial=v
zeo_storage.restart(delay)
except:
# all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling.
# IOW, anything that ends up here is evil enough to be logged.
log.error('store error', error=sys.exc_info())
newserial=sys.exc_info()[1]
self._log("Unexpected error handling waiting transaction",
level=zLOG.WARNING, error=sys.exc_info())
zeo_storage.close()
return 0
else:
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append((oid, version))
return 1
def restart(self, delay=None):
old_strategy = self.strategy
self.strategy = ImmediateCommitStrategy(self.__storage,
self.client)
resp = old_strategy.restart(self.strategy)
if delay is not None:
delay.reply(resp)
try: r=dump((oid,newserial), 1)
except:
# We got a pickling error, must be because the
# newserial is an unpicklable exception.
r=StorageServerError("Couldn't pickle exception %s" % `newserial`)
dump('',1) # clear pickler
r=dump((oid, r),1)
# A ZEOStorage instance can use different strategies to commit a
# transaction. The current implementation uses different strategies
# depending on whether the underlying storage is available. These
# strategies implement the distributed commit lock.
self.message_output('s'+r)
return _noreturn
# If the underlying storage is available, start the commit immediately
# using the ImmediateCommitStrategy. If the underlying storage is not
# available because another client is committing a transaction, delay
# the commit as long as possible. At some point it will no longer be
# possible to delay; either the transaction will reach the vote stage
# or a synchronous method like transactionalUndo() will be called.
# When it is no longer possible to delay, the client must block until
# the storage is ready. Then we switch back to the immediate strategy.
def vote(self, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
return self.__storage.tpc_vote(t)
class ICommitStrategy:
"""A class that describes the commit strategy interface.
def transactionalUndo(self, trans_id, id):
if __debug__:
log.blather("transactionalUndo(%s, %s) %s" % (trans_id,
U64(id), self.logaddr))
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
return self.__storage.transactionalUndo(trans_id, self._transaction)
def undo(self, transaction_id):
if __debug__:
log.blather("undo(%s) %s" % (transaction_id, self.logaddr))
oids=self.__storage.undo(transaction_id)
if oids:
self.__server.invalidate(
self, self.__storage_id, map(lambda oid: (oid,None), oids)
)
return oids
return ()
The commit strategy interface does not require the transaction
argument, except for tpc_begin(). The storage interface requires
the client to pass a transaction object/id to each transactional
method. The strategy does not; it requires that the caller only
call methods for a single transaction.
"""
# This isn't a proper Zope interface, because I don't want to
# introduce a dependency between ZODB and Zope interfaces.
# distributed commit lock support methods
# Only one client at a time can commit a transaction on a
# storage. If one client is committing a transaction, and a
# second client sends a tpc_begin(), then the second client is queued.
# When the first transaction finishes, either by abort or commit,
# the request from the queued client must be handled.
# It is important that this code be robust. If a queued
# transaction is not restarted, the server will stop processing
# new transactions.
# This lock is implemented by storing the queued requests in a
# list on the storage object. The list contains:
# a callable object to resume request
# arguments to that object
# a callable object to handle errors during resume
# XXX I am not sure that the commitlock_resume() method is
# sufficiently paranoid.
def commitlock_suspend(self, resume, args, onerror):
self.__storage._waiting.append((resume, args, onerror))
log.blather("suspend %s. %d queued clients" % (resume.im_self,
len(self.__storage._waiting)))
def commitlock_resume(self):
waiting = self.__storage._waiting
while waiting:
resume, args, onerror = waiting.pop(0)
log.blather("resuming queued client %s, %d still queued" % (
resume.im_self, len(waiting)))
try:
if apply(resume, args):
break
except Disconnected:
# A disconnected error isn't an unexpected error.
# There should be no need to log it, because the
# disconnect will have generated its own log event.
onerror()
except:
log.error(
"Unexpected error handling queued tpc_begin()",
error=sys.exc_info())
onerror()
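The queued-request list described in the comments above can be illustrated with a standalone toy (hypothetical names, assuming Python 3; not the actual ZEO API):

```python
# Standalone toy (hypothetical names) of the queued-request list described
# in the comments above: suspended clients are (resume, args, onerror)
# triples, retried in FIFO order when the lock holder finishes.
waiting = []

def commitlock_suspend(resume, args, onerror):
    waiting.append((resume, args, onerror))

def commitlock_resume():
    # Pop queued clients until one successfully restarts and takes the lock.
    while waiting:
        resume, args, onerror = waiting.pop(0)
        try:
            if resume(*args):
                return True  # this client now holds the commit lock
        except Exception:
            onerror()        # e.g. the queued client disconnected
    return False

log = []
commitlock_suspend(lambda: log.append("first") or True, (), lambda: None)
commitlock_suspend(lambda: log.append("second") or True, (), lambda: None)
acquired = commitlock_resume()  # only the first queued client runs
```

Note that a resume callback returning a true value stops the scan, which is what keeps at most one client committing at a time.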
def tpc_begin(self, trans, tid, status): pass
def tpc_abort(self, id):
if __debug__:
try:
log.blather("tpc_abort(%s) %s" % (U64(id), self.logaddr))
except:
print repr(id)
raise
t = self._transaction
if t is None or id != t.id:
return
r = self.__storage.tpc_abort(t)
def store(self, oid, serial, data, version): pass
self._transaction = None
self.__invalidated = []
self.commitlock_resume()
def unlock(self):
if self.__closed:
return
self.message_output('UN.')
def tpc_begin(self, id, user, description, ext):
if __debug__:
log.blather("tpc_begin(%s, %s, %s) %s" % (U64(id), `user`,
`description`,
self.logaddr))
t = self._transaction
if t is not None:
if id == t.id:
return
else:
raise StorageServerError(
"Multiple simultaneous tpc_begin requests from the same "
"client."
)
storage = self.__storage
if storage._transaction is not None:
self.commitlock_suspend(self.unlock, (), self.close)
return 1 # Return a flag indicating a lock condition.
assert id != 't'
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
t._extension=ext
storage.tpc_begin(t)
self.__invalidated=[]
def tpc_begin_sync(self, id, user, description, ext):
if self.__closed: return
t=self._transaction
if t is not None and id == t.id: return
storage=self.__storage
if storage._transaction is None:
self.try_again_sync(id, user, description, ext)
def abortVersion(self, src): pass
def commitVersion(self, src, dest): pass
# the trans_id arg to transactionalUndo is not the current txn's id
def transactionalUndo(self, trans_id): pass
def tpc_vote(self): pass
def tpc_abort(self): pass
def tpc_finish(self): pass
class ImmediateCommitStrategy:
"""The storage is available so do a normal commit."""
def __init__(self, storage, client):
self.storage = storage
self.client = client
self.invalidated = []
self.serials = []
def tpc_begin(self, txn, tid, status):
self.txn = txn
self.storage.tpc_begin(txn, tid, status)
def tpc_vote(self):
# send all the serialnos as a batch
self.client.serialnos(self.serials)
return self.storage.tpc_vote(self.txn)
def tpc_finish(self):
self.storage.tpc_finish(self.txn)
return self.invalidated
def tpc_abort(self):
self.storage.tpc_abort(self.txn)
def store(self, oid, serial, data, version):
try:
newserial = self.storage.store(oid, serial, data, version,
self.txn)
except TransactionError, err:
# Storage errors are passed to the client
newserial = err
except:
# Unexpected storage errors are logged and passed to the client
exc_info = sys.exc_info()
slog(self.storage, "store error: %s, %s" % exc_info[:2],
zLOG.ERROR, error=exc_info)
newserial = exc_info[1]
del exc_info
else:
self.commitlock_suspend(self.try_again_sync,
(id, user, description, ext),
self.close)
return _noreturn
def try_again_sync(self, id, user, description, ext):
storage=self.__storage
if storage._transaction is None:
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
storage.tpc_begin(t)
self.__invalidated=[]
self.message_output('RN.')
return 1
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
def tpc_finish(self, id, user, description, ext):
if __debug__:
log.blather("tpc_finish(%s) %s" % (U64(id), self.logaddr))
t = self._transaction
if id != t.id:
return
try:
nil = dump(newserial, 1)
except:
msg = "Couldn't pickle storage exception: %s" % repr(newserial)
slog(self.storage, msg, zLOG.ERROR)
dump('', 1) # clear pickler
r = StorageServerError(msg)
newserial = r
self.serials.append((oid, newserial))
def commitVersion(self, src, dest):
oids = self.storage.commitVersion(src, dest, self.txn)
inv = [(oid, dest) for oid in oids]
self.invalidated.extend(inv)
if dest:
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return oids
storage = self.__storage
r = storage.tpc_finish(t)
def abortVersion(self, src):
oids = self.storage.abortVersion(src, self.txn)
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return oids
self._transaction = None
if self.__invalidated:
self.__server.invalidate(self, self.__storage_id,
self.__invalidated,
self.get_size_info())
self.__invalidated = []
self.commitlock_resume()
def init_storage(storage):
if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None
if __name__=='__main__':
import ZODB.FileStorage
name, port = sys.argv[1:3]
log.trace(format_msg(name, port))
try:
port='', int(port)
except:
pass
d = {'1': ZODB.FileStorage.FileStorage(name)}
StorageServer(port, d)
asyncwrap.loop()
def transactionalUndo(self, trans_id):
oids = self.storage.transactionalUndo(trans_id, self.txn)
inv = [(oid, None) for oid in oids]
self.invalidated.extend(inv)
return oids
class DelayedCommitStrategy:
"""The storage is unavailable, so log to a file."""
def __init__(self, storage, block):
# the block argument is called when we can't delay any longer
self.storage = storage
self.block = block
self.log = CommitLog()
self.invalidated = []
# Store information about the call that blocks
self.name = None
self.args = None
def tpc_begin(self, txn, tid, status):
self.txn = txn
self.tid = tid
self.status = status
def store(self, oid, serial, data, version):
self.log.store(oid, serial, data, version)
def tpc_abort(self):
pass # just forget about this strategy
def tpc_finish(self):
raise RuntimeError, "Logic error. This method must not be called."
def tpc_vote(self):
self.name = "tpc_vote"
self.args = ()
return self.block()
def commitVersion(self, src, dest):
self.name = "commitVersion"
self.args = src, dest
return self.block()
def abortVersion(self, src):
self.name = "abortVersion"
self.args = src,
return self.block()
def transactionalUndo(self, trans_id):
self.name = "transactionalUndo"
self.args = trans_id,
return self.block()
def restart(self, new_strategy):
# called by the storage when the storage is available
new_strategy.tpc_begin(self.txn, self.tid, self.status)
loads, loader = self.log.get_loader()
for i in range(loads):
oid, serial, data, version = loader.load()
new_strategy.store(oid, serial, data, version)
meth = getattr(new_strategy, self.name)
return meth(*self.args)
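The strategy handoff performed by restart() above can be sketched with minimal standalone classes (hypothetical Immediate/Delayed names, assuming Python 3; the real strategies take a storage and buffer full store records):

```python
# Minimal sketch of the delayed-commit idea: a Delayed strategy logs calls
# while the storage is busy, then restart() replays them against an
# Immediate strategy once the commit lock is released.
class Immediate:
    def __init__(self):
        self.stored = []

    def store(self, oid, data):
        self.stored.append((oid, data))

    def tpc_vote(self):
        return "voted"

class Delayed:
    def __init__(self):
        self.log = []     # buffered store() calls
        self.name = None  # the blocked call, replayed on restart
        self.args = None

    def store(self, oid, data):
        self.log.append((oid, data))

    def tpc_vote(self):
        self.name, self.args = "tpc_vote", ()
        return None       # blocked until the commit lock is free

    def restart(self, new_strategy):
        for oid, data in self.log:
            new_strategy.store(oid, data)
        return getattr(new_strategy, self.name)(*self.args)

imm = Immediate()
d = Delayed()
d.store(1, "a")
d.tpc_vote()
result = d.restart(imm)  # replays the buffered store, then votes
```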
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A TransactionBuffer stores transaction updates until commit or abort.
A transaction may generate enough data that it is not practical to
always hold pending updates in memory. Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""
# A faster implementation might store trans data in memory until it
# reaches a certain size.
import tempfile
import cPickle
class TransactionBuffer:
def __init__(self):
self.file = tempfile.TemporaryFile(suffix=".tbuf")
self.count = 0
self.size = 0
# It's safe to use a fast pickler because the only objects
# stored are builtin types -- strings or None.
self.pickler = cPickle.Pickler(self.file, 1)
self.pickler.fast = 1
def close(self):
try:
self.file.close()
except OSError:
pass
def store(self, oid, version, data):
"""Store oid, version, data for later retrieval"""
self.pickler.dump((oid, version, data))
self.count += 1
# Estimate per-record cache size
self.size = self.size + len(data) + (27 + 12)
if version:
self.size = self.size + len(version) + 4
def invalidate(self, oid, version):
self.pickler.dump((oid, version, None))
self.count += 1
def clear(self):
"""Mark the buffer as empty"""
self.file.seek(0)
self.count = 0
self.size = 0
# unchecked constraints:
# 1. can't call store() after begin_iterate()
# 2. must call clear() after iteration finishes
def begin_iterate(self):
"""Move the file pointer in advance of iteration"""
self.file.flush()
self.file.seek(0)
self.unpickler = cPickle.Unpickler(self.file)
def next(self):
"""Return next tuple of data or None if EOF"""
if self.count == 0:
del self.unpickler
return None
oid_ver_data = self.unpickler.load()
self.count -= 1
return oid_ver_data
def get_size(self):
"""Return size of data stored in buffer (just a hint)."""
return self.size
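The spool-to-tempfile idea behind TransactionBuffer can be sketched in a few lines (a simplified stand-in, assuming Python 3 pickle instead of cPickle; not the class above):

```python
import pickle
import tempfile

# Simplified stand-in for the TransactionBuffer idea: spool
# (oid, version, data) records to an anonymous temp file, then
# replay them in insertion order at commit time.
class TinyBuffer:
    def __init__(self):
        self.file = tempfile.TemporaryFile(suffix=".tbuf")
        self.count = 0

    def store(self, oid, version, data):
        pickle.dump((oid, version, data), self.file)
        self.count += 1

    def replay(self):
        self.file.flush()
        self.file.seek(0)
        for _ in range(self.count):
            yield pickle.load(self.file)

buf = TinyBuffer()
buf.store(b"oid1", "", b"state-1")
buf.store(b"oid2", "", b"state-2")
records = list(buf.replay())  # records come back in insertion order
```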
......@@ -11,5 +11,3 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import fap
......@@ -14,11 +14,14 @@
"""Sized message async connections
"""
__version__ = "$Revision: 1.16 $"[11:-2]
__version__ = "$Revision: 1.17 $"[11:-2]
import asyncore, struct
from Exceptions import Disconnected
from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
from types import StringType
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno
from logger import zLogger
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
......@@ -38,81 +41,103 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
__append=None # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
socket=None # to outwit Sam's getattr
READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map)
self.addr=addr
if debug is None and __debug__:
self._debug = zLogger("smac")
else:
self.addr = addr
if debug is not None:
self._debug = debug
self.__state=None
self.__inp=None
self.__inpl=0
self.__l=4
self.__output=output=[]
self.__append=output.append
self.__pop=output.pop
def handle_read(self,
join=string.join, StringType=type(''), _type=type,
_None=None):
elif not hasattr(self, '_debug'):
self._debug = __debug__ and 'smac'
self.__state = None
self.__inp = None # None, a single String, or a list
self.__input_len = 0
self.__msg_size = 4
self.__output = []
self.__closed = None
self.__super_init(sock, map)
# XXX avoid expensive getattr calls? Can't remember exactly what
# this comment was supposed to mean, but it has something to do
# with the way asyncore uses getattr and uses if sock:
def __nonzero__(self):
return 1
def handle_read(self):
# Use a single __inp buffer and integer indexes to make this
# fast.
try:
d=self.recv(8096)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d: return
inp=self.__inp
if inp is _None:
inp=d
elif _type(inp) is StringType:
inp=[inp,d]
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into a single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inpl=self.__inpl+len(d)
l=self.__l
while 1:
if l <= inpl:
# Woo hoo, we have enough data
if _type(inp) is not StringType: inp=join(inp,'')
d=inp[:l]
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if state is None:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
break # not enough data
self.__l=l
self.__inp=inp
self.__inpl=inpl
msg_size = 4
state = None
self.message_input(msg)
def readable(self): return 1
def writable(self): return not not self.__output
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
output=self.__output
output = self.__output
while output:
v=output[0]
v = output[0]
try:
n=self.send(v)
except socket.error, err:
......@@ -120,37 +145,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
break # we couldn't write anything
raise
if n < len(v):
output[0]=v[n:]
output[0] = v[n:]
break # we can't write any more
else:
del output[0]
#break # waaa
def handle_close(self):
self.close()
def message_output(self, message,
pack=struct.pack, len=len):
if self._debug is not None:
if len(message) > 40:
m = message[:40]+' ...'
else:
m = message
self._debug.trace('message_output %s' % `m`)
append=self.__append
if append is None:
raise Disconnected("This action is temporarily unavailable.<p>")
append(pack(">i",len(message))+message)
def message_output(self, message):
if __debug__:
if self._debug:
if len(message) > 40:
m = message[:40]+' ...'
else:
m = message
LOG(self._debug, TRACE, 'message_output %s' % `m`)
if self.__closed is not None:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
self.__output.append(message)
def close(self):
if self.__append is not None:
self.__append=None
SizedMessageAsyncConnection.inheritedAttribute('close')(self)
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
if self.__closed is None:
self.__closed = 1
self.__super_close()
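The framing protocol implemented by SizedMessageAsyncConnection (a 4-byte big-endian length header followed by the message body) can be sketched as standalone helpers (hypothetical frame/unframe names, assuming Python 3):

```python
import struct

# Standalone sketch of the sized-message framing: every message is
# prefixed with a 4-byte big-endian length header, and a reader must
# buffer until a full frame arrives.
def frame(message):
    return struct.pack(">i", len(message)) + message

def unframe(data):
    # Extract complete messages; leftover partial data is simply ignored
    # here (the real connection keeps it for the next handle_read call).
    msgs = []
    offset = 0
    while offset + 4 <= len(data):
        size = struct.unpack(">i", data[offset:offset + 4])[0]
        if offset + 4 + size > len(data):
            break  # incomplete frame, wait for more input
        msgs.append(data[offset + 4:offset + 4 + size])
        offset += 4 + size
    return msgs

wire = frame(b"hello") + frame(b"world")
```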
......@@ -11,21 +11,23 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Start the server storage.
"""
__version__ = "$Revision: 1.32 $"[11:-2]
__version__ = "$Revision: 1.33 $"[11:-2]
import sys, os, getopt, string
import StorageServer
import asyncore
def directory(p, n=1):
d=p
while n:
d=os.path.split(d)[0]
if not d or d=='.': d=os.getcwd()
n=n-1
return d
def get_storage(m, n, cache={}):
......@@ -44,9 +46,11 @@ def get_storage(m, n, cache={}):
def main(argv):
me=argv[0]
sys.path[:]=filter(None, sys.path)
sys.path.insert(0, directory(me, 2))
# XXX hack for profiling support
global unix, storages, zeo_pid, asyncore
args=[]
last=''
for a in argv[1:]:
......@@ -77,23 +81,22 @@ def main(argv):
fs = os.path.join(var, 'Data.fs')
usage = """%s [options] [filename]
usage="""%s [options] [filename]
where options are:
-D -- Run in debug mode
-d -- Generate detailed debug logging without running
in the foreground.
-d -- Set STUPID_LOG_SEVERITY to -300
-U -- Unix-domain socket file to listen on
-u username or uid number
The username to run the ZEO server as. You may want to run
the ZEO server as 'nobody' or some other user with limited
resources. This only works under Unix, and only if the storage
server is started by root.
resources. This only works under Unix, and only if ZServer is
started by root.
-p port -- port to listen on
......@@ -116,30 +119,47 @@ def main(argv):
attr_name -- This is the name to which the storage object
is assigned in the module.
-P file -- Run under profile and dump output to file. Implies the
-s flag.
If no file name is specified, then %s is used.
""" % (me, fs)
try:
opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
except getopt.error, err:
print err
opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:d')
except getopt.error, msg:
print usage
print msg
sys.exit(1)
port=None
debug=detailed=0
host=''
unix=None
Z=1
UID='nobody'
port = None
debug = 0
host = ''
unix =None
Z = 1
UID = 'nobody'
prof = None
detailed = 0
for o, v in opts:
if o=='-p': port=string.atoi(v)
elif o=='-h': host=v
elif o=='-U': unix=v
elif o=='-u': UID=v
elif o=='-D': debug=1
elif o=='-d': detailed=1
elif o=='-s': Z=0
if o=='-p':
port = int(v)
elif o=='-h':
host = v
elif o=='-U':
unix = v
elif o=='-u':
UID = v
elif o=='-D':
debug = 1
elif o=='-d':
detailed = 1
elif o=='-s':
Z = 0
elif o=='-P':
prof = v
if prof:
Z = 0
if port is None and unix is None:
print usage
......@@ -153,14 +173,16 @@ def main(argv):
sys.exit(1)
fs=args[0]
if debug: os.environ['Z_DEBUG_MODE']='1'
if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
__builtins__.__debug__=debug
if debug:
os.environ['Z_DEBUG_MODE'] = '1'
if detailed:
os.environ['STUPID_LOG_SEVERITY'] = '-300'
from zLOG import LOG, INFO, ERROR
# Try to set uid to "-u" -provided uid.
# Try to set gid to "-u" user's primary group.
# Try to set gid to "-u" user's primary group.
# This will only work if this script is run by root.
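The elided code below implements this uid/gid switch; as a standalone sketch (helper names here are illustrative, not from this script):

```python
import os
import pwd

def resolve_user(user):
    # Accept either a numeric uid string or a username, mirroring
    # the -u option handling described above.
    if user.isdigit():
        entry = pwd.getpwuid(int(user))
    else:
        entry = pwd.getpwnam(user)
    return entry.pw_uid, entry.pw_gid

def drop_privileges(user):
    uid, gid = resolve_user(user)
    # Order matters: setgid() must come first, because after
    # setuid() the process no longer has permission to change gid.
    os.setgid(gid)
    os.setuid(uid)
```

As the comments above note, drop_privileges() succeeds only when the process starts as root.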
try:
import pwd
......@@ -175,7 +197,7 @@ def main(argv):
uid = pwd.getpwuid(UID)[2]
gid = pwd.getpwuid(UID)[3]
else:
raise KeyError
raise KeyError
try:
if gid is not None:
try:
......@@ -200,7 +222,7 @@ def main(argv):
try:
import ZEO.StorageServer, asyncore
storages={}
for o, v in opts:
if o=='-S':
......@@ -243,15 +265,15 @@ def main(argv):
if not unix: unix=host, port
ZEO.StorageServer.StorageServer(unix, storages)
StorageServer.StorageServer(unix, storages)
try:
ppid, pid = os.getppid(), os.getpid()
except:
pass # getpid not supported
else:
open(zeo_pid,'w').write("%s %s" % (ppid, pid))
except:
# Log startup exception and tell zdaemon not to restart us.
info = sys.exc_info()
......@@ -269,7 +291,6 @@ def main(argv):
asyncore.loop()
def rotate_logs():
import zLOG
if hasattr(zLOG.log_write, 'reinitialize'):
......@@ -292,29 +313,21 @@ def shutdown(storages, die=1):
# unnecessary, since we now use so_reuseaddr.
for ignored in 1,2:
for socket in asyncore.socket_map.values():
try:
socket.close()
except:
pass
try: socket.close()
except: pass
for storage in storages.values():
try:
storage.close()
except:
pass
try: storage.close()
except: pass
try:
from zLOG import LOG, INFO
LOG('ZEO Server', INFO,
"Shutting down (%s)" % (die and "shutdown" or "restart")
)
except:
pass
if die:
sys.exit(0)
else:
sys.exit(1)
except: pass
if die: sys.exit(0)
else: sys.exit(1)
if __name__ == '__main__':
main(sys.argv)
if __name__=='__main__': main(sys.argv)
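The shutdown() comment above notes that the double close loop is unnecessary now that SO_REUSEADDR is in use. A minimal sketch of what that socket option buys a restarting server (names here are illustrative):

```python
import socket

def make_listener(port):
    # With SO_REUSEADDR set, a restarted server can rebind its
    # port immediately, even if the previous process's socket is
    # still lingering in TIME_WAIT.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("localhost", port))
    s.listen(1)
    return s
```

Passing port 0 asks the kernel for any free port, which is handy in tests.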
......@@ -46,7 +46,7 @@ class TransUndoStorageWithCache:
# Make sure this doesn't load invalid data into the cache
self._storage.load(oid, '')
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
......
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the distributed commit lock."""
import threading
from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected
ZERO = '\0'*8
class DummyDB:
def invalidate(self, *args):
pass
class WorkerThread(threading.Thread):
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
def __init__(self, storage, trans, method="tpc_finish"):
self.storage = storage
self.trans = trans
self.method = method
threading.Thread.__init__(self)
def run(self):
try:
self.storage.tpc_begin(self.trans)
oid = self.storage.new_oid()
self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
oid = self.storage.new_oid()
self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
self.storage.tpc_vote(self.trans)
if self.method == "tpc_finish":
self.storage.tpc_finish(self.trans)
else:
self.storage.tpc_abort(self.trans)
except Disconnected:
pass
class CommitLockTests:
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is contention for a
# single storage. There are a lot of cases to cover.
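The behavior under test can be sketched with a plain lock that serializes commits, a simplified stand-in for the server's commit lock (not ZEO's actual API):

```python
import threading

class TinyStorage:
    # Simplified stand-in for the server side of the commit lock:
    # tpc_begin() blocks until the previous transaction finishes.
    def __init__(self):
        self._commit_lock = threading.Lock()
        self.log = []

    def tpc_begin(self, txn):
        self._commit_lock.acquire()
        self.log.append(("begin", txn))

    def tpc_finish(self, txn):
        self.log.append(("finish", txn))
        self._commit_lock.release()

def run_txn(storage, name):
    storage.tpc_begin(name)
    storage.tpc_finish(name)
```

Running run_txn() from two threads always yields a log of strictly alternating begin/finish pairs: the second transaction's begin cannot appear before the first one's finish.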
# CommitLock1 checks the case where a single transaction delays
# other transactions before they actually block. IOW, by the time
# the other transactions get to the vote stage, the first
# transaction has finished.
def checkCommitLock1OnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
finally:
self._cleanup()
def checkCommitLock1OnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
finally:
self._cleanup()
def checkCommitLock2OnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
finally:
self._cleanup()
def checkCommitLock2OnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
finally:
self._cleanup()
def _cleanup(self):
for store, trans in self._storages:
store.tpc_abort(trans)
store.close()
self._storages = []
def _checkCommitLock(self, method_name, dosetup, dowork):
# check the commit lock when a client attempts a transaction,
# but fails/exits before finishing the commit.
# Start one transaction normally.
t = Transaction()
self._storage.tpc_begin(t)
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
for i in range(4):
storage2 = self._duplicate_client()
t2 = Transaction()
tid = `ZEO.ClientStorage.get_timestamp()` # XXX why?
dosetup(storage2, t2, tid)
if i == 0:
storage2.close()
else:
self._storages.append((storage2, t2))
oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
self._storage.tpc_vote(t)
if method_name == "tpc_finish":
self._storage.tpc_finish(t)
self._storage.load(oid, '')
else:
self._storage.tpc_abort(t)
dowork(method_name)
# Make sure the server is still responsive
self._dostore()
def _dosetup1(self, storage, trans, tid):
storage.tpc_begin(trans, tid)
def _dowork1(self, method_name):
for store, trans in self._storages:
oid = store.new_oid()
store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
store.tpc_vote(trans)
if method_name == "tpc_finish":
store.tpc_finish(trans)
else:
store.tpc_abort(trans)
def _dosetup2(self, storage, trans, tid):
self._threads = []
t = WorkerThread(storage, trans)
self._threads.append(t)
t.start()
def _dowork2(self, method_name):
for t in self._threads:
t.join()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
# XXX argh it's hard to find the actual address
# The rpc mgr addr attribute is a list. Each element in the
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address.
addr = self._storage._rpc_mgr.addr[0][1]
new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
new.registerDB(DummyDB(), None)
return new
def _get_timestamp(self):
t = time.time()
t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
return `t`
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Compromising positions involving threads."""
import threading
from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected
ZERO = '\0'*8
class BasicThread(threading.Thread):
def __init__(self, storage, doNextEvent, threadStartedEvent):
self.storage = storage
self.trans = Transaction()
self.doNextEvent = doNextEvent
self.threadStartedEvent = threadStartedEvent
self.gotValueError = 0
self.gotDisconnected = 0
threading.Thread.__init__(self)
class GetsThroughVoteThread(BasicThread):
# This thread gets partially through a transaction before it turns
# execution over to another thread. We're trying to establish that a
# tpc_finish() after a storage has been closed by another thread will get
# a ClientStorageError.
#
# This class does a tpc_begin(), store(), and tpc_vote(), and is waiting
# to do the tpc_finish() when the other thread closes the storage.
def run(self):
self.storage.tpc_begin(self.trans)
oid = self.storage.new_oid()
self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
self.storage.tpc_vote(self.trans)
self.threadStartedEvent.set()
self.doNextEvent.wait(10)
try:
self.storage.tpc_finish(self.trans)
except ZEO.ClientStorage.ClientStorageError:
self.gotValueError = 1
self.storage.tpc_abort(self.trans)
class GetsThroughBeginThread(BasicThread):
# This class is like the above except that it is intended to be run when
# another thread is already in a tpc_begin(). Thus, this thread will
# block in the tpc_begin until another thread closes the storage. When
# that happens, this one will get disconnected too.
def run(self):
try:
self.storage.tpc_begin(self.trans)
except ZEO.ClientStorage.ClientStorageError:
self.gotValueError = 1
class AbortsAfterBeginFailsThread(BasicThread):
# This class is identical to GetsThroughBeginThread except that it
# attempts to tpc_abort() after the tpc_begin() fails. That will raise a
# ClientDisconnected exception which implies that we don't have the lock,
# and that's what we really want to test (but it's difficult given the
# threading module's API).
def run(self):
try:
self.storage.tpc_begin(self.trans)
except ZEO.ClientStorage.ClientStorageError:
self.gotValueError = 1
try:
self.storage.tpc_abort(self.trans)
except Disconnected:
self.gotDisconnected = 1
class ThreadTests:
# Thread 1 should start a transaction, but not get all the way through it.
# Main thread should close the connection. Thread 1 should then get
# disconnected.
def checkDisconnectedOnThread2Close(self):
doNextEvent = threading.Event()
threadStartedEvent = threading.Event()
thread1 = GetsThroughVoteThread(self._storage,
doNextEvent, threadStartedEvent)
thread1.start()
threadStartedEvent.wait(10)
self._storage.close()
doNextEvent.set()
thread1.join()
self.assertEqual(thread1.gotValueError, 1)
# Thread 1 should start a transaction, but not get all the way through
# it. While thread 1 is in the middle of the transaction, a second thread
# should start a transaction, and it will block in the tpc_begin() --
# because thread 1 has acquired the lock in its tpc_begin(). Now the main
# thread closes the storage and both sub-threads should get disconnected.
def checkSecondBeginFails(self):
doNextEvent = threading.Event()
threadStartedEvent = threading.Event()
thread1 = GetsThroughVoteThread(self._storage,
doNextEvent, threadStartedEvent)
thread2 = GetsThroughBeginThread(self._storage,
doNextEvent, threadStartedEvent)
thread1.start()
threadStartedEvent.wait(1)
thread2.start()
self._storage.close()
doNextEvent.set()
thread1.join()
thread2.join()
self.assertEqual(thread1.gotValueError, 1)
self.assertEqual(thread2.gotValueError, 1)
def checkThatFailedBeginDoesNotHaveLock(self):
doNextEvent = threading.Event()
threadStartedEvent = threading.Event()
thread1 = GetsThroughVoteThread(self._storage,
doNextEvent, threadStartedEvent)
thread2 = AbortsAfterBeginFailsThread(self._storage,
doNextEvent, threadStartedEvent)
thread1.start()
threadStartedEvent.wait(1)
thread2.start()
self._storage.close()
doNextEvent.set()
thread1.join()
thread2.join()
self.assertEqual(thread1.gotValueError, 1)
self.assertEqual(thread2.gotValueError, 1)
self.assertEqual(thread2.gotDisconnected, 1)
......@@ -15,14 +15,17 @@
import asyncore
import os
import profile
import random
import socket
import sys
import traceback
import types
import ZEO.ClientStorage, ZEO.StorageServer
import ZEO.ClientStorage
# Change value of PROFILE to enable server-side profiling
PROFILE = 0
if PROFILE:
import hotshot
def get_port():
"""Return a port that is not in use.
......@@ -47,21 +50,23 @@ def get_port():
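The body of get_port() is elided by the diff above. A common way to find an unused port (an assumption, not necessarily the original logic) is to bind to port 0 and let the kernel choose:

```python
import socket

def find_free_port():
    # Bind to port 0 so the OS assigns an unused port, then release it.
    # There is a small race: the port could be taken again before the
    # caller binds to it, which is why real code typically retries.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("localhost", 0))
        return s.getsockname()[1]
    finally:
        s.close()
```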
if os.name == "nt":
def start_zeo_server(storage_name, args, port=None):
def start_zeo_server(storage_name, args, addr=None):
"""Start a ZEO server in a separate process.
Returns the ZEO port, the test server port, and the pid.
"""
import ZEO.tests.winserver
if port is None:
if addr is None:
port = get_port()
else:
port = addr[1]
script = ZEO.tests.winserver.__file__
if script.endswith('.pyc'):
script = script[:-1]
args = (sys.executable, script, str(port), storage_name) + args
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
pid = os.spawnve(os.P_NOWAIT, sys.executable, args, d)
return ('localhost', port), ('localhost', port + 1), pid
else:
......@@ -79,9 +84,11 @@ else:
buf = self.recv(4)
if buf:
assert buf == "done"
server.close_server()
asyncore.socket_map.clear()
def handle_close(self):
server.close_server()
asyncore.socket_map.clear()
class ZEOClientExit:
......@@ -90,38 +97,56 @@ else:
self.pipe = pipe
def close(self):
os.write(self.pipe, "done")
os.close(self.pipe)
try:
os.write(self.pipe, "done")
os.close(self.pipe)
except os.error:
pass
def start_zeo_server(storage, addr):
def start_zeo_server(storage_name, args, addr):
assert isinstance(args, types.TupleType)
rd, wr = os.pipe()
pid = os.fork()
if pid == 0:
if PROFILE:
p = profile.Profile()
p.runctx("run_server(storage, addr, rd, wr)", globals(),
locals())
p.dump_stats("stats.s.%d" % os.getpid())
else:
run_server(storage, addr, rd, wr)
import ZEO.zrpc.log
reload(ZEO.zrpc.log)
try:
if PROFILE:
p = hotshot.Profile("stats.s.%d" % os.getpid())
p.runctx("run_server(storage, addr, rd, wr)",
globals(), locals())
p.close()
else:
run_server(addr, rd, wr, storage_name, args)
except:
print "Exception in ZEO server process"
traceback.print_exc()
os._exit(0)
else:
os.close(rd)
return pid, ZEOClientExit(wr)
def run_server(storage, addr, rd, wr):
def load_storage(name, args):
package = __import__("ZODB." + name)
mod = getattr(package, name)
klass = getattr(mod, name)
return klass(*args)
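load_storage() relies on ZODB's convention that the module and the class share a name (e.g. ZODB.FileStorage.FileStorage). The same dynamic-loading pattern, shown with importlib and the standard library so it runs without ZODB installed:

```python
import importlib

def load_class(module_path, class_name):
    # importlib.import_module returns the leaf module directly,
    # avoiding the getattr chain needed with bare __import__,
    # which returns the top-level package.
    mod = importlib.import_module(module_path)
    return getattr(mod, class_name)
```

With ZODB available, load_class("ZODB.FileStorage", "FileStorage") would reproduce what load_storage() does, minus the instantiation.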
def run_server(addr, rd, wr, storage_name, args):
# in the child, run the storage server
global server
os.close(wr)
ZEOServerExit(rd)
serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
asyncore.loop()
os.close(rd)
import ZEO.StorageServer, ZEO.zrpc.server
storage = load_storage(storage_name, args)
server = ZEO.StorageServer.StorageServer(addr, {'1':storage})
ZEO.zrpc.server.loop()
storage.close()
if isinstance(addr, types.StringType):
os.unlink(addr)
def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
storage_id="1", cache_size=20000000):
def start_zeo(storage_name, args, cache=None, cleanup=None,
domain="AF_INET", storage_id="1", cache_size=20000000):
"""Setup ZEO client-server for storage.
Returns a ClientStorage instance and a ZEOClientExit instance.
......@@ -137,10 +162,10 @@ else:
else:
raise ValueError, "bad domain: %s" % domain
pid, exit = start_zeo_server(storage, addr)
pid, exit = start_zeo_server(storage_name, args, addr)
s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
debug=1, client=cache,
client=cache,
cache_size=cache_size,
min_disconnect_poll=0.5)
min_disconnect_poll=0.5,
wait=1)
return s, exit, pid
......@@ -69,16 +69,18 @@ def start_server(addr):
def start_client(addr, client_func=None):
pid = os.fork()
if pid == 0:
import ZEO.ClientStorage
if VERBOSE:
print "Client process started:", os.getpid()
cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
if client_func is None:
run(cli)
else:
client_func(cli)
cli.close()
os._exit(0)
try:
import ZEO.ClientStorage
if VERBOSE:
print "Client process started:", os.getpid()
cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
if client_func is None:
run(cli)
else:
client_func(cli)
cli.close()
finally:
os._exit(0)
else:
return pid
......
......@@ -41,7 +41,7 @@ Options:
-t n Number of concurrent threads to run.
"""
import asyncore
import asyncore
import sys, os, getopt, string, time
##sys.path.insert(0, os.getcwd())
......@@ -81,7 +81,7 @@ def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
for r in 1, 10, 100, 1000:
t = time.time()
conflicts = 0
jar = db.open()
while 1:
try:
......@@ -105,7 +105,7 @@ def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
else:
break
jar.close()
t = time.time() - t
if detailed:
if threadno is None:
......@@ -205,11 +205,11 @@ def mean(l):
for v in l:
tot = tot + v
return tot / len(l)
##def compress(s):
## c = zlib.compressobj()
## o = c.compress(s)
## return o + c.flush()
## return o + c.flush()
if __name__=='__main__':
main(sys.argv[1:])
......@@ -103,8 +103,13 @@ def start_child(zaddr):
pid = os.fork()
if pid != 0:
return pid
storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
try:
_start_child(zaddr)
finally:
os._exit(0)
def _start_child(zaddr):
storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5, wait=1)
db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
setup(db.open())
conns = []
......@@ -129,7 +134,5 @@ def start_child(zaddr):
c.__count += 1
work(c)
os._exit(0)
if __name__ == "__main__":
main()
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import random
import unittest
from ZEO.TransactionBuffer import TransactionBuffer
def random_string(size):
"""Return a random string of size size."""
l = [chr(random.randrange(256)) for i in range(size)]
return "".join(l)
def new_store_data():
"""Return arbitrary data to use as argument to store() method."""
return random_string(8), '', random_string(random.randrange(1000))
def new_invalidate_data():
"""Return arbitrary data to use as argument to invalidate() method."""
return random_string(8), ''
class TransBufTests(unittest.TestCase):
def checkTypicalUsage(self):
tbuf = TransactionBuffer()
tbuf.store(*new_store_data())
tbuf.invalidate(*new_invalidate_data())
tbuf.begin_iterate()
while 1:
o = tbuf.next()
if o is None:
break
tbuf.clear()
def doUpdates(self, tbuf):
data = []
for i in range(10):
d = new_store_data()
tbuf.store(*d)
data.append(d)
d = new_invalidate_data()
tbuf.invalidate(*d)
data.append(d)
tbuf.begin_iterate()
for i in range(len(data)):
x = tbuf.next()
if x[2] is None:
# the tbuf adds a dummy None to invalidates
x = x[:2]
self.assertEqual(x, data[i])
def checkOrderPreserved(self):
tbuf = TransactionBuffer()
self.doUpdates(tbuf)
def checkReusable(self):
tbuf = TransactionBuffer()
self.doUpdates(tbuf)
tbuf.clear()
self.doUpdates(tbuf)
tbuf.clear()
self.doUpdates(tbuf)
def test_suite():
return unittest.makeSuite(TransBufTests, 'check')
......@@ -16,9 +16,11 @@
import asyncore
import os
import random
import select
import socket
import sys
import tempfile
import thread
import time
import types
import unittest
......@@ -26,22 +28,20 @@ import unittest
import ZEO.ClientStorage, ZEO.StorageServer
import ThreadedAsync, ZEO.trigger
from ZODB.FileStorage import FileStorage
from ZODB.TimeStamp import TimeStamp
from ZODB.Transaction import Transaction
import thread
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import zLOG
from ZEO.tests import forker, Cache
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
from ZEO.smac import Disconnected
# Sorry Jim...
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
TransactionalUndoStorage, TransactionalUndoVersionStorage, \
PackableStorage, Synchronization, ConflictResolution
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
MTStorage, ReadOnlyStorage
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
ZERO = '\0'*8
class DummyDB:
def invalidate(self, *args):
pass
......@@ -56,93 +56,22 @@ class PackWaitWrapper:
def pack(self, t, f):
self.storage.pack(t, f, wait=1)
class ZEOTestBase(StorageTestBase.StorageTestBase):
"""Version of the storage test class that supports ZEO.
For ZEO, we don't always get the serialno/exception for a
particular store as the return value from the store. But we
will get it no later than the return value from vote.
"""
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0, user=None, description=None):
"""Do a complete storage transaction.
The defaults are:
- oid=None, ask the storage for a new oid
- revid=None, use a revid of ZERO
- data=None, pickle up some arbitrary data (the integer 7)
- version=None, use the empty string version
Returns the object's new revision id.
"""
if oid is None:
oid = self._storage.new_oid()
if revid is None:
revid = ZERO
if data is None:
data = MinPO(7)
if not already_pickled:
data = StorageTestBase.zodb_pickle(data)
if version is None:
version = ''
# Begin the transaction
t = Transaction()
if user is not None:
t.user = user
if description is not None:
t.description = description
self._storage.tpc_begin(t)
# Store an object
r1 = self._storage.store(oid, revid, data, version, t)
s1 = self._get_serial(r1)
# Finish the transaction
r2 = self._storage.tpc_vote(t)
s2 = self._get_serial(r2)
self._storage.tpc_finish(t)
# s1, s2 can be None or dict
assert not (s1 and s2)
return s1 and s1[oid] or s2 and s2[oid]
def _get_serial(self, r):
"""Return oid -> serialno dict from sequence of ZEO replies."""
d = {}
if r is None:
return None
if type(r) == types.StringType:
raise RuntimeError, "unexpected ZEO response: no oid"
else:
for oid, serial in r:
if isinstance(serial, Exception):
raise serial
d[oid] = serial
return d
# Some of the ZEO tests depend on the version of FileStorage available
# for the tests. If we run these tests using Zope 2.3, FileStorage
# doesn't support TransactionalUndo.
if hasattr(FileStorage, 'supportsTransactionalUndo'):
# XXX Assume that a FileStorage that supports transactional undo
# also supports conflict resolution.
class VersionDependentTests(
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage):
pass
else:
class VersionDependentTests:
pass
class GenericTests(ZEOTestBase,
VersionDependentTests,
class GenericTests(StorageTestBase.StorageTestBase,
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
Cache.StorageWithCache,
Cache.TransUndoStorageWithCache,
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
RevisionStorage.RevisionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage,
CommitLockTests.CommitLockTests,
ThreadTests.ThreadTests,
):
"""An abstract base class for ZEO tests
......@@ -152,108 +81,76 @@ class GenericTests(ZEOTestBase,
returns a specific storage, e.g. FileStorage.
"""
__super_setUp = StorageTestBase.StorageTestBase.setUp
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
def setUp(self):
"""Start a ZEO server using a Unix domain socket
The ZEO server uses the storage object returned by the
getStorage() method.
"""
self.__super_setUp()
zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
self.running = 1
client, exit, pid = forker.start_zeo(self.getStorage())
self._pid = pid
self._server = exit
client, exit, pid = forker.start_zeo(*self.getStorage())
self._pids = [pid]
self._servers = [exit]
self._storage = PackWaitWrapper(client)
client.registerDB(DummyDB(), None)
def tearDown(self):
"""Try to cause the tests to halt"""
self.running = 0
self._storage.status()
self._storage.close()
self._server.close()
os.waitpid(self._pid, 0)
for server in self._servers:
server.close()
for pid in self._pids:
os.waitpid(pid, 0)
self.delStorage()
self.__super_tearDown()
def checkTwoArgBegin(self):
# XXX ZEO doesn't support 2-arg begin
pass
def open(self, read_only=0):
# XXX Needed to support ReadOnlyStorage tests. Ought to be a
# cleaner way.
# Is this the only way to get the address?
addr = self._storage._rpc_mgr.addr[0][1]
self._storage.close()
self._storage = ZEO.ClientStorage.ClientStorage(addr, read_only=1,
wait=1)
def checkLargeUpdate(self):
obj = MinPO("X" * (10 * 128 * 1024))
self._dostore(data=obj)
def checkCommitLockOnCommit(self):
self._checkCommitLock("tpc_finish")
def checkCommitLockOnAbort(self):
self._checkCommitLock("tpc_abort")
def _checkCommitLock(self, method_name):
# check the commit lock when a client attempts a transaction,
# but fails/exits before finishing the commit.
# Start one transaction normally.
t = Transaction()
self._storage.tpc_begin(t)
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
for i in range(3):
storage2 = self._duplicate_client()
t2 = Transaction()
tid = self._get_timestamp()
storage2._call.sendMessage('tpc_begin_sync',
tid, t2.user, t2.description,
t2._extension)
if i == 0:
storage2.close()
else:
self._storages.append((storage2, t2))
oid = self._storage.new_oid()
self._storage.store(oid, None, '', '', t)
self._storage.tpc_vote(t)
self._storage.status()
self._storage.tpc_finish(t)
for store, trans in self._storages:
store.tpc_abort(trans)
store.close()
# Make sure the server is still responsive
self._dostore()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
addr = self._storage._connection
new = ZEO.ClientStorage.ClientStorage(addr)
new.registerDB(DummyDB(), None)
return new
def checkZEOInvalidation(self):
addr = self._storage._rpc_mgr.addr[0][1]
storage2 = ZEO.ClientStorage.ClientStorage(addr, wait=1,
min_disconnect_poll=0.1)
try:
oid = self._storage.new_oid()
ob = MinPO('first')
revid1 = self._dostore(oid, data=ob)
data, serial = storage2.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO('first'))
self.assertEqual(serial, revid1)
revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
for n in range(3):
# Let the server and client talk for a moment.
# Is there a better way to do this?
asyncore.poll(0.1)
data, serial = storage2.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO('second'),
'Invalidation message was not sent!')
self.assertEqual(serial, revid2)
finally:
storage2.close()
def _get_timestamp(self):
t = time.time()
t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
return `t`
class ZEOFileStorageTests(GenericTests):
__super_setUp = GenericTests.setUp
def setUp(self):
self.__fs_base = tempfile.mktemp()
self.__super_setUp()
def getStorage(self):
return FileStorage(self.__fs_base, create=1)
self.__fs_base = tempfile.mktemp()
return 'FileStorage', (self.__fs_base, '1')
def delStorage(self):
# file storage appears to create several files
for ext in '', '.index', '.lock', '.tmp':
for ext in '', '.index', '.lock', '.tmp', '.old':
path = self.__fs_base + ext
try:
os.remove(path)
......@@ -267,17 +164,14 @@ class WindowsGenericTests(GenericTests):
can't be created in the parent process and passed to the child.
All the work has to be done in the server's process.
"""
__super_setUp = StorageTestBase.StorageTestBase.setUp
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
def setUp(self):
self.__super_setUp()
args = self.getStorageInfo()
name = args[0]
args = args[1:]
args = args[1]
zeo_addr, self.test_addr, self.test_pid = \
forker.start_zeo_server(name, args)
storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1,
storage = ZEO.ClientStorage.ClientStorage(zeo_addr, wait=1,
min_disconnect_poll=0.1)
self._storage = PackWaitWrapper(storage)
storage.registerDB(DummyDB(), None)
......@@ -288,16 +182,14 @@ class WindowsGenericTests(GenericTests):
s.connect(self.test_addr)
s.close()
# the connection should cause the storage server to die
## os.waitpid(self.test_pid, 0)
time.sleep(0.5)
self.delStorage()
self.__super_tearDown()
class WindowsZEOFileStorageTests(WindowsGenericTests):
def getStorageInfo(self):
self.__fs_base = tempfile.mktemp()
return 'FileStorage', self.__fs_base, '1'
return 'FileStorage', (self.__fs_base, '1') # create=1
def delStorage(self):
# file storage appears to create four files
......@@ -308,13 +200,13 @@ class WindowsZEOFileStorageTests(WindowsGenericTests):
except os.error:
pass
class ConnectionTests(ZEOTestBase):
class ConnectionTests(StorageTestBase.StorageTestBase):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
start and stop a ZEO storage server.
"""
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
ports = []
......@@ -322,53 +214,124 @@ class ConnectionTests(ZEOTestBase):
ports.append(random.randrange(25000, 30000))
del i
def openClientStorage(self, cache='', cache_size=200000, wait=1):
# defined by subclasses
pass
def setUp(self):
"""Start a ZEO server using a Unix domain socket
def shutdownServer(self):
# defined by subclasses
pass
The ZEO server uses the storage object returned by the
getStorage() method.
"""
self.running = 1
self.file = tempfile.mktemp()
self.addr = []
self._pids = []
self._servers = []
self._newAddr()
self._startServer()
def _newAddr(self):
self.addr.append(self._getAddr())
def _getAddr(self):
return 'localhost', self.ports.pop()
def openClientStorage(self, cache='', cache_size=200000, wait=1):
base = ZEO.ClientStorage.ClientStorage(self.addr,
client=cache,
cache_size=cache_size,
wait=wait,
min_disconnect_poll=0.1)
storage = PackWaitWrapper(base)
storage.registerDB(DummyDB(), None)
return storage
def tearDown(self):
"""Try to cause the tests to halt"""
if getattr(self, '_storage', None) is not None:
self._storage.close()
self.shutdownServer()
# file storage appears to create four files
for ext in '', '.index', '.lock', '.tmp':
path = self.file + ext
if os.path.exists(path):
os.unlink(path)
for i in range(len(self.addr)):
for ext in '', '.index', '.lock', '.tmp':
path = "%s.%s%s" % (self.file, i, ext)
if os.path.exists(path):
try:
os.unlink(path)
except os.error:
pass
for i in 0, 1:
path = "c1-test-%d.zec" % i
if os.path.exists(path):
os.unlink(path)
try:
os.unlink(path)
except os.error:
pass
self.__super_tearDown()
def checkMultipleAddresses(self):
for i in range(4):
self._newAddr()
self._storage = self.openClientStorage('test', 100000, wait=1)
oid = self._storage.new_oid()
obj = MinPO(12)
revid1 = self._dostore(oid, data=obj)
self._storage.close()
def checkMultipleServers(self):
# XXX crude test at first -- just start two servers and do a
# commit at each one.
self._newAddr()
self._storage = self.openClientStorage('test', 100000, wait=1)
self._dostore()
self.shutdownServer(index=0)
self._startServer(index=1)
# If we can still store after shutting down one of the
# servers, we must be reconnecting to the other server.
for i in range(10):
try:
self._dostore()
break
except Disconnected:
time.sleep(0.5)
def checkDisconnectionError(self):
# Make sure we get a Disconnected when we try to read an
# object when we're not connected to a storage server and the
# object is not in the cache.
self.shutdownServer()
self._storage = self.openClientStorage('test', 1000, wait=0)
self.assertRaises(Disconnected, self._storage.load, 'fredwash', '')
def checkBasicPersistence(self):
# Verify that cached data persists across client storage instances.
# To verify that the cache is being used, the test closes the
# server and then starts a new client with the server down.
# When the server is down, a load() gets the data from its cache.
self._storage = self.openClientStorage('test', 100000, wait=1)
oid = self._storage.new_oid()
obj = MinPO(12)
revid1 = self._dostore(oid, data=obj)
self._storage.close()
self.shutdownServer()
self._storage = self.openClientStorage('test', 100000, wait=0)
data, revid2 = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(12))
self.assertEqual(revid1, revid2)
self._storage.close()
def checkRollover(self):
# Check that the cache works when the files are swapped.
# In this case, only one object fits in a cache file. When the
# cache files swap, the first object is effectively uncached.
self._storage = self.openClientStorage('test', 1000, wait=1)
oid1 = self._storage.new_oid()
obj1 = MinPO("1" * 500)
revid1 = self._dostore(oid1, data=obj1)
@@ -377,123 +340,88 @@ class ConnectionTests(ZEOTestBase):
revid2 = self._dostore(oid2, data=obj2)
self._storage.close()
self.shutdownServer()
self._storage = self.openClientStorage('test', 1000, wait=0)
self._storage.load(oid2, '')
self.assertRaises(Disconnected, self._storage.load, oid1, '')
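The rollover scenario this test exercises — two cache files that swap when one fills, silently evicting the oldest objects — can be sketched in isolation. The `RolloverCache` class below is a hypothetical toy model for illustration, not ZEO's actual `ClientCache` implementation:

```python
# Toy sketch of two-file cache rollover (hypothetical, not ZEO's ClientCache).
class RolloverCache:
    def __init__(self, limit):
        self.limit = limit          # per-file byte budget
        self.current = {}           # the file being written
        self.previous = {}          # the older, read-only file
        self.size = 0

    def store(self, oid, data):
        if self.size + len(data) > self.limit:
            # Swap files: the old 'previous' file is discarded entirely.
            self.previous = self.current
            self.current, self.size = {}, 0
        self.current[oid] = data
        self.size += len(data)

    def load(self, oid):
        # Look in the current file first, then the previous one.
        return self.current.get(oid) or self.previous.get(oid)

c = RolloverCache(600)
c.store(1, "1" * 500)
c.store(2, "2" * 500)   # swap: object 1 moves to the 'previous' file
c.store(3, "3" * 500)   # swap again: object 1 is effectively uncached
print(c.load(1))        # → None
```

This mirrors why the test stores two 500-byte objects into a 1000-byte cache and then expects `load(oid1)` to fail while disconnected.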
def checkReconnection(self):
# Check that the client reconnects when a server restarts.
# XXX Seem to get occasional errors that look like this:
# File ZEO/zrpc2.py, line 217, in handle_request
# File ZEO/StorageServer.py, line 325, in storea
# File ZEO/StorageServer.py, line 209, in _check_tid
# StorageTransactionError: (None, <tid>)
# could system reconnect and continue old transaction?
from ZEO.ClientStorage import ClientDisconnected
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
obj = MinPO(12)
revid1 = self._dostore(oid, data=obj)
zLOG.LOG("checkReconnection", zLOG.INFO,
"About to shutdown server")
self.shutdownServer()
self.running = 1
zLOG.LOG("checkReconnection", zLOG.INFO,
"About to restart server")
self._startServer(create=0)
oid = self._storage.new_oid()
obj = MinPO(12)
while 1:
try:
revid1 = self._dostore(oid, data=obj)
break
except (Disconnected, select.error, thread.error, socket.error), err:
zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.",
error=sys.exc_info())
get_transaction().abort()
time.sleep(0.1) # XXX how long to sleep
# XXX This is a bloody pain. We're placing a heavy burden
# on users to catch a plethora of exceptions in order to
# write robust code. Need to think about implementing
# John Heintz's suggestion to make sure all exceptions
# inherit from POSException.
zLOG.LOG("checkReconnection", zLOG.INFO, "finished")
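The retry loop above (and the XXX comment about the "plethora of exceptions") amounts to a small reusable pattern: keep retrying a store until the client has reconnected. A minimal Python 3 sketch, assuming a single base `Disconnected` exception as John Heintz's suggestion would provide — `retry_on_disconnect` and `flaky_store` are illustrative names, not ZEO APIs:

```python
import time

class Disconnected(Exception):
    """Stand-in for a common base exception raised while the client is down."""

def retry_on_disconnect(op, attempts=5, delay=0.01):
    # Retry `op` until it succeeds or attempts run out, mirroring the
    # while-loop in checkReconnection above.
    for _ in range(attempts):
        try:
            return op()
        except Disconnected:
            time.sleep(delay)   # give the server time to come back up
    raise Disconnected("gave up after %d attempts" % attempts)

# A fake store that fails twice before the "server" is back.
calls = []
def flaky_store():
    calls.append(1)
    if len(calls) < 3:
        raise Disconnected()
    return "revid-1"

print(retry_on_disconnect(flaky_store))  # → revid-1
```

With a single base exception, callers need one `except` clause instead of a tuple of transport-level errors.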
class UnixConnectionTests(ConnectionTests):
__super_setUp = StorageTestBase.StorageTestBase.setUp
def setUp(self):
"""Start a ZEO server using a Unix domain socket
The ZEO server uses the storage object returned by the
getStorage() method.
"""
self.running = 1
self.file = tempfile.mktemp()
self.addr = '', self.ports.pop()
self._startServer()
self.__super_setUp()
def openClientStorage(self, cache='', cache_size=200000, wait=1):
base = ZEO.ClientStorage.ClientStorage(self.addr,
client=cache,
cache_size=cache_size,
wait_for_server_on_startup=wait)
storage = PackWaitWrapper(base)
storage.registerDB(DummyDB(), None)
return storage
def _startServer(self, create=1, index=0):
path = "%s.%d" % (self.file, index)
addr = self.addr[index]
pid, server = forker.start_zeo_server('FileStorage',
(path, create), addr)
self._pids.append(pid)
self._servers.append(server)
def shutdownServer(self, index=0):
if self.running:
self.running = 0
self._servers[index].close()
try:
os.waitpid(self._pids[index], 0)
except os.error:
pass
class WindowsConnectionTests(ConnectionTests):
__super_setUp = StorageTestBase.StorageTestBase.setUp
def setUp(self):
self.file = tempfile.mktemp()
self._startServer()
self.__super_setUp()
def openClientStorage(self, cache='', cache_size=200000, wait=1):
base = ZEO.ClientStorage.ClientStorage(self.addr,
client=cache,
cache_size=cache_size,
debug=1,
wait_for_server_on_startup=wait)
storage = PackWaitWrapper(base)
storage.registerDB(DummyDB(), None)
return storage
def _startServer(self, create=1, index=0):
path = "%s.%d" % (self.file, index)
addr = self.addr[index]
_addr, test_addr, test_pid = forker.start_zeo_server('FileStorage',
(path, str(create)), addr)
self._pids.append(test_pid)
self._servers.append(test_addr)
self.running = 1
def shutdownServer(self, index=0):
if self.running:
self.running = 0
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(self._servers[index])
s.close()
time.sleep(1.0)
def tearDown(self):
self.shutdownServer()
# XXX waitpid() isn't available until Python 2.3
time.sleep(0.5)
if os.name == "posix":
test_classes = ZEOFileStorageTests, UnixConnectionTests
@@ -502,36 +430,12 @@ elif os.name == "nt":
else:
raise RuntimeError, "unsupported os: %s" % os.name
def test_suite():
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
suite.addTest(sub)
return suite
if __name__ == "__main__":
unittest.main(defaultTest='test_suite')
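These tests use a `check` prefix instead of the default `test`, which is why the suite is built with `unittest.makeSuite(klass, 'check')`. In modern Python the same idea is expressed with `TestLoader.testMethodPrefix` — a sketch with a hypothetical `SampleTests` case:

```python
import unittest

class SampleTests(unittest.TestCase):
    # ZEO's tests name their methods check*, not test*.
    def checkSomething(self):
        self.assertEqual(1 + 1, 2)

loader = unittest.TestLoader()
loader.testMethodPrefix = 'check'   # collect check* methods, like makeSuite(klass, 'check')
suite = loader.loadTestsFromTestCase(SampleTests)
print(suite.countTestCases())  # → 1
```

With the default prefix the loader would find no tests in this class at all.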
@@ -11,20 +11,16 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server.
import asyncore
import errno
import os
import socket
import string
import thread
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
@@ -56,46 +52,50 @@ if os.name == 'posix':
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
r, w = self._fds = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._closed = None
# Override the asyncore close() method, because it seems that
# it would only close the r file descriptor and not w. The
# constructor calls file_dispatcher.__init__ and passes r,
# which would get stored in a file_wrapper and get closed by
# the default close. But that would leave w open...
def close(self):
if self._closed is None:
self._closed = 1
self.del_channel()
for fd in self._fds:
os.close(fd)
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
os.write (self.trigger, 'x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
@@ -104,7 +104,7 @@ if os.name == 'posix':
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
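The idea behind the pipe-based trigger — a background thread writes one byte to a pipe so that a `select()` call blocked in the main loop wakes up — can be demonstrated without asyncore. A minimal POSIX sketch (the `waiter`/`woken` names are illustrative, not part of the trigger class):

```python
import os
import select
import threading

# Minimal pipe-based wakeup: one thread blocks in select(), another
# "pulls the trigger" by writing a byte to the pipe's write end.
r, w = os.pipe()
woken = []

def waiter():
    # Blocks until the read end of the pipe becomes readable.
    ready, _, _ = select.select([r], [], [], 5.0)
    if r in ready:
        os.read(r, 8192)      # drain the trigger byte, as handle_read does
        woken.append(True)

t = threading.Thread(target=waiter)
t.start()
os.write(w, b'x')             # the pull_trigger step
t.join()
print(woken)  # → [True]
os.close(r)
os.close(w)
```

The win32 variant below uses a loopback socket pair for the same purpose, because Windows `select()` only works on sockets.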
@@ -116,13 +116,13 @@ else:
# win32-safe version
class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -139,51 +139,46 @@ else:
if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
a.listen (1)
w.setblocking (0)
try:
w.connect (self.address)
except:
pass
r, addr = a.accept()
a.close()
w.setblocking (1)
self.trigger = w
asyncore.dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
def __repr__ (self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
self.trigger.send ('x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
@@ -14,11 +14,14 @@
"""Sized message async connections
"""
__version__ = "$Revision: 1.17 $"[11:-2]
import asyncore, struct
import socket, errno
from types import StringType
from Exceptions import Disconnected
from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -38,81 +41,103 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
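The `tmp_dict` idiom above exists only to collapse duplicate errno values (on most systems `EWOULDBLOCK == EAGAIN`, so they would otherwise appear twice in the tuple). A modern Python 3 sketch of the same dedup using a set — the variable name here is illustrative:

```python
import errno

# A set collapses duplicate errno values (EWOULDBLOCK == EAGAIN on
# most platforms), so the tuple holds the minimum number of entries,
# exactly what the tmp_dict keys achieved.
expected_socket_write_errors = tuple({errno.EWOULDBLOCK,
                                      errno.EAGAIN,
                                      errno.EINTR})
print(errno.EAGAIN in expected_socket_write_errors)  # → True
```

Checking membership in this tuple is how the write path below decides whether a `socket.error` means "try again later" rather than a real failure.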
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None):
self.addr = addr
if debug is not None:
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug = __debug__ and 'smac'
self.__state = None
self.__inp = None # None, a single String, or a list
self.__input_len = 0
self.__msg_size = 4
self.__output = []
self.__closed = None
self.__super_init(sock, map)
# XXX avoid expensive getattr calls? Can't remember exactly what
# this comment was supposed to mean, but it has something to do
# with the way asyncore uses getattr and uses if sock:
def __nonzero__(self):
return 1
def handle_read(self):
# Use a single __inp buffer and integer indexes to make this
# fast.
try:
d=self.recv(8096)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if state is None:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
msg_size = 4
state = None
self.message_input(msg)
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
output = self.__output
while output:
v = output[0]
try:
n=self.send(v)
except socket.error, err:
@@ -120,37 +145,33 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
break # we couldn't write anything
raise
if n < len(v):
output[0] = v[n:]
break # we can't write any more
else:
del output[0]
#break # waaa
def handle_close(self):
self.close()
def message_output(self, message):
if __debug__:
if self._debug:
if len(message) > 40:
m = message[:40]+' ...'
else:
m = message
LOG(self._debug, TRACE, 'message_output %s' % `m`)
if self.__closed is not None:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
self.__output.append(message)
def close(self):
if self.__closed is None:
self.__closed = 1
self.__super_close()
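The wire format `SizedMessageAsyncConnection` implements is a simple length-prefixed framing: each message is preceded by a 4-byte big-endian size (`struct.pack(">i", len(message))`), and `handle_read` alternates between reading a size header and reading the payload it announces. A self-contained Python 3 sketch of the same protocol — `frame` and `split_messages` are illustrative names, not ZEO APIs:

```python
import struct

def frame(msg):
    # message_output appends exactly these two pieces: a ">i" size
    # header followed by the message bytes.
    return struct.pack(">i", len(msg)) + msg

def split_messages(buf):
    # Decode complete length-prefixed messages from buf, as handle_read
    # does; returns (messages, leftover bytes awaiting more input).
    msgs = []
    offset = 0
    while offset + 4 <= len(buf):
        size, = struct.unpack(">i", buf[offset:offset + 4])
        if offset + 4 + size > len(buf):
            break  # incomplete message: keep waiting for more input
        msgs.append(buf[offset + 4:offset + 4 + size])
        offset += 4 + size
    return msgs, buf[offset:]

# One complete frame plus a truncated second frame, as might arrive
# from a partial recv().
data = frame(b"hello") + frame(b"world")[:6]
msgs, rest = split_messages(data)
print(msgs)  # → [b'hello']
```

Feeding the leftover `rest` plus the remaining bytes through `split_messages` again yields the second message, which is the buffering behavior `__inp`/`__input_len` implement incrementally.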