Commit a5904b19 authored by ben

More optimization: rewrote selection iteration to not recur, and added
"fast processing" to IterTreeReducer, so objects don't need to be
created in the typical case.


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@145 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 7d45c32c
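
The "fast processing" idea from the commit message can be sketched in a few lines. The following is an illustrative toy only, not code from this commit (the Branch class, reduce_tree function, and sample entries are invented): leaf entries are handed to the branch already on the stack via fast_process(), so no new branch object is allocated in the typical case, and only directories get a branch of their own.

# Toy sketch of "fast processing" (hypothetical names, not rdiff-backup code):
# plain files are handled by the branch already on the stack, so no per-file
# object is created in the common case; only directories get a new branch.
class Branch:
    def start_process(self, name, is_dir):
        print("new branch for " + name)
    def can_fast_process(self, name, is_dir):
        return not is_dir                    # leaves never need their own branch
    def fast_process(self, name, is_dir):
        print("fast-processed " + name)

def reduce_tree(entries):
    stack = [Branch()]                       # root branch
    for name, is_dir in entries:
        top = stack[-1]
        if top.can_fast_process(name, is_dir):
            top.fast_process(name, is_dir)   # typical case: no allocation
        else:
            branch = Branch()                # only directories get a branch
            branch.start_process(name, is_dir)
            stack.append(branch)
    # finishing/popping of branches is omitted in this sketch

reduce_tree([("usr", True), ("usr/bin", True), ("usr/bin/ls", False)])
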
@@ -14,7 +14,7 @@ import re, os
# The current version of rdiff-backup
version = "0.9.1"
version = "$version"
# If this is set, use this value in seconds as the current time
# instead of reading it from the clock.
@@ -167,9 +167,9 @@ print_statistics = None
# replaced by the source and mirror Select objects respectively.
select_source, select_mirror = None, None
# On the backup writer connection, holds the main incrementing
# function. Access is provided to increment error counts.
ITR = None
# On the backup writer connection, holds the root incrementing branch
# object. Access is provided to increment error counts.
ITRB = None
def get(name):
"""Return the value of something in this module"""
......
@@ -74,40 +74,37 @@ static PyObject *c_make_file_dict(self, args)
else if S_ISDIR(mode) strcpy(filetype, "dir");
else if S_ISSOCK(mode) strcpy(filetype, "sock");
else strcpy(filetype, "fifo");
return Py_BuildValue("{s:s,s:N,s:l,s:l,s:l,s:N,s:N,s:l,s:N,s:N}",
"type", filetype,
"size", size,
"perms", perms,
"uid", (long)sbuf.st_uid,
"gid", (long)sbuf.st_gid,
"inode", inode,
"devloc", devloc,
"nlink", (long)sbuf.st_nlink,
"mtime", mtime,
"atime", atime);
return_val = Py_BuildValue("{s:s,s:O,s:l,s:l,s:l,s:O,s:O,s:l,s:O,s:O}",
"type", filetype,
"size", size,
"perms", perms,
"uid", (long)sbuf.st_uid,
"gid", (long)sbuf.st_gid,
"inode", inode,
"devloc", devloc,
"nlink", (long)sbuf.st_nlink,
"mtime", mtime,
"atime", atime);
} else if S_ISLNK(mode) {
/* Symbolic links */
char linkname[1024];
int len_link = readlink(filename, linkname, 1023);
if (len_link < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
return_val = NULL;
} else {
linkname[len_link] = '\0';
return_val = Py_BuildValue("{s:s,s:O,s:l,s:l,s:l,s:O,s:O,s:l,s:s}",
"type", "sym",
"size", size,
"perms", perms,
"uid", (long)sbuf.st_uid,
"gid", (long)sbuf.st_gid,
"inode", inode,
"devloc", devloc,
"nlink", (long)sbuf.st_nlink,
"linkname", linkname);
}
linkname[len_link] = '\0';
return_val = Py_BuildValue("{s:s,s:N,s:l,s:l,s:l,s:N,s:N,s:l,s:s}",
"type", "sym",
"size", size,
"perms", perms,
"uid", (long)sbuf.st_uid,
"gid", (long)sbuf.st_gid,
"inode", inode,
"devloc", devloc,
"nlink", (long)sbuf.st_nlink,
"linkname", linkname);
Py_DECREF(mtime);
Py_DECREF(atime);
return return_val;
} else if (S_ISCHR(mode) || S_ISBLK(mode)) {
/* Device files */
char devtype[2];
@@ -121,7 +118,7 @@ static PyObject *c_make_file_dict(self, args)
int minor_num = (int)(devnums & 0xff);
if S_ISCHR(mode) strcpy(devtype, "c");
else strcpy(devtype, "b");
return_val = Py_BuildValue("{s:s,s:N,s:l,s:l,s:l,s:N,s:N,s:l,s:N}",
return_val = Py_BuildValue("{s:s,s:O,s:l,s:l,s:l,s:O,s:O,s:l,s:N}",
"type", "dev",
"size", size,
"perms", perms,
@@ -132,19 +129,18 @@ static PyObject *c_make_file_dict(self, args)
"nlink", (long)sbuf.st_nlink,
"devnums", Py_BuildValue("(s,O,i)", devtype,
major_num, minor_num));
Py_DECREF(mtime);
Py_DECREF(atime);
return return_val;
Py_DECREF(major_num);
} else {
/* Unrecognized file type - raise exception */
Py_DECREF(size);
Py_DECREF(inode);
Py_DECREF(devloc);
Py_DECREF(mtime);
Py_DECREF(atime);
PyErr_SetString(UnknownFileTypeError, filename);
return NULL;
return_val = NULL;
}
Py_DECREF(size);
Py_DECREF(inode);
Py_DECREF(devloc);
Py_DECREF(mtime);
Py_DECREF(atime);
return return_val;
}
......
@@ -199,7 +199,7 @@ class DSRPath(RPath):
return self.__class__(self.source, self.conn, self.base, index)
class DestructiveSteppingFinalizer(ErrorITR):
class DestructiveSteppingFinalizer(ITRBranch):
"""Finalizer that can work on an iterator of dsrpaths
The reason we have to use an IterTreeReducer is that some files
@@ -215,6 +215,12 @@ class DestructiveSteppingFinalizer(ErrorITR):
def end_process(self):
if self.dsrpath: self.dsrpath.write_changes()
def can_fast_process(self, index, dsrpath):
return not self.dsrpath.isdir()
def fast_process(self, index, dsrpath):
if self.dsrpath: self.dsrpath.write_changes()
from log import *
from robust import *
......
@@ -108,7 +108,7 @@ class HLSourceStruct:
"""
collated = RORPIter.CollateIterators(cls.initial_dsiter2, sigiter)
finalizer = DestructiveSteppingFinalizer()
finalizer = IterTreeReducer(DestructiveSteppingFinalizer, [])
def error_handler(exc, dest_sig, dsrp):
Log("Error %s producing a diff of %s" %
(exc, dsrp and dsrp.path), 2)
@@ -213,26 +213,26 @@ class HLDestinationStruct:
"""Return finalizer, starting from session info if necessary"""
old_finalizer = cls._session_info and cls._session_info.finalizer
if old_finalizer: return old_finalizer
else: return DestructiveSteppingFinalizer()
else: return IterTreeReducer(DestructiveSteppingFinalizer, [])
def get_ITR(cls, inc_rpath):
"""Return ITR, starting from state if necessary"""
if cls._session_info and cls._session_info.ITR:
return cls._session_info.ITR
else:
iitr = IncrementITR(inc_rpath)
iitr.override_changed()
Globals.ITR = iitr
iitr.Errors = 0
iitr = IterTreeReducer(IncrementITRB, [inc_rpath])
iitr.root_branch.override_changed()
Globals.ITRB = iitr.root_branch
iitr.root_branch.Errors = 0
return iitr
def get_MirrorITR(cls, inc_rpath):
"""Return MirrorITR, starting from state if available"""
if cls._session_info and cls._session_info.ITR:
return cls._session_info.ITR
ITR = MirrorITR(inc_rpath)
Globals.ITR = ITR
ITR.Errors = 0
ITR = IterTreeReducer(MirrorITRB, [inc_rpath])
Globals.ITRB = ITR.root_branch
ITR.root_branch.Errors = 0
return ITR
def patch_and_finalize(cls, dest_rpath, diffs):
@@ -282,7 +282,7 @@ class HLDestinationStruct:
if Globals.preserve_hardlinks: Hardlink.final_writedata()
MiscStats.close_dir_stats_file()
MiscStats.write_session_statistics(ITR)
MiscStats.write_session_statistics(ITR.root_branch)
SaveState.checkpoint_remove()
def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath):
@@ -309,7 +309,7 @@ class HLDestinationStruct:
if Globals.preserve_hardlinks: Hardlink.final_writedata()
MiscStats.close_dir_stats_file()
MiscStats.write_session_statistics(ITR)
MiscStats.write_session_statistics(ITR.root_branch)
SaveState.checkpoint_remove()
def handle_last_error(cls, dsrp, finalizer, ITR):
......
@@ -123,7 +123,7 @@ class Inc:
MakeStatic(Inc)
class IncrementITR(ErrorITR, StatsITR):
class IncrementITRB(StatsITRB):
"""Patch and increment mirror directory
This has to be an ITR because directories that have files in them
@@ -155,7 +155,7 @@ class IncrementITR(ErrorITR, StatsITR):
def __init__(self, inc_rpath):
"""Set inc_rpath, an rpath of the base of the tree"""
self.inc_rpath = inc_rpath
StatsITR.__init__(self, inc_rpath)
StatsITRB.__init__(self)
def start_process(self, index, diff_rorp, dsrp):
"""Initial processing of file
@@ -266,20 +266,28 @@ class IncrementITR(ErrorITR, StatsITR):
if self.mirror_isdirectory or dsrp.isdir():
MiscStats.write_dir_stats_line(self, dsrp.index)
def branch_process(self, subinstance):
def can_fast_process(self, index, diff_rorp, dsrp):
"""True if there is no change in file and is just a leaf"""
return not diff_rorp and dsrp.isreg()
def fast_process(self, index, diff_rorp, dsrp):
"""Just update statistics"""
StatsITRB.fast_process(self, dsrp)
def branch_process(self, branch):
"""Update statistics, and the has_changed flag if change in branch"""
if subinstance.changed: self.changed = 1
self.add_file_stats(subinstance)
if branch.changed: self.changed = 1
self.add_file_stats(branch)
class MirrorITR(ErrorITR, StatsITR):
class MirrorITRB(StatsITRB):
"""Like IncrementITR, but only patch mirror directory, don't increment"""
# This is always None since no increments will be created
incrp = None
def __init__(self, inc_rpath):
"""Set inc_rpath, an rpath of the base of the inc tree"""
self.inc_rpath = inc_rpath
StatsITR.__init__(self, inc_rpath)
StatsITRB.__init__(self)
def start_process(self, index, diff_rorp, mirror_dsrp):
"""Initialize statistics, do actual writing to mirror"""
@@ -296,9 +304,17 @@ class MirrorITR(ErrorITR, StatsITR):
if self.mirror_dsrp.isdir():
MiscStats.write_dir_stats_line(self, self.mirror_dsrp.index)
def branch_process(self, subinstance):
def can_fast_process(self, index, diff_rorp, mirror_dsrp):
"""True if there is no change in file and it is just a leaf"""
return not diff_rorp and mirror_dsrp.isreg()
def fast_process(self, index, diff_rorp, mirror_dsrp):
"""Just update statistics"""
StatsITRB.fast_process(self, mirror_dsrp)
def branch_process(self, branch):
"""Update statistics with subdirectory results"""
self.add_file_stats(subinstance)
self.add_file_stats(branch)
from log import *
......
@@ -55,7 +55,7 @@ class Iter:
for i1 in iter1:
try: i2 = iter2.next()
except StopIteration:
if verbose: print "End when i1 = %s" % i1
if verbose: print "End when i1 = %s" % (i1,)
return None
if not operator(i1, i2):
if verbose: print "%s not equal to %s" % (i1, i2)
@@ -212,85 +212,55 @@ class IterTreeReducer:
when subclassing (this is used to resume failed sessions).
"""
def __init__(self, *args):
def __init__(self, branch_class, branch_args):
"""ITR initializer"""
self.init_args = args
self.base_index = self.index = None
self.subinstances = [self]
self.finished = None
self.caught_exception = self.start_successful = None
self.branch_class = branch_class
self.branch_args = branch_args
self.index = None
self.root_branch = branch_class(*branch_args)
self.branches = [self.root_branch]
def finish_subinstances(self, index):
"""Run Finish() on all subinstances index has passed
def finish_branches(self, index):
"""Run Finish() on all branches index has passed
When we pass out of a subinstance's tree, delete it and
process it with the parent. The innermost subinstances will
be the last in the list. Return None if we are out of the
entire tree, and 1 otherwise.
When we pass out of a branch, delete it and process it with
the parent. The innermost branches will be the last in the
list. Return None if we are out of the entire tree, and 1
otherwise.
"""
subinstances = self.subinstances
branches = self.branches
while 1:
to_be_finished = subinstances[-1]
to_be_finished = branches[-1]
base_index = to_be_finished.base_index
if base_index != index[:len(base_index)]:
# out of the tree, finish with to_be_finished
to_be_finished.call_end_proc()
del subinstances[-1]
if not subinstances: return None
subinstances[-1].branch_process(to_be_finished)
del branches[-1]
if not branches: return None
branches[-1].branch_process(to_be_finished)
else: return 1
def call_end_proc(self):
"""Runs the end_process on self, checking for errors"""
if self.finished or not self.start_successful:
self.caught_exception = 1
if self.caught_exception: self.log_prev_error(self.base_index)
else: Robust.check_common_error(self.on_error, self.end_process)
self.finished = 1
def add_subinstance(self):
"""Return subinstance of same type as self, add to subinstances"""
subinst = self.__class__(*self.init_args)
self.subinstances.append(subinst)
return subinst
def process_w_subinstance(self, index, subinst, args):
"""Run start_process on latest subinstance"""
Robust.check_common_error(subinst.on_error,
subinst.start_process, args)
if not subinst.caught_exception: subinst.start_successful = 1
subinst.base_index = index
def start_process(self, *args):
"""Do some initial processing (stub)"""
pass
def end_process(self):
"""Do any final processing before leaving branch (stub)"""
pass
def add_branch(self):
"""Return branch of type self.branch_class, add to branch list"""
branch = self.branch_class(*self.branch_args)
self.branches.append(branch)
return branch
def branch_process(self, subinstance):
"""Process a branch right after it is finished (stub)"""
assert subinstance.finished
pass
def on_error(self, exc, *args):
"""This will be run on any exception in start/end-process"""
pass
def process_w_branch(self, index, branch, args):
"""Run start_process on latest branch"""
Robust.check_common_error(branch.on_error,
branch.start_process, args)
if not branch.caught_exception: branch.start_successful = 1
branch.base_index = index
def Finish(self):
"""Call at end of sequence to tie everything up"""
while 1:
to_be_finished = self.subinstances.pop()
to_be_finished = self.branches.pop()
to_be_finished.call_end_proc()
if not self.subinstances: break
self.subinstances[-1].branch_process(to_be_finished)
def log_prev_error(self, index):
"""Call function if no pending exception"""
Log("Skipping %s because of previous error" %
(os.path.join(*index),), 2)
if not self.branches: break
self.branches[-1].branch_process(to_be_finished)
def __call__(self, *args):
"""Process args, where args[0] is current position in iterator
@@ -304,8 +274,8 @@ class IterTreeReducer:
"""
index = args[0]
if self.base_index is None:
self.process_w_subinstance(index, self, args)
if self.index is None:
self.process_w_branch(index, self.root_branch, args)
self.index = index
return 1
@@ -313,19 +283,56 @@ class IterTreeReducer:
Log("Warning: oldindex %s >= newindex %s" % (self.index, index), 2)
return 1
if self.finish_subinstances(index) is None:
if self.finish_branches(index) is None:
return None # We are no longer in the main tree
if self.subinstances[-1].start_successful:
subinst = self.add_subinstance()
self.process_w_subinstance(index, subinst, args)
else: self.log_prev_error(index)
last_branch = self.branches[-1]
if last_branch.start_successful:
if last_branch.can_fast_process(*args):
last_branch.fast_process(*args)
else:
branch = self.add_branch()
self.process_w_branch(index, branch, args)
else: last_branch.log_prev_error(index)
self.index = index
return 1
class ErrorITR(IterTreeReducer):
"""Adds some error handling to above ITR, if ITR processes files"""
class ITRBranch:
"""Helper class for IterTreeReducer below"""
base_index = index = None
finished = None
caught_exception = start_successful = None
def call_end_proc(self):
"""Runs the end_process on self, checking for errors"""
if self.finished or not self.start_successful:
self.caught_exception = 1
if self.caught_exception: self.log_prev_error(self.base_index)
else: Robust.check_common_error(self.on_error, self.end_process)
self.finished = 1
def start_process(self, *args):
"""Do some initial processing (stub)"""
pass
def end_process(self):
"""Do any final processing before leaving branch (stub)"""
pass
def branch_process(self, branch):
"""Process a branch right after it is finished (stub)"""
assert branch.finished
pass
def can_fast_process(self, *args):
"""True if object can be processed without new branch (stub)"""
return None
def fast_process(self, *args):
"""Process args without new child branch (stub)"""
pass
def on_error(self, exc, *args):
"""This is run on any exception in start/end-process"""
self.caught_exception = 1
@@ -335,6 +342,11 @@ class ErrorITR(IterTreeReducer):
else: filename = "."
Log("Error '%s' processing %s" % (exc, filename), 2)
def log_prev_error(self, index):
"""Call function if no pending exception"""
Log("Skipping %s because of previous error" %
(os.path.join(*index),), 2)
# Put at bottom to prevent (viciously) circular module dependencies
from robust import *
......
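
To make the IterTreeReducer/ITRBranch split above concrete, here is a hedged usage sketch. CountBranch is invented for illustration; only the IterTreeReducer constructor, __call__, Finish(), root_branch, and the ITRBranch hooks shown in this diff are assumed, and the snippet would need to run from inside the rdiff-backup source directory so that "from lazy import ..." resolves.

# Hedged usage sketch (CountBranch is hypothetical, not part of this commit).
from lazy import IterTreeReducer, ITRBranch

class CountBranch(ITRBranch):
    """Count the entries seen under one directory branch"""
    def start_process(self, index, isdir):
        self.count = 1
    def can_fast_process(self, index, isdir):
        return not isdir                  # plain files take the fast path
    def fast_process(self, index, isdir):
        self.count += 1
    def branch_process(self, branch):
        self.count += branch.count        # fold in a finished subdirectory

itr = IterTreeReducer(CountBranch, [])
for index, isdir in [((), 1), (("bin",), 1), (("bin", "ls"), 0)]:
    itr(index, isdir)                     # index is args[0], as in __call__
itr.Finish()
print(itr.root_branch.count)              # 3, given the semantics in this diff
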
@@ -119,8 +119,8 @@ class Restore:
assert isinstance(mirror, DSRPath) and isinstance(target, DSRPath)
assert mirror.index == rid.index
mirror_finalizer = DestructiveSteppingFinalizer()
target_finalizer = DestructiveSteppingFinalizer()
mirror_finalizer = IterTreeReducer(DestructiveSteppingFinalizer, ())
target_finalizer = IterTreeReducer(DestructiveSteppingFinalizer, ())
for rcd in Restore.yield_rcds(rid.index, mirror, rid,
target, time, mirror_time):
rcd.RestoreFile()
......
@@ -264,8 +264,8 @@ class Robust:
Log.exception()
conn = Globals.backup_writer
if conn is not None: # increment error count
ITR_exists = conn.Globals.is_not_None('ITR')
if ITR_exists: conn.Globals.ITR.increment_stat('Errors')
ITRB_exists = conn.Globals.is_not_None('ITRB')
if ITRB_exists: conn.Globals.ITRB.increment_stat('Errors')
if error_handler: return error_handler(exc, *args)
else:
Log.exception(1, 2)
......
@@ -433,7 +433,9 @@ class RPath(RORPath):
self.conn = connection
self.index = index
self.base = base
if base is not None: self.path = "/".join((base,) + index)
if base is not None:
if base == "/": self.path = "/" + "/".join(index)
else: self.path = "/".join((base,) + index)
self.file = None
if data or base is None: self.data = data
else: self.data = self.conn.C.make_file_dict(self.path)
@@ -623,7 +625,7 @@ class RPath(RORPath):
self.setdata()
if not self.lstat(): return # must have been deleted in meantime
elif self.isdir():
itm = RpathDeleter()
itm = IterTreeReducer(RpathDeleter, [])
for dsrp in Select(DSRPath(None, self)).set_iter():
itm(dsrp.index, dsrp)
itm.Finish()
@@ -795,7 +797,8 @@ from lazy import *
from selection import *
from destructive_stepping import *
class RpathDeleter(IterTreeReducer):
class RpathDeleter(ITRBranch):
"""Delete a directory. Called by RPath.delete()"""
def start_process(self, index, dsrp):
self.dsrp = dsrp
......
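
The two-line change to RPath.__init__ above special-cases base == "/" to avoid a doubled slash when building paths from the filesystem root. A quick plain-Python illustration (the sample index is made up):

# Why the base == "/" special case is needed: the old join doubles the slash.
base, index = "/", ("usr", "bin")
print("/".join((base,) + index))          # old behaviour -> //usr/bin
print("/" + "/".join(index))              # new behaviour -> /usr/bin
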
@@ -116,9 +116,8 @@ class Select:
def Iterate_fast(self, dsrpath, sel_func):
"""Like Iterate, but don't recur, saving time
This is a bit harder to read than Iterate/iterate_in_dir, but
it should be faster because it only recurs to half as much
depth. It doesn't handle the quoting case.
Only handles standard case (quoting off, starting from
beginning).
"""
def error_handler(exc, filename):
@@ -126,38 +125,40 @@ class Select:
return None
def diryield(dsrpath):
s = sel_func(dsrpath)
if s == 0: return
elif s == 1:
yield dsrpath
for filename in Robust.listrp(dsrpath):
new_dsrp = Robust.check_common_error(error_handler,
dsrpath.append, [filename])
if new_dsrp:
if new_dsrp.isdir():
for dsrp in diryield(new_dsrp): yield dsrp
elif sel_func(new_dsrp) == 1: yield new_dsrp
elif s == 2:
yielded_something = None
for filename in Robust.listrp(dsrpath):
new_dsrp = Robust.check_common_error(error_handler,
dsrpath.append, [filename])
if new_dsrp:
if new_dsrp.isdir():
for dsrp in diryield(new_dsrp):
if not yielded_something:
yielded_something = 1
yield dsrpath
yield dsrp
elif sel_func(new_dsrp) == 1:
if not yielded_something:
yielded_something = 1
yield dsrpath
yield new_dsrp
if dsrpath.isdir():
for dsrp in diryield(dsrpath): yield dsrp
elif sel_func(dsrpath) == 1: yield dsrpath
"""Generate relevant files in directory dsrpath
Returns (dsrp, num) where num == 0 means dsrp should be
generated normally, num == 1 means the dsrp is a directory
and should be included iff something inside is included.
"""
for filename in Robust.listrp(dsrpath):
new_dsrp = Robust.check_common_error(error_handler,
dsrpath.append, (filename,))
if new_dsrp:
s = sel_func(new_dsrp)
if s == 1: yield (new_dsrp, 0)
elif s == 2 and new_dsrp.isdir(): yield (new_dsrp, 1)
yield dsrpath
diryield_stack = [diryield(dsrpath)]
delayed_dsrp_stack = []
while diryield_stack:
try: dsrp, val = diryield_stack[-1].next()
except StopIteration:
diryield_stack.pop()
if delayed_dsrp_stack: delayed_dsrp_stack.pop()
continue
if val == 0:
if delayed_dsrp_stack:
for delayed_dsrp in delayed_dsrp_stack: yield delayed_dsrp
del delayed_dsrp_stack[:]
yield dsrp
if dsrp.isdir(): diryield_stack.append(diryield(dsrp))
elif val == 1:
delayed_dsrp_stack.append(dsrp)
diryield_stack.append(diryield(dsrp))
def Iterate(self, dsrpath, rec_func, sel_func):
"""Return iterator yielding dsrps in dsrpath
@@ -219,7 +220,7 @@ class Select:
def iterate_with_finalizer(self):
"""Like Iterate, but missing some options, and add finalizer"""
finalize = DestructiveSteppingFinalizer()
finalize = IterTreeReducer(DestructiveSteppingFinalizer, ())
for dsrp in self:
yield dsrp
finalize(dsrp.index, dsrp)
......
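
The rewritten Iterate_fast above replaces recursion with an explicit stack of diryield generators; a directory whose selection value is 2 ("include only if something inside is included") is parked on delayed_dsrp_stack and emitted only once a descendant is actually yielded. The self-contained sketch below mimics that control flow over a made-up in-memory tree; the tree, sel_func, and names are invented for illustration, and the real code operates on dsrpaths with error handling.

# Self-contained sketch of the stack-based traversal used by the new
# Iterate_fast (hypothetical in-memory tree instead of dsrpaths).
tree = {
    "/":    ["/etc", "/tmp"],
    "/etc": ["/etc/passwd"],
    "/tmp": ["/tmp/scratch"],
}
def is_dir(path): return path in tree
def sel_func(path):
    # 0 = exclude, 1 = include, 2 = include only if a descendant is included
    if path.startswith("/tmp"): return 0
    if is_dir(path): return 2
    return 1

def diryield(path):
    """Yield (child, num): num 0 means emit normally, num 1 means emit
    only if something inside it gets emitted (mirrors the new diryield)."""
    for child in tree.get(path, []):
        s = sel_func(child)
        if s == 1: yield (child, 0)
        elif s == 2 and is_dir(child): yield (child, 1)

def iterate_fast(root):
    yield root
    stack, delayed = [diryield(root)], []
    while stack:
        try: path, val = next(stack[-1])
        except StopIteration:
            stack.pop()
            if delayed: delayed.pop()
            continue
        if val == 0:
            for d in delayed: yield d       # flush parked ancestor dirs
            del delayed[:]
            yield path
            if is_dir(path): stack.append(diryield(path))
        else:                               # val == 1: park the directory
            delayed.append(path)
            stack.append(diryield(path))

print(list(iterate_fast("/")))              # -> ['/', '/etc', '/etc/passwd']
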
@@ -217,18 +217,17 @@ class StatsObj:
return s
class StatsITR(IterTreeReducer, StatsObj):
class StatsITRB(ITRBranch, StatsObj):
"""Keep track of per directory statistics
This is subclassed by the mirroring and incrementing ITRs.
"""
def __init__(self, *args):
def __init__(self):
"""StatsITR initializer - zero out statistics"""
attr_dict = self.__dict__
for attr in StatsObj.stat_file_attrs: attr_dict[attr] = 0
self.ElapsedTime = self.Filename = None
IterTreeReducer.__init__(self, *args)
def start_stats(self, mirror_dsrp):
"""Record status of mirror dsrp
@@ -273,16 +272,24 @@ class StatsITR(IterTreeReducer, StatsObj):
self.DeletedFileSize += self.mirror_base_size
self.stats_incr_incfiles(inc_rp)
def fast_process(self, mirror_rorp):
"""Use when there is no change from source to mirror"""
source_size = self.stats_getsize(mirror_rorp)
self.SourceFiles += 1
self.MirrorFiles += 1
self.SourceFileSize += source_size
self.MirrorFileSize += source_size
def stats_incr_incfiles(self, inc_rp):
"""Increment IncrementFile statistics"""
if inc_rp:
self.IncrementFiles += 1
self.IncrementFileSize += self.stats_getsize(inc_rp)
def add_file_stats(self, subinstance):
"""Add all file statistics from subinstance to current totals"""
def add_file_stats(self, branch):
"""Add all file statistics from branch to current totals"""
for attr in self.stat_file_attrs:
self.__dict__[attr] += subinstance.__dict__[attr]
self.__dict__[attr] += branch.__dict__[attr]
from log import *
......