Commit 5f531227 authored by Luke Macken's avatar Luke Macken

Add a ton of unit tests

parent c9dec984
......@@ -15,39 +15,163 @@
#
# Copyright (C) 2011, 2012 Red Hat, Inc.
import os
import sys
import time
import unittest
import textwrap
import tempfile
import subprocess
import pyrasite
subprocesses = []
default_payload = stop_payload = None
def teardown():
os.unlink(default_payload)
os.unlink(stop_payload)
for p in subprocesses:
try:
p.kill()
except:
pass
class TestCodeInjection(unittest.TestCase):
def test_injection(self):
def __init__(self, *args, **kw):
super(TestCodeInjection, self).__init__(*args, **kw)
global default_payload, stop_payload
self.stop_payload = default_payload = self.generate_payload_stopper()
self.default_payload = stop_payload = self.generate_payload()
def generate_payload(self, threads=1):
(fd, filename) = tempfile.mkstemp()
tmp = os.fdopen(fd, 'w')
script = textwrap.dedent("""
import time, threading, random
global running
running = True
def snooze():
global running
while running:
time.sleep(random.random())
""")
for t in range(threads):
script += "threading.Thread(target=snooze).start()\n"
tmp.write(script)
tmp.close()
return filename
def generate_payload_stopper(self):
(fd, filename) = tempfile.mkstemp()
tmp = os.fdopen(fd, 'w')
tmp.write('globals()["running"] = False')
tmp.close()
return filename
def run_python(self, payload, exe='python'):
p = subprocess.Popen('%s %s' % (exe, payload),
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
subprocesses.append(p)
return p
def assert_output_contains(self, stdout, stderr, text):
assert text in str(stdout), \
"Code injection failed: %s\n%s" % (stdout, stderr)
def test_threadless_injection(self):
cmd = 'python -c "import time; time.sleep(0.5)"'
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_multithreaded_injection(self):
p = self.run_python(self.default_payload)
time.sleep(0.5)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_many_threads_and_many_payloads(self):
payload = self.generate_payload(threads=100)
p = self.run_python(payload)
time.sleep(0.5)
total = 100
for i in range(total):
pyrasite.inject(p.pid,
'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
assert 'Hello World!' in stdout.decode('utf-8'), \
"Code injection failed"
def test_multithreaded_injection(self):
cmd = [
'import time, threading',
'snooze = lambda: time.sleep(0.5)',
'threading.Thread(target=snooze).start()',
]
p = subprocess.Popen('python -c "%s"' % ';'.join(cmd), shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
count = 0
for line in stdout.decode('utf-8').split('\n'):
if line.strip() == 'Hello World!':
count += 1
os.unlink(payload)
assert count == total, "Read %d hello worlds" % count
def test_injecting_into_the_same_interpreter(self):
print("sys.executable = %s" % sys.executable)
cmd = '%s -c "import time; time.sleep(2.0)"' % sys.executable
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
time.sleep(0.5)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_injecting_threads_into_the_same_interpreter(self):
if sys.version_info[0] == 3: exe = 'python3'
else: exe = 'python2'
print("sys.executable = %s" % sys.executable)
payload = self.generate_payload(threads=10)
p = self.run_python(payload, exe=exe)
time.sleep(0.5)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
os.unlink(payload)
assert 'Hello World!' in str(stdout), \
"Code injection failed: %s\n%s" % (stdout, stderr)
def test_injecting_into_different_interpreter_version(self):
if sys.version_info[0] == 3: exe = 'python2'
else: exe = 'python3'
print("sys.executable = %s" % sys.executable)
print("injecting into %s" % exe)
cmd = '%s -c "import time; time.sleep(2.0)"' % exe
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
time.sleep(0.5)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_injecting_threads_into_different_interpreter(self):
if sys.version_info[0] == 3: exe = 'python2'
else: exe = 'python3'
print("sys.executable = %s" % sys.executable)
payload = self.generate_payload(threads=10)
p = self.run_python(payload, exe=exe)
time.sleep(0.5)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
assert 'Hello World!' in stdout.decode('utf-8'), \
"Multi-threaded code injection failed"
os.unlink(payload)
self.assert_output_contains(stdout, stderr, 'Hello World!')
if __name__ == '__main__':
......
......@@ -30,7 +30,7 @@ class TestIPC(unittest.TestCase):
self.ipc = pyrasite.PyrasiteIPC(self.p.pid)
def tearDown(self):
self.p.kill()
self.p.terminate()
self.ipc.close()
def test_listen(self):
......@@ -58,6 +58,15 @@ class TestIPC(unittest.TestCase):
self.ipc.connect()
assert self.ipc.cmd('print("mu")') == 'mu\n'
def test_unreliable(self):
self.ipc.reliable = False
self.ipc.connect()
out = self.ipc.cmd('print("mu")')
assert out == 'mu\n', out
def test_repr(self):
assert repr(self.ipc)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment