Commit 7c9fdce8 authored by Tomáš Peterka's avatar Tomáš Peterka Committed by Rafael Monnerat

[manager.cpuset] Manager interface and cpuset implementation (with tests)

parent e01881fd
......@@ -55,6 +55,7 @@ setup(name=name,
'zc.buildout',
'cliff',
'requests>=2.4.3',
'six',
'uritemplate', # used by hateoas navigator
] + additional_install_requires,
extras_require={
......
......@@ -10,3 +10,16 @@ Manager is a plugin-like class that is being run in multiple phases of slapos no
Constructor will receive configuration of current stage. Then each method receives
object most related to the current operation. For details see <slapos/manager/interface.py>.
In code, a list of manager instances can be easily retreived by
from slapos import manager
manager_list = manager.from_config(config)
Where `from_config` extracts "manager_list" item from dict-like `config` argument
and then dynamically loads modules named according to the configuration inside
`slapos.manager` package. The manager must be a class named Manager and implementing
interface `slapos.manager.interface.IManager`.
Managers might require a list of user for whom they are allowed to perform tasks.
This list of users is given by "power_user_list" in [slapos] section in the
config file.
......@@ -280,7 +280,10 @@ class Computer(object):
# attributes starting with '_' are saved from serialization
# monkey-patch use of class instead of dictionary
self._config = config if isinstance(config, dict) else config.__dict__
if config is None:
logger.warning("Computer needs config in constructor to allow managers.")
self._config = config if config is None or isinstance(config, dict) else config.__dict__
self._manager_list = slapmanager.from_config(self._config)
def __getinitargs__(self):
......
......@@ -1086,7 +1086,7 @@ stderr_logfile_backups=1
# call manager for every software release
for manager in self._manager_list:
manager.instance(partition)
manager.instance(local_partition)
if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
local_partition.install()
......
# coding: utf-8
import re
import importlib
import re
import six
from zope.interface import declarations
......@@ -12,20 +13,27 @@ def load_manager(name):
if re.match(r'[a-zA-Z_]', name) is None:
raise ValueError("Manager name \"{!s}\" is not allowed! Must contain only letters and \"_\"".
format(name))
manager_module_name = "slapos.manager.{}".format(name)
from slapos.manager import interface
manager_module = importlib.import_module("slapos.manager." + name)
manager_module = importlib.import_module(manager_module_name)
if not hasattr(manager_module, "Manager"):
raise AttributeError("Manager class in {} has to be called \"Manager\"".format(
name))
raise AttributeError("Manager class in {} has to be called \"Manager\"".format(
manager_module_name))
if not interface.IManager.implementedBy(manager_module.Manager):
raise RuntimeError("Manager class in {} must zope.interface.implements \"IManager\"".format(
name))
raise RuntimeError("Manager class in {} must zope.interface.implements \"IManager\"".format(
manager_module_name))
return manager_module.Manager
def from_config(config):
"""Return list of instances of managers allowed from the config."""
name_list = config.get(config_option, "").split()
if config is None:
return []
name_list = config.get(config_option, "")
if isinstance(name_list, six.string_types):
name_list = name_list.replace(",", " ").split()
return [load_manager(name)(config) for name in name_list]
\ No newline at end of file
......@@ -2,6 +2,8 @@
import logging
import os
import os.path
import pwd
import time
from zope import interface as zope_interface
from slapos.manager import interface
......@@ -82,13 +84,14 @@ class Manager(object):
for cpu_folder in self._cpu_folder_list()]
# Gather exclusive CPU usage map {username: set[cpu_id]}
cpu_usage = defaultdict(set)
for cpu_id in self._cpu_id_list()[1:]: # skip the first public CPU
pids = [int(pid)
for pid in read_file(cpu_tasks_file_list[cpu_id]).splitlines()]
for pid in pids:
process = psutil.Process(pid)
cpu_usage[process.username()].add(cpu_id)
# We do not need to gather that since we have no limits yet
#cpu_usage = defaultdict(set)
#for cpu_id in self._cpu_id_list()[1:]: # skip the first public CPU
# pids = [int(pid)
# for pid in read_file(cpu_tasks_file_list[cpu_id]).splitlines()]
# for pid in pids:
# process = psutil.Process(pid)
# cpu_usage[process.username()].add(cpu_id)
# Move all PIDs from the pool of all CPUs onto the first exclusive CPU.
running_list = sorted(list(map(int, read_file(tasks_file).split())), reverse=True)
......@@ -105,7 +108,7 @@ class Manager(object):
"Suceeded in moving {:d} PIDs {!s}\n".format(
len(refused_set), refused_set, len(success_set), success_set))
cpu_list = self._cpu_folder_list()
cpu_folder_list = self._cpu_folder_list()
generic_cpu_path = cpu_folder_list[0]
exclusive_cpu_path_list = cpu_folder_list[1:]
......@@ -116,7 +119,7 @@ class Manager(object):
# gather already exclusively running PIDs
exclusive_pid_set = set()
for cpu_tasks_file in cpu_tasks_file_list[1:]:
exclusive_pid_set.update(map(int, read_content(cpu_tasks_file).split()))
exclusive_pid_set.update(map(int, read_file(cpu_tasks_file).split()))
# Move processes to their demanded exclusive CPUs
with open(request_file, "rt") as fi:
......@@ -135,7 +138,7 @@ class Manager(object):
def _cpu_folder_list(self):
"""Return list of folders for exclusive cpu cores."""
return [os.path.join(self.cpuset_path, "cpu" + str(cpu_id))
for cpu_id in self._cpu_id_list]
for cpu_id in self._cpu_id_list()]
def _cpu_id_list(self):
"""Extract IDs of available CPUs and return them as a list.
......
......@@ -155,14 +155,18 @@ class PwdMock:
global USER_LIST
if name in USER_LIST:
class PwdResult:
pw_uid = 0
pw_gid = 0
def __init__(self, name):
self.pw_name = name
self.pw_uid = self.pw_gid = USER_LIST.index(name)
def __getitem__(self, index):
if index == 0:
return self.pw_name
if index == 2:
return self.pw_uid
if index == 3:
return self.pw_gid
return PwdResult()
return PwdResult(name)
raise KeyError("User \"{}\" not in global USER_LIST {!s}".format(name, USER_LIST))
......@@ -656,10 +660,14 @@ class TestComputer(SlapformatMixin):
class SlapGridPartitionMock:
def __init__(self, partition):
self.partition = partition
self.instance_path = partition.path
def getUserGroupId(self):
return (0, 0)
class TestComputerWithCPUSet(SlapformatMixin):
......@@ -667,6 +675,9 @@ class TestComputerWithCPUSet(SlapformatMixin):
task_write_mode = "at" # append insted of write tasks PIDs for the tests
def setUp(self):
logging.getLogger("slapos.manager.cpuset").addHandler(
logging.StreamHandler())
super(TestComputerWithCPUSet, self).setUp()
self.restoreOs()
......@@ -710,14 +721,13 @@ class TestComputerWithCPUSet(SlapformatMixin):
],
config={
"manager_list": "cpuset",
"power_user_list": "testuser"
"power_user_list": "testuser root"
}
)
# self.patchOs(self.logger)
def tearDown(self):
"""Cleanup temporary test folders."""
from slapos.manager.cpuset import Manager
Manager.cpuset_path = self.orig_cpuset_path
Manager.task_write_mode = self.orig_task_write_mode
......@@ -726,6 +736,7 @@ class TestComputerWithCPUSet(SlapformatMixin):
shutil.rmtree("/tmp/slapgrid/")
if self.cpuset_path.startswith("/tmp"):
shutil.rmtree(self.cpuset_path)
logging.getLogger("slapos.manager.cpuset")
def test_positive_cgroups(self):
"""Positive test of cgroups."""
......@@ -741,25 +752,23 @@ class TestComputerWithCPUSet(SlapformatMixin):
if cpu_id > 0:
self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks")))
# Simulate slapos instance call
self.computer._manager_list[0].instance(SlapGridPartitionMock(self.computer.partition_list[0]))
# Test that format moved all PIDs from CPU pool into CPU0
tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0)
self.assertIn("1001", tasks_at_cpu0)
self.assertIn("1002", tasks_at_cpu0)
# Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(self.cpuset_path, "tasks"))
# test moving tasks from generic core to private core
# Test moving tasks from generic core to private core
# request PID 1001 to be moved to its private CPU
request_file_path = os.path.join(self.computer.partition_list[0].path,
self.cpu_exclusive_file)
slapos.manager.cpuset.Manager.cpu_exclusive_file)
file_write("1001\n", request_file_path)
# let format do the moving
self.computer.update()
# Simulate slapos instance call to perform the actual movement
self.computer._manager_list[0].instance(
SlapGridPartitionMock(self.computer.partition_list[0]))
# Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(self.cpuset_path, "tasks"))
# Test that format moved all PIDs from CPU pool into CPU0
tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0)
# test if the moving suceeded into any provate CPUS (id>0)
self.assertTrue(any("1001" in file_content(exclusive_task)
for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks"))))
self.assertIn("1002", tasks_at_cpu0)
# slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file
self.assertEqual("", file_content(request_file_path).strip())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment