ClientStorage.py 42.4 KB
Newer Older
1
##############################################################################
2
#
3
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
Jim Fulton's avatar
Jim Fulton committed
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8 9 10 11
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
#
13
##############################################################################
14
"""The ClientStorage class and the exceptions that it may raise.
15

16 17 18
Public contents of this module:

ClientStorage -- the main class, implementing the Storage API
19
"""
Jim Fulton's avatar
Jim Fulton committed
20

Jeremy Hylton's avatar
Jeremy Hylton committed
21 22
import cPickle
import os
23
import socket
Jeremy Hylton's avatar
Jeremy Hylton committed
24 25 26
import tempfile
import threading
import time
27
import types
28
import logging
29

30 31
from ZEO import ServerStub
from ZEO.cache import ClientCache
Jeremy Hylton's avatar
Jeremy Hylton committed
32
from ZEO.TransactionBuffer import TransactionBuffer
Martijn Faassen's avatar
Martijn Faassen committed
33
from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
34
from ZEO.auth import get_module
Jeremy Hylton's avatar
Jeremy Hylton committed
35
from ZEO.zrpc.client import ConnectionManager
36

Jeremy Hylton's avatar
Jeremy Hylton committed
37
from ZODB import POSException
38
from ZODB.loglevels import BLATHER
39
from persistent.TimeStamp import TimeStamp
Jim Fulton's avatar
Jim Fulton committed
40

41 42 43 44 45 46
logger = logging.getLogger('ZEO.ClientStorage')
# Default subsystem tag: this process's pid, captured once at import time.
_pid = str(os.getpid())

def log2(msg, level=logging.INFO, subsys=_pid, exc_info=False):
    """Log *msg* to the ZEO.ClientStorage logger, prefixed with *subsys*.

    The message is rendered as "(subsys) msg".  *level* is a logging
    level constant; *exc_info* is passed through to Logger.log so a
    traceback can be attached from inside an except block.
    """
    # Use lazy %-style arguments so the message string is only built
    # when the record is actually emitted (same output as before).
    logger.log(level, "(%s) %s", subsys, msg, exc_info=exc_info)
47

48 49
# ResolvedSerial is the marker serial the server returns for objects
# whose conflicts were resolved.  Fall back to the literal value used
# on the wire when ZODB's ConflictResolution module is unavailable.
try:
    from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
    ResolvedSerial = 'rs'
Jim Fulton's avatar
Jim Fulton committed
52

53 54 55
def tid2time(tid):
    """Render transaction id *tid* as a human-readable timestamp string."""
    stamp = TimeStamp(tid)
    return str(stamp)

Jeremy Hylton's avatar
Jeremy Hylton committed
56
def get_timestamp(prev_ts=None):
    """Internal helper to return a unique TimeStamp instance.

    If the optional argument is not None, it must be a TimeStamp; the
    return value is then guaranteed to be at least 1 microsecond later
    than the argument.
    """
    now = time.time()
    # TimeStamp takes (year, month, day, hour, minute, seconds-as-float).
    parts = time.gmtime(now)[:5] + (now % 60,)
    stamp = TimeStamp(*parts)
    if prev_ts is not None:
        stamp = stamp.laterThan(prev_ts)
    return stamp
68

Jeremy Hylton's avatar
Jeremy Hylton committed
69
class DisconnectedServerStub:
    """Internal helper class used as a faux RPC stub when disconnected.

    This raises ClientDisconnected on all attribute accesses.

    This is a singleton class -- there should be only one instance,
    the global disconnected_stub, so it can be tested by identity.
    """

    def __getattr__(self, attr):
        # Every method lookup on the stub fails immediately, so code
        # calling through self._server while disconnected gets a
        # uniform ClientDisconnected error.
        raise ClientDisconnected()
80

81
# Singleton instance of DisconnectedServerStub; ClientStorage assigns
# this to self._server whenever no verified connection is available.
disconnected_stub = DisconnectedServerStub()

# One megabyte in bytes; used for the default cache_size below.
MB = 1024**2

86
class ClientStorage(object):

    """A Storage class that is a network client to a remote storage.

    This is a faithful implementation of the Storage API.

    This class is thread-safe; transactions are serialized in
    tpc_begin().
    """

    # Classes we instantiate.  A subclass might override.

    TransactionBufferClass = TransactionBuffer
    ClientCacheClass = ClientCache
    ConnectionManagerClass = ConnectionManager
    StorageServerStubClass = ServerStub.StorageServer

103
    def __init__(self, addr, storage='1', cache_size=20 * MB,
                 name='', client=None, debug=0, var=None,
                 min_disconnect_poll=5, max_disconnect_poll=300,
                 wait_for_server_on_startup=None, # deprecated alias for wait
                 wait=None, wait_timeout=None,
                 read_only=0, read_only_fallback=0,
                 username='', password='', realm=None):
        """ClientStorage constructor.

        This is typically invoked from a custom_zodb.py file.

        All arguments except addr should be keyword arguments.
        Arguments:

        addr -- The server address(es).  This is either a list of
            addresses or a single address.  Each address can be a
            (hostname, port) tuple to signify a TCP/IP connection or
            a pathname string to signify a Unix domain socket
            connection.  A hostname may be a DNS name or a dotted IP
            address.  Required.

        storage -- The storage name, defaulting to '1'.  The name must
            match one of the storage names supported by the server(s)
            specified by the addr argument.  The storage name is
            displayed in the Zope control panel.

        cache_size -- The disk cache size, defaulting to 20 megabytes.
            This is passed to the ClientCache constructor.

        name -- The storage name, defaulting to ''.  If this is false,
            str(addr) is used as the storage name.

        client -- A name used to construct persistent cache filenames.
            Defaults to None, in which case the cache is not persistent.
            See ClientCache for more info.

        debug -- Ignored.  This is present only for backwards
            compatibility with ZEO 1.

        var -- When client is not None, this specifies the directory
            where the persistent cache files are created.  It defaults
            to None, in which case the current directory is used.

        min_disconnect_poll -- The minimum delay in seconds between
            attempts to connect to the server, in seconds.  Defaults
            to 5 seconds.

        max_disconnect_poll -- The maximum delay in seconds between
            attempts to connect to the server, in seconds.  Defaults
            to 300 seconds.

        wait_for_server_on_startup -- A backwards compatible alias for
            the wait argument.

        wait -- A flag indicating whether to wait until a connection
            with a server is made, defaulting to true.

        wait_timeout -- Maximum time to wait for a connection before
            giving up.  Only meaningful if wait is True.

        read_only -- A flag indicating whether this should be a
            read-only storage, defaulting to false (i.e. writing is
            allowed by default).

        read_only_fallback -- A flag indicating whether a read-only
            remote storage should be acceptable as a fallback when no
            writable storages are available.  Defaults to false.  At
            most one of read_only and read_only_fallback should be
            true.

        username -- string with username to be used when authenticating.
            These only need to be provided if you are connecting to an
            authenticated server storage.

        password -- string with plaintext password to be used
            when authenticating.

        Note that the authentication protocol is defined by the server
        and is detected by the ClientStorage upon connecting (see
        testConnection() and doAuth() for details).
        """

        log2("%s (pid=%d) created %s/%s for storage: %r" %
             (self.__class__.__name__,
              os.getpid(),
              read_only and "RO" or "RW",
              read_only_fallback and "fallback" or "normal",
              storage))

        if debug:
            log2("ClientStorage(): debug argument is no longer used")

        # wait defaults to True, but wait_for_server_on_startup overrides
        # if not None
        if wait_for_server_on_startup is not None:
            if wait is not None and wait != wait_for_server_on_startup:
                log2("ClientStorage(): conflicting values for wait and "
                     "wait_for_server_on_startup; wait prevails",
                     level=logging.WARNING)
            else:
                log2("ClientStorage(): wait_for_server_on_startup "
                     "is deprecated; please use wait instead")
                wait = wait_for_server_on_startup
        elif wait is None:
            wait = 1

        self._addr = addr # For tests

        # A ZEO client can run in disconnected mode, using data from
        # its cache, or in connected mode.  Several instance variables
        # are related to whether the client is connected.

        # _server: All method calls are invoked through the server
        #    stub.  When not connected, set to disconnected_stub, an
        #    object that raises ClientDisconnected errors.

        # _ready: A threading Event that is set only if _server
        #    is set to a real stub.

        # _connection: The current zrpc connection or None.

        # _connection is set as soon as a connection is established,
        # but _server is set only after cache verification has finished
        # and clients can safely use the server.  _pending_server holds
        # a server stub while it is being verified.

        self._server = disconnected_stub
        self._connection = None
        self._pending_server = None
        self._ready = threading.Event()

        # _is_read_only stores the constructor argument
        self._is_read_only = read_only
        # _conn_is_read_only stores the status of the current connection
        self._conn_is_read_only = 0
        self._storage = storage
        self._read_only_fallback = read_only_fallback
        self._username = username
        self._password = password
        self._realm = realm

        # Flag tracking disconnections in the middle of a transaction.  This
        # is reset in tpc_begin() and set in notifyDisconnected().
        self._midtxn_disconnect = 0

        # _server_addr is used by sortKey()
        self._server_addr = None
        # Scratch state for cache verification (see verify_cache()).
        self._tfile = None
        self._pickler = None

        self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
                      'supportsUndo':0, 'supportsVersions': 0,
                      'supportsTransactionalUndo': 0}

        self._tbuf = self.TransactionBufferClass()
        self._db = None
        self._ltid = None # the last committed transaction

        # _serials: stores (oid, serialno) as returned by server
        # _seriald: _check_serials() moves from _serials to _seriald,
        #           which maps oid to serialno

        # TODO:  If serial number matches transaction id, then there is
        # no need to have all this extra infrastructure for handling
        # serial numbers.  The vote call can just return the tid.
        # If there is a conflict error, we can't have a special method
        # called just to propagate the error.
        self._serials = []
        self._seriald = {}

        self.__name__ = name or str(addr) # Standard convention for storages

        # A ClientStorage only allows one thread to commit at a time.
        # Mutual exclusion is achieved using _tpc_cond, which
        # protects _transaction.  A thread that wants to assign to
        # self._transaction must acquire _tpc_cond first.  A thread
        # that decides it's done with a transaction (whether via success
        # or failure) must set _transaction to None and do
        # _tpc_cond.notify() before releasing _tpc_cond.
        self._tpc_cond = threading.Condition()
        self._transaction = None

        # Prevent multiple new_oid calls from going out.  The _oids
        # variable should only be modified while holding the
        # _oid_lock.
        self._oid_lock = threading.Lock()
        self._oids = [] # Object ids retrieved from new_oids()

        # load() and tpc_finish() must be serialized to guarantee
        # that cache modifications from each occur atomically.
        # It also prevents multiple load calls occurring simultaneously,
        # which simplifies the cache logic.
        self._load_lock = threading.Lock()
        # _load_oid and _load_status are protected by _lock
        self._load_oid = None
        self._load_status = None

        # Can't read data in one thread while writing data
        # (tpc_finish) in another thread.  In general, the lock
        # must prevent access to the cache while _update_cache
        # is executing.
        self._lock = threading.Lock()

        # Decide whether to use non-temporary files
        if client is not None:
            dir = var or os.getcwd()
            cache_path = os.path.join(dir, "%s-%s.zec" % (client, storage))
        else:
            cache_path = None
        self._cache = self.ClientCacheClass(cache_path, size=cache_size)
        # TODO:  maybe there's a better time to open the cache?  Unclear.
        self._cache.open()

        self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                    tmin=min_disconnect_poll,
                                                    tmax=max_disconnect_poll)

        if wait:
            self._wait(wait_timeout)
        else:
            # attempt_connect() will make an attempt that doesn't block
            # "too long," for a very vague notion of too long.  If that
            # doesn't succeed, call connect() to start a thread.
            if not self._rpc_mgr.attempt_connect():
                self._rpc_mgr.connect()
328

329 330 331
    def _wait(self, timeout=None):
        """Block until a connection is established and cache-verified.

        If *timeout* (seconds) is given, give up once it has elapsed;
        otherwise wait indefinitely.
        """
        deadline = None
        if timeout is not None:
            deadline = time.time() + timeout
            log2("Setting deadline to %f" % deadline, level=BLATHER)
        # Wait for a connection to be established.
        self._rpc_mgr.connect(sync=1)
        # When a synchronous connect() call returns, there is a valid
        # _connection object but cache validation may still be going on.
        # Wait for it to finish; if the connection isn't a zrpc async
        # connection, it must also be polled for input (_wait_sync).
        if not self._connection.is_async():
            self._wait_sync(deadline)
            return
        while not self._ready.isSet():
            self._ready.wait(30)
            if self._ready.isSet():
                break
            if timeout and time.time() > deadline:
                log2("Timed out waiting for connection",
                     level=logging.WARNING)
                break
            log2("Waiting for cache verification to finish")
354

355
    def _wait_sync(self, deadline=None):
        """Wait for cache verification to finish on a non-async connection.

        *deadline* is an absolute time.time() value, or None for no
        timeout.  Polls the connection for input while waiting, since
        no asyncore mainloop is running for it.
        """
        # Log no more than one "waiting" message per LOG_THROTTLE seconds.
        LOG_THROTTLE = 300 # 5 minutes
        next_log_time = time.time()

        while not self._ready.isSet():
            now = time.time()
            if deadline and now > deadline:
                log2("Timed out waiting for connection", level=logging.WARNING)
                break
            if now >= next_log_time:
                log2("Waiting for cache verification to finish")
                next_log_time = now + LOG_THROTTLE
            if self._connection is None:
                # If the connection was closed while we were
                # waiting for it to become ready, start over.
                if deadline is None:
                    timeout = None
                else:
                    timeout = deadline - now
                return self._wait(timeout)
            # No mainloop is running, so we need to call something fancy to
            # handle asyncore events.
            self._connection.pending(30)
379

Jeremy Hylton's avatar
Jeremy Hylton committed
380
    def close(self):
        """Storage API: finalize the storage, releasing external resources."""
        self._tbuf.close()
        cache = self._cache
        if cache is not None:
            cache.close()
            self._cache = None
        mgr = self._rpc_mgr
        if mgr is not None:
            mgr.close()
            self._rpc_mgr = None
389

Jeremy Hylton's avatar
Jeremy Hylton committed
390
    def registerDB(self, db, limit):
        """Storage API: register a database for invalidation messages.

        Called by ZODB.DB (and by some tests).  The storage isn't
        really ready to use until after this call.  *limit* is accepted
        for API compatibility but not used here.
        """
        self._db = db
398

Jeremy Hylton's avatar
Jeremy Hylton committed
399
    def is_connected(self):
        """Return whether the storage is currently connected to a server.

        Clients use this, so a connection is only reported once it is
        fully verified and ready to use (i.e. the _ready event is set).
        """
        ready = self._ready
        return ready.isSet()
404

405
    def sync(self):
        """Handle any pending invalidation messages.

        This is called by the sync method in ZODB.Connection.  With no
        connection there are technically no pending invalidations, so
        this simply returns instead of raising.
        """
        conn = self._connection
        if conn is None:
            return
        conn.pending()
417

418 419
    def doAuth(self, protocol, stub):
        """Internal: run the client side of auth *protocol* against *stub*.

        Returns a true value (used by the caller as the session key) on
        success.  Returns None when no module implements the protocol;
        raises AuthError for missing credentials or an invalid protocol
        module.
        """
        if not (self._username and self._password):
            raise AuthError("empty username or password")

        module = get_module(protocol)
        if not module:
            log2("%s: no such an auth protocol: %s" %
                 (self.__class__.__name__, protocol), level=logging.WARNING)
            return

        storage_class, client_class, db_class = module

        if not client_class:
            log2("%s: %s isn't a valid protocol, must have a Client class" %
                 (self.__class__.__name__, protocol), level=logging.WARNING)
            raise AuthError("invalid protocol")

        authenticator = client_class(stub)

        # Initiate authentication, returns boolean specifying whether OK
        return authenticator.start(self._username, self._realm,
                                   self._password)
Tim Peters's avatar
Tim Peters committed
439

440
    def testConnection(self, conn):
        """Internal: test the given connection.

        This returns:   1 if the connection is an optimal match,
                        0 if it is a suboptimal but acceptable match.
        It can also raise DisconnectedError or ReadOnlyError.

        This is called by ZEO.zrpc.ConnectionManager to decide which
        connection to use in case there are multiple, and some are
        read-only and others are read-write.

        This works by calling register() on the server.  In read-only
        mode, register() is called with the read_only flag set.  In
        writable mode and in read-only fallback mode, register() is
        called with the read_only flag cleared.  In read-only fallback
        mode only, if the register() call raises ReadOnlyError, it is
        retried with the read-only flag set, and if this succeeds,
        this is deemed a suboptimal match.  In all other cases, a
        succeeding register() call is deemed an optimal match, and any
        exception raised by register() is passed through.
        """
        log2("Testing connection %r" % conn)
        # TODO:  Should we check the protocol version here?
        self._conn_is_read_only = 0
        stub = self.StorageServerStubClass(conn)

        # Authenticate first if the server advertises a protocol.
        auth = stub.getAuthProtocol()
        log2("Server authentication protocol %r" % auth)
        if auth:
            skey = self.doAuth(auth, stub)
            if skey:
                log2("Client authentication successful")
                conn.setSessionKey(skey)
            else:
                log2("Authentication failed")
                raise AuthError("Authentication failed")

        try:
            stub.register(str(self._storage), self._is_read_only)
            return 1
        except POSException.ReadOnlyError:
            if not self._read_only_fallback:
                raise
            log2("Got ReadOnlyError; trying again with read_only=1")
            stub.register(str(self._storage), read_only=1)
            self._conn_is_read_only = 1
            return 0
487

488
    def notifyConnected(self, conn):
        """Internal: start using the given connection.

        This is called by ConnectionManager after it has decided which
        connection should be used.
        """
        if self._cache is None:
            # the storage was closed, but the connect thread called
            # this method before it was stopped.
            return

        # TODO:  report whether we get a read-only connection.
        if self._connection is not None:
            reconnect = 1
        else:
            reconnect = 0
        self.set_server_addr(conn.get_addr())

        # If we are upgrading from a read-only fallback connection,
        # we must close the old connection to prevent it from being
        # used while the cache is verified against the new connection.
        if self._connection is not None:
            self._connection.close()
        self._connection = conn

        if reconnect:
            log2("Reconnected to storage: %s" % self._server_addr)
        else:
            log2("Connected to storage: %s" % self._server_addr)

        # Refresh server info and kick off cache verification; for a
        # non-async connection we must wait for verification here.
        stub = self.StorageServerStubClass(conn)
        self._oids = []
        self._info.update(stub.get_info())
        self.verify_cache(stub)
        if not conn.is_async():
            log2("Waiting for cache verification to finish")
            self._wait_sync()
        self._handle_extensions()
526

527
    def _handle_extensions(self):
        """Expose server extension methods as attributes on this client.

        Existing attributes are never overwritten.
        """
        server = self._server
        for name in self.getExtensionMethods().keys():
            if hasattr(self, name):
                continue
            setattr(self, name, server.extensionMethod(name))
531

532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
    def set_server_addr(self, addr):
        # Normalize server address and convert to string
        if isinstance(addr, types.StringType):
            self._server_addr = addr
        else:
            assert isinstance(addr, types.TupleType)
            # If the server is on a remote host, we need to guarantee
            # that all clients used the same name for the server.  If
            # they don't, the sortKey() may be different for each client.
            # The best solution seems to be the official name reported
            # by gethostbyaddr().
            host = addr[0]
            try:
                canonical, aliases, addrs = socket.gethostbyaddr(host)
            except socket.error, err:
                # Resolution failed; fall back to the name we were given.
                log2("Error resolving host: %s (%s)" % (host, err),
                     level=BLATHER)
                canonical = host
            self._server_addr = str((canonical, addr[1]))

    def sortKey(self):
        """Return a stable key used to order storages during commit.

        Raises ClientDisconnected when no server address is known, to
        stop the transaction early -- a disconnected client can't have
        a valid sortKey().
        """
        addr = self._server_addr
        if addr is None:
            raise ClientDisconnected
        return '%s:%s' % (self._storage, addr)
559

Jeremy Hylton's avatar
Jeremy Hylton committed
560
    def verify_cache(self, server):
        """Internal routine called to verify the cache.

        The return value (indicating which path we took) is used by
        the test suite.
        """

        # If verify_cache() finishes the cache verification process,
        # it should set self._server.  If it goes through full cache
        # verification, then endVerify() should set self._server.

        last_inval_tid = self._cache.getLastTid()
        if last_inval_tid is not None:
            ltid = server.lastTransaction()
            if ltid == last_inval_tid:
                log2("No verification necessary (last_inval_tid up-to-date)")
                self._server = server
                self._ready.set()
                return "no verification"

            # log some hints about last transaction
            log2("last inval tid: %r %s\n"
                 % (last_inval_tid, tid2time(last_inval_tid)))
            log2("last transaction: %r %s" %
                 (ltid, ltid and tid2time(ltid)))

            pair = server.getInvalidations(last_inval_tid)
            if pair is not None:
                log2("Recovering %d invalidations" % len(pair[1]))
                self.invalidateTransaction(*pair)
                self._server = server
                self._ready.set()
                return "quick verification"

        log2("Verifying cache")
        # setup tempfile to hold zeoVerify results
        self._tfile = tempfile.TemporaryFile(suffix=".inv")
        self._pickler = cPickle.Pickler(self._tfile, 1)
        self._pickler.fast = 1 # Don't use the memo

        # TODO:  should batch these operations for efficiency; would need
        # to acquire lock ...
        for oid, tid, version in self._cache.contents():
            server.verify(oid, version, tid)
        # _server stays unset until endVerify() completes; meanwhile the
        # stub being verified is parked in _pending_server.
        self._pending_server = server
        server.endZeoVerify()
        return "full verification"
607 608 609 610 611 612 613 614 615

    ### Is there a race condition between notifyConnected and
    ### notifyDisconnected? In particular, what if we get
    ### notifyDisconnected in the middle of notifyConnected?
    ### The danger is that we'll proceed as if we were connected
    ### without checking whether we still are, but this would happen
    ### anyway if notifyDisconnected had to get the instance lock.
    ### There's nothing to gain by getting the instance lock.

Jeremy Hylton's avatar
Jeremy Hylton committed
616
    def notifyDisconnected(self):
        """Internal: notify that the server connection was terminated.

        This is called by ConnectionManager when the connection is
        closed or when certain problems with the connection occur.
        """
        log2("Disconnected from storage: %s" % repr(self._server_addr))
        self._connection = None
        # is_connected() reports via _ready, so clear it before
        # swapping in the disconnected stub.
        self._ready.clear()
        self._server = disconnected_stub
        # tpc_begin() resets this flag; it records that we dropped the
        # connection possibly in the middle of a transaction.
        self._midtxn_disconnect = 1
627

Jeremy Hylton's avatar
Jeremy Hylton committed
628
    def __len__(self):
        """Return the size of the storage (object count per server info)."""
        # TODO:  Is this method used?
        info = self._info
        return info['length']
632

Jeremy Hylton's avatar
Jeremy Hylton committed
633
    def getName(self):
        """Storage API: return the storage name as a string.

        The return value consists of two parts: the name as determined
        by the name and addr arguments to the ClientStorage
        constructor, and the string 'connected' or 'disconnected' in
        parentheses indicating whether the storage is (currently)
        connected.
        """
        if self.is_connected():
            state = "connected"
        else:
            state = "disconnected"
        return "%s (%s)" % (self.__name__, state)
Jim Fulton's avatar
Jim Fulton committed
645

Jeremy Hylton's avatar
Jeremy Hylton committed
646
    def getSize(self):
        """Storage API: an approximate size of the database, in bytes."""
        info = self._info
        return info['size']
649

650 651 652 653 654 655 656 657 658 659
    def getExtensionMethods(self):
        """Return a dict whose keys name extra methods of this storage.

        Storage proxies (such as ZEO) call this to determine which
        extra methods they need to proxy in addition to the standard
        storage methods.  Values are currently None; the slot is
        reserved for marshalling metadata should we need it.
        """
        info = self._info
        return info.get('extensionMethods', {})
661

Jeremy Hylton's avatar
Jeremy Hylton committed
662
    def supportsUndo(self):
        """Storage API: return whether we support undo."""
        info = self._info
        return info['supportsUndo']
Jim Fulton's avatar
Jim Fulton committed
665

Jeremy Hylton's avatar
Jeremy Hylton committed
666
    def supportsVersions(self):
        """Storage API: return whether we support versions."""
        info = self._info
        return info['supportsVersions']

    def supportsTransactionalUndo(self):
        """Storage API: return whether we support transactional undo."""
        info = self._info
        return info['supportsTransactionalUndo']
Jeremy Hylton's avatar
Jeremy Hylton committed
673 674

    def isReadOnly(self):
        """Storage API: return whether we are in read-only mode."""
        if not self._is_read_only:
            # Even a client configured read-write may currently be
            # talking to a read-only fallback server, in which case
            # _conn_is_read_only is true.
            return self._conn_is_read_only
        return 1
Jeremy Hylton's avatar
Jeremy Hylton committed
683

684
    def _check_trans(self, trans):
685
        """Internal helper to check a transaction argument for sanity."""
686 687
        if self._is_read_only:
            raise POSException.ReadOnlyError()
Jeremy Hylton's avatar
Jeremy Hylton committed
688
        if self._transaction is not trans:
689 690
            raise POSException.StorageTransactionError(self._transaction,
                                                       trans)
Jim Fulton's avatar
Jim Fulton committed
691

692
    def abortVersion(self, version, txn):
        """Storage API: clear any changes made by the given version."""
        self._check_trans(txn)
        tid, oids = self._server.abortVersion(version, id(txn))
        # Invalidate both the version and the non-version data.  The
        # non-version data should still be valid, but older ZODB
        # releases changed the non-version serialno on an abortVersion,
        # and with those servers committing the cached data would raise
        # a spurious conflict error.  If we could rely on the server's
        # answer we could invalidate only the version data.
        for o in oids:
            self._tbuf.invalidate(o, '')
        return tid, oids
Jeremy Hylton's avatar
Jeremy Hylton committed
708

709
    def commitVersion(self, source, destination, txn):
        """Storage API: commit the source version in the destination."""
        self._check_trans(txn)
        tid, oids = self._server.commitVersion(source, destination, id(txn))
        # Committing into a named destination only dirties our cached
        # copies for the source version; committing into "" (the
        # non-version trunk) invalidates the non-version data as well.
        if destination:
            target = source
        else:
            target = ""
        for o in oids:
            self._tbuf.invalidate(o, target)
        return tid, oids
722

723
    def history(self, oid, version, length=1):
        """Storage API: return a sequence of HistoryEntry objects.

        The optional filter argument defined by the Storage API is not
        supported here (it cannot be shipped to the server).
        """
        server = self._server
        return server.history(oid, version, length)

731 732 733 734
    def getSerial(self, oid):
        """Storage API: return the current serial number for oid."""
        server = self._server
        return server.getSerial(oid)

735
    def loadSerial(self, oid, serial):
        """Storage API: load a historical revision of an object."""
        server = self._server
        return server.loadSerial(oid, serial)
Jim Fulton's avatar
Jim Fulton committed
738

739 740 741 742 743 744 745
    def load(self, oid, version):
        """Storage API: return the data for a given object.

        Returns the pickle data and serial number for the object
        identified by the given oid and version, if present; a KeyError
        propagates otherwise.
        """
        data, tid = self.loadEx(oid, version)[:2]
        return data, tid

    def loadEx(self, oid, version):
        """Load (data, tid, version) for oid, consulting the cache first.

        On a cache miss the server is asked; the reply is cached unless
        an invalidation for this oid arrived while the load was in
        flight (tracked via _load_oid/_load_status under _lock).
        """
        self._lock.acquire()    # for atomic processing of invalidations
        try:
            t = self._cache.load(oid, version)
            if t:
                return t
        finally:
            self._lock.release()

        if self._server is None:
            raise ClientDisconnected()

        # _load_lock serializes server loads so a racing invalidation
        # is never lost: _process_invalidations() clears _load_status
        # when it sees the oid currently being loaded.
        self._load_lock.acquire()
        try:
            self._lock.acquire()
            try:
                self._load_oid = oid
                self._load_status = 1
            finally:
                self._lock.release()

            data, tid, ver = self._server.loadEx(oid, version)

            self._lock.acquire()    # for atomic processing of invalidations
            try:
                if self._load_status:
                    # No invalidation raced with this load, so the
                    # reply is still current and safe to cache.
                    self._cache.store(oid, ver, tid, None, data)
                self._load_oid = None
            finally:
                self._lock.release()
        finally:
            self._load_lock.release()

        return data, tid, ver

    def loadBefore(self, oid, tid):
        """Load the revision of oid active just before tid (MVCC read).

        Returns (data, start_tid, end_tid) or None.  Revisions with a
        known end transaction are cached; current data (end is None) is
        returned uncached -- see the comment below.
        """
        self._lock.acquire()
        try:
            t = self._cache.loadBefore(oid, tid)
            if t is not None:
                return t
        finally:
            self._lock.release()

        t = self._server.loadBefore(oid, tid)
        if t is None:
            return None
        data, start, end = t
        if end is None:
            # This method should not be used to get current data.  It
            # doesn't use the _load_lock, so it is possible to overlap
            # this load with an invalidation for the same object.

            # If we call again, we're guaranteed to get the
            # post-invalidation data.  But if the data is still
            # current, we'll still get end == None.

            # Maybe the best thing to do is to re-run the test with
            # the load lock in the case.  That's slow performance, but
            # I don't think real application code will ever care about
            # it.

            return data, start, end
        self._lock.acquire()
        try:
            self._cache.store(oid, "", start, end, data)
        finally:
            self._lock.release()

        return data, start, end
Jeremy Hylton's avatar
Jeremy Hylton committed
818

Jim Fulton's avatar
Jim Fulton committed
819
    def modifiedInVersion(self, oid):
        """Storage API: return the version, if any, that modified an object.

        Returns an empty string when no version modified the object.
        The cache is consulted first; on a miss we ask the server.
        """
        self._lock.acquire()
        try:
            cached = self._cache.modifiedInVersion(oid)
        finally:
            self._lock.release()
        if cached is not None:
            return cached
        return self._server.modifiedInVersion(oid)
Jim Fulton's avatar
Jim Fulton committed
832

833 834
    def new_oid(self):
        """Storage API: return a new object identifier."""
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        # Serialize oid requests so that only one batch is fetched from
        # the server at a time.
        self._oid_lock.acquire()
        try:
            if not self._oids:
                batch = self._server.new_oids()
                batch.reverse()
                self._oids = batch
            return self._oids.pop()
        finally:
            self._oid_lock.release()
Jeremy Hylton's avatar
Jeremy Hylton committed
846

847 848 849 850 851 852 853 854 855 856 857 858 859
    def pack(self, t=None, referencesf=None, wait=1, days=0):
        """Storage API: pack the storage.

        Deviations from the Storage API: the referencesf argument is
        ignored (the server supplies its own implementation); two extra
        optional arguments are accepted:

        wait -- whether to block until the pack completes; default true.

        days -- number of days to subtract from the pack time; default 0.
        """
        # TODO: Is it okay that read-only connections allow pack()?
        if t is None:
            t = time.time()
        return self._server.pack(t - days * 86400, wait)

    def _check_serials(self):
868
        """Internal helper to move data from _serials to _seriald."""
869
        # serials are always going to be the same, the only
870
        # question is whether an exception has been raised.
Jeremy Hylton's avatar
Jeremy Hylton committed
871 872 873 874 875 876 877 878 879
        if self._serials:
            l = len(self._serials)
            r = self._serials[:l]
            del self._serials[:l]
            for oid, s in r:
                if isinstance(s, Exception):
                    raise s
                self._seriald[oid] = s
            return r
Jim Fulton's avatar
Jim Fulton committed
880

881
    def store(self, oid, serial, data, version, txn):
        """Storage API: store data for an object.

        The data is sent asynchronously to the server (storea) and
        remembered in the transaction buffer for the cache update at
        tpc_finish() time.
        """
        self._check_trans(txn)
        tid = id(txn)
        self._server.storea(oid, serial, data, version, tid)
        self._tbuf.store(oid, version, data)
        return self._check_serials()
Jim Fulton's avatar
Jim Fulton committed
887

888
    def tpc_vote(self, txn):
        """Storage API: vote on a transaction."""
        if txn is not self._transaction:
            # Not the transaction in progress; nothing to vote on.
            return None
        self._server.vote(id(txn))
        return self._check_serials()

Jeremy Hylton's avatar
Jeremy Hylton committed
895
    def tpc_begin(self, txn, tid=None, status=' '):
        """Storage API: begin a transaction.

        Blocks (on _tpc_cond) while another transaction is in progress;
        a repeated tpc_begin() with the same transaction is a no-op.
        Raises ReadOnlyError on a read-only client.
        """
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        self._tpc_cond.acquire()
        self._midtxn_disconnect = 0
        while self._transaction is not None:
            # It is allowable for a client to call two tpc_begins in a
            # row with the same transaction, and the second of these
            # must be ignored.
            if self._transaction == txn:
                self._tpc_cond.release()
                return
            # Wait (with timeout) for the current transaction to end;
            # end_transaction() notifies this condition.
            self._tpc_cond.wait(30)
        self._transaction = txn
        self._tpc_cond.release()

        try:
            self._server.tpc_begin(id(txn), txn.user, txn.description,
                                   txn._extension, tid, status)
        except:
            # Client may have disconnected during the tpc_begin().
            # Only release the transaction slot when we are not already
            # in the disconnected state (the stub swap handles that).
            if self._server is not disconnected_stub:
                self.end_transaction()
            raise

        # Reset per-transaction bookkeeping for the new transaction.
        self._tbuf.clear()
        self._seriald.clear()
        del self._serials[:]
Jim Fulton's avatar
Jim Fulton committed
924

925
    def end_transaction(self):
        """Internal helper to end a transaction.

        This is the only correct way to reset _transaction: it notifies
        _tpc_cond so a thread blocked in tpc_begin() can proceed.  The
        release is done in a finally clause so an unexpected error from
        notify() cannot leave the condition's lock held forever (the
        original released it outside any try/finally).
        """
        self._tpc_cond.acquire()
        try:
            self._transaction = None
            self._tpc_cond.notify()
        finally:
            self._tpc_cond.release()
933

934
    def lastTransaction(self):
        """Return the id of the last committed transaction, per the cache."""
        return self._cache.getLastTid()
936

937
    def tpc_abort(self, txn):
        """Storage API: abort a transaction."""
        if txn is not self._transaction:
            # Not the transaction in progress; ignore.
            return
        try:
            # Caution:  Are there any exceptions that should prevent an
            # abort from occurring?  It seems wrong to swallow them
            # all, yet you want to be sure that other abort logic is
            # executed regardless.
            try:
                self._server.tpc_abort(id(txn))
            except ClientDisconnected:
                # A disconnect aborts the transaction server-side
                # anyway, so this is safe to ignore (just log it).
                log2("ClientDisconnected in tpc_abort() ignored",
                     level=BLATHER)
        finally:
            # Always reset the local per-transaction state, even if the
            # server call failed.
            self._tbuf.clear()
            self._seriald.clear()
            del self._serials[:]
            self.end_transaction()
956

957
    def tpc_finish(self, txn, f=None):
        """Storage API: finish a transaction.

        The optional callable f, if given, is invoked with the new tid
        while the invalidation lock is held.
        """
        if txn is not self._transaction:
            # Not the transaction in progress; ignore.
            return
        # Hold _load_lock so no loadEx() can interleave with the cache
        # update below.
        self._load_lock.acquire()
        try:
            if self._midtxn_disconnect:
                raise ClientDisconnected(
                       'Calling tpc_finish() on a disconnected transaction')

            # The calls to tpc_finish() and _update_cache() should
            # never run concurrently with another thread, because the
            # tpc_cond condition variable prevents more than one
            # thread from calling tpc_finish() at a time.
            tid = self._server.tpc_finish(id(txn))
            self._lock.acquire()  # for atomic processing of invalidations
            try:
                self._update_cache(tid)
                if f is not None:
                    f(tid)
            finally:
                self._lock.release()

            # All serialnos must have been consumed by _update_cache().
            r = self._check_serials()
            assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
        finally:
            self._load_lock.release()
            self.end_transaction()
Jeremy Hylton's avatar
Jeremy Hylton committed
985

986
    def _update_cache(self, tid):
987 988 989 990 991
        """Internal helper to handle objects modified by a transaction.

        This iterates over the objects in the transaction buffer and
        update or invalidate the cache.
        """
992
        # Must be called with _lock already acquired.
993

994
        # Not sure why _update_cache() would be called on a closed storage.
995 996 997
        if self._cache is None:
            return

998
        for oid, version, data in self._tbuf:
999 1000 1001
            self._cache.invalidate(oid, version, tid)
            # If data is None, we just invalidate.
            if data is not None:
Jeremy Hylton's avatar
Jeremy Hylton committed
1002
                s = self._seriald[oid]
1003 1004 1005
                if s != ResolvedSerial:
                    assert s == tid, (s, tid)
                    self._cache.store(oid, version, s, None, data)
Jeremy Hylton's avatar
Jeremy Hylton committed
1006
        self._tbuf.clear()
1007

1008
    def undo(self, trans_id, txn):
        """Storage API: undo a transaction.

        Executed in a transactional context: it has no effect until the
        surrounding transaction commits, and it can itself be undone.
        Zope uses this to implement undo when the storage supports it.
        """
        self._check_trans(txn)
        tid, oids = self._server.undo(trans_id, id(txn))
        # Queue invalidations for the non-version data of everything
        # the undo touched.
        for o in oids:
            self._tbuf.invalidate(o, '')
        return tid, oids
Jim Fulton's avatar
Jim Fulton committed
1022

1023
    def undoInfo(self, first=0, last=-20, specification=None):
        """Storage API: return undo information from the server."""
        server = self._server
        return server.undoInfo(first, last, specification)
Jim Fulton's avatar
Jim Fulton committed
1026

1027
    def undoLog(self, first=0, last=-20, filter=None):
        """Storage API: return a sequence of TransactionDescription objects.

        The filter argument should be None or left unspecified: a
        filter function cannot be shipped to the server for execution
        there, so any non-None filter yields an empty sequence.
        """
        if filter is None:
            return self._server.undoLog(first, last)
        return []
Jim Fulton's avatar
Jim Fulton committed
1038 1039

    def versionEmpty(self, version):
        """Storage API: return whether the version has no transactions."""
        server = self._server
        return server.versionEmpty(version)
Jim Fulton's avatar
Jim Fulton committed
1042 1043

    def versions(self, max=None):
        """Storage API: return a sequence of versions in the storage."""
        server = self._server
        return server.versions(max)

1047
    # Below are methods invoked by the StorageServer
Jeremy Hylton's avatar
Jeremy Hylton committed
1048 1049

    def serialnos(self, args):
        """Server callback to pass a list of changed (oid, serial) pairs.

        The pairs accumulate in _serials until _check_serials() drains
        them.
        """
        self._serials += args

    def info(self, dict):
        """Server callback to update the info dictionary.

        Note: the parameter name shadows the builtin ``dict``; it is
        kept for backward compatibility with existing callers.
        """
        self._info.update(dict)

1057
    def invalidateVerify(self, args):
        """Server callback to invalidate an (oid, version) pair.

        This is called as part of cache validation: the pair is queued
        (via the pickler) for processing at the end of the verification
        procedure.
        """
        if self._pickler is None:
            # This should never happen: verification callbacks are only
            # expected while _pickler is set up.  The original left a
            # TODO to log this case instead of silently returning.
            log2("invalidateVerify with no active verification",
                 level=BLATHER)
            return
        self._pickler.dump(args)

1070 1071 1072 1073
    def _process_invalidations(self, invs):
        # Invalidations are sent by the ZEO server as a sequence of
        # oid, version pairs.  The DB's invalidate() method expects a
        # dictionary of oids.
1074

1075 1076 1077 1078
        self._lock.acquire()
        try:
            # versions maps version names to dictionary of invalidations
            versions = {}
1079
            for oid, version, tid in invs:
1080 1081
                if oid == self._load_oid:
                    self._load_status = 0
1082 1083
                self._cache.invalidate(oid, version, tid)
                versions.setdefault((version, tid), {})[oid] = tid
1084

1085
            if self._db is not None:
1086 1087
                for (version, tid), d in versions.items():
                    self._db.invalidate(tid, d, version=version)
1088 1089
        finally:
            self._lock.release()
1090

1091
    def endVerify(self):
        """Server callback to signal end of cache validation.

        Replays the invalidations queued during verification, then
        promotes the pending connection's server stub and unblocks
        callers waiting on _ready.
        """
        if self._pickler is None:
            # No verification in progress; nothing to finish.
            return
        # write end-of-data marker
        self._pickler.dump((None, None))
        self._pickler = None
        # Rewind the temp file and replay every queued invalidation.
        self._tfile.seek(0)
        f = self._tfile
        self._tfile = None
        self._process_invalidations(InvalidationLogIterator(f))
        f.close()

        log2("endVerify finishing")
        # Hand-off: the verified pending connection becomes the active
        # server, and waiting threads are released.
        self._server = self._pending_server
        self._ready.set()
        self._pending_conn = None
        log2("endVerify finished")
1109

1110 1111
    def invalidateTransaction(self, tid, args):
        """Invalidate objects modified by transaction tid.

        args is a sequence of (oid, version) pairs.  While cache
        verification is running the pairs are queued via the pickler;
        otherwise they are processed immediately.
        """
        self._lock.acquire()
        try:
            self._cache.setLastTid(tid)
        finally:
            self._lock.release()
        if self._pickler is not None:
            # Verification still in progress: defer until endVerify().
            log2("Transactional invalidation during cache verification",
                 level=BLATHER)
            for pair in args:
                self._pickler.dump(pair)
            return
        triples = [(oid, version, tid) for oid, version in args]
        self._process_invalidations(triples)
1125 1126 1127 1128 1129 1130

    # The following are for compatibility with protocol version 2.0.0

    def invalidateTrans(self, args):
        """Protocol 2.0.0 compatibility: invalidation without a tid."""
        return self.invalidateTransaction(None, args)

1131 1132 1133
    # Method-name aliases used by servers speaking protocol 2.0.0.
    invalidate = invalidateVerify
    end = endVerify
    Invalidate = invalidateTrans
1134

1135 1136 1137 1138
def InvalidationLogIterator(fileobj):
    """Iterate over (oid, version, None) triples unpickled from fileobj.

    The stream is terminated by a (None, None) marker, written by
    endVerify().
    """
    load = cPickle.Unpickler(fileobj).load
    while True:
        oid, version = load()
        if oid is None:
            break
        yield oid, version, None