Commit be00fc7d authored by ben

Directly interface to librsync instead of going through rdiff. Added
some supplementary scripts for testing.


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@149 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 9fa4fbd6
......@@ -15,7 +15,7 @@ RobustAction and the like.
"""
import os, popen2
import os, librsync
class RdiffException(Exception): pass
......@@ -23,23 +23,18 @@ class RdiffException(Exception): pass
def get_signature(rp):
"""Take signature of rpin file and return in file object"""
Log("Getting signature of %s" % rp.path, 7)
return rp.conn.Rdiff.Popen(['rdiff', 'signature', rp.path])
return librsync.SigFile(rp.open("rb"))
def get_delta_sigfileobj(sig_fileobj, rp_new):
"""Like get_delta but signature is in a file object"""
sig_tf = TempFileManager.new(rp_new, None)
sig_tf.write_from_fileobj(sig_fileobj)
rdiff_popen_obj = get_delta_sigrp(sig_tf, rp_new)
rdiff_popen_obj.set_thunk(sig_tf.delete)
return rdiff_popen_obj
Log("Getting delta of %s with signature stream" % (rp_new.path,), 7)
return librsync.DeltaFile(sig_fileobj, rp_new.open("rb"))
def get_delta_sigrp(rp_signature, rp_new):
"""Take signature rp and new rp, return delta file object"""
assert rp_signature.conn is rp_new.conn
Log("Getting delta of %s with signature %s" %
(rp_new.path, rp_signature.path), 7)
return rp_new.conn.Rdiff.Popen(['rdiff', 'delta',
rp_signature.path, rp_new.path])
return librsync.DeltaFile(rp_signature.open("rb"), rp_new.open("rb"))
def write_delta_action(basis, new, delta, compress = None):
"""Return action writing delta which brings basis to new
......@@ -48,23 +43,20 @@ def write_delta_action(basis, new, delta, compress = None):
before written to delta.
"""
sig_tf = TempFileManager.new(new, None)
delta_tf = TempFileManager.new(delta)
def init(): write_delta(basis, new, delta_tf, compress, sig_tf)
return Robust.make_tf_robustaction(init, (sig_tf, delta_tf),
(None, delta))
def init(): write_delta(basis, new, delta_tf, compress)
return Robust.make_tf_robustaction(init, delta_tf, delta)
def write_delta(basis, new, delta, compress = None, sig_tf = None):
def write_delta(basis, new, delta, compress = None):
"""Write rdiff delta which brings basis to new"""
Log("Writing delta %s from %s -> %s" %
(basis.path, new.path, delta.path), 7)
if not sig_tf: sig_tf = TempFileManager.new(new, None)
sig_tf.write_from_fileobj(get_signature(basis))
delta.write_from_fileobj(get_delta_sigrp(sig_tf, new), compress)
sig_tf.delete()
sigfile = librsync.SigFile(basis.open("rb"))
deltafile = librsync.DeltaFile(sigfile, new.open("rb"))
delta.write_from_fileobj(deltafile, compress)
def patch_action(rp_basis, rp_delta, rp_out = None,
out_tf = None, delta_compressed = None):
def patch_action(rp_basis, rp_delta, rp_out = None, out_tf = None,
delta_compressed = None):
"""Return RobustAction which patches rp_basis with rp_delta
If rp_out is None, put output in rp_basis. Will use TempFile
......@@ -73,47 +65,27 @@ def patch_action(rp_basis, rp_delta, rp_out = None,
"""
if not rp_out: rp_out = rp_basis
else: assert rp_out.conn is rp_basis.conn
if (delta_compressed or
not (isinstance(rp_delta, RPath) and isinstance(rp_basis, RPath)
and rp_basis.conn is rp_delta.conn)):
if delta_compressed:
assert isinstance(rp_delta, RPath)
return patch_fileobj_action(rp_basis, rp_delta.open('rb', 1),
rp_out, out_tf)
else: return patch_fileobj_action(rp_basis, rp_delta.open('rb'),
rp_out, out_tf)
# Files are uncompressed on same connection, run rdiff
if out_tf is None: out_tf = TempFileManager.new(rp_out)
if not out_tf: out_tf = TempFileManager.new(rp_out)
def init():
Log("Patching %s using %s to %s via %s" %
(rp_basis.path, rp_delta.path, rp_out.path, out_tf.path), 7)
cmdlist = ["rdiff", "patch", rp_basis.path,
rp_delta.path, out_tf.path]
return_val = rp_basis.conn.os.spawnvp(os.P_WAIT, 'rdiff', cmdlist)
rp_basis.conn.Rdiff.patch_local(rp_basis, rp_delta,
out_tf, delta_compressed)
out_tf.setdata()
if return_val != 0 or not out_tf.lstat():
RdiffException("Error running %s" % cmdlist)
return Robust.make_tf_robustaction(init, (out_tf,), (rp_out,))
return Robust.make_tf_robustaction(init, out_tf, rp_out)
def patch_fileobj_action(rp_basis, delta_fileobj, rp_out = None,
out_tf = None, delta_compressed = None):
"""Like patch_action but diff is given in fileobj form
def patch_local(rp_basis, rp_delta, outrp, delta_compressed = None):
"""Patch routine that must be run on rp_basis.conn
Nest a writing of a tempfile with the actual patching to
create a new action. We have to nest so that the tempfile
will be around until the patching finishes.
This is because librsync may need to seek() around in rp_basis,
and so needs a real file. Other rpaths can be remote.
"""
if not rp_out: rp_out = rp_basis
delta_tf = TempFileManager.new(rp_out, None)
def init(): delta_tf.write_from_fileobj(delta_fileobj)
def final(init_val): delta_tf.delete()
def error(exc, ran_init, init_val): delta_tf.delete()
write_delta_action = RobustAction(init, final, error)
return Robust.chain(write_delta_action, patch_action(rp_basis, delta_tf,
rp_out, out_tf))
assert rp_basis.conn is Globals.local_connection
if delta_compressed: deltafile = rp_delta.open("rb", 1)
else: deltafile = rp_delta.open("rb")
sigfile = librsync.SigFile(rp_basis.open("rb"))
patchfile = librsync.PatchedFile(rp_basis.open("rb"), deltafile)
outrp.write_from_fileobj(patchfile)
def patch_with_attribs_action(rp_basis, rp_delta, rp_out = None):
"""Like patch_action, but also transfers attributs from rp_delta"""
......@@ -129,63 +101,17 @@ def copy_action(rpin, rpout):
return Robust.copy_action(rpin, rpout)
Log("Rdiff copying %s to %s" % (rpin.path, rpout.path), 6)
delta_tf = TempFileManager.new(rpout, None)
return Robust.chain(write_delta_action(rpout, rpin, delta_tf),
patch_action(rpout, delta_tf),
RobustAction(lambda: None, delta_tf.delete,
lambda exc: delta_tf.delete))
class Popen:
"""Spawn process and treat stdout as file object
Instead of using popen, which evaluates arguments with the shell
and thus may lead to security holes (thanks to Jamie Heilman for
this point), use the popen2 class and discard stdin.
When closed, this object checks to make sure the process exited
cleanly, and executes closing_thunk.
"""
def __init__(self, cmdlist, closing_thunk = None):
"""RdiffFilehook initializer
fileobj is the file we are emulating
thunk is called with no parameters right after the file is closed
"""
assert type(cmdlist) is types.ListType
self.p3obj = popen2.Popen3(cmdlist)
self.fileobj = self.p3obj.fromchild
self.closing_thunk = closing_thunk
self.cmdlist = cmdlist
def set_thunk(self, closing_thunk):
"""Set closing_thunk if not already"""
assert not self.closing_thunk
self.closing_thunk = closing_thunk
def read(self, length = -1): return self.fileobj.read(length)
def close(self):
closeval = self.fileobj.close()
if self.closing_thunk: self.closing_thunk()
exitval = self.p3obj.poll()
if exitval == 0: return closeval
elif exitval == 256:
Log("Failure probably because %s couldn't be found in PATH."
% self.cmdlist[0], 2)
assert 0, "rdiff not found"
elif exitval == -1:
# There may a race condition where a process closes
# but doesn't provide its exitval fast enough.
Log("Waiting for process to close", 8)
time.sleep(0.2)
exitval = self.p3obj.poll()
if exitval == 0: return closeval
raise RdiffException("%s exited with non-zero value %d" %
(self.cmdlist, exitval))
out_tf = TempFileManager.new(rpout)
def init(): rpout.conn.Rdiff.copy_local(rpin, rpout, out_tf)
return Robust.make_tf_robustaction(init, out_tf, rpout)
def copy_local(rpin, rpout, rpnew):
    """Write rpnew == rpin using rpout as basis.  rpout and rpnew local

    A signature of rpout is taken locally, the delta against rpin is
    produced through rpin.conn, and the patched result is written
    to rpnew.

    """
    assert rpnew.conn is rpout.conn is Globals.local_connection
    basis_sig = librsync.SigFile(rpout.open("rb"))
    delta = rpin.conn.librsync.DeltaFile(basis_sig, rpin.open("rb"))
    patched = librsync.PatchedFile(rpout.open("rb"), delta)
    rpnew.write_from_fileobj(patched)
from log import *
from robust import *
......
This diff is collapsed.
......@@ -77,13 +77,11 @@ class Inc:
compress = None
diff_tf = TempFileManager.new(diff)
sig_tf = TempFileManager.new(mirror, None)
def init():
Rdiff.write_delta(new, mirror, diff_tf, compress, sig_tf)
Rdiff.write_delta(new, mirror, diff_tf, compress)
RPath.copy_attribs(mirror, diff_tf)
return diff
return Robust.make_tf_robustaction(init, (diff_tf, sig_tf),
(diff, None))
return Robust.make_tf_robustaction(init, diff_tf, diff)
def makedir_action(mirrordir, incpref):
"""Make file indicating directory mirrordir has changed"""
......
......@@ -94,14 +94,20 @@ class IterVirtualFile(UnwrapFile):
self.buffer = initial_data
self.closed = None
def read(self, length):
def read(self, length = -1):
"""Read length bytes from the file, updating buffers as necessary"""
assert not self.closed
if self.iwf.currently_in_file:
while length >= len(self.buffer):
if not self.addtobuffer(): break
if length >= 0:
while length >= len(self.buffer):
if not self.addtobuffer(): break
real_len = min(length, len(self.buffer))
else:
while 1:
if not self.addtobuffer(): break
real_len = len(self.buffer)
else: real_len = min(length, len(self.buffer))
real_len = min(length, len(self.buffer))
return_val = self.buffer[:real_len]
self.buffer = self.buffer[real_len:]
return return_val
......
# Copyright 2002 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, Inc., 675 Mass Ave, Cambridge MA
# 02139, USA; either version 2 of the License, or (at your option) any
# later version; incorporated herein by reference.
"""Provides a high-level interface to some librsync functions
This is a python wrapper around the lower-level _librsync module,
which is written in C. The goal was to use C as little as possible...
"""
import _librsync, types, array
blocksize = _librsync.RS_JOB_BLOCKSIZE
class librsyncError(Exception):
    """Error in internal librsync processing (bad signature, etc.)

    The exceptions created by the C module _librsync are by default
    unpickleable, so they are caught and regenerated as instances of
    this pure-Python class.  There is probably a way to fix this in
    _librsync itself, but this scheme was easier.

    """
class LikeFile:
"""File-like object used by SigFile, DeltaFile, and PatchFile"""
mode = "rb"
# This will be replaced in subclasses by an object with
# appropriate cycle() method
maker = None
def __init__(self, infile, need_seek = None):
"""LikeFile initializer - zero buffers, set eofs off"""
self.check_file(infile, need_seek)
self.infile = infile
self.closed = self.infile_closed = None
self.inbuf = ""
self.outbuf = array.array('c')
self.eof = self.infile_eof = None
def check_file(self, file, need_seek = None):
"""Raise type error if file doesn't have necessary attributes"""
if not hasattr(file, "read"):
raise TypeError("Basis file must have a read() method")
if not hasattr(file, "close"):
raise TypeError("Basis file must have a close() method")
if need_seek and not hasattr(file, "seek"):
raise TypeError("Basis file must have a seek() method")
def read(self, length = -1):
"""Build up self.outbuf, return first length bytes"""
if length == -1:
while not self.eof: self._add_to_outbuf_once()
real_len = len(self.outbuf)
else:
while not self.eof and len(self.outbuf) < length:
self._add_to_outbuf_once()
real_len = min(length, len(self.outbuf))
return_val = self.outbuf[:real_len].tostring()
del self.outbuf[:real_len]
return return_val
def _add_to_outbuf_once(self):
"""Add one cycle's worth of output to self.outbuf"""
if not self.infile_eof: self._add_to_inbuf()
try: self.eof, len_inbuf_read, cycle_out = self.maker.cycle(self.inbuf)
except _librsync.librsyncError, e: raise librsyncError(str(e))
self.inbuf = self.inbuf[len_inbuf_read:]
self.outbuf.fromstring(cycle_out)
def _add_to_inbuf(self):
"""Make sure len(self.inbuf) >= blocksize"""
assert not self.infile_eof
while len(self.inbuf) < blocksize:
new_in = self.infile.read(blocksize)
if not new_in:
self.infile_eof = 1
assert not self.infile.close()
self.infile_closed = 1
break
self.inbuf += new_in
def close(self):
"""Close infile"""
if not self.infile_closed: assert not self.infile.close()
self.closed = 1
class SigFile(LikeFile):
"""File-like object which incrementally generates a librsync signature"""
def __init__(self, infile):
"""SigFile initializer - takes basis file
basis file only needs to have read() and close() methods. It
will be closed when we come to the end of the signature.
"""
LikeFile.__init__(self, infile)
try: self.maker = _librsync.new_sigmaker()
except _librsync.librsyncError, e: raise librsyncError(str(e))
class DeltaFile(LikeFile):
"""File-like object which incrementally generates a librsync delta"""
def __init__(self, signature, new_file):
"""DeltaFile initializer - call with signature and new file
Signature can either be a string or a file with read() and
close() methods. New_file also only needs to have read() and
close() methods. It will be closed when self is closed.
"""
LikeFile.__init__(self, new_file)
if type(signature) is types.StringType: sig_string = signature
else:
self.check_file(signature)
sig_string = signature.read()
assert not signature.close()
try: self.maker = _librsync.new_deltamaker(sig_string)
except _librsync.librsyncError, e: raise librsyncError(str(e))
class PatchedFile(LikeFile):
"""File-like object which applies a librsync delta incrementally"""
def __init__(self, basis_file, delta_file):
"""PatchedFile initializer - call with basis delta
Here basis_file must be a true Python file, because we may
need to seek() around in it a lot, and this is done in C.
delta_file only needs read() and close() methods.
"""
LikeFile.__init__(self, delta_file)
if type(basis_file) is not types.FileType:
raise TypeError("basis_file must be a (true) file")
try: self.maker = _librsync.new_patchmaker(basis_file)
except _librsync.librsyncError, e: raise librsyncError(str(e))
#!/usr/bin/env python
"""Demonstrate a memory leak in pysync/librsync"""
import os, _librsync
from librsync import *
# All scratch files used by this script live in /tmp
os.chdir("/tmp")

# Create the two one-byte input files, "a" and "e"
afile, efile = open("a", "wb"), open("e", "wb")
afile.write("a")
efile.write("e")
afile.close()
efile.close()
def copy(infileobj, outpath):
    """Copy everything readable from infileobj into the file at outpath

    infileobj is read in 32768 byte pieces until read() returns an
    empty result; both the output file and infileobj are closed
    afterwards.

    """
    outfile = open(outpath, "wb")
    while 1:
        buf = infileobj.read(32768)
        if not buf: break
        outfile.write(buf)
    # The close() calls are kept out of the assert expressions so the
    # files are still closed when asserts are stripped (python -O).
    out_result = outfile.close()
    in_result = infileobj.close()
    assert not out_result
    assert not in_result
def test_cycle():
    """Repeatedly create a signature maker and run one cycle on it"""
    repetitions = 100000
    for _ in xrange(repetitions):
        maker = _librsync.new_sigmaker()
        maker.cycle("a")
def main_test():
    """Run the signature -> delta -> patch round trip many times

    Each pass takes the signature of "a", builds the delta from that
    signature to "e", and applies the delta to "a", writing "a.out".

    """
    for i in xrange(100000):
        # Write signature file
        afile = open("a", "rb")
        copy(SigFile(afile), "sig")

        # Write delta file (binary mode, like every other file here)
        efile = open("e", "rb")
        sigfile = open("sig", "rb")
        copy(DeltaFile(sigfile, efile), "delta")

        # Write patched file.  The delta was made against the
        # signature of "a", so "a" (not "e") is the basis to patch.
        afile = open("a", "rb")
        deltafile = open("delta", "rb")
        copy(PatchedFile(afile, deltafile), "a.out")

main_test()
#!/usr/bin/env python
"""Like rdiff, but written in python and uses librsync module.
Useful for benchmarking and testing of librsync and _librsync.
"""
import librsync, sys
blocksize = 32768
def makesig(inpath, outpath):
    """Write a signature of inpath at outpath"""
    sf = librsync.SigFile(open(inpath, "rb"))
    fout = open(outpath, "wb")
    while 1:
        buf = sf.read(blocksize)
        if not buf: break
        fout.write(buf)
    # Close outside the assert expressions so both files are still
    # closed when asserts are stripped (python -O).
    sf_result = sf.close()
    fout_result = fout.close()
    assert not sf_result
    assert not fout_result
def makedelta(sigpath, newpath, deltapath):
    """Write delta at deltapath using signature at sigpath"""
    df = librsync.DeltaFile(open(sigpath, "rb"), open(newpath, "rb"))
    fout = open(deltapath, "wb")
    while 1:
        buf = df.read(blocksize)
        if not buf: break
        fout.write(buf)
    # Close outside the assert expressions so both files are still
    # closed when asserts are stripped (python -O).
    df_result = df.close()
    fout_result = fout.close()
    assert not df_result
    assert not fout_result
def makepatch(basis_path, delta_path, new_path):
    """Write new given basis and delta"""
    pf = librsync.PatchedFile(open(basis_path, "rb"), open(delta_path, "rb"))
    fout = open(new_path, "wb")
    while 1:
        buf = pf.read(blocksize)
        if not buf: break
        fout.write(buf)
    # Close outside the assert expressions so both files are still
    # closed when asserts are stripped (python -O).
    pf_result = pf.close()
    fout_result = fout.close()
    assert not pf_result
    assert not fout_result
# Command-line dispatch: argv[1] selects the mode and the following
# arguments are the paths handed to the matching function above.
if len(sys.argv) < 2:
    sys.exit("Usage: %s signature|delta|patch PATH PATH [PATH]" %
             (sys.argv[0],))
mode, paths = sys.argv[1], sys.argv[2:]
if mode == "signature": handler, num_paths = makesig, 2
elif mode == "delta": handler, num_paths = makedelta, 3
elif mode == "patch": handler, num_paths = makepatch, 3
else:
    # Explicit exit instead of "assert 0", which python -O would strip
    sys.exit("Bad mode argument %s" % (mode,))
if len(paths) < num_paths:
    sys.exit("Mode %s needs %d path arguments" % (mode, num_paths))
# Extra trailing arguments are ignored, as before
handler(*paths[:num_paths])
......@@ -254,7 +254,7 @@ class Robust:
try: return function(*args)
except (EnvironmentError, SkipFileException, DSRPPermError,
RPathException, Rdiff.RdiffException,
C.UnknownFileTypeError), exc:
librsync.librsyncError, C.UnknownFileTypeError), exc:
TracebackArchive.add()
if (not isinstance(exc, EnvironmentError) or
(errno.errorcode[exc[0]] in
......@@ -659,5 +659,5 @@ class ResumeSessionInfo:
from log import *
from destructive_stepping import *
import Time, Rdiff
import Time, Rdiff, librsync
from highlevel import *
......@@ -11,7 +11,7 @@
"""Operations on Iterators of Read Only Remote Paths"""
from __future__ import generators
import tempfile, UserList, types
import tempfile, UserList, types, librsync
from static import *
from log import *
from rpath import *
......@@ -202,6 +202,13 @@ class RORPIter:
return diff_rorp
elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg():
diff_rorp = new_rp.getRORPath()
#fp = sig_rorp.open("rb")
#print "---------------------", fp
#tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig")
#tmp_sig_rp.delete()
#tmp_sig_rp.write_from_fileobj(fp)
#diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"),
# new_rp))
diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"),
new_rp))
diff_rorp.set_attached_filetype('diff')
......@@ -244,7 +251,6 @@ class RORPIter:
MakeStatic(RORPIter)
class IndexedTuple(UserList.UserList):
"""Like a tuple, but has .index
......
......@@ -11,12 +11,15 @@
"""Generate and process aggregated backup information"""
from lazy import *
import re
class StatsException(Exception): pass
class StatsObj:
"""Contains various statistics, provide string conversion functions"""
# used when quoting files in get_stats_line
space_regex = re.compile(" ")
stat_file_attrs = ('SourceFiles', 'SourceFileSize',
'MirrorFiles', 'MirrorFileSize',
......@@ -71,8 +74,8 @@ class StatsObj:
filename = apply(os.path.join, index)
if use_repr:
# use repr to quote newlines in relative filename, then
# take of leading and trailing quote.
filename = repr(filename)[1:-1]
# take of leading and trailing quote and quote spaces.
filename = self.space_regex.sub("\\x20", repr(filename)[1:-1])
return " ".join([filename,] + file_attrs)
def set_stats_from_line(self, line):
......
......@@ -15,7 +15,7 @@ RobustAction and the like.
"""
import os, popen2
import os, librsync
class RdiffException(Exception): pass
......@@ -23,23 +23,18 @@ class RdiffException(Exception): pass
def get_signature(rp):
"""Take signature of rpin file and return in file object"""
Log("Getting signature of %s" % rp.path, 7)
return rp.conn.Rdiff.Popen(['rdiff', 'signature', rp.path])
return librsync.SigFile(rp.open("rb"))
def get_delta_sigfileobj(sig_fileobj, rp_new):
"""Like get_delta but signature is in a file object"""
sig_tf = TempFileManager.new(rp_new, None)
sig_tf.write_from_fileobj(sig_fileobj)
rdiff_popen_obj = get_delta_sigrp(sig_tf, rp_new)
rdiff_popen_obj.set_thunk(sig_tf.delete)
return rdiff_popen_obj
Log("Getting delta of %s with signature stream" % (rp_new.path,), 7)
return librsync.DeltaFile(sig_fileobj, rp_new.open("rb"))
def get_delta_sigrp(rp_signature, rp_new):
"""Take signature rp and new rp, return delta file object"""
assert rp_signature.conn is rp_new.conn
Log("Getting delta of %s with signature %s" %
(rp_new.path, rp_signature.path), 7)
return rp_new.conn.Rdiff.Popen(['rdiff', 'delta',
rp_signature.path, rp_new.path])
return librsync.DeltaFile(rp_signature.open("rb"), rp_new.open("rb"))
def write_delta_action(basis, new, delta, compress = None):
"""Return action writing delta which brings basis to new
......@@ -48,23 +43,20 @@ def write_delta_action(basis, new, delta, compress = None):
before written to delta.
"""
sig_tf = TempFileManager.new(new, None)
delta_tf = TempFileManager.new(delta)
def init(): write_delta(basis, new, delta_tf, compress, sig_tf)
return Robust.make_tf_robustaction(init, (sig_tf, delta_tf),
(None, delta))
def init(): write_delta(basis, new, delta_tf, compress)
return Robust.make_tf_robustaction(init, delta_tf, delta)
def write_delta(basis, new, delta, compress = None, sig_tf = None):
def write_delta(basis, new, delta, compress = None):
"""Write rdiff delta which brings basis to new"""
Log("Writing delta %s from %s -> %s" %
(basis.path, new.path, delta.path), 7)
if not sig_tf: sig_tf = TempFileManager.new(new, None)
sig_tf.write_from_fileobj(get_signature(basis))
delta.write_from_fileobj(get_delta_sigrp(sig_tf, new), compress)
sig_tf.delete()
sigfile = librsync.SigFile(basis.open("rb"))
deltafile = librsync.DeltaFile(sigfile, new.open("rb"))
delta.write_from_fileobj(deltafile, compress)
def patch_action(rp_basis, rp_delta, rp_out = None,
out_tf = None, delta_compressed = None):
def patch_action(rp_basis, rp_delta, rp_out = None, out_tf = None,
delta_compressed = None):
"""Return RobustAction which patches rp_basis with rp_delta
If rp_out is None, put output in rp_basis. Will use TempFile
......@@ -73,47 +65,27 @@ def patch_action(rp_basis, rp_delta, rp_out = None,
"""
if not rp_out: rp_out = rp_basis
else: assert rp_out.conn is rp_basis.conn
if (delta_compressed or
not (isinstance(rp_delta, RPath) and isinstance(rp_basis, RPath)
and rp_basis.conn is rp_delta.conn)):
if delta_compressed:
assert isinstance(rp_delta, RPath)
return patch_fileobj_action(rp_basis, rp_delta.open('rb', 1),
rp_out, out_tf)
else: return patch_fileobj_action(rp_basis, rp_delta.open('rb'),
rp_out, out_tf)
# Files are uncompressed on same connection, run rdiff
if out_tf is None: out_tf = TempFileManager.new(rp_out)
if not out_tf: out_tf = TempFileManager.new(rp_out)
def init():
Log("Patching %s using %s to %s via %s" %
(rp_basis.path, rp_delta.path, rp_out.path, out_tf.path), 7)
cmdlist = ["rdiff", "patch", rp_basis.path,
rp_delta.path, out_tf.path]
return_val = rp_basis.conn.os.spawnvp(os.P_WAIT, 'rdiff', cmdlist)
rp_basis.conn.Rdiff.patch_local(rp_basis, rp_delta,
out_tf, delta_compressed)
out_tf.setdata()
if return_val != 0 or not out_tf.lstat():
RdiffException("Error running %s" % cmdlist)
return Robust.make_tf_robustaction(init, (out_tf,), (rp_out,))
return Robust.make_tf_robustaction(init, out_tf, rp_out)
def patch_fileobj_action(rp_basis, delta_fileobj, rp_out = None,
out_tf = None, delta_compressed = None):
"""Like patch_action but diff is given in fileobj form
def patch_local(rp_basis, rp_delta, outrp, delta_compressed = None):
"""Patch routine that must be run on rp_basis.conn
Nest a writing of a tempfile with the actual patching to
create a new action. We have to nest so that the tempfile
will be around until the patching finishes.
This is because librsync may need to seek() around in rp_basis,
and so needs a real file. Other rpaths can be remote.
"""
if not rp_out: rp_out = rp_basis
delta_tf = TempFileManager.new(rp_out, None)
def init(): delta_tf.write_from_fileobj(delta_fileobj)
def final(init_val): delta_tf.delete()
def error(exc, ran_init, init_val): delta_tf.delete()
write_delta_action = RobustAction(init, final, error)
return Robust.chain(write_delta_action, patch_action(rp_basis, delta_tf,
rp_out, out_tf))
assert rp_basis.conn is Globals.local_connection
if delta_compressed: deltafile = rp_delta.open("rb", 1)
else: deltafile = rp_delta.open("rb")
sigfile = librsync.SigFile(rp_basis.open("rb"))
patchfile = librsync.PatchedFile(rp_basis.open("rb"), deltafile)
outrp.write_from_fileobj(patchfile)
def patch_with_attribs_action(rp_basis, rp_delta, rp_out = None):
"""Like patch_action, but also transfers attributs from rp_delta"""
......@@ -129,63 +101,17 @@ def copy_action(rpin, rpout):
return Robust.copy_action(rpin, rpout)
Log("Rdiff copying %s to %s" % (rpin.path, rpout.path), 6)
delta_tf = TempFileManager.new(rpout, None)
return Robust.chain(write_delta_action(rpout, rpin, delta_tf),
patch_action(rpout, delta_tf),
RobustAction(lambda: None, delta_tf.delete,
lambda exc: delta_tf.delete))
class Popen:
"""Spawn process and treat stdout as file object
Instead of using popen, which evaluates arguments with the shell
and thus may lead to security holes (thanks to Jamie Heilman for
this point), use the popen2 class and discard stdin.
When closed, this object checks to make sure the process exited
cleanly, and executes closing_thunk.
"""
def __init__(self, cmdlist, closing_thunk = None):
"""RdiffFilehook initializer
fileobj is the file we are emulating
thunk is called with no parameters right after the file is closed
"""
assert type(cmdlist) is types.ListType
self.p3obj = popen2.Popen3(cmdlist)
self.fileobj = self.p3obj.fromchild
self.closing_thunk = closing_thunk
self.cmdlist = cmdlist
def set_thunk(self, closing_thunk):
"""Set closing_thunk if not already"""
assert not self.closing_thunk
self.closing_thunk = closing_thunk
def read(self, length = -1): return self.fileobj.read(length)
def close(self):
closeval = self.fileobj.close()
if self.closing_thunk: self.closing_thunk()
exitval = self.p3obj.poll()
if exitval == 0: return closeval
elif exitval == 256:
Log("Failure probably because %s couldn't be found in PATH."
% self.cmdlist[0], 2)
assert 0, "rdiff not found"
elif exitval == -1:
# There may a race condition where a process closes
# but doesn't provide its exitval fast enough.
Log("Waiting for process to close", 8)
time.sleep(0.2)
exitval = self.p3obj.poll()
if exitval == 0: return closeval
raise RdiffException("%s exited with non-zero value %d" %
(self.cmdlist, exitval))
out_tf = TempFileManager.new(rpout)
def init(): rpout.conn.Rdiff.copy_local(rpin, rpout, out_tf)
return Robust.make_tf_robustaction(init, out_tf, rpout)
def copy_local(rpin, rpout, rpnew):
    """Write rpnew == rpin using rpout as basis.  rpout and rpnew local

    A signature of rpout is taken locally, the delta against rpin is
    produced through rpin.conn, and the patched result is written
    to rpnew.

    """
    assert rpnew.conn is rpout.conn is Globals.local_connection
    basis_sig = librsync.SigFile(rpout.open("rb"))
    delta = rpin.conn.librsync.DeltaFile(basis_sig, rpin.open("rb"))
    patched = librsync.PatchedFile(rpout.open("rb"), delta)
    rpnew.write_from_fileobj(patched)
from log import *
from robust import *
......
This diff is collapsed.
......@@ -77,13 +77,11 @@ class Inc:
compress = None
diff_tf = TempFileManager.new(diff)
sig_tf = TempFileManager.new(mirror, None)
def init():
Rdiff.write_delta(new, mirror, diff_tf, compress, sig_tf)
Rdiff.write_delta(new, mirror, diff_tf, compress)
RPath.copy_attribs(mirror, diff_tf)
return diff
return Robust.make_tf_robustaction(init, (diff_tf, sig_tf),
(diff, None))
return Robust.make_tf_robustaction(init, diff_tf, diff)
def makedir_action(mirrordir, incpref):
"""Make file indicating directory mirrordir has changed"""
......
......@@ -94,14 +94,20 @@ class IterVirtualFile(UnwrapFile):
self.buffer = initial_data
self.closed = None
def read(self, length):
def read(self, length = -1):
"""Read length bytes from the file, updating buffers as necessary"""
assert not self.closed
if self.iwf.currently_in_file:
while length >= len(self.buffer):
if not self.addtobuffer(): break
if length >= 0:
while length >= len(self.buffer):
if not self.addtobuffer(): break
real_len = min(length, len(self.buffer))
else:
while 1:
if not self.addtobuffer(): break
real_len = len(self.buffer)
else: real_len = min(length, len(self.buffer))
real_len = min(length, len(self.buffer))
return_val = self.buffer[:real_len]
self.buffer = self.buffer[real_len:]
return return_val
......
# Copyright 2002 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, Inc., 675 Mass Ave, Cambridge MA
# 02139, USA; either version 2 of the License, or (at your option) any
# later version; incorporated herein by reference.
"""Provides a high-level interface to some librsync functions
This is a python wrapper around the lower-level _librsync module,
which is written in C. The goal was to use C as little as possible...
"""
import _librsync, types, array
blocksize = _librsync.RS_JOB_BLOCKSIZE
class librsyncError(Exception):
    """Error in internal librsync processing (bad signature, etc.)

    The exceptions created by the C module _librsync are by default
    unpickleable, so they are caught and regenerated as instances of
    this pure-Python class.  There is probably a way to fix this in
    _librsync itself, but this scheme was easier.

    """
class LikeFile:
"""File-like object used by SigFile, DeltaFile, and PatchFile"""
mode = "rb"
# This will be replaced in subclasses by an object with
# appropriate cycle() method
maker = None
def __init__(self, infile, need_seek = None):
"""LikeFile initializer - zero buffers, set eofs off"""
self.check_file(infile, need_seek)
self.infile = infile
self.closed = self.infile_closed = None
self.inbuf = ""
self.outbuf = array.array('c')
self.eof = self.infile_eof = None
def check_file(self, file, need_seek = None):
"""Raise type error if file doesn't have necessary attributes"""
if not hasattr(file, "read"):
raise TypeError("Basis file must have a read() method")
if not hasattr(file, "close"):
raise TypeError("Basis file must have a close() method")
if need_seek and not hasattr(file, "seek"):
raise TypeError("Basis file must have a seek() method")
def read(self, length = -1):
"""Build up self.outbuf, return first length bytes"""
if length == -1:
while not self.eof: self._add_to_outbuf_once()
real_len = len(self.outbuf)
else:
while not self.eof and len(self.outbuf) < length:
self._add_to_outbuf_once()
real_len = min(length, len(self.outbuf))
return_val = self.outbuf[:real_len].tostring()
del self.outbuf[:real_len]
return return_val
def _add_to_outbuf_once(self):
"""Add one cycle's worth of output to self.outbuf"""
if not self.infile_eof: self._add_to_inbuf()
try: self.eof, len_inbuf_read, cycle_out = self.maker.cycle(self.inbuf)
except _librsync.librsyncError, e: raise librsyncError(str(e))
self.inbuf = self.inbuf[len_inbuf_read:]
self.outbuf.fromstring(cycle_out)
def _add_to_inbuf(self):
"""Make sure len(self.inbuf) >= blocksize"""
assert not self.infile_eof
while len(self.inbuf) < blocksize:
new_in = self.infile.read(blocksize)
if not new_in:
self.infile_eof = 1
assert not self.infile.close()
self.infile_closed = 1
break
self.inbuf += new_in
def close(self):
"""Close infile"""
if not self.infile_closed: assert not self.infile.close()
self.closed = 1
class SigFile(LikeFile):
"""File-like object which incrementally generates a librsync signature"""
def __init__(self, infile):
"""SigFile initializer - takes basis file
basis file only needs to have read() and close() methods. It
will be closed when we come to the end of the signature.
"""
LikeFile.__init__(self, infile)
try: self.maker = _librsync.new_sigmaker()
except _librsync.librsyncError, e: raise librsyncError(str(e))
class DeltaFile(LikeFile):
"""File-like object which incrementally generates a librsync delta"""
def __init__(self, signature, new_file):
"""DeltaFile initializer - call with signature and new file
Signature can either be a string or a file with read() and
close() methods. New_file also only needs to have read() and
close() methods. It will be closed when self is closed.
"""
LikeFile.__init__(self, new_file)
if type(signature) is types.StringType: sig_string = signature
else:
self.check_file(signature)
sig_string = signature.read()
assert not signature.close()
try: self.maker = _librsync.new_deltamaker(sig_string)
except _librsync.librsyncError, e: raise librsyncError(str(e))
class PatchedFile(LikeFile):
"""File-like object which applies a librsync delta incrementally"""
def __init__(self, basis_file, delta_file):
"""PatchedFile initializer - call with basis delta
Here basis_file must be a true Python file, because we may
need to seek() around in it a lot, and this is done in C.
delta_file only needs read() and close() methods.
"""
LikeFile.__init__(self, delta_file)
if type(basis_file) is not types.FileType:
raise TypeError("basis_file must be a (true) file")
try: self.maker = _librsync.new_patchmaker(basis_file)
except _librsync.librsyncError, e: raise librsyncError(str(e))
#!/usr/bin/env python
"""Demonstrate a memory leak in pysync/librsync"""
import os, _librsync
from librsync import *
os.chdir("/tmp")

# Create the two input files, each holding a single byte
afile, efile = open("a", "wb"), open("e", "wb")
afile.write("a")
efile.write("e")
afile.close()
efile.close()
def copy(infileobj, outpath):
    """Copy everything read from infileobj into the file at outpath

    infileobj needs read() and close() methods.  It is read in
    32768 byte chunks until read() returns an empty value and is
    closed afterwards.  outpath is opened for binary writing and is
    closed even if reading or writing raises.

    Both close() calls are expected to return a false value; a true
    return value raises AssertionError.

    """
    outfile = open(outpath, "wb")
    try:
        while 1:
            buf = infileobj.read(32768)
            if not buf: break
            outfile.write(buf)
    finally:
        # Runs on errors too, so the output file is never left open
        out_result = outfile.close()
    # Keep the close() calls out of the assert statements: asserts
    # are removed under python -O, which would skip the closes.
    in_result = infileobj.close()
    assert not out_result
    assert not in_result
def test_cycle():
    """Create 100000 signature makers, calling cycle("a") on each one"""
    remaining = 100000
    while remaining > 0:
        sigmaker = _librsync.new_sigmaker()
        sigmaker.cycle("a")
        remaining -= 1
def main_test():
    """Run 100000 signature / delta / patch round trips in the cwd

    Each iteration writes the signature of "a" to "sig", the delta
    built from that signature and "e" to "delta", and the result of
    applying that delta to "a" to "a.out".

    """
    for i in xrange(100000):
        # Write signature file
        afile = open("a", "rb")
        copy(SigFile(afile), "sig")

        # Write delta file.  "e" is opened in binary mode like every
        # other input so no newline translation can alter the data.
        efile = open("e", "rb")
        sigfile = open("sig", "rb")
        copy(DeltaFile(sigfile, efile), "delta")

        # Write patched file.  The basis has to be "a", the file the
        # signature was taken from, not the new file "e".
        afile = open("a", "rb")
        deltafile = open("delta", "rb")
        copy(PatchedFile(afile, deltafile), "a.out")
# Run the signature/delta/patch loop as soon as this file is executed
main_test()
#!/usr/bin/env python
"""Like rdiff, but written in python and uses librsync module.
Useful for benchmarking and testing of librsync and _librsync.
"""
import librsync, sys
# Number of bytes requested per read() call in the copy loops below
blocksize = 32768
def makesig(inpath, outpath):
    """Write a signature of inpath at outpath

    The signature is read from a librsync.SigFile in blocksize
    chunks.  Both close() calls are expected to return a false
    value; a true return value raises AssertionError.

    """
    sf = librsync.SigFile(open(inpath, "rb"))
    fout = open(outpath, "wb")
    while 1:
        buf = sf.read(blocksize)
        if not buf: break
        fout.write(buf)
    # Keep the close() calls out of the assert statements: asserts
    # are removed under python -O, which would skip the closes.
    sf_result = sf.close()
    fout_result = fout.close()
    assert not sf_result
    assert not fout_result
def makedelta(sigpath, newpath, deltapath):
    """Write delta at deltapath using signature at sigpath

    The delta for the file at newpath is read from a
    librsync.DeltaFile in blocksize chunks.  Both close() calls are
    expected to return a false value; a true return value raises
    AssertionError.

    """
    df = librsync.DeltaFile(open(sigpath, "rb"), open(newpath, "rb"))
    fout = open(deltapath, "wb")
    while 1:
        buf = df.read(blocksize)
        if not buf: break
        fout.write(buf)
    # Keep the close() calls out of the assert statements: asserts
    # are removed under python -O, which would skip the closes.
    df_result = df.close()
    fout_result = fout.close()
    assert not df_result
    assert not fout_result
def makepatch(basis_path, delta_path, new_path):
    """Write new given basis and delta

    The patched data is read from a librsync.PatchedFile, built from
    the files at basis_path and delta_path, in blocksize chunks and
    written to new_path.  Both close() calls are expected to return
    a false value; a true return value raises AssertionError.

    """
    pf = librsync.PatchedFile(open(basis_path, "rb"), open(delta_path, "rb"))
    fout = open(new_path, "wb")
    while 1:
        buf = pf.read(blocksize)
        if not buf: break
        fout.write(buf)
    # Keep the close() calls out of the assert statements: asserts
    # are removed under python -O, which would skip the closes.
    pf_result = pf.close()
    fout_result = fout.close()
    assert not pf_result
    assert not fout_result
# Command line dispatch: argv[1] selects the mode, the arguments after
# it are the paths handed to the matching function.
if len(sys.argv) < 2:
    # Without this check a missing mode ends in a bare IndexError
    sys.exit("Missing mode argument: expected signature, delta or patch")
if sys.argv[1] == "signature":
    makesig(sys.argv[2], sys.argv[3])
elif sys.argv[1] == "delta":
    makedelta(sys.argv[2], sys.argv[3], sys.argv[4])
elif sys.argv[1] == "patch":
    makepatch(sys.argv[2], sys.argv[3], sys.argv[4])
else:
    # An assert would be removed under python -O and let a bad mode
    # pass silently, so exit with the message instead.
    sys.exit("Bad mode argument %s" % (sys.argv[1],))
......@@ -254,7 +254,7 @@ class Robust:
try: return function(*args)
except (EnvironmentError, SkipFileException, DSRPPermError,
RPathException, Rdiff.RdiffException,
C.UnknownFileTypeError), exc:
librsync.librsyncError, C.UnknownFileTypeError), exc:
TracebackArchive.add()
if (not isinstance(exc, EnvironmentError) or
(errno.errorcode[exc[0]] in
......@@ -659,5 +659,5 @@ class ResumeSessionInfo:
from log import *
from destructive_stepping import *
import Time, Rdiff
import Time, Rdiff, librsync
from highlevel import *
......@@ -11,7 +11,7 @@
"""Operations on Iterators of Read Only Remote Paths"""
from __future__ import generators
import tempfile, UserList, types
import tempfile, UserList, types, librsync
from static import *
from log import *
from rpath import *
......@@ -202,6 +202,13 @@ class RORPIter:
return diff_rorp
elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg():
diff_rorp = new_rp.getRORPath()
#fp = sig_rorp.open("rb")
#print "---------------------", fp
#tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig")
#tmp_sig_rp.delete()
#tmp_sig_rp.write_from_fileobj(fp)
#diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"),
# new_rp))
diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"),
new_rp))
diff_rorp.set_attached_filetype('diff')
......@@ -244,7 +251,6 @@ class RORPIter:
MakeStatic(RORPIter)
class IndexedTuple(UserList.UserList):
"""Like a tuple, but has .index
......
......@@ -11,12 +11,15 @@
"""Generate and process aggregated backup information"""
from lazy import *
import re
class StatsException(Exception):
    """Exception type declared alongside StatsObj"""
    pass
class StatsObj:
"""Contains various statistics, provide string conversion functions"""
# used when quoting files in get_stats_line
space_regex = re.compile(" ")
stat_file_attrs = ('SourceFiles', 'SourceFileSize',
'MirrorFiles', 'MirrorFileSize',
......@@ -71,8 +74,8 @@ class StatsObj:
filename = apply(os.path.join, index)
if use_repr:
# use repr to quote newlines in relative filename, then
# take off leading and trailing quote.
filename = repr(filename)[1:-1]
# take off leading and trailing quote and quote spaces.
filename = self.space_regex.sub("\\x20", repr(filename)[1:-1])
return " ".join([filename,] + file_attrs)
def set_stats_from_line(self, line):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment