Commit 48bcb3a7 authored by Jeremy Hylton's avatar Jeremy Hylton

Changes from the ZEO-ZRPC-Dev branch merge.

parent 98eb0f1e
......@@ -144,18 +144,20 @@ file 0 and file 1.
"""
__version__ = "$Revision: 1.18 $"[11:-2]
__version__ = "$Revision: 1.19 $"[11:-2]
import os, tempfile
from struct import pack, unpack
from thread import allocate_lock
import zLOG
magic='ZEC0'
import sys
import zLOG
def LOG(msg, level=zLOG.BLATHER):
def log(msg, level=zLOG.INFO):
    """Log msg under the "ZEC" (ZEO client cache) subsystem."""
    zLOG.LOG("ZEC", level, msg)
magic='ZEC0'
class ClientCache:
def __init__(self, storage='', size=20000000, client=None, var=None):
......@@ -211,16 +213,14 @@ class ClientCache:
f[0].write(magic)
current=0
log("cache opened. current = %s" % current)
self._limit=size/2
self._current=current
def close(self):
    # Close only the currently-active cache file.
    # NOTE(review): the broad (os.error, ValueError) guard presumably
    # covers an already-closed or otherwise invalid handle -- confirm.
    try:
        self._f[self._current].close()
    except (os.error, ValueError):
        pass
def open(self):
# XXX open is overloaded to perform two tasks for
# optimization reasons
self._acquire()
try:
self._index=index={}
......@@ -235,6 +235,19 @@ class ClientCache:
return serial.items()
finally: self._release()
def close(self):
    """Close every open cache file, skipping slots that were never opened."""
    for cache_file in self._f:
        if cache_file is None:
            continue
        cache_file.close()
def verify(self, verifyFunc):
    """Call verifyFunc(oid, serialno, version-serialno) for each cached object.

    The items to verify come from open(), which (re)builds the cache
    index and returns the (oid, (serial, version-serial)) pairs.
    """
    for oid, pair in self.open():
        serialno, vserial = pair
        verifyFunc(oid, serialno, vserial)
def invalidate(self, oid, version):
self._acquire()
try:
......@@ -373,8 +386,6 @@ class ClientCache:
self._f[current]=open(self._p[current],'w+b')
else:
# Temporary cache file:
if self._f[current] is not None:
self._f[current].close()
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos=pos=4
......@@ -383,55 +394,57 @@ class ClientCache:
def store(self, oid, p, s, version, pv, sv):
self._acquire()
try: self._store(oid, p, s, version, pv, sv)
finally: self._release()
try:
self._store(oid, p, s, version, pv, sv)
finally:
self._release()
def _store(self, oid, p, s, version, pv, sv):
if not s:
p=''
s='\0\0\0\0\0\0\0\0'
tlen=31+len(p)
p = ''
s = '\0\0\0\0\0\0\0\0'
tlen = 31 + len(p)
if version:
tlen=tlen+len(version)+12+len(pv)
vlen=len(version)
tlen = tlen + len(version) + 12 + len(pv)
vlen = len(version)
else:
vlen=0
vlen = 0
pos=self._pos
current=self._current
f=self._f[current]
f.seek(pos)
stlen=pack(">I",tlen)
write=f.write
write(oid+'v'+stlen+pack(">HI", vlen, len(p))+s)
if p: write(p)
stlen = pack(">I", tlen)
# accumulate various data to write into a list
l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
if p:
l.append(p)
if version:
write(version)
write(pack(">I", len(pv)))
write(pv)
write(sv)
write(stlen)
if current: self._index[oid]=-pos
else: self._index[oid]=pos
l.extend([version,
pack(">I", len(pv)),
pv, sv])
l.append(stlen)
f = self._f[self._current]
f.seek(self._pos)
f.write("".join(l))
if self._current:
self._index[oid] = - self._pos
else:
self._index[oid] = self._pos
self._pos=pos+tlen
self._pos += tlen
def read_index(index, serial, f, current):
LOG("read_index(%s)" % f.name)
seek=f.seek
read=f.read
pos=4
seek(0,2)
size=f.tell()
while 1:
seek(pos)
f.seek(pos)
h=read(27)
if len(h)==27 and h[8] in 'vni':
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
break
else: tlen=-1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
break
......@@ -466,15 +479,3 @@ def read_index(index, serial, f, current):
except: pass
return pos
def main(files):
for file in files:
print file
index = {}
serial = {}
read_index(index, serial, open(file), 0)
print index.keys()
if __name__ == "__main__":
import sys
main(sys.argv[1:])
......@@ -83,178 +83,167 @@
#
##############################################################################
"""Network ZODB storage client
"""
__version__='$Revision: 1.35 $'[11:-2]
XXX support multiple outstanding requests up until the vote?
XXX is_connected() vis ClientDisconnected error
"""
__version__='$Revision: 1.36 $'[11:-2]
import cPickle
import os
import socket
import string
import struct
import sys
import tempfile
import thread
import threading
import time
from types import TupleType, StringType
from struct import pack, unpack
import struct, time, os, socket, string, Sync, zrpc, ClientCache
import tempfile, Invalidator, ExtensionClass, thread
import ThreadedAsync
import ExtensionClass, Sync, ThreadLock
import ClientCache
import zrpc2
import ServerStub
from TransactionBuffer import TransactionBuffer
now=time.time
from struct import pack, unpack
from ZODB import POSException, BaseStorage
from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
from zLOG import LOG, PROBLEM, INFO
from zLOG import LOG, PROBLEM, INFO, BLATHER
from Exceptions import Disconnected
try: from ZODB.ConflictResolution import ResolvedSerial
except: ResolvedSerial='rs'
def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
LOG(subsys, type, msg)
TupleType=type(())
try:
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage"""
class UnrecognizedResult(ClientStorageError):
"""A server call returned an unrecognized result
"""
class ClientDisconnected(ClientStorageError):
"""The database storage is disconnected from the storage.
"""
class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
_connected=_async=0
__begin='tpc_begin_sync'
def __init__(self, connection, storage='1', cache_size=20000000,
name='', client='', debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
wait_for_server_on_startup=1):
"""A server call returned an unrecognized result"""
# Decide whether to use non-temporary files
client=client or os.environ.get('ZEO_CLIENT','')
class ClientDisconnected(ClientStorageError, Disconnected):
"""The database storage is disconnected from the storage."""
self._connection=connection
self._storage=storage
self._debug=debug
self._wait_for_server_on_startup=wait_for_server_on_startup
def get_timestamp(prev_ts):
    """Return a TimeStamp for the current time that compares later than
    prev_ts."""
    now = time.time()
    # Build the stamp from the current UTC time, then bump it past prev_ts.
    ts = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
    return ts.laterThan(prev_ts)
self._info={'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
}
self._call=zrpc.asyncRPC(connection, debug=debug,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
class DisconnectedServerStub:
"""Raise ClientDisconnected on all attribute access."""
name = name or str(connection)
def __getattr__(self, attr):
raise ClientDisconnected()
self.closed = 0
self._tfile=tempfile.TemporaryFile()
self._oids=[]
self._serials=[]
self._seriald={}
disconnected_stub = DisconnectedServerStub()
ClientStorage.inheritedAttribute('__init__')(self, name)
class ClientStorage:
self.__lock_acquire=self._lock_acquire
def __init__(self, addr, storage='1', cache_size=20000000,
name='', client='', debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
wait_for_server_on_startup=0, read_only=0):
self._cache=ClientCache.ClientCache(
storage, cache_size, client=client, var=var)
self._server = disconnected_stub
self._is_read_only = read_only
self._storage = storage
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0}
ThreadedAsync.register_loop_callback(self.becomeAsync)
self._tbuf = TransactionBuffer()
self._db = None
self._oids = []
# XXX It's confusing to have _serial, _serials, and _seriald.
self._serials = []
self._seriald = {}
# IMPORTANT: Note that we aren't fully "there" yet.
# In particular, we don't actually connect to the server
# until we have a controlling database set with registerDB
# below.
self._basic_init(name or str(addr))
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB.
"""
# Decide whether to use non-temporary files
client = client or os.environ.get('ZEO_CLIENT', '')
self._cache = ClientCache.ClientCache(storage, cache_size,
client=client, var=var)
self._cache.open() # XXX
self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
#debug=debug,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
# XXX What if we can only get a read-only connection and we
# want a read-write connection? Looks like the current code
# will block forever.
# Among other things, we know that our data methods won't get
# called until after this call.
self.invalidator = Invalidator.Invalidator(db.invalidate,
self._cache.invalidate)
def out_of_band_hook(
code, args,
get_hook={
'b': (self.invalidator.begin, 0),
'i': (self.invalidator.invalidate, 1),
'e': (self.invalidator.end, 0),
'I': (self.invalidator.Invalidate, 1),
'U': (self._commit_lock_release, 0),
's': (self._serials.append, 1),
'S': (self._info.update, 1),
}.get):
hook = get_hook(code, None)
if hook is None: return
hook, flag = hook
if flag: hook(args)
else: hook()
self._call.setOutOfBand(out_of_band_hook)
# Now that we have our callback system in place, we can
# try to connect
self._startup()
def _startup(self):
if not self._call.connect(not self._wait_for_server_on_startup):
# If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect.
LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
self._cache.open()
thread.start_new_thread(self._call.connect,(0,))
# If the connect succeeds then this work will be done by
# notifyConnected
def notifyConnected(self, s):
LOG("ClientStorage", INFO, "Connected to storage")
self._lock_acquire()
try:
# We let the connection keep coming up now that
# we have the storage lock. This way, we know no calls
# will be made while in the process of coming up.
self._call.finishConnect(s)
if wait_for_server_on_startup:
self._rpc_mgr.connect(sync=1)
else:
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
def _basic_init(self, name):
"""Handle initialization activites of BaseStorage"""
self.__name__ = name
# A ClientStorage only allows one client to commit at a time.
# A client enters the commit state by finding tpc_tid set to
# None and updating it to the new transaction's id. The
# tpc_tid variable is protected by tpc_cond.
self.tpc_cond = threading.Condition()
self._transaction = None
# Prevent multiple new_oid calls from going out. The _oids
# variable should only be modified while holding the
# oid_cond.
self.oid_cond = threading.Condition()
commit_lock = thread.allocate_lock()
self._commit_lock_acquire = commit_lock.acquire
self._commit_lock_release = commit_lock.release
t = time.time()
t = self._ts = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._serial = `t`
self._oid='\0\0\0\0\0\0\0\0'
if self.closed:
return
self._connected=1
self._oids=[]
# we do synchronous commits until we are sure that
# we have and are ready for a main loop.
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB."""
log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
self._db = db
# Hm. This is a little silly. If self._async, then
# we will really never do a synchronous commit.
# See below.
self.__begin='tpc_begin_sync'
self._call.message_output(str(self._storage))
def is_connected(self):
    """Return 1 when a live server connection exists, else 0."""
    # The disconnected stub stands in for the server while offline.
    if self._server is not disconnected_stub:
        return 1
    return 0
### This seems silly. We should get the info asynchronously.
# self._info.update(self._call('get_info'))
def notifyConnected(self, c):
log2(INFO, "Connected to storage")
stub = ServerStub.StorageServer(c)
cached=self._cache.open()
### This is a little expensive for large caches
if cached:
self._call.sendMessage('beginZeoVerify')
for oid, (s, vs) in cached:
self._call.sendMessage('zeoVerify', oid, s, vs)
self._call.sendMessage('endZeoVerify')
self._oids = []
finally: self._lock_release()
# XXX Why is this synchronous? If it were async, verification
# would start faster.
stub.register(str(self._storage), self._is_read_only)
self.verify_cache(stub)
if self._async:
import asyncore
self.becomeAsync(asyncore.socket_map)
# Don't make the server available to clients until after
# validating the cache
self._server = stub
def verify_cache(self, server):
    # Ask the server to validate every object in the client cache:
    # bracket the per-object zeoVerify calls with begin/endZeoVerify.
    server.beginZeoVerify()
    self._cache.verify(server.zeoVerify)
    server.endZeoVerify()
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
......@@ -268,363 +257,345 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
### in the middle of notifyDisconnected, because *it's*
### responsible for starting the thread that makes the connection.
def notifyDisconnected(self, ignored):
LOG("ClientStorage", PROBLEM, "Disconnected from storage")
self._connected=0
self._transaction=None
thread.start_new_thread(self._call.connect,(0,))
if self._transaction is not None:
try:
self._commit_lock_release()
except:
pass
def notifyDisconnected(self):
log2(PROBLEM, "Disconnected from storage")
self._server = disconnected_stub
if self._transaction:
self._transaction = None
self.tpc_cond.notifyAll()
self.tpc_cond.release()
def __len__(self):
return self._info['length']
def becomeAsync(self, map):
self._lock_acquire()
def getName(self):
return "%s (%s)" % (self.__name__, "XXX")
def getSize(self):
return self._info['size']
def supportsUndo(self):
return self._info['supportsUndo']
def supportsVersions(self):
return self._info['supportsVersions']
def supportsTransactionalUndo(self):
try:
self._async=1
if self._connected:
self._call.setLoop(map, getWakeup())
self.__begin='tpc_begin'
finally: self._lock_release()
return self._info['supportsTransactionalUndo']
except KeyError:
return 0
def __len__(self): return self._info['length']
def isReadOnly(self):
return self._is_read_only
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
def _check_trans(self, trans, exc=None):
    """Return 1 if trans is the active transaction.

    Otherwise return 0, or -- when an exception class is supplied --
    raise exc(current_transaction, trans) instead.
    """
    if self._transaction is trans:
        return 1
    if exc is not None:
        raise exc(self._transaction, trans)
    return 0
def _check_tid(self, tid, exc=None):
# XXX Is all this locking unnecessary? The only way to
# begin a transaction is to call tpc_begin(). If we assume
# clients are single-threaded and well-behaved, i.e. they call
# tpc_begin() first, then there appears to be no need for
# locking. If _check_tid() is called and self.tpc_tid != tid,
# then there is no way it can be come equal during the call.
# Thus, there should be no race.
if self.tpc_tid != tid:
if exc is None:
return 0
else:
raise exc(self.tpc_tid, tid)
return 1
# XXX But I'm not sure
self.tpc_cond.acquire()
try:
oids=self._call('abortVersion', src, self._serial)
vlen = pack(">H", len(src))
for oid in oids:
self._tfile.write("i%s%s%s" % (oid, vlen, src))
return oids
finally: self._lock_release()
if self.tpc_tid != tid:
if exc is None:
return 0
else:
raise exc(self.tpc_tid, tid)
return 1
finally:
self.tpc_cond.release()
def abortVersion(self, src, transaction):
    """Discard the data stored in version 'src' via the server.

    Raises ReadOnlyError on a read-only storage, and
    StorageTransactionError when 'transaction' is not the active one.
    Returns the affected oids; each is queued in the transaction
    buffer so the cached version data is invalidated at commit time.
    """
    if self._is_read_only:
        raise POSException.ReadOnlyError()
    self._check_trans(transaction,
                      POSException.StorageTransactionError)
    oids = self._server.abortVersion(src, self._serial)
    for oid in oids:
        self._tbuf.invalidate(oid, src)
    return oids
def close(self):
self._lock_acquire()
try:
LOG("ClientStorage", INFO, "close")
self._call.closeIntensionally()
try:
self._tfile.close()
except os.error:
# On Windows, this can fail if it is called more than
# once, because it tries to delete the file each
# time.
pass
self._rpc_mgr.close()
if self._cache is not None:
self._cache.close()
if self.invalidator is not None:
self.invalidator.close()
self.invalidator = None
self.closed = 1
finally: self._lock_release()
def commitVersion(self, src, dest, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
oids=self._call('commitVersion', src, dest, self._serial)
if dest:
vlen = pack(">H", len(src))
# just invalidate our version data
for oid in oids:
self._tfile.write("i%s%s%s" % (oid, vlen, src))
else:
vlen = pack(">H", len(dest))
# dest is '', so invalidate version and non-version
for oid in oids:
self._tfile.write("i%s%s%s" % (oid, vlen, dest))
return oids
finally: self._lock_release()
def getName(self):
return "%s (%s)" % (
self.__name__,
self._connected and 'connected' or 'disconnected')
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction,
POSException.StorageTransactionError)
oids = self._server.commitVersion(src, dest, self._serial)
if dest:
# just invalidate our version data
for oid in oids:
self._tbuf.invalidate(oid, src)
else:
# dest is '', so invalidate version and non-version
for oid in oids:
self._tbuf.invalidate(oid, dest)
return oids
def getSize(self): return self._info['size']
def history(self, oid, version, length=1):
self._lock_acquire()
try: return self._call('history', oid, version, length)
finally: self._lock_release()
return self._server.history(oid, version, length)
def loadSerial(self, oid, serial):
self._lock_acquire()
try: return self._call('loadSerial', oid, serial)
finally: self._lock_release()
return self._server.loadSerial(oid, serial)
def load(self, oid, version, _stuff=None):
self._lock_acquire()
try:
cache=self._cache
p = cache.load(oid, version)
if p: return p
p, s, v, pv, sv = self._call('zeoLoad', oid)
cache.checkSize(0)
cache.store(oid, p, s, v, pv, sv)
if not v or not version or version != v:
if s: return p, s
raise KeyError, oid # no non-version data for this
p = self._cache.load(oid, version)
if p:
return p
if self._server is None:
raise ClientDisconnected()
p, s, v, pv, sv = self._server.zeoLoad(oid)
self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv)
if v and version and v == version:
return pv, sv
finally: self._lock_release()
else:
if s:
return p, s
raise KeyError, oid # no non-version data for this
def modifiedInVersion(self, oid):
self._lock_acquire()
try:
v=self._cache.modifiedInVersion(oid)
if v is not None: return v
return self._call('modifiedInVersion', oid)
finally: self._lock_release()
v = self._cache.modifiedInVersion(oid)
if v is not None:
return v
return self._server.modifiedInVersion(oid)
def new_oid(self, last=None):
self._lock_acquire()
try:
oids=self._oids
if not oids:
oids[:]=self._call('new_oids')
oids.reverse()
return oids.pop()
finally: self._lock_release()
if self._is_read_only:
raise POSException.ReadOnlyError()
# We want to avoid a situation where multiple oid requests are
# made at the same time.
self.oid_cond.acquire()
if not self._oids:
self._oids = self._server.new_oids()
self._oids.reverse()
self.oid_cond.notifyAll()
oid = self._oids.pop()
self.oid_cond.release()
return oid
def pack(self, t=None, rf=None, wait=0, days=0):
if self._is_read_only:
raise POSException.ReadOnlyError()
# Note that we ignore the rf argument. The server
# will provide it's own implementation.
if t is None: t=time.time()
t=t-(days*86400)
self._lock_acquire()
try: return self._call('pack', t, wait)
finally: self._lock_release()
if t is None:
t = time.time()
t = t - (days * 86400)
return self._server.pack(t, wait)
def _check_serials(self):
    """Drain pending (oid, serial) pairs into self._seriald.

    Raises the first serial that is an Exception instance.  Returns the
    drained list, or None when nothing was pending.
    """
    if not self._serials:
        return None
    # Snapshot the current length so entries appended concurrently
    # are left for the next call.
    count = len(self._serials)
    pending = self._serials[:count]
    del self._serials[:count]
    for oid, serial in pending:
        if isinstance(serial, Exception):
            raise serial
        self._seriald[oid] = serial
    return pending
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
serial=self._call.sendMessage('storea', oid, serial,
data, version, self._serial)
write=self._tfile.write
buf = string.join(("s", oid,
pack(">HI", len(version), len(data)),
version, data), "")
write(buf)
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
d=self._seriald
for oid, s in r: d[oid]=s
return r
return serial
finally: self._lock_release()
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction, POSException.StorageTransactionError)
self._server.storea(oid, serial, data, version, self._serial)
self._tbuf.store(oid, version, data)
return self._check_serials()
def tpc_vote(self, transaction):
self._lock_acquire()
try:
if transaction is not self._transaction:
return
self._call('vote', self._serial)
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
d=self._seriald
for oid, s in r: d[oid]=s
return r
finally: self._lock_release()
if transaction is not self._transaction:
return
self._server.vote(self._serial)
return self._check_serials()
def supportsUndo(self):
return self._info['supportsUndo']
def supportsVersions(self):
return self._info['supportsVersions']
def supportsTransactionalUndo(self):
try:
return self._info['supportsTransactionalUndo']
except KeyError:
return 0
def tpc_abort(self, transaction):
self._lock_acquire()
try:
if transaction is not self._transaction: return
self._call('tpc_abort', self._serial)
self._transaction=None
self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
self._commit_lock_release()
finally: self._lock_release()
if transaction is not self._transaction:
return
self._server.tpc_abort(self._serial)
self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
self._transaction = None
self.tpc_cond.notify()
self.tpc_cond.release()
def tpc_begin(self, transaction):
self._lock_acquire()
try:
if self._transaction is transaction: return
user=transaction.user
desc=transaction.description
ext=transaction._extension
while 1:
self._lock_release()
self._commit_lock_acquire()
self._lock_acquire()
# We've got the local commit lock. Now get
# a (tentative) transaction time stamp.
t=time.time()
t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._ts=t=t.laterThan(self._ts)
id=`t`
try:
if not self._connected:
raise ClientDisconnected(
"This action is temporarily unavailable.<p>")
r=self._call(self.__begin, id, user, desc, ext)
except:
# XXX can't seem to guarantee that the lock is held here.
self._commit_lock_release()
raise
if r is None: break
# We have *BOTH* the local and distributed commit
# lock, now we can actually get ready to get started.
self._serial=id
self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
self._transaction=transaction
self.tpc_cond.acquire()
while self._transaction is not None:
if self._transaction == transaction:
self.tpc_cond.release()
return
self.tpc_cond.wait()
if self._server is None:
self.tpc_cond.release()
raise ClientDisconnected()
finally: self._lock_release()
self._ts = get_timestamp(self._ts)
id = `self._ts`
self._transaction = transaction
def tpc_finish(self, transaction, f=None):
self._lock_acquire()
try:
if transaction is not self._transaction: return
if f is not None: f()
self._call('tpc_finish', self._serial,
transaction.user,
transaction.description,
transaction._extension)
seriald=self._seriald
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
for oid, s in r: seriald[oid]=s
tfile=self._tfile
seek=tfile.seek
read=tfile.read
cache=self._cache
size=tfile.tell()
cache.checkSize(size)
seek(0)
i=0
while i < size:
opcode=read(1)
if opcode == "s":
oid=read(8)
s=seriald[oid]
h=read(6)
vlen, dlen = unpack(">HI", h)
if vlen: v=read(vlen)
else: v=''
p=read(dlen)
if len(p) != dlen:
raise ClientStorageError, (
"Unexpected end of file in client storage "
"temporary file."
)
if s==ResolvedSerial:
self._cache.invalidate(oid, v)
else:
self._cache.update(oid, s, v, p)
i=i+15+vlen+dlen
elif opcode == "i":
oid=read(8)
h=read(2)
vlen=unpack(">H", h)[0]
v=read(vlen)
self._cache.invalidate(oid, v)
i=i+11+vlen
seek(0)
self._transaction=None
self._commit_lock_release()
finally: self._lock_release()
r = self._server.tpc_begin(id,
transaction.user,
transaction.description,
transaction._extension)
except:
# If _server is None, then the client disconnected during
# the tpc_begin() and notifyDisconnected() will have
# released the lock.
if self._server is not disconnected_stub:
self.tpc_cond.release()
raise
self._serial = id
self._seriald.clear()
del self._serials[:]
def tpc_finish(self, transaction, f=None):
if transaction is not self._transaction:
return
if f is not None: # XXX what is f()?
f()
self._server.tpc_finish(self._serial)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
self._update_cache()
self._transaction = None
self.tpc_cond.notify()
self.tpc_cond.release()
def _update_cache(self):
    # Iterate over the objects in the transaction buffer and
    # update or invalidate the cache.
    self._cache.checkSize(self._tbuf.get_size())
    self._tbuf.begin_iterate()
    while 1:
        try:
            t = self._tbuf.next()
        except ValueError, msg:
            # NOTE(review): _tbuf.next() apparently signals a damaged
            # record with ValueError -- confirm against TransactionBuffer.
            raise ClientStorageError, (
                "Unexpected error reading temporary file in "
                "client storage: %s" % msg)
        if t is None:
            # End of buffer reached.
            break
        oid, v, p = t
        if p is None: # an invalidation
            s = None
        else:
            s = self._seriald[oid]
        if s == ResolvedSerial or s is None:
            # Conflict-resolved or invalidated data: drop the cached
            # copy rather than storing a possibly stale pickle.
            self._cache.invalidate(oid, v)
        else:
            self._cache.update(oid, s, v, p)
    self._tbuf.clear()
def transactionalUndo(self, trans_id, trans):
self._lock_acquire()
try:
if trans is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
oids = self._call('transactionalUndo', trans_id, self._serial)
for oid in oids:
# write invalidation records with no version
self._tfile.write("i%s\000\000" % oid)
return oids
finally: self._lock_release()
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(trans, POSException.StorageTransactionError)
oids = self._server.transactionalUndo(trans_id, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, '')
return oids
def undo(self, transaction_id):
self._lock_acquire()
try:
oids=self._call('undo', transaction_id)
cinvalidate=self._cache.invalidate
for oid in oids:
cinvalidate(oid,'')
return oids
finally: self._lock_release()
if self._is_read_only:
raise POSException.ReadOnlyError()
# XXX what are the sync issues here?
oids = self._server.undo(transaction_id)
for oid in oids:
self._cache.invalidate(oid, '')
return oids
def undoInfo(self, first=0, last=-20, specification=None):
self._lock_acquire()
try:
return self._call('undoInfo', first, last, specification)
finally: self._lock_release()
return self._server.undoInfo(first, last, specification)
def undoLog(self, first, last, filter=None):
if filter is not None: return ()
if filter is not None:
return () # XXX can't pass a filter to server
self._lock_acquire()
try: return self._call('undoLog', first, last) # Eek!
finally: self._lock_release()
return self._server.undoLog(first, last) # Eek!
def versionEmpty(self, version):
self._lock_acquire()
try: return self._call('versionEmpty', version)
finally: self._lock_release()
return self._server.versionEmpty(version)
def versions(self, max=None):
self._lock_acquire()
try: return self._call('versions', max)
finally: self._lock_release()
def sync(self): self._call.sync()
def getWakeup(_w=[]):
if _w: return _w[0]
import trigger
t=trigger.trigger().pull_trigger
_w.append(t)
return t
return self._server.versions(max)
# below are methods invoked by the StorageServer
def serialno(self, arg):
self._serials.append(arg)
def info(self, dict):
self._info.update(dict)
def begin(self):
self._tfile = tempfile.TemporaryFile()
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
def invalidate(self, args):
if self._pickler is None:
return
self._pickler.dump(args)
def end(self):
if self._pickler is None:
return
self._pickler.dump((0,0))
## self._pickler.dump = None
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
self._tfile = None
while 1:
oid, version = unpick.load()
if not oid:
break
self._cache.invalidate(oid, version=version)
self._db.invalidate(oid, version=version)
def Invalidate(self, args):
# XXX _db could be None
for oid, version in args:
self._cache.invalidate(oid, version=version)
try:
self._db.invalidate(oid, version=version)
except AttributeError, msg:
log2(PROBLEM,
"Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
repr(version),
msg))
"""Stub for interface exported by ClientStorage"""
class ClientStorage:
    """RPC stub for the ClientStorage interface invoked by the server.

    Every notification is fire-and-forget: the server does not wait
    for a reply from the client.
    """

    def __init__(self, rpc):
        self.rpc = rpc

    def _send(self, name, *args):
        # All client-side callbacks go out asynchronously.
        self.rpc.callAsync(name, *args)

    def beginVerify(self):
        self._send('begin')

    # XXX what's the difference between these two?
    def invalidate(self, args):
        self._send('invalidate', args)

    def Invalidate(self, args):
        self._send('Invalidate', args)

    def endVerify(self):
        self._send('end')

    def serialno(self, arg):
        self._send('serialno', arg)

    def info(self, arg):
        self._send('info', arg)
"""Exceptions for ZEO."""
class Disconnected(Exception):
    """Raised when a ZEO client has lost its connection to the ZEO server."""
"""Stub for interface exposed by StorageServer"""
class StorageServer:
    """RPC stub for the remote StorageServer interface.

    Each method forwards its arguments over the rpc connection.
    Calls whose result the client needs use the synchronous
    rpc.call(); fire-and-forget notifications use rpc.callAsync().
    Optional arguments are forwarded only when the caller supplied
    them, so the server-side defaults apply otherwise.
    """

    def __init__(self, rpc):
        self.rpc = rpc

    def register(self, storage_name, read_only):
        self.rpc.call('register', storage_name, read_only)

    def get_info(self):
        return self.rpc.call('get_info')

    def get_size_info(self):
        return self.rpc.call('get_size_info')

    def beginZeoVerify(self):
        self.rpc.callAsync('beginZeoVerify')

    def zeoVerify(self, oid, s, sv):
        self.rpc.callAsync('zeoVerify', oid, s, sv)

    def endZeoVerify(self):
        self.rpc.callAsync('endZeoVerify')

    def new_oids(self, n=None):
        if n is None:
            return self.rpc.call('new_oids')
        else:
            return self.rpc.call('new_oids', n)

    def pack(self, t, wait=None):
        if wait is None:
            self.rpc.call('pack', t)
        else:
            self.rpc.call('pack', t, wait)

    def zeoLoad(self, oid):
        return self.rpc.call('zeoLoad', oid)

    def storea(self, oid, serial, data, version, id):
        self.rpc.callAsync('storea', oid, serial, data, version, id)

    def tpc_begin(self, id, user, descr, ext):
        return self.rpc.call('tpc_begin', id, user, descr, ext)

    def vote(self, trans_id):
        return self.rpc.call('vote', trans_id)

    def tpc_finish(self, id):
        return self.rpc.call('tpc_finish', id)

    def tpc_abort(self, id):
        self.rpc.callAsync('tpc_abort', id)

    def abortVersion(self, src, id):
        return self.rpc.call('abortVersion', src, id)

    def commitVersion(self, src, dest, id):
        return self.rpc.call('commitVersion', src, dest, id)

    def history(self, oid, version, length=None):
        # Bug fix: the condition was inverted -- a caller-supplied
        # length was silently dropped while the default None was
        # forwarded to the server.  Now matches the new_oids/new_oid/
        # versions pattern: pass length only when it was given.
        if length is None:
            return self.rpc.call('history', oid, version)
        else:
            return self.rpc.call('history', oid, version, length)

    def load(self, oid, version):
        return self.rpc.call('load', oid, version)

    def loadSerial(self, oid, serial):
        return self.rpc.call('loadSerial', oid, serial)

    def modifiedInVersion(self, oid):
        return self.rpc.call('modifiedInVersion', oid)

    def new_oid(self, last=None):
        if last is None:
            return self.rpc.call('new_oid')
        else:
            return self.rpc.call('new_oid', last)

    def store(self, oid, serial, data, version, trans):
        return self.rpc.call('store', oid, serial, data, version, trans)

    def transactionalUndo(self, trans_id, trans):
        return self.rpc.call('transactionalUndo', trans_id, trans)

    def undo(self, trans_id):
        return self.rpc.call('undo', trans_id)

    def undoLog(self, first, last):
        # XXX filter not allowed across RPC
        return self.rpc.call('undoLog', first, last)

    def undoInfo(self, first, last, spec):
        return self.rpc.call('undoInfo', first, last, spec)

    def versionEmpty(self, vers):
        return self.rpc.call('versionEmpty', vers)

    def versions(self, max=None):
        if max is None:
            return self.rpc.call('versions')
        else:
            return self.rpc.call('versions', max)
#############################################################################
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
......@@ -59,7 +59,7 @@
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
#
# Disclaimer
#
......@@ -82,527 +82,394 @@
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Network ZODB storage server
__version__ = "$Revision: 1.32 $"[11:-2]
This server acts as a front-end for one or more real storages, like
file storage or Berkeley storage.
import asyncore, socket, string, sys, os
from smac import SizedMessageAsyncConnection
from ZODB import POSException
XXX Need some basic access control-- a declaration of the methods
exported for invocation by the server.
"""
import asyncore
import cPickle
from cPickle import Unpickler
from ZODB.POSException import TransactionError, UndoError, VersionCommitError
from ZODB.Transaction import Transaction
import traceback
from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
import os
import sys
import threading
import types
import ClientStub
import zrpc2
import zLOG
from zrpc2 import Dispatcher, Handler, ManagedServerConnection, Delay
from ZODB.POSException import StorageError, StorageTransactionError, \
TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf
from thread import start_new_thread
from cStringIO import StringIO
from ZEO import trigger
from ZEO import asyncwrap
from types import StringType
class StorageServerError(POSException.StorageError): pass
# Cap on the length of a single TRACE log line.
max_blather=120

def blather(*args):
    """Log a TRACE message assembled from args, truncated to max_blather.

    Non-string arguments are str()-converted; accumulation stops as soon
    as the combined length reaches max_blather, and the joined message is
    truncated with a trailing ' ...' marker if it is still too long.
    """
    parts = []
    total = 0
    for item in args:
        if not isinstance(item, StringType):
            item = str(item)
        parts.append(item)
        total = total + len(item)
        if total >= max_blather:
            break
    msg = string.join(parts)
    if len(msg) > max_blather:
        msg = msg[:max_blather] + ' ...'
    LOG('ZEO Server', TRACE, msg)
from ZODB.Transaction import Transaction
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
# to create them a tad faster.
pickler=cPickle.Pickler()
pickler.fast=1 # Don't use the memo
dump=pickler.dump
class StorageServer(asyncore.dispatcher):
def __init__(self, connection, storages):
pickler = cPickle.Pickler()
pickler.fast = 1 # Don't use the memo
dump = pickler.dump
def log(message, level=zLOG.INFO, label="ZEO Server:%s" % os.getpid(),
error=None):
zLOG.LOG(label, level, message, error=error)
class StorageServerError(StorageError):
pass
class StorageServer:
def __init__(self, addr, storages, read_only=0):
# XXX should read_only be a per-storage option? not yet...
self.addr = addr
self.storages = storages
self.read_only = read_only
self.connections = {}
for name, store in storages.items():
fixup_storage(store)
self.dispatcher = Dispatcher(addr, factory=self.newConnection,
reuse_addr=1)
def newConnection(self, sock, addr, nil):
c = ManagedServerConnection(sock, addr, None, self)
c.register_object(StorageProxy(self, c))
return c
self.__storages=storages
for n, s in storages.items():
init_storage(s)
self.__connections={}
self.__get_connections=self.__connections.get
self._pack_trigger = trigger.trigger()
asyncore.dispatcher.__init__(self)
if type(connection) is type(''):
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
try: os.unlink(connection)
except: pass
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
LOG('ZEO Server', INFO, 'Listening on %s' % repr(connection))
self.bind(connection)
self.listen(5)
def register_connection(self, connection, storage_id):
storage=self.__storages.get(storage_id, None)
if storage is None:
LOG('ZEO Server', ERROR, "Unknown storage_id: %s" % storage_id)
connection.close()
return None, None
connections=self.__get_connections(storage_id, None)
if connections is None:
self.__connections[storage_id]=connections=[]
connections.append(connection)
return storage, storage_id
def unregister_connection(self, connection, storage_id):
connections=self.__get_connections(storage_id, None)
if connections:
n=[]
for c in connections:
if c is not connection:
n.append(c)
self.__connections[storage_id]=n
def invalidate(self, connection, storage_id, invalidated=(), info=0,
dump=dump):
for c in self.__connections[storage_id]:
if invalidated and c is not connection:
c.message_output('I'+dump(invalidated, 1))
if info:
c.message_output('S'+dump(info, 1))
def writable(self): return 0
def handle_read(self): pass
def readable(self): return 1
def handle_connect (self): pass
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
sys.stderr.write('warning: accept failed\n')
def register(self, storage_id, proxy):
"""Register a connection's use with a particular storage.
This information is needed to handle invalidation.
"""
l = self.connections.get(storage_id)
if l is None:
l = self.connections[storage_id] = []
# intialize waiting list
self.storages[storage_id]._StorageProxy__waiting = []
l.append(proxy)
def invalidate(self, conn, storage_id, invalidated=(), info=0):
for p in self.connections[storage_id]:
if invalidated and p is not conn:
p.client.Invalidate(invalidated)
else:
p.client.info(info)
def close_server(self):
# Close the dispatcher so that there are no new connections.
self.dispatcher.close()
for storage in self.storages.values():
storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
for s in asyncore.socket_map.values():
try:
s.close()
except:
pass
def close(self, conn):
# XXX who calls this?
# called when conn is closed
# way too inefficient
removed = 0
for sid, cl in self.connections.items():
if conn.obj in cl:
cl.remove(conn.obj)
removed = 1
class StorageProxy(Handler):
def __init__(self, server, conn):
self.server = server
self.client = ClientStub.ClientStorage(conn)
self.__storage = None
self.__invalidated = []
self._transaction = None
def __repr__(self):
tid = self._transaction and repr(self._transaction.id)
if self.__storage:
stid = self.__storage._transaction and \
repr(self.__storage._transaction.id)
else:
ZEOConnection(self, sock, addr)
def log_info(self, message, type='info'):
if type=='error': type=ERROR
else: type=INFO
LOG('ZEO Server', type, message)
log=log_info
storage_methods={}
for n in (
'get_info', 'abortVersion', 'commitVersion',
'history', 'load', 'loadSerial',
'modifiedInVersion', 'new_oid', 'new_oids', 'pack', 'store',
'storea', 'tpc_abort', 'tpc_begin', 'tpc_begin_sync',
'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions',
'transactionalUndo',
'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
):
storage_methods[n]=1
storage_method=storage_methods.has_key
def find_global(module, name,
global_dict=globals(), silly=('__doc__',)):
try: m=__import__(module, global_dict, global_dict, silly)
except:
raise StorageServerError, (
"Couldn\'t import global module %s" % module)
try: r=getattr(m, name)
except:
raise StorageServerError, (
"Couldn\'t find global %s in module %s" % (name, module))
safe=getattr(r, '__no_side_effects__', 0)
if safe: return r
raise StorageServerError, 'Unsafe global, %s.%s' % (module, name)
_noreturn=[]
class ZEOConnection(SizedMessageAsyncConnection):
_transaction=None
__storage=__storage_id=None
def __init__(self, server, sock, addr):
self.__server=server
self.__invalidated=[]
self.__closed=None
if __debug__: debug='ZEO Server'
else: debug=0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
LOG('ZEO Server', INFO, 'Connect %s %s' % (id(self), `addr`))
def close(self):
t=self._transaction
if (t is not None and self.__storage is not None and
self.__storage._transaction is t):
self.tpc_abort(t.id)
else:
self._transaction=None
self.__invalidated=[]
self.__server.unregister_connection(self, self.__storage_id)
self.__closed=1
SizedMessageAsyncConnection.close(self)
LOG('ZEO Server', INFO, 'Close %s' % id(self))
def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None):
if __debug__:
if len(message) > max_blather:
tmp = `message[:max_blather]`
stid = None
return "<StorageProxy %X trans=%s s_trans=%s>" % (id(self), tid,
stid)
def _log(self, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
zLOG.LOG("ZEO Server %s %X" % (pid, id(self)),
level, msg, error=error)
def setup_delegation(self):
"""Delegate several methods to the storage"""
self.undoInfo = self.__storage.undoInfo
self.undoLog = self.__storage.undoLog
self.versionEmpty = self.__storage.versionEmpty
self.versions = self.__storage.versions
self.history = self.__storage.history
self.load = self.__storage.load
self.loadSerial = self.__storage.loadSerial
def _check_tid(self, tid, exc=None):
caller = sys._getframe().f_back.f_code.co_name
if self._transaction is None:
self._log("no current transaction: %s()" % caller,
zLOG.PROBLEM)
if exc is not None:
raise exc(None, tid)
else:
tmp = `message`
blather('message_input', id(self), tmp)
if self.__storage is None:
# This is the first communication from the client
self.__storage, self.__storage_id = (
self.__server.register_connection(self, message))
# Send info back asynchronously, so client need not ask
self.message_output('S'+dump(self.get_info(), 1))
return
try:
# Unpickle carefully.
unpickler=Unpickler(StringIO(message))
unpickler.find_global=find_global
args=unpickler.load()
name, args = args[0], args[1:]
if __debug__:
apply(blather,
("call", id(self), ":", name,) + args)
if not storage_method(name):
raise 'Invalid Method Name', name
if hasattr(self, name):
r=apply(getattr(self, name), args)
return 0
if self._transaction.id != tid:
self._log("%s(%s) invalid; current transaction = %s" % \
(caller, repr(tid), repr(self._transaction.id)),
zLOG.PROBLEM)
if exc is not None:
raise exc(self._transaction.id, tid)
else:
r=apply(getattr(self.__storage, name), args)
if r is _noreturn: return
except (UndoError, VersionCommitError):
# These are normal usage errors. No need to leg them
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
except:
LOG('ZEO Server', ERROR, 'error', error=sys.exc_info())
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
return 0
return 1
if __debug__:
blather("%s R: %s" % (id(self), `r`))
r=dump(r,1)
self.message_output('R'+r)
def register(self, storage_id, read_only):
"""Select the storage that this client will use
def return_error(self, err_type, err_value, type=type, dump=dump):
if type(err_value) is not type(self):
err_value = err_type, err_value
This method must be the first one called by the client.
"""
storage = self.server.storages.get(storage_id)
if storage is None:
self._log("unknown storage_id: %s" % storage_id)
raise ValueError, "unknown storage: %s" % storage_id
if __debug__:
blather("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1)
except:
# Ugh, must be an unpicklable exception
r=StorageServerError("Couldn't pickle error %s" % `r`)
dump('',1) # clear pickler
r=dump(r,1)
if not read_only and (self.server.read_only or storage.isReadOnly()):
raise ReadOnlyError()
self.message_output('E'+r)
self.__storage_id = storage_id
self.__storage = storage
self.setup_delegation()
self.server.register(storage_id, self)
self._log("registered storage %s: %s" % (storage_id, storage))
def get_info(self):
    """Return a dict of metadata about the underlying storage.

    Always includes 'length', 'size', and 'name'; each optional feature
    flag (supportsUndo / supportsVersions / supportsTransactionalUndo)
    is the result of calling that storage method, or 0 when the storage
    does not provide it.
    """
    storage = self.__storage
    info = dict(length=len(storage),
                size=storage.getSize(),
                name=storage.getName())
    for feature in ('supportsUndo',
                    'supportsVersions',
                    'supportsTransactionalUndo',):
        if hasattr(storage, feature):
            info[feature] = getattr(storage, feature)()
        else:
            info[feature] = 0
    return info
return {'length': len(self.__storage),
'size': self.__storage.getSize(),
'name': self.__storage.getName(),
'supportsUndo': self.__storage.supportsUndo(),
'supportsVersions': self.__storage.supportsVersions(),
'supportsTransactionalUndo':
self.__storage.supportsTransactionalUndo(),
}
def get_size_info(self):
storage=self.__storage
return {
'length': len(storage),
'size': storage.getSize(),
}
return {'length': len(self.__storage),
'size': self.__storage.getSize(),
}
def zeoLoad(self, oid):
storage=self.__storage
v=storage.modifiedInVersion(oid)
if v: pv, sv = storage.load(oid, v)
else: pv=sv=None
v = self.__storage.modifiedInVersion(oid)
if v:
pv, sv = self.__storage.load(oid, v)
else:
pv = sv = None
try:
p, s = storage.load(oid,'')
p, s = self.__storage.load(oid, '')
except KeyError:
if sv:
# Created in version, no non-version data
p=s=None
p = s = None
else:
raise
return p, s, v, pv, sv
def beginZeoVerify(self):
self.message_output('bN.')
return _noreturn
def zeoVerify(self, oid, s, sv,
dump=dump):
try: p, os, v, pv, osv = self.zeoLoad(oid)
except: return _noreturn
p=pv=None # free the pickles
self.client.beginVerify()
def zeoVerify(self, oid, s, sv):
try:
p, os, v, pv, osv = self.zeoLoad(oid)
except: # except what?
return None
if os != s:
self.message_output('i'+dump((oid, ''),1))
self.client.invalidate((oid, ''))
elif osv != sv:
self.message_output('i'+dump((oid, v),1))
return _noreturn
self.client.invalidate((oid, v))
def endZeoVerify(self):
self.message_output('eN.')
return _noreturn
def new_oids(self, n=100):
new_oid=self.__storage.new_oid
if n < 0: n=1
r=range(n)
for i in r: r[i]=new_oid()
return r
self.client.endVerify()
def pack(self, t, wait=0):
start_new_thread(self._pack, (t,wait))
if wait: return _noreturn
t = threading.Thread(target=self._pack, args=(t, wait))
t.start()
def _pack(self, t, wait=0):
try:
LOG('ZEO Server', BLATHER, 'pack begin')
try:
self.__storage.pack(t, referencesf)
LOG('ZEO Server', BLATHER, 'pack end')
except:
LOG('ZEO Server', ERROR,
'Pack failed for %s' % self.__storage_id,
error=sys.exc_info())
self._log('ZEO Server', zLOG.ERROR,
'Pack failed for %s' % self.__storage_id,
error=sys.exc_info())
if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
self.__server._pack_trigger.pull_trigger()
raise
else:
if wait:
self.message_output('RN.')
self.__server._pack_trigger.pull_trigger()
else:
if not wait:
# Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (),
self.get_size_info())
self.server.invalidate(0, self.__storage_id, (),
self.get_size_info())
def abortVersion(self, src, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
oids=self.__storage.abortVersion(src, t)
a=self.__invalidated.append
for oid in oids: a((oid,src))
self._check_tid(id, exc=StorageTransactionError)
oids = self.__storage.abortVersion(src, self._transaction)
for oid in oids:
self.__invalidated.append((oid, src))
return oids
def commitVersion(self, src, dest, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
oids=self.__storage.commitVersion(src, dest, t)
a=self.__invalidated.append
self._check_tid(id, exc=StorageTransactionError)
oids = self.__storage.commitVersion(src, dest, self._transaction)
for oid in oids:
a((oid,dest))
if dest: a((oid,src))
self.__invalidated.append((oid, dest))
if dest:
self.__invalidated.append((oid, src))
return oids
def storea(self, oid, serial, data, version, id,
dump=dump):
def storea(self, oid, serial, data, version, id):
self._check_tid(id, exc=StorageTransactionError)
try:
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
# XXX does this stmt need to be in the try/except?
newserial=self.__storage.store(oid, serial, data, version, t)
newserial = self.__storage.store(oid, serial, data, version,
self._transaction)
except TransactionError, v:
# This is a normal transaction errorm such as a conflict error
# or a version lock or conflict error. It doen't need to be
# This is a normal transaction error such as a conflict error
# or a version lock or conflict error. It doesn't need to be
# logged.
newserial=v
self._log("transaction error: %s" % repr(v))
newserial = v
except:
# all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling.
# IOW, Anything that ends up here is evil enough to be logged.
LOG('ZEO Server', ERROR, 'store error', error=sys.exc_info())
newserial=sys.exc_info()[1]
error = sys.exc_info()
self._log('store error: %s: %s' % (error[0], error[1]),
zLOG.ERROR, error=error)
newserial = sys.exc_info()[1]
else:
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append((oid, version))
try: r=dump((oid,newserial), 1)
try:
nil = dump(newserial, 1)
except:
# We got a pickling error, must be because the
# newserial is an unpicklable exception.
r=StorageServerError("Couldn't pickle exception %s" % `newserial`)
dump('',1) # clear pickler
r=dump((oid, r),1)
self._log("couldn't pickle newserial: %s" % repr(newserial),
zLOG.ERROR)
dump('', 1) # clear pickler
r = StorageServerError("Couldn't pickle exception %s" % \
`newserial`)
newserial = r
self.message_output('s'+r)
return _noreturn
self.client.serialno((oid, newserial))
def vote(self, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
return self.__storage.tpc_vote(t)
def vote(self, id):
self._check_tid(id, exc=StorageTransactionError)
self.__storage.tpc_vote(self._transaction)
def transactionalUndo(self, trans_id, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
self._check_tid(id, exc=StorageTransactionError)
return self.__storage.transactionalUndo(trans_id, self._transaction)
def undo(self, transaction_id):
oids=self.__storage.undo(transaction_id)
oids = self.__storage.undo(transaction_id)
if oids:
self.__server.invalidate(
self, self.__storage_id, map(lambda oid: (oid,None), oids)
)
self.server.invalidate(self, self.__storage_id,
map(lambda oid: (oid, None, ''), oids))
return oids
return ()
def tpc_abort(self, id):
t=self._transaction
if t is None or id != t.id: return
r=self.__storage.tpc_abort(t)
storage=self.__storage
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
self._transaction=None
self.__invalidated=[]
def unlock(self):
if self.__closed: return
self.message_output('UN.')
# When multiple clients are using a single storage, there are several
# different _transaction attributes to keep track of. Each
# StorageProxy object has a single _transaction that refers to its
# current transaction. The storage (self.__storage) has another
# _transaction that is used for the *real* transaction.
# The real trick comes with the __waiting queue for a storage.
# When a StorageProxy pulls a new transaction from the queue, it
# must inform the new transaction's proxy. (The two proxies may
# be the same.) The new transaction's proxy sets its _transaction
# and continues from there.
def tpc_begin(self, id, user, description, ext):
t=self._transaction
if t is not None:
if id == t.id: return
if self._transaction is not None:
if self._transaction.id == id:
self._log("duplicate tpc_begin(%s)" % repr(id))
return
else:
raise StorageServerError(
"Multiple simultaneous tpc_begin requests from the same "
"client."
)
storage=self.__storage
if storage._transaction is not None:
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append((self.unlock, ()))
return 1 # Return a flag indicating a lock condition.
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
t._extension=ext
storage.tpc_begin(t)
self.__invalidated=[]
def tpc_begin_sync(self, id, user, description, ext):
if self.__closed: return
t=self._transaction
if t is not None and id == t.id: return
storage=self.__storage
if storage._transaction is None:
self.try_again_sync(id, user, description, ext)
else:
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append((self.try_again_sync, (id, user, description, ext)))
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
t = Transaction()
t.id = id
t.user = user
t.description = description
t._extension = ext
if self.__storage._transaction is not None:
d = zrpc2.Delay()
self.__storage.__waiting.append((d, self, t))
return d
self._transaction = t
self.__storage.tpc_begin(t)
self.__invalidated = []
def tpc_finish(self, id):
if not self._check_tid(id):
return
return _noreturn
def try_again_sync(self, id, user, description, ext):
storage=self.__storage
if storage._transaction is None:
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
storage.tpc_begin(t)
self.__invalidated=[]
self.message_output('RN.')
return 1
r = self.__storage.tpc_finish(self._transaction)
assert self.__storage._transaction is None
def tpc_finish(self, id, user, description, ext):
t=self._transaction
if id != t.id: return
if self.__invalidated:
self.server.invalidate(self, self.__storage_id,
self.__invalidated,
self.get_size_info())
storage=self.__storage
r=storage.tpc_finish(t)
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
if not self._handle_waiting():
self._transaction = None
self.__invalidated = []
self._transaction=None
if self.__invalidated:
self.__server.invalidate(self, self.__storage_id,
self.__invalidated,
self.get_size_info())
self.__invalidated=[]
def init_storage(storage):
    """Ensure storage has a tpc_vote method; old storages get a no-op."""
    has_vote = hasattr(storage, 'tpc_vote')
    if not has_vote:
        storage.tpc_vote = lambda *args: None
if __name__=='__main__':
import ZODB.FileStorage
name, port = sys.argv[1:3]
blather(name, port)
try:
port='', int(port)
except:
pass
d = {'1': ZODB.FileStorage.FileStorage(name)}
StorageServer(port, d)
asyncwrap.loop()
def tpc_abort(self, id):
if not self._check_tid(id):
return
r = self.__storage.tpc_abort(self._transaction)
assert self.__storage._transaction is None
if not self._handle_waiting():
self._transaction = None
self.__invalidated = []
def _restart_delayed_transaction(self, delay, trans):
    """Resume a transaction that was queued while the storage was busy.

    Called (via _handle_waiting) once the storage's commit lock is free:
    adopt trans as this proxy's current transaction, begin it on the
    storage, and answer the client that has been blocked on delay.
    """
    self._transaction = trans
    self.__storage.tpc_begin(trans)
    self.__invalidated = []
    # Sanity check: the storage must now be committing our transaction.
    assert self._transaction.id == self.__storage._transaction.id
    # Release the waiting client; tpc_begin's deferred answer is None.
    delay.reply(None)
def _handle_waiting(self):
    """Hand the storage's commit lock to the next queued transaction.

    The waiting queue holds (delay, proxy, transaction) triples that
    were appended when tpc_begin found the storage busy.  Returns a
    true value (1) when the restarted proxy is this proxy itself, so
    the caller knows not to clear self._transaction; returns None
    otherwise or when the queue is empty.
    """
    # NOTE(review): inside the proxy class this name mangles to
    # _StorageProxy__waiting -- presumably the list initialized by the
    # server's register(); confirm against the enclosing class name.
    if self.__storage.__waiting:
        delay, proxy, trans = self.__storage.__waiting.pop(0)
        proxy._restart_delayed_transaction(delay, trans)
        if self is proxy:
            return 1
def new_oids(self, n=100):
    """Return a sequence of n new oids, where n defaults to 100.

    A non-positive request is bumped up to a single oid.
    """
    if n < 0:
        n = 1
    oids = []
    for _ in range(n):
        oids.append(self.__storage.new_oid())
    return oids
def fixup_storage(storage):
    """Backwards compatibility hack: give old storages a no-op tpc_vote."""
    missing = not hasattr(storage, 'tpc_vote')
    if missing:
        storage.tpc_vote = lambda *args: None
"""A TransactionBuffer store transaction updates until commit or abort.
A transaction may generate enough data that it is not practical to
always hold pending updates in memory. Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""
# XXX Figure out what a sensible storage format is
# XXX A faster implementation might store trans data in memory until
# it reaches a certain size.
import tempfile
import cPickle
class TransactionBuffer:
    """Spool one transaction's updates to a temp file until commit/abort."""

    def __init__(self):
        self.file = tempfile.TemporaryFile()
        self.count = 0   # number of records currently buffered
        self.size = 0    # rough byte-size estimate of buffered data
        # It's safe to use a fast pickler because the only objects
        # stored are builtin types -- strings or None.
        self.pickler = cPickle.Pickler(self.file, 1)
        self.pickler.fast = 1

    def store(self, oid, version, data):
        """Store oid, version, data for later retrieval"""
        self.pickler.dump((oid, version, data))
        self.count += 1
        # Estimate per-record cache size
        overhead = 27 + 12
        if version:
            overhead = overhead + len(version) + 4
        self.size = self.size + len(data) + overhead

    def invalidate(self, oid, version):
        """Record an invalidation; the data slot is None for these."""
        self.pickler.dump((oid, version, None))
        self.count += 1

    def clear(self):
        """Mark the buffer as empty"""
        self.file.seek(0)
        self.count = 0
        self.size = 0

    # XXX unchecked constraints:
    # 1. can't call store() after begin_iterate()
    # 2. must call clear() after iteration finishes

    def begin_iterate(self):
        """Move the file pointer in advance of iteration"""
        self.file.flush()
        self.file.seek(0)
        self.unpickler = cPickle.Unpickler(self.file)

    def next(self):
        """Return next tuple of data or None if EOF"""
        if not self.count:
            del self.unpickler
            return None
        record = self.unpickler.load()
        self.count -= 1
        return record

    def get_size(self):
        """Return size of data stored in buffer (just a hint)."""
        return self.size
......@@ -85,11 +85,14 @@
"""Sized message async connections
"""
__version__ = "$Revision: 1.11 $"[11:-2]
__version__ = "$Revision: 1.12 $"[11:-2]
import asyncore, struct
from Exceptions import Disconnected
from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
from types import StringType
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno
from zLOG import LOG, TRACE, ERROR, INFO
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
......@@ -109,81 +112,101 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
__append=None # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
socket=None # to outwit Sam's getattr
READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map)
self.addr=addr
self.__super_init(sock, map)
self.addr = addr
if debug is not None:
self._debug=debug
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug=__debug__ and 'smac'
self.__state=None
self.__inp=None
self.__inpl=0
self.__l=4
self.__output=output=[]
self.__append=output.append
self.__pop=output.pop
def handle_read(self,
join=string.join, StringType=type(''), _type=type,
_None=None):
self._debug = __debug__ and 'smac'
self.__state = None
self.__inp = None # None, a single String, or a list
self.__input_len = 0
self.__msg_size = 4
self.__output = []
self.__closed = None
# XXX avoid expensive getattr calls?
def __nonzero__(self):
return 1
def handle_read(self):
# Use a single __inp buffer and integer indexes to make this
# fast.
try:
d=self.recv(8096)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d: return
inp=self.__inp
if inp is _None:
inp=d
elif _type(inp) is StringType:
inp=[inp,d]
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inpl=self.__inpl+len(d)
l=self.__l
while 1:
if l <= inpl:
# Woo hoo, we have enough data
if _type(inp) is not StringType: inp=join(inp,'')
d=inp[:l]
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if state is None:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
break # not enough data
self.__l=l
self.__inp=inp
self.__inpl=inpl
msg_size = 4
state = None
self.message_input(msg)
def readable(self): return 1
def writable(self): return not not self.__output
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
output=self.__output
output = self.__output
while output:
v=output[0]
v = output[0]
try:
n=self.send(v)
except socket.error, err:
......@@ -191,42 +214,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
break # we couldn't write anything
raise
if n < len(v):
output[0]=v[n:]
output[0] = v[n:]
break # we can't write any more
else:
del output[0]
#break # waaa
def handle_close(self):
self.close()
def message_output(self, message,
pack=struct.pack, len=len):
if self._debug:
if len(message) > 40: m=message[:40]+' ...'
else: m=message
LOG(self._debug, TRACE, 'message_output %s' % `m`)
append=self.__append
if append is None:
raise Disconnected("This action is temporarily unavailable.<p>")
append(pack(">i",len(message))+message)
def log_info(self, message, type='info'):
if type=='error': type=ERROR
else: type=INFO
LOG('ZEO', type, message)
log=log_info
def message_output(self, message):
if __debug__:
if self._debug:
if len(message) > 40:
m = message[:40]+' ...'
else:
m = message
LOG(self._debug, TRACE, 'message_output %s' % `m`)
if self.__closed is not None:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
self.__output.append(message)
def close(self):
if self.__append is not None:
self.__append=None
SizedMessageAsyncConnection.inheritedAttribute('close')(self)
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
if self.__closed is None:
self.__closed = 1
self.__super_close()
......@@ -86,10 +86,13 @@
"""Start the server storage.
"""
__version__ = "$Revision: 1.26 $"[11:-2]
__version__ = "$Revision: 1.27 $"[11:-2]
import sys, os, getopt, string
import StorageServer
import asyncore
def directory(p, n=1):
d=p
while n:
......@@ -115,9 +118,11 @@ def get_storage(m, n, cache={}):
def main(argv):
me=argv[0]
sys.path[:]==filter(None, sys.path)
sys.path.insert(0, directory(me, 2))
# XXX hack for profiling support
global unix, storages, zeo_pid, asyncore
args=[]
last=''
for a in argv[1:]:
......@@ -130,25 +135,13 @@ def main(argv):
args.append(a)
last=a
if os.environ.has_key('INSTANCE_HOME'):
INSTANCE_HOME=os.environ['INSTANCE_HOME']
elif os.path.isdir(os.path.join(directory(me, 4),'var')):
INSTANCE_HOME=directory(me, 4)
else:
INSTANCE_HOME=os.getcwd()
if os.path.isdir(os.path.join(INSTANCE_HOME, 'var')):
var=os.path.join(INSTANCE_HOME, 'var')
else:
var=INSTANCE_HOME
INSTANCE_HOME=os.environ.get('INSTANCE_HOME', directory(me, 4))
zeo_pid=os.environ.get('ZEO_SERVER_PID',
os.path.join(var, 'ZEO_SERVER.pid')
os.path.join(INSTANCE_HOME, 'var', 'ZEO_SERVER.pid')
)
opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
fs=os.path.join(var, 'Data.fs')
fs=os.path.join(INSTANCE_HOME, 'var', 'Data.fs')
usage="""%s [options] [filename]
......@@ -156,17 +149,14 @@ def main(argv):
-D -- Run in debug mode
-d -- Generate detailed debug logging without running
in the foreground.
-U -- Unix-domain socket file to listen on
-u username or uid number
The username to run the ZEO server as. You may want to run
the ZEO server as 'nobody' or some other user with limited
resouces. The only works under Unix, and if the storage
server is started by root.
resouces. The only works under Unix, and if ZServer is
started by root.
-p port -- port to listen on
......@@ -189,23 +179,42 @@ def main(argv):
attr_name -- This is the name to which the storage object
is assigned in the module.
-P file -- Run under profile and dump output to file. Implies the
-s flag.
if no file name is specified, then %s is used.
""" % (me, fs)
try:
opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:')
except getopt.error, msg:
print usage
print msg
sys.exit(1)
port=None
debug=detailed=0
debug=0
host=''
unix=None
Z=1
UID='nobody'
prof = None
for o, v in opts:
if o=='-p': port=string.atoi(v)
elif o=='-h': host=v
elif o=='-U': unix=v
elif o=='-u': UID=v
elif o=='-D': debug=1
elif o=='-d': detailed=1
elif o=='-s': Z=0
elif o=='-P': prof = v
if prof:
Z = 0
try:
from ZServer.medusa import asyncore
sys.modules['asyncore']=asyncore
except: pass
if port is None and unix is None:
print usage
......@@ -219,10 +228,9 @@ def main(argv):
sys.exit(1)
fs=args[0]
__builtins__.__debug__=debug
if debug: os.environ['Z_DEBUG_MODE']='1'
if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
from zLOG import LOG, INFO, ERROR
# Try to set uid to "-u" -provided uid.
......@@ -263,71 +271,54 @@ def main(argv):
import zdaemon
zdaemon.run(sys.argv, '')
try:
import ZEO.StorageServer, asyncore
storages={}
for o, v in opts:
if o=='-S':
n, m = string.split(v,'=')
if string.find(m,':'):
# we got an attribute name
m, a = string.split(m,':')
else:
# attribute name must be same as storage name
a=n
storages[n]=get_storage(m,a)
if not storages:
import ZODB.FileStorage
storages['1']=ZODB.FileStorage.FileStorage(fs)
# Try to set up a signal handler
try:
import signal
signal.signal(signal.SIGTERM,
lambda sig, frame, s=storages: shutdown(s)
)
signal.signal(signal.SIGINT,
lambda sig, frame, s=storages: shutdown(s, 0)
)
try: signal.signal(signal.SIGHUP, rotate_logs_handler)
except: pass
except: pass
items=storages.items()
items.sort()
for kv in items:
LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
if not unix: unix=host, port
ZEO.StorageServer.StorageServer(unix, storages)
try: ppid, pid = os.getppid(), os.getpid()
except: pass # getpid not supported
else: open(zeo_pid,'w').write("%s %s" % (ppid, pid))
except:
# Log startup exception and tell zdaemon not to restart us.
info=sys.exc_info()
try:
import zLOG
zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
error=info)
except:
pass
import traceback
apply(traceback.print_exception, info)
sys.exit(0)
storages={}
for o, v in opts:
if o=='-S':
n, m = string.split(v,'=')
if string.find(m,':'):
# we got an attribute name
m, a = string.split(m,':')
else:
# attribute name must be same as storage name
a=n
storages[n]=get_storage(m,a)
asyncore.loop()
if not storages:
import ZODB.FileStorage
storages['1']=ZODB.FileStorage.FileStorage(fs)
# Try to set up a signal handler
try:
import signal
signal.signal(signal.SIGTERM,
lambda sig, frame, s=storages: shutdown(s)
)
signal.signal(signal.SIGINT,
lambda sig, frame, s=storages: shutdown(s, 0)
)
signal.signal(signal.SIGHUP, rotate_logs_handler)
finally: pass
items=storages.items()
items.sort()
for kv in items:
LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
if not unix: unix=host, port
if prof:
cmds = \
"StorageServer.StorageServer(unix, storages);" \
'open(zeo_pid,"w").write("%s %s" % (os.getppid(), os.getpid()));' \
"asyncore.loop()"
import profile
profile.run(cmds, prof)
else:
StorageServer.StorageServer(unix, storages)
open(zeo_pid,'w').write("%s %s" % (os.getppid(), os.getpid()))
asyncore.loop()
def rotate_logs():
import zLOG
......@@ -335,10 +326,7 @@ def rotate_logs():
zLOG.log_write.reinitialize()
else:
# Hm, lets at least try to take care of the stupid logger:
if hasattr(zLOG, '_set_stupid_dest'):
zLOG._set_stupid_dest(None)
else:
zLOG._stupid_dest = None
zLOG._stupid_dest=None
def rotate_logs_handler(signum, frame):
    # Signal handler (installed for SIGHUP elsewhere in this script):
    # reinitialize the log destination so an external log rotator can
    # move the old files aside.
    rotate_logs()
......@@ -359,7 +347,7 @@ def shutdown(storages, die=1):
for storage in storages.values():
try: storage.close()
except: pass
finally: pass
try:
from zLOG import LOG, INFO
......
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this
# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
"""Library for forking storage server and connecting client storage"""
import asyncore
import os
import profile
import random
import socket
import sys
import traceback
import types
import ZEO.ClientStorage, ZEO.StorageServer
# Change value of PROFILE to enable server-side profiling
PROFILE = 0
if PROFILE:
import hotshot
def get_port():
"""Return a port that is not in use.
......@@ -66,9 +78,11 @@ else:
buf = self.recv(4)
if buf:
assert buf == "done"
server.close_server()
asyncore.socket_map.clear()
def handle_close(self):
server.close_server()
asyncore.socket_map.clear()
class ZEOClientExit:
......@@ -77,20 +91,27 @@ else:
self.pipe = pipe
def close(self):
os.write(self.pipe, "done")
os.close(self.pipe)
try:
os.write(self.pipe, "done")
os.close(self.pipe)
except os.error:
pass
def start_zeo_server(storage, addr):
rd, wr = os.pipe()
pid = os.fork()
if pid == 0:
if PROFILE:
p = profile.Profile()
p.runctx("run_server(storage, addr, rd, wr)", globals(),
locals())
p.dump_stats("stats.s.%d" % os.getpid())
else:
run_server(storage, addr, rd, wr)
try:
if PROFILE:
p = hotshot.Profile("stats.s.%d" % os.getpid())
p.runctx("run_server(storage, addr, rd, wr)",
globals(), locals())
p.close()
else:
run_server(storage, addr, rd, wr)
except:
print "Exception in ZEO server process"
traceback.print_exc()
os._exit(0)
else:
os.close(rd)
......@@ -98,11 +119,11 @@ else:
def run_server(storage, addr, rd, wr):
# in the child, run the storage server
global server
os.close(wr)
ZEOServerExit(rd)
serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
server = ZEO.StorageServer.StorageServer(addr, {'1':storage})
asyncore.loop()
os.close(rd)
storage.close()
if isinstance(addr, types.StringType):
os.unlink(addr)
......@@ -128,6 +149,7 @@ else:
s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
debug=1, client=cache,
cache_size=cache_size,
min_disconnect_poll=0.5)
min_disconnect_poll=0.5,
wait_for_server_on_startup=1)
return s, exit, pid
import random
import unittest
from ZEO.TransactionBuffer import TransactionBuffer
def random_string(size):
    """Return a random string of length *size*.

    Each character is drawn uniformly from the full 8-bit range.
    """
    chars = []
    for _ in range(size):
        chars.append(chr(random.randrange(256)))
    return "".join(chars)
def new_store_data():
    """Return arbitrary data to use as argument to store() method."""
    oid = random_string(8)
    data = random_string(random.randrange(1000))
    # (oid, version, data) -- version is always the empty string here.
    return oid, '', data
def new_invalidate_data():
    """Return arbitrary data to use as argument to invalidate() method."""
    oid = random_string(8)
    # (oid, version) -- version is always the empty string here.
    return oid, ''
class TransBufTests(unittest.TestCase):
    """Exercise TransactionBuffer's store/invalidate/iterate cycle."""

    def checkTypicalUsage(self):
        # One store and one invalidate, then drain the iterator.
        tbuf = TransactionBuffer()
        tbuf.store(*new_store_data())
        tbuf.invalidate(*new_invalidate_data())
        tbuf.begin_iterate()
        while tbuf.next() is not None:
            pass
        tbuf.clear()

    def doUpdates(self, tbuf):
        """Feed interleaved stores/invalidates and verify iteration order."""
        expected = []
        for _ in range(10):
            store_args = new_store_data()
            tbuf.store(*store_args)
            expected.append(store_args)
            inval_args = new_invalidate_data()
            tbuf.invalidate(*inval_args)
            expected.append(inval_args)

        tbuf.begin_iterate()
        for want in expected:
            got = tbuf.next()
            if got[2] is None:
                # the tbuf adds a dummy None to invalidates
                got = got[:2]
            self.assertEqual(got, want)

    def checkOrderPreserved(self):
        self.doUpdates(TransactionBuffer())

    def checkReusable(self):
        # The same buffer must work across multiple clear() cycles.
        tbuf = TransactionBuffer()
        self.doUpdates(tbuf)
        for _ in range(2):
            tbuf.clear()
            self.doUpdates(tbuf)
def test_suite():
    """Collect the check* methods of TransBufTests into a suite."""
    suite = unittest.makeSuite(TransBufTests, 'check')
    return suite
......@@ -85,11 +85,14 @@
"""Sized message async connections
"""
__version__ = "$Revision: 1.11 $"[11:-2]
__version__ = "$Revision: 1.12 $"[11:-2]
import asyncore, struct
from Exceptions import Disconnected
from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
from types import StringType
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno
from zLOG import LOG, TRACE, ERROR, INFO
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
......@@ -109,81 +112,101 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict
class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
__append=None # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
socket=None # to outwit Sam's getattr
READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map)
self.addr=addr
self.__super_init(sock, map)
self.addr = addr
if debug is not None:
self._debug=debug
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug=__debug__ and 'smac'
self.__state=None
self.__inp=None
self.__inpl=0
self.__l=4
self.__output=output=[]
self.__append=output.append
self.__pop=output.pop
def handle_read(self,
join=string.join, StringType=type(''), _type=type,
_None=None):
self._debug = __debug__ and 'smac'
self.__state = None
self.__inp = None # None, a single String, or a list
self.__input_len = 0
self.__msg_size = 4
self.__output = []
self.__closed = None
# XXX avoid expensive getattr calls?
def __nonzero__(self):
return 1
def handle_read(self):
# Use a single __inp buffer and integer indexes to make this
# fast.
try:
d=self.recv(8096)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d: return
inp=self.__inp
if inp is _None:
inp=d
elif _type(inp) is StringType:
inp=[inp,d]
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inpl=self.__inpl+len(d)
l=self.__l
while 1:
if l <= inpl:
# Woo hoo, we have enough data
if _type(inp) is not StringType: inp=join(inp,'')
d=inp[:l]
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if state is None:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
break # not enough data
self.__l=l
self.__inp=inp
self.__inpl=inpl
msg_size = 4
state = None
self.message_input(msg)
def readable(self): return 1
def writable(self): return not not self.__output
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
output=self.__output
output = self.__output
while output:
v=output[0]
v = output[0]
try:
n=self.send(v)
except socket.error, err:
......@@ -191,42 +214,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
break # we couldn't write anything
raise
if n < len(v):
output[0]=v[n:]
output[0] = v[n:]
break # we can't write any more
else:
del output[0]
#break # waaa
def handle_close(self):
self.close()
def message_output(self, message,
pack=struct.pack, len=len):
if self._debug:
if len(message) > 40: m=message[:40]+' ...'
else: m=message
LOG(self._debug, TRACE, 'message_output %s' % `m`)
append=self.__append
if append is None:
raise Disconnected("This action is temporarily unavailable.<p>")
append(pack(">i",len(message))+message)
def log_info(self, message, type='info'):
if type=='error': type=ERROR
else: type=INFO
LOG('ZEO', type, message)
log=log_info
def message_output(self, message):
if __debug__:
if self._debug:
if len(message) > 40:
m = message[:40]+' ...'
else:
m = message
LOG(self._debug, TRACE, 'message_output %s' % `m`)
if self.__closed is not None:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
self.__output.append(message)
def close(self):
if self.__append is not None:
self.__append=None
SizedMessageAsyncConnection.inheritedAttribute('close')(self)
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
if self.__closed is None:
self.__closed = 1
self.__super_close()
"""RPC protocol for ZEO based on asyncore
The basic protocol is as:
a pickled tuple containing: msgid, flags, method, args
msgid is an integer.
flags is an integer.
The only currently defined flag is ASYNC (0x1), which means
the client does not expect a reply.
method is a string specifying the method to invoke.
For a reply, the method is ".reply".
args is a tuple of the argument to pass to method.
XXX need to specify a version number that describes the protocol.
allow for future revision.
XXX support multiple outstanding calls
XXX factor out common pattern of deciding what protocol to use based
on whether address is tuple or string
"""
import asyncore
import errno
import cPickle
import os
import select
import socket
import sys
import threading
import thread
import time
import traceback
import types
from cStringIO import StringIO
from ZODB import POSException
from ZEO import smac, trigger
from Exceptions import Disconnected
import zLOG
import ThreadedAsync
from Exceptions import Disconnected
REPLY = ".reply" # message name used for replies
ASYNC = 1        # flag bit: the caller does not expect a reply

# Logging label; embeds the pid so client and forked server logs differ.
_label = "zrpc:%s" % os.getpid()
def new_label():
    """Recompute the module-level logging label.

    The label embeds the current process id, so a forked child calls
    this to stop logging under its parent's pid.
    """
    global _label
    _label = "zrpc:%s" % os.getpid()
def log(message, level=zLOG.BLATHER, label=None, error=None):
    # Convenience wrapper: log under the per-process zrpc label unless
    # an explicit label is supplied.
    zLOG.LOG(label or _label, level, message, error=error)
class ZRPCError(POSException.StorageError):
    """Base class for errors raised by the zrpc layer."""
    pass
class DecodingError(ZRPCError):
    """A ZRPC message could not be decoded (unpickling failed)."""
class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server."""
# Export the mainloop function from asyncore to zrpc clients
loop = asyncore.loop
def connect(addr, client=None):
    """Open a socket to *addr* and return a Connection wrapping it.

    A tuple address means TCP (host, port); anything else is taken to
    be the path of a Unix-domain socket.
    """
    if type(addr) == types.TupleType:
        family = socket.AF_INET
    else:
        family = socket.AF_UNIX
    sock = socket.socket(family, socket.SOCK_STREAM)
    sock.connect(addr)
    return Connection(sock, addr, client)
class Marshaller:
    """Marshal requests and replies sent across the network."""

    # It's okay to share a single Pickler as long as it's in fast
    # mode, which means that it doesn't have a memo.
    pickler = cPickle.Pickler()
    pickler.fast = 1
    pickle = pickler.dump

    # Pickling-related exceptions callers may want to catch when encoding.
    errors = (cPickle.UnpickleableError,
              cPickle.UnpicklingError,
              cPickle.PickleError,
              cPickle.PicklingError)

    def encode(self, msgid, flags, name, args):
        """Returns an encoded message"""
        return self.pickle((msgid, flags, name, args), 1)

    def decode(self, msg):
        """Decodes msg and returns its parts.

        Raises DecodingError if the pickle cannot be loaded.
        """
        unpickler = cPickle.Unpickler(StringIO(msg))
        # Restrict which globals a peer can make us resolve.
        unpickler.find_global = find_global
        try:
            return unpickler.load() # msgid, flags, name, args
        except (cPickle.UnpicklingError, IndexError), err_msg:
            log("can't decode %s" % repr(msg), level=zLOG.ERROR)
            raise DecodingError(msg)
class Delay:
    """Used to delay response to client for synchronous calls.

    When a synchronous call is made and the original handler returns
    without handling the call, it returns a Delay object that prevents
    the mainloop from sending a response.
    """

    def set_sender(self, msgid, send_reply):
        """Record the message id and the callable used to send the reply."""
        self.msgid = msgid
        self.send_reply = send_reply

    def reply(self, obj):
        """Deliver *obj* as the reply to the recorded message id."""
        self.send_reply(self.msgid, obj)
class Connection(smac.SizedMessageAsyncConnection):
    """Dispatcher for RPC on object.

    The connection supports synchronous calls, which expect a return,
    and asynchronous calls that do not.

    It uses the Marshaller class to handle encoding and decoding of
    method calls and arguments.

    A Connection is designed for use in a multithreaded application,
    where a synchronous call must block until a response is ready.

    The current design only allows a single synchronous call to be
    outstanding.
    """

    # Private aliases to the superclass methods this class overrides.
    __super_init = smac.SizedMessageAsyncConnection.__init__
    __super_close = smac.SizedMessageAsyncConnection.close
    __super_writable = smac.SizedMessageAsyncConnection.writable

    def __init__(self, sock, addr, obj=None):
        # obj is the local object that incoming calls are dispatched to.
        self.msgid = 0
        self.obj = obj
        self.marshal = Marshaller()
        self.closed = 0
        self.async = 0
        # The reply lock is used to block when a synchronous call is
        # waiting for a response
        self.__super_init(sock, addr)
        # Private asyncore socket map so polling only touches this socket.
        self._map = {self._fileno: self}
        self._prepare_async()
        # call lock serializes outgoing calls; reply lock starts held and
        # is released by handle_reply() when the response arrives.
        self.__call_lock = thread.allocate_lock()
        self.__reply_lock = thread.allocate_lock()
        self.__reply_lock.acquire()
        # Handler instances want to know which connection is calling them.
        if isinstance(obj, Handler):
            self.set_caller = 1
        else:
            self.set_caller = 0

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.addr)

    def close(self):
        # Idempotent: only the first call reaches the superclass.
        if self.closed:
            return
        self.closed = 1
        self.__super_close()

    def register_object(self, obj):
        """Register obj as the true object to invoke methods on"""
        self.obj = obj

    def message_input(self, message):
        """Decode an incoming message and dispatch it."""
        # XXX Not sure what to do with errors that reach this level.
        # Need to catch ZRPCErrors in handle_reply() and
        # handle_request() so that they get back to the client.
        try:
            msgid, flags, name, args = self.marshal.decode(message)
        except DecodingError, msg:
            return self.return_error(None, None, sys.exc_info()[0],
                                     sys.exc_info()[1])
        if __debug__:
            log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
                                              repr(args)[:40]),
                level=zLOG.DEBUG)
        # Replies carry the reserved method name REPLY.
        if name == REPLY:
            self.handle_reply(msgid, flags, args)
        else:
            self.handle_request(msgid, flags, name, args)

    def handle_reply(self, msgid, flags, args):
        # Stash the reply and wake the thread blocked in _call()/_do_io().
        if __debug__:
            log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
                level=zLOG.DEBUG)
        self.__reply = msgid, flags, args
        self.__reply_lock.release() # will fail if lock is unlocked

    def handle_request(self, msgid, flags, name, args):
        # Dispatch an incoming call to self.obj and send back the result
        # (unless the call is flagged ASYNC).
        if __debug__:
            log("call %s%s on %s" % (name, repr(args)[:40], repr(self.obj)),
                zLOG.DEBUG)
        if not self.check_method(name):
            raise ZRPCError("Invalid method name: %s on %s" % (name,
                                                               `self.obj`))
        meth = getattr(self.obj, name)
        try:
            if self.set_caller:
                # Expose this connection to the handler for the duration
                # of the call.
                self.obj.set_caller(self)
                try:
                    ret = meth(*args)
                finally:
                    self.obj.clear_caller()
            else:
                ret = meth(*args)
        except (POSException.UndoError,
                POSException.VersionCommitError), msg:
            # NOTE(review): the third positional argument of log() is
            # `label`, not `error`; these two calls appear to pass the
            # exc_info tuple into the label slot (cf. log_error(), which
            # uses error=error) -- confirm.
            error = sys.exc_info()
            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
            return self.return_error(msgid, flags, error[0], error[1])
        except Exception, msg:
            error = sys.exc_info()
            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
            return self.return_error(msgid, flags, error[0], error[1])

        if flags & ASYNC:
            # Async calls must not return anything; there is no way to
            # deliver the value.
            if ret is not None:
                log("async method %s returned value %s" % (name, repr(ret)),
                    zLOG.ERROR)
                raise ZRPCError("async method returned value")
        else:
            if __debug__:
                log("%s return %s" % (name, repr(ret)[:40]), zLOG.DEBUG)
            if isinstance(ret, Delay):
                # Handler deferred the response; Delay will send it later.
                ret.set_sender(msgid, self.send_reply)
            else:
                self.send_reply(msgid, ret)

    def handle_error(self):
        # asyncore hook: log the current exception and drop the connection.
        self.log_error()
        self.close()

    def log_error(self, msg="No error message supplied"):
        error = sys.exc_info()
        log(msg, zLOG.ERROR, error=error)
        # Drop the exc_info reference promptly to avoid keeping frames alive.
        del error

    def check_method(self, name):
        # XXX minimal security check should go here: Is name exported?
        return hasattr(self.obj, name)

    def send_reply(self, msgid, ret):
        msg = self.marshal.encode(msgid, 0, REPLY, ret)
        self.message_output(msg)

    def return_error(self, msgid, flags, err_type, err_value):
        """Send an exception back to the peer as the reply to msgid."""
        if flags is None:
            # Decoding failed; we don't even know the msgid to reply to.
            self.log_error("Exception raised during decoding")
            return
        if flags & ASYNC:
            # No reply channel for async calls; just log.
            self.log_error("Asynchronous call raised exception: %s" % self)
            return
        if type(err_value) is not types.InstanceType:
            err_value = err_type, err_value
        try:
            msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
        except self.marshal.errors:
            # The original exception isn't picklable; substitute one that is.
            err = ZRPCError("Couldn't pickle error %s" % `err_value`)
            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
        self.message_output(msg)
        self._do_io()

    # The next two methods are used by clients to invoke methods on
    # remote objects
    # XXX Should revise design to allow multiple outstanding
    # synchronous calls

    def call(self, method, *args):
        """Synchronously invoke *method* on the peer and return its result."""
        self.__call_lock.acquire()
        try:
            return self._call(method, args)
        finally:
            self.__call_lock.release()

    def _call(self, method, args):
        if self.closed:
            raise DisconnectedError("This action is temporarily unavailable")
        msgid = self.msgid
        self.msgid = self.msgid + 1
        if __debug__:
            log("send msg: %d, 0, %s, ..." % (msgid, method))
        self.message_output(self.marshal.encode(msgid, 0, method, args))
        self.__reply = None
        # lock is currently held
        self._do_io(wait=1)
        # lock is held again...
        r_msgid, r_flags, r_args = self.__reply
        self.__reply_lock.acquire()
        assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
        # An (ExceptionClass, instance) tuple means the server raised.
        if type(r_args) == types.TupleType \
           and type(r_args[0]) == types.ClassType \
           and issubclass(r_args[0], Exception):
            raise r_args[1] # error raised by server
        return r_args

    def callAsync(self, method, *args):
        """Invoke *method* on the peer without waiting for a reply."""
        self.__call_lock.acquire()
        try:
            self._callAsync(method, args)
        finally:
            self.__call_lock.release()

    def _callAsync(self, method, args):
        if self.closed:
            raise DisconnectedError("This action is temporarily unavailable")
        msgid = self.msgid
        self.msgid += 1
        if __debug__:
            log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
        self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
        self._do_io()

    # handle IO, possibly in async mode

    def sync(self):
        pass # XXX what is this supposed to do?

    def _prepare_async(self):
        # Start in non-async mode; set_async() flips us once the
        # ThreadedAsync mainloop is running.
        self._async = 0
        ThreadedAsync.register_loop_callback(self.set_async)
        # XXX If we are not in async mode, this will cause dead
        # Connections to be leaked.

    def set_async(self, map):
        # XXX do we need a lock around this? I'm not sure there is
        # any harm to a race with _do_io().
        self._async = 1
        self.trigger = trigger.trigger()

    def is_async(self):
        return self._async

    def _do_io(self, wait=0): # XXX need better name
        # Flush output / collect input.  In async mode the mainloop does
        # the polling and we only wake it via the trigger; otherwise we
        # poll our private socket map ourselves.
        # XXX invariant? lock must be held when calling with wait==1
        # otherwise, in non-async mode, there will be no poll
        if __debug__:
            log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
                level=zLOG.DEBUG)
        if self.is_async():
            self.trigger.pull_trigger()
            if wait:
                # Block until handle_reply() releases the lock.
                self.__reply_lock.acquire()
                # wait until reply...
                self.__reply_lock.release()
        else:
            if wait:
                # do loop only if lock is already acquired
                while not self.__reply_lock.acquire(0):
                    asyncore.poll(10.0, self._map)
                    if self.closed:
                        raise Disconnected()
                self.__reply_lock.release()
            else:
                asyncore.poll(0.0, self._map)
        # XXX it seems that we need to release before returning if
        # called with wait==1. perhaps the caller need not acquire
        # upon return...
class ServerConnection(Connection):
    # XXX this is a hack
    def _do_io(self, wait=0):
        """If this is a server, there is no explicit IO to do"""
        # The server side sits inside the asyncore mainloop, which does
        # all polling; overriding with a no-op avoids nested polls.
        pass
class ConnectionManager:
    """Keeps a connection up over time"""

    # XXX requires that obj implement notifyConnected and
    # notifyDisconnected. make this optional?

    def __init__(self, addr, obj=None, debug=1, tmin=1, tmax=180):
        # tmin/tmax bound the exponential backoff (seconds) between
        # reconnect attempts.
        self.set_addr(addr)
        self.obj = obj
        self.tmin = tmin
        self.tmax = tmax
        self.debug = debug
        self.connected = 0
        self.connection = None
        # If _thread is not None, then there is a helper thread
        # attempting to connect. _thread is protected by _connect_lock.
        self._thread = None
        self._connect_lock = threading.Lock()
        self.trigger = None
        self.async = 0
        self.closed = 0
        ThreadedAsync.register_loop_callback(self.set_async)

    def __repr__(self):
        return "<%s for %s>" % (self.__class__.__name__, self.addr)

    def set_addr(self, addr):
        "Set one or more addresses to use for server."
        # For backwards compatibility (and simplicity?) the
        # constructor accepts a single address in the addr argument --
        # a string for a Unix domain socket or a 2-tuple with a
        # hostname and port. It can also accept a list of such addresses.
        addr_type = self._guess_type(addr)
        if addr_type is not None:
            self.addr = [(addr_type, addr)]
        else:
            self.addr = []
            for a in addr:
                addr_type = self._guess_type(a)
                if addr_type is None:
                    raise ValueError, "unknown address in list: %s" % repr(a)
                self.addr.append((addr_type, a))

    def _guess_type(self, addr):
        # Map an address to a socket family, or None if unrecognized.
        if isinstance(addr, types.StringType):
            return socket.AF_UNIX
        if (len(addr) == 2
            and isinstance(addr[0], types.StringType)
            and isinstance(addr[1], types.IntType)):
            return socket.AF_INET
        # not anything I know about
        return None

    def close(self):
        """Prevent ConnectionManager from opening new connections"""
        self.closed = 1
        # Wait for any in-flight connect thread before closing the
        # current connection.
        self._connect_lock.acquire()
        try:
            if self._thread is not None:
                self._thread.join()
        finally:
            self._connect_lock.release()
        if self.connection:
            self.connection.close()

    def register_object(self, obj):
        self.obj = obj

    def set_async(self, map):
        # XXX need each connection started with async==0 to have a callback
        self.async = 1 # XXX needs to be set on the Connection
        self.trigger = trigger.trigger()

    def connect(self, sync=0):
        """Start (or join, if sync) a background thread that connects."""
        if self.connected == 1:
            return
        self._connect_lock.acquire()
        try:
            if self._thread is None:
                zLOG.LOG(_label, zLOG.BLATHER,
                         "starting thread to connect to server")
                self._thread = threading.Thread(target=self.__m_connect)
                self._thread.start()
            if sync:
                try:
                    self._thread.join()
                except AttributeError:
                    # probably means the thread exited quickly
                    pass
        finally:
            self._connect_lock.release()

    def attempt_connect(self):
        """Make one synchronous round of connect attempts; return success."""
        # XXX will _attempt_connects() take too long? think select().
        self._attempt_connects()
        return self.connected

    def notify_closed(self, conn):
        # Called by a ManagedConnection when it closes; start reconnecting
        # unless this manager itself has been closed.
        self.connected = 0
        self.connection = None
        self.obj.notifyDisconnected()
        if not self.closed:
            self.connect()

    class Connected(Exception):
        # Internal control-flow exception: raised to abort the connect
        # loop as soon as one socket succeeds.
        def __init__(self, sock):
            self.sock = sock

    def __m_connect(self):
        # a new __connect that handles multiple addresses
        # Retries with exponential backoff capped at tmax.
        try:
            delay = self.tmin
            while not (self.closed or self._attempt_connects()):
                time.sleep(delay)
                delay *= 2
                if delay > self.tmax:
                    delay = self.tmax
        finally:
            self._thread = None

    def _attempt_connects(self):
        "Return true if any connect attempt succeeds."
        # sockets maps a non-blocking socket to the address it is
        # connecting to.
        sockets = {}
        zLOG.LOG(_label, zLOG.BLATHER,
                 "attempting connection on %d sockets" % len(self.addr))
        try:
            for domain, addr in self.addr:
                if __debug__:
                    zLOG.LOG(_label, zLOG.DEBUG,
                             "attempt connection to %s" % repr(addr))
                s = socket.socket(domain, socket.SOCK_STREAM)
                s.setblocking(0)
                # XXX can still block for a while if addr requires DNS
                e = self._connect_ex(s, addr)
                if e is not None:
                    sockets[s] = addr
            # next wait until they actually connect
            while sockets:
                if self.closed:
                    for s in sockets.keys():
                        s.close()
                    return 0
                try:
                    r, w, x = select.select([], sockets.keys(), [], 1.0)
                except select.error:
                    continue
                for s in w:
                    e = self._connect_ex(s, sockets[s])
                    if e is None:
                        del sockets[s]
        except self.Connected, container:
            # One socket won the race; discard the rest.
            s = container.sock
            del sockets[s]
            # close all the other sockets
            for s in sockets.keys():
                s.close()
            return 1
        return 0

    def _connect_ex(self, s, addr):
        """Call s.connect_ex(addr) and return true if loop should continue.

        We have to handle several possible return values from
        connect_ex().  If the socket is connected and the initial ZEO
        setup works, we're done.  Report success by raising an
        exception.  Yes, this is odd, but we need to bail out of the
        select() loop in the caller and an exception is a principled
        way to do the abort.

        If the socket connects and the initial ZEO setup fails or the
        connect_ex() returns an error, we close the socket and ignore it.

        If connect_ex() returns EINPROGRESS, we need to try again later.
        """
        e = s.connect_ex(addr)
        if e == errno.EINPROGRESS:
            return 1
        elif e == 0:
            c = self._test_connection(s, addr)
            zLOG.LOG(_label, zLOG.DEBUG, "connected to %s" % repr(addr))
            if c:
                self.connected = 1
                raise self.Connected(s)
            # NOTE(review): when _test_connection() fails we fall through
            # and return None without closing s here -- confirm that
            # ManagedConnection.close() (called in _test_connection)
            # closes the socket.
        else:
            if __debug__:
                zLOG.LOG(_label, zLOG.DEBUG,
                         "error connecting to %s: %s" % (addr,
                                                         errno.errorcode[e]))
            s.close()

    def _test_connection(self, s, addr):
        # Wrap the raw socket in a ManagedConnection and let the client
        # object decide whether the connection is usable.
        c = ManagedConnection(s, addr, self.obj, self)
        try:
            self.obj.notifyConnected(c)
            self.connection = c
            return 1
        except:
            # XXX zLOG the error
            c.close()
            return 0
class ManagedServerConnection(ServerConnection):
    """A connection that notifies its ConnectionManager of closing"""

    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.__mgr = mgr
        self.__super_init(sock, addr, obj)

    def close(self):
        self.__super_close()
        # NOTE(review): ConnectionManager.close() takes no connection
        # argument, so passing self here looks inconsistent with
        # ManagedConnection.close(), which calls notify_closed(self) --
        # confirm the intended manager callback.
        self.__mgr.close(self)
class ManagedConnection(Connection):
    """A connection that notifies its ConnectionManager of closing.

    A managed connection also defers the ThreadedAsync work to its
    manager.
    """

    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.__mgr = mgr
        # Inherit async state from the manager if its mainloop is
        # already running; otherwise discover it lazily in is_async().
        if self.__mgr.async:
            self.__async = 1
            self.trigger = self.__mgr.trigger
        else:
            self.__async = None
        self.__super_init(sock, addr, obj)

    def _prepare_async(self):
        # Don't do the register_loop_callback that the superclass does
        pass

    def is_async(self):
        if self.__async:
            return 1
        # Re-check the manager: it may have gone async since __init__.
        async = self.__mgr.async
        if async:
            self.__async = 1
            self.trigger = self.__mgr.trigger
        return async

    def close(self):
        self.__super_close()
        self.__mgr.notify_closed(self)
class Dispatcher(asyncore.dispatcher):
    """A server that accepts incoming RPC connections"""

    __super_init = asyncore.dispatcher.__init__

    # Default for SO_REUSEADDR; can be overridden per instance.
    reuse_addr = 1

    def __init__(self, addr, obj=None, factory=Connection, reuse_addr=None):
        # factory is called as factory(sock, addr, obj) for each
        # accepted connection.
        self.__super_init()
        self.addr = addr
        self.obj = obj
        self.factory = factory
        self.clients = []
        if reuse_addr is not None:
            self.reuse_addr = reuse_addr
        self._open_socket()

    def _open_socket(self):
        # Tuple address -> TCP; anything else -> Unix-domain socket path.
        if type(self.addr) == types.TupleType:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        # A listening socket never has output pending.
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error, msg:
            log("accepted failed: %s" % msg)
            return
        c = self.factory(sock, addr, self.obj)
        log("connect from %s: %s" % (repr(addr), c))
        # Keep a reference so accepted connections aren't garbage collected.
        self.clients.append(c)
class Handler:
    """Base class used to handle RPC caller discovery.

    The dispatching Connection stores itself here around each request,
    so handler methods can find out which connection invoked them.
    """

    def set_caller(self, addr):
        self.__caller = addr

    def clear_caller(self):
        self.__caller = None

    def get_caller(self):
        return self.__caller
# Arguments for the restricted __import__ in find_global(): the module
# namespace, and a non-empty fromlist so __import__ returns the leaf module.
_globals = globals()
_silly = ('__doc__',)
def find_global(module, name):
"""Helper for message unpickler"""
try:
m = __import__(module, _globals, _globals, _silly)
except ImportError, msg:
raise ZRPCError("import error %s: %s" % (module, msg))
try:
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
safe = getattr(r, '__no_side_effects__', 0)
if safe:
return r
if type(r) == types.ClassType and issubclass(r, Exception):
return r
raise ZRPCError("Unsafe global: %s.%s" % (module, name))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment