Commit c7c01ce4 authored by Kirill Smelkov's avatar Kirill Smelkov

bigfile/zodb: ZODB.Connection can migrate between threads on close/open and we have to care

Intro
-----

ZODB maintains pool of opened-to-DB connections. For each request Zope
opens 1 connection and, after request handling is done, returns the
connection back to ZODB pool (via Connection.close()). The same
connection will be opened again for handling some future next request at
some future time. This next open can happen in different-from-first
request worker thread.

TransactionManager  (as accessed by transaction.{get,commit,abort,...})
is thread-local, that is e.g. transaction.get() returns different
transaction for threads T1 and T2.

When _ZBigFileH hooks into txn_manager to get a chance to run its
.beforeCompletion() when transaction.commit() is run, it hooks into
_current_ _thread_ transaction manager.

Without unhooking on connection close, and circumstances where
connection migrates to different thread this can lead to
dissynchronization between ZBigFileH managing fileh pages and Connection
with ZODB objects. And even to data corruption, e.g.

    T1              T2

    open
    zarray[0] = 11
    commit
    close

                    open                # opens connection as closed in T1
    open
                    zarray[0] = 21
    commit
                    abort

    close           close

Here zarray[0]=21 _will_ be committed by T1 as part of T1 transaction -
because when T1 does commit .beforeCompletion() for zarray is invoked,
sees there is dirty data and propagate changes to zodb objects in
connection for T2, joins connection for T2 into txn for T1, and then txn
for t1 when doing two-phase-commit stores modified objects to DB ->
oops.

----------------------------------------

To prevent such dissynchronization _ZBigFileH needs to be a DataManager
which works in sync with the connection it was initially created under -
on connection close, unregister from transaction_manager, and on
connection open, register to transaction manager in current, possibly
different, thread context. Then there won't be incorrect
beforeCompletion() notification and corruption.

This issue, besides possible data corruption, was probably also exposing
itself via following ways we've seen in production (everywhere
connection was migrated from T1 to T2):

1. Exception ZODB.POSException.ConnectionStateError:
        ConnectionStateError('Cannot close a connection joined to a transaction',)
        in <bound method Cleanup.__del__ of <App.ZApplication.Cleanup instance at 0x7f10f4bab050>> ignored

     T1          T2

                 modify zarray
                 commit/abort    # does not join zarray to T2.txn,
                                 # because .beforeCompletion() is
                                 # registered in T1.txn_manager

     commit                      # T1 invokes .beforeCompletion()
     ...                         # beforeCompletion() joins ZBigFileH and zarray._p_jar (=T2.conn) to T1.txn
     ...                         # commit is going on in progress
     ...
     ...         close           # T2 thinks request handling is done and
     ...                         # and closes connection. But T2.conn is
     ...                         # still joined to T1.txn

2. Traceback (most recent call last):
     File ".../wendelin/bigfile/file_zodb.py", line 121, in storeblk
       def storeblk(self, blk, buf):   return self.zself.storeblk(blk, buf)
     File ".../wendelin/bigfile/file_zodb.py", line 220, in storeblk
       zblk._v_blkdata = bytes(buf)    # FIXME does memcpy
     File ".../ZODB/Connection.py", line 857, in setstate
       raise ConnectionStateError(msg)
   ZODB.POSException.ConnectionStateError: Shouldn't load state for 0x1f23a5 when the connection is closed

   Similar to "1", but close in T2 happens sooner, so that when T1 does
   the commit and tries to store object to database, Connection refuses to
   do the store:

     T1          T2

                 modify zarray
                 commit/abort

     commit
     ...         close
     ...
     ...
     . obj.store()
     ...
     ...

3. Traceback (most recent call last):
     File ".../wendelin/bigfile/file_zodb.py", line 121, in storeblk
       def storeblk(self, blk, buf):   return self.zself.storeblk(blk, buf)
     File ".../wendelin/bigfile/file_zodb.py", line 221, in storeblk
       zblk._p_changed = True          # if zblk was already in DB: _p_state -> CHANGED
     File ".../ZODB/Connection.py", line 979, in register
       self._register(obj)
     File ".../ZODB/Connection.py", line 989, in _register
       self.transaction_manager.get().join(self)
     File ".../transaction/_transaction.py", line 220, in join
       Status.ACTIVE, Status.DOOMED, self.status))
   ValueError: expected txn status 'Active' or 'Doomed', but it's 'Committing'

  ( storeblk() does zblk._p_changed -> Connection.register(zblk) ->
    txn.join() but txn is already committing

    IOW storeblk() was invoked with txn.state being already 'Committing' )

    T1          T2

                modify obj      # this way T2.conn joins T2.txn
                modify zarray

    commit                      # T1 invokes .beforeCompletion()
    ...                         # beforeCompletion() joins only _ZBigFileH to T1.txn
    ...                         # (because T2.conn is already marked as joined)
    ...
    ...         commit/abort    # T2 does commit/abort - this touches only T2.conn, not ZBigFileH
    ...                         # in particular T2.conn is now reset to be not joined
    ...
    . tpc_begin                 # actual active commit phase of T1 was somehow delayed a bit
    . tpc_commit                # when changes from RAM propagate to ZODB objects associated
    .  storeblk                 # connection (= T2.conn !) is notified again,
    .   zblk = ...              # wants to join txn for it thinks its transaction_manager,
                                # which when called from under T1 returns *T1* transaction manager for
                                # which T1.txn is already in state='Committing'

4. Empty transaction committed to NEO

   ( different from doing just transaction.commit() without changing
     any data - a connection was joined to txn, but set of modified
     object turned out to be empty )

   This is probably a race in Connection._register when both T1 and T2
   go to it at the same time:

   https://github.com/zopefoundation/ZODB/blob/3.10/src/ZODB/Connection.py#L988

   def _register(self, obj=None):
        if self._needs_to_join:
            self.transaction_manager.get().join(self)
            self._needs_to_join = False

    T1                          T2

                                modify zarray
    commit
    ...
    .beforeCompletion           modify obj
    . if T2.conn.needs_join      if T2.conn.needs_join      # race here
    .   T2.conn.join(T1.txn)       T2.conn.join(T2.txn)     # as a result T2.conn joins both T1.txn and T2.txn
    .
    commit finishes             # T2.conn registered-for-commit object list is now empty

                                commit
                                 tpc_begin
                                  storage.tpc_begin
                                 tpc_commit
                                  # no object stored, because for-commit-list is empty

/cc @jm, @klaus, @Tyagov, @vpelletier
parent 64d1f40b
...@@ -16,12 +16,14 @@ ...@@ -16,12 +16,14 @@
# #
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
from wendelin.bigarray.array_zodb import ZBigArray from wendelin.bigarray.array_zodb import ZBigArray
from wendelin.bigfile.tests.test_filezodb import kkey, cacheInfo from wendelin.bigfile.tests.test_filezodb import kkey, cacheInfo, NotifyChannel
from wendelin.lib.zodb import dbclose from wendelin.lib.zodb import dbclose
from wendelin.lib.testing import getTestDB from wendelin.lib.testing import getTestDB
from persistent import UPTODATE from persistent import UPTODATE
import transaction import transaction
from numpy import dtype, uint8, all, array_equal, arange from numpy import dtype, uint8, all, array_equal, arange
from threading import Thread
from six.moves import _thread
testdb = None testdb = None
def setup_module(): def setup_module():
...@@ -180,3 +182,178 @@ def test_zbigarray(): ...@@ -180,3 +182,178 @@ def test_zbigarray():
assert a[24*1024*1024+3] == 13 assert a[24*1024*1024+3] == 13
dbclose(root) dbclose(root)
# the same as test_bigfile_filezodb_vs_conn_migration but explicitly for ZBigArray
# ( NOTE this test is almost dup of test_zbigarray_vs_conn_migration() )
def test_zbigarray_vs_conn_migration():
root01 = testdb.dbopen()
conn01 = root01._p_jar
db = conn01.db()
conn01.close()
del root01
c12_1 = NotifyChannel() # T11 -> T21
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
assert conn11_1 is conn01
# setup zarray
root11_1 = conn11_1.root()
root11_1['zarray2'] = a11 = ZBigArray((10,), uint8)
transaction.commit()
# set initial data
a11[0:1] = [11] # XXX -> [0] = 11 after BigArray can
transaction.commit()
# close conn, wait till T21 reopens it
del a11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
# open nother connection. it must be different
# (see appropriate place in zfile test about why)
conn11_2 = db.open()
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zarray2-modified')
transaction.commit() # should be nothing
tell('T1-txn12-committed')
wait('T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
# open, modify, abort
def T21():
tell, wait = c21_1.tell, c12_1.wait
# wait until T1 finish setting up initial data and get its connection
# (see appropriate place in zfile tests for details)
wait('T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
# modify zarray and arrange timings so that T1 commits after zarray is
# modified, but before we commit/abort.
root21 = conn21.root()
a21 = root21['zarray2']
a21[0:1] = [21] # XXX -> [0] = 21 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn12-committed')
# abort - zarray2 should stay unchanged
transaction.abort()
del a21, root21
conn21.close()
tell('T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
# now verify that zarray2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
# NOTE top of connection stack is conn21(=conn01), becase conn11_2 has 0
# active objects
assert conn02 is conn01
root02 = conn02.root()
a02 = root02['zarray2']
assert a02[0] == 11
del a02, root02
conn02.close()
c12_2 = NotifyChannel() # T12 -> T22
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
def T12():
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
conn12 = db.open()
tell('T1-conn12-opened')
wait('T2-zarray2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
conn12.close()
# open, modify, commit
def T22():
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
conn22 = db.open()
assert conn22 is conn01
tell('T2-conn22-opened')
# modify zarray and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
a22[0:1] = [22] # XXX -> [0] = 22 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn-aborted')
# commit - changes should propagate to zarray
transaction.commit()
tell('T2-txn-committed')
conn22.close()
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
# now verify that zarray2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
# NOTE top of connection stack is conn22(=conn01), becase it has most # of
# active objectd
assert conn03 is conn01
root03 = conn03.root()
a03 = root03['zarray2']
assert a03[0] == 22
del a03
dbclose(root03)
...@@ -263,8 +263,34 @@ Connection.open = Connection_open ...@@ -263,8 +263,34 @@ Connection.open = Connection_open
# handles ZBigFile acts as a database, and for real ZODB database # handles ZBigFile acts as a database, and for real ZODB database
# it acts as (one of) connections. # it acts as (one of) connections.
# #
# NOTE ISynchronizer is used only to be able to join into transactions without # NOTE ISynchronizer is used to be able to join into transactions without
# tracking intermediate changes to pages. # tracking intermediate changes to pages:
#
# _ZBigFileH sticks to ZODB.Connection under which it was originally opened
# and then participates in that connection lifecycle forever keeping sync on
# that connection close and reopen.
#
# This is required because ZBigFile and ZBigArray are both LivePersistent
# (i.e. they never go to ghost state) and thus stays forever live (= active)
# in Connection._cache.
#
# If it was only ZBigFile, we could be opening new fileh every time for
# each connection open and close/unref that fileh object on connection
# close. But:
#
# 1. this scheme is inefficient (upon close, fileh forgets all its loaded
# memory, and thus for newly opened fileh we'd need to reload file data
# from scratch)
#
# 2. ZBigArray need to reference opened fileh --- since ZBigArray stays
# live in Connection._cache, fileh also automatically stay live.
#
# So in essence _ZBigFileH is a data manager which works in sync with ZODB
# Connection propagating changes between fileh memory and ZODB objects.
#
# NOTE Bear in mind that after close, connection can be reopened in different
# thread - that's why we have to adjust registration to per-thread
# transaction_manager.
@implementer(IDataManager) @implementer(IDataManager)
@implementer(ISynchronizer) @implementer(ISynchronizer)
class _ZBigFileH(object): class _ZBigFileH(object):
...@@ -281,15 +307,33 @@ class _ZBigFileH(object): ...@@ -281,15 +307,33 @@ class _ZBigFileH(object):
# IDataManager requires .transaction_manager # IDataManager requires .transaction_manager
self.transaction_manager = zfile._p_jar.transaction_manager self.transaction_manager = zfile._p_jar.transaction_manager
# when connection will be reopened -> txn_manager.registerSynch(self)
zfile._p_jar.onOpenCallback(self.on_connection_open)
# when we are just initially created, the connection is already opened,
# so manually compensate for it.
self.on_connection_open()
def on_connection_open(self):
# when connection is closed -> txn_manager.unregisterSynch(self)
# NOTE close callbacks are fired once, and thus we have to re-register
# it on every open.
self.zfile._p_jar.onCloseCallback(self.on_connection_close)
# attach us to _current_ _thread_ TM (staying in sync with Connection):
#
# Hook into txn_manager so that we get a chance to run before # Hook into txn_manager so that we get a chance to run before
# transaction.commit(). (see .beforeCompletion() with more details) # transaction.commit(). (see .beforeCompletion() with more details)
self.transaction_manager.registerSynch(self) self.transaction_manager.registerSynch(self)
# XXX txn_manager unregister synchs itself (it uses weakset to keep references)
# XXX however that unregistration is delayed to gc.collect() time and
# XXX maybe we should not perform any action right after Connection is closed
# (as it is now .beforeCompletion() continue to be getting notified)
def on_connection_close(self):
# detach us from _current_ _thread_ TM (staying in sync with Connection)
self.transaction_manager.unregisterSynch(self)
# NOTE open callbacks are setup once and fire on every open - we don't
# need to resetup them here.
# ~~~~ BigFileH wrapper ~~~~ # ~~~~ BigFileH wrapper ~~~~
...@@ -306,9 +350,13 @@ class _ZBigFileH(object): ...@@ -306,9 +350,13 @@ class _ZBigFileH(object):
# then immediately join txn, like ZODB Connection do for objects), but # then immediately join txn, like ZODB Connection do for objects), but
# instead join txn here right before commit/abort. # instead join txn here right before commit/abort.
# make sure we are called only when connection is opened
assert self.zfile._p_jar.opened
if not self.zfileh.isdirty(): if not self.zfileh.isdirty():
return return
assert self not in txn._resources # (not to join twice)
txn.join(self) txn.join(self)
# XXX hack - join Connection manually before transaction.commit() starts. # XXX hack - join Connection manually before transaction.commit() starts.
# #
...@@ -327,6 +375,7 @@ class _ZBigFileH(object): ...@@ -327,6 +375,7 @@ class _ZBigFileH(object):
zconn = self.zfile._p_jar zconn = self.zfile._p_jar
assert txn is zconn.transaction_manager.get() assert txn is zconn.transaction_manager.get()
if zconn._needs_to_join: # same as Connection._register(obj) if zconn._needs_to_join: # same as Connection._register(obj)
assert zconn not in txn._resources # (not to join twice)
txn.join(zconn) # on first obj, without registering txn.join(zconn) # on first obj, without registering
zconn._needs_to_join = False # anything. zconn._needs_to_join = False # anything.
......
...@@ -22,6 +22,8 @@ from wendelin.lib.testing import getTestDB ...@@ -22,6 +22,8 @@ from wendelin.lib.testing import getTestDB
from persistent import UPTODATE, GHOST from persistent import UPTODATE, GHOST
import transaction import transaction
from numpy import ndarray, array_equal, uint8, zeros from numpy import ndarray, array_equal, uint8, zeros
from threading import Thread
from six.moves import _thread
from pytest import raises from pytest import raises
from six.moves import range as xrange from six.moves import range as xrange
...@@ -252,3 +254,233 @@ def test_bigfile_filezodb(): ...@@ -252,3 +254,233 @@ def test_bigfile_filezodb():
dbclose(root) dbclose(root)
# Notify channel for
# - one thread to .wait('condition'), until
# - other thread does .tell('condition')
class NotifyChannel:
def __init__(self):
self.state = None
def tell(self, condition):
#print >>sys.stderr, ' tell %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
# wait until other thread reads previous tell
while self.state is not None:
pass
self.state = condition
def wait(self, condition):
#print >>sys.stderr, ' wait %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
while self.state != condition:
pass
#print >>sys.stderr, ' have %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
self.state = None
# connection can migrate between threads handling requests.
# verify _ZBigFileH properly adjusts.
# ( NOTE this test is almost dupped at test_zbigarray_vs_conn_migration() )
def test_bigfile_filezodb_vs_conn_migration():
root01 = dbopen()
conn01 = root01._p_jar
db = conn01.db()
conn01.close()
del root01
c12_1 = NotifyChannel() # T11 -> T21
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
assert conn11_1 is conn01
# setup zfile with ZBigArray-like satellite,
root11_1 = conn11_1.root()
root11_1['zfile2'] = f11 = ZBigFile(blksize)
transaction.commit()
root11_1['zarray2'] = a11 = LivePersistent()
a11._v_fileh = fh11 = f11.fileh_open()
transaction.commit()
# set zfile initial data
vma11 = fh11.mmap(0, 1)
Blk(vma11, 0)[0] = 11
transaction.commit()
# close conn, wait till T21 reopens it
del vma11, fh11, a11, f11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
# open another connection (e.g. for handling next request) which does
# not touch zfile at all, and arrange timings so that T2 modifies
# zfile, but do not yet commit, and then commit here.
conn11_2 = db.open()
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zfile2-modified')
# XXX do we want to also modify some other objesct?
# (but this have side effect for joining conn11_2 to txn)
transaction.commit() # should be nothing
tell('T1-txn12-committed')
wait('T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
# open, modify, abort
def T21():
tell, wait = c21_1.tell, c12_1.wait
# - wait until T1 finish setting up initial data for zfile and closes connection.
# - open that connection before T1 is asleep - because ZODB organizes
# connection pool as stack (with correction for #active objects),
# we should get exactly the same connection T1 had.
wait('T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
# modify zfile and arrange timings so that T1 commits after zfile is
# modified, but before we commit/abort.
root21 = conn21.root()
a21 = root21['zarray2']
fh21 = a21._v_fileh
vma21 = fh21.mmap(0, 1)
Blk(vma21, 0)[0] = 21
tell('T2-zfile2-modified')
wait('T1-txn12-committed')
# abort - zfile2 should stay unchanged
transaction.abort()
del vma21, fh21, a21, root21
conn21.close()
tell('T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
# now verify that zfile2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
# NOTE top of connection stack is conn21(=conn01), becase conn11_2 has 0
# active objects
assert conn02 is conn01
root02 = conn02.root()
f02 = root02['zfile2']
# NOTE verification is done using afresh fileh to avoid depending on
# leftover state from T11/T21.
fh02 = f02.fileh_open()
vma02 = fh02.mmap(0, 1)
assert Blk(vma02, 0)[0] == 11
del vma02, fh02, f02, root02
conn02.close()
c12_2 = NotifyChannel() # T12 -> T22
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
def T12():
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
conn12 = db.open()
tell('T1-conn12-opened')
wait('T2-zfile2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
conn12.close()
# open, modify, commit
def T22():
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
conn22 = db.open()
assert conn22 is conn01
tell('T2-conn22-opened')
# modify zfile and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
fh22 = a22._v_fileh
vma22 = fh22.mmap(0, 1)
Blk(vma22, 0)[0] = 22
tell('T2-zfile2-modified')
wait('T1-txn-aborted')
# commit - changes should propagate to zfile
transaction.commit()
tell('T2-txn-committed')
conn22.close()
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
# now verify that zfile2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
# NOTE top of connection stack is conn22(=conn01), becase it has most # of
# active objectd
assert conn03 is conn01
root03 = conn03.root()
f03 = root03['zfile2']
fh03 = f03.fileh_open()
vma03 = fh03.mmap(0, 1)
assert Blk(vma03, 0)[0] == 22
del vma03, fh03, f03
dbclose(root03)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment