Commit d9c7eca5 authored by Jeremy Hylton's avatar Jeremy Hylton

Revise interface between ExportImport and Connection.

The Connection class inherits from the ExportImport mixin, but they
had an arm's length interaction.  Restructure the code to provide
direct interaction via the _import attribute.

Remove the more general onCommitCallback() method.  It was only used
by ExportImport.

A lot of cosmetic changes to ExportImport:
    - don't make local variables out of attribute lookups
    - avoid local variable names that shadow builtins
    - prefer isinstance() to type comparisons
    - prefer absolute imports to relative imports
parent c3abd877
......@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.115 2004/01/14 14:33:43 jeremy Exp $"""
$Id: Connection.py,v 1.116 2004/01/14 18:42:13 jeremy Exp $"""
import logging
import sys
......@@ -119,6 +119,12 @@ class Connection(ExportImport, object):
self._mvcc = mvcc and not version
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
# class, we need to run _importDuringCommit() from our commit()
# method. If _import is not None, it is a two-tuple of arguments
# to pass to _importDuringCommit().
self._import = None
def getTransaction(self):
t = self._transaction
if t is None:
......@@ -249,20 +255,12 @@ class Connection(ExportImport, object):
# Return the connection to the pool.
self._db._closeConnection(self)
__onCommitActions = None
def onCommitAction(self, method_name, *args, **kw):
if self.__onCommitActions is None:
self.__onCommitActions = []
self.__onCommitActions.append((method_name, args, kw))
self.getTransaction().register(self)
def commit(self, object, transaction):
if object is self:
# We registered ourself. Execute a commit action, if any.
if self.__onCommitActions is not None:
method_name, args, kw = self.__onCommitActions.pop(0)
getattr(self, method_name)(transaction, *args, **kw)
if self._import:
self._importDuringCommit(transaction, *self._import)
self._import = None
return
oid = object._p_oid
......@@ -582,8 +580,8 @@ class Connection(ExportImport, object):
raise
def tpc_abort(self, transaction):
if self.__onCommitActions is not None:
del self.__onCommitActions
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified)
self._flush_invalidations()
......@@ -604,8 +602,6 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction):
if self.__onCommitActions is not None:
del self.__onCommitActions
try:
vote = self._storage.tpc_vote
except AttributeError:
......
......@@ -11,73 +11,66 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Support for database export and import."""
"""Support for database export and import.
"""
import POSException
from utils import p64, u64
from referencesf import referencesf
from cStringIO import StringIO
from cPickle import Pickler, Unpickler
from types import StringType, TupleType
from tempfile import TemporaryFile
from ZODB.POSException import ExportError
from ZODB.utils import p64, u64
from ZODB.referencesf import referencesf
import zLOG
class ExportImport:
def exportFile(self, oid, file=None):
if file is None: file=TemporaryFile()
elif type(file) is StringType: file=open(file,'w+b')
write=file.write
write('ZEXP')
version=self._version
ref=referencesf
oids=[oid]
done_oids={}
def exportFile(self, oid, f=None):
if f is None:
f = TemporaryFile()
elif isinstance(f, str):
f = open(f,'w+b')
f.write('ZEXP')
oids = [oid]
done_oids = {}
done=done_oids.has_key
load=self._storage.load
while oids:
oid=oids[0]
del oids[0]
if done(oid): continue
done_oids[oid]=1
oid = oids.pop(0)
if oid in done_oids:
continue
done_oids[oid] = True
try:
p, serial = load(oid, version)
p, serial = load(oid, self._version)
except:
zLOG.LOG("ZODB", zLOG.DEBUG,
"broken reference for oid %s" % `oid`,
err=sys.exc_info())
else:
ref(p, oids)
write(oid)
write(p64(len(p)))
write(p)
write(export_end_marker)
return file
def importFile(self, file, clue='', customImporters=None):
# This is tricky, because we need to work in a transaction!
referencesf(p, oids)
f.writelines([oid, p64(len(p)), p])
f.write(export_end_marker)
return f
if isinstance(file, StringType):
file = open(file,'rb')
read = file.read
def importFile(self, f, clue='', customImporters=None):
# This is tricky, because we need to work in a transaction!
magic = read(4)
if isinstance(f, str):
f = open(f,'rb')
magic = f.read(4)
if magic != 'ZEXP':
if customImporters and customImporters.has_key(magic):
file.seek(0)
return customImporters[magic](self, file, clue)
raise POSException.ExportError, 'Invalid export header'
f.seek(0)
return customImporters[magic](self, f, clue)
raise ExportError("Invalid export header")
t = self.getTransaction()
if clue:
t.note(clue)
return_oid_list = []
self.onCommitAction('_importDuringCommit', file, return_oid_list)
self._import = f, return_oid_list
self.getTransaction().register(self)
t.commit(1)
# Return the root imported object.
if return_oid_list:
......@@ -85,84 +78,77 @@ class ExportImport:
else:
return None
def _importDuringCommit(self, transaction, file, return_oid_list):
'''
def _importDuringCommit(self, transaction, f, return_oid_list):
"""Import data during two-phase commit.
Invoked by the transaction manager mid commit.
Appends one item, the OID of the first object created,
to return_oid_list.
'''
"""
oids = {}
storage = self._storage
new_oid = storage.new_oid
store = storage.store
read = file.read
def persistent_load(ooid,
Ghost=Ghost,
oids=oids, wrote_oid=oids.has_key,
new_oid=storage.new_oid):
"Remap a persistent id to a new ID and create a ghost for it."
def persistent_load(ooid):
"""Remap a persistent id to a new ID and create a ghost for it."""
if type(ooid) is TupleType: ooid, klass = ooid
else: klass=None
klass = None
if isinstance(ooid, tuple):
ooid, klass = ooid
if wrote_oid(ooid): oid=oids[ooid]
if ooid in oids:
oid = oids[ooid]
else:
if klass is None: oid=new_oid()
else: oid=new_oid(), klass
oids[ooid]=oid
if klass is None:
oid = self._storage.new_oid()
else:
oid = self._storage.new_oid(), klass
oids[ooid] = oid
Ghost=Ghost()
Ghost.oid=oid
return Ghost
return Ghost(oid)
version = self._version
while 1:
h=read(16)
if h==export_end_marker: break
h = f.read(16)
if h == export_end_marker:
break
if len(h) != 16:
raise POSException.ExportError, 'Truncated export file'
l=u64(h[8:16])
p=read(l)
raise ExportError("Truncated export file")
l = u64(h[8:16])
p = f.read(l)
if len(p) != l:
raise POSException.ExportError, 'Truncated export file'
raise ExportError("Truncated export file")
ooid=h[:8]
ooid = h[:8]
if oids:
oid=oids[ooid]
if type(oid) is TupleType: oid=oid[0]
oid = oids[ooid]
if isinstance(oid, tuple):
oid = oid[0]
else:
oids[ooid] = oid = storage.new_oid()
oids[ooid] = oid = self._storage.new_oid()
return_oid_list.append(oid)
pfile=StringIO(p)
unpickler=Unpickler(pfile)
unpickler.persistent_load=persistent_load
pfile = StringIO(p)
unpickler = Unpickler(pfile)
unpickler.persistent_load = persistent_load
newp=StringIO()
pickler=Pickler(newp,1)
pickler.persistent_id=persistent_id
newp = StringIO()
pickler = Pickler(newp, 1)
pickler.persistent_id = persistent_id
pickler.dump(unpickler.load())
pickler.dump(unpickler.load())
p=newp.getvalue()
store(oid, None, p, version, transaction)
p = newp.getvalue()
self._storage.store(oid, None, p, version, transaction)
def TemporaryFile():
# This is sneaky suicide
global TemporaryFile
import tempfile
TemporaryFile=tempfile.TemporaryFile
return TemporaryFile()
export_end_marker='\377'*16
export_end_marker = '\377'*16
class Ghost: pass
class Ghost(object):
__slots__ = ("oid",)
def __init__(self, oid):
self.oid = oid
def persistent_id(object, Ghost=Ghost):
if getattr(object, '__class__', None) is Ghost:
return object.oid
def persistent_id(obj):
if isinstance(obj, Ghost):
return obj.oid
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment