Commit bce3bc78 authored by Julien Muchembled's avatar Julien Muchembled

client: always process invalidations in poll thread

This fixes an invalidation bug, including the following critical error:

CRITICAL txn.140440071526144 A storage error occurred during the second phase of the two-phase commit.  Resources may be in an inconsistent state.
------
ERROR Zope.SiteErrorLog 1342544345.990.582646288246 /erp5/person_module/Folder_create
Traceback (innermost last):
  Module ZPublisher.Publish, line 137, in publish
  Module Zope2.App.startup, line 291, in commit
  Module transaction._manager, line 93, in commit
  Module transaction._transaction, line 322, in commit
  Module transaction._transaction, line 424, in _commitResources
  Module neo.client, line 42, in tpc_finish
  Module neo.client.Storage, line 135, in tpc_finish
  Module neo.client.app, line 773, in tpc_finish
  Module neo.client, line 36, in callback
  Module ZODB.DB, line 693, in invalidate
  Module ZODB.DB, line 532, in _connectionMap
  Module ZODB.DB, line 221, in map
  Module transaction.weakset, line 58, in map
  Module ZODB.DB, line 692, in inval
  Module ZODB.Connection, line 350, in invalidate
AssertionError: invalidations out of order, '\x03\x97\xec;\x19\x86\xc9\xf6' < '\x03\x97\xec;\x19\x87_\xdd'
parent 4e1657c9
...@@ -218,10 +218,10 @@ class Application(object): ...@@ -218,10 +218,10 @@ class Application(object):
self.setHandlerData(None) self.setHandlerData(None)
@profiler_decorator @profiler_decorator
def _ask(self, conn, packet, handler=None): def _ask(self, conn, packet, handler=None, **kw):
self.setHandlerData(None) self.setHandlerData(None)
queue = self._getThreadQueue() queue = self._getThreadQueue()
msg_id = conn.ask(packet, queue=queue) msg_id = conn.ask(packet, queue=queue, **kw)
get = queue.get get = queue.get
_handlePacket = self._handlePacket _handlePacket = self._handlePacket
while True: while True:
...@@ -242,15 +242,15 @@ class Application(object): ...@@ -242,15 +242,15 @@ class Application(object):
return self.getHandlerData() return self.getHandlerData()
@profiler_decorator @profiler_decorator
def _askStorage(self, conn, packet): def _askStorage(self, conn, packet, **kw):
""" Send a request to a storage node and process its answer """ """ Send a request to a storage node and process its answer """
return self._ask(conn, packet, handler=self.storage_handler) return self._ask(conn, packet, handler=self.storage_handler, **kw)
@profiler_decorator @profiler_decorator
def _askPrimary(self, packet): def _askPrimary(self, packet, **kw):
""" Send a request to the primary master and process its answer """ """ Send a request to the primary master and process its answer """
return self._ask(self._getMasterConnection(), packet, return self._ask(self._getMasterConnection(), packet,
handler=self.primary_handler) handler=self.primary_handler, **kw)
@profiler_decorator @profiler_decorator
def _getMasterConnection(self): def _getMasterConnection(self):
...@@ -766,11 +766,7 @@ class Application(object): ...@@ -766,11 +766,7 @@ class Application(object):
# Call finish on master # Call finish on master
cache_dict = txn_context['cache_dict'] cache_dict = txn_context['cache_dict']
tid = self._askPrimary(Packets.AskFinishTransaction( tid = self._askPrimary(Packets.AskFinishTransaction(
txn_context['ttid'], cache_dict)) txn_context['ttid'], cache_dict), callback=f)
# Call function given by ZODB
if f is not None:
f(tid)
# Update cache # Update cache
self._cache_lock_acquire() self._cache_lock_acquire()
......
...@@ -228,3 +228,5 @@ class ClientCache(object): ...@@ -228,3 +228,5 @@ class ClientCache(object):
else: else:
if item.next_tid is None: if item.next_tid is None:
item.next_tid = tid item.next_tid = tid
else:
assert item.next_tid <= tid, (item, oid, tid)
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.pt import MTPartitionTable as PartitionTable from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeTypes, NodeStates, ProtocolError from neo.lib.protocol import NodeTypes, NodeStates, Packets, ProtocolError
from neo.lib.util import dump from neo.lib.util import dump
from . import BaseHandler, AnswerBaseHandler from . import BaseHandler, AnswerBaseHandler
from ..exception import NEOStorageError from ..exception import NEOStorageError
...@@ -90,6 +90,13 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -90,6 +90,13 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
class PrimaryNotificationsHandler(BaseHandler): class PrimaryNotificationsHandler(BaseHandler):
""" Handler that process the notifications from the primary master """ """ Handler that process the notifications from the primary master """
def packetReceived(self, conn, packet, kw={}):
if type(packet) is Packets.AnswerTransactionFinished:
callback = kw.pop('callback')
if callback is not None:
callback(packet.decode()[1])
BaseHandler.packetReceived(self, conn, packet, kw)
def connectionClosed(self, conn): def connectionClosed(self, conn):
app = self.app app = self.app
if app.master_conn is not None: if app.master_conn is not None:
......
...@@ -49,9 +49,9 @@ def getPartitionTable(self): ...@@ -49,9 +49,9 @@ def getPartitionTable(self):
self.master_conn = _getMasterConnection(self) self.master_conn = _getMasterConnection(self)
return self.pt return self.pt
def _ask(self, conn, packet, handler=None): def _ask(self, conn, packet, handler=None, **kw):
self.setHandlerData(None) self.setHandlerData(None)
conn.ask(packet) conn.ask(packet, **kw)
if handler is None: if handler is None:
raise NotImplementedError raise NotImplementedError
else: else:
...@@ -490,9 +490,6 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -490,9 +490,6 @@ class ClientApplicationTests(NeoUnitTestBase):
txn_context = self._begin(app, txn, tid) txn_context = self._begin(app, txn, tid)
self.f_called = False self.f_called = False
self.f_called_with_tid = None self.f_called_with_tid = None
def hook(tid):
self.f_called = True
self.f_called_with_tid = tid
packet = Packets.AnswerTransactionFinished(ttid, tid) packet = Packets.AnswerTransactionFinished(ttid, tid)
packet.setId(0) packet.setId(0)
app.master_conn = Mock({ app.master_conn = Mock({
...@@ -501,9 +498,7 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -501,9 +498,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'fakeReceived': packet, 'fakeReceived': packet,
}) })
txn_context['txn_voted'] = True txn_context['txn_voted'] = True
app.tpc_finish(txn, None, hook) app.tpc_finish(txn, None)
self.assertTrue(self.f_called)
self.assertEqual(self.f_called_with_tid, tid)
self.checkAskFinishTransaction(app.master_conn) self.checkAskFinishTransaction(app.master_conn)
#self.checkDispatcherRegisterCalled(app, app.master_conn) #self.checkDispatcherRegisterCalled(app, app.master_conn)
self.assertEqual(app._txn_container.get(txn), None) self.assertEqual(app._txn_container.get(txn), None)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment