Commit 5c2fe859 authored by Jim Fulton's avatar Jim Fulton

Merged changes from ZEO-Dev branch in preparation for 0.2

parent 5f6e7d61
Zope Enterprise Option (ZEO) Revision History
ZEO 0.2
This release is expected to be close to beta quality. Initially, the
primary goals of this release were to:
- Correct some consistency problems that had been observed in
0.1 on starup.
- Allow ZEO clients to detect, survive, and recover from
disconnection from the ZEO server.
Based on some feedback from some folks who tried 0.1, improving
write performance was made a priority.
Features
- The ZEO Client now handles server failures gracefully:
o The client with a persistent cache can generally startup
even if the server is not running, assuming that it has at
least a minimal number of objects in the cache.
o The client will continue to function even if the server
connection is interuppted.
o Server availability is detected by the client (which tries
to connect to the server every few minutes). A disconnected
client will automatically reconnect to an available server.
o When the client is disconnected, write transactions cannot
be performed. Reads fail for objects that are not in the
cache.
- Performance enhancements
The speed of write-intensive operations have been improved
approximately 70%. When using Unix domain sockets for
client/server communication, ZEO transactions take roughly 2-3
times as long as FileStorage transactions to commit.
(This was based on some tests. Your mileage may vary.)
- Packing support was added. Note that packing is done
asynchrounously. The client returns immediately from a pack
call. The server packs in a thread and sends updated
statistics to the client when packing is completed.
- Support for Unix-domain sockets was added.
- Pickles sent to the server are now checked to make sure that
they don't contain unapproved instance or global-variable
(function) pickles.
Bugs fixed
- Data could be badly inconsistent when a persistent cache
was started, due to a bug in the cache initialization logic.
- The application was allowed to begin operation while the cache
was being verified. This could lead to harmful inconsistencies.
Changes made to Zope to support ZEO
- A number of changes were made to ZODB to support asynchronous
storage during transaction commit.
- Normally Zope updates the database during startup to reflect
product changes. This behavior is now suppressed when the
ZEO_CLIENT environment variable is set. It doesn't make sense
for many clients to update the database for the same products.
- The asyncore module was modified to add support for multiple
asyncore loops. This change was applied to asyncore in the
Zope and the (official, owned by Sam Rushing) medusa CVS
trees.
- A new module, ThreadedAsync.py has been added in the Zope
lib/python directory. This module provides notification to
async objects (like ZEO clients) to let them know when the
asyncore main loop has started. This was needed to enable use
of async code before the main loop starts.
ZEO 0.1 (aka "iteration 1")
This was an initial alpha of ZEO that demonstrated basic
functionalities. It lacked robustness and has some performance
problems on writes.
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,7 +75,13 @@ ...@@ -39,7 +75,13 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
###################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Implement a client cache """Implement a client cache
The cache is managed as two files, var/c0.zec and var/c1.zec. The cache is managed as two files, var/c0.zec and var/c1.zec.
...@@ -102,7 +144,7 @@ file 0 and file 1. ...@@ -102,7 +144,7 @@ file 0 and file 1.
""" """
__version__ = "$Revision: 1.4 $"[11:-2] __version__ = "$Revision: 1.5 $"[11:-2]
import os, tempfile import os, tempfile
from struct import pack, unpack from struct import pack, unpack
...@@ -111,9 +153,9 @@ magic='ZEC0' ...@@ -111,9 +153,9 @@ magic='ZEC0'
class ClientCache: class ClientCache:
def __init__(self, storage='', size=20000000, client=None): def __init__(self, storage='', size=20000000, client=None, var=None):
var=os.path.join(INSTANCE_HOME,'var')
if client: if client:
if var is None: var=os.path.join(INSTANCE_HOME,'var')
self._p=p=map(lambda i, p=storage, var=var, c=client: self._p=p=map(lambda i, p=storage, var=var, c=client:
os.path.join(var,'c%s-%s-%s.zec' % (p, c, i)), os.path.join(var,'c%s-%s-%s.zec' % (p, c, i)),
(0,1)) (0,1))
...@@ -317,6 +359,12 @@ def read_index(index, serial, f, current): ...@@ -317,6 +359,12 @@ def read_index(index, serial, f, current):
if current: index[oid]=-pos if current: index[oid]=-pos
else: index[oid]=pos else: index[oid]=pos
serial[oid]=h[-8:], vs serial[oid]=h[-8:], vs
else:
if serial.has_key(oid):
# We has a record for this oid, but it was invalidated!
del serial[oid]
del index[oid]
pos=pos+tlen pos=pos+tlen
......
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,17 +75,26 @@ ...@@ -39,17 +75,26 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
###################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Network ZODB storage client """Network ZODB storage client
""" """
__version__='$Revision: 1.6 $'[11:-2] __version__='$Revision: 1.7 $'[11:-2]
import struct, time, os, socket, string, Sync, zrpc, ClientCache
import tempfile, Invalidator, ExtensionClass, thread
import ThreadedAsync
import struct, time, os, socket, cPickle, string, Sync, zrpc, ClientCache
import tempfile
now=time.time now=time.time
from struct import pack, unpack from struct import pack, unpack
from ZODB import POSException, BaseStorage from ZODB import POSException, BaseStorage
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from zLOG import LOG, PROBLEM, INFO
TupleType=type(()) TupleType=type(())
...@@ -57,72 +102,128 @@ class UnrecognizedResult(POSException.StorageError): ...@@ -57,72 +102,128 @@ class UnrecognizedResult(POSException.StorageError):
"""A server call returned an unrecognized result """A server call returned an unrecognized result
""" """
class ClientStorage(BaseStorage.BaseStorage): class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
_connected=_async=0
__begin='tpc_begin_sync'
def __init__(self, connection, async=0, storage='1', cache_size=20000000, def __init__(self, connection, async=0, storage='1', cache_size=20000000,
name=''): name='', client='', debug=0, var=None):
# Decide whether to use non-temporary files # Decide whether to use non-temporary files
client=os.environ.get('ZEO_CLIENT','') client=client or os.environ.get('ZEO_CLIENT','')
if client: async=1
self._connection=connection
self._storage=storage
self._debug=debug
self._info={'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
}
if async: self._call=zrpc.asyncRPC(connection, debug=debug)
import asyncore
def loop(timeout=30.0, use_poll=0,
self=self, asyncore=asyncore, loop=asyncore.loop):
self.becomeAsync()
asyncore.loop=loop
loop(timeout, use_poll)
asyncore.loop=loop
self._call=zrpc.syncRPC(connection)
self.__begin='tpc_begin_sync'
self._call._write(str(storage))
info=self._call('get_info')
self._len=info.get('length',0)
self._size=info.get('size',0)
name=name or ("%s %s" % (info.get('name', ''), str(connection)))
self._supportsUndo=info.get('supportsUndo',0)
self._supportsVersions=info.get('supportsVersions',0)
name = name or str(connection)
self._tfile=tempfile.TemporaryFile() self._tfile=tempfile.TemporaryFile()
self._oids=[]
self._serials=[]
self._seriald={}
ClientStorage.inheritedAttribute('__init__')(self, name)
self.__lock_acquire=self._lock_acquire
self._cache=ClientCache.ClientCache(
storage, cache_size, client=client, var=var)
ThreadedAsync.register_loop_callback(self.becomeAsync)
def _startup(self):
if not self._call.connect():
# If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect.
LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
self._cache.open()
thread.start_new_thread(self._call.connect,(0,))
def notifyConnected(self, s):
LOG("ClientStorage", INFO, "Connected to storage")
self._lock_acquire()
try:
# We let the connection keep coming up now that
# we have the storage lock. This way, we know no calls
# will be made while in the process of coming up.
self._call.finishConnect(s)
self._connected=1
self._oids=[]
self.__begin='tpc_begin_sync'
self._call.message_output(str(self._storage))
self._info.update(self._call('get_info'))
cached=self._cache.open()
if cached:
self._call.sendMessage('beginZeoVerify')
for oid, (s, vs) in cached:
self._call.sendMessage('zeoVerify', oid, s, vs)
self._call.sendMessage('endZeoVerify')
self._cache=ClientCache.ClientCache(storage, cache_size, client=client) finally: self._lock_release()
if async:
for oid, (s, vs) in self._cache.open():
self._call.queue('zeoVerify', oid, s, vs)
else:
for oid, (s, vs) in self._cache.open():
self._call.send('zeoVerify', oid, s, vs)
BaseStorage.BaseStorage.__init__(self, name) if self._async:
import ZServer.medusa.asyncore
self.becomeAsync(ZServer.medusa.asyncore.socket_map)
def becomeAsync(self): def notifyDisconnected(self, ignored):
self._call=zrpc.asyncRPC(self._call) LOG("ClientStorage", PROBLEM, "Disconnected from storage")
self.__begin='tpc_begin' self._connected=0
thread.start_new_thread(self._call.connect,(0,))
def becomeAsync(self, map):
self._lock_acquire()
try:
self._async=1
if self._connected:
import ZServer.PubCore.ZEvent
self._call.setLoop(map,
ZServer.PubCore.ZEvent.Wakeup)
self.__begin='tpc_begin'
finally: self._lock_release()
def registerDB(self, db, limit): def registerDB(self, db, limit):
def invalidate(code, args, invalidator=Invalidator.Invalidator(
dinvalidate=db.invalidate, db.invalidate,
limit=limit, self._cache.invalidate)
release=self._commit_lock_release,
cinvalidate=self._cache.invalidate def out_of_band_hook(
): code, args,
if code == 'I': get_hook={
for oid, serial, version in args: 'b': (invalidator.begin, 0),
cinvalidate(oid, version=version) 'i': (invalidator.invalidate, 1),
dinvalidate(oid, version=version) 'e': (invalidator.end, 0),
elif code == 'U': 'I': (invalidator.Invalidate, 1),
release() 'r': (self._commit_lock_release, 0),
's': (self._serials.append, 1),
self._call.setOutOfBand(invalidate) 'S': (self._info.update, 1),
}.get):
hook = get_hook(code, None)
if hook is None: return
hook, flag = hook
if flag: hook(args)
else: hook()
self._call.setOutOfBand(out_of_band_hook)
self._startup()
def __len__(self): return self._len def __len__(self): return self._info['length']
def abortVersion(self, src, transaction): def abortVersion(self, src, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -143,9 +244,12 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -143,9 +244,12 @@ class ClientStorage(BaseStorage.BaseStorage):
try: return self._call('commitVersion', src, dest, self._serial) try: return self._call('commitVersion', src, dest, self._serial)
finally: self._lock_release() finally: self._lock_release()
def getName(self): return self.__name__ def getName(self):
return "%s (%s)" % (
self.__name__,
self._connected and 'connected' or 'disconnected')
def getSize(self): return self._size def getSize(self): return self._info['size']
def history(self, oid, version, length=1): def history(self, oid, version, length=1):
self._lock_acquire() self._lock_acquire()
...@@ -174,7 +278,13 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -174,7 +278,13 @@ class ClientStorage(BaseStorage.BaseStorage):
def new_oid(self, last=None): def new_oid(self, last=None):
self._lock_acquire() self._lock_acquire()
try: return self._call('new_oid') try:
oids=self._oids
if not oids:
oids[:]=self._call('new_oids')
oids.reverse()
return oids.pop()
finally: self._lock_release() finally: self._lock_release()
def pack(self, t, rf): def pack(self, t, rf):
...@@ -189,19 +299,48 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -189,19 +299,48 @@ class ClientStorage(BaseStorage.BaseStorage):
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire() self._lock_acquire()
try: try:
serial=self._call('store', oid, serial, serial=self._call.sendMessage('storea', oid, serial,
data, version, self._serial) data, version, self._serial)
write=self._tfile.write write=self._tfile.write
write(oid+serial+pack(">HI", len(version), len(data))+version) write(oid+pack(">HI", len(version), len(data))+version)
write(data) write(data)
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
d=self._seriald
for oid, s in r: d[oid]=s
return r
return serial return serial
finally: self._lock_release() finally: self._lock_release()
def supportsUndo(self): return self._supportsUndo def tpc_vote(self, transaction):
def supportsVersions(self): return self._supportsVersions if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
self._call('vote')
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
d=self._seriald
for oid, s in r: d[oid]=s
return r
finally: self._lock_release()
def supportsUndo(self): return self._info['supportsUndo']
def supportsVersions(self): return self._info['supportsVersions']
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self._lock_acquire() self._lock_acquire()
...@@ -210,6 +349,8 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -210,6 +349,8 @@ class ClientStorage(BaseStorage.BaseStorage):
self._call('tpc_abort', self._serial) self._call('tpc_abort', self._serial)
self._transaction=None self._transaction=None
self._tfile.seek(0) self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
self._commit_lock_release() self._commit_lock_release()
finally: self._lock_release() finally: self._lock_release()
...@@ -227,13 +368,19 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -227,13 +368,19 @@ class ClientStorage(BaseStorage.BaseStorage):
self._serial=id=`t` self._serial=id=`t`
self._tfile.seek(0) self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
while 1: while 1:
self._lock_release() self._lock_release()
self._commit_lock_acquire() self._commit_lock_acquire()
self._lock_acquire() self._lock_acquire()
if self._call(self.__begin, id, user, desc, ext) is None: try: r=self._call(self.__begin, id, user, desc, ext)
break except:
self._commit_lock_release()
raise
if r is None: break
self._transaction=transaction self._transaction=transaction
...@@ -250,6 +397,14 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -250,6 +397,14 @@ class ClientStorage(BaseStorage.BaseStorage):
transaction.description, transaction.description,
transaction._extension) transaction._extension)
seriald=self._seriald
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
for oid, s in r: seriald[oid]=s
tfile=self._tfile tfile=self._tfile
seek=tfile.seek seek=tfile.seek
read=tfile.read read=tfile.read
...@@ -259,7 +414,7 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -259,7 +414,7 @@ class ClientStorage(BaseStorage.BaseStorage):
i=0 i=0
while i < size: while i < size:
oid=read(8) oid=read(8)
s=read(8) s=seriald[oid]
h=read(6) h=read(6)
vlen, dlen = unpack(">HI", h) vlen, dlen = unpack(">HI", h)
if vlen: v=read(vlen) if vlen: v=read(vlen)
...@@ -306,4 +461,3 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -306,4 +461,3 @@ class ClientStorage(BaseStorage.BaseStorage):
self._lock_acquire() self._lock_acquire()
try: return self._call('versionEmpty', max) try: return self._call('versionEmpty', max)
finally: self._lock_release() finally: self._lock_release()
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Facility for (roughly) atomically invalidating cache entries. """Facility for (roughly) atomically invalidating cache entries.
Note that this is not *really* atomic, but it is close enough. Note that this is not *really* atomic, but it is close enough.
...@@ -15,16 +99,17 @@ class Invalidator: ...@@ -15,16 +99,17 @@ class Invalidator:
def begin(self): def begin(self):
self._tfile=tempfile.TemporaryFile() self._tfile=tempfile.TemporaryFile()
self._d=cPickle.Pickler(self._tfile).dump pickler=cPickle.Pickler(self._tfile, 1)
pickler.fast=1 # Don't use the memo
self._d=pickler.dump
def invalidate(self, args): def invalidate(self, args):
if self._d is None: return if self._d is None: return
for arg in args: self._d(args)
self._d(arg)
def end(self): def end(self):
if self._d is None: return if self._d is None: return
self._d((0,0,0)) self._d((0,0))
self._d=None self._d=None
self._tfile.seek(0) self._tfile.seek(0)
load=cPickle.Unpickler(self._tfile).load load=cPickle.Unpickler(self._tfile).load
...@@ -34,8 +119,15 @@ class Invalidator: ...@@ -34,8 +119,15 @@ class Invalidator:
dinvalidate=self.dinvalidate dinvalidate=self.dinvalidate
while 1: while 1:
oid, serial, version = load() oid, version = load()
if not oid: break if not oid: break
cinvalidate(oid, version=version) cinvalidate(oid, version=version)
dinvalidate(oid, version=version) dinvalidate(oid, version=version)
def Invalidate(self, args):
if self._d is None: return
cinvalidate=self.cinvalidate
dinvalidate=self.dinvalidate
for oid, version in args:
cinvalidate(oid, version=version)
dinvalidate(oid, version=version)
Zope Enterprize Option, iteration 1 Zope Enterprize Option, ZEO 0.2
Put this package in your Zope lib/python. Put this package in your Zope lib/python.
Note -- This release of ZEO requires Zope 2.2 or a CVS checkout
of Zope. See 'CHANGES.txt' for details.
You also need to symbolically link (or copy) ZServer to lib/python::
cd lib/python
ln -s ../../ZServer .
To start the storage server, go to your Zope install directory and:: To start the storage server, go to your Zope install directory and::
lib/python/ZEO/start.py -p port_number lib/python/ZEO/start.py -p port_number
(Run start without arguments to see options.) (Run start without arguments to see options.)
Of course, the server and the client don't have to be on the same
machine.
If the server and client *are* on the same machine, then you can use
a Unix domain socket::
lib/python/ZEO/start.py -U filename
To get Zope to use the server, create a custom_zodb module, To get Zope to use the server, create a custom_zodb module,
custom_zodb.py, in your Zope install directory, so that uses a custom_zodb.py, in your Zope install directory, so that Zope uses a
ClientStorage:: ClientStorage::
import ZEO.ClientStorage import ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(('',port_number), async=1) Storage=ZEO.ClientStorage.ClientStorage(('',port_number))
You can specify a host name (rather than '') if you want. You can specify a host name (rather than '') if you want.
The port number is, of course, the port number used to start the The port number is, of course, the port number used to start the
storage server. The async switch tells the client to switch storage server. The async switch tells the client to switch
itself to async mode (if and) when the asyncore main loop is called. itself to async mode (if and) when the asyncore main loop is called.
You can also give the name of a Unix domain socket file::
import ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(filename)
If you want a persistent client cache, you need to define the If you want a persistent client cache, you need to define the
environment variable, ZEO_CLIENT to a unique name for the environment variable, ZEO_CLIENT to a unique name for the
client. This is needed so that unique cache name files can be client. This is needed so that unique cache name files can be
...@@ -28,4 +50,46 @@ Zope Enterprize Option, iteration 1 ...@@ -28,4 +50,46 @@ Zope Enterprize Option, iteration 1
something like: something like:
python z2.py -P8700 ZEO_CLIENT=8700 python z2.py -P8700 ZEO_CLIENT=8700
python z2.py -P8800 ZEO_CLIENT=8700 python z2.py -P8800 ZEO_CLIENT=8800
The ClientStorage constructor provides a number of additional
options (arguments):
storage -- The name of the storage to connect to.
cache_size -- The number of bytes to allow for the client cache.
The default is 20,000,000.
name -- The name to use for the storage. This will be shown in
Zope's control panel. The default name is a representation of
the connection information.
debug -- If this is provided, it should be a non-empty string. It
indicates that client should log tracing and debugging
information, using zLOG.
var -- The directory in which persistent cache files should be
written. If this option is provided, it is unnecessary to
set INSTANCE_HOME in __builtins__.
Notes for non Zope users
First, we regret the amount of dependence on Zope. We intend for
the dependence to decrease in the long run.
Known dependencies:
- Shared must be in the Python path. This is due to a lame
dependency on some Zope XML facilities used by ZODB for XML
export and import.
- ZServer should be in the Python path, or you should copy the
version of asyncore.py from ZServer (from Zope 2.2 or CVS) to
your Python path, or you should copy a version of a asyncore
from the medusa CVS tree to your Python path. A recent change
in asyncore is required.
- The module, ThreadedAsync must be in the python path.
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,29 +75,44 @@ ...@@ -39,29 +75,44 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
###################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
__version__ = "$Revision: 1.7 $"[11:-2] __version__ = "$Revision: 1.8 $"[11:-2]
import asyncore, socket, string, sys, cPickle import asyncore, socket, string, sys, cPickle, os
from smac import SizedMessageAsyncConnection from smac import SizedMessageAsyncConnection
from ZODB import POSException from ZODB import POSException
from ZODB.Transaction import Transaction from ZODB.Transaction import Transaction
import traceback import traceback
from zLOG import LOG, INFO, ERROR from zLOG import LOG, INFO, ERROR, TRACE
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from thread import start_new_thread from thread import start_new_thread
from cPickle import Unpickler
from cStringIO import StringIO
class StorageServerError(POSException.StorageError): pass class StorageServerError(POSException.StorageError): pass
def blather(*args): def blather(*args):
LOG('ZEO Server', INFO, string.join(args)) LOG('ZEO Server', TRACE, string.join(args))
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
# to create them a tad faster.
pickler=cPickle.Pickler()
pickler.fast=1 # Don't use the memo
dump=pickler.dump
class StorageServer(asyncore.dispatcher): class StorageServer(asyncore.dispatcher):
def __init__(self, connection, storages): def __init__(self, connection, storages):
self.host, self.port = connection
self.__storages=storages self.__storages=storages
self.__connections={} self.__connections={}
...@@ -69,9 +120,15 @@ class StorageServer(asyncore.dispatcher): ...@@ -69,9 +120,15 @@ class StorageServer(asyncore.dispatcher):
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind((self.host, self.port)) if type(connection) is type(''):
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
try: os.unlink(connection)
except: pass
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(connection)
self.listen(5) self.listen(5)
...@@ -99,10 +156,12 @@ class StorageServer(asyncore.dispatcher): ...@@ -99,10 +156,12 @@ class StorageServer(asyncore.dispatcher):
self.__connections[storage_id]=n self.__connections[storage_id]=n
def invalidate(self, connection, storage_id, invalidated, def invalidate(self, connection, storage_id, invalidated,
dumps=cPickle.dumps): dump=dump):
for c in self.__connections[storage_id]: for c in self.__connections[storage_id]:
if c is connection: continue if c is connection: continue
c.message_output('I'+dumps(invalidated)) c.message_output('bN.')
c.message_output('I'+dump(invalidated, 1))
c.message_output('eN.')
def writable(self): return 0 def writable(self): return 0
...@@ -128,15 +187,33 @@ class StorageServer(asyncore.dispatcher): ...@@ -128,15 +187,33 @@ class StorageServer(asyncore.dispatcher):
log=log_info log=log_info
storage_methods={} storage_methods={}
for n in ('get_info', 'abortVersion', 'commitVersion', 'history', for n in (
'load', 'modifiedInVersion', 'new_oid', 'pack', 'store', 'get_info', 'abortVersion', 'commitVersion', 'history', 'load',
'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo', 'modifiedInVersion', 'new_oid', 'new_oids', 'pack', 'store',
'undoLog', 'undoInfo', 'versionEmpty', 'storea', 'tpc_abort', 'tpc_begin', 'tpc_begin_sync',
'zeoLoad', 'zeoVerify', 'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty',
): 'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
):
storage_methods[n]=1 storage_methods[n]=1
storage_method=storage_methods.has_key storage_method=storage_methods.has_key
def find_global(module, name,
global_dict=globals(), silly=('__doc__',)):
try: m=__import__(module, global_dict, global_dict, silly)
except:
raise StorageServerError, (
"Couldn\'t import global module %s" % module)
try: r=getattr(m, name)
except:
raise StorageServerError, (
"Couldn\'t find global %s in module %s" % (name, module))
safe=getattr(r, '__no_side_effects__', 0)
if safe: return r
raise StorageServerError, 'Unsafe global, %s.%s' % (module, name)
_noreturn=[] _noreturn=[]
class Connection(SizedMessageAsyncConnection): class Connection(SizedMessageAsyncConnection):
...@@ -144,10 +221,12 @@ class Connection(SizedMessageAsyncConnection): ...@@ -144,10 +221,12 @@ class Connection(SizedMessageAsyncConnection):
__storage=__storage_id=None __storage=__storage_id=None
def __init__(self, server, sock, addr): def __init__(self, server, sock, addr):
SizedMessageAsyncConnection.__init__(self, sock, addr)
self.__server=server self.__server=server
self.__invalidated=[] self.__invalidated=[]
self.__closed=None self.__closed=None
if __debug__: debug='ZEO Server'
else: debug=0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
def close(self): def close(self):
t=self._transaction t=self._transaction
...@@ -159,7 +238,9 @@ class Connection(SizedMessageAsyncConnection): ...@@ -159,7 +238,9 @@ class Connection(SizedMessageAsyncConnection):
self.__closed=1 self.__closed=1
SizedMessageAsyncConnection.close(self) SizedMessageAsyncConnection.close(self)
def message_input(self, message): def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None):
if __debug__: if __debug__:
m=`message` m=`message`
if len(m) > 60: m=m[:60]+' ...' if len(m) > 60: m=m[:60]+' ...'
...@@ -172,7 +253,12 @@ class Connection(SizedMessageAsyncConnection): ...@@ -172,7 +253,12 @@ class Connection(SizedMessageAsyncConnection):
rt='R' rt='R'
try: try:
args=cPickle.loads(message)
# Unpickle carefully.
unpickler=Unpickler(StringIO(message))
unpickler.find_global=find_global
args=unpickler.load()
name, args = args[0], args[1:] name, args = args[0], args[1:]
if __debug__: if __debug__:
m=`tuple(args)` m=`tuple(args)`
...@@ -197,7 +283,7 @@ class Connection(SizedMessageAsyncConnection): ...@@ -197,7 +283,7 @@ class Connection(SizedMessageAsyncConnection):
if len(m) > 60: m=m[:60]+' ...' if len(m) > 60: m=m[:60]+' ...'
blather('%s: %s' % (rt, m)) blather('%s: %s' % (rt, m))
r=cPickle.dumps(r,1) r=dump(r,1)
self.message_output(rt+r) self.message_output(rt+r)
def get_info(self): def get_info(self):
...@@ -218,20 +304,39 @@ class Connection(SizedMessageAsyncConnection): ...@@ -218,20 +304,39 @@ class Connection(SizedMessageAsyncConnection):
p, s = storage.load(oid,'') p, s = storage.load(oid,'')
return p, s, v, pv, sv return p, s, v, pv, sv
def beginZeoVerify(self):
self.message_output('bN.')
return _noreturn
def zeoVerify(self, oid, s, sv, def zeoVerify(self, oid, s, sv,
dumps=cPickle.dumps): dump=dump):
try: p, os, v, pv, osv = self.zeoLoad(oid) try: p, os, v, pv, osv = self.zeoLoad(oid)
except: return _noreturn except: return _noreturn
p=pv=None # free the pickles p=pv=None # free the pickles
if os != s: if os != s:
self.message_output('I'+dumps(((oid, os, ''),))) self.message_output('i'+dump((oid, ''),1))
elif osv != sv: elif osv != sv:
self.message_output('I'+dumps(((oid, osv, v),))) self.message_output('i'+dump((oid, v),1))
return _noreturn return _noreturn
def endZeoVerify(self):
self.message_output('eN.')
return _noreturn
def new_oids(self, n=100):
new_oid=self.__storage.new_oid
if n < 0: n=1
r=range(n)
for i in r: r[i]=new_oid()
return r
def pack(self, t): def pack(self, t):
start_new_thread(self.__storage.pack, (t, referencesf)) start_new_thread(self._pack, (t,))
def _pack(self, t):
self.__storage.pack(t, referencesf)
self.message_output('S'+dump(self.get_info(), 1))
def store(self, oid, serial, data, version, id): def store(self, oid, serial, data, version, id):
t=self._transaction t=self._transaction
...@@ -239,9 +344,27 @@ class Connection(SizedMessageAsyncConnection): ...@@ -239,9 +344,27 @@ class Connection(SizedMessageAsyncConnection):
raise POSException.StorageTransactionError(self, id) raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, serial, data, version, t) newserial=self.__storage.store(oid, serial, data, version, t)
if serial != '\0\0\0\0\0\0\0\0': if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append(oid, serial, version) self.__invalidated.append((oid, version))
return newserial return newserial
def storea(self, oid, serial, data, version, id,
dump=dump):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
try: newserial=self.__storage.store(oid, serial, data, version, t)
except POSException.ConflictError, v:
newserial=v
else:
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append((oid, version))
self.message_output('s'+dump((oid,newserial), 1))
return _noreturn
def vote(self): pass
def undo(self, transaction_id): def undo(self, transaction_id):
oids=self.__storage.undo(transaction_id) oids=self.__storage.undo(transaction_id)
self.__server.invalidate( self.__server.invalidate(
...@@ -328,8 +451,10 @@ class Connection(SizedMessageAsyncConnection): ...@@ -328,8 +451,10 @@ class Connection(SizedMessageAsyncConnection):
if apply(f,args): break if apply(f,args): break
self._transaction=None self._transaction=None
self.__server.invalidate(self, self.__storage_id, self.__invalidated) if self.__invalidated:
self.__invalidated=[] self.__server.invalidate(self, self.__storage_id,
self.__invalidated)
self.__invalidated=[]
if __name__=='__main__': if __name__=='__main__':
......
""" Soon to be the ZPL """ ##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# This file tells Zope to look to a ZEO Storage Server for all of its # This file tells Zope to look to a ZEO Storage Server for all of its
# database objects. # database objects.
__version__ = "$Revision: 1.4 $"[11:-2] __version__ = "$Revision: 1.5 $"[11:-2]
# First we have to get the right ZEO components # First we have to get the right ZEO components
import ZEO.ClientStorage import ZEO.ClientStorage
...@@ -12,10 +12,9 @@ import ZEO.ClientStorage ...@@ -12,10 +12,9 @@ import ZEO.ClientStorage
ZSS_HOST = '' # Host name where ZSS is running ZSS_HOST = '' # Host name where ZSS is running
ZSS_PORT = 8800 # Port that the ZSS is running on ZSS_PORT = 8800 # Port that the ZSS is running on
ZSS_ASYNC = 1 # Set to 1 for async calls
ZSS_NAME = 'ZEOStorage' # Name of the storage being used ZSS_NAME = 'ZEOStorage' # Name of the storage being used
# Now we tell Zope where its storage is: # Now we tell Zope where its storage is:
Storage=ZEO.ClientStorage.ClientStorage((ZSS_HOST, ZSS_PORT), Storage=ZEO.ClientStorage.ClientStorage((ZSS_HOST, ZSS_PORT),
async=ZSS_ASYNC, name=ZSS_NAME) name=ZSS_NAME)
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,79 +75,118 @@ ...@@ -39,79 +75,118 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
###################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.6 $"[11:-2] __version__ = "$Revision: 1.7 $"[11:-2]
import asyncore, string, struct, zLOG, sys, Acquisition
from zLOG import LOG, TRACE, ERROR
import asyncore, string, struct, zLOG class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
from zLOG import LOG, INFO, ERROR
class SizedMessageAsyncConnection(asyncore.dispatcher): __append=None # Marker indicating that we're closed
def __init__(self, sock, addr): socket=None # to outwit Sam's getattr
asyncore.dispatcher.__init__(self, sock)
def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map)
self.addr=addr self.addr=addr
if debug is not None:
self._debug=debug
elif not hasattr(self, '_debug'):
self._debug=__debug__ and 'smac'
self.__state=None self.__state=None
self.__inp=None self.__inp=None
self.__inpl=0
self.__l=4 self.__l=4
self.__output=output=[] self.__output=output=[]
self.__append=output.append self.__append=output.append
self.__pop=output.pop self.__pop=output.pop
def handle_read(self, def handle_read(self,
join=string.join, StringType=type('')): join=string.join, StringType=type(''), _type=type,
l=self.__l StringType=type(''), _None=None):
d=self.recv(l)
d=self.recv(8096)
if not d: return
inp=self.__inp inp=self.__inp
if inp is None: if inp is _None:
inp=d inp=d
elif type(inp) is StringType: elif _type(inp) is StringType:
inp=[inp,d] inp=[inp,d]
else: else:
inp.append(d) inp.append(d)
l=l-len(d) inpl=self.__inpl+len(d)
if l <= 0: l=self.__l
if type(inp) is not StringType: inp=join(inp,'')
if self.__state is None: while 1:
# waiting for message
self.__l=struct.unpack(">i",inp)[0] if l <= inpl:
self.__state=1 # Woo hoo, we have enough data
self.__inp=None if _type(inp) is not StringType: inp=join(inp,'')
d=inp[:l]
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
else: else:
self.__inp=None break # not enough data
self.__l=4
self.__state=None self.__l=l
self.message_input(inp) self.__inp=inp
else: self.__inpl=inpl
self.__l=l
self.__inp=inp
def readable(self): return 1 def readable(self): return 1
def writable(self): return not not self.__output def writable(self): return not not self.__output
def handle_write(self): def handle_write(self):
output=self.__output output=self.__output
if output: while output:
v=output[0] v=output[0]
n=self.send(v) n=self.send(v)
if n < len(v): if n < len(v):
output[0]=v[n:] output[0]=v[n:]
break # we can't write any more
else: else:
del output[0] del output[0]
#break # waaa
def handle_close(self): def handle_close(self):
self.close() self.close()
def message_output(self, message, def message_output(self, message,
pack=struct.pack, len=len): pack=struct.pack, len=len):
if __debug__: if self._debug:
if len(message) > 40: m=message[:40]+' ...' if len(message) > 40: m=message[:40]+' ...'
else: m=message else: m=message
LOG('smax', INFO, 'message_output %s' % `m`) LOG(self._debug, TRACE, 'message_output %s' % `m`)
self.__append(pack(">i",len(message))+message)
append=self.__append
if append is None:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
append(pack(">i",len(message))+message)
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if type=='error': type=ERROR if type=='error': type=ERROR
...@@ -119,3 +194,13 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -119,3 +194,13 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
LOG('ZEO', type, message) LOG('ZEO', type, message)
log=log_info log=log_info
def close(self):
if self.__append is not None:
self.__append=None
SizedMessageAsyncConnection.inheritedAttribute('close')(self)
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,12 +75,18 @@ ...@@ -39,12 +75,18 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
##################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Start the server storage. """Start the server storage.
""" """
__version__ = "$Revision: 1.4 $"[11:-2] __version__ = "$Revision: 1.5 $"[11:-2]
import sys, os, getopt, string import sys, os, getopt, string
...@@ -77,30 +119,46 @@ def main(argv): ...@@ -77,30 +119,46 @@ def main(argv):
os.path.join(INSTANCE_HOME, 'var', 'ZEO_SERVER.pid') os.path.join(INSTANCE_HOME, 'var', 'ZEO_SERVER.pid')
) )
opts, args = getopt.getopt(args, 'p:Dh:') opts, args = getopt.getopt(args, 'p:Dh:U:Z:')
fs=os.path.join(INSTANCE_HOME, 'var', 'Data.fs') fs=os.path.join(INSTANCE_HOME, 'var', 'Data.fs')
usage="""%s -p port [options] [filename] usage="""%s [options] [filename]
where options are: where options are:
-D -- Run in debug mode -D -- Run in debug mode
-U -- Unix-domain socket file to listen on
-p -- port to listen on
-h -- host address to listen on -h -- host address to listen on
-s -- Don't use zdeamon
if no file name is specified, then %s is used. if no file name is specified, then %s is used.
""" % (me, fs) """ % (me, fs)
port=None port=None
debug=0 debug=0
host='' host=''
unix=None
Z=1
for o, v in opts: for o, v in opts:
if o=='-p': port=string.atoi(v) if o=='-p': port=string.atoi(v)
elif o=='-h': host=v elif o=='-h': host=v
elif o=='-U': unix=v
elif o=='-D': debug=1 elif o=='-D': debug=1
elif o=='-s': Z=0
if port is None:
try:
from ZServer.medusa import asyncore
sys.modules['asyncore']=asyncore
except: pass
if port is None and unix is None:
print usage print usage
print 'No port specified.' print 'No port specified.'
sys.exit(1) sys.exit(1)
...@@ -115,18 +173,20 @@ def main(argv): ...@@ -115,18 +173,20 @@ def main(argv):
__builtins__.__debug__=debug __builtins__.__debug__=debug
if debug: os.environ['Z_DEBUG_MODE']='1' if debug: os.environ['Z_DEBUG_MODE']='1'
try: import posix if Z:
except: pass try: import posix
else: except: pass
import zdaemon else:
zdaemon.run(sys.argv, '') import zdaemon
zdaemon.run(sys.argv, '')
import ZEO.StorageServer, ZODB.FileStorage, asyncore, zLOG import ZEO.StorageServer, ZODB.FileStorage, asyncore, zLOG
zLOG.LOG('ZEO Server', zLOG.INFO, 'Serving %s' % fs) zLOG.LOG('ZEO Server', zLOG.INFO, 'Serving %s' % fs)
if not unix: unix=host, port
ZEO.StorageServer.StorageServer( ZEO.StorageServer.StorageServer(
(host,port), unix,
{ {
'1': ZODB.FileStorage.FileStorage(fs) '1': ZODB.FileStorage.FileStorage(fs)
}, },
......
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,56 +75,118 @@ ...@@ -39,56 +75,118 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
##################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Simple rpc mechanisms """Simple rpc mechanisms
""" """
__version__ = "$Revision: 1.5 $"[11:-2] __version__ = "$Revision: 1.6 $"[11:-2]
from cPickle import dumps, loads from cPickle import loads
from thread import allocate_lock from thread import allocate_lock
from smac import SizedMessageAsyncConnection from smac import SizedMessageAsyncConnection
import socket, string, struct import socket, string, struct, asyncore, sys, time, cPickle
TupleType=type(()) TupleType=type(())
from zLOG import LOG, TRACE, DEBUG
Wakeup=None # We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
# to create them a tad faster.
pickler=cPickle.Pickler()
pickler.fast=1 # Don't use the memo
dump=pickler.dump
class asyncRPC(SizedMessageAsyncConnection):
class syncRPC: __map=0
"""Synchronous rpc""" def __Wakeup(*args): pass
_outOfBand=None def __init__(self, connection, outOfBand=None, tmin=5, tmax=300, debug=0):
self._connection=connection
def __init__(self, connection, outOfBand=None):
host, port = connection
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
self._sync__s=s
self._sync__q=[]
self._outOfBand=outOfBand self._outOfBand=outOfBand
self._tmin, self._tmax = tmin, tmax
self._debug=debug
def setOutOfBand(self, f): l=allocate_lock() # Response lock used to wait for call results
"""Define a call-back function for handling out-of-band communication self.__la=l.acquire
self.__lr=l.release
Normal communications from the server consists of call returns self.__r=None
and exception returns. The server may also send asynchronous l.acquire()
messages to the client. For the client to recieve these
messages, it must register an out-of-band callback
function. The function will be called with a single-character
message code and a message argument.
"""
self._outOfBand=f def connect(self, tryonce=1):
t=self._tmin
connection = self._connection
debug=self._debug
while 1:
try:
if type(connection) is type(''):
s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(connection)
except:
if debug:
LOG(debug, DEBUG, "Failed to connect to server")
if tryonce: return 0
time.sleep(t)
t=t*2
if t > self._tmax: t=self._tmax
else:
if debug:
LOG(debug, DEBUG, "Connected to server")
# Make sure the result lock is set, se we don't
# get an old result (e.g. the exception that
# we generated on close).
self.__r=None
self.__la(0)
self.aq_parent.notifyConnected(s)
return 1
def finishConnect(self, s):
SizedMessageAsyncConnection.__init__(self, s, {})
# we are our own socket map!
def keys(self): return (self._fileno,)
def values(self): return (self,)
def items(self): return ((self._fileno,self),)
def __len__(self): return 1
def __getitem__(self, key):
if key==self._fileno: return self
raise KeyError, key
def readLoop(self):
la=self.__la
while not la(0):
asyncore.poll(60.0, self)
self.__lr()
def setLoop(self, map=None, Wakeup=lambda : None):
if map is None: self.__map=0
else:
self.add_channel(map) # asyncore registration
self.__map=1
def close(self): self._sync__s.close() self.__Wakeup=Wakeup
def __call__(self, *args): def __call__(self, *args):
args=dumps(args,1) args=dump(args,1)
self._write(args) self.message_output(args)
if self.__map: self.__Wakeup() # You dumb bastard
else: self.readLoop()
while 1: while 1:
r=self._read() r=self._read()
c=r[:1] c=r[:1]
if c=='R': if c=='R':
if r=='RN.': return None # Common case!
return loads(r[1:]) return loads(r[1:])
if c=='E': if c=='E':
r=loads(r[1:]) r=loads(r[1:])
...@@ -96,101 +194,36 @@ class syncRPC: ...@@ -96,101 +194,36 @@ class syncRPC:
raise r raise r
oob=self._outOfBand oob=self._outOfBand
if oob is not None: if oob is not None:
oob(c, loads(r[1:])) r=r[1:]
if r=='N.': r=None # Common case!
else: r=loads(r)
oob(c, r)
else: else:
raise UnrecognizedResult, r raise UnrecognizedResult, r
def queue(self, *args): def sendMessage(self, *args):
self._sync__q.append(dumps(args,1)) self.message_output(dump(args,1))
if self.__map: self.__Wakeup() # You dumb bastard
def send(self, *args): else: asyncore.poll(0.0, self)
self._write(dumps(args,1))
def _write(self, data, pack=struct.pack):
send=self._sync__s.send
h=pack(">i", len(data))
l=len(h)
while l > 0:
sent=send(h)
h=h[sent:]
l=l-sent
l=len(data)
while l > 0:
sent=send(data)
data=data[sent:]
l=l-sent
def _read(self, _st=type(''), join=string.join, unpack=struct.unpack):
recv=self._sync__s.recv
l=4
data=None
while l > 0:
d=recv(l)
if data is None: data=d
elif type(data) is _st: data=[data, d]
else: data.append(d)
l=l-len(d)
if type(data) is not _st: data=join(data,'')
l,=unpack(">i", data)
data=None
while l > 0:
d=recv(l)
if data is None: data=d
elif type(data) is _st: data=[data, d]
else: data.append(d)
l=l-len(d)
if type(data) is not _st: data=join(data,'')
return data
class asyncRPC(SizedMessageAsyncConnection, syncRPC):
def __init__(self, connection, outOfBand=None):
try:
host, port = connection
except:
s=connection._sync__s
SizedMessageAsyncConnection.__init__(self, s, None)
self._outOfBand=connection._outOfBand
for m in connection._sync__q:
self.message_output(m)
else:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
SizedMessageAsyncConnection.__init__(self, s, None)
self._outOfBand=outOfBand
l=allocate_lock()
self.__la=l.acquire
self.__lr=l.release
self.__r=None
l.acquire()
global Wakeup def setOutOfBand(self, f):
if Wakeup is None: """Define a call-back function for handling out-of-band communication
import ZServer.PubCore.ZEvent
Wakeup=ZServer.PubCore.ZEvent.Wakeup
def queue(self, *args): Normal communications from the server consists of call returns
self.message_output(dumps(args,1)) and exception returns. The server may also send asynchronous
Wakeup() # You dumb bastard messages to the client. For the client to recieve these
messages, it must register an out-of-band callback
def _write(self, data): function. The function will be called with a single-character
self.message_output(data) message code and a message argument.
Wakeup() # You dumb bastard """
self._outOfBand=f
def message_input(self, m): def message_input(self, m):
if __debug__: if self._debug:
md=`m` md=`m`
if len(m) > 60: md=md[:60]+' ...' if len(m) > 60: md=md[:60]+' ...'
print 'message_input', md LOG(self._debug, TRACE, 'message_input %s' % md)
c=m[:1] c=m[:1]
if c in 'RE': if c in 'RE':
...@@ -198,11 +231,19 @@ class asyncRPC(SizedMessageAsyncConnection, syncRPC): ...@@ -198,11 +231,19 @@ class asyncRPC(SizedMessageAsyncConnection, syncRPC):
self.__lr() self.__lr()
else: else:
oob=self._outOfBand oob=self._outOfBand
if oob is not None: oob(c, loads(m[1:])) if oob is not None:
m=m[1:]
if m=='N.': m=None
else: m=loads(m)
oob(c, m)
def _read(self): def _read(self):
self.__la() self.__la()
return self.__r return self.__r
def close(self):
asyncRPC.inheritedAttribute('close')(self)
self.aq_parent.notifyDisconnected(self)
self.__r='E'+dump(sys.exc_info()[:2], 1)
try: self.__lr()
except: pass
###################################################################### ##############################################################################
# Digital Creations Options License Version 0.9.0
# -----------------------------------------------
# #
# Copyright (c) 1999, Digital Creations. All rights reserved. # Zope Public License (ZPL) Version 1.0
# -------------------------------------
# #
# This license covers Zope software delivered as "options" by Digital # Copyright (c) Digital Creations. All rights reserved.
# Creations.
# #
# Use in source and binary forms, with or without modification, are # This license has been certified as Open Source(tm).
# permitted provided that the following conditions are met:
# #
# 1. Redistributions are not permitted in any form. # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# #
# 2. This license permits one copy of software to be used by up to five # 1. Redistributions in source code must retain the above copyright
# developers in a single company. Use by more than five developers # notice, this list of conditions, and the following disclaimer.
# requires additional licenses.
# #
# 3. Software may be used to operate any type of website, including # 2. Redistributions in binary form must reproduce the above copyright
# publicly accessible ones. # notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# #
# 4. Software is not fully documented, and the customer acknowledges # 3. Digital Creations requests that attribution be given to Zope
# that the product can best be utilized by reading the source code. # in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
# #
# 5. Support for software is included for 90 days in email only. Further
# support can be purchased separately.
# #
# Disclaimer # Disclaimer
# #
...@@ -39,79 +75,118 @@ ...@@ -39,79 +75,118 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. # SUCH DAMAGE.
###################################################################### #
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.6 $"[11:-2] __version__ = "$Revision: 1.7 $"[11:-2]
import asyncore, string, struct, zLOG, sys, Acquisition
from zLOG import LOG, TRACE, ERROR
import asyncore, string, struct, zLOG class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
from zLOG import LOG, INFO, ERROR
class SizedMessageAsyncConnection(asyncore.dispatcher): __append=None # Marker indicating that we're closed
def __init__(self, sock, addr): socket=None # to outwit Sam's getattr
asyncore.dispatcher.__init__(self, sock)
def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map)
self.addr=addr self.addr=addr
if debug is not None:
self._debug=debug
elif not hasattr(self, '_debug'):
self._debug=__debug__ and 'smac'
self.__state=None self.__state=None
self.__inp=None self.__inp=None
self.__inpl=0
self.__l=4 self.__l=4
self.__output=output=[] self.__output=output=[]
self.__append=output.append self.__append=output.append
self.__pop=output.pop self.__pop=output.pop
def handle_read(self, def handle_read(self,
join=string.join, StringType=type('')): join=string.join, StringType=type(''), _type=type,
l=self.__l StringType=type(''), _None=None):
d=self.recv(l)
d=self.recv(8096)
if not d: return
inp=self.__inp inp=self.__inp
if inp is None: if inp is _None:
inp=d inp=d
elif type(inp) is StringType: elif _type(inp) is StringType:
inp=[inp,d] inp=[inp,d]
else: else:
inp.append(d) inp.append(d)
l=l-len(d) inpl=self.__inpl+len(d)
if l <= 0: l=self.__l
if type(inp) is not StringType: inp=join(inp,'')
if self.__state is None: while 1:
# waiting for message
self.__l=struct.unpack(">i",inp)[0] if l <= inpl:
self.__state=1 # Woo hoo, we have enough data
self.__inp=None if _type(inp) is not StringType: inp=join(inp,'')
d=inp[:l]
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
else: else:
self.__inp=None break # not enough data
self.__l=4
self.__state=None self.__l=l
self.message_input(inp) self.__inp=inp
else: self.__inpl=inpl
self.__l=l
self.__inp=inp
def readable(self): return 1 def readable(self): return 1
def writable(self): return not not self.__output def writable(self): return not not self.__output
def handle_write(self): def handle_write(self):
output=self.__output output=self.__output
if output: while output:
v=output[0] v=output[0]
n=self.send(v) n=self.send(v)
if n < len(v): if n < len(v):
output[0]=v[n:] output[0]=v[n:]
break # we can't write any more
else: else:
del output[0] del output[0]
#break # waaa
def handle_close(self): def handle_close(self):
self.close() self.close()
def message_output(self, message, def message_output(self, message,
pack=struct.pack, len=len): pack=struct.pack, len=len):
if __debug__: if self._debug:
if len(message) > 40: m=message[:40]+' ...' if len(message) > 40: m=message[:40]+' ...'
else: m=message else: m=message
LOG('smax', INFO, 'message_output %s' % `m`) LOG(self._debug, TRACE, 'message_output %s' % `m`)
self.__append(pack(">i",len(message))+message)
append=self.__append
if append is None:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
append(pack(">i",len(message))+message)
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if type=='error': type=ERROR if type=='error': type=ERROR
...@@ -119,3 +194,13 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -119,3 +194,13 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
LOG('ZEO', type, message) LOG('ZEO', type, message)
log=log_info log=log_info
def close(self):
if self.__append is not None:
self.__append=None
SizedMessageAsyncConnection.inheritedAttribute('close')(self)
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment