Commit 8c88d03d authored by ben

Reexamined robust writing and statistics


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@110 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 42fdaace
#!/usr/bin/env python
#
# rdiff-backup -- Mirror files while keeping incremental changes
-# Version 0.7.5.1 released May 25, 2002
+# Version 0.7.5.3 released May 25, 2002
# Copyright (C) 2001, 2002 Ben Escoto <bescoto@stanford.edu>
#
# This program is licensed under the GNU General Public License (GPL).
...
@@ -240,6 +240,7 @@ class HLDestinationStruct:
"""Apply diffs and finalize, with checkpointing and statistics"""
collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2)
finalizer, ITR = cls.get_finalizer(), cls.get_MirrorITR(inc_rpath)
+Stats.open_dir_stats_file()
dsrp = None
def error_checked():
@@ -262,13 +263,15 @@ class HLDestinationStruct:
cls.check_skip_error(finalizer.Finish, dsrp)
except: cls.handle_last_error(dsrp, finalizer, ITR)
if Globals.preserve_hardlinks: Hardlink.final_writedata()
-cls.write_statistics(ITR)
+Stats.close_dir_stats_file()
+Stats.write_session_statistics(ITR)
SaveState.checkpoint_remove()
def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath):
"""Apply diffs, write increment if necessary, and finalize"""
collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2)
finalizer, ITR = cls.get_finalizer(), cls.get_ITR(inc_rpath)
+Stats.open_dir_stats_file()
dsrp = None
def error_checked():
@@ -292,7 +295,8 @@ class HLDestinationStruct:
cls.check_skip_error(finalizer.Finish, dsrp)
except: cls.handle_last_error(dsrp, finalizer, ITR)
if Globals.preserve_hardlinks: Hardlink.final_writedata()
-cls.write_statistics(ITR)
+Stats.close_dir_stats_file()
+Stats.write_session_statistics(ITR)
SaveState.checkpoint_remove()
def check_skip_error(cls, thunk, dsrp):
@@ -323,19 +327,4 @@ class HLDestinationStruct:
SaveState.touch_last_file_definitive()
raise
-def write_statistics(cls, ITR):
-"""Write session statistics to file, log"""
-stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"),
-Time.curtime, "data")
-ITR.StartTime = Time.curtime
-ITR.EndTime = time.time()
-if Globals.preserve_hardlinks and Hardlink.final_inc:
-# include hardlink data in size of increments
-ITR.IncrementFileSize += Hardlink.final_inc.getsize()
-ITR.write_stats_to_rp(stat_inc)
-if Globals.print_statistics:
-message = ITR.get_stats_logstring("Session statistics")
-Log.log_to_file(message)
-Globals.client_conn.sys.stdout.write(message)
MakeClass(HLDestinationStruct)
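Taken together, the hunks above move session statistics out of HLDestinationStruct and into the new Stats class: the per-directory statistics file is opened before the patch loop and the session totals are written once at the end. A rough sketch of the resulting call order (names taken from the hunks above, surrounding loop elided; not literal code from this commit):

    Stats.open_dir_stats_file()          # start the directory_statistics increment
    # ... apply diffs; the ITR writes one line per directory via
    # Stats.write_dir_stats_line() as each directory finishes ...
    Stats.close_dir_stats_file()
    Stats.write_session_statistics(ITR)  # session totals; echoed to the log when
                                         # Globals.print_statistics is set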
@@ -22,7 +22,11 @@ class Inc:
mirror is the mirrored file from the last backup,
incpref is the prefix of the increment file.
-This function basically moves mirror -> incpref.
+This function basically moves the information about the mirror
+file to incpref.
+
+The returned RobustAction when executed should return the name
+of the incfile, or None if none was created.
"""
if not (new and new.lstat() or mirror.lstat()):
@@ -39,13 +43,15 @@ class Inc:
else: return Inc.makesnapshot_action(mirror, incpref)
def Increment(new, mirror, incpref):
-Inc.Increment_action(new, mirror, incpref).execute()
+return Inc.Increment_action(new, mirror, incpref).execute()
def makemissing_action(incpref):
"""Signify that mirror file was missing"""
-return RobustAction(lambda: None,
-Inc.get_inc_ext(incpref, "missing").touch,
-lambda exp: None)
+def final(init_val):
+incrp = Inc.get_inc_ext(incpref, "missing")
+incrp.touch()
+return incrp
+return RobustAction(None, final, None)
def makesnapshot_action(mirror, incpref):
"""Copy mirror to incfile, since new is quite different"""
@@ -62,22 +68,29 @@ class Inc:
if (Globals.compression and
not Globals.no_compression_regexp.match(mirror.path)):
diff = Inc.get_inc_ext(incpref, "diff.gz")
-return Robust.chain([Rdiff.write_delta_action(new, mirror,
-diff, 1),
-Robust.copy_attribs_action(mirror, diff)])
+compress = 1
else:
diff = Inc.get_inc_ext(incpref, "diff")
-return Robust.chain([Rdiff.write_delta_action(new, mirror,
-diff, None),
-Robust.copy_attribs_action(mirror, diff)])
+compress = None
+diff_tf = TempFileManager.new(diff)
+sig_tf = TempFileManager.new(mirror, None)
+def init():
+Rdiff.write_delta(new, mirror, diff_tf, compress, sig_tf)
+RPath.copy_attribs(mirror, diff_tf)
+return diff
+return Robust.make_tf_robustaction(init, (diff_tf, sig_tf),
+(diff, None))
def makedir_action(mirrordir, incpref):
"""Make file indicating directory mirrordir has changed"""
dirsign = Inc.get_inc_ext(incpref, "dir")
-def final():
-dirsign.touch()
-RPath.copy_attribs(mirrordir, dirsign)
-return RobustAction(lambda: None, final, dirsign.delete)
+tf = TempFileManager.new(dirsign)
+def init():
+tf.touch()
+RPath.copy_attribs(mirrordir, tf)
+return dirsign
+return Robust.make_tf_robustaction(init, tf, dirsign)
def get_inc(rp, time, typestr):
"""Return increment like rp but with time and typestr suffixes"""
@@ -127,8 +140,15 @@ class IncrementITR(StatsITR):
Remember this object needs to be pickable.
"""
-mirror_isdirectory, directory_replacement = None, None
+# Iff true, mirror file was a directory
+mirror_isdirectory = None
+# If set, what the directory on the mirror side will be replaced with
+directory_replacement = None
+# True iff there has been some change at this level or lower (used
+# for marking directories to be flagged)
changed = None
+# Holds the RPath of the created increment file, if any
+incrp = None
def __init__(self, inc_rpath):
"""Set inc_rpath, an rpath of the base of the tree"""
@@ -184,26 +204,34 @@ class IncrementITR(StatsITR):
if not (incpref.lstat() and incpref.isdir()): incpref.mkdir()
if diff_rorp and diff_rorp.isreg() and diff_rorp.file:
tf = TempFileManager.new(dsrp)
-RPathStatic.copy_with_attribs(diff_rorp, tf)
-tf.set_attached_filetype(diff_rorp.get_attached_filetype())
+def init():
+RPathStatic.copy_with_attribs(diff_rorp, tf)
+tf.set_attached_filetype(diff_rorp.get_attached_filetype())
+def error(exc, ran_init, init_val): tf.delete()
+RobustAction(init, None, error).execute()
self.directory_replacement = tf
def init_non_dir(self, dsrp, diff_rorp, incpref):
"""Process a non directory file (initial pass)"""
if not diff_rorp: return # no diff, so no change necessary
if diff_rorp.isreg() and (dsrp.isreg() or diff_rorp.isflaglinked()):
-tf = TempFileManager.new(dsrp)
+# Write updated mirror to temp file so we can compute
+# reverse diff locally
+mirror_tf = TempFileManager.new(dsrp)
def init_thunk():
if diff_rorp.isflaglinked():
-Hardlink.link_rp(diff_rorp, tf, dsrp)
+Hardlink.link_rp(diff_rorp, mirror_tf, dsrp)
else: Rdiff.patch_with_attribs_action(dsrp, diff_rorp,
-tf).execute()
-Inc.Increment_action(tf, dsrp, incpref).execute()
-Robust.make_tf_robustaction(init_thunk, (tf,), (dsrp,)).execute()
-else:
-Robust.chain([Inc.Increment_action(diff_rorp, dsrp, incpref),
-RORPIter.patchonce_action(None, dsrp, diff_rorp)]
-).execute()
+mirror_tf).execute()
+self.incrp = Inc.Increment_action(mirror_tf, dsrp,
+incpref).execute()
+def final(init_val): mirror_tf.rename(dsrp)
+def error(exc, ran_init, init_val): mirror_tf.delete()
+RobustAction(init_thunk, final, error).execute()
+else: self.incrp = Robust.chain(
+Inc.Increment_action(diff_rorp, dsrp, incpref),
+RORPIter.patchonce_action(None, dsrp, diff_rorp)).execute()[0]
self.changed = 1
def end_process(self):
@@ -217,19 +245,18 @@ class IncrementITR(StatsITR):
if self.directory_replacement:
tf = self.directory_replacement
-Inc.Increment(tf, dsrp, incpref)
-RORPIter.patchonce_action(None, dsrp, tf).execute()
+self.incrp = Robust.chain(
+Inc.Increment_action(tf, dsrp, incpref),
+RORPIter.patchonce_action(None, dsrp, tf)).execute()[0]
tf.delete()
else:
-Inc.Increment(diff_rorp, dsrp, incpref)
+self.incrp = Inc.Increment(diff_rorp, dsrp, incpref)
if diff_rorp:
RORPIter.patchonce_action(None, dsrp, diff_rorp).execute()
-self.end_stats(diff_rorp, dsrp, Inc._inc_file)
-if self.incpref.isdir() and (self.mirror_isdirectory or dsrp.isdir()):
-self.write_stats_to_rp(Inc.get_inc(
-self.incpref.append("directory_statistics"),
-Time.curtime, "data"))
+self.end_stats(diff_rorp, dsrp, self.incrp)
+if self.mirror_isdirectory or dsrp.isdir():
+Stats.write_dir_stats_line(self, dsrp.index)
def branch_process(self, subinstance):
"""Update statistics, and the has_changed flag if change in branch"""
@@ -239,6 +266,8 @@ class IncrementITR(StatsITR):
class MirrorITR(StatsITR):
"""Like IncrementITR, but only patch mirror directory, don't increment"""
+# This is always None since no increments will be created
+incrp = None
def __init__(self, inc_rpath):
"""Set inc_rpath, an rpath of the base of the inc tree"""
self.inc_rpath = inc_rpath
@@ -251,9 +280,6 @@ class MirrorITR(StatsITR):
RORPIter.patchonce_action(None, mirror_dsrp, diff_rorp).execute()
self.incpref = self.inc_rpath.new_index(index)
-if mirror_dsrp.isdir() and not self.incpref.lstat():
-self.incpref.mkdir() # holds the statistics files
self.diff_rorp, self.mirror_dsrp = diff_rorp, mirror_dsrp
def end_process(self):
@@ -262,11 +288,9 @@ class MirrorITR(StatsITR):
except AttributeError: # Some error above prevented these being set
return
-self.end_stats(self.diff_rorp, self.mirror_dsrp)
-if self.incpref.isdir():
-self.write_stats_to_rp(Inc.get_inc(
-self.incpref.append("directory_statistics"),
-Time.curtime, "data"))
+self.end_stats(diff_rorp, mirror_dsrp)
+if mirror_dsrp.isdir():
+Stats.write_dir_stats_line(self, mirror_dsrp.index)
def branch_process(self, subinstance):
"""Update statistics with subdirectory results"""
...
@@ -139,6 +139,5 @@ class Logger:
logging_func("Exception %s raised of class %s" %
(exc_info[1], exc_info[0]), verbosity)
logging_func("".join(traceback.format_tb(exc_info[2])), verbosity+1)
Log = Logger()
@@ -33,69 +33,98 @@ execfile("hardlink.py")
class RobustAction:
"""Represents a file operation to be accomplished later"""
-def __init__(self, init_thunk, final_thunk, error_thunk):
+def __init__(self, init_thunk, final_func, error_handler):
"""RobustAction initializer
All the thunks are functions whose return value will be
ignored. init_thunk should not make any irreversible changes
-but prepare for the writing of the important data. final_thunk
+but prepare for the writing of the important data. final_func
should be as short as possible and do the real work.
-error_thunk is run if there is an error in init_thunk or
-final_thunk. Errors in init_thunk should be corrected by
-error_thunk as if nothing had been run in the first place.
+error_handler is run if there is an error in init_thunk or
+final_func. Errors in init_thunk should be corrected by
+error_handler as if nothing had been run in the first place.
-The functions take no arguments except for error_thunk, which
-receives the exception as its only argument.
+init_thunk takes no arguments.
+
+final_thunk takes the return value of init_thunk as its
+argument, and its return value is returned by execute().
+
+error_handler takes three arguments: the exception, a value
+which is true just in case self.init_thunk ran correctly, and
+a value which will be the return value of init_thunk if it ran
+correctly.
"""
-self.init_thunk = init_thunk
-self.final_thunk = final_thunk
-self.error_thunk = error_thunk
+self.init_thunk = init_thunk or self.default_init_thunk
+self.final_func = final_func or self.default_final_func
+self.error_handler = error_handler or self.default_error_handler
def execute(self):
"""Actually run the operation"""
+ran_init_thunk = None
try:
-self.init_thunk()
-self.final_thunk()
+init_val = self.init_thunk()
+ran_init_thunk = 1
+return self.final_func(init_val)
except Exception, exc: # Catch all errors
Log.exception()
-self.error_thunk(exc)
+if ran_init_thunk: self.error_handler(exc, 1, init_val)
+else: self.error_handler(exc, None, None)
raise exc
+def default_init_thunk(self): return None
+def default_final_func(self, init_val): return init_val
+def default_error_handler(self, exc, ran_init, init_val): pass
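The docstring above pins down the new calling convention, so a tiny standalone illustration may help; this is only a sketch of the protocol it describes, in the same Python 2 style as the surrounding code, not the project's RobustAction itself:

    class MiniAction:
        def __init__(self, init_thunk, final_func, error_handler):
            self.init_thunk = init_thunk or (lambda: None)
            self.final_func = final_func or (lambda init_val: init_val)
            self.error_handler = error_handler or \
                                 (lambda exc, ran_init, init_val: None)
        def execute(self):
            ran_init, init_val = None, None
            try:
                init_val = self.init_thunk()      # prepare, return a value
                ran_init = 1
                return self.final_func(init_val)  # real work; value is returned
            except Exception, exc:
                self.error_handler(exc, ran_init, init_val)
                raise

    result = MiniAction(lambda: "tmpfile",
                        lambda tmp: tmp + ".done", None).execute()
    assert result == "tmpfile.done"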
class Robust:
"""Contains various file operations made safer using tempfiles"""
-null_action = RobustAction(lambda: None, lambda: None, lambda e: None)
+null_action = RobustAction(None, None, None)
-def chain(robust_action_list):
+def chain(*robust_action_list):
"""Return chain tying together a number of robust actions
The whole chain will be aborted if some error occurs in
initialization stage of any of the component actions.
"""
-ras_with_completed_inits = []
+ras_with_started_inits, init_return_vals = [], []
def init():
for ra in robust_action_list:
-ras_with_completed_inits.append(ra)
-ra.init_thunk()
-def final():
-for ra in robust_action_list: ra.final_thunk()
-def error(exc):
-for ra in ras_with_completed_inits: ra.error_thunk(exc)
+ras_with_started_inits.append(ra)
+init_return_vals.append(ra.init_thunk())
+return init_return_vals
+def final(init_return_vals):
+final_vals = []
+for ra, init_val in zip(robust_action_list, init_return_vals):
+final_vals.append(ra.final_func(init_val))
+return final_vals
+def error(exc, ran_init, init_val):
+for ra, init_val in zip(ras_with_started_inits, init_return_vals):
+ra.error_handler(exc, 1, init_val)
+for ra in ras_with_started_inits[len(init_return_vals):]:
+ra.error_handler(exc, None, None)
return RobustAction(init, final, error)
-def chain_nested(robust_action_list):
+def chain_nested(*robust_action_list):
"""Like chain but final actions performed in reverse order"""
-ras_with_completed_inits = []
+ras_with_started_inits, init_vals = [], []
def init():
for ra in robust_action_list:
-ras_with_completed_inits.append(ra)
-ra.init_thunk()
-def final():
-ralist_copy = robust_action_list[:]
-ralist_copy.reverse()
-for ra in ralist_copy: ra.final_thunk()
-def error(exc):
-for ra in ras_with_completed_inits: ra.error_thunk(exc)
+ras_with_started_inits.append(ra)
+init_vals.append(ra.init_thunk())
+return init_vals
+def final(init_vals):
+ras_and_inits = zip(robust_action_list, init_vals)
+ras_and_inits.reverse()
+final_vals = []
+for ra, init_val in ras_and_inits:
+final_vals.append(ra.final_func(init_val))
+return final_vals
+def error(exc, ran_init, init_val):
+for ra, init_val in zip(ras_with_started_inits, init_return_vals):
+ra.error_handler(exc, 1, init_val)
+for ra in ras_with_started_inits[len(init_return_vals):]:
+ra.error_handler(exc, None, None)
return RobustAction(init, final, error)
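Because execute() now returns final_func's value, executing a chain returns the list of each member action's final value, which is why call sites in the increment code can write ".execute()[0]". A standalone sketch of that two-phase convention (not the project code):

    def chain_execute(pairs):   # pairs: list of (init_thunk, final_func) tuples
        init_vals = [init() for init, final in pairs]             # phase 1: all inits
        return [final(val)                                        # phase 2: all finals
                for (init, final), val in zip(pairs, init_vals)]

    print chain_execute([(lambda: 1, lambda v: v + 1),
                         (lambda: 10, lambda v: v * 2)])   # prints [2, 20]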
def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
@@ -107,18 +136,19 @@ class Robust:
create RobustActions of that type.
"""
-assert type(tempfiles) is types.TupleType, tempfiles
-if final_renames is None: final = lambda: None
-else:
-assert len(tempfiles) == len(final_renames)
-def final(): # rename tempfiles to final positions
-for i in range(len(tempfiles)):
-final_name = final_renames[i]
-if final_name:
-if final_name.isdir(): # Cannot rename over directory
-final_name.delete()
-tempfiles[i].rename(final_name)
-def error(exc):
+if isinstance(tempfiles, TempFile): tempfiles = (tempfiles,)
+if isinstance(final_renames, RPath): final_renames = (final_renames,)
+if final_renames is None: final_renames = [None] * len(tempfiles)
+assert len(tempfiles) == len(final_renames)
+def final(init_val): # rename tempfiles to final positions
+for tempfile, destination in zip(tempfiles, final_renames):
+if destination:
+if destination.isdir(): # Cannot rename over directory
+destination.delete()
+tempfile.rename(destination)
+return init_val
+def error(exc, ran_init, init_val):
for tf in tempfiles: tf.delete()
return RobustAction(init_thunk, final, error)
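make_tf_robustaction also becomes more forgiving: a single TempFile and a single destination RPath are now accepted in place of one-element tuples. A hedged sketch of the write-into-tempfile-then-rename pattern it packages (dest_rp is a hypothetical destination rpath, and the surrounding rdiff-backup namespace is assumed; compare destructive_write_action below):

    tf = TempFileManager.new(dest_rp)
    def init():
        fp = tf.open("wb")
        fp.write("new contents")   # prepare everything in the tempfile first
        fp.close()
        tf.setdata()
    # the generated final step renames tf over dest_rp; the error handler deletes tf
    Robust.make_tf_robustaction(init, tf, dest_rp).execute()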
@@ -130,36 +160,46 @@ class Robust:
overwritten).
"""
-tfl = [None] # Need mutable object that init and final can access
+tfl = [None] # Need some mutable state to hold tf value
def init():
if not (rorpin.isdir() and rpout.isdir()): # already a dir
-tfl[0] = TempFileManager.new(rpout)
-if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
+tfl[0] = tf = TempFileManager.new(rpout)
+if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb"))
else: RPath.copy(rorpin, tf)
-def final():
-if tfl[0] and tfl[0].lstat():
+return tf
+else: return None
+def final(tf):
+if tf and tf.lstat():
if rpout.isdir(): rpout.delete()
-tfl[0].rename(rpout)
-return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
+tf.rename(rpout)
+return rpout
+def error(exc, ran_init, init_val):
+if tfl[0]: tfl[0].delete()
+return RobustAction(init, final, error)
def copy_with_attribs_action(rorpin, rpout, compress = None):
"""Like copy_action but also copy attributes"""
-tfl = [None] # Need mutable object that init and final can access
+tfl = [None] # Need some mutable state for error handler
def init():
if not (rorpin.isdir() and rpout.isdir()): # already a dir
-tfl[0] = TempFileManager.new(rpout)
+tfl[0] = tf = TempFileManager.new(rpout)
if rorpin.isreg():
-tfl[0].write_from_fileobj(rorpin.open("rb"), compress)
-else: RPath.copy(rorpin, tfl[0])
-if tfl[0].lstat(): # Some files, like sockets, won't be created
-RPathStatic.copy_attribs(rorpin, tfl[0])
-def final():
+tf.write_from_fileobj(rorpin.open("rb"), compress)
+else: RPath.copy(rorpin, tf)
+if tf.lstat(): # Some files, like sockets, won't be created
+RPathStatic.copy_attribs(rorpin, tf)
+return tf
+else: return None
+def final(tf):
if rorpin.isdir() and rpout.isdir():
RPath.copy_attribs(rorpin, rpout)
-elif tfl[0] and tfl[0].lstat():
-if rpout.isdir(): rpout.delete()
-tfl[0].rename(rpout)
-return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
+elif tf and tf.lstat():
+if rpout.isdir(): rpout.delete() # can't rename over dir
+tf.rename(rpout)
+return rpout
+def error(exc, ran_init, init_val):
+if tfl[0]: tfl[0].delete()
+return RobustAction(init, final, error)
def copy_attribs_action(rorpin, rpout):
"""Return action which just copies attributes
@@ -168,14 +208,16 @@ class Robust:
normal sequence.
"""
-def final(): RPath.copy_attribs(rorpin, rpout)
-return RobustAction(lambda: None, final, lambda e: None)
+def final(init_val):
+RPath.copy_attribs(rorpin, rpout)
+return rpout
+return RobustAction(None, final, None)
def symlink_action(rpath, linktext):
"""Return symlink action by moving one file over another"""
tf = TempFileManager.new(rpath)
def init(): tf.symlink(linktext)
-return Robust.make_tf_robustaction(init, (tf,), (rpath,))
+return Robust.make_tf_robustaction(init, tf, rpath)
def destructive_write_action(rp, s):
"""Return action writing string s to rpath rp in robust way
@@ -187,9 +229,9 @@ class Robust:
def init():
fp = tf.open("wb")
fp.write(s)
-assert not fp.close()
+fp.close()
tf.setdata()
-return Robust.make_tf_robustaction(init, (tf,), (rp,))
+return Robust.make_tf_robustaction(init, tf, rp)
def check_common_error(init_thunk, error_thunk = lambda exc: None):
"""Execute init_thunk, if error, run error_thunk on exception
@@ -357,8 +399,8 @@ class SaveState:
symtext = apply(os.path.join,
('increments',) + last_file_rorp.index)
return Robust.symlink_action(cls._last_file_sym, symtext)
-else: return RobustAction(lambda: None, cls.touch_last_file,
-lambda exc: None)
+else: return RobustAction(None, lambda init_val: cls.touch_last_file(),
+None)
def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None):
"""Save states of tree reducer and finalizer during inc backup
@@ -372,9 +414,9 @@ class SaveState:
cls._last_checkpoint_time = time.time()
Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
state_string = cPickle.dumps((ITR, finalizer))
-Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp,
+Robust.chain(Robust.destructive_write_action(cls._checkpoint_rp,
state_string),
-cls.record_last_file_action(last_file_rorp)]).execute()
+cls.record_last_file_action(last_file_rorp)).execute()
def checkpoint_needed(cls):
"""Returns true if another checkpoint is called for"""
...
@@ -212,13 +212,13 @@ class RORPIter:
"""Return action patching basisrp using diff_rorp"""
assert diff_rorp, "Missing diff index %s" % basisrp.index
if not diff_rorp.lstat():
-return RobustAction(lambda: None, basisrp.delete, lambda e: None)
+return RobustAction(None, lambda init_val: basisrp.delete(), None)
if Globals.preserve_hardlinks and diff_rorp.isflaglinked():
if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
-return RobustAction(lambda: None,
-lambda: Hardlink.link_rp(diff_rorp, basisrp),
-lambda e: None)
+tf = TempFileManager.new(basisrp)
+def init(): Hardlink.link_rp(diff_rorp, tf, basisrp)
+return Robust.make_tf_robustaction(init, tf, basisrp)
elif basisrp and basisrp.isreg() and diff_rorp.isreg():
assert diff_rorp.get_attached_filetype() == 'diff'
return Rdiff.patch_with_attribs_action(basisrp, diff_rorp)
...
@@ -16,13 +16,29 @@ class StatsObj:
'DeletedFiles', 'DeletedFileSize',
'ChangedFiles',
'ChangedSourceSize', 'ChangedMirrorSize',
-'IncrementFileSize')
+'IncrementFiles', 'IncrementFileSize')
stat_time_attrs = ('StartTime', 'EndTime', 'ElapsedTime')
-stat_attrs = stat_time_attrs + stat_file_attrs
+stat_attrs = ('Filename',) + stat_time_attrs + stat_file_attrs
+# Below, the second value in each pair is true iff the value
+# indicates a number of bytes
+stat_file_pairs = (('SourceFiles', None), ('SourceFileSize', 1),
+('MirrorFiles', None), ('MirrorFileSize', 1),
+('NewFiles', None), ('NewFileSize', 1),
+('DeletedFiles', None), ('DeletedFileSize', 1),
+('ChangedFiles', None),
+('ChangedSourceSize', 1), ('ChangedMirrorSize', 1),
+('IncrementFiles', None), ('IncrementFileSize', 1))
# Set all stats to None, indicating info not available
for attr in stat_attrs: locals()[attr] = None
+# This is used in get_byte_summary_string below
+byte_abbrev_list = ((1024*1024*1024*1024, "TB"),
+(1024*1024*1024, "GB"),
+(1024*1024, "MB"),
+(1024, "KB"))
def get_stat(self, attribute):
"""Get a statistic"""
try: return self.__dict__[attribute]
@@ -34,33 +50,89 @@ class StatsObj:
"""Set attribute to given value"""
self.__dict__[attr] = value
+def get_stats_line(self, index):
+"""Return one line abbreviated version of full stats string"""
+file_attrs = map(lambda attr: str(self.get_stat(attr)),
+self.stat_file_attrs)
+if not index: filename = "."
+else:
+# use repr to quote newlines in relative filename, then
+# take of leading and trailing quote.
+filename = repr(apply(os.path.join, index))[1:-1]
+return " ".join([filename,] + file_attrs)
+def set_stats_from_line(self, line):
+"""Set statistics from given line"""
+def error(): raise StatsException("Bad line '%s'" % line)
+if line[-1] == "\n": line = line[:-1]
+lineparts = line.split(" ")
+if len(lineparts) < len(stat_file_attrs): error()
+for attr, val_string in zip(stat_file_attrs,
+lineparts[-len(stat_file_attrs):]):
+try: val = long(val_string)
+except ValueError:
+try: val = float(val_string)
+except ValueError: error()
+self.set_stat(attr, val)
+return self
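get_stats_line produces the one-line-per-directory records that Stats.write_dir_stats_line (added further down) appends to the directory_statistics file: the relative directory name followed by the thirteen stat_file_attrs values in order, from SourceFiles through IncrementFileSize. A hypothetical line, with every number invented purely for illustration:

    docs/old 12 345678 11 340000 1 5678 0 0 2 20000 19000 3 6000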
def get_stats_string(self):
-"""Return string printing out statistics"""
+"""Return extended string printing out statistics"""
+return self.get_timestats_string() + self.get_filestats_string()
+def get_timestats_string(self):
+"""Return portion of statistics string dealing with time"""
timelist = []
if self.StartTime is not None:
-timelist.append("StartTime %s (%s)\n" %
+timelist.append("StartTime %.2f (%s)\n" %
(self.StartTime, Time.timetopretty(self.StartTime)))
if self.EndTime is not None:
-timelist.append("EndTime %s (%s)\n" %
+timelist.append("EndTime %.2f (%s)\n" %
(self.EndTime, Time.timetopretty(self.EndTime)))
-if self.StartTime is not None and self.EndTime is not None:
+if self.ElapsedTime or (self.StartTime is not None and
+self.EndTime is not None):
if self.ElapsedTime is None:
self.ElapsedTime = self.EndTime - self.StartTime
-timelist.append("ElapsedTime %s (%s)\n" %
+timelist.append("ElapsedTime %.2f (%s)\n" %
(self.ElapsedTime, Time.inttopretty(self.ElapsedTime)))
+return "".join(timelist)
+def get_filestats_string(self):
+"""Return portion of statistics string about files and bytes"""
+def fileline(stat_file_pair):
+"""Return zero or one line of the string"""
+attr, in_bytes = stat_file_pair
+val = self.get_stat(attr)
+if val is None: return ""
+if in_bytes:
+return "%s %s (%s)\n" % (attr, val,
+self.get_byte_summary_string(val))
+else: return "%s %s\n" % (attr, val)
+return "".join(map(fileline, self.stat_file_pairs))
-filelist = ["%s %s\n" % (attr, self.get_stat(attr))
-for attr in self.stat_file_attrs
-if self.get_stat(attr) is not None]
-return "".join(timelist + filelist)
+def get_byte_summary_string(self, byte_count):
+"""Turn byte count into human readable string like "7.23GB" """
+for abbrev_bytes, abbrev_string in self.byte_abbrev_list:
+if byte_count >= abbrev_bytes:
+# Now get 3 significant figures
+abbrev_count = float(byte_count)/abbrev_bytes
+if abbrev_count >= 100: precision = 0
+elif abbrev_count >= 10: precision = 1
+else: precision = 2
+return "%%.%df %s" % (precision, abbrev_string) \
+% (abbrev_count,)
+byte_count = round(byte_count)
+if byte_count == 1: return "1 byte"
+else: return "%d bytes" % (byte_count,)
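A few worked examples of the three-significant-figure rule implemented above (inputs chosen for illustration):

    # 123456789 bytes: 123456789 / 2**20 ~= 117.74, >= 100 so "%.0f" -> "118 MB"
    # 52428800 bytes:  exactly 50.0 MB, >= 10 so "%.1f"              -> "50.0 MB"
    # 5242880 bytes:   exactly 5.0 MB, < 10 so "%.2f"                -> "5.00 MB"
    # 523 bytes:       below 1 KB, falls through the loop            -> "523 bytes"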
def get_stats_logstring(self, title):
"""Like get_stats_string, but add header and footer"""
-header = "-------------[ %s ]-------------" % title
+header = "--------------[ %s ]--------------" % title
footer = "-" * len(header)
return "%s\n%s%s\n" % (header, self.get_stats_string(), footer)
-def init_stats_from_string(self, s):
+def set_stats_from_string(self, s):
"""Initialize attributes from string, return self for convenience"""
def error(line): raise StatsException("Bad line '%s'" % line)
@@ -91,7 +163,7 @@ class StatsObj:
def read_stats_from_rp(self, rp):
"""Set statistics from rpath, return self for convenience"""
fp = rp.open("r")
-self.init_stats_from_string(fp.read())
+self.set_stats_from_string(fp.read())
fp.close()
return self
@@ -162,22 +234,96 @@ class StatsITR(IterTreeReducer, StatsObj):
self.ChangedFiles += 1
self.ChangedSourceSize += mirror_dsrp.getsize()
self.ChangedMirrorSize += self.mirror_base_size
-self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0
+if inc_rp:
+self.IncrementFiles += 1
+self.IncrementFileSize += inc_rp.getsize()
else: # new file was created
self.NewFiles += 1
self.NewFileSize += mirror_dsrp.getsize()
-self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0
+if inc_rp:
+self.IncrementFiles += 1
+self.IncrementFileSize += inc_rp.getsize()
else:
if self.mirror_base_exists: # file was deleted from mirror
self.MirrorFiles += 1
self.MirrorFileSize += self.mirror_base_size
self.DeletedFiles += 1
self.DeletedFileSize += self.mirror_base_size
-self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0
+if inc_rp:
+self.IncrementFiles += 1
+self.IncrementFileSize += inc_rp.getsize()
def add_file_stats(self, subinstance):
"""Add all file statistics from subinstance to current totals"""
for attr in self.stat_file_attrs:
self.set_stat(attr,
self.get_stat(attr) + subinstance.get_stat(attr))
+class Stats:
+"""Misc statistics methods, pertaining to dir and session stat files"""
+# This is the RPath of the directory statistics file, and the
+# associated open file. It will hold a line of statistics for
+# each directory that is backed up.
+_dir_stats_rp = None
+_dir_stats_fp = None
+# This goes at the beginning of the directory statistics file and
+# explains the format.
+_dir_stats_header = """# rdiff-backup directory statistics file
+#
+# Each line is in the following format:
+# RelativeDirName %s
+""" % " ".join(StatsObj.stat_file_attrs)
+def open_dir_stats_file(cls):
+"""Open directory statistics file, write header"""
+assert not cls._dir_stats_fp, "Directory file already open"
+if Globals.compression: suffix = "data.gz"
+else: suffix = "data"
+cls._dir_stats_rp = Inc.get_inc(Globals.rbdir.append(
+"directory_statistics"), Time.curtime, suffix)
+if cls._dir_stats_rp.lstat():
+Log("Warning, statistics file %s already exists, appending", 2)
+cls._dir_stats_fp = cls._dir_stats_rp.open("ab",
+Globals.compression)
+else: cls._dir_stats_fp = \
+cls._dir_stats_rp.open("wb", Globals.compression)
+cls._dir_stats_fp.write(cls._dir_stats_header)
+def write_dir_stats_line(cls, statobj, index):
+"""Write info from statobj about rpath to statistics file"""
+cls._dir_stats_fp.write(statobj.get_stats_line(index) +"\n")
+def close_dir_stats_file(cls):
+"""Close directory statistics file if its open"""
+if cls._dir_stats_fp:
+cls._dir_stats_fp.close()
+cls._dir_stats_fp = None
+def write_session_statistics(cls, statobj):
+"""Write session statistics into file, log"""
+stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"),
+Time.curtime, "data")
+statobj.StartTime = Time.curtime
+statobj.EndTime = time.time()
+# include hardlink data and dir stats in size of increments
+if Globals.preserve_hardlinks and Hardlink.final_inc:
+# include hardlink data in size of increments
+statobj.IncrementFiles += 1
+statobj.IncrementFileSize += Hardlink.final_inc.getsize()
+if cls._dir_stats_rp and cls._dir_stats_rp.lstat():
+statobj.IncrementFiles += 1
+statobj.IncrementFileSize += cls._dir_stats_rp.getsize()
+statobj.write_stats_to_rp(stat_inc)
+if Globals.print_statistics:
+message = statobj.get_stats_logstring("Session statistics")
+Log.log_to_file(message)
+Globals.client_conn.sys.stdout.write(message)
+MakeClass(Stats)
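With get_stats_string defined as above, the session_statistics increment written by write_session_statistics is a short text file. A hypothetical excerpt (every number invented, the pretty-printed time forms elided, and attributes still set to None simply omitted):

    StartTime 1022371801.00 (<pretty-printed time>)
    EndTime 1022371860.00 (<pretty-printed time>)
    ElapsedTime 59.00 (<pretty-printed interval>)
    SourceFiles 4000
    SourceFileSize 1200000000 (1.12 GB)
    NewFiles 3
    NewFileSize 45000 (43.9 KB)
    IncrementFiles 12
    IncrementFileSize 350000 (342 KB)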
@@ -8,7 +8,7 @@ import re, os
class Globals:
# The current version of rdiff-backup
-version = "0.7.5.1"
+version = "0.7.5.3"
# If this is set, use this value in seconds as the current time
# instead of reading it from the clock.
...
@@ -146,7 +146,8 @@ class Main:
self.action == "remove-older-than"):
self.commandline_error("Only use one argument, "
"the root of the backup directory")
-if l > 2: self.commandline_error("Too many arguments given")
+if l > 2 and self.action != "calculate-average":
+self.commandline_error("Too many arguments given")
def commandline_error(self, message):
sys.stderr.write("Error: %s\n" % message)
...
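The argument-count check is relaxed only for the calculate-average action, which consumes an arbitrary list of statistics files. Assuming the option is exposed on the command line as --calculate-average (the action name is visible above, but the exact flag spelling is not part of this hunk), an invocation might look like:

    rdiff-backup --calculate-average backup/rdiff-backup-data/session_statistics*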
@@ -24,11 +24,11 @@ class Rdiff:
"""Like get_delta but signature is in a file object"""
sig_tf = TempFileManager.new(rp_new, None)
sig_tf.write_from_fileobj(sig_fileobj)
-rdiff_popen_obj = Rdiff.get_delta(sig_tf, rp_new)
+rdiff_popen_obj = Rdiff.get_delta_sigrp(sig_tf, rp_new)
rdiff_popen_obj.set_thunk(sig_tf.delete)
return rdiff_popen_obj
-def get_delta(rp_signature, rp_new):
+def get_delta_sigrp(rp_signature, rp_new):
"""Take signature rp and new rp, return delta file object"""
assert rp_signature.conn is rp_new.conn
Log("Getting delta of %s with signature %s" %
@@ -45,18 +45,18 @@ class Rdiff:
"""
sig_tf = TempFileManager.new(new, None)
delta_tf = TempFileManager.new(delta)
-def init():
-Log("Writing delta %s from %s -> %s" %
-(basis.path, new.path, delta.path), 7)
-sig_tf.write_from_fileobj(Rdiff.get_signature(basis))
-delta_tf.write_from_fileobj(Rdiff.get_delta(sig_tf, new), compress)
-sig_tf.delete()
+def init(): Rdiff.write_delta(basis, new, delta_tf, compress, sig_tf)
return Robust.make_tf_robustaction(init, (sig_tf, delta_tf),
(None, delta))
-def write_delta(basis, new, delta, compress = None):
+def write_delta(basis, new, delta, compress = None, sig_tf = None):
"""Write rdiff delta which brings basis to new"""
-Rdiff.write_delta_action(basis, new, delta, compress).execute()
+Log("Writing delta %s from %s -> %s" %
+(basis.path, new.path, delta.path), 7)
+if not sig_tf: sig_tf = TempFileManager.new(new, None)
+sig_tf.write_from_fileobj(Rdiff.get_signature(basis))
+delta.write_from_fileobj(Rdiff.get_delta_sigrp(sig_tf, new), compress)
+sig_tf.delete()
def patch_action(rp_basis, rp_delta, rp_out = None,
out_tf = None, delta_compressed = None):
@@ -106,18 +106,20 @@ class Rdiff:
if not rp_out: rp_out = rp_basis
delta_tf = TempFileManager.new(rp_out, None)
def init(): delta_tf.write_from_fileobj(delta_fileobj)
-return Robust.chain_nested([RobustAction(init, delta_tf.delete,
-lambda exc: delta_tf.delete),
-Rdiff.patch_action(rp_basis, delta_tf,
-rp_out, out_tf)])
+def final(init_val): delta_tf.delete()
+def error(exc, ran_init, init_val): delta_tf.delete()
+write_delta_action = RobustAction(init, final, error)
+return Robust.chain(write_delta_action,
+Rdiff.patch_action(rp_basis, delta_tf,
+rp_out, out_tf))
def patch_with_attribs_action(rp_basis, rp_delta, rp_out = None):
"""Like patch_action, but also transfers attributs from rp_delta"""
if not rp_out: rp_out = rp_basis
tf = TempFileManager.new(rp_out)
return Robust.chain_nested(
-[Rdiff.patch_action(rp_basis, rp_delta, rp_out, tf),
-Robust.copy_attribs_action(rp_delta, tf)])
+Rdiff.patch_action(rp_basis, rp_delta, rp_out, tf),
+Robust.copy_attribs_action(rp_delta, tf))
def copy_action(rpin, rpout):
"""Use rdiff to copy rpin to rpout, conserving bandwidth"""
...
...@@ -33,69 +33,98 @@ execfile("hardlink.py") ...@@ -33,69 +33,98 @@ execfile("hardlink.py")
class RobustAction: class RobustAction:
"""Represents a file operation to be accomplished later""" """Represents a file operation to be accomplished later"""
def __init__(self, init_thunk, final_thunk, error_thunk): def __init__(self, init_thunk, final_func, error_handler):
"""RobustAction initializer """RobustAction initializer
All the thunks are functions whose return value will be All the thunks are functions whose return value will be
ignored. init_thunk should not make any irreversible changes ignored. init_thunk should not make any irreversible changes
but prepare for the writing of the important data. final_thunk but prepare for the writing of the important data. final_func
should be as short as possible and do the real work. should be as short as possible and do the real work.
error_thunk is run if there is an error in init_thunk or error_handler is run if there is an error in init_thunk or
final_thunk. Errors in init_thunk should be corrected by final_func. Errors in init_thunk should be corrected by
error_thunk as if nothing had been run in the first place. error_handler as if nothing had been run in the first place.
The functions take no arguments except for error_thunk, which
receives the exception as its only argument. init_thunk takes no arguments.
final_func takes the return value of init_thunk as its
argument, and its return value is returned by execute().
error_handler takes three arguments: the exception, a value
which is true if and only if self.init_thunk ran correctly, and
a value which will be the return value of init_thunk if it ran
correctly.
""" """
self.init_thunk = init_thunk self.init_thunk = init_thunk or self.default_init_thunk
self.final_thunk = final_thunk self.final_func = final_func or self.default_final_func
self.error_thunk = error_thunk self.error_handler = error_handler or self.default_error_handler
def execute(self): def execute(self):
"""Actually run the operation""" """Actually run the operation"""
ran_init_thunk = None
try: try:
self.init_thunk() init_val = self.init_thunk()
self.final_thunk() ran_init_thunk = 1
return self.final_func(init_val)
except Exception, exc: # Catch all errors except Exception, exc: # Catch all errors
Log.exception() Log.exception()
self.error_thunk(exc) if ran_init_thunk: self.error_handler(exc, 1, init_val)
else: self.error_handler(exc, None, None)
raise exc raise exc
def default_init_thunk(self): return None
def default_final_func(self, init_val): return init_val
def default_error_handler(self, exc, ran_init, init_val): pass
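The three-argument error handler and the init-value hand-off are easier to see outside the diff. The stand-in below is only a sketch of that protocol (the class name is made up, and the default-thunk handling above is ignored): init runs first and may return a value, final receives that value and its own return value comes back from execute(), and on any exception the error handler is told whether init completed and what it returned.

class ProtocolSketch:
    """Illustrative stand-in for the init/final/error protocol above"""
    def __init__(self, init_thunk, final_func, error_handler):
        self.init_thunk, self.final_func = init_thunk, final_func
        self.error_handler = error_handler
    def execute(self):
        ran_init, init_val = None, None
        try:
            init_val = self.init_thunk()      # reversible preparation
            ran_init = 1
            return self.final_func(init_val)  # the real work
        except Exception, exc:                # same Python 2 style as the module
            self.error_handler(exc, ran_init, init_val)
            raise

For example, ProtocolSketch(lambda: "tmp", lambda v: v + ".done", lambda e, r, v: None).execute() returns "tmp.done".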
class Robust: class Robust:
"""Contains various file operations made safer using tempfiles""" """Contains various file operations made safer using tempfiles"""
null_action = RobustAction(lambda: None, lambda: None, lambda e: None) null_action = RobustAction(None, None, None)
def chain(robust_action_list): def chain(*robust_action_list):
"""Return chain tying together a number of robust actions """Return chain tying together a number of robust actions
The whole chain will be aborted if some error occurs in The whole chain will be aborted if some error occurs in
initialization stage of any of the component actions. initialization stage of any of the component actions.
""" """
ras_with_completed_inits = [] ras_with_started_inits, init_return_vals = [], []
def init(): def init():
for ra in robust_action_list: for ra in robust_action_list:
ras_with_completed_inits.append(ra) ras_with_started_inits.append(ra)
ra.init_thunk() init_return_vals.append(ra.init_thunk())
def final(): return init_return_vals
for ra in robust_action_list: ra.final_thunk() def final(init_return_vals):
def error(exc): final_vals = []
for ra in ras_with_completed_inits: ra.error_thunk(exc) for ra, init_val in zip(robust_action_list, init_return_vals):
final_vals.append(ra.final_func(init_val))
return final_vals
def error(exc, ran_init, init_val):
for ra, init_val in zip(ras_with_started_inits, init_return_vals):
ra.error_handler(exc, 1, init_val)
for ra in ras_with_started_inits[len(init_return_vals):]:
ra.error_handler(exc, None, None)
return RobustAction(init, final, error) return RobustAction(init, final, error)
def chain_nested(robust_action_list): def chain_nested(*robust_action_list):
"""Like chain but final actions performed in reverse order""" """Like chain but final actions performed in reverse order"""
ras_with_completed_inits = [] ras_with_started_inits, init_vals = [], []
def init(): def init():
for ra in robust_action_list: for ra in robust_action_list:
ras_with_completed_inits.append(ra) ras_with_started_inits.append(ra)
ra.init_thunk() init_vals.append(ra.init_thunk())
def final(): return init_vals
ralist_copy = robust_action_list[:] def final(init_vals):
ralist_copy.reverse() ras_and_inits = zip(robust_action_list, init_vals)
for ra in ralist_copy: ra.final_thunk() ras_and_inits.reverse()
def error(exc): final_vals = []
for ra in ras_with_completed_inits: ra.error_thunk(exc) for ra, init_val in ras_and_inits:
final_vals.append(ra.final_func(init_val))
return final_vals
def error(exc, ran_init, init_val):
for ra, init_val in zip(ras_with_started_inits, init_return_vals):
ra.error_handler(exc, 1, init_val)
for ra in ras_with_started_inits[len(init_return_vals):]:
ra.error_handler(exc, None, None)
return RobustAction(init, final, error) return RobustAction(init, final, error)
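The ordering guarantees of chain and chain_nested can be checked with a self-contained sketch that uses plain (init, final) pairs in place of full RobustActions; the names below are illustrative only. Every init runs before any final, each final receives its own init's return value, and the nested variant simply reverses the order of the finals.

def run_chain(pairs, nested = None):
    """Run every init, then every final (reversed if nested is true)"""
    init_vals = [init() for init, final in pairs]
    ordered = zip(pairs, init_vals)
    if nested: ordered.reverse()          # chain_nested: finals in reverse order
    return [final(val) for (init, final), val in ordered]

trace = []
pairs = [(lambda: trace.append("init-A"), lambda v: trace.append("final-A")),
         (lambda: trace.append("init-B"), lambda v: trace.append("final-B"))]
run_chain(pairs)
# trace is now ["init-A", "init-B", "final-A", "final-B"]; with nested set,
# the finals would run as final-B then final-A.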
def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
...@@ -107,18 +136,19 @@ class Robust: ...@@ -107,18 +136,19 @@ class Robust:
create RobustActions of that type. create RobustActions of that type.
""" """
assert type(tempfiles) is types.TupleType, tempfiles if isinstance(tempfiles, TempFile): tempfiles = (tempfiles,)
if final_renames is None: final = lambda: None if isinstance(final_renames, RPath): final_renames = (final_renames,)
else: if final_renames is None: final_renames = [None] * len(tempfiles)
assert len(tempfiles) == len(final_renames) assert len(tempfiles) == len(final_renames)
def final(): # rename tempfiles to final positions
for i in range(len(tempfiles)): def final(init_val): # rename tempfiles to final positions
final_name = final_renames[i] for tempfile, destination in zip(tempfiles, final_renames):
if final_name: if destination:
if final_name.isdir(): # Cannot rename over directory if destination.isdir(): # Cannot rename over directory
final_name.delete() destination.delete()
tempfiles[i].rename(final_name) tempfile.rename(destination)
def error(exc): return init_val
def error(exc, ran_init, init_val):
for tf in tempfiles: tf.delete() for tf in tempfiles: tf.delete()
return RobustAction(init_thunk, final, error) return RobustAction(init_thunk, final, error)
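make_tf_robustaction packages the usual tempfile discipline: write during init, rename into place during final, delete the tempfiles on error. Below is a minimal self-contained version of that discipline using only the standard library and assuming POSIX rename semantics; the function name is made up and this is an illustration, not the rdiff-backup code path.

import os, tempfile

def robust_write(destination, data):
    """Write data to destination via a tempfile in the same directory"""
    dirname = os.path.dirname(destination) or "."
    fd, tmp_path = tempfile.mkstemp(dir = dirname)
    try:
        fp = os.fdopen(fd, "wb")
        fp.write(data)                     # "init": all writing hits the tempfile
        fp.close()
        os.rename(tmp_path, destination)   # "final": move into the real name
    except Exception:
        if os.path.exists(tmp_path):       # "error": drop the partial tempfile
            os.unlink(tmp_path)
        raise

Either the destination ends up with the complete new contents or it is left untouched, which is the property the RobustAction wrappers are after.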
...@@ -130,36 +160,46 @@ class Robust: ...@@ -130,36 +160,46 @@ class Robust:
overwritten). overwritten).
""" """
tfl = [None] # Need mutable object that init and final can access tfl = [None] # Need some mutable state to hold tf value
def init(): def init():
if not (rorpin.isdir() and rpout.isdir()): # already a dir if not (rorpin.isdir() and rpout.isdir()): # already a dir
tfl[0] = TempFileManager.new(rpout) tfl[0] = tf = TempFileManager.new(rpout)
if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb")) if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb"))
else: RPath.copy(rorpin, tf) else: RPath.copy(rorpin, tf)
def final(): return tf
if tfl[0] and tfl[0].lstat(): else: return None
def final(tf):
if tf and tf.lstat():
if rpout.isdir(): rpout.delete() if rpout.isdir(): rpout.delete()
tfl[0].rename(rpout) tf.rename(rpout)
return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) return rpout
def error(exc, ran_init, init_val):
if tfl[0]: tfl[0].delete()
return RobustAction(init, final, error)
def copy_with_attribs_action(rorpin, rpout, compress = None): def copy_with_attribs_action(rorpin, rpout, compress = None):
"""Like copy_action but also copy attributes""" """Like copy_action but also copy attributes"""
tfl = [None] # Need mutable object that init and final can access tfl = [None] # Need some mutable state for error handler
def init(): def init():
if not (rorpin.isdir() and rpout.isdir()): # already a dir if not (rorpin.isdir() and rpout.isdir()): # already a dir
tfl[0] = TempFileManager.new(rpout) tfl[0] = tf = TempFileManager.new(rpout)
if rorpin.isreg(): if rorpin.isreg():
tfl[0].write_from_fileobj(rorpin.open("rb"), compress) tf.write_from_fileobj(rorpin.open("rb"), compress)
else: RPath.copy(rorpin, tfl[0]) else: RPath.copy(rorpin, tf)
if tfl[0].lstat(): # Some files, like sockets, won't be created if tf.lstat(): # Some files, like sockets, won't be created
RPathStatic.copy_attribs(rorpin, tfl[0]) RPathStatic.copy_attribs(rorpin, tf)
def final(): return tf
else: return None
def final(tf):
if rorpin.isdir() and rpout.isdir(): if rorpin.isdir() and rpout.isdir():
RPath.copy_attribs(rorpin, rpout) RPath.copy_attribs(rorpin, rpout)
elif tfl[0] and tfl[0].lstat(): elif tf and tf.lstat():
if rpout.isdir(): rpout.delete() if rpout.isdir(): rpout.delete() # can't rename over dir
tfl[0].rename(rpout) tf.rename(rpout)
return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) return rpout
def error(exc, ran_init, init_val):
if tfl[0]: tfl[0].delete()
return RobustAction(init, final, error)
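Both copy actions share the tfl = [None] idiom: a one-element list acts as a mutable cell so that a value created inside init is still visible to the error handler, since a nested Python 2 function cannot rebind a name in the enclosing scope. A tiny self-contained illustration of just that trick (all names here are made up):

def make_action():
    cell = [None]                     # shared mutable state between closures
    def init():
        cell[0] = "temp-resource"     # created during init
        return cell[0]
    def error(exc, ran_init, init_val):
        if cell[0] is not None:       # error handler can still reach it
            cell[0] = None            # and release it
    return init, error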
def copy_attribs_action(rorpin, rpout): def copy_attribs_action(rorpin, rpout):
"""Return action which just copies attributes """Return action which just copies attributes
...@@ -168,14 +208,16 @@ class Robust: ...@@ -168,14 +208,16 @@ class Robust:
normal sequence. normal sequence.
""" """
def final(): RPath.copy_attribs(rorpin, rpout) def final(init_val):
return RobustAction(lambda: None, final, lambda e: None) RPath.copy_attribs(rorpin, rpout)
return rpout
return RobustAction(None, final, None)
def symlink_action(rpath, linktext): def symlink_action(rpath, linktext):
"""Return symlink action by moving one file over another""" """Return symlink action by moving one file over another"""
tf = TempFileManager.new(rpath) tf = TempFileManager.new(rpath)
def init(): tf.symlink(linktext) def init(): tf.symlink(linktext)
return Robust.make_tf_robustaction(init, (tf,), (rpath,)) return Robust.make_tf_robustaction(init, tf, rpath)
def destructive_write_action(rp, s): def destructive_write_action(rp, s):
"""Return action writing string s to rpath rp in robust way """Return action writing string s to rpath rp in robust way
...@@ -187,9 +229,9 @@ class Robust: ...@@ -187,9 +229,9 @@ class Robust:
def init(): def init():
fp = tf.open("wb") fp = tf.open("wb")
fp.write(s) fp.write(s)
assert not fp.close() fp.close()
tf.setdata() tf.setdata()
return Robust.make_tf_robustaction(init, (tf,), (rp,)) return Robust.make_tf_robustaction(init, tf, rp)
def check_common_error(init_thunk, error_thunk = lambda exc: None): def check_common_error(init_thunk, error_thunk = lambda exc: None):
"""Execute init_thunk, if error, run error_thunk on exception """Execute init_thunk, if error, run error_thunk on exception
...@@ -357,8 +399,8 @@ class SaveState: ...@@ -357,8 +399,8 @@ class SaveState:
symtext = apply(os.path.join, symtext = apply(os.path.join,
('increments',) + last_file_rorp.index) ('increments',) + last_file_rorp.index)
return Robust.symlink_action(cls._last_file_sym, symtext) return Robust.symlink_action(cls._last_file_sym, symtext)
else: return RobustAction(lambda: None, cls.touch_last_file, else: return RobustAction(None, lambda init_val: cls.touch_last_file(),
lambda exc: None) None)
def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None): def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None):
"""Save states of tree reducer and finalizer during inc backup """Save states of tree reducer and finalizer during inc backup
...@@ -372,9 +414,9 @@ class SaveState: ...@@ -372,9 +414,9 @@ class SaveState:
cls._last_checkpoint_time = time.time() cls._last_checkpoint_time = time.time()
Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
state_string = cPickle.dumps((ITR, finalizer)) state_string = cPickle.dumps((ITR, finalizer))
Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp, Robust.chain(Robust.destructive_write_action(cls._checkpoint_rp,
state_string), state_string),
cls.record_last_file_action(last_file_rorp)]).execute() cls.record_last_file_action(last_file_rorp)).execute()
def checkpoint_needed(cls): def checkpoint_needed(cls):
"""Returns true if another checkpoint is called for""" """Returns true if another checkpoint is called for"""
......
...@@ -212,13 +212,13 @@ class RORPIter: ...@@ -212,13 +212,13 @@ class RORPIter:
"""Return action patching basisrp using diff_rorp""" """Return action patching basisrp using diff_rorp"""
assert diff_rorp, "Missing diff index %s" % basisrp.index assert diff_rorp, "Missing diff index %s" % basisrp.index
if not diff_rorp.lstat(): if not diff_rorp.lstat():
return RobustAction(lambda: None, basisrp.delete, lambda e: None) return RobustAction(None, lambda init_val: basisrp.delete(), None)
if Globals.preserve_hardlinks and diff_rorp.isflaglinked(): if Globals.preserve_hardlinks and diff_rorp.isflaglinked():
if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
return RobustAction(lambda: None, tf = TempFileManager.new(basisrp)
lambda: Hardlink.link_rp(diff_rorp, basisrp), def init(): Hardlink.link_rp(diff_rorp, tf, basisrp)
lambda e: None) return Robust.make_tf_robustaction(init, tf, basisrp)
elif basisrp and basisrp.isreg() and diff_rorp.isreg(): elif basisrp and basisrp.isreg() and diff_rorp.isreg():
assert diff_rorp.get_attached_filetype() == 'diff' assert diff_rorp.get_attached_filetype() == 'diff'
return Rdiff.patch_with_attribs_action(basisrp, diff_rorp) return Rdiff.patch_with_attribs_action(basisrp, diff_rorp)
......
...@@ -16,13 +16,29 @@ class StatsObj: ...@@ -16,13 +16,29 @@ class StatsObj:
'DeletedFiles', 'DeletedFileSize', 'DeletedFiles', 'DeletedFileSize',
'ChangedFiles', 'ChangedFiles',
'ChangedSourceSize', 'ChangedMirrorSize', 'ChangedSourceSize', 'ChangedMirrorSize',
'IncrementFileSize') 'IncrementFiles', 'IncrementFileSize')
stat_time_attrs = ('StartTime', 'EndTime', 'ElapsedTime') stat_time_attrs = ('StartTime', 'EndTime', 'ElapsedTime')
stat_attrs = stat_time_attrs + stat_file_attrs stat_attrs = ('Filename',) + stat_time_attrs + stat_file_attrs
# Below, the second value in each pair is true iff the value
# indicates a number of bytes
stat_file_pairs = (('SourceFiles', None), ('SourceFileSize', 1),
('MirrorFiles', None), ('MirrorFileSize', 1),
('NewFiles', None), ('NewFileSize', 1),
('DeletedFiles', None), ('DeletedFileSize', 1),
('ChangedFiles', None),
('ChangedSourceSize', 1), ('ChangedMirrorSize', 1),
('IncrementFiles', None), ('IncrementFileSize', 1))
# Set all stats to None, indicating info not available # Set all stats to None, indicating info not available
for attr in stat_attrs: locals()[attr] = None for attr in stat_attrs: locals()[attr] = None
# This is used in get_byte_summary_string below
byte_abbrev_list = ((1024*1024*1024*1024, "TB"),
(1024*1024*1024, "GB"),
(1024*1024, "MB"),
(1024, "KB"))
def get_stat(self, attribute): def get_stat(self, attribute):
"""Get a statistic""" """Get a statistic"""
try: return self.__dict__[attribute] try: return self.__dict__[attribute]
...@@ -34,33 +50,89 @@ class StatsObj: ...@@ -34,33 +50,89 @@ class StatsObj:
"""Set attribute to given value""" """Set attribute to given value"""
self.__dict__[attr] = value self.__dict__[attr] = value
def get_stats_line(self, index):
"""Return one line abbreviated version of full stats string"""
file_attrs = map(lambda attr: str(self.get_stat(attr)),
self.stat_file_attrs)
if not index: filename = "."
else:
# use repr to quote newlines in relative filename, then
# take off the leading and trailing quotes.
filename = repr(apply(os.path.join, index))[1:-1]
return " ".join([filename,] + file_attrs)
def set_stats_from_line(self, line):
"""Set statistics from given line"""
def error(): raise StatsException("Bad line '%s'" % line)
if line[-1] == "\n": line = line[:-1]
lineparts = line.split(" ")
if len(lineparts) < len(self.stat_file_attrs): error()
for attr, val_string in zip(self.stat_file_attrs,
lineparts[-len(self.stat_file_attrs):]):
try: val = long(val_string)
except ValueError:
try: val = float(val_string)
except ValueError: error()
self.set_stat(attr, val)
return self
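One directory_statistics line is just the relative directory name followed by the stat_file_attrs values, space separated; set_stats_from_line reads the values from the end of the line, so directory names containing spaces still parse. A self-contained sketch of that parse, with made-up sample values (thirteen numbers, one per stat_file_attrs entry):

def parse_dir_stats_line(line, n_attrs = 13):
    """Split one statistics line into (dirname, list of numeric values)"""
    parts = line.rstrip("\n").split(" ")
    values = []
    for field in parts[-n_attrs:]:        # values are taken from the end
        try: values.append(long(field))
        except ValueError: values.append(float(field))
    return " ".join(parts[:-n_attrs]), values

# parse_dir_stats_line("some/dir 12 40960 11 38912 1 2048 0 0 2 4096 4096 3 6144")
# returns ("some/dir", [12L, 40960L, 11L, ...])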
def get_stats_string(self): def get_stats_string(self):
"""Return string printing out statistics""" """Return extended string printing out statistics"""
return self.get_timestats_string() + self.get_filestats_string()
def get_timestats_string(self):
"""Return portion of statistics string dealing with time"""
timelist = [] timelist = []
if self.StartTime is not None: if self.StartTime is not None:
timelist.append("StartTime %s (%s)\n" % timelist.append("StartTime %.2f (%s)\n" %
(self.StartTime, Time.timetopretty(self.StartTime))) (self.StartTime, Time.timetopretty(self.StartTime)))
if self.EndTime is not None: if self.EndTime is not None:
timelist.append("EndTime %s (%s)\n" % timelist.append("EndTime %.2f (%s)\n" %
(self.EndTime, Time.timetopretty(self.EndTime))) (self.EndTime, Time.timetopretty(self.EndTime)))
if self.StartTime is not None and self.EndTime is not None: if self.ElapsedTime or (self.StartTime is not None and
self.EndTime is not None):
if self.ElapsedTime is None: if self.ElapsedTime is None:
self.ElapsedTime = self.EndTime - self.StartTime self.ElapsedTime = self.EndTime - self.StartTime
timelist.append("ElapsedTime %s (%s)\n" % timelist.append("ElapsedTime %.2f (%s)\n" %
(self.ElapsedTime, Time.inttopretty(self.ElapsedTime))) (self.ElapsedTime, Time.inttopretty(self.ElapsedTime)))
return "".join(timelist)
def get_filestats_string(self):
"""Return portion of statistics string about files and bytes"""
def fileline(stat_file_pair):
"""Return zero or one line of the string"""
attr, in_bytes = stat_file_pair
val = self.get_stat(attr)
if val is None: return ""
if in_bytes:
return "%s %s (%s)\n" % (attr, val,
self.get_byte_summary_string(val))
else: return "%s %s\n" % (attr, val)
return "".join(map(fileline, self.stat_file_pairs))
filelist = ["%s %s\n" % (attr, self.get_stat(attr)) def get_byte_summary_string(self, byte_count):
for attr in self.stat_file_attrs """Turn byte count into human readable string like "7.23GB" """
if self.get_stat(attr) is not None] for abbrev_bytes, abbrev_string in self.byte_abbrev_list:
return "".join(timelist + filelist) if byte_count >= abbrev_bytes:
# Now get 3 significant figures
abbrev_count = float(byte_count)/abbrev_bytes
if abbrev_count >= 100: precision = 0
elif abbrev_count >= 10: precision = 1
else: precision = 2
return "%%.%df %s" % (precision, abbrev_string) \
% (abbrev_count,)
byte_count = round(byte_count)
if byte_count == 1: return "1 byte"
else: return "%d bytes" % (byte_count,)
def get_stats_logstring(self, title): def get_stats_logstring(self, title):
"""Like get_stats_string, but add header and footer""" """Like get_stats_string, but add header and footer"""
header = "-------------[ %s ]-------------" % title header = "--------------[ %s ]--------------" % title
footer = "-" * len(header) footer = "-" * len(header)
return "%s\n%s%s\n" % (header, self.get_stats_string(), footer) return "%s\n%s%s\n" % (header, self.get_stats_string(), footer)
def init_stats_from_string(self, s): def set_stats_from_string(self, s):
"""Initialize attributes from string, return self for convenience""" """Initialize attributes from string, return self for convenience"""
def error(line): raise StatsException("Bad line '%s'" % line) def error(line): raise StatsException("Bad line '%s'" % line)
...@@ -91,7 +163,7 @@ class StatsObj: ...@@ -91,7 +163,7 @@ class StatsObj:
def read_stats_from_rp(self, rp): def read_stats_from_rp(self, rp):
"""Set statistics from rpath, return self for convenience""" """Set statistics from rpath, return self for convenience"""
fp = rp.open("r") fp = rp.open("r")
self.init_stats_from_string(fp.read()) self.set_stats_from_string(fp.read())
fp.close() fp.close()
return self return self
...@@ -162,22 +234,96 @@ class StatsITR(IterTreeReducer, StatsObj): ...@@ -162,22 +234,96 @@ class StatsITR(IterTreeReducer, StatsObj):
self.ChangedFiles += 1 self.ChangedFiles += 1
self.ChangedSourceSize += mirror_dsrp.getsize() self.ChangedSourceSize += mirror_dsrp.getsize()
self.ChangedMirrorSize += self.mirror_base_size self.ChangedMirrorSize += self.mirror_base_size
self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 if inc_rp:
self.IncrementFiles += 1
self.IncrementFileSize += inc_rp.getsize()
else: # new file was created else: # new file was created
self.NewFiles += 1 self.NewFiles += 1
self.NewFileSize += mirror_dsrp.getsize() self.NewFileSize += mirror_dsrp.getsize()
self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 if inc_rp:
self.IncrementFiles += 1
self.IncrementFileSize += inc_rp.getsize()
else: else:
if self.mirror_base_exists: # file was deleted from mirror if self.mirror_base_exists: # file was deleted from mirror
self.MirrorFiles += 1 self.MirrorFiles += 1
self.MirrorFileSize += self.mirror_base_size self.MirrorFileSize += self.mirror_base_size
self.DeletedFiles += 1 self.DeletedFiles += 1
self.DeletedFileSize += self.mirror_base_size self.DeletedFileSize += self.mirror_base_size
self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 if inc_rp:
self.IncrementFiles += 1
self.IncrementFileSize += inc_rp.getsize()
def add_file_stats(self, subinstance): def add_file_stats(self, subinstance):
"""Add all file statistics from subinstance to current totals""" """Add all file statistics from subinstance to current totals"""
for attr in self.stat_file_attrs: for attr in self.stat_file_attrs:
self.set_stat(attr, self.set_stat(attr,
self.get_stat(attr) + subinstance.get_stat(attr)) self.get_stat(attr) + subinstance.get_stat(attr))
class Stats:
"""Misc statistics methods, pertaining to dir and session stat files"""
# This is the RPath of the directory statistics file, and the
# associated open file. It will hold a line of statistics for
# each directory that is backed up.
_dir_stats_rp = None
_dir_stats_fp = None
# This goes at the beginning of the directory statistics file and
# explains the format.
_dir_stats_header = """# rdiff-backup directory statistics file
#
# Each line is in the following format:
# RelativeDirName %s
""" % " ".join(StatsObj.stat_file_attrs)
def open_dir_stats_file(cls):
"""Open directory statistics file, write header"""
assert not cls._dir_stats_fp, "Directory file already open"
if Globals.compression: suffix = "data.gz"
else: suffix = "data"
cls._dir_stats_rp = Inc.get_inc(Globals.rbdir.append(
"directory_statistics"), Time.curtime, suffix)
if cls._dir_stats_rp.lstat():
Log("Warning, statistics file %s already exists, appending", 2)
cls._dir_stats_fp = cls._dir_stats_rp.open("ab",
Globals.compression)
else: cls._dir_stats_fp = \
cls._dir_stats_rp.open("wb", Globals.compression)
cls._dir_stats_fp.write(cls._dir_stats_header)
def write_dir_stats_line(cls, statobj, index):
"""Write info from statobj about rpath to statistics file"""
cls._dir_stats_fp.write(statobj.get_stats_line(index) +"\n")
def close_dir_stats_file(cls):
"""Close directory statistics file if its open"""
if cls._dir_stats_fp:
cls._dir_stats_fp.close()
cls._dir_stats_fp = None
def write_session_statistics(cls, statobj):
"""Write session statistics into file, log"""
stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"),
Time.curtime, "data")
statobj.StartTime = Time.curtime
statobj.EndTime = time.time()
# include hardlink data and dir stats in size of increments
if Globals.preserve_hardlinks and Hardlink.final_inc:
# include hardlink data in size of increments
statobj.IncrementFiles += 1
statobj.IncrementFileSize += Hardlink.final_inc.getsize()
if cls._dir_stats_rp and cls._dir_stats_rp.lstat():
statobj.IncrementFiles += 1
statobj.IncrementFileSize += cls._dir_stats_rp.getsize()
statobj.write_stats_to_rp(stat_inc)
if Globals.print_statistics:
message = statobj.get_stats_logstring("Session statistics")
Log.log_to_file(message)
Globals.client_conn.sys.stdout.write(message)
MakeClass(Stats)
...@@ -95,7 +95,9 @@ class Time: ...@@ -95,7 +95,9 @@ class Time:
if seconds == 1: partlist.append("1 second") if seconds == 1: partlist.append("1 second")
elif not partlist or seconds > 1: elif not partlist or seconds > 1:
partlist.append("%s seconds" % seconds) if isinstance(seconds, int) or isinstance(seconds, long):
partlist.append("%s seconds" % seconds)
else: partlist.append("%.2f seconds" % seconds)
return " ".join(partlist) return " ".join(partlist)
def intstringtoseconds(cls, interval_string): def intstringtoseconds(cls, interval_string):
......