Commit 1f66431c authored by Jeremy Hylton's avatar Jeremy Hylton

Backport fix for concurrent writing and packing.

The pack() implementation would leave a corrupted storage behind if
transactions committed while packing.
parent 3231cfb6
...@@ -115,7 +115,7 @@ ...@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version # may have a back pointer to a version record or to a non-version
# record. # record.
# #
__version__='$Revision: 1.131 $'[11:-2] __version__='$Revision: 1.132 $'[11:-2]
import base64 import base64
from cPickle import Pickler, Unpickler, loads from cPickle import Pickler, Unpickler, loads
...@@ -1460,25 +1460,28 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1460,25 +1460,28 @@ class FileStorage(BaseStorage.BaseStorage,
if opos is None: if opos is None:
return return
oldpath = self._file_name + ".old" oldpath = self._file_name + ".old"
self._file.close() self._lock_acquire()
try: try:
if os.path.exists(oldpath): self._file.close()
os.remove(oldpath) try:
os.rename(self._file_name, oldpath) if os.path.exists(oldpath):
except Exception, msg: os.remove(oldpath)
self._file = open(self._file_name, 'r+b') os.rename(self._file_name, oldpath)
raise except Exception, msg:
self._file = open(self._file_name, 'r+b')
raise
# OK, we're beyond the point of no return # OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name) os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b') self._file = open(self._file_name, 'r+b')
self._initIndex(p.index, p.vindex, p.tindex, p.tvindex) self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
self._pos = opos self._pos = opos
self._save_index() self._save_index()
finally:
self._lock_release()
finally: finally:
if p.locked: if p.locked:
self._commit_lock_release() self._commit_lock_release()
self._lock_release()
self._lock_acquire() self._lock_acquire()
self._packt = z64 self._packt = z64
self._lock_release() self._lock_release()
......
...@@ -45,6 +45,23 @@ except ImportError: ...@@ -45,6 +45,23 @@ except ImportError:
class CorruptedError(Exception): class CorruptedError(Exception):
pass pass
class CorruptedDataError(CorruptedError):
def __init__(self, oid=None, buf=None, pos=None):
self.oid = oid
self.buf = buf
self.pos = pos
def __str__(self):
if self.oid:
msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
if self.pos:
msg += " at %d" % self.pos
return msg
# the struct formats for the headers # the struct formats for the headers
TRANS_HDR = ">8s8scHHH" TRANS_HDR = ">8s8scHHH"
DATA_HDR = ">8s8s8s8sH8s" DATA_HDR = ">8s8s8s8sH8s"
...@@ -672,6 +689,14 @@ class FileStoragePacker(FileStorageFormatter): ...@@ -672,6 +689,14 @@ class FileStoragePacker(FileStorageFormatter):
self._tfile.close() self._tfile.close()
os.remove(self._name + ".pack") os.remove(self._name + ".pack")
return None return None
self._commit_lock_acquire()
self.locked = True
self._lock_acquire()
try:
self._file.seek(0, 2)
self.file_end = self._file.tell()
finally:
self._lock_release()
if ipos < self.file_end: if ipos < self.file_end:
self.copyRest(ipos) self.copyRest(ipos)
...@@ -795,34 +820,58 @@ class FileStoragePacker(FileStorageFormatter): ...@@ -795,34 +820,58 @@ class FileStoragePacker(FileStorageFormatter):
# After the pack time, all data records are copied. # After the pack time, all data records are copied.
# Copy one txn at a time, using copy() for data. # Copy one txn at a time, using copy() for data.
while ipos < self.file_end: # Release the commit lock every 20 copies
th = self._read_txn_header(ipos) self._lock_counter = 0
pos = self._tfile.tell()
self._copier.setTxnPos(pos) try:
self._tfile.write(th.asString()) while 1:
tend = ipos + th.tlen ipos = self.copyOne(ipos)
ipos += th.headerlen() except CorruptedDataError, err:
# The last call to copyOne() will raise
while ipos < tend: # CorruptedDataError, because it will attempt to read past
h = self._read_data_header(ipos) # the end of the file. Double-check that the exception
ipos += h.recordlen() # occurred for this reason.
prev_txn = None self._file.seek(0, 2)
if h.plen: endpos = self._file.tell()
data = self._file.read(h.plen) if endpos != err.pos:
else: raise
data = self.fetchBackpointer(h.oid, h.back)
if h.back: def copyOne(self, ipos):
prev_txn = self.getTxnFromData(h.oid, h.back) # The call below will raise CorruptedDataError at EOF.
th = self._read_txn_header(ipos)
self._copier.copy(h.oid, h.serial, data, h.version, self._lock_counter += 1
prev_txn, pos, self._tfile.tell()) if self._lock_counter % 20 == 0:
self._commit_lock_release()
tlen = self._tfile.tell() - pos pos = self._tfile.tell()
assert tlen == th.tlen self._copier.setTxnPos(pos)
self._tfile.write(p64(tlen)) self._tfile.write(th.asString())
ipos += 8 tend = ipos + th.tlen
ipos += th.headerlen()
self.index.update(self.tindex)
self.tindex.clear() while ipos < tend:
self.vindex.update(self.tvindex) h = self._read_data_header(ipos)
self.tvindex.clear() ipos += h.recordlen()
prev_txn = None
if h.plen:
data = self._file.read(h.plen)
else:
data = self.fetchBackpointer(h.oid, h.back)
if h.back:
prev_txn = self.getTxnFromData(h.oid, h.back)
self._copier.copy(h.oid, h.serial, data, h.version,
prev_txn, pos, self._tfile.tell())
tlen = self._tfile.tell() - pos
assert tlen == th.tlen
self._tfile.write(p64(tlen))
ipos += 8
self.index.update(self.tindex)
self.tindex.clear()
self.vindex.update(self.tvindex)
self.tvindex.clear()
if self._lock_counter % 20 == 0:
self._commit_lock_acquire()
return ipos
...@@ -25,11 +25,15 @@ try: ...@@ -25,11 +25,15 @@ try:
except ImportError: except ImportError:
from StringIO import StringIO from StringIO import StringIO
import threading
import time import time
from ZODB import DB from ZODB import DB
from Persistence import Persistent from Persistence import Persistent
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.POSException import ConflictError
ZERO = '\0'*8 ZERO = '\0'*8
...@@ -376,3 +380,56 @@ class PackableStorage(PackableStorageBase): ...@@ -376,3 +380,56 @@ class PackableStorage(PackableStorageBase):
conn.sync() conn.sync()
eq(root['obj'].value, 7) eq(root['obj'].value, 7)
def checkPackWhileWriting(self):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# iterator over the storage to make sure it's sane
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
class ClientThread(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self)
self.root = db.open().root()
def run(self):
for j in range(50):
try:
self.root[j % 10].value = MinPO(j)
get_transaction().commit()
except ConflictError:
get_transaction().abort()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment