Commit 145ce653 authored by Alain Takoudjou's avatar Alain Takoudjou

securedelete: use shred path as parameter, add stderr message when command fail

use shred path as parameter and print stderr message when shred command fail

/reviewed-on nexedi/slapos.toolbox!18
parent ffb8836b
......@@ -41,9 +41,9 @@ def getAgumentParser():
help='truncate and remove file after overwriting.')
parser.add_argument('-z', '--zero', action='store_true',
help='add a final overwrite with zeros to hide shredding.')
parser.add_argument('-s', '--skip-non-exist', action='store_true',
parser.add_argument('-s', '--check-exist', action='store_true',
default=False,
help='If a file don\'t exists, skip instead of raise.')
help='Check if files exist. If a file don\t exists, skip instead of raise.')
parser.add_argument('--file', dest='file_list',
required=True, nargs='+', metavar='FILE',
help='File(s) to overwrite and remove if -u is used.')
......@@ -52,17 +52,25 @@ def getAgumentParser():
'This is to prevent delete files used by a process.')
parser.add_argument('--check-pid-file', metavar="PID_FILE",
help='Same as "check-pid" option but read PID from PID_FILE')
parser.add_argument('--shred-bin', default="/usr/bin/shred",
help='Path of shred binary used to wipe files. Default: %(default)s')
return parser
def getFileList(source_file_list, check_exists):
def getFileList(source_file_list, check_exist):
file_list = []
for file_path in source_file_list:
for file in glob.glob(file_path):
if check_exists and not os.path.exists(file):
if file_path.find('*') != -1 or file_path.find('?') != -1:
sub_list = glob.glob(file_path)
else:
sub_list = [file_path]
for file in sub_list:
if check_exist and not os.path.exists(file):
continue
file_list.append(os.path.realpath(file))
assert len(file_list) > 0, file_list
file_list.append(file)
if os.path.islink(file):
# remove the link target file
file_list.append(os.path.realpath(file))
return file_list
def shred(options):
......@@ -77,24 +85,24 @@ def shred(options):
else:
check_pid = options.check_pid
if check_pid and psutil.pid_exists(check_pid):
raise Exception("check-pid is enabled and process with pid %s is running. "\
"Cannot wipe file(s) while the process is running." % check_pid)
return "check-pid is enabled and process with pid %s is running. Cannot " \
"wipe file(s) while the process is running." % check_pid
arg_list = ['/usr/bin/shred', '-n', str(options.iterations), '-v']
arg_list = [options.shred_bin, '-n', str(options.iterations), '-v']
if options.remove:
arg_list.append('-u')
if options.zero:
arg_list.append('-z')
arg_list.extend(getFileList(options.file_list, options.skip_non_exist))
arg_list.extend(getFileList(options.file_list, options.check_exist))
pshred = subprocess.Popen(arg_list, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
result = pshred.communicate()[0]
result, stderr = pshred.communicate()
if pshred.returncode is None:
pshred.kill()
if pshred.returncode != 0:
raise RuntimeError('Command %r failed, with output:\n%s' % (
' '.join(arg_list), result))
raise RuntimeError('Command %r failed, with output:\n%s\n%s' % (
' '.join(arg_list), result, stderr or ''))
return result
def main():
......
......@@ -38,6 +38,8 @@ class TestSecureDelete(unittest.TestCase):
def setUp(self):
_, self.remove_file = tempfile.mkstemp()
_, self.remove_file2 = tempfile.mkstemp()
self.link_name = os.path.join(os.path.dirname(self.remove_file),
'%s_link' % os.path.basename(self.remove_file))
with open(self.remove_file, 'w') as f:
f.write("Skjsds@ßdjierhjzlalaa...")
with open(self.remove_file2, 'w') as f:
......@@ -48,6 +50,8 @@ class TestSecureDelete(unittest.TestCase):
os.remove(self.remove_file)
if os.path.exists(self.remove_file2):
os.remove(self.remove_file2)
if os.path.exists(self.link_name):
os.unlink(self.link_name)
def test_secure_remove_file(self):
options = getAgumentParser().parse_args(['-n', '2', '-u', '-z', '--file', self.remove_file])
......@@ -73,6 +77,31 @@ class TestSecureDelete(unittest.TestCase):
self.assertTrue("pass %s/%s" % (passes, passes) in result)
self.assertTrue("%s: removed" % os.path.basename(self.remove_file) in result)
def test_secure_remove_file_check_exist(self):
options = getAgumentParser().parse_args(['-n', '2', '-u', '-s', '--file', 'random.txt', self.remove_file])
passes = 2
result = shred(options)
self.assertFalse(os.path.exists(self.remove_file))
self.assertTrue("pass %s/%s" % (passes, passes) in result)
self.assertTrue("%s: removed" % os.path.basename(self.remove_file) in result)
def test_secure_remove_file_check_exist_false(self):
options = getAgumentParser().parse_args(['-n', '2', '-u', '--file', 'random.txt'])
passes = 2
with self.assertRaises(RuntimeError):
result = shred(options)
def test_secure_remove_file_with_link(self):
options = getAgumentParser().parse_args(['-n', '2', '-u', '--file', self.link_name])
passes = 2
os.symlink(self.remove_file, self.link_name)
result = shred(options)
# shred removed link and target file
self.assertFalse(os.path.exists(self.remove_file))
self.assertFalse(os.path.exists(self.link_name))
self.assertTrue("pass %s/%s" % (passes, passes) in result)
self.assertTrue("%s: removed" % os.path.basename(self.remove_file) in result)
def test_secure_remove_file_multiple_files(self):
options = getAgumentParser().parse_args(['-n', '2', '-u', '-z', '--file', self.remove_file, self.remove_file2])
passes = 2 + 1 # Option -z is used, plus one more pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment