Commit 3806eaff authored by Jim Fulton's avatar Jim Fulton

Multi-threaded IO support.

parent 7ef494df
...@@ -36,6 +36,7 @@ import errno ...@@ -36,6 +36,7 @@ import errno
import logging import logging
import os import os
import sys import sys
import threading
import time import time
import ZODB.blob import ZODB.blob
import ZODB.interfaces import ZODB.interfaces
...@@ -128,7 +129,7 @@ class FileStorage( ...@@ -128,7 +129,7 @@ class FileStorage(
else: else:
self._tfile = None self._tfile = None
self._file_name = file_name self._file_name = os.path.abspath(file_name)
self._pack_gc = pack_gc self._pack_gc = pack_gc
self.pack_keep_old = pack_keep_old self.pack_keep_old = pack_keep_old
...@@ -167,6 +168,7 @@ class FileStorage( ...@@ -167,6 +168,7 @@ class FileStorage(
self._file = open(file_name, 'w+b') self._file = open(file_name, 'w+b')
self._file.write(packed_version) self._file.write(packed_version)
self._files = FilePool(self._file_name)
r = self._restore_index() r = self._restore_index()
if r is not None: if r is not None:
self._used_index = 1 # Marker for testing self._used_index = 1 # Marker for testing
...@@ -401,6 +403,7 @@ class FileStorage( ...@@ -401,6 +403,7 @@ class FileStorage(
def close(self): def close(self):
self._file.close() self._file.close()
self._files.close()
if hasattr(self,'_lock_file'): if hasattr(self,'_lock_file'):
self._lock_file.close() self._lock_file.close()
if self._tfile: if self._tfile:
...@@ -426,22 +429,22 @@ class FileStorage( ...@@ -426,22 +429,22 @@ class FileStorage(
"""Return pickle data and serial number.""" """Return pickle data and serial number."""
assert not version assert not version
self._lock_acquire() _file = self._files.get()
try: try:
pos = self._lookup_pos(oid) pos = self._lookup_pos(oid)
h = self._read_data_header(pos, oid) h = self._read_data_header(pos, oid, _file)
if h.plen: if h.plen:
data = self._file.read(h.plen) data = _file.read(h.plen)
return data, h.tid return data, h.tid
elif h.back: elif h.back:
# Get the data from the backpointer, but tid from # Get the data from the backpointer, but tid from
# current txn. # current txn.
data = self._loadBack_impl(oid, h.back)[0] data = self._loadBack_impl(oid, h.back, _file=_file)[0]
return data, h.tid return data, h.tid
else: else:
raise POSKeyError(oid) raise POSKeyError(oid)
finally: finally:
self._lock_release() self._files.put(_file)
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
self._lock_acquire() self._lock_acquire()
...@@ -462,12 +465,12 @@ class FileStorage( ...@@ -462,12 +465,12 @@ class FileStorage(
self._lock_release() self._lock_release()
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
self._lock_acquire() _file = self._files.get()
try: try:
pos = self._lookup_pos(oid) pos = self._lookup_pos(oid)
end_tid = None end_tid = None
while True: while True:
h = self._read_data_header(pos, oid) h = self._read_data_header(pos, oid, _file)
if h.tid < tid: if h.tid < tid:
break break
...@@ -477,13 +480,12 @@ class FileStorage( ...@@ -477,13 +480,12 @@ class FileStorage(
return None return None
if h.back: if h.back:
data, _, _, _ = self._loadBack_impl(oid, h.back) data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
return data, h.tid, end_tid return data, h.tid, end_tid
else: else:
return self._file.read(h.plen), h.tid, end_tid return _file.read(h.plen), h.tid, end_tid
finally: finally:
self._lock_release() self._files.put(_file)
def store(self, oid, oldserial, data, version, transaction): def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only: if self._is_read_only:
...@@ -735,6 +737,32 @@ class FileStorage( ...@@ -735,6 +737,32 @@ class FileStorage(
finally: finally:
self._lock_release() self._lock_release()
def tpc_finish(self, transaction, f=None):
# Get write lock
self._files.write_lock()
try:
self._lock_acquire()
try:
if transaction is not self._transaction:
raise POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
try:
if f is not None:
f(self._tid)
u, d, e = self._ude
self._finish(self._tid, u, d, e)
self._clear_temp()
finally:
self._ude = None
self._transaction = None
self._commit_lock_release()
finally:
self._lock_release()
finally:
self._files.write_unlock()
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any # If self._nextpos is 0, then the transaction didn't write any
# data, so we don't bother writing anything to the file. # data, so we don't bother writing anything to the file.
...@@ -1131,8 +1159,10 @@ class FileStorage( ...@@ -1131,8 +1159,10 @@ class FileStorage(
return return
have_commit_lock = True have_commit_lock = True
opos, index = pack_result opos, index = pack_result
self._files.write_lock()
self._lock_acquire() self._lock_acquire()
try: try:
self._files.empty()
self._file.close() self._file.close()
try: try:
os.rename(self._file_name, oldpath) os.rename(self._file_name, oldpath)
...@@ -1146,6 +1176,7 @@ class FileStorage( ...@@ -1146,6 +1176,7 @@ class FileStorage(
self._initIndex(index, self._tindex) self._initIndex(index, self._tindex)
self._pos = opos self._pos = opos
finally: finally:
self._files.write_unlock()
self._lock_release() self._lock_release()
# We're basically done. Now we need to deal with removed # We're basically done. Now we need to deal with removed
...@@ -2037,3 +2068,72 @@ class UndoSearch: ...@@ -2037,3 +2068,72 @@ class UndoSearch:
'description': d} 'description': d}
d.update(e) d.update(e)
return d return d
class FilePool:
closed = False
writing = False
def __init__(self, file_name):
self.name = file_name
self._files = []
self._out = []
self._cond = threading.Condition()
def write_lock(self):
self._cond.acquire()
try:
self.writing = True
while self._out:
self._cond.wait()
finally:
self._cond.release()
def write_unlock(self):
self._cond.acquire()
self.writing = False
self._cond.notifyAll()
self._cond.release()
def get(self):
self._cond.acquire()
try:
while self.writing:
self._cond.wait()
if self.closed:
raise ValueError('closed')
try:
f = self._files.pop()
except IndexError:
f = open(self.name, 'rb')
self._out.append(f)
return f
finally:
self._cond.release()
def put(self, f):
self._out.remove(f)
self._files.append(f)
if not self._out:
self._cond.acquire()
try:
if self.writing and not self._out:
self._cond.notifyAll()
finally:
self._cond.release()
def empty(self):
while self._files:
self._files.pop().close()
def close(self):
self._cond.acquire()
self.closed = True
self._cond.release()
self.write_lock()
try:
self.empty()
finally:
self.write_unlock()
...@@ -134,21 +134,24 @@ class FileStorageFormatter(object): ...@@ -134,21 +134,24 @@ class FileStorageFormatter(object):
self._file.seek(pos) self._file.seek(pos)
return u64(self._file.read(8)) return u64(self._file.read(8))
def _read_data_header(self, pos, oid=None): def _read_data_header(self, pos, oid=None, _file=None):
"""Return a DataHeader object for data record at pos. """Return a DataHeader object for data record at pos.
If ois is not None, raise CorruptedDataError if oid passed If ois is not None, raise CorruptedDataError if oid passed
does not match oid in file. does not match oid in file.
""" """
self._file.seek(pos) if _file is None:
s = self._file.read(DATA_HDR_LEN) _file = self._file
_file.seek(pos)
s = _file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN: if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos) raise CorruptedDataError(oid, s, pos)
h = DataHeaderFromString(s) h = DataHeaderFromString(s)
if oid is not None and oid != h.oid: if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos) raise CorruptedDataError(oid, s, pos)
if not h.plen: if not h.plen:
h.back = u64(self._file.read(8)) h.back = u64(_file.read(8))
return h return h
def _read_txn_header(self, pos, tid=None): def _read_txn_header(self, pos, tid=None):
...@@ -164,20 +167,22 @@ class FileStorageFormatter(object): ...@@ -164,20 +167,22 @@ class FileStorageFormatter(object):
h.ext = self._file.read(h.elen) h.ext = self._file.read(h.elen)
return h return h
def _loadBack_impl(self, oid, back, fail=True): def _loadBack_impl(self, oid, back, fail=True, _file=None):
# shared implementation used by various _loadBack methods # shared implementation used by various _loadBack methods
# #
# If the backpointer ultimately resolves to 0: # If the backpointer ultimately resolves to 0:
# If fail is True, raise KeyError for zero backpointer. # If fail is True, raise KeyError for zero backpointer.
# If fail is False, return the empty data from the record # If fail is False, return the empty data from the record
# with no backpointer. # with no backpointer.
if _file is None:
_file = self._file
while 1: while 1:
if not back: if not back:
# If backpointer is 0, object does not currently exist. # If backpointer is 0, object does not currently exist.
raise POSKeyError(oid) raise POSKeyError(oid)
h = self._read_data_header(back) h = self._read_data_header(back, _file=_file)
if h.plen: if h.plen:
return self._file.read(h.plen), h.tid, back, h.tloc return _file.read(h.plen), h.tid, back, h.tloc
if h.back == 0 and not fail: if h.back == 0 and not fail:
return None, h.tid, back, h.tloc return None, h.tid, back, h.tloc
back = h.back back = h.back
......
...@@ -587,10 +587,10 @@ def deal_with_finish_failures(): ...@@ -587,10 +587,10 @@ def deal_with_finish_failures():
>>> handler.uninstall() >>> handler.uninstall()
>>> fs.load('\0'*8, '') >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
ValueError: I/O operation on closed file ValueError: ...
>>> db.close() >>> db.close()
>>> fs = ZODB.FileStorage.FileStorage('data.fs') >>> fs = ZODB.FileStorage.FileStorage('data.fs')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment