Commit 2bd3bf5b authored by Julien Muchembled's avatar Julien Muchembled

tests: code cleanup

parent 35a73567
......@@ -156,9 +156,9 @@ class NetworkcacheClient(object):
try:
shacache_connection.request('POST', self.shacache_path, file_descriptor,
self.shacache_header_dict)
print 'uploade'
print 'uploaded'
result = shacache_connection.getresponse()
print 'repondu'
print 'answered'
data = result.read()
print 'read'
finally:
......
......@@ -119,6 +119,13 @@ class Server(BaseHTTPServer.HTTPServer):
__run = True
@classmethod
def run(cls, *args, **kw):
thread = threading.Thread(target=cls(*args, **kw).serve_forever)
thread.daemon = True
thread.start()
return thread
def serve_forever(self):
while self.__run:
self.handle_request()
......@@ -127,12 +134,6 @@ class Server(BaseHTTPServer.HTTPServer):
self.__run = False
def _run_nc(tree, host, port):
server_address = (host, port)
httpd = Server(tree, server_address, NCHandler)
httpd.serve_forever()
class OfflineTest(unittest.TestCase):
shacache_url = 'http://localhost:1/shacache'
shadir_url = 'http://localhost:1/shadir'
......@@ -147,10 +148,9 @@ class OfflineTest(unittest.TestCase):
self.assertRaises(IOError, nc.download, 'sha512sum')
def test_upload_offline(self):
content = tempfile.TemporaryFile()
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache_url,
self.shadir_url)
self.assertRaises(IOError, nc.upload, content)
self.assertRaises(IOError, nc.upload, tempfile.TemporaryFile())
def test_init_method_normal_http_url(self):
"""
......@@ -179,50 +179,23 @@ class OfflineTest(unittest.TestCase):
self.assertTrue(nc.shadir_host in self.shadir_url)
def wait(host, port):
addr = host, port
for i in range(120):
time.sleep(0.25)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(addr)
s.close()
break
except socket.error, e:
if e[0] not in (errno.ECONNREFUSED, errno.ECONNRESET):
raise
s.close()
else:
raise
class OnlineMixin:
def _start_nc(self):
self.thread = threading.Thread(target=_run_nc, args=(self.tree,
self.host, self.port))
self.thread.setDaemon(True)
self.thread.start()
wait(self.host, self.port)
handler = NCHandler
schema = 'http'
def setUp(self):
self.host = "127.0.0.1"
self.port = 8080
self.url = '%s://%s:%s/' % (self.schema, self.host, self.port)
self.shacache = os.environ.get('TEST_SHA_CACHE',
self.url + 'shacache')
self.shadir = os.environ.get('TEST_SHA_DIR',
self.url + 'shadir')
self.shacache = os.environ.get('TEST_SHA_CACHE', self.url + 'shacache')
self.shadir = os.environ.get('TEST_SHA_DIR', self.url + 'shadir')
if not 'TEST_SHA_CACHE' in os.environ and not 'TEST_SHA_DIR' in os.environ:
self.tree = tempfile.mkdtemp()
self._start_nc()
self.thread = Server.run(self.tree, (self.host, self.port), self.handler)
self.test_data = tempfile.TemporaryFile()
self.test_string = str(random.random())
self.test_data.write(self.test_string)
self.test_data.seek(0)
self.test_shasum = hashlib.sha512(self.test_data.read()).hexdigest()
self.test_data.seek(0)
self.test_shasum = hashlib.sha512(self.test_string).hexdigest()
def tearDown(self):
if not 'TEST_SHA_CACHE' in os.environ and not 'TEST_SHA_DIR' in os.environ:
......@@ -235,6 +208,13 @@ class OnlineMixin:
self.thread.join()
shutil.rmtree(self.tree)
def assertDirectoryNotFound(self, msg, func, *args, **kw):
try:
func(*args, **kw)
self.fail("DirectoryNotFound not raised")
except slapos.libnetworkcache.DirectoryNotFound, e:
self.assertTrue(str(e).startswith(msg))
key = """-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDDrOO87nSiDcXOf+xGc4Iqcdjfwd0RTOxEkO9z8mPZVg2bTPwt
/GwtPgmIC4po3bJdsCpJH21ZJwfmUpaQWIApj3odDAbRXQHWhNiw9ZPMHTCmf8Zl
......@@ -408,7 +388,6 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
def test_upload_twice_same(self):
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache, self.shadir)
nc.upload(self.test_data)
self.test_data.seek(0)
nc.upload(self.test_data)
def test_download(self):
......@@ -436,18 +415,11 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
result = signed_nc.select(key)
self.assertEqual(result.read(), self.test_string)
......@@ -456,19 +428,12 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.alternate_certificate,
self.certificate])
self.shacache, self.shadir, key_file.name,
(self.alternate_certificate, self.certificate))
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
result = signed_nc.select(key)
self.assertEqual(result.read(), self.test_string)
......@@ -477,18 +442,11 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
result = signed_nc.select(key)
......@@ -498,21 +456,15 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
signed_nc.openssl = '/doesnotexists'
try:
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
self.fail("UploadError not raised")
except slapos.libnetworkcache.UploadError, e:
self.assertTrue(str(e).startswith("Impossible to sign content, error"))
......@@ -520,21 +472,15 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
signed_nc.openssl = sys.executable
try:
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
self.fail("UploadError not raised")
except slapos.libnetworkcache.UploadError, e:
self.assertTrue(str(e).startswith("Impossible to sign content, error"))
......@@ -542,67 +488,36 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
self.assertEqual(True, signed_nc.upload(self.test_data, key,
urlmd5=urlmd5, file_name=file_name))
try:
# disable openssl during download
signed_nc.openssl = '/doesnotexists'
result = signed_nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
# No way to download content from shadir w/o openssl
self.assertEqual(str(msg), 'Could not find a trustable entry.')
self.assertDirectoryNotFound('Could not find a trustable entry.',
signed_nc.select, key)
def test_select_signed_content_openssl_non_functional(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
self.assertEqual(True, signed_nc.upload(self.test_data, key,
urlmd5=urlmd5, file_name=file_name))
try:
# disable openssl during download
signed_nc.openssl = sys.executable
result = signed_nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
# No way to download content from shadir w/o openssl
self.assertEqual(str(msg), 'Could not find a trustable entry.')
self.assertDirectoryNotFound('Could not find a trustable entry.',
signed_nc.select, key)
def test_select_no_entries(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir)
self.assertEqual(True, nc.upload(self.test_data, key, urlmd5=urlmd5,
......@@ -610,104 +525,60 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
f = os.path.join(self.tree, 'shadir', key)
# now remove the entry from shacache
open(f, 'w').write(json.dumps([]))
try:
nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
# empty content in shadir so usual exception is raised
self.assertEqual(str(msg), 'Could not find a trustable entry.')
self.assertDirectoryNotFound('Could not find a trustable entry.',
nc.select, key)
def test_select_no_json_response(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir)
self.assertEqual(True, nc.upload(self.test_data, key, urlmd5=urlmd5,
file_name=file_name))
f = os.path.join(self.tree, 'shadir', key)
with open(os.path.join(self.tree, 'shadir', key), 'w') as f:
# now remove the entry from shacache
open(f, 'w').write('This is not a json.')
try:
nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
self.assertTrue(str(msg).startswith('It was impossible to parse json '
'response'))
f.write('This is not a json.')
self.assertDirectoryNotFound('It was impossible to parse json response',
nc.select, key)
def test_select_json_no_in_json_response(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir)
self.assertEqual(True, nc.upload(self.test_data, key, urlmd5=urlmd5,
file_name=file_name))
f = os.path.join(self.tree, 'shadir', key)
with open(os.path.join(self.tree, 'shadir', key), 'w') as f:
# now remove the entry from shacache
open(f, 'w').write(json.dumps([['This is not a json.', 'signature']]))
try:
nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
self.assertTrue(str(msg).startswith('It was impossible to parse '
'json-in-json response'))
f.write(json.dumps([['This is not a json.', 'signature']]))
self.assertDirectoryNotFound(
'It was impossible to parse json-in-json response',
nc.select, key)
def test_select_json_in_json_no_dict(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir)
self.assertEqual(True, nc.upload(self.test_data, key, urlmd5=urlmd5,
file_name=file_name))
f = os.path.join(self.tree, 'shadir', key)
with open(os.path.join(self.tree, 'shadir', key), 'w') as f:
# now remove the entry from shacache
open(f, 'w').write(json.dumps([[json.dumps('This is a string'),
'signature']]))
try:
nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
self.assertTrue(str(msg).startswith('It was impossible to fetch sha512 '
'from directory response'))
f.write(json.dumps([[json.dumps('This is a string'), 'signature']]))
self.assertDirectoryNotFound(
'It was impossible to fetch sha512 from directory response',
nc.select, key)
def test_select_signed_content_server_hacked(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
......@@ -725,11 +596,8 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
# ...but as he has no access to key, no way to sign data..
# hacked_json[0][1] is still a good key
open(f, 'w').write(json.dumps(hacked_json))
try:
result = signed_nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
# hacked content is not trusted
self.assertEqual(str(msg), 'Could not find a trustable entry.')
self.assertDirectoryNotFound('Could not find a trustable entry.',
signed_nc.select, key)
def test_select_DirectoryNotFound_too_many_for_key(self):
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache, self.shadir)
......@@ -739,20 +607,15 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
nc.upload(test_data, key, urlmd5=urlmd5, file_name=file_name)
try:
nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
self.assertTrue(
str(msg).startswith("Too many entries for a given key %r" % key))
self.assertDirectoryNotFound("Too many entries for a given key %r" % key,
nc.select, key)
def test_DirectoryNotFound_non_trustable_entry(self):
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache, self.shadir)
key = 'somekey' + str(random.random())
......@@ -766,10 +629,8 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
selected = nc.select(key).read()
self.assertEqual(selected, self.test_string)
# but when signature is used, networkcache will complain
try:
signed_nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
self.assertEqual(str(msg), 'Could not find a trustable entry.')
self.assertDirectoryNotFound('Could not find a trustable entry.',
signed_nc.select, key)
# of course if proper key will be used to sign the content uploaded
# into shacache all will work
......@@ -783,12 +644,10 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
certificate_file = tempfile.NamedTemporaryFile()
certificate_file.write(self.certificate)
certificate_file.flush()
certificate_file.seek(0)
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache, self.shadir,
shacache_cert_file=certificate_file, shacache_key_file=key_file)
......@@ -800,12 +659,10 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.auth_key)
key_file.flush()
key_file.seek(0)
certificate_file = tempfile.NamedTemporaryFile()
certificate_file.write(self.auth_certificate)
certificate_file.flush()
certificate_file.seek(0)
# simplified assertion, as no http authentication server is available
nc = slapos.libnetworkcache.NetworkcacheClient(self.shadir, self.shadir,
......@@ -863,19 +720,8 @@ class OnlineTestSSLServer(OnlineMixin, unittest.TestCase):
file_name=str(random.random()), urlmd5=str(random.random()))
def _run_nc_POST200(tree, host, port):
server_address = (host, port)
httpd = Server(tree, server_address, NCHandlerPOST200)
httpd.serve_forever()
class OnlineTestPOST200(OnlineMixin, unittest.TestCase):
def _start_nc(self):
self.thread = threading.Thread(target=_run_nc_POST200, args=(self.tree,
self.host, self.port))
self.thread.setDaemon(True)
self.thread.start()
wait(self.host, self.port)
handler = NCHandlerPOST200
def test_upload_wrong_return_code(self):
"""Check reaction on HTTP return code different then 201"""
......@@ -884,19 +730,8 @@ class OnlineTestPOST200(OnlineMixin, unittest.TestCase):
self.test_data)
def _run_nc_POSTWrongChecksum(tree, host, port):
server_address = (host, port)
httpd = Server(tree, server_address, NCHandlerReturnWrong)
httpd.serve_forever()
class OnlineTestWrongChecksum(OnlineMixin, unittest.TestCase):
def _start_nc(self):
self.thread = threading.Thread(target=_run_nc_POSTWrongChecksum,
args=(self.tree, self.host, self.port))
self.thread.setDaemon(True)
self.thread.start()
wait(self.host, self.port)
handler = NCHandlerReturnWrong
def test_upload_wrong_return_sha(self):
"""Check reaction in case of wrong sha returned"""
......@@ -913,12 +748,9 @@ class GenerateSignatureScriptTest(unittest.TestCase):
self.certificate = os.path.join(tempfile.gettempdir(), tempfile.gettempprefix()
+ str(random.random()))
self.common_name = str(random.random())
self.key_exist = tempfile.NamedTemporaryFile(delete=False).name
self.certificate_exist = tempfile.NamedTemporaryFile(delete=False).name
def tearDown(self):
for f in [self.key, self.certificate, self.key_exist,
self.certificate_exist]:
for f in self.key, self.certificate:
if os.path.lexists(f):
os.unlink(f)
......@@ -934,10 +766,11 @@ class GenerateSignatureScriptTest(unittest.TestCase):
self.assertTrue(' %s ' % (today.year + 100) in result)
def test_generate_key_exists(self):
with tempfile.NamedTemporaryFile() as key:
self.assertRaises(ValueError, slapos.signature.generateCertificate,
self.certificate, self.key_exist, self.common_name)
self.certificate, key.name, self.common_name)
def test_generate_cert_exists(self):
with tempfile.NamedTemporaryFile() as cert:
self.assertRaises(ValueError, slapos.signature.generateCertificate,
self.certificate_exist, self.key, self.common_name)
cert.name, self.key, self.common_name)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment