Commit 01d74a54 authored by bescoto's avatar bescoto

Final checkin for 0.11.3 (fixed pipeline flushing problem)


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@292 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent b2612dd4
New in v0.11.3 (2003/04/01) New in v0.11.3 (2003/03/04)
--------------------------- ---------------------------
Fixed a number of bugs reported by Olivier Mueller: Fixed a number of bugs reported by Olivier Mueller:
...@@ -13,6 +13,8 @@ Fixed a number of bugs reported by Olivier Mueller: ...@@ -13,6 +13,8 @@ Fixed a number of bugs reported by Olivier Mueller:
--print-statistics option works again (before it would be silently --print-statistics option works again (before it would be silently
ignored). ignored).
Fixed cache pipeline overflow bug. This error could appear on
large remote backups when many files have not changed.
New in v0.11.2 (2003/03/01) New in v0.11.2 (2003/03/01)
......
---------[ Short term (next version) ]-------------------------
Fix --print-statistics
---------[ Medium term ]--------------------------------------- ---------[ Medium term ]---------------------------------------
Look at Kent Borg's suggestion for restore options and digests. Look at Kent Borg's suggestion for restore options and digests.
......
...@@ -40,7 +40,7 @@ conn_bufsize = 98304 ...@@ -40,7 +40,7 @@ conn_bufsize = 98304
# This is used in rorpiter.CacheIndexable. The number represents the # This is used in rorpiter.CacheIndexable. The number represents the
# number of rpaths which may be stuck in buffers when moving over a # number of rpaths which may be stuck in buffers when moving over a
# remote connection. # remote connection.
pipeline_max_length = int(conn_bufsize / 150)*2 pipeline_max_length = 500
# True if script is running as a server # True if script is running as a server
server = None server = None
......
...@@ -601,5 +601,6 @@ def checkdest_if_necessary(dest_rp): ...@@ -601,5 +601,6 @@ def checkdest_if_necessary(dest_rp):
""" """
need_check = checkdest_need_check(dest_rp) need_check = checkdest_need_check(dest_rp)
if need_check == 1: if need_check == 1:
Log("Previous backup seems to have failed, checking now.", 2) Log("Previous backup seems to have failed, regressing "
"destination now.", 2)
dest_rp.conn.regress.Regress(dest_rp) dest_rp.conn.regress.Regress(dest_rp)
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
from __future__ import generators from __future__ import generators
import errno import errno
import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \ import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \
rpath, static, log, selection, Time, Rdiff, statistics rpath, static, log, selection, Time, Rdiff, statistics, iterfile
def Mirror(src_rpath, dest_rpath): def Mirror(src_rpath, dest_rpath):
"""Turn dest_rpath into a copy of src_rpath""" """Turn dest_rpath into a copy of src_rpath"""
...@@ -65,7 +65,7 @@ class SourceStruct: ...@@ -65,7 +65,7 @@ class SourceStruct:
sel = selection.Select(rpath) sel = selection.Select(rpath)
sel.ParseArgs(tuplelist, filelists) sel.ParseArgs(tuplelist, filelists)
sel.set_iter() sel.set_iter()
cache_size = Globals.pipeline_max_length * 2 # 2 because to and from cache_size = Globals.pipeline_max_length * 3 # to and from+leeway
cls.source_select = rorpiter.CacheIndexable(sel, cache_size) cls.source_select = rorpiter.CacheIndexable(sel, cache_size)
def get_source_select(cls): def get_source_select(cls):
...@@ -96,6 +96,9 @@ class SourceStruct: ...@@ -96,6 +96,9 @@ class SourceStruct:
diff_rorp.set_attached_filetype('snapshot') diff_rorp.set_attached_filetype('snapshot')
for dest_sig in dest_sigiter: for dest_sig in dest_sigiter:
if dest_sig is iterfile.RORPIterFlushRepeat:
yield iterfile.RORPIterFlush # Flush buffer when get_sigs does
continue
src_rp = (source_rps.get(dest_sig.index) or src_rp = (source_rps.get(dest_sig.index) or
rpath.RORPath(dest_sig.index)) rpath.RORPath(dest_sig.index))
diff_rorp = src_rp.getRORPath() diff_rorp = src_rp.getRORPath()
...@@ -139,29 +142,48 @@ class DestinationStruct: ...@@ -139,29 +142,48 @@ class DestinationStruct:
""" """
dest_iter = cls.get_dest_select(baserp, for_increment) dest_iter = cls.get_dest_select(baserp, for_increment)
collated = rorpiter.Collate2Iters(source_iter, dest_iter) collated = rorpiter.Collate2Iters(source_iter, dest_iter)
cls.CCPP = CacheCollatedPostProcess(collated, cls.CCPP = CacheCollatedPostProcess(
Globals.pipeline_max_length*2) collated, Globals.pipeline_max_length*4)
# pipeline len adds some leeway over just*3 (to and from and back)
def get_sigs(cls, dest_base_rpath): def get_sigs(cls, dest_base_rpath):
"""Yield signatures of any changed destination files""" """Yield signatures of any changed destination files
If we are backing up across a pipe, we must flush the pipeline
every so often so it doesn't get congested on destination end.
"""
flush_threshold = int(Globals.pipeline_max_length/2)
num_rorps_skipped = 0
for src_rorp, dest_rorp in cls.CCPP: for src_rorp, dest_rorp in cls.CCPP:
if (src_rorp and dest_rorp and src_rorp == dest_rorp and if (src_rorp and dest_rorp and src_rorp == dest_rorp and
(not Globals.preserve_hardlinks or (not Globals.preserve_hardlinks or
Hardlink.rorp_eq(src_rorp, dest_rorp))): continue Hardlink.rorp_eq(src_rorp, dest_rorp))):
index = src_rorp and src_rorp.index or dest_rorp.index num_rorps_skipped += 1
cls.CCPP.flag_changed(index) if (Globals.backup_reader is not Globals.backup_writer and
if (Globals.preserve_hardlinks and num_rorps_skipped > flush_threshold):
Hardlink.islinked(src_rorp or dest_rorp)): num_rorps_skipped = 0
dest_sig = rpath.RORPath(index) yield iterfile.RORPIterFlushRepeat
dest_sig.flaglinked(Hardlink.get_link_index(dest_sig)) else:
elif dest_rorp: index = src_rorp and src_rorp.index or dest_rorp.index
dest_sig = dest_rorp.getRORPath() cls.CCPP.flag_changed(index)
if dest_rorp.isreg(): yield cls.get_one_sig(dest_base_rpath, index,
dest_rp = dest_base_rpath.new_index(index) src_rorp, dest_rorp)
assert dest_rp.isreg()
dest_sig.setfile(Rdiff.get_signature(dest_rp)) def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp):
else: dest_sig = rpath.RORPath(index) """Return a signature given source and destination rorps"""
yield dest_sig if (Globals.preserve_hardlinks and
Hardlink.islinked(src_rorp or dest_rorp)):
dest_sig = rpath.RORPath(index)
dest_sig.flaglinked(Hardlink.get_link_index(dest_sig))
elif dest_rorp:
dest_sig = dest_rorp.getRORPath()
if dest_rorp.isreg():
dest_rp = dest_base_rpath.new_index(index)
assert dest_rp.isreg()
dest_sig.setfile(Rdiff.get_signature(dest_rp))
else: dest_sig = rpath.RORPath(index)
return dest_sig
def patch(cls, dest_rpath, source_diffiter, start_index = ()): def patch(cls, dest_rpath, source_diffiter, start_index = ()):
"""Patch dest_rpath with an rorpiter of diffs""" """Patch dest_rpath with an rorpiter of diffs"""
...@@ -301,6 +323,9 @@ class CacheCollatedPostProcess: ...@@ -301,6 +323,9 @@ class CacheCollatedPostProcess:
def get_source_rorp(self, index): def get_source_rorp(self, index):
"""Retrieve source_rorp with given index from cache""" """Retrieve source_rorp with given index from cache"""
assert index >= self.cache_indicies[0], \
("CCPP index out of order: %s %s" %
(repr(index), repr(self.cache_indicies[0])))
return self.cache_dict[index][0] return self.cache_dict[index][0]
def get_mirror_rorp(self, index): def get_mirror_rorp(self, index):
...@@ -388,7 +413,7 @@ class PatchITRB(rorpiter.ITRBranch): ...@@ -388,7 +413,7 @@ class PatchITRB(rorpiter.ITRBranch):
""" """
if not new_rp.isreg(): return 1 if not new_rp.isreg(): return 1
cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index) cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index)
if cached_rorp.equal_loose(new_rp): return 1 if cached_rorp and cached_rorp.equal_loose(new_rp): return 1
log.ErrorLog.write_if_open("UpdateError", diff_rorp, "Updated mirror " log.ErrorLog.write_if_open("UpdateError", diff_rorp, "Updated mirror "
"temp file %s does not match source" % (new_rp.path,)) "temp file %s does not match source" % (new_rp.path,))
return 0 return 0
......
...@@ -144,7 +144,8 @@ class LowLevelPipeConnection(Connection): ...@@ -144,7 +144,8 @@ class LowLevelPipeConnection(Connection):
def _putiter(self, iterator, req_num): def _putiter(self, iterator, req_num):
"""Put an iterator through the pipe""" """Put an iterator through the pipe"""
self._write("i", str(VirtualFile.new(rorpiter.ToFile(iterator))), self._write("i",
str(VirtualFile.new(iterfile.RORPIterToFile(iterator))),
req_num) req_num)
def _putrpath(self, rpath, req_num): def _putrpath(self, rpath, req_num):
...@@ -226,8 +227,7 @@ class LowLevelPipeConnection(Connection): ...@@ -226,8 +227,7 @@ class LowLevelPipeConnection(Connection):
elif format_string == "b": result = data elif format_string == "b": result = data
elif format_string == "f": result = VirtualFile(self, int(data)) elif format_string == "f": result = VirtualFile(self, int(data))
elif format_string == "i": elif format_string == "i":
result = rorpiter.FromFile(iterfile.BufferedRead( result = iterfile.FileToRORPIter(VirtualFile(self, int(data)))
VirtualFile(self, int(data))))
elif format_string == "r": result = self._getrorpath(data) elif format_string == "r": result = self._getrorpath(data)
elif format_string == "R": result = self._getrpath(data) elif format_string == "R": result = self._getrpath(data)
else: else:
...@@ -456,7 +456,8 @@ class VirtualFile: ...@@ -456,7 +456,8 @@ class VirtualFile:
getbyid = classmethod(getbyid) getbyid = classmethod(getbyid)
def readfromid(cls, id, length): def readfromid(cls, id, length):
return cls.vfiles[id].read(length) if length is None: return cls.vfiles[id].read()
else: return cls.vfiles[id].read(length)
readfromid = classmethod(readfromid) readfromid = classmethod(readfromid)
def readlinefromid(cls, id): def readlinefromid(cls, id):
...@@ -487,7 +488,7 @@ class VirtualFile: ...@@ -487,7 +488,7 @@ class VirtualFile:
self.connection = connection self.connection = connection
self.id = id self.id = id
def read(self, length = -1): def read(self, length = None):
return self.connection.VirtualFile.readfromid(self.id, length) return self.connection.VirtualFile.readfromid(self.id, length)
def readline(self): def readline(self):
......
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
"""Convert an iterator to a file object and vice-versa""" """Convert an iterator to a file object and vice-versa"""
import cPickle, array import cPickle, array, types
import Globals, C, robust, log import Globals, C, robust, log, rpath
class IterFileException(Exception): pass class IterFileException(Exception): pass
...@@ -129,7 +129,7 @@ class IterVirtualFile(UnwrapFile): ...@@ -129,7 +129,7 @@ class IterVirtualFile(UnwrapFile):
def addtobuffer(self): def addtobuffer(self):
"""Read a chunk from the file and add it to the buffer""" """Read a chunk from the file and add it to the buffer"""
assert self.iwf.currently_in_file assert self.iwf.currently_in_file
type, data = self._get() type, data = self.iwf._get()
if type == "e": if type == "e":
self.iwf.currently_in_file = None self.iwf.currently_in_file = None
raise data raise data
...@@ -238,32 +238,183 @@ class FileWrappingIter: ...@@ -238,32 +238,183 @@ class FileWrappingIter:
def close(self): self.closed = 1 def close(self): self.closed = 1
class BufferedRead: class RORPIterFlush:
"""Buffer the .read() calls to the given file """Used to signal that a RORPIterToFile should flush buffer"""
pass
class RORPIterFlushRepeat(RORPIterFlush):
"""Flush, but then cause RORPIter to yield this same object
This is used to lessen overhead and latency when a file is sent Thus if we put together a pipeline of these, one RORPIterFlushRepeat
over a connection. Profiling said that arrays were faster than can cause all the segments to flush in sequence.
strings here.
""" """
def __init__(self, file): pass
self.file = file
self.array_buf = array.array('c') class RORPIterToFile(FileWrappingIter):
self.bufsize = Globals.conn_bufsize """Take a RORPIter and give it a file-ish interface
This is how we send signatures and diffs across the line. As
sending each one separately via a read() call would result in a
lot of latency, the read()'s are buffered - a read() call with no
arguments will return a variable length string (possibly empty).
To flush the RORPIterToFile, have the iterator yield a
RORPIterFlush class.
"""
def __init__(self, rpiter, max_buffer_bytes = None, max_buffer_rps = None):
"""RORPIterToFile initializer
max_buffer_bytes is the maximum size of the buffer in bytes.
max_buffer_rps is the maximum size of the buffer in rorps.
def read(self, l = -1): """
array_buf = self.array_buf self.max_buffer_bytes = max_buffer_bytes or Globals.conn_bufsize
if l < 0: # Read as much as possible self.max_buffer_rps = max_buffer_rps or Globals.pipeline_max_length
result = array_buf.tostring() + self.file.read() self.rorps_in_buffer = 0
del array_buf[:] self.next_in_line = None
FileWrappingIter.__init__(self, rpiter)
def read(self, length = None):
    """Return some number of bytes, including 0

    With no length, add to the buffer until it holds max_buffer_bytes
    bytes or max_buffer_rps rorps (or addtobuffer() returns false),
    then return the whole buffer and empty it.

    With an explicit non-negative length, keep reading until at least
    that many bytes are available, return exactly the first length
    bytes, and put the surplus back in the buffer for the next read.

    """
    assert not self.closed
    if length is None:
        while (len(self.array_buf) < self.max_buffer_bytes and
               self.rorps_in_buffer < self.max_buffer_rps):
            if not self.addtobuffer(): break

        result = self.array_buf.tostring()
        del self.array_buf[:]
        self.rorps_in_buffer = 0
        return result

    assert length >= 0
    read_buffer = self.read()
    while len(read_buffer) < length: read_buffer += self.read()
    # Save the surplus for later and hand back the head.  Returning
    # read_buffer[length:] here (as before) gave the caller the
    # surplus itself: the first length bytes were lost and the tail
    # was delivered twice.
    self.array_buf.fromstring(read_buffer[length:])
    return read_buffer[:length]
if len(array_buf) < l: # Try to make buffer at least as long as l def addtobuffer(self):
array_buf.fromstring(self.file.read(max(self.bufsize, l))) """Add some number of bytes to the buffer. Return false if done"""
result = array_buf[:l].tostring() if self.currently_in_file:
del array_buf[:l] self.addfromfile("c")
return result if not self.currently_in_file: self.rorps_in_buffer += 1
else:
if self.next_in_line:
currentobj = self.next_in_line
self.next_in_line = 0
else:
try: currentobj = self.iter.next()
except StopIteration:
self.addfinal()
return None
def close(self): return self.file.close() if hasattr(currentobj, "read") and hasattr(currentobj, "close"):
self.currently_in_file = currentobj
self.addfromfile("f")
elif (type(currentobj) is types.ClassType and
issubclass(currentobj, iterfile.RORPIterFlush)):
if currentobj is iterfile.RORPIterFlushRepeat:
self.add_flush_repeater()
return None
else: self.addrorp(currentobj)
return 1
def add_flush_repeater(self):
    """Append a pickled RORPIterFlushRepeat class to the buffer

    It is written as an "o" record: the type character, the pickle's
    length as produced by C.long2str, then the pickle itself.

    """
    buf = self.array_buf
    pickled = cPickle.dumps(iterfile.RORPIterFlushRepeat, 1)
    buf.fromstring("o")
    buf.fromstring(C.long2str(long(len(pickled))))
    buf.fromstring(pickled)
def addrorp(self, rorp):
    """Append a rorp to the buffer as an "o" record

    The pickled tuple is (index, data, has_file).  A rorp with an
    attached file leaves that file in self.next_in_line; a rorp
    without one is counted in self.rorps_in_buffer right away.

    """
    attached = rorp.file
    if attached: has_file = 1
    else: has_file = 0
    pickled = cPickle.dumps((rorp.index, rorp.data, has_file), 1)
    if attached: self.next_in_line = attached
    else: self.rorps_in_buffer += 1

    buf = self.array_buf
    buf.fromstring("o")
    buf.fromstring(C.long2str(long(len(pickled))))
    buf.fromstring(pickled)
def addfinal(self):
    """Append the end-of-iterator record ("z" with a zero length)"""
    buf = self.array_buf
    buf.fromstring("z")
    buf.fromstring(C.long2str(long(0)))
def close(self):
    """Mark this file-like object as closed"""
    self.closed = 1
class FileToRORPIter(IterWrappingFile):
    """Turn the file-like made by RORPIterToFile back into a RORPIter"""

    def __init__(self, file):
        IterWrappingFile.__init__(self, file)
        self.buf = ""  # data read from self.file but not yet parsed

    def __iter__(self):
        return self

    def next(self):
        """Return next object in iter, or raise StopIteration"""
        # Close the file handed out with the previous rorp, if any
        if self.currently_in_file:
            self.currently_in_file.close()

        # _get() answers (None, None) after an empty read; ask again
        rec_type, payload = self._get()
        while not rec_type:
            rec_type, payload = self._get()

        if rec_type == "z":
            raise StopIteration
        if rec_type != "o":
            raise IterFileException("Bad file type %s" % (rec_type,))
        if payload is iterfile.RORPIterFlushRepeat:
            return payload
        return self.get_rorp(payload)

    def get_rorp(self, pickled_tuple):
        """Build the rorp described by an (index, data, num_files) tuple"""
        index, data_dict, num_files = pickled_tuple
        rebuilt = rpath.RORPath(index, data_dict)
        if not num_files:
            return rebuilt
        assert num_files == 1, "Only one file accepted right now"
        rebuilt.setfile(self.get_file())
        return rebuilt

    def get_file(self):
        """Read the file object that follows a rorp record

        An "f" record becomes an IterVirtualFile; otherwise the record
        must be an "e" record carrying an exception, which is wrapped
        in an ErrorFile.

        """
        rec_type, payload = self._get()
        if rec_type != "f":
            assert rec_type == "e", "Expected type e, got %s" % (rec_type,)
            assert isinstance(payload, Exception)
            return ErrorFile(payload)

        vfile = IterVirtualFile(self, payload)
        if payload:
            self.currently_in_file = vfile
        else:
            self.currently_in_file = None
        return vfile

    def _get(self):
        """Return (type, data or object) pair

        Like UnwrapFile._get(), but the underlying file is read in
        variable length blocks.  Type "z" is also allowed and marks
        the end of the iterator.  An empty read() does not mark the
        end of the remote iter; it yields (None, None) instead.

        """
        if not self.buf:
            self.buf += self.file.read()
            if not self.buf:
                return None, None

        # Record header: one type character, then the length field
        assert len(self.buf) >= 8, "Unexpected end of RORPIter file"
        rec_type = self.buf[0]
        end = 8 + C.str2long(self.buf[1:8])
        payload, self.buf = self.buf[8:end], self.buf[end:]

        if rec_type in ("o", "e"):
            return rec_type, cPickle.loads(payload)
        return rec_type, payload
class ErrorFile:
    """File-like whose read() just raises (used by FileToRORPIter)"""

    def __init__(self, exc):
        """Remember exc, the exception to raise on every read"""
        self.exc = exc

    def read(self, l=-1):
        """Raise the stored exception; the length argument is ignored"""
        raise self.exc

    def close(self):
        """Nothing to release; always returns None"""
        return None
import iterfile
...@@ -33,48 +33,6 @@ import os, tempfile, UserList, types ...@@ -33,48 +33,6 @@ import os, tempfile, UserList, types
import Globals, rpath, iterfile import Globals, rpath, iterfile
class RORPIterException(Exception):
    """Error raised while converting rorp iterators"""
def ToRaw(rorp_iter):
    """Convert a rorp iterator to raw form

    Each rorp becomes an (index, data, num_files) tuple.  When the
    rorp has a file attached, num_files is 1 and the file itself is
    yielded right after the tuple; otherwise num_files is 0.

    """
    for rorp in rorp_iter:
        if not rorp.file:
            yield (rorp.index, rorp.data, 0)
            continue
        yield (rorp.index, rorp.data, 1)
        yield rorp.file
def FromRaw(raw_iter):
    """Convert raw rorp iter back to standard form

    raw_iter yields (index, data, num_files) tuples; when num_files
    is set, the next element of raw_iter is taken as the rorp's file.

    """
    for raw_entry in raw_iter:
        index, data, num_files = raw_entry
        rorp = rpath.RORPath(index, data)
        if num_files:
            assert num_files == 1, "Only one file accepted right now"
            # The attached file is the very next item of the same iter
            rorp.setfile(get_next_file(raw_iter))
        yield rorp
class ErrorFile:
    """File-like whose read() just raises (used by get_next_file)"""

    def __init__(self, exc):
        """Remember exc, the exception to raise on every read"""
        self.exc = exc

    def read(self, l=-1):
        """Raise the stored exception; the length argument is ignored"""
        raise self.exc

    def close(self):
        """Nothing to release; always returns None"""
        return None
def get_next_file(iter):
    """Return the next element of an iterator, raising error if none

    An element that is itself an Exception instance is returned
    wrapped in an ErrorFile instead.

    """
    try:
        candidate = iter.next()
    except StopIteration:
        raise RORPIterException("Unexpected end to iter")
    if not isinstance(candidate, Exception):
        return candidate
    return ErrorFile(candidate)
def ToFile(rorp_iter):
    """Return a file-like version of rorp_iter

    The iterator is first put in raw form with ToRaw and then wrapped
    in an iterfile.FileWrappingIter.

    """
    raw_iter = ToRaw(rorp_iter)
    return iterfile.FileWrappingIter(raw_iter)
def FromFile(fileobj):
    """Recover a rorp iterator from its file interface

    fileobj is unwrapped with iterfile.IterWrappingFile and the
    resulting raw iterator is converted back with FromRaw.

    """
    raw_iter = iterfile.IterWrappingFile(fileobj)
    return FromRaw(raw_iter)
def CollateIterators(*rorp_iters): def CollateIterators(*rorp_iters):
"""Collate RORPath iterators by index """Collate RORPath iterators by index
......
...@@ -52,15 +52,98 @@ class testIterFile(unittest.TestCase): ...@@ -52,15 +52,98 @@ class testIterFile(unittest.TestCase):
assert new_iter.next() == "foo" assert new_iter.next() == "foo"
self.assertRaises(StopIteration, new_iter.next) self.assertRaises(StopIteration, new_iter.next)
class testRORPIters(unittest.TestCase):
"""Test sending rorpiter back and forth"""
def setUp(self):
    """Make testfiles/output directory and a few files

    reg1 holds "hello", reg2 is empty and reg3 holds "goodbye".  Each
    rpath gets a read handle attached with setfile() and, once all
    three files exist, its data refreshed with setdata().

    """
    Myrm("testfiles/output")
    outdir = rpath.RPath(Globals.local_connection, "testfiles/output")
    self.outputrp = outdir
    self.regfile1 = outdir.append("reg1")
    self.regfile2 = outdir.append("reg2")
    self.regfile3 = outdir.append("reg3")
    outdir.mkdir()

    # None as contents means: create the file empty with touch()
    fixtures = ((self.regfile1, "hello"),
                (self.regfile2, None),
                (self.regfile3, "goodbye"))
    for regfile, contents in fixtures:
        if contents is None:
            regfile.touch()
        else:
            fp = regfile.open("wb")
            fp.write(contents)
            fp.close()
        regfile.setfile(regfile.open("rb"))

    for regfile, contents in fixtures:
        regfile.setdata()
class testBufferedRead(unittest.TestCase): def print_RORPIterFile(self, rpiter_file):
def testBuffering(self): """Print the given rorpiter file"""
"""Test buffering a StringIO""" while 1:
fp = StringIO.StringIO("12345678"*10000) buf = rpiter_file.read()
bfp = BufferedRead(fp) sys.stdout.write(buf)
assert bfp.read(5) == "12345" if buf[0] == "z": break
assert bfp.read(4) == "6781"
assert len(bfp.read(75000)) == 75000 def testBasic(self):
"""Test basic conversion"""
l = [self.outputrp, self.regfile1, self.regfile2, self.regfile3]
i_out = FileToRORPIter(RORPIterToFile(iter(l)))
out1 = i_out.next()
assert out1 == self.outputrp
out2 = i_out.next()
assert out2 == self.regfile1
fp = out2.open("rb")
assert fp.read() == "hello"
assert not fp.close()
out3 = i_out.next()
assert out3 == self.regfile2
fp = out3.open("rb")
assert fp.read() == ""
assert not fp.close()
i_out.next()
self.assertRaises(StopIteration, i_out.next)
def testFlush(self):
    """Test flushing property of RORPIterToFile

    A RORPIterFlush in the iterator must end the first read() right
    after the first rorp; the remainder of the iterator stays
    readable from the same file-like afterwards.

    """
    source = [self.outputrp, RORPIterFlush, self.outputrp]
    filelike = RORPIterToFile(iter(source))

    # What the first read() returned, terminated by hand with "z"
    flushed = filelike.read() + "z" + C.long2str(long(0))
    first_part = FileToRORPIter(StringIO.StringIO(flushed))
    assert first_part.next() == self.outputrp
    self.assertRaises(StopIteration, first_part.next)

    # What is left in the original file-like after the flush
    second_part = FileToRORPIter(filelike)
    assert second_part.next() == self.outputrp
    self.assertRaises(StopIteration, second_part.next)
def testFlushRepeat(self):
    """Test flushing like testFlush, but have Flush obj emerge from iter

    With RORPIterFlushRepeat the flush object itself must come out of
    the receiving iterator after the first rorp.

    """
    source = [self.outputrp, RORPIterFlushRepeat, self.outputrp]
    filelike = RORPIterToFile(iter(source))

    # What the first read() returned, terminated by hand with "z"
    flushed = filelike.read() + "z" + C.long2str(long(0))
    first_part = FileToRORPIter(StringIO.StringIO(flushed))
    assert first_part.next() == self.outputrp
    assert first_part.next() is RORPIterFlushRepeat
    self.assertRaises(StopIteration, first_part.next)

    # What is left in the original file-like after the flush
    second_part = FileToRORPIter(filelike)
    assert second_part.next() == self.outputrp
    self.assertRaises(StopIteration, second_part.next)
if __name__ == "__main__": unittest.main() if __name__ == "__main__": unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment