Connection.py 23.5 KB
Newer Older
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1
##############################################################################
matt@zope.com's avatar
matt@zope.com committed
2
#
Guido van Rossum's avatar
Guido van Rossum committed
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
#
matt@zope.com's avatar
matt@zope.com committed
6 7 8 9 10 11
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
#
Jim Fulton's avatar
alpha1  
Jim Fulton committed
13 14 15
##############################################################################
"""Database connection support

16
$Id: Connection.py,v 1.74 2002/09/07 16:40:59 jeremy Exp $"""
Jim Fulton's avatar
alpha1  
Jim Fulton committed
17

18
from cPickleCache import PickleCache, MUCH_RING_CHECKING
19
from POSException import ConflictError, ReadConflictError
20
from ExtensionClass import Base
21
import ExportImport, TmpStore
22
from zLOG import LOG, ERROR, BLATHER, WARNING
Jim Fulton's avatar
 
Jim Fulton committed
23
from coptimizations import new_persistent_id
24
from ConflictResolution import ResolvedSerial
25 26 27 28 29 30

from cPickle import Unpickler, Pickler
from cStringIO import StringIO
import sys
from time import time
from types import StringType, ClassType
31

Shane Hathaway's avatar
Shane Hathaway committed
32 33
global_code_timestamp = 0

34
if MUCH_RING_CHECKING:
35 36 37 38
    # To get rid of this warning, change the define inside
    # cPickleCache.c and recompile.
    LOG('ZODB',WARNING,
        'Using cPickleCache with low performance (but extra debugging checks)')
39 40
del MUCH_RING_CHECKING

Shane Hathaway's avatar
Shane Hathaway committed
41 42 43 44 45 46 47 48 49
def updateCodeTimestamp():
    '''
    Called after changes are made to persistence-based classes.
    Causes all connection caches to be re-created as the
    connections are reopened.
    '''
    global global_code_timestamp
    global_code_timestamp = time()

50
ExtensionKlass=Base.__class__
Jim Fulton's avatar
alpha1  
Jim Fulton committed
51

52
class Connection(ExportImport.ExportImport):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
53 54 55 56 57 58 59 60
    """Object managers for individual object space.

    An object space is a version of collection of objects.  In a
    multi-threaded application, each thread get's it's own object
    space.

    The Connection manages movement of objects in and out of object storage.
    """
61
    _tmp=None
62 63
    _debug_info=()
    _opened=None
Shane Hathaway's avatar
Shane Hathaway committed
64
    _code_timestamp = 0
Jim Fulton's avatar
alpha1  
Jim Fulton committed
65

66 67 68
    # Experimental. Other connections can register to be closed
    # when we close by putting something here.

69
    def __init__(self, version='', cache_size=400,
Jim Fulton's avatar
alpha1  
Jim Fulton committed
70 71 72
                 cache_deactivate_after=60):
        """Create a new Connection"""
        self._version=version
73
        self._cache = cache = PickleCache(self, cache_size)
74 75 76 77
        if version:
            # Caches for versions end up empty if the version
            # is not used for a while. Non-version caches
            # keep their content indefinitely.
78 79 80

            # XXX Why do we want version caches to behave this way?

81
            self._cache.cache_drain_resistance = 100
82
        self._incrgc=self.cacheGC=cache.incrgc
83 84 85
        self._invalidated=d={}
        self._invalid=d.has_key
        self._committed=[]
Shane Hathaway's avatar
Shane Hathaway committed
86
        self._code_timestamp = global_code_timestamp
87 88
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored
Jim Fulton's avatar
alpha1  
Jim Fulton committed
89

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # fine everything. some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list
        for k,v in items:
            del everything[k]
        # return a list of [ghosts....not recently used.....recently used]
        return everything.items() + items

    def __repr__(self):
        if self._version:
            ver = ' (in version %s)' % `self._version`
        else:
            ver = ''
106
        return '<Connection at %08x%s>' % (id(self), ver)
107

Jim Fulton's avatar
alpha1  
Jim Fulton committed
108 109 110 111 112
    def _breakcr(self):
        try: del self._cache
        except: pass
        try: del self._incrgc
        except: pass
113 114
        try: del self.cacheGC
        except: pass
Jim Fulton's avatar
alpha1  
Jim Fulton committed
115

116 117 118
    def __getitem__(self, oid, tt=type(())):
        obj = self._cache.get(oid, None)
        if obj is not None:
119
            return obj
Jim Fulton's avatar
alpha1  
Jim Fulton committed
120

121
        __traceback_info__ = (oid)
Jim Fulton's avatar
Jim Fulton committed
122
        p, serial = self._storage.load(oid, self._version)
123
        __traceback_info__ = (oid, p)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
124 125 126 127
        file=StringIO(p)
        unpickler=Unpickler(file)
        unpickler.persistent_load=self._persistent_load

128 129 130 131 132
        try:
            object = unpickler.load()
        except:
            raise "Could not load oid %s, pickled data in traceback info may\
            contain clues" % (oid)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
133

Jim Fulton's avatar
Jim Fulton committed
134
        klass, args = object
135 136 137 138

        if type(klass) is tt:
            module, name = klass
            klass=self._db._classFactory(self, module, name)
139

Jim Fulton's avatar
Jim Fulton committed
140 141 142
        if (args is None or
            not args and not hasattr(klass,'__getinitargs__')):
            object=klass.__basicnew__()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
143
        else:
Jim Fulton's avatar
Jim Fulton committed
144
            object=apply(klass,args)
145 146
            if klass is not ExtensionKlass:
                object.__dict__.clear()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
147

Jim Fulton's avatar
Jim Fulton committed
148 149 150 151
        object._p_oid=oid
        object._p_jar=self
        object._p_changed=None
        object._p_serial=serial
Jim Fulton's avatar
alpha1  
Jim Fulton committed
152

153 154
        self._cache[oid] = object
        if oid=='\0\0\0\0\0\0\0\0':
155
            self._root_=object # keep a ref
Jim Fulton's avatar
alpha1  
Jim Fulton committed
156 157 158
        return object

    def _persistent_load(self,oid,
159
                        tt=type(())):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
160 161 162 163 164 165 166

        __traceback_info__=oid

        if type(oid) is tt:
            # Quick instance reference.  We know all we need to know
            # to create the instance wo hitting the db, so go for it!
            oid, klass = oid
167 168 169
            obj = self._cache.get(oid, None)
            if obj is not None:
                return obj
170 171 172 173 174 175 176 177 178

            if type(klass) is tt:
                module, name = klass
                try: klass=self._db._classFactory(self, module, name)
                except:
                    # Eek, we couldn't get the class. Hm.
                    # Maybe their's more current data in the
                    # object's actual record!
                    return self[oid]
179

Jim Fulton's avatar
Jim Fulton committed
180 181 182 183
            object=klass.__basicnew__()
            object._p_oid=oid
            object._p_jar=self
            object._p_changed=None
184

185
            self._cache[oid] = object
186

Jim Fulton's avatar
alpha1  
Jim Fulton committed
187 188
            return object

189 190 191
        obj = self._cache.get(oid, None)
        if obj is not None:
            return obj
192
        return self[oid]
Jim Fulton's avatar
alpha1  
Jim Fulton committed
193

194
    def _setDB(self, odb):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
195 196 197
        """Begin a new transaction.

        Any objects modified since the last transaction are invalidated.
Shane Hathaway's avatar
Shane Hathaway committed
198
        """
Jim Fulton's avatar
alpha1  
Jim Fulton committed
199
        self._db=odb
200 201
        self._storage=s=odb._storage
        self.new_oid=s.new_oid
Shane Hathaway's avatar
Shane Hathaway committed
202 203 204 205 206
        if self._code_timestamp != global_code_timestamp:
            # New code is in place.  Start a new cache.
            self._resetCache()
        else:
            self._cache.invalidate(self._invalidated)
207
        self._opened=time()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
208 209 210

        return self

Shane Hathaway's avatar
Shane Hathaway committed
211 212 213 214 215 216 217
    def _resetCache(self):
        '''
        Creates a new cache, discarding the old.
        '''
        self._code_timestamp = global_code_timestamp
        self._invalidated.clear()
        orig_cache = self._cache
218
        self._cache = PickleCache(self, orig_cache.cache_size)
Shane Hathaway's avatar
Shane Hathaway committed
219

Jim Fulton's avatar
Jim Fulton committed
220 221 222 223 224
    def abort(self, object, transaction):
        """Abort the object in the transaction.

        This just deactivates the thing.
        """
225 226 227
        if object is self:
            self._cache.invalidate(self._invalidated)
        else:
228 229
            assert object._p_oid is not None
            self._cache.invalidate(object._p_oid)
Jim Fulton's avatar
Jim Fulton committed
230

231 232
    def cacheFullSweep(self, dt=0):
        self._cache.full_sweep(dt)
233

234
    def cacheMinimize(self, dt=0):
235 236
        # dt is ignored
        self._cache.minimize()
237

238
    __onCloseCallbacks = None
239

240
    def onCloseCallback(self, f):
241 242 243
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)
244

Jim Fulton's avatar
alpha1  
Jim Fulton committed
245
    def close(self):
Jim Fulton's avatar
Jim Fulton committed
246
        self._incrgc() # This is a good time to do some GC
247
        db=self._db
Jim Fulton's avatar
alpha1  
Jim Fulton committed
248

249
        # Call the close callbacks.
250 251 252 253 254 255 256 257
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
                try: f()
                except:
                    f=getattr(f, 'im_self', f)
                    LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
                        error=sys.exc_info())
            self.__onCloseCallbacks = None
258 259 260 261
        self._db=self._storage=self._tmp=self.new_oid=self._opened=None
        self._debug_info=()
        # Return the connection to the pool.
        db._closeConnection(self)
262

263
    __onCommitActions = None
264

265
    def onCommitAction(self, method_name, *args, **kw):
266 267 268
        if self.__onCommitActions is None:
            self.__onCommitActions = []
        self.__onCommitActions.append((method_name, args, kw))
269 270
        get_transaction().register(self)

271
    def commit(self, object, transaction):
272
        if object is self:
273
            # We registered ourself.  Execute a commit action, if any.
274 275
            if self.__onCommitActions is not None:
                method_name, args, kw = self.__onCommitActions.pop(0)
276 277
                apply(getattr(self, method_name), (transaction,) + args, kw)
            return
Jeremy Hylton's avatar
Jeremy Hylton committed
278 279
        oid = object._p_oid
        invalid = self._invalid
Jim Fulton's avatar
 
Jim Fulton committed
280
        if oid is None or object._p_jar is not self:
281
            # new object
Jim Fulton's avatar
 
Jim Fulton committed
282
            oid = self.new_oid()
Jeremy Hylton's avatar
Jeremy Hylton committed
283 284
            object._p_jar = self
            object._p_oid = oid
285
            self._creating.append(oid)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
286

Jim Fulton's avatar
 
Jim Fulton committed
287
        elif object._p_changed:
288
            if invalid(oid) and not hasattr(object, '_p_resolveConflict'):
289
                raise ConflictError(object=object)
Jim Fulton's avatar
 
Jim Fulton committed
290
            self._invalidating.append(oid)
291

Jim Fulton's avatar
 
Jim Fulton committed
292 293 294 295
        else:
            # Nothing to do
            return

Jeremy Hylton's avatar
Jeremy Hylton committed
296
        stack = [object]
Jim Fulton's avatar
 
Jim Fulton committed
297 298 299 300 301 302 303 304 305

        # Create a special persistent_id that passes T and the subobject
        # stack along:
        #
        # def persistent_id(object,
        #                   self=self,
        #                   stackup=stackup, new_oid=self.new_oid):
        #     if (not hasattr(object, '_p_oid') or
        #         type(object) is ClassType): return None
306
        #
Jim Fulton's avatar
 
Jim Fulton committed
307
        #     oid=object._p_oid
308
        #
Jim Fulton's avatar
 
Jim Fulton committed
309 310 311 312 313
        #     if oid is None or object._p_jar is not self:
        #         oid = self.new_oid()
        #         object._p_jar=self
        #         object._p_oid=oid
        #         stackup(object)
314
        #
Jim Fulton's avatar
 
Jim Fulton committed
315
        #     klass=object.__class__
316
        #
Jim Fulton's avatar
 
Jim Fulton committed
317
        #     if klass is ExtensionKlass: return oid
318
        #
Jim Fulton's avatar
 
Jim Fulton committed
319
        #     if hasattr(klass, '__getinitargs__'): return oid
320
        #
Jim Fulton's avatar
 
Jim Fulton committed
321 322
        #     module=getattr(klass,'__module__','')
        #     if module: klass=module, klass.__name__
323
        #
Jim Fulton's avatar
 
Jim Fulton committed
324
        #     return oid, klass
325

Jim Fulton's avatar
 
Jim Fulton committed
326 327 328 329 330 331 332
        file=StringIO()
        seek=file.seek
        pickler=Pickler(file,1)
        pickler.persistent_id=new_persistent_id(self, stack.append)
        dbstore=self._storage.store
        file=file.getvalue
        cache=self._cache
333
        get=cache.get
Jim Fulton's avatar
 
Jim Fulton committed
334 335
        dump=pickler.dump
        clear_memo=pickler.clear_memo
336

337

Jim Fulton's avatar
 
Jim Fulton committed
338
        version=self._version
339

Jim Fulton's avatar
 
Jim Fulton committed
340 341 342 343 344
        while stack:
            object=stack[-1]
            del stack[-1]
            oid=object._p_oid
            serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
345 346 347 348 349
            if serial == '\0\0\0\0\0\0\0\0':
                # new object
                self._creating.append(oid)
            else:
                #XXX We should never get here
350
                if invalid(oid) and not hasattr(object, '_p_resolveConflict'):
351
                    raise ConflictError(object=object)
352
                self._invalidating.append(oid)
353

Jim Fulton's avatar
 
Jim Fulton committed
354
            klass = object.__class__
355

Jim Fulton's avatar
 
Jim Fulton committed
356 357 358 359 360 361 362 363 364 365 366 367 368
            if klass is ExtensionKlass:
                # Yee Ha!
                dict={}
                dict.update(object.__dict__)
                del dict['_p_jar']
                args=object.__name__, object.__bases__, dict
                state=None
            else:
                if hasattr(klass, '__getinitargs__'):
                    args = object.__getinitargs__()
                    len(args) # XXX Assert it's a sequence
                else:
                    args = None # New no-constructor protocol!
369

370
                module=getattr(klass,'__module__','')
371
                if module: klass=module, klass.__name__
Jim Fulton's avatar
 
Jim Fulton committed
372 373
                __traceback_info__=klass, oid, self._version
                state=object.__getstate__()
374

Jim Fulton's avatar
 
Jim Fulton committed
375 376 377 378 379
            seek(0)
            clear_memo()
            dump((klass,args))
            dump(state)
            p=file(1)
380
            s=dbstore(oid,serial,p,version,transaction)
381
            self._store_count = self._store_count + 1
382 383 384
            # Put the object in the cache before handling the
            # response, just in case the response contains the
            # serial number for a newly created object
Jim Fulton's avatar
 
Jim Fulton committed
385 386 387 388 389
            try: cache[oid]=object
            except:
                # Dang, I bet its wrapped:
                if hasattr(object, 'aq_base'):
                    cache[oid]=object.aq_base
390 391
                else:
                    raise
Jim Fulton's avatar
alpha1  
Jim Fulton committed
392

393 394
            self._handle_serial(s, oid)

395
    def commit_sub(self, t):
396
        """Commit all work done in subtransactions"""
397
        tmp=self._tmp
398
        if tmp is None: return
399
        src=self._storage
400 401 402

        LOG('ZODB', BLATHER,
            'Commiting subtransaction of size %s' % src.getSize())
403

404
        self._storage=tmp
405
        self._tmp=None
406 407

        tmp.tpc_begin(t)
408

409 410 411 412
        load=src.load
        store=tmp.store
        dest=self._version
        get=self._cache.get
413
        oids=src._index.keys()
414 415

        # Copy invalidating and creating info from temporary storage:
416 417
        invalidating=self._invalidating
        invalidating[len(invalidating):]=oids
418 419
        creating=self._creating
        creating[len(creating):]=src._creating
420

421
        for oid in oids:
422 423
            data, serial = load(oid, src)
            s=store(oid, serial, data, dest, t)
424
            self._handle_serial(s, oid, change=0)
425 426

    def abort_sub(self, t):
427
        """Abort work done in subtransactions"""
428 429 430 431 432
        tmp=self._tmp
        if tmp is None: return
        src=self._storage
        self._tmp=None
        self._storage=tmp
433

434
        self._cache.invalidate(src._index.keys())
435 436 437 438 439 440 441 442 443 444 445 446 447 448
        self._invalidate_creating(src._creating)

    def _invalidate_creating(self, creating=None):
        """Dissown any objects newly saved in an uncommitted transaction.
        """
        if creating is None:
            creating=self._creating
            self._creating=[]

        cache=self._cache
        cache_get=cache.get
        for oid in creating:
            o=cache_get(oid, None)
            if o is not None:
449
                del cache[oid]
450 451 452 453
                del o._p_jar
                del o._p_oid

    #XXX
454

Jim Fulton's avatar
alpha1  
Jim Fulton committed
455 456 457
    def db(self): return self._db

    def getVersion(self): return self._version
458

Jim Fulton's avatar
alpha1  
Jim Fulton committed
459 460 461 462 463 464 465
    def invalidate(self, oid):
        """Invalidate a particular oid

        This marks the oid as invalid, but doesn't actually invalidate
        it.  The object data will be actually invalidated at certain
        transaction boundaries.
        """
466 467
        assert oid is not None
        self._invalidated[oid] = 1
Jim Fulton's avatar
alpha1  
Jim Fulton committed
468

469 470 471 472
    def modifiedInVersion(self, oid):
        try: return self._db.modifiedInVersion(oid)
        except KeyError:
            return self._version
Jim Fulton's avatar
alpha1  
Jim Fulton committed
473

474 475 476 477 478 479 480
    def register(self, object):
        """Register an object with the appropriate transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.
        """
        assert object._p_jar is self
Jeremy Hylton's avatar
Jeremy Hylton committed
481 482
        # XXX Figure out why this assert causes test failures
        # assert object._p_oid is not None
483 484 485 486
        get_transaction().register(object)

    def root(self):
        return self['\0\0\0\0\0\0\0\0']
Jim Fulton's avatar
alpha1  
Jim Fulton committed
487

488
    def setstate(self, object):
489
        oid = object._p_oid
490 491

        if self._storage is None:
492 493 494
            msg = ("Shouldn't load state for %s "
                   "when the connection is closed" % `oid`)
            LOG('ZODB', ERROR, msg)
495 496
            raise RuntimeError(msg)

497
        try:
498
            p, serial = self._storage.load(oid, self._version)
499
            self._load_count = self._load_count + 1
500 501 502 503 504 505 506 507 508 509 510

            # XXX this is quite conservative!
            # We need, however, to avoid reading data from a transaction
            # that committed after the current "session" started, as
            # that might lead to mixing of cached data from earlier
            # transactions and new inconsistent data.
            #
            # Note that we (carefully) wait until after we call the
            # storage to make sure that we don't miss an invaildation
            # notifications between the time we check and the time we
            # read.
511
            if self._invalid(oid):
512 513
                if not hasattr(object.__class__, '_p_independent'):
                    get_transaction().register(self)
514
                    raise ReadConflictError(object=object)
515
                invalid = 1
516
            else:
517
                invalid = 0
518

519 520 521
            file = StringIO(p)
            unpickler = Unpickler(file)
            unpickler.persistent_load = self._persistent_load
522 523
            unpickler.load()
            state = unpickler.load()
524 525 526 527

            if hasattr(object, '__setstate__'):
                object.__setstate__(state)
            else:
528 529 530
                d = object.__dict__
                for k, v in state.items():
                    d[k] = v
531

532
            object._p_serial = serial
533

534 535
            if invalid:
                if object._p_independent():
536 537 538 539
                    try:
                        del self._invalidated[oid]
                    except KeyError:
                        pass
540 541
                else:
                    get_transaction().register(self)
542
                    raise ConflictError(object=object)
543

544 545
        except ConflictError:
            raise
546
        except:
547
            LOG('ZODB',ERROR, "Couldn't load state for %s" % `oid`,
548
                error=sys.exc_info())
549
            raise
550

551 552 553 554 555 556 557 558 559
    def oldstate(self, object, serial):
        oid=object._p_oid
        p = self._storage.loadSerial(oid, serial)
        file=StringIO(p)
        unpickler=Unpickler(file)
        unpickler.persistent_load=self._persistent_load
        unpickler.load()
        return  unpickler.load()

560
    def setklassstate(self, object):
561 562 563 564 565 566 567
        try:
            oid=object._p_oid
            __traceback_info__=oid
            p, serial = self._storage.load(oid, self._version)
            file=StringIO(p)
            unpickler=Unpickler(file)
            unpickler.persistent_load=self._persistent_load
568

569
            copy = unpickler.load()
570

571
            klass, args = copy
572

573 574 575 576 577
            if klass is not ExtensionKlass:
                LOG('ZODB',ERROR,
                    "Unexpected klass when setting class state on %s"
                    % getattr(object,'__name__','(?)'))
                return
578

579 580 581
            copy=apply(klass,args)
            object.__dict__.clear()
            object.__dict__.update(copy.__dict__)
582

583 584 585 586 587
            object._p_oid=oid
            object._p_jar=self
            object._p_changed=0
            object._p_serial=serial
        except:
588
            LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
589
            raise
590

Jim Fulton's avatar
alpha1  
Jim Fulton committed
591
    def tpc_abort(self, transaction):
Andreas Jung's avatar
Andreas Jung committed
592
        if self.__onCommitActions is not None:
593
            del self.__onCommitActions
Jim Fulton's avatar
alpha1  
Jim Fulton committed
594 595
        self._storage.tpc_abort(transaction)
        cache=self._cache
596 597
        cache.invalidate(self._invalidated)
        cache.invalidate(self._invalidating)
598
        self._invalidate_creating()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
599

600
    def tpc_begin(self, transaction, sub=None):
Jeremy Hylton's avatar
Jeremy Hylton committed
601 602
        self._invalidating = []
        self._creating = []
603 604 605 606 607 608 609 610 611 612

        if sub:
            # Sub-transaction!
            _tmp=self._tmp
            if _tmp is None:
                _tmp=TmpStore.TmpStore(self._version)
                self._tmp=self._storage
                self._storage=_tmp
                _tmp.registerDB(self._db, 0)

Jim Fulton's avatar
alpha1  
Jim Fulton committed
613 614
        self._storage.tpc_begin(transaction)

615 616 617 618 619 620 621 622
    def tpc_vote(self, transaction):
        if self.__onCommitActions is not None:
            del self.__onCommitActions
        try:
            vote=self._storage.tpc_vote
        except AttributeError:
            return
        s = vote(transaction)
623 624 625 626 627 628 629 630 631 632 633 634 635
        self._handle_serial(s)

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When commit_sub() exceutes a store, there is no need to
        # update the _p_changed flag, because the subtransaction
Jeremy Hylton's avatar
Jeremy Hylton committed
636
        # tpc_vote() calls already did this.  The change=1 argument
637
        # exists to allow commit_sub() to avoid setting the flag
638
        # again.
639 640 641 642 643 644
        if not store_return:
            return
        if isinstance(store_return, StringType):
            assert oid is not None
            serial = store_return
            obj = self._cache.get(oid, None)
645 646
            if obj is None:
                return
647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
            if serial == ResolvedSerial:
                obj._p_changed = None
            else:
                if change:
                    obj._p_changed = 0
                obj._p_serial = serial
        else:
            for oid, serial in store_return:
                if not isinstance(serial, StringType):
                    raise serial
                obj = self._cache.get(oid, None)
                if obj is None:
                    continue
                if serial == ResolvedSerial:
                    obj._p_changed = None
                else:
                    if change:
                        obj._p_changed = 0
                    obj._p_serial = serial

Jim Fulton's avatar
alpha1  
Jim Fulton committed
667
    def tpc_finish(self, transaction):
668
        # It's important that the storage call the function we pass
669 670 671 672
        # (self._invalidate_invalidating) while it still has it's
        # lock.  We don't want another thread to be able to read any
        # updated data until we've had a chance to send an
        # invalidation message to all of the other connections!
673 674 675 676

        if self._tmp is not None:
            # Commiting a subtransaction!
            # There is no need to invalidate anything.
677
            self._storage.tpc_finish(transaction)
678 679 680
            self._storage._creating[:0]=self._creating
            del self._creating[:]
        else:
681
            self._db.begin_invalidation()
682 683 684
            self._storage.tpc_finish(transaction,
                                     self._invalidate_invalidating)

685
        self._cache.invalidate(self._invalidated)
Jim Fulton's avatar
Jim Fulton committed
686
        self._incrgc() # This is a good time to do some GC
Jim Fulton's avatar
alpha1  
Jim Fulton committed
687

688
    def _invalidate_invalidating(self):
689
        for oid in self._invalidating:
690 691
            assert oid is not None
            self._db.invalidate(oid, self)
692
        self._db.finish_invalidation()
693

694 695
    def sync(self):
        get_transaction().abort()
696 697
        sync=getattr(self._storage, 'sync', 0)
        if sync != 0: sync()
698
        self._cache.invalidate(self._invalidated)
Jim Fulton's avatar
Jim Fulton committed
699
        self._incrgc() # This is a good time to do some GC
700

701 702
    def getDebugInfo(self):
        return self._debug_info
703

704 705
    def setDebugInfo(self, *args):
        self._debug_info = self._debug_info + args
706

707 708 709 710 711
    def getTransferCounts(self, clear=0):
        """Returns the number of objects loaded and stored.

        Set the clear argument to reset the counters.
        """
712
        res = self._load_count, self._store_count
713 714 715 716 717
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res

718 719 720 721 722 723 724 725 726

    ######################################################################
    # Just plain weird. Don't try this at home kids.
    def exchange(self, old, new):
        oid=old._p_oid
        new._p_oid=oid
        new._p_jar=self
        new._p_changed=1
        get_transaction().register(new)
727
        self._cache[oid]=new
728

Jim Fulton's avatar
alpha1  
Jim Fulton committed
729 730 731 732
class tConnection(Connection):

    def close(self):
        self._breakcr()