Commit a3de2ce8 authored by Jim Fulton's avatar Jim Fulton

Implementated checkCurrentSerialInTransaction for ZEO.

Also did some storage server implementation cleanup, removing some
dups and folding a small module into StorageServer.py.
parent 99cbe353
...@@ -931,6 +931,11 @@ class ClientStorage(object): ...@@ -931,6 +931,11 @@ class ClientStorage(object):
self._tbuf.store(oid, data) self._tbuf.store(oid, data)
return self._check_serials() return self._check_serials()
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
self._check_trans(transaction)
self._server.checkCurrentSerialInTransaction(oid, serial,
id(transaction))
def storeBlob(self, oid, serial, data, blobfilename, version, txn): def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object.""" """Storage API: store a blob object."""
assert not version assert not version
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Log a transaction's commit info during two-phase commit.
A storage server allows multiple clients to commit transactions, but
must serialize them as the actually execute at the server. The
concurrent commits are achieved by logging actions up until the
tpc_vote(). At that point, the entire transaction is committed on the
real storage.
"""
import cPickle
import tempfile
class CommitLog:
def __init__(self):
self.file = tempfile.TemporaryFile(suffix=".comit-log")
self.pickler = cPickle.Pickler(self.file, 1)
self.pickler.fast = 1
self.stores = 0
def size(self):
return self.file.tell()
def delete(self, oid, serial):
self.pickler.dump(('_delete', (oid, serial)))
self.stores += 1
def store(self, oid, serial, data):
self.pickler.dump(('_store', (oid, serial, data)))
self.stores += 1
def restore(self, oid, serial, data, prev_txn):
self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
self.stores += 1
def undo(self, transaction_id):
self.pickler.dump(('_undo', (transaction_id, )))
self.stores += 1
def __iter__(self):
self.file.seek(0)
unpickler = cPickle.Unpickler(self.file)
for i in range(self.stores):
yield unpickler.load()
def close(self):
if self.file:
self.file.close()
self.file = None
=======
ZEO 2.0
=======
What's ZEO?
-----------
ZEO stands for Zope Enterprise Objects. ZEO is an add-on for Zope
that allows multiple processes to connect to a single ZODB storage.
Those processes can live on different machines, but don't need to.
ZEO 2 has many improvements over ZEO 1, and is incompatible with ZEO 1;
if you upgrade an existing ZEO 1 installation, you must upgrade the
server and all clients simultaneous. If you received ZEO 2 as part of
the ZODB 3 distribution, the ZEO 1 sources are provided in a separate
directory (ZEO1). Some documentation for ZEO is available in the ZODB 3
package in the Doc subdirectory. ZEO depends on the ZODB software; it
can be used with the version of ZODB distributed with Zope 2.5.1 or
later. More information about ZEO can be found in the ZODB Wiki:
http://www.zope.org/Wikis/ZODB
What's here?
------------
This list of filenames is mostly for ZEO developers::
ClientCache.py client-side cache implementation
ClientStorage.py client-side storage implementation
ClientStub.py RPC stubs for callbacks from server to client
CommitLog.py buffer used during two-phase commit on the server
Exceptions.py definitions of exceptions
ICache.py interface definition for the client-side cache
ServerStub.py RPC stubs for the server
StorageServer.py server-side storage implementation
TransactionBuffer.py buffer used for transaction data in the client
__init__.py near-empty file to make this directory a package
simul.py command-line tool to simulate cache behavior
start.py command-line tool to start the storage server
stats.py command-line tool to process client cache traces
tests/ unit tests and other test utilities
util.py utilities used by the server startup tool
version.txt text file indicating the ZEO version
zrpc/ subpackage implementing Remote Procedure Call (RPC)
...@@ -199,6 +199,9 @@ class StorageServer: ...@@ -199,6 +199,9 @@ class StorageServer:
def storea(self, oid, serial, data, id): def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, id) self.rpc.callAsync('storea', oid, serial, data, id)
def checkCurrentSerialInTransaction(self, oid, serial, id):
self.rpc.callAsync('checkCurrentSerialInTransaction', oid, serial, id)
def restorea(self, oid, serial, data, prev_txn, id): def restorea(self, oid, serial, data, prev_txn, id):
self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id) self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
......
...@@ -22,7 +22,6 @@ exported for invocation by the server. ...@@ -22,7 +22,6 @@ exported for invocation by the server.
from __future__ import with_statement from __future__ import with_statement
from ZEO.CommitLog import CommitLog
from ZEO.Exceptions import AuthError from ZEO.Exceptions import AuthError
from ZEO.monitor import StorageStats, StatsServer from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
...@@ -477,6 +476,7 @@ class ZEOStorage: ...@@ -477,6 +476,7 @@ class ZEOStorage:
if not getattr(self, op)(*args): if not getattr(self, op)(*args):
break break
# Blob support # Blob support
while self.blob_log and not self.store_failed: while self.blob_log and not self.store_failed:
oid, oldserial, data, blobfilename = self.blob_log.pop() oid, oldserial, data, blobfilename = self.blob_log.pop()
...@@ -531,6 +531,10 @@ class ZEOStorage: ...@@ -531,6 +531,10 @@ class ZEOStorage:
self.stats.stores += 1 self.stats.stores += 1
self.txnlog.store(oid, serial, data) self.txnlog.store(oid, serial, data)
def checkCurrentSerialInTransaction(self, oid, serial, id):
self._check_tid(id, exc=StorageTransactionError)
self.txnlog.checkread(oid, serial)
def restorea(self, oid, serial, data, prev_txn, id): def restorea(self, oid, serial, data, prev_txn, id):
self._check_tid(id, exc=StorageTransactionError) self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1 self.stats.stores += 1
...@@ -581,13 +585,7 @@ class ZEOStorage: ...@@ -581,13 +585,7 @@ class ZEOStorage:
self._check_tid(tid, exc=StorageTransactionError) self._check_tid(tid, exc=StorageTransactionError)
self.txnlog.undo(trans_id) self.txnlog.undo(trans_id)
def _delete(self, oid, serial): def _op_error(self, oid, err, op):
err = None
try:
self.storage.deleteObject(oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
self.store_failed = 1 self.store_failed = 1
if isinstance(err, ConflictError): if isinstance(err, ConflictError):
self.stats.conflicts += 1 self.stats.conflicts += 1
...@@ -595,16 +593,37 @@ class ZEOStorage: ...@@ -595,16 +593,37 @@ class ZEOStorage:
(oid_repr(oid), str(err)), BLATHER) (oid_repr(oid), str(err)), BLATHER)
if not isinstance(err, TransactionError): if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client # Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2], self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
logging.ERROR, exc_info=True) logging.ERROR, exc_info=True)
err = self._marshal_error(err) err = self._marshal_error(err)
# The exception is reported back as newserial for this oid # The exception is reported back as newserial for this oid
self.serials.append((oid, err)) self.serials.append((oid, err))
def _delete(self, oid, serial):
err = None
try:
self.storage.deleteObject(oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
self._op_error(oid, err, 'delete')
else: else:
self.invalidated.append(oid) self.invalidated.append(oid)
return err is None return err is None
def _checkread(self, oid, serial):
err = None
try:
self.storage.checkCurrentSerialInTransaction(
oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
self._op_error(oid, err, 'checkCurrentSerialInTransaction')
return err is None
def _store(self, oid, serial, data, blobfile=None): def _store(self, oid, serial, data, blobfile=None):
err = None err = None
try: try:
...@@ -617,18 +636,7 @@ class ZEOStorage: ...@@ -617,18 +636,7 @@ class ZEOStorage:
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
raise raise
except Exception, err: except Exception, err:
self.store_failed = 1 self._op_error(oid, err, 'store')
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error oid=%s msg=%s" %
(oid_repr(oid), str(err)), BLATHER)
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2],
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
newserial = [(oid, err)]
else: else:
if serial != "\0\0\0\0\0\0\0\0": if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append(oid) self.invalidated.append(oid)
...@@ -636,8 +644,7 @@ class ZEOStorage: ...@@ -636,8 +644,7 @@ class ZEOStorage:
if isinstance(newserial, str): if isinstance(newserial, str):
newserial = [(oid, newserial)] newserial = [(oid, newserial)]
if newserial: for oid, s in newserial or ():
for oid, s in newserial:
if s == ResolvedSerial: if s == ResolvedSerial:
self.stats.conflicts_resolved += 1 self.stats.conflicts_resolved += 1
...@@ -656,14 +663,7 @@ class ZEOStorage: ...@@ -656,14 +663,7 @@ class ZEOStorage:
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
raise raise
except Exception, err: except Exception, err:
self.store_failed = 1 self._op_error(oid, err, 'restore')
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2],
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
self.serials.append((oid, err))
return err is None return err is None
...@@ -674,14 +674,7 @@ class ZEOStorage: ...@@ -674,14 +674,7 @@ class ZEOStorage:
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
raise raise
except Exception, err: except Exception, err:
self.store_failed = 1 self._op_error(z64, err, 'undo')
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2],
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
self.serials.append((z64, err))
else: else:
self.invalidated.extend(oids) self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids) self.serials.extend((oid, ResolvedSerial) for oid in oids)
...@@ -1535,3 +1528,44 @@ def _addr_label(addr): ...@@ -1535,3 +1528,44 @@ def _addr_label(addr):
host, port = addr host, port = addr
return str(host) + ":" + str(port) return str(host) + ":" + str(port)
class CommitLog:
def __init__(self):
self.file = tempfile.TemporaryFile(suffix=".comit-log")
self.pickler = cPickle.Pickler(self.file, 1)
self.pickler.fast = 1
self.stores = 0
def size(self):
return self.file.tell()
def delete(self, oid, serial):
self.pickler.dump(('_delete', (oid, serial)))
self.stores += 1
def checkread(self, oid, serial):
self.pickler.dump(('_checkread', (oid, serial)))
self.stores += 1
def store(self, oid, serial, data):
self.pickler.dump(('_store', (oid, serial, data)))
self.stores += 1
def restore(self, oid, serial, data, prev_txn):
self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
self.stores += 1
def undo(self, transaction_id):
self.pickler.dump(('_undo', (transaction_id, )))
self.stores += 1
def __iter__(self):
self.file.seek(0)
unpickler = cPickle.Unpickler(self.file)
for i in range(self.stores):
yield unpickler.load()
def close(self):
if self.file:
self.file.close()
self.file = None
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment