Commit a24ac864 authored by bescoto's avatar bescoto

Iterfiles and iterrorps now can contain exceptions.


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@283 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 2eabc158
......@@ -40,10 +40,15 @@ class UnwrapFile:
def _get(self):
"""Return pair (type, data) next in line on the file
type is a single character which is either "o" for object, "f"
for file, "c" for a continution of a file, or None if no more
data can be read. Data is either the file's data, if type is
"c" or "f", or the actual object if the type is "o".
type is a single character which is either
"o" for object,
"f" for file,
"c" for a continution of a file,
"e" for an exception, or
None if no more data can be read.
Data is either the file's data, if type is "c" or "f", or the
actual object if the type is "o" or "e".
"""
header = self.file.read(8)
......@@ -52,7 +57,7 @@ class UnwrapFile:
assert None, "Header %s is only %d bytes" % (header, len(header))
type, length = header[0], C.str2long(header[1:])
buf = self.file.read(length)
if type == "o": return type, cPickle.loads(buf)
if type == "o" or type == "e": return type, cPickle.loads(buf)
else: return type, buf
......@@ -74,7 +79,7 @@ class IterWrappingFile(UnwrapFile):
self.currently_in_file.close() # no error checking by this point
type, data = self._get()
if not type: raise StopIteration
if type == "o": return data
if type == "o" or type == "e": return data
elif type == "f":
file = IterVirtualFile(self, data)
if data: self.currently_in_file = file
......@@ -125,6 +130,9 @@ class IterVirtualFile(UnwrapFile):
"""Read a chunk from the file and add it to the buffer"""
assert self.iwf.currently_in_file
type, data = self._get()
if type == "e":
self.iwf.currently_in_file = None
raise data
assert type == "c", "Type is %s instead of c" % type
if data:
self.buffer += data
......@@ -179,39 +187,44 @@ class FileWrappingIter:
otherwise return true.
"""
array_buf = self.array_buf
if self.currently_in_file:
array_buf.fromstring("c")
array_buf.fromstring(self.addfromfile())
if self.currently_in_file: self.addfromfile("c")
else:
try: currentobj = self.iter.next()
except StopIteration: return None
if hasattr(currentobj, "read") and hasattr(currentobj, "close"):
self.currently_in_file = currentobj
array_buf.fromstring("f")
array_buf.fromstring(self.addfromfile())
self.addfromfile("f")
else:
pickle = cPickle.dumps(currentobj, 1)
array_buf.fromstring("o")
array_buf.fromstring(C.long2str(long(len(pickle))))
array_buf.fromstring(pickle)
self.array_buf.fromstring("o")
self.array_buf.fromstring(C.long2str(long(len(pickle))))
self.array_buf.fromstring(pickle)
return 1
def addfromfile(self):
"""Read a chunk from the current file and return it"""
# Check file read for errors, buf = "" if find one
def addfromfile(self, prefix_letter):
"""Read a chunk from the current file and add to array_buf
prefix_letter and the length will be prepended to the file
data. If there is an exception while reading the file, the
exception will be added to array_buf instead.
"""
buf = robust.check_common_error(self.read_error_handler,
self.currently_in_file.read,
[Globals.blocksize])
if not buf:
if buf == "" or buf is None:
assert not self.currently_in_file.close()
self.currently_in_file = None
return C.long2str(long(len(buf))) + buf
if buf is None: # error occurred above, encode exception
prefix_letter = "e"
buf = cPickle.dumps(self.last_exception, 1)
total = "".join((prefix_letter, C.long2str(long(len(buf))), buf))
self.array_buf.fromstring(total)
def read_error_handler(self, exc, blocksize):
"""Log error when reading from file"""
log.Log("Error '%s' reading from fileobj, truncating" % (str(exc),), 2)
return ""
self.last_exception = exc
return None
def _l2s_old(self, l):
"""Convert long int to string of 7 characters"""
......
......@@ -30,8 +30,7 @@ files), where files is the number of files attached (usually 1 or
from __future__ import generators
import os, tempfile, UserList, types
import librsync, Globals, Rdiff, Hardlink, robust, log, static, \
rpath, iterfile
import Globals, rpath, iterfile
class RORPIterException(Exception): pass
......@@ -50,13 +49,22 @@ def FromRaw(raw_iter):
rorp = rpath.RORPath(index, data)
if num_files:
assert num_files == 1, "Only one file accepted right now"
rorp.setfile(getnext(raw_iter))
rorp.setfile(get_next_file(raw_iter))
yield rorp
def getnext(iter):
class ErrorFile:
"""Used by get_next_file below, file-like that just raises error"""
def __init__(self, exc):
"""Initialize new ErrorFile. exc is the exception to raise on read"""
self.exc = exc
def read(self, l=-1): raise self.exc
def close(self): return None
def get_next_file(iter):
"""Return the next element of an iterator, raising error if none"""
try: next = iter.next()
except StopIteration: raise RORPIterException("Unexpected end to iter")
if isinstance(next, Exception): return ErrorFile(next)
return next
def ToFile(rorp_iter):
......@@ -258,7 +266,7 @@ class IterTreeReducer:
base_index = to_be_finished.base_index
if base_index != index[:len(base_index)]:
# out of the tree, finish with to_be_finished
to_be_finished.call_end_proc()
to_be_finished.end_process()
del branches[-1]
if not branches: return None
branches[-1].branch_process(to_be_finished)
......@@ -271,18 +279,12 @@ class IterTreeReducer:
self.branches.append(branch)
return branch
def process_w_branch(self, branch, args):
"""Run start_process on latest branch"""
robust.check_common_error(branch.on_error,
branch.start_process, args)
if not branch.caught_exception: branch.start_successful = 1
def Finish(self):
"""Call at end of sequence to tie everything up"""
if self.index is None or self.root_fast_processed: return
while 1:
to_be_finished = self.branches.pop()
to_be_finished.call_end_proc()
to_be_finished.end_process()
if not self.branches: break
self.branches[-1].branch_process(to_be_finished)
......@@ -303,26 +305,19 @@ class IterTreeReducer:
if self.root_branch.can_fast_process(*args):
self.root_branch.fast_process(*args)
self.root_fast_processed = 1
else: self.process_w_branch(self.root_branch, args)
else: self.root_branch.start_process(*args)
self.index = index
return 1
if index <= self.index:
log.Log("Warning: oldindex %s >= newindex %s" %
(self.index, index), 2)
return 1
assert index > self.index, "Index out of order"
if self.finish_branches(index) is None:
return None # We are no longer in the main tree
last_branch = self.branches[-1]
if last_branch.start_successful:
if last_branch.can_fast_process(*args):
robust.check_common_error(last_branch.on_error,
last_branch.fast_process, args)
last_branch.fast_process(*args)
else:
branch = self.add_branch(index)
self.process_w_branch(branch, args)
else: last_branch.log_prev_error(index)
branch.start_process(*args)
self.index = index
return 1
......@@ -338,17 +333,6 @@ class ITRBranch:
"""
base_index = index = None
finished = None
caught_exception = start_successful = None
def call_end_proc(self):
"""Runs the end_process on self, checking for errors"""
if self.finished or not self.start_successful:
self.caught_exception = 1
if self.caught_exception: self.log_prev_error(self.base_index)
else: robust.check_common_error(self.on_error, self.end_process)
self.finished = 1
def start_process(self, *args):
"""Do some initial processing (stub)"""
pass
......@@ -359,7 +343,6 @@ class ITRBranch:
def branch_process(self, branch):
"""Process a branch right after it is finished (stub)"""
assert branch.finished
pass
def can_fast_process(self, *args):
......@@ -370,20 +353,6 @@ class ITRBranch:
"""Process args without new child branch (stub)"""
pass
def on_error(self, exc, *args):
"""This is run on any exception in start/end-process"""
self.caught_exception = 1
if args and args[0] and isinstance(args[0], tuple):
filename = os.path.join(*args[0])
elif self.index: filename = os.path.join(*self.index)
else: filename = "."
log.Log("Error '%s' processing %s" % (exc, filename), 2)
def log_prev_error(self, index):
"""Call function if no pending exception"""
log.Log("Skipping %s because of previous error" % \
(index and os.path.join(*index) or '()',), 2)
class CacheIndexable:
"""Cache last few indexed elements in iterator
......@@ -423,5 +392,6 @@ class CacheIndexable:
"""Return element with index index from cache"""
try: return self.cache_dict[index]
except KeyError:
assert index > self.cache_indicies[0], index
assert index >= self.cache_indicies[0], \
repr((index, self.cache_indicies[0]))
return None
......@@ -3,6 +3,17 @@ from commontest import *
from rdiff_backup.iterfile import *
from rdiff_backup import lazy
class FileException:
"""Like a file, but raise exception after certain # bytes read"""
def __init__(self, max):
self.count = 0
self.max = max
def read(self, l):
self.count += l
if self.count > self.max: raise IOError(13, "Permission Denied")
return "a"*l
def close(self): return None
class testIterFile(unittest.TestCase):
def setUp(self):
......@@ -15,6 +26,33 @@ class testIterFile(unittest.TestCase):
assert lazy.Iter.equal(itm(),
IterWrappingFile(FileWrappingIter(itm())))
def testFile(self):
"""Test sending files through iters"""
buf1 = "hello"*10000
file1 = StringIO.StringIO(buf1)
buf2 = "goodbye"*10000
file2 = StringIO.StringIO(buf2)
file_iter = FileWrappingIter(iter([file1, file2]))
new_iter = IterWrappingFile(file_iter)
assert new_iter.next().read() == buf1
assert new_iter.next().read() == buf2
self.assertRaises(StopIteration, new_iter.next)
def testFileException(self):
"""Test encoding a file which raises an exception"""
f = FileException(100*1024)
new_iter = IterWrappingFile(FileWrappingIter(iter([f, "foo"])))
f_out = new_iter.next()
assert f_out.read(10000) == "a"*10000
try: buf = f_out.read(100*1024)
except IOError: pass
else: assert 0, len(buf)
assert new_iter.next() == "foo"
self.assertRaises(StopIteration, new_iter.next)
class testBufferedRead(unittest.TestCase):
def testBuffering(self):
"""Test buffering a StringIO"""
......
......@@ -291,28 +291,5 @@ class CacheIndexableTest(unittest.TestCase):
assert l1 == l2, (l1, l2)
class CacheIndexableProcessorTest(unittest.TestCase):
def function(self, elem):
"""Called by CIP on each elem"""
self.l.append(elem)
def testReorder(self):
"""Test the reordering abilities of CIP"""
CIP = rorpiter.CachedIndexableProcessor(self.function, 3)
in_list = [rorpiter.IndexedTuple((x,), (x,)) for x in range(6)]
self.l = []
CIP(in_list[0])
CIP(in_list[2])
CIP(in_list[1])
CIP(in_list[5])
CIP(in_list[3])
CIP(in_list[4])
self.assertRaises(AssertionError, CIP, in_list[0])
CIP.close()
assert self.l == in_list, (self.l, in_list)
if __name__ == "__main__": unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment