Commit a6d4c4e9 authored by Julien Muchembled's avatar Julien Muchembled

Serialize empty transaction extension with an empty string

The protocol version is increased to ensure that client nodes are able to
handle an empty 'extension' field in AnswerTransactionInformation.

It also means that once new transactions are written, going back to a previous
revision is not possible.
parent 346c9d00
...@@ -14,10 +14,14 @@ ...@@ -14,10 +14,14 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from cPickle import dumps, loads
import heapq import heapq
import time import time
try:
from ZODB._compat import dumps, loads, _protocol
except ImportError:
from cPickle import dumps, loads
_protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB from . import OLD_ZODB
if OLD_ZODB: if OLD_ZODB:
...@@ -543,9 +547,12 @@ class Application(ThreadedApplication): ...@@ -543,9 +547,12 @@ class Application(ThreadedApplication):
txn_context = self._txn_container.get(transaction) txn_context = self._txn_container.get(transaction)
self.waitStoreResponses(txn_context) self.waitStoreResponses(txn_context)
ttid = txn_context.ttid ttid = txn_context.ttid
ext = transaction._extension
ext = dumps(ext, _protocol) if ext else ''
# user and description are cast to str in case they're unicode.
# BBB: This is not required anymore with recent ZODB.
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension), str(transaction.description), ext, txn_context.cache_dict)
txn_context.cache_dict)
queue = txn_context.queue queue = txn_context.queue
involved_nodes = txn_context.involved_nodes involved_nodes = txn_context.involved_nodes
# Ask in parallel all involved storage nodes to commit object metadata. # Ask in parallel all involved storage nodes to commit object metadata.
...@@ -775,10 +782,6 @@ class Application(ThreadedApplication): ...@@ -775,10 +782,6 @@ class Application(ThreadedApplication):
self.waitStoreResponses(txn_context) self.waitStoreResponses(txn_context)
return None, txn_oid_list return None, txn_oid_list
def _insertMetadata(self, txn_info, extension):
for k, v in loads(extension).items():
txn_info[k] = v
def _getTransactionInformation(self, tid): def _getTransactionInformation(self, tid):
return self._askStorageForRead(tid, return self._askStorageForRead(tid,
Packets.AskTransactionInformation(tid)) Packets.AskTransactionInformation(tid))
...@@ -818,7 +821,8 @@ class Application(ThreadedApplication): ...@@ -818,7 +821,8 @@ class Application(ThreadedApplication):
if filter is None or filter(txn_info): if filter is None or filter(txn_info):
txn_info.pop('packed') txn_info.pop('packed')
txn_info.pop("oids") txn_info.pop("oids")
self._insertMetadata(txn_info, txn_ext) if txn_ext:
txn_info.update(loads(txn_ext))
append(txn_info) append(txn_info)
if len(undo_info) >= last - first: if len(undo_info) >= last - first:
break break
...@@ -846,7 +850,7 @@ class Application(ThreadedApplication): ...@@ -846,7 +850,7 @@ class Application(ThreadedApplication):
tid = None tid = None
for tid in tid_list: for tid in tid_list:
(txn_info, txn_ext) = self._getTransactionInformation(tid) (txn_info, txn_ext) = self._getTransactionInformation(tid)
txn_info['ext'] = loads(txn_ext) txn_info['ext'] = loads(txn_ext) if txn_ext else {}
append(txn_info) append(txn_info)
return (tid, txn_list) return (tid, txn_list)
...@@ -865,7 +869,8 @@ class Application(ThreadedApplication): ...@@ -865,7 +869,8 @@ class Application(ThreadedApplication):
txn_info['size'] = size txn_info['size'] = size
if filter is None or filter(txn_info): if filter is None or filter(txn_info):
result.append(txn_info) result.append(txn_info)
self._insertMetadata(txn_info, txn_ext) if txn_ext:
txn_info.update(loads(txn_ext))
return result return result
def importFrom(self, storage, source): def importFrom(self, storage, source):
......
...@@ -22,7 +22,7 @@ from struct import Struct ...@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require # The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and # to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16). # the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 1 PROTOCOL_VERSION = 2
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION) ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data. # Avoid memory errors on corrupted data.
......
...@@ -24,6 +24,11 @@ from ZConfig import loadConfigFile ...@@ -24,6 +24,11 @@ from ZConfig import loadConfigFile
from ZODB import BaseStorage from ZODB import BaseStorage
from ZODB.config import getStorageSchema, storageFromString from ZODB.config import getStorageSchema, storageFromString
from ZODB.POSException import POSKeyError from ZODB.POSException import POSKeyError
try:
from ZODB._compat import dumps, loads, _protocol
except ImportError:
from cPickle import dumps, loads
_protocol = 1
from . import buildDatabaseManager, DatabaseFailure from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager from .manager import DatabaseManager
...@@ -35,6 +40,12 @@ patch.speedupFileStorageTxnLookup() ...@@ -35,6 +40,12 @@ patch.speedupFileStorageTxnLookup()
FORK = sys.platform != 'win32' FORK = sys.platform != 'win32'
def transactionAsTuple(txn):
ext = txn.extension
return (txn.user, txn.description,
dumps(ext, _protocol) if ext else '',
txn.status == 'p', txn.tid)
class Reference(object): class Reference(object):
...@@ -484,11 +495,8 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -484,11 +495,8 @@ class ImporterDatabaseManager(DatabaseManager):
if tid != z.tid: if tid != z.tid:
if tid: if tid:
yield txn yield txn
txn = z.transaction txn = transactionAsTuple(z.transaction)
tid = txn.tid tid = txn[-1]
txn = (str(txn.user), str(txn.description),
cPickle.dumps(txn.extension),
txn.status == 'p', tid)
zodb = z.zodb zodb = z.zodb
for r in z.transaction: for r in z.transaction:
oid = p64(u64(r.oid) + zodb.shift_oid) oid = p64(u64(r.oid) + zodb.shift_oid)
...@@ -577,8 +585,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -577,8 +585,7 @@ class ImporterDatabaseManager(DatabaseManager):
p64 = util.p64 p64 = util.p64
shift_oid = zodb.shift_oid shift_oid = zodb.shift_oid
return ([p64(u64(x.oid) + shift_oid) for x in txn], return ([p64(u64(x.oid) + shift_oid) for x in txn],
txn.user, txn.description, ) + transactionAsTuple(txn)
cPickle.dumps(txn.extension), 0, tid)
else: else:
return self.db.getTransaction(tid, all) return self.db.getTransaction(tid, all)
...@@ -746,7 +753,7 @@ class TransactionRecord(BaseStorage.TransactionRecord): ...@@ -746,7 +753,7 @@ class TransactionRecord(BaseStorage.TransactionRecord):
def __init__(self, db, tid): def __init__(self, db, tid):
self._oid_list, user, desc, ext, _, _ = db.getTransaction(tid) self._oid_list, user, desc, ext, _, _ = db.getTransaction(tid)
super(TransactionRecord, self).__init__(tid, ' ', user, desc, super(TransactionRecord, self).__init__(tid, ' ', user, desc,
cPickle.loads(ext) if ext else {}) loads(ext) if ext else {})
self._db = db self._db = db
def __iter__(self): def __iter__(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment