Commit 8cb567fd authored by Luke Macken's avatar Luke Macken

Run all injection tests against every interpreter that is installed.

parent 8cc09098
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
import os import os
import sys import sys
import time import time
import glob
import unittest import unittest
import textwrap import textwrap
import tempfile import tempfile
...@@ -85,81 +86,58 @@ class TestCodeInjection(unittest.TestCase): ...@@ -85,81 +86,58 @@ class TestCodeInjection(unittest.TestCase):
assert text in str(stdout), \ assert text in str(stdout), \
"Code injection failed: %s\n%s" % (stdout, stderr) "Code injection failed: %s\n%s" % (stdout, stderr)
def test_threadless_injection(self): def interpreters(self):
p = self.run_python('-c "import time; time.sleep(2.0)"') for exe in glob.glob('/usr/bin/python*.*'):
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True) try:
pyrasite.inject(p.pid, self.stop_payload, verbose=True) minor_ver = int(exe.split('.')[-1])
stdout, stderr = p.communicate() except ValueError:
self.assert_output_contains(stdout, stderr, 'Hello World!') continue # skip python2.7-config, etc
yield exe
def test_multithreaded_injection(self):
p = self.run_python(self.default_payload)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_many_threads_and_many_payloads(self): def test_many_threads_and_many_payloads(self):
payload = self.generate_payload(threads=100) payload = self.generate_payload(threads=100)
p = self.run_python(payload) for exe in self.interpreters():
p = self.run_python(payload, exe=exe)
total = 100 total = 100
for i in range(total): for i in range(total):
pyrasite.inject(p.pid, pyrasite.inject(p.pid,
'pyrasite/payloads/helloworld.py', verbose=True) 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True) pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate() stdout, stderr = p.communicate()
count = 0 count = 0
for line in stdout.decode('utf-8').split('\n'): for line in stdout.decode('utf-8').split('\n'):
if line.strip() == 'Hello World!': if line.strip() == 'Hello World!':
count += 1 count += 1
os.unlink(payload) os.unlink(payload)
assert count == total, "Read %d hello worlds" % count assert count == total, "Read %d hello worlds" % count
def test_injecting_into_the_same_interpreter(self): def test_threadless_injection_into_all_interpreters(self):
print("sys.executable = %s" % sys.executable) for exe in self.interpreters():
p = self.run_python('-c "import time; time.sleep(2.0)"', exe=sys.executable) print("sys.executable = %s" % sys.executable)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True) print("injecting into %s" % exe)
stdout, stderr = p.communicate() p = self.run_python('-c "import time; time.sleep(2.0)"', exe=exe)
self.assert_output_contains(stdout, stderr, 'Hello World!') pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
stdout, stderr = p.communicate()
def test_injecting_threads_into_the_same_interpreter(self): self.assert_output_contains(stdout, stderr, 'Hello World!')
if sys.version_info[0] == 3: exe = 'python3'
else: exe = 'python2' def test_injecting_threads_into_all_interpreters(self):
print("sys.executable = %s" % sys.executable)
payload = self.generate_payload(threads=10)
p = self.run_python(payload, exe=exe)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
os.unlink(payload)
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_injecting_into_different_interpreter_version(self):
if sys.version_info[0] == 3: exe = 'python2'
else: exe = 'python3'
print("sys.executable = %s" % sys.executable)
print("injecting into %s" % exe)
p = self.run_python('-c "import time; time.sleep(2.0)"', exe=exe)
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
def test_injecting_threads_into_different_interpreter(self):
if sys.version_info[0] == 3: exe = 'python2'
else: exe = 'python3'
print("sys.executable = %s" % sys.executable)
payload = self.generate_payload(threads=10) payload = self.generate_payload(threads=10)
p = self.run_python(payload, exe=exe) try:
pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True) for exe in self.interpreters():
pyrasite.inject(p.pid, self.stop_payload, verbose=True) print("sys.executable = %s" % sys.executable)
stdout, stderr = p.communicate() print("injecting into %s" % exe)
os.unlink(payload) p = self.run_python(payload, exe=exe)
self.assert_output_contains(stdout, stderr, 'Hello World!') pyrasite.inject(p.pid, 'pyrasite/payloads/helloworld.py', verbose=True)
pyrasite.inject(p.pid, self.stop_payload, verbose=True)
stdout, stderr = p.communicate()
self.assert_output_contains(stdout, stderr, 'Hello World!')
finally:
os.unlink(payload)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment