Commit 90476555 authored by Jeremy Hylton's avatar Jeremy Hylton

Revise interface between ExportImport and Connection.

The Connection class inherits from the ExportImport mixin, but they
had an arm's length interaction.  Restructure the code to provide
direct interaction via the _import attribute.

Remove the more general onCommitCallback() method.  It was only used
by ExportImport.

A lot of cosmetic changes to ExportImport:
    - don't make local variables out of attribute lookups
    - avoid local variable names that shadow builtins
    - prefer isinstance() to type comparisons
    - prefer absolute imports to relative imports
parent e4a6e04e
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.115 2004/01/14 14:33:43 jeremy Exp $""" $Id: Connection.py,v 1.116 2004/01/14 18:42:13 jeremy Exp $"""
import logging import logging
import sys import sys
...@@ -119,6 +119,12 @@ class Connection(ExportImport, object): ...@@ -119,6 +119,12 @@ class Connection(ExportImport, object):
self._mvcc = mvcc and not version self._mvcc = mvcc and not version
self._txn_time = None self._txn_time = None
# To support importFile(), implemented in the ExportImport base
# class, we need to run _importDuringCommit() from our commit()
# method. If _import is not None, it is a two-tuple of arguments
# to pass to _importDuringCommit().
self._import = None
def getTransaction(self): def getTransaction(self):
t = self._transaction t = self._transaction
if t is None: if t is None:
...@@ -249,20 +255,12 @@ class Connection(ExportImport, object): ...@@ -249,20 +255,12 @@ class Connection(ExportImport, object):
# Return the connection to the pool. # Return the connection to the pool.
self._db._closeConnection(self) self._db._closeConnection(self)
__onCommitActions = None
def onCommitAction(self, method_name, *args, **kw):
if self.__onCommitActions is None:
self.__onCommitActions = []
self.__onCommitActions.append((method_name, args, kw))
self.getTransaction().register(self)
def commit(self, object, transaction): def commit(self, object, transaction):
if object is self: if object is self:
# We registered ourself. Execute a commit action, if any. # We registered ourself. Execute a commit action, if any.
if self.__onCommitActions is not None: if self._import:
method_name, args, kw = self.__onCommitActions.pop(0) self._importDuringCommit(transaction, *self._import)
getattr(self, method_name)(transaction, *args, **kw) self._import = None
return return
oid = object._p_oid oid = object._p_oid
...@@ -582,8 +580,8 @@ class Connection(ExportImport, object): ...@@ -582,8 +580,8 @@ class Connection(ExportImport, object):
raise raise
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
if self.__onCommitActions is not None: if self._import:
del self.__onCommitActions self._import = None
self._storage.tpc_abort(transaction) self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified) self._cache.invalidate(self._modified)
self._flush_invalidations() self._flush_invalidations()
...@@ -604,8 +602,6 @@ class Connection(ExportImport, object): ...@@ -604,8 +602,6 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(transaction) self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
if self.__onCommitActions is not None:
del self.__onCommitActions
try: try:
vote = self._storage.tpc_vote vote = self._storage.tpc_vote
except AttributeError: except AttributeError:
......
...@@ -11,73 +11,66 @@ ...@@ -11,73 +11,66 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Support for database export and import."""
"""Support for database export and import.
"""
import POSException
from utils import p64, u64
from referencesf import referencesf
from cStringIO import StringIO from cStringIO import StringIO
from cPickle import Pickler, Unpickler from cPickle import Pickler, Unpickler
from types import StringType, TupleType from tempfile import TemporaryFile
from ZODB.POSException import ExportError
from ZODB.utils import p64, u64
from ZODB.referencesf import referencesf
import zLOG import zLOG
class ExportImport: class ExportImport:
def exportFile(self, oid, file=None): def exportFile(self, oid, f=None):
if f is None:
if file is None: file=TemporaryFile() f = TemporaryFile()
elif type(file) is StringType: file=open(file,'w+b') elif isinstance(f, str):
write=file.write f = open(f,'w+b')
write('ZEXP') f.write('ZEXP')
version=self._version oids = [oid]
ref=referencesf done_oids = {}
oids=[oid]
done_oids={}
done=done_oids.has_key done=done_oids.has_key
load=self._storage.load load=self._storage.load
while oids: while oids:
oid=oids[0] oid = oids.pop(0)
del oids[0] if oid in done_oids:
if done(oid): continue continue
done_oids[oid]=1 done_oids[oid] = True
try: try:
p, serial = load(oid, version) p, serial = load(oid, self._version)
except: except:
zLOG.LOG("ZODB", zLOG.DEBUG, zLOG.LOG("ZODB", zLOG.DEBUG,
"broken reference for oid %s" % `oid`, "broken reference for oid %s" % `oid`,
err=sys.exc_info()) err=sys.exc_info())
else: else:
ref(p, oids) referencesf(p, oids)
write(oid) f.writelines([oid, p64(len(p)), p])
write(p64(len(p))) f.write(export_end_marker)
write(p) return f
write(export_end_marker)
return file
def importFile(self, file, clue='', customImporters=None):
# This is tricky, because we need to work in a transaction!
if isinstance(file, StringType): def importFile(self, f, clue='', customImporters=None):
file = open(file,'rb') # This is tricky, because we need to work in a transaction!
read = file.read
magic = read(4) if isinstance(f, str):
f = open(f,'rb')
magic = f.read(4)
if magic != 'ZEXP': if magic != 'ZEXP':
if customImporters and customImporters.has_key(magic): if customImporters and customImporters.has_key(magic):
file.seek(0) f.seek(0)
return customImporters[magic](self, file, clue) return customImporters[magic](self, f, clue)
raise POSException.ExportError, 'Invalid export header' raise ExportError("Invalid export header")
t = self.getTransaction() t = self.getTransaction()
if clue: if clue:
t.note(clue) t.note(clue)
return_oid_list = [] return_oid_list = []
self.onCommitAction('_importDuringCommit', file, return_oid_list) self._import = f, return_oid_list
self.getTransaction().register(self)
t.commit(1) t.commit(1)
# Return the root imported object. # Return the root imported object.
if return_oid_list: if return_oid_list:
...@@ -85,84 +78,77 @@ class ExportImport: ...@@ -85,84 +78,77 @@ class ExportImport:
else: else:
return None return None
def _importDuringCommit(self, transaction, file, return_oid_list): def _importDuringCommit(self, transaction, f, return_oid_list):
''' """Import data during two-phase commit.
Invoked by the transaction manager mid commit. Invoked by the transaction manager mid commit.
Appends one item, the OID of the first object created, Appends one item, the OID of the first object created,
to return_oid_list. to return_oid_list.
''' """
oids = {} oids = {}
storage = self._storage
new_oid = storage.new_oid
store = storage.store
read = file.read
def persistent_load(ooid,
Ghost=Ghost,
oids=oids, wrote_oid=oids.has_key,
new_oid=storage.new_oid):
"Remap a persistent id to a new ID and create a ghost for it." def persistent_load(ooid):
"""Remap a persistent id to a new ID and create a ghost for it."""
if type(ooid) is TupleType: ooid, klass = ooid klass = None
else: klass=None if isinstance(ooid, tuple):
ooid, klass = ooid
if wrote_oid(ooid): oid=oids[ooid] if ooid in oids:
oid = oids[ooid]
else: else:
if klass is None: oid=new_oid() if klass is None:
else: oid=new_oid(), klass oid = self._storage.new_oid()
oids[ooid]=oid else:
oid = self._storage.new_oid(), klass
oids[ooid] = oid
Ghost=Ghost() return Ghost(oid)
Ghost.oid=oid
return Ghost
version = self._version version = self._version
while 1: while 1:
h=read(16) h = f.read(16)
if h==export_end_marker: break if h == export_end_marker:
break
if len(h) != 16: if len(h) != 16:
raise POSException.ExportError, 'Truncated export file' raise ExportError("Truncated export file")
l=u64(h[8:16]) l = u64(h[8:16])
p=read(l) p = f.read(l)
if len(p) != l: if len(p) != l:
raise POSException.ExportError, 'Truncated export file' raise ExportError("Truncated export file")
ooid=h[:8] ooid = h[:8]
if oids: if oids:
oid=oids[ooid] oid = oids[ooid]
if type(oid) is TupleType: oid=oid[0] if isinstance(oid, tuple):
oid = oid[0]
else: else:
oids[ooid] = oid = storage.new_oid() oids[ooid] = oid = self._storage.new_oid()
return_oid_list.append(oid) return_oid_list.append(oid)
pfile=StringIO(p) pfile = StringIO(p)
unpickler=Unpickler(pfile) unpickler = Unpickler(pfile)
unpickler.persistent_load=persistent_load unpickler.persistent_load = persistent_load
newp=StringIO() newp = StringIO()
pickler=Pickler(newp,1) pickler = Pickler(newp, 1)
pickler.persistent_id=persistent_id pickler.persistent_id = persistent_id
pickler.dump(unpickler.load()) pickler.dump(unpickler.load())
pickler.dump(unpickler.load()) pickler.dump(unpickler.load())
p=newp.getvalue() p = newp.getvalue()
store(oid, None, p, version, transaction)
self._storage.store(oid, None, p, version, transaction)
def TemporaryFile():
# This is sneaky suicide
global TemporaryFile
import tempfile
TemporaryFile=tempfile.TemporaryFile
return TemporaryFile()
export_end_marker='\377'*16 export_end_marker = '\377'*16
class Ghost: pass class Ghost(object):
__slots__ = ("oid",)
def __init__(self, oid):
self.oid = oid
def persistent_id(object, Ghost=Ghost): def persistent_id(obj):
if getattr(object, '__class__', None) is Ghost: if isinstance(obj, Ghost):
return object.oid return obj.oid
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment