Commit d120bfa9 authored by Jim Fulton's avatar Jim Fulton

Added client storage cache

parent 2dd378d9
...@@ -47,9 +47,10 @@ ...@@ -47,9 +47,10 @@
############################################################################## ##############################################################################
"""Network ZODB storage client """Network ZODB storage client
""" """
__version__='$Revision: 1.2 $'[11:-2] __version__='$Revision: 1.3 $'[11:-2]
import struct, time, os, socket, cPickle, string, Sync, zrpc import struct, time, os, socket, cPickle, string, Sync, zrpc, ClientCache
import tempfile
now=time.time now=time.time
from struct import pack, unpack from struct import pack, unpack
from ZODB import POSException, BaseStorage from ZODB import POSException, BaseStorage
...@@ -63,8 +64,12 @@ class UnrecognizedResult(POSException.StorageError): ...@@ -63,8 +64,12 @@ class UnrecognizedResult(POSException.StorageError):
class ClientStorage(BaseStorage.BaseStorage): class ClientStorage(BaseStorage.BaseStorage):
def __init__(self, connection, async=0): def __init__(self, connection, async=0, storage='1', cache_size=20000000):
# Decide whether to use non-temporary files
client=os.environ.get('ZEO_CLIENT','')
if client: async=1
if async: if async:
import asyncore import asyncore
def loop(timeout=30.0, use_poll=0, def loop(timeout=30.0, use_poll=0,
...@@ -77,17 +82,27 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -77,17 +82,27 @@ class ClientStorage(BaseStorage.BaseStorage):
self._call=zrpc.sync(connection) self._call=zrpc.sync(connection)
self.__begin='tpc_begin_sync' self.__begin='tpc_begin_sync'
self._call._write('1') self._call._write(str(storage))
info=self._call('get_info') info=self._call('get_info')
self._len=info.get('length',0) self._len=info.get('length',0)
self._size=info.get('size',0) self._size=info.get('size',0)
self.__name__=info.get('name', str(connection)) name="%s %s" % (info.get('name', ''), str(connection))
self._supportsUndo=info.get('supportsUndo',0) self._supportsUndo=info.get('supportsUndo',0)
self._supportsVersions=info.get('supportsVersions',0) self._supportsVersions=info.get('supportsVersions',0)
BaseStorage.BaseStorage.__init__(self,
info.get('name', str(connection)), self._tfile=tempfile.TemporaryFile()
)
self._cache=ClientCache.ClientCache(storage, cache_size, client=client)
if async:
for oid, (s, vs) in self._cache.open():
self._call.queue('zeoVerify', oid, s, vs)
else:
for oid, (s, vs) in self._cache.open():
self._call.send('zeoVerify', oid, s, vs)
BaseStorage.BaseStorage.__init__(self, name)
def becomeAsync(self): def becomeAsync(self):
self._call=zrpc.async(self._call) self._call=zrpc.async(self._call)
...@@ -96,13 +111,15 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -96,13 +111,15 @@ class ClientStorage(BaseStorage.BaseStorage):
def registerDB(self, db, limit): def registerDB(self, db, limit):
def invalidate(code, args, def invalidate(code, args,
invalidate=db.invalidate, dinvalidate=db.invalidate,
limit=limit, limit=limit,
release=self._commit_lock_release, release=self._commit_lock_release,
cinvalidate=self._cache.invalidate
): ):
if code == 'I': if code == 'I':
for oid, serial, version in args: for oid, serial, version in args:
invalidate(oid, version=version) cinvalidate(oid, version=version)
dinvalidate(oid, version=version)
elif code == 'U': elif code == 'U':
release() release()
...@@ -141,12 +158,22 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -141,12 +158,22 @@ class ClientStorage(BaseStorage.BaseStorage):
def load(self, oid, version, _stuff=None): def load(self, oid, version, _stuff=None):
self._lock_acquire() self._lock_acquire()
try: return self._call('load', oid, version) try:
p = self._cache.load(oid, version)
if p is not None: return p
p, s, v, pv, sv = self._call('zeoLoad', oid)
self._cache.store(oid, p, s, v, pv, sv)
if not v or not version or version != v:
return p, s
return pv, sv
finally: self._lock_release() finally: self._lock_release()
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
self._lock_acquire() self._lock_acquire()
try: return self._call('modifiedInVersion', oid) try:
v=self._cache.modifiedInVersion(oid)
if v is not None: return v
return self._call('modifiedInVersion', oid)
finally: self._lock_release() finally: self._lock_release()
def new_oid(self, last=None): def new_oid(self, last=None):
...@@ -165,8 +192,16 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -165,8 +192,16 @@ class ClientStorage(BaseStorage.BaseStorage):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire() self._lock_acquire()
try: return self._call('store', oid, serial, try:
data, version, self._serial) serial=self._call('store', oid, serial,
data, version, self._serial)
write=self._tfile.write
write(oid+serial+pack(">HI", len(version), len(data))+version)
write(data)
return serial
finally: self._lock_release() finally: self._lock_release()
def supportsUndo(self): return self._supportsUndo def supportsUndo(self): return self._supportsUndo
...@@ -178,6 +213,7 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -178,6 +213,7 @@ class ClientStorage(BaseStorage.BaseStorage):
if transaction is not self._transaction: return if transaction is not self._transaction: return
self._call('tpc_abort', self._serial) self._call('tpc_abort', self._serial)
self._transaction=None self._transaction=None
self._tfile.seek(0)
self._commit_lock_release() self._commit_lock_release()
finally: self._lock_release() finally: self._lock_release()
...@@ -194,6 +230,8 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -194,6 +230,8 @@ class ClientStorage(BaseStorage.BaseStorage):
self._ts=t=t.laterThan(self._ts) self._ts=t=t.laterThan(self._ts)
self._serial=id=`t` self._serial=id=`t`
self._tfile.seek(0)
while 1: while 1:
self._lock_release() self._lock_release()
self._commit_lock_acquire() self._commit_lock_acquire()
...@@ -216,6 +254,26 @@ class ClientStorage(BaseStorage.BaseStorage): ...@@ -216,6 +254,26 @@ class ClientStorage(BaseStorage.BaseStorage):
transaction.description, transaction.description,
transaction._extension) transaction._extension)
tfile=self._tfile
seek=tfile.seek
read=tfile.read
cache=self._cache
size=tfile.tell()
seek(0)
i=0
while i < size:
oid=read(8)
s=read(8)
h=read(6)
vlen, dlen = unpack(">HI", h)
if vlen: v=read(vlen)
else: v=''
p=read(dlen)
cache.update(oid, s, v, p)
i=i+22+vlen+dlen
seek(0)
self._transaction=None self._transaction=None
self._commit_lock_release() self._commit_lock_release()
finally: self._lock_release() finally: self._lock_release()
......
...@@ -18,3 +18,13 @@ Zope Enterprize Option, iteration 1 ...@@ -18,3 +18,13 @@ Zope Enterprize Option, iteration 1
The port number is, of course, the port number used to start the The port number is, of course, the port number used to start the
storage server. The async switch tells the client to switch storage server. The async switch tells the client to switch
itself to async mode (if and) when the asyncore main loop is called. itself to async mode (if and) when the asyncore main loop is called.
If you want a persistent client cache, you need to define the
environment variable, ZEO_CLIENT to a unique name for the
client. This is needed so that unique cache name files can be
computed. Otherwise, the client cache is stored in temporary files.
For example, to start two Zope processes with unique caches, use
something like:
python z2.py -P8700 ZEO_CLIENT=8700
python z2.py -P8800 ZEO_CLIENT=8800
...@@ -54,7 +54,7 @@ class StorageServer(asyncore.dispatcher): ...@@ -54,7 +54,7 @@ class StorageServer(asyncore.dispatcher):
for c in self.__connections[storage_id]: for c in self.__connections[storage_id]:
if c is connection: continue if c is connection: continue
c.message_output('I'+dumps(invalidated)) c.message_output('I'+dumps(invalidated))
def writable(self): return 0 def writable(self): return 0
def handle_read(self): pass def handle_read(self): pass
...@@ -75,8 +75,9 @@ storage_methods={} ...@@ -75,8 +75,9 @@ storage_methods={}
for n in ('get_info', 'abortVersion', 'commitVersion', 'history', for n in ('get_info', 'abortVersion', 'commitVersion', 'history',
'load', 'modifiedInVersion', 'new_oid', 'pack', 'store', 'load', 'modifiedInVersion', 'new_oid', 'pack', 'store',
'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo', 'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo',
'undoLog', 'undoLog', 'versionEmpty',
'versionEmpty'): 'zeoLoad', 'zeoVerify',
):
storage_methods[n]=1 storage_methods[n]=1
storage_method=storage_methods.has_key storage_method=storage_methods.has_key
...@@ -153,6 +154,27 @@ class Connection(smac): ...@@ -153,6 +154,27 @@ class Connection(smac):
'supportsVersions': storage.supportsVersions(), 'supportsVersions': storage.supportsVersions(),
} }
def zeoLoad(self, oid):
storage=self.__storage
v=storage.modifiedInVersion(oid)
if v: pv, sv = storage.load(oid, v)
else: pv=sv=None
p, s = storage.load(oid,'')
return p, s, v, pv, sv
def zeoVerify(self, oid, s, sv,
dumps=cPickle.dumps):
try: p, os, v, pv, osv = self.zeoLoad(oid)
except: return _noreturn
p=pv=None # free the pickles
if os != s:
self.message_output('I'+dumps(((oid, os, ''),)))
elif osv != sv:
self.message_output('I'+dumps(((oid, osv, v),)))
return _noreturn
def store(self, oid, serial, data, version, id): def store(self, oid, serial, data, version, id):
t=self._transaction t=self._transaction
if t is None or id != t.id: if t is None or id != t.id:
......
...@@ -20,6 +20,7 @@ class sync: ...@@ -20,6 +20,7 @@ class sync:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port) s.connect(host, port)
self._sync__s=s self._sync__s=s
self._sync__q=[]
self._outOfBand=outOfBand self._outOfBand=outOfBand
def setOutOfBand(self, f): self._outOfBand=f def setOutOfBand(self, f): self._outOfBand=f
...@@ -44,6 +45,12 @@ class sync: ...@@ -44,6 +45,12 @@ class sync:
else: else:
raise UnrecognizedResult, r raise UnrecognizedResult, r
def queue(self, *args):
self._sync__q.append(dumps(args,1))
def send(self, *args):
self._write(dumps(args,1))
def _write(self, data, pack=struct.pack): def _write(self, data, pack=struct.pack):
send=self._sync__s.send send=self._sync__s.send
h=pack(">i", len(data)) h=pack(">i", len(data))
...@@ -93,10 +100,15 @@ class async(smac, sync): ...@@ -93,10 +100,15 @@ class async(smac, sync):
host, port = connection host, port = connection
except: except:
s=connection._sync__s s=connection._sync__s
smac.__init__(self, s, None)
self._outOfBand=connection._outOfBand self._outOfBand=connection._outOfBand
for m in connection._sync__q:
self.message_output(m)
else: else:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port) s.connect(host, port)
smac.__init__(self, s, None)
self._outOfBand=outOfBand self._outOfBand=outOfBand
l=allocate_lock() l=allocate_lock()
...@@ -104,13 +116,15 @@ class async(smac, sync): ...@@ -104,13 +116,15 @@ class async(smac, sync):
self.__lr=l.release self.__lr=l.release
self.__r=None self.__r=None
l.acquire() l.acquire()
smac.__init__(self, s, None)
global Wakeup global Wakeup
if Wakeup is None: if Wakeup is None:
import ZServer.PubCore.ZEvent import ZServer.PubCore.ZEvent
Wakeup=ZServer.PubCore.ZEvent.Wakeup Wakeup=ZServer.PubCore.ZEvent.Wakeup
def queue(self, *args):
self.message_output(dumps(args,1))
Wakeup() # You dumb bastard
def _write(self, data): def _write(self, data):
self.message_output(data) self.message_output(data)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment