Commit 8e05a091 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

Fix errors in python3

See merge request nexedi/slapos.core!403
parents a6c5469b 35d7014b
Pipeline #22747 passed with stage
in 0 seconds
...@@ -227,7 +227,8 @@ def slapconfig(conf): ...@@ -227,7 +227,8 @@ def slapconfig(conf):
if os.path.dirname(directory) == directory: if os.path.dirname(directory) == directory:
break break
# Do "chmod g+xro+xr" # Do "chmod g+xro+xr"
os.chmod(directory, os.stat(directory).st_mode | stat.S_IXGRP | stat.S_IRGRP | stat.S_IXOTH | stat.S_IROTH) if (os.stat(directory).st_mode & (stat.S_IXGRP | stat.S_IRGRP | stat.S_IXOTH | stat.S_IROTH) != stat.S_IXGRP | stat.S_IRGRP | stat.S_IXOTH | stat.S_IROTH):
os.chmod(directory, os.stat(directory).st_mode | stat.S_IXGRP | stat.S_IRGRP | stat.S_IXOTH | stat.S_IROTH)
directory = os.path.dirname(directory) directory = os.path.dirname(directory)
if not os.path.exists(slap_conf_dir): if not os.path.exists(slap_conf_dir):
...@@ -250,7 +251,6 @@ def slapconfig(conf): ...@@ -250,7 +251,6 @@ def slapconfig(conf):
with open(dst, 'w') as destination: with open(dst, 'w') as destination:
destination.write(''.join(src)) destination.write(''.join(src))
os.chmod(dst, 0o600) os.chmod(dst, 0o600)
os.chown(dst, 0, 0)
certificate_repository_path = os.path.join(slap_conf_dir, 'ssl', 'partition_pki') certificate_repository_path = os.path.join(slap_conf_dir, 'ssl', 'partition_pki')
if not os.path.exists(certificate_repository_path): if not os.path.exists(certificate_repository_path):
...@@ -282,7 +282,7 @@ def slapconfig(conf): ...@@ -282,7 +282,7 @@ def slapconfig(conf):
cfg = re.sub('\n\\s*%s\\s*=.*' % key, '\n%s = %s' % (key, value), cfg) cfg = re.sub('\n\\s*%s\\s*=.*' % key, '\n%s = %s' % (key, value), cfg)
if not dry_run: if not dry_run:
with open(config_path, 'w') as fout: with open(config_path, 'wb') as fout:
fout.write(cfg.encode('utf8')) fout.write(cfg.encode('utf8'))
conf.logger.info('SlapOS configuration written to %s', config_path) conf.logger.info('SlapOS configuration written to %s', config_path)
......
...@@ -509,7 +509,7 @@ class Partition(object): ...@@ -509,7 +509,7 @@ class Partition(object):
else: else:
self.logger.info('Changed %s content. Updating %r' % (name, path)) self.logger.info('Changed %s content. Updating %r' % (name, path))
with os.fdopen(os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o400), 'wb') as fout: with os.fdopen(os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o400), 'w') as fout:
fout.write(new_content) fout.write(new_content)
os.chown(path, uid, gid) os.chown(path, uid, gid)
......
...@@ -160,7 +160,7 @@ def upload_network_cached(software_root, software_url, cached_key, ...@@ -160,7 +160,7 @@ def upload_network_cached(software_root, software_url, cached_key,
os=distribution_tuple(), os=distribution_tuple(),
) )
f = open(path, 'r') f = open(path, 'rb')
# convert '' into None in order to call nc nicely # convert '' into None in order to call nc nicely
if not signature_private_key_file: if not signature_private_key_file:
signature_private_key_file = None signature_private_key_file = None
......
...@@ -25,28 +25,119 @@ ...@@ -25,28 +25,119 @@
# #
############################################################################## ##############################################################################
import httmock
import mock
import os
import random
import shutil
import string
import tempfile
import unittest import unittest
import slapos.cli.register from argparse import Namespace
from six.moves.urllib import parse
import slapos.slap
from slapos.cli.register import RegisterCommand, RegisterConfig, fetch_configuration_template, do_register
from slapos.cli.entry import SlapOSApp
class TestRegister(unittest.TestCase): class TestRegister(unittest.TestCase):
""" Tests for slapos.cli.register """ Tests for slapos.cli.register
XXX There is a lack of tests for register, so include more. XXX There is a lack of tests for register, so include more.
""" """
def setUp(self):
self.slap = slapos.slap.slap()
self.app = SlapOSApp()
self.temp_dir = tempfile.mkdtemp()
self.computer_id = 'COMP-%s' % random.randrange(10000)
self.certificate = 'CN=%s/emailAddress=admin@vifib.org' % self.computer_id
self.key = 'key_test_%s' % ''.join(random.choice(string.ascii_lowercase) for i in range(64))
self.default_args=[
'test-computer',
'--token', 'token-test'
]
def tearDown(self):
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def request_handler(self, url, req):
"""
Define _callback.
Will register global sequence of message, sequence by partition
and error and error message by partition
"""
#self.sequence.append(url.path)
if req.method == 'GET':
qs = parse.parse_qs(url.query)
else:
qs = parse.parse_qs(req.body)
print("DEBUG THOMAS")
print(url)
print(req)
if url.path == '/Person_requestComputer':
return {
'status_code': 200,
'content': {'certificate': self.certificate, 'key': self.key}
}
def getConf(self, args):
cmd = RegisterCommand(self.app, Namespace())
cmd_parser = cmd.get_parser("slapos node register for test")
parsed_args = cmd_parser.parse_args(args)
conf = RegisterConfig(logger=self.app.log)
conf.setConfig(parsed_args)
return conf
def test_fetch_configuration(self): def test_fetch_configuration(self):
""" Simple test to Fetch the configuration template """ Simple test to Fetch the configuration template
""" """
template = slapos.cli.register.fetch_configuration_template() template = fetch_configuration_template()
self.assertNotEqual("", template) self.assertNotEqual("", template)
for entry in ['computer_id', for entry in ['computer_id',
'master_url', 'master_url',
'key_file', 'key_file',
'cert_file', 'cert_file',
'certificate_repository_path', 'certificate_repository_path',
'interface_name', 'interface_name',
'ipv4_local_network', 'ipv4_local_network',
'partition_amount', 'partition_amount',
'create_tap']: 'create_tap']:
self.assertTrue(entry in template, "%s is not in template (%s)" % (entry, template)) self.assertTrue(entry in template, "%s is not in template (%s)" % (entry, template))
def test_default_token_rejected(self):
""" Make sure that the token for test if rejected by default
"""
conf = self.getConf(self.default_args)
with self.assertRaises(SystemExit) as cm:
do_register(conf)
self.assertEqual(cm.exception.code, 1)
def test_write_configuration(self):
""" Simple test to see if we can write the configuration file
"""
conf = self.getConf(self.default_args)
# we manually set the parameters below because we mock the function COMPConfig
# indeed, we don't want to put the config file in '/etc/opt/slapos'
conf.slapos_configuration = self.temp_dir
conf.computer_id = self.computer_id
conf.certificate = self.certificate
conf.key = self.key
with httmock.HTTMock(self.request_handler), \
mock.patch.object(RegisterConfig, 'COMPConfig') as COMPConfigMock, \
mock.patch('slapos.cli.register.save_former_config') as save_former_config_mock:
return_code = do_register(conf)
COMPConfigMock.assert_called_with(
slapos_configuration = '/etc/opt/slapos/',
computer_id = self.computer_id,
certificate = self.certificate,
key = self.key)
save_former_config_mock.assert_called()
self.assertEquals(0, return_code)
self.assertTrue(
os.path.exists('%s/slapos.cfg' % self.temp_dir))
config_content = open('%s/slapos.cfg' % self.temp_dir).read()
self.assertIn('computer_id = %s' % self.computer_id, config_content)
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment