Commit e468b364 authored by Rafael Monnerat's avatar Rafael Monnerat

[slapos.collect] Refactor and Collect information from Computer and System

Major changes
  Rename pymonitor, add dependency and simplify
  Save all information using SQLite
  Record information from all processes individually
  Implement Snapshot for Computer and Systems information
  General Clean up
parent 101406c6
...@@ -48,6 +48,7 @@ setup(name=name, ...@@ -48,6 +48,7 @@ setup(name=name,
'netifaces', # to fetch information about network devices 'netifaces', # to fetch information about network devices
'setuptools', # namespaces 'setuptools', # namespaces
'supervisor', # slapgrid uses supervisor to manage processes 'supervisor', # slapgrid uses supervisor to manage processes
'psutil',
'xml_marshaller>=0.9.3', # to unmarshall/marshall python objects to/from 'xml_marshaller>=0.9.3', # to unmarshall/marshall python objects to/from
# XML # XML
'zope.interface', # slap library implementes interfaces 'zope.interface', # slap library implementes interfaces
...@@ -102,6 +103,7 @@ setup(name=name, ...@@ -102,6 +103,7 @@ setup(name=name,
'node software = slapos.cli.slapgrid:SoftwareCommand', 'node software = slapos.cli.slapgrid:SoftwareCommand',
'node instance = slapos.cli.slapgrid:InstanceCommand', 'node instance = slapos.cli.slapgrid:InstanceCommand',
'node boot = slapos.cli.boot:BootCommand', 'node boot = slapos.cli.boot:BootCommand',
'node collect = slapos.cli.collect:CollectCommand',
# SlapOS client commands # SlapOS client commands
'console = slapos.cli.console:ConsoleCommand', 'console = slapos.cli.console:ConsoleCommand',
'configure local = slapos.cli.configure_local:ConfigureLocalCommand', 'configure local = slapos.cli.configure_local:ConfigureLocalCommand',
......
# -*- coding: utf-8 -*-
import subprocess
from time import sleep
import socket
import glob
import os
from slapos.collect import do_collect
from slapos.cli.command import must_be_root
from slapos.cli.entry import SlapOSApp
from slapos.cli.config import ConfigCommand
class CollectCommand(ConfigCommand):
    """
    `slapos node collect` command: gather consumption data and store it.

    The command only loads the configuration and hands it over to
    do_collect(); it must be run as root.
    """

    command_group = 'node'

    def get_parser(self, prog_name):
        # No extra options: reuse the parser built by the parent command.
        return super(CollectCommand, self).get_parser(prog_name)

    @must_be_root
    def take_action(self, args):
        do_collect(self.fetch_config(args))
from psutil import process_iter, NoSuchProcess, AccessDenied
from time import time, sleep, strftime
from slapos.collect.db import Database
from slapos.util import mkdir_p
# Local import
from snapshot import ProcessSnapshot, SystemSnapshot, ComputerSnapshot
from entity import get_user_list, Computer
def _get_time():
return strftime("%Y-%m-%d -- %H:%M:%S").split(" -- ")
def build_snapshot(proc):
    """Return a ProcessSnapshot for proc, or None if the process is gone."""
    try:
        snapshot = ProcessSnapshot(proc)
    except NoSuchProcess:
        # The process exited before it could be measured.
        snapshot = None
    return snapshot
def current_state(user_dict):
    """
    Yield build_snapshot(...) for every relevant running process.

    A process is relevant when its user name is a key of user_dict, i.e.
    when it belongs to a slapos user. Yielded items may be None (see
    build_snapshot), so callers must filter them.
    """
    for process in process_iter():
        try:
            owner = process.username()
        except NoSuchProcess:
            # The process exited between being listed and being inspected;
            # skip it instead of aborting the whole collection, as
            # build_snapshot() already does for the same race.
            continue
        if owner in user_dict:
            yield build_snapshot(process)
def do_collect(conf):
    """
    Take one snapshot of the machine and store it.

    Steps:
      1. Build the slapos user list from conf and attach to each user a
         snapshot of every process it currently owns.
      2. Save the computer/system/disk snapshot and every user's process
         snapshots in the database under <instance_root>/var/data-log/.
      3. Dump the CSV reports in that same directory.

    conf is a ConfigParser-like object providing the "slapos" and
    "slapformat" sections. If psutil raises AccessDenied a message is
    printed and the function returns normally.
    """
    try:
        collected_date, collected_time = _get_time()
        user_dict = get_user_list(conf)
        for snapshot in current_state(user_dict):
            if snapshot:
                user_dict[snapshot.username].append(snapshot)

        # XXX: the data-log location should come from the config file
        # instead of being derived from instance_root.
        instance_root = conf.get("slapos", "instance_root")
        log_directory = "%s/var/data-log/" % instance_root
        mkdir_p(log_directory)

        database = Database(log_directory)
        computer = Computer(ComputerSnapshot())
        computer.save(database, collected_date, collected_time)
        for user in user_dict.values():
            user.save(database, collected_date, collected_time)

        # Imported here, as in the original code, rather than at module level.
        from slapos.collect.reporter import RawCSVDumper, SystemCSVReporterDumper
        SystemCSVReporterDumper(database).dump(log_directory)
        RawCSVDumper(database).dump(log_directory)
    except AccessDenied:
        print("You HAVE TO execute this script with root permission.")
import sqlite3
import os
from time import time, strftime
class Database:
    """SQLite storage for the collected snapshots.

    The database file is <directory>/collector.db and holds four tables:
    user, computer, system and disk. Each row carries a date and a time
    (both text) and a "reported" flag that defaults to 0.

    Callers must call connect() before any insert/select/export method,
    then commit() and close().

    NOTE(review): statements are built with string interpolation, so values
    containing a single quote produce invalid SQL. Switching to parameterized
    queries would change the strings returned by the insert* methods.
    """

    database_name = "collector.db"
    table_list = ["user", "computer", "system", "disk"]

    CREATE_USER_TABLE = "create table if not exists user " \
        "(partition text, pid real, process text, " \
        " cpu_percent real, cpu_time real, " \
        " cpu_num_threads real, memory_percent real, " \
        " memory_rss real, io_rw_counter real, " \
        " io_cycles_counter real, date text, time text, " \
        " reported integer NULL DEFAULT 0)"

    CREATE_COMPUTER_TABLE = "create table if not exists computer "\
        "(cpu_num_core real, cpu_frequency real, cpu_type text," \
        " memory_size real, memory_type text, partition_list text," \
        " date text, time text, reported integer NULL DEFAULT 0)"

    CREATE_SYSTEM_TABLE = "create table if not exists system " \
        "(loadavg real, cpu_percent real, memory_used real, "\
        " memory_free real, net_in_bytes real, net_in_errors real, "\
        " net_in_dropped real, net_out_bytes real, net_out_errors real, "\
        " net_out_dropped real, date text, time text, " \
        " reported integer NULL DEFAULT 0)"

    CREATE_DISK_PARTITION = "create table if not exists disk "\
        "(partition text, used text, free text, mountpoint text, " \
        " date text, time text, reported integer NULL DEFAULT 0)"

    INSERT_USER_TEMPLATE = "insert into user(" \
        "partition, pid, process, cpu_percent, cpu_time, " \
        "cpu_num_threads, memory_percent," \
        "memory_rss, io_rw_counter, io_cycles_counter, " \
        "date, time) values " \
        "('%s', %s, '%s', %s, %s, %s, %s, %s, %s, %s, '%s', '%s' )"

    INSERT_COMPUTER_TEMPLATE = "insert into computer("\
        " cpu_num_core, cpu_frequency, cpu_type," \
        "memory_size, memory_type, partition_list," \
        "date, time) values "\
        "(%s, %s, '%s', %s, '%s', '%s', '%s', '%s' )"

    INSERT_DISK_TEMPLATE = "insert into disk("\
        " partition, used, free, mountpoint," \
        " date, time) "\
        "values ('%s', %s, %s, '%s', '%s', '%s' )"

    INSERT_SYSTEM_TEMPLATE = "insert into system("\
        " loadavg, cpu_percent, memory_used, memory_free," \
        " net_in_bytes, net_in_errors, net_in_dropped," \
        " net_out_bytes, net_out_errors, net_out_dropped, " \
        " date, time) values "\
        "( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, '%s', '%s' )"

    def __init__(self, directory = None):
        """Remember the database location and create missing tables.

        directory is the folder that holds collector.db; it is joined with
        os.path.join, so it must be a string despite the None default.
        """
        assert self.database_name is not None
        self.uri = os.path.join(directory, self.database_name)
        self.connection = None
        self.cursor = None
        self._bootstrap()

    def connect(self):
        """Open a connection and a cursor on the database file."""
        self.connection = sqlite3.connect(self.uri)
        self.cursor = self.connection.cursor()

    def commit(self):
        """Commit the current transaction."""
        assert self.connection is not None
        self.connection.commit()

    def close(self):
        """Close the cursor and the connection."""
        assert self.connection is not None
        self.cursor.close()
        self.connection.close()

    def _execute(self, sql):
        """Run sql on the shared cursor and return that cursor.

        Every call reuses the same cursor, so the rows of a previous
        statement must be consumed before the next call.
        """
        assert self.connection is not None
        return self.cursor.execute(sql)

    def _bootstrap(self):
        """Create the four tables if they do not exist yet."""
        assert self.CREATE_USER_TABLE is not None
        self.connect()
        self._execute(self.CREATE_USER_TABLE)
        self._execute(self.CREATE_COMPUTER_TABLE)
        self._execute(self.CREATE_SYSTEM_TABLE)
        self._execute(self.CREATE_DISK_PARTITION)
        self.commit()
        self.close()

    def _getInsertionDateTuple(self):
        """Return the current [date, time] as 'YYYY-MM-DD' and 'HH:MM:SS'."""
        # The day directive was written as a bare "d", which produced dates
        # such as "2014-03-d".
        return strftime("%Y-%m-%d -- %H:%M:%S").split(" -- ")

    ###################
    # Insertion methods
    ###################
    def insertUserSnapshot(self, partition, pid, process, cpu_percent, cpu_time,
                           cpu_num_threads, memory_percent, memory_rss,
                           io_rw_counter, io_cycles_counter, insertion_date,
                           insertion_time):
        """Insert one process snapshot in the user table.

        Returns the SQL statement that was executed.
        """
        insertion_sql = self.INSERT_USER_TEMPLATE % (
            partition, pid, process, cpu_percent, cpu_time,
            cpu_num_threads, memory_percent,
            memory_rss, io_rw_counter, io_cycles_counter,
            insertion_date, insertion_time)
        self._execute(insertion_sql)
        return insertion_sql

    def insertComputerSnapshot(self, cpu_num_core, cpu_frequency, cpu_type,
                               memory_size, memory_type, partition_list,
                               insertion_date, insertion_time):
        """Insert one general computer snapshot in the computer table.

        Returns the SQL statement that was executed.
        """
        insertion_sql = self.INSERT_COMPUTER_TEMPLATE % (
            cpu_num_core, cpu_frequency, cpu_type,
            memory_size, memory_type,
            partition_list, insertion_date,
            insertion_time)
        self._execute(insertion_sql)
        return insertion_sql

    def insertDiskPartitionSnapshot(self, partition, used, free, mountpoint,
                                    insertion_date, insertion_time):
        """Insert one disk partition snapshot in the disk table.

        Returns the SQL statement that was executed.
        """
        insertion_sql = self.INSERT_DISK_TEMPLATE % (
            partition, used, free, mountpoint,
            insertion_date, insertion_time)
        self._execute(insertion_sql)
        return insertion_sql

    def insertSystemSnapshot(self, loadavg, cpu_percent, memory_used,
                             memory_free, net_in_bytes, net_in_errors,
                             net_in_dropped, net_out_bytes, net_out_errors,
                             net_out_dropped, insertion_date, insertion_time):
        """Insert one system usage snapshot in the system table.

        Returns the SQL statement that was executed.
        """
        insertion_sql = self.INSERT_SYSTEM_TEMPLATE % (
            loadavg, cpu_percent, memory_used, memory_free,
            net_in_bytes, net_in_errors, net_in_dropped,
            net_out_bytes, net_out_errors, net_out_dropped,
            insertion_date, insertion_time)
        self._execute(insertion_sql)
        return insertion_sql

    def getDataScopeList(self, ignore_date=None, reported=0):
        """Return (date, row count) pairs for the dates found in "system".

        Only rows whose reported flag equals `reported` are counted, and
        `ignore_date`, when given, is excluded. The system table is queried
        because it is smaller, to save time. The result is the shared
        cursor: consume it before running another statement.
        """
        if ignore_date is not None:
            where_clause = " AND date != '%s'" % ignore_date
        else:
            where_clause = ""
        select_sql = "SELECT date, count(time) FROM system "\
            " WHERE reported = %s %s GROUP BY date" % \
            (reported, where_clause)
        return self._execute(select_sql)

    def markDayAsReported(self, date_scope, table_list):
        """Mark every unreported row of date_scope as reported.

        The update is applied to each table named in table_list.
        """
        update_sql = "UPDATE %s SET reported = 1 " \
            "WHERE date = '%s' AND reported = 0"
        for table in table_list:
            self._execute(update_sql % (table, date_scope))

    def select(self, table, date=None, columns="*"):
        """Select `columns` from `table`, optionally for a single date.

        Returns the shared cursor positioned on the result.
        """
        if date is not None:
            where_clause = " WHERE date = '%s' " % date
        else:
            where_clause = ""
        select_sql = "SELECT %s FROM %s %s " % (columns, table, where_clause)
        return self._execute(select_sql)

    #####################################################
    # Export Tables as Dict for handle realtime plotting
    #####################################################
    def exportSystemAsDict(self, date):
        """Export the system rows of `date` as a dict of series.

        Each key is a column name and each value is a list of
        {'entry': value, 'time': 'date time'} dicts. Memory values are
        divided by 1024. net_in_bytes and net_out_bytes are exported as the
        difference with the previous row (also divided by 1024), so the
        first row of the day only seeds the counters and is not exported.
        """
        series_names = (
            "loadavg", "cpu_percent", "memory_used", "memory_free",
            "net_in_bytes", "net_in_errors", "net_in_dropped",
            "net_out_bytes", "net_out_errors", "net_out_dropped")
        collected_entry_dict = {}
        for series_name in series_names:
            collected_entry_dict[series_name] = []

        entry_list = self._execute(
            "SELECT loadavg, cpu_percent, memory_used, memory_free," \
            " net_in_bytes, net_in_errors, net_in_dropped," \
            " net_out_bytes, net_out_errors, net_out_dropped, " \
            " date, time FROM system WHERE date = '%s'" % date)

        is_first_entry = True
        last_entry_in = 0
        last_entry_out = 0
        for entry in entry_list:
            entry_time = "%s %s" % (entry[10], str(entry[11]))
            if is_first_entry:
                # Nothing to diff against yet: remember the counters only.
                is_first_entry = False
                last_entry_in = entry[4]
                last_entry_out = entry[7]
                continue

            entry_in = entry[4] - last_entry_in
            entry_out = entry[7] - last_entry_out
            last_entry_in = entry[4]
            last_entry_out = entry[7]

            exported_values = (
                entry[0], entry[1], entry[2]/1024, entry[3]/1024,
                entry_in/1024, entry[5], entry[6],
                entry_out/1024, entry[8], entry[9])
            for series_name, value in zip(series_names, exported_values):
                collected_entry_dict[series_name].append(
                    {'entry': value, 'time': entry_time})

        return collected_entry_dict

    def exportDiskAsDict(self, date):
        """Export the disk rows of `date` as a dict of series.

        For every partition two keys are produced, "<partition>-used" and
        "<partition>-free", each holding a list of
        {'entry': value/1024, 'time': 'date time'} dicts.
        """
        collected_entry_dict = {}
        entry_list = self._execute(
            "SELECT partition, used, free, date, time "\
            "from disk WHERE date = '%s'" % (date))
        for partition, used, free, __date, __time in entry_list:
            entry_time = "%s %s" % (__date, str(__time))
            partition_used = "%s-used" % partition
            partition_free = "%s-free" % partition
            collected_entry_dict.setdefault(partition_used, []).append(
                {'entry': int(used)/1024, 'time': entry_time})
            collected_entry_dict.setdefault(partition_free, []).append(
                {'entry': int(free)/1024, 'time': entry_time})
        return collected_entry_dict
def get_user_list(config):
    """Build the dict of slapos users described by the configuration.

    One User is created per partition index in range(partition_amount).
    Its name is user_base_name + index and its path is
    instance_root/partition_base_name + index. The returned dict maps each
    user name to its User object.
    """
    partition_amount = int(config.get("slapformat", "partition_amount"))
    name_prefix = config.get("slapformat", "user_base_name")
    path_prefix = config.get("slapformat", "partition_base_name")
    instance_root = config.get("slapos", "instance_root")

    user_dict = {}
    for index in range(partition_amount):
        user_name = "%s%s" % (name_prefix, index)
        partition_path = "%s/%s%s" % (instance_root, path_prefix, index)
        user_dict[user_name] = User(user_name, partition_path)
    return user_dict
class User(object):
    """A slapos user together with the process snapshots collected for it."""

    def __init__(self, name, path):
        self.name = str(name)
        self.path = str(path)
        # Snapshots of the processes owned by this user.
        self.snapshot_list = []

    def append(self, value):
        """Attach one more process snapshot to this user."""
        self.snapshot_list.append(value)

    def save(self, database, collected_date, collected_time):
        """Insert every collected snapshot in the user table.

        The connection is opened here and always closed, even when an
        insertion fails. The transaction is committed only when every
        insertion succeeded.
        """
        database.connect()
        try:
            for snapshot_item in self.snapshot_list:
                database.insertUserSnapshot(
                    self.name,
                    pid=snapshot_item.get("pid"),
                    process=snapshot_item.get("process"),
                    cpu_percent=snapshot_item.get("cpu_percent"),
                    cpu_time=snapshot_item.get("cpu_time"),
                    cpu_num_threads=snapshot_item.get("cpu_num_threads"),
                    memory_percent=snapshot_item.get("memory_percent"),
                    memory_rss=snapshot_item.get("memory_rss"),
                    io_rw_counter=snapshot_item.get("io_rw_counter"),
                    io_cycles_counter=snapshot_item.get("io_cycles_counter"),
                    insertion_date=collected_date,
                    insertion_time=collected_time)
            database.commit()
        finally:
            database.close()
class Computer(dict):
    """Persist a computer snapshot (computer, system and disk data)."""

    def __init__(self, computer_snapshot):
        self.computer_snapshot = computer_snapshot

    def save(self, database, collected_date, collected_time):
        """Insert the computer, system and disk snapshots in the database.

        The connection is opened here and always closed, even when an
        insertion fails. The transaction is committed only when every
        insertion succeeded.
        """
        database.connect()
        try:
            self._save_computer_snapshot(database, collected_date, collected_time)
            self._save_system_snapshot(database, collected_date, collected_time)
            self._save_disk_partition_snapshot(database, collected_date, collected_time)
            database.commit()
        finally:
            database.close()

    def _save_computer_snapshot(self, database, collected_date, collected_time):
        """Insert the general computer information."""
        # The (device, size) pairs are flattened as "dev=size;dev=size".
        partition_list = ";".join(
            "%s=%s" % (x, y)
            for x, y in self.computer_snapshot.get("partition_list"))
        database.insertComputerSnapshot(
            cpu_num_core=self.computer_snapshot.get("cpu_num_core"),
            cpu_frequency=self.computer_snapshot.get("cpu_frequency"),
            cpu_type=self.computer_snapshot.get("cpu_type"),
            memory_size=self.computer_snapshot.get("memory_size"),
            memory_type=self.computer_snapshot.get("memory_type"),
            partition_list=partition_list,
            insertion_date=collected_date,
            insertion_time=collected_time)

    def _save_system_snapshot(self, database, collected_date, collected_time):
        """Insert the system usage snapshot nested in the computer snapshot."""
        snapshot = self.computer_snapshot.get("system_snapshot")
        database.insertSystemSnapshot(
            loadavg=snapshot.get("load"),
            cpu_percent=snapshot.get("cpu_percent"),
            memory_used=snapshot.get("memory_used"),
            memory_free=snapshot.get("memory_free"),
            net_in_bytes=snapshot.get("net_in_bytes"),
            net_in_errors=snapshot.get("net_in_errors"),
            net_in_dropped=snapshot.get("net_in_dropped"),
            net_out_bytes=snapshot.get("net_out_bytes"),
            net_out_errors=snapshot.get("net_out_errors"),
            net_out_dropped=snapshot.get("net_out_dropped"),
            insertion_date=collected_date,
            insertion_time=collected_time)

    def _save_disk_partition_snapshot(self, database, collected_date, collected_time):
        """Insert one row per disk partition snapshot."""
        for disk_partition in self.computer_snapshot.get("disk_snapshot_list"):
            database.insertDiskPartitionSnapshot(
                partition=disk_partition.partition,
                used=disk_partition.disk_size_used,
                free=disk_partition.disk_size_free,
                mountpoint=';'.join(disk_partition.mountpoint_list),
                insertion_date=collected_date,
                insertion_time=collected_time)
#!/usr/bin/env python
from slapos.collect.db import Database
from slapos.util import mkdir_p
import os.path
import json
import csv
from time import strftime
class Dumper(object):
    """Base class of the dumpers: keeps the database to read from."""

    def __init__(self, database):
        # Used by subclasses through self.db.
        self.db = database
class SystemReporter(Dumper):
    """Dump today's system and disk series, one file per series.

    Subclasses provide writeFile(name, folder, collected_entry_list).
    """

    def dump(self, folder):
        """Write today's system and disk series in `folder`.

        System series are written under their column name. Disk series are
        written as "disk_memory_<used|free>_<device base name>".
        """
        _date = strftime("%Y-%m-%d")
        self.db.connect()
        try:
            for item, collected_item_list in self.db.exportSystemAsDict(_date).items():
                self.writeFile(item, folder, collected_item_list)

            for partition, collected_item_list in self.db.exportDiskAsDict(_date).items():
                # Keys look like "<device>-used" / "<device>-free". Split on
                # the LAST dash only, because device names may contain
                # dashes themselves (e.g. /dev/mapper/vg-root).
                device, usage_kind = partition.rsplit("-", 1)
                partition_id = device.split("/")[-1]
                item = "memory_%s" % usage_kind
                # Each file is written once (the call used to be duplicated).
                self.writeFile("disk_%s_%s" % (item, partition_id), folder,
                               collected_item_list)
        finally:
            # Release the connection even if an export or a write failed.
            self.db.close()
class SystemJSONReporterDumper(SystemReporter):
    """SystemReporter writing each series as a JSON file."""

    # The list default is only read, never mutated.
    def writeFile(self, name, folder, collected_entry_list=[]):
        """Dump collected_entry_list as JSON in folder/system_<name>.json."""
        target_path = os.path.join(folder, "system_%s.json" % name)
        # "with" closes the file even if serialization fails.
        with open(target_path, "w") as file_io:
            json.dump(collected_entry_list, file_io, sort_keys=True, indent=2)
class SystemCSVReporterDumper(SystemReporter):
    """SystemReporter writing each series as a CSV file."""

    # The list default is only read, never mutated.
    def writeFile(self, name, folder, collected_entry_list=[]):
        """Dump collected_entry_list as CSV in folder/system_<name>.csv.

        The file starts with a "time,entry" header row, followed by one row
        per collected entry.
        """
        target_path = os.path.join(folder, "system_%s.csv" % name)
        # "with" closes the file even if a row cannot be written.
        with open(target_path, "w") as file_io:
            csv_output = csv.writer(file_io)
            csv_output.writerow(["time", "entry"])
            for collected_entry in collected_entry_list:
                csv_output.writerow(
                    [collected_entry["time"], collected_entry["entry"]])
class RawDumper(Dumper):
    """Dump the raw rows of every past, unreported day.

    Subclasses provide writeFile(name, folder, date_scope, rows).
    """

    def dump(self, folder):
        """Dump the system, user and disk rows of each unreported day.

        Today is skipped. Every dumped day is then marked as reported in
        those three tables.
        """
        date = strftime("%Y-%m-%d")
        self.db.connect()
        try:
            # Materialize the scopes first: the database reuses one cursor,
            # so the select() calls below would otherwise discard the
            # remaining scopes and only the first day would be dumped.
            scope_list = list(self.db.getDataScopeList(ignore_date=date))
            for date_scope, amount in scope_list:
                for table in ("system", "user", "disk"):
                    self.writeFile(table, folder, date_scope,
                                   self.db.select(table, date_scope))
                self.db.markDayAsReported(
                    date_scope, table_list=["system", "user", "disk"])
            self.db.commit()
        finally:
            # Release the connection even if a dump failed.
            self.db.close()
class RawCSVDumper(RawDumper):
    """RawDumper writing each table as <folder>/<date>/dump_<table>.csv."""

    def writeFile(self, name, folder, date_scope, rows):
        """Write rows as CSV, creating the per-date folder if needed."""
        mkdir_p(os.path.join(folder, date_scope))
        target_path = os.path.join(
            folder, "%s/dump_%s.csv" % (date_scope, name))
        # "with" closes the file even if writing the rows fails.
        with open(target_path, "w") as file_io:
            csv.writer(file_io).writerows(rows)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010, 2011, 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import psutil
import os
class _Snapshot(object):
def get(self, property, default=None):
return getattr(self, property, default)
class ProcessSnapshot(_Snapshot):
    """ Take a snapshot from the running process
    """

    def __init__(self, process=None):
        assert isinstance(process, psutil.Process)
        ui_counter_list = process.get_io_counters()
        self.username = process.username()
        self.pid = process.pid
        # Process identifier "<pid>-<creation time>".
        self.name = "%s-%s" % (process.pid, process.create_time())
        # The collector stores this value by reading the "process" key, so
        # expose it under that name too; "name" is kept for existing readers.
        self.process = self.name
        # CPU percentage, we will have to get actual absolute value
        self.cpu_percent = process.get_cpu_percent(None)
        # CPU time: sum of the values returned by get_cpu_times()
        self.cpu_time = sum(process.get_cpu_times())
        # Thread number, might not be really relevant
        self.cpu_num_threads = process.get_num_threads()
        # Memory percentage
        self.memory_percent = process.get_memory_percent()
        # Resident Set Size, virtual memory size is not accounted for
        self.memory_rss = process.get_memory_info()[0]
        # Byte count, Read and write. OSX NOT SUPPORTED
        self.io_rw_counter = ui_counter_list[2] + ui_counter_list[3]
        # Read + write IO cycles
        self.io_cycles_counter = ui_counter_list[0] + ui_counter_list[1]
class SystemSnapshot(_Snapshot):
    """ Snapshot of the current system-wide usage: memory, CPU, load
        and network counters.
    """

    def __init__(self):
        memory_usage = psutil.phymem_usage()
        network_counters = psutil.net_io_counters()

        # Memory
        self.memory_used = memory_usage.used
        self.memory_free = memory_usage.free
        self.memory_percent = memory_usage.percent

        # CPU and load: first value returned by os.getloadavg()
        self.cpu_percent = psutil.cpu_percent()
        self.load = os.getloadavg()[0]

        # Network, incoming
        self.net_in_bytes = network_counters.bytes_recv
        self.net_in_errors = network_counters.errin
        self.net_in_dropped = network_counters.dropin

        # Network, outgoing
        self.net_out_bytes = network_counters.bytes_sent
        self.net_out_errors = network_counters.errout
        self.net_out_dropped = network_counters.dropout
class DiskPartitionSnapshot(_Snapshot):
    """ Take Snapshot from general disk partitions
        usage
    """

    def __init__(self, partition, mountpoint):
        """Record the usage of `partition`, measured at `mountpoint`."""
        self.partition = partition
        self.mountpoint_list = [ mountpoint ]
        # The unused psutil.disk_io_counters() call was dropped: its result
        # was never read and it only added a way for the snapshot to fail.
        disk = psutil.disk_usage(mountpoint)
        self.disk_size_used = disk.used
        self.disk_size_free = disk.free
        self.disk_size_percent = disk.percent
class ComputerSnapshot(_Snapshot):
    """ Snapshot of the computer: static information plus a SystemSnapshot
        and one DiskPartitionSnapshot per physical device.
    """

    def __init__(self):
        self.cpu_num_core = psutil.NUM_CPUS
        # Frequency and types are not collected: stored as 0.
        self.cpu_frequency = 0
        self.cpu_type = 0
        self.memory_size = psutil.TOTAL_PHYMEM
        self.memory_type = 0

        # The system snapshot is taken first, then the disks: the disk list
        # is filled in by _get_physical_disk_info() as a side effect.
        self.system_snapshot = SystemSnapshot()
        self.disk_snapshot_list = []
        self.partition_list = self._get_physical_disk_info()

    def _get_physical_disk_info(self):
        """Return the list of (device, total size) pairs.

        Each device is handled once, through the first mountpoint listed
        for it, and gets a DiskPartitionSnapshot appended to
        self.disk_snapshot_list.
        """
        size_by_device = {}
        for partition in psutil.disk_partitions():
            device = partition.device
            if device in size_by_device:
                continue
            size_by_device[device] = psutil.disk_usage(partition.mountpoint).total
            self.disk_snapshot_list.append(
                DiskPartitionSnapshot(device, partition.mountpoint))
        return list(size_by_device.items())
...@@ -893,7 +893,7 @@ class Slapgrid(object): ...@@ -893,7 +893,7 @@ class Slapgrid(object):
try: try:
computer_partition_id = computer_partition.getId() computer_partition_id = computer_partition.getId()
#We want to execute all the script in the report folder # We want to execute all the script in the report folder
instance_path = os.path.join(self.instance_root, instance_path = os.path.join(self.instance_root,
computer_partition.getId()) computer_partition.getId())
report_path = os.path.join(instance_path, 'etc', 'report') report_path = os.path.join(instance_path, 'etc', 'report')
...@@ -902,7 +902,7 @@ class Slapgrid(object): ...@@ -902,7 +902,7 @@ class Slapgrid(object):
else: else:
script_list_to_run = [] script_list_to_run = []
#We now generate the pseudorandom name for the xml file # We now generate the pseudorandom name for the xml file
# and we add it in the invocation_list # and we add it in the invocation_list
f = tempfile.NamedTemporaryFile() f = tempfile.NamedTemporaryFile()
name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name)) name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name))
...@@ -914,13 +914,13 @@ class Slapgrid(object): ...@@ -914,13 +914,13 @@ class Slapgrid(object):
invocation_list = [] invocation_list = []
invocation_list.append(os.path.join(instance_path, 'etc', 'report', invocation_list.append(os.path.join(instance_path, 'etc', 'report',
script)) script))
#We add the xml_file name to the invocation_list # We add the xml_file name to the invocation_list
#f = tempfile.NamedTemporaryFile() #f = tempfile.NamedTemporaryFile()
#name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name)) #name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name))
#path_to_slapreport = os.path.join(instance_path, 'var', name_xml) #path_to_slapreport = os.path.join(instance_path, 'var', name_xml)
invocation_list.append(path_to_slapreport) invocation_list.append(path_to_slapreport)
#Dropping privileges # Dropping privileges
uid, gid = None, None uid, gid = None, None
stat_info = os.stat(instance_path) stat_info = os.stat(instance_path)
#stat sys call to get statistics informations #stat sys call to get statistics informations
...@@ -946,21 +946,25 @@ class Slapgrid(object): ...@@ -946,21 +946,25 @@ class Slapgrid(object):
self.logger.exception('Cannot run usage script(s) for %r:' % self.logger.exception('Cannot run usage script(s) for %r:' %
computer_partition.getId()) computer_partition.getId())
#Now we loop through the different computer partitions to report # Now we loop through the different computer partitions to report
report_usage_issue_cp_list = [] report_usage_issue_cp_list = []
for computer_partition in computer_partition_list: for computer_partition in computer_partition_list:
try: try:
filename_delete_list = [] filename_delete_list = []
computer_partition_id = computer_partition.getId() computer_partition_id = computer_partition.getId()
instance_path = os.path.join(self.instance_root, computer_partition_id) instance_path = os.path.join(self.instance_root, computer_partition_id)
dir_reports = os.path.join(instance_path, 'var', 'xml_report') dir_report_list = [os.path.join(instance_path, 'var', 'xml_report'),
#The directory xml_report contain a number of files equal os.path.join(self.instance_root, 'var', 'xml_report',
#to the number of software instance running inside the same partition computer_partition_id)]
for dir_reports in dir_report_list:
# The directory xml_report contain a number of files equal
# to the number of software instance running inside the same partition
if os.path.isdir(dir_reports): if os.path.isdir(dir_reports):
filename_list = os.listdir(dir_reports) filename_list = os.listdir(dir_reports)
else: else:
filename_list = [] filename_list = []
#self.logger.debug('name List %s' % filename_list) # self.logger.debug('name List %s' % filename_list)
for filename in filename_list: for filename in filename_list:
...@@ -968,7 +972,7 @@ class Slapgrid(object): ...@@ -968,7 +972,7 @@ class Slapgrid(object):
if os.path.exists(file_path): if os.path.exists(file_path):
usage = open(file_path, 'r').read() usage = open(file_path, 'r').read()
#We check the validity of xml content of each reports # We check the validity of xml content of each reports
if not self.validateXML(usage, partition_consumption_model): if not self.validateXML(usage, partition_consumption_model):
self.logger.info('WARNING: The XML file %s generated by slapreport is ' self.logger.info('WARNING: The XML file %s generated by slapreport is '
'not valid - This report is left as is at %s where you can ' 'not valid - This report is left as is at %s where you can '
...@@ -984,7 +988,7 @@ class Slapgrid(object): ...@@ -984,7 +988,7 @@ class Slapgrid(object):
else: else:
self.logger.debug('Usage report %r not found, ignored' % file_path) self.logger.debug('Usage report %r not found, ignored' % file_path)
#After sending the aggregated file we remove all the valid xml reports # After sending the aggregated file we remove all the valid xml reports
for filename in filename_delete_list: for filename in filename_delete_list:
os.remove(os.path.join(dir_reports, filename)) os.remove(os.path.join(dir_reports, filename))
...@@ -997,15 +1001,15 @@ class Slapgrid(object): ...@@ -997,15 +1001,15 @@ class Slapgrid(object):
self.logger.info('computer_partition_usage_list: %s - %s' % self.logger.info('computer_partition_usage_list: %s - %s' %
(computer_partition_usage.usage, computer_partition_usage.getId())) (computer_partition_usage.usage, computer_partition_usage.getId()))
#If there is, at least, one report # If there is, at least, one report
if computer_partition_usage_list != []: if computer_partition_usage_list != []:
try: try:
#We generate the final XML report with asXML method # We generate the final XML report with asXML method
computer_consumption = self.asXML(computer_partition_usage_list) computer_consumption = self.asXML(computer_partition_usage_list)
self.logger.info('Final xml report: %s' % computer_consumption) self.logger.info('Final xml report: %s' % computer_consumption)
#We test the XML report before sending it # We test the XML report before sending it
if self.validateXML(computer_consumption, computer_consumption_model): if self.validateXML(computer_consumption, computer_consumption_model):
self.logger.info('XML file generated by asXML is valid') self.logger.info('XML file generated by asXML is valid')
slap_computer_usage.reportUsage(computer_consumption) slap_computer_usage.reportUsage(computer_consumption)
......
from psutil import *
from psutil._error import NoSuchProcess, AccessDenied
from time import time, sleep
from datetime import datetime
import os
import ConfigParser
# Local import
from snapshot import Snapshot
from user import User
# XXX : this is BAAAAD !!
# ***************** Config *****************
# Path of the slapos configuration file, read once at import time from the
# SLAPOS_CONFIGURATION environment variable; falls back to the path below
# when the variable is not set.
GLOBAL_SLAPOS_CONFIGURATION = os.environ.get(
'SLAPOS_CONFIGURATION',
'/etc/opt/slapos/slapos.cfg'
)
# ******************************************
# XXX : should rebuild this to make it more explicit
def build_user_list():
    """Read the slapos configuration and return the dict of slapos users.

    One User is created per partition index in range(partition_amount).
    Its name is user_base_name + index and its path is
    instance_root/partition_base_name + index. The returned dict maps each
    user name to its User object.
    """
    config = ConfigParser.SafeConfigParser()
    config.read(GLOBAL_SLAPOS_CONFIGURATION)
    partition_amount = int(config.get("slapformat", "partition_amount"))
    name_prefix = config.get("slapformat", "user_base_name")
    path_prefix = config.get("slapformat", "partition_base_name")
    instance_root = config.get("slapos", "instance_root")

    users = {}
    for index in range(partition_amount):
        user_name = "%s%s" % (name_prefix, index)
        partition_path = "%s/%s%s" % (instance_root, path_prefix, index)
        users[user_name] = User(user_name, partition_path)
    return users
def build_snapshot(proc):
    """Return a Snapshot of proc, or None if the process has disappeared."""
    assert type(proc) is Process
    try:
        # Read the I/O counters once so that the byte and cycle figures
        # come from the same instant (they used to be fetched four times).
        io_counters = proc.get_io_counters()
        return Snapshot(
            proc.username,
            # CPU percentage, we will have to get actual absolute value
            cpu = proc.get_cpu_percent(None),
            # Thread number, might not be really relevant
            cpu_io = proc.get_num_threads(),
            # Resident Set Size, virtual memory size is not accounted for
            ram = proc.get_memory_info()[0],
            # Byte count, Read and write. OSX NOT SUPPORTED
            hd = io_counters[2] + io_counters[3],
            # Read + write IO cycles
            hd_io = io_counters[0] + io_counters[1],
        )
    except NoSuchProcess:
        return None
def current_state():
    """
    Generator applying build_snapshot(...) to every relevant process.

    A process is relevant when its user is one of the slapos users returned
    by build_user_list(). The scan sleeps half a second at every fifth of
    the process list.
    """
    slapos_users = build_user_list()
    relevant_processes = [p for p in process_iter() if p.username in slapos_users]
    pause_interval = len(relevant_processes) // 5
    for index, process in enumerate(relevant_processes):
        pause_now = pause_interval > 0 and index % pause_interval == 0
        if pause_now:
            sleep(.5)
        yield build_snapshot(process)
def main():
    """
    Main function

    Loop forever. On each pass the user list is rebuilt, the snapshots of
    all relevant processes are summed per user under the timestamp of the
    pass, then each user's summary is dumped in its partition.

    A pass is abandoned (nothing dumped) if a process vanishes during the
    scan. KeyboardInterrupt or SystemExit during the scan ends the loop.
    AccessDenied prints a message and ends the function.
    """
    try:
        while True:
            users = build_user_list()
            key = time()
            try:
                for snapshot in current_state():
                    if not snapshot:
                        continue
                    user = users[snapshot.username]
                    if key not in user:
                        user[key] = snapshot
                    else:
                        user[key] += snapshot
            except NoSuchProcess:
                continue
            except (KeyboardInterrupt, SystemExit):
                break

            # XXX: we should use a value from the config file and not a hardcoded one
            for user in users.values():
                user.dumpSummary(user.path + '/var/xml_report/consumption.xml')
    except AccessDenied:
        print("You HAVE TO execute this script with root permission.")
# Start the collection loop only when the file is executed as a script.
if __name__ == '__main__':
main()
#!/usr/bin/env python
import os
from datetime import datetime
from time import sleep
import gzip
import sys
class Reporter:
    """Aggregate report files, send them, then archive them.

    run(paths) retries every 30 seconds until _send() succeeds. _send() is
    still a stub that always fails.
    """

    def run(self, *args):
        """Aggregate, send and archive the files; retry until sent.

        args is (paths,) or (paths, archive_dir). The retry is a loop, so
        a long outage cannot exhaust the recursion limit.
        """
        paths = args[0] if args else None
        while True:
            payload = self._aggregate(paths)
            if self._send(payload):
                self._archive(*args)
                return
            self._fallback(*args)

    def _aggregate(self, paths):
        """Return the concatenated content of every file in paths."""
        chunks = []
        for path in paths or ():
            print(path)
            with open(path, 'r') as f:
                chunks.append(f.read())
        return "".join(chunks)

    # XXX : implement
    def _send(self, json_str):
        """Send the aggregated data; return True on success."""
        return False

    def _archive(self, paths, archive_dir=None):
        """Gzip each file of paths and remove the original.

        Archives are written in archive_dir, or next to the original file
        when archive_dir is None. They are named
        <basename>.<YYYYmmdd-HHMMSS>.gz.
        """
        for path in paths:
            target_dir = os.path.dirname(path) if archive_dir is None else archive_dir
            # datetime objects cannot be concatenated to a string: format
            # the timestamp explicitly.
            suffix = datetime.now().strftime('.%Y%m%d-%H%M%S.gz')
            archive_path = os.path.join(target_dir, os.path.basename(path) + suffix)
            with open(path, 'rb') as source:
                archive = gzip.open(archive_path, 'wb')
                try:
                    archive.writelines(source)
                finally:
                    archive.close()
            os.remove(path)

    # XXX : set a more appropriate timer (like 1h or something)
    def _fallback(self, *args):
        """Wait before the next attempt made by run()."""
        sleep(30)
def check(args):
    """Exit with status -1 unless args is a non-empty list of files.

    A message naming the first offending argument (or the missing list) is
    printed before exiting.
    """
    if len(args or ()) == 0:
        print('missing argument : filename list')
        sys.exit(-1)
    for candidate in args:
        if os.path.isfile(candidate):
            continue
        print(candidate + ' is not a valid path')
        sys.exit(-1)
if __name__ == '__main__':
    # The command line is expected to be a list of report file paths.
    path_args = sys.argv[1:]
    check(path_args)
    Reporter().run(path_args)
class Snapshot:
    """Resource usage figures of one user; snapshots can be summed.

    ram_io, net and net_io are optional and default to 0. __repr__ has
    always printed them, but they were never set, so it raised
    AttributeError.
    """

    def __init__(self, username, cpu = 0, cpu_io = 0, ram = 0, hd = 0, hd_io = 0,
                 ram_io = 0, net = 0, net_io = 0):
        self.username = username
        self.cpu = cpu
        self.cpu_io = cpu_io
        self.ram = ram
        self.ram_io = ram_io
        self.hd = hd
        self.hd_io = hd_io
        self.net = net
        self.net_io = net_io

    def __repr__(self):
        return "%s : { cpu : {%s, %s}, ram : {%s, %s}, hd : {%s, %s}, net : {%s, %s} }" % (
            self.username,
            self.cpu, self.cpu_io,
            self.ram, self.ram_io,
            self.hd, self.hd_io,
            self.net, self.net_io
        )

    def __add__(self, other):
        """Return a new Snapshot summing both operands, field by field.

        Both snapshots must belong to the same user.
        """
        assert self.username == other.username
        return Snapshot(
            self.username,
            self.cpu + other.cpu,
            self.cpu_io + other.cpu_io,
            self.ram + other.ram,
            self.hd + other.hd,
            self.hd_io + other.hd_io,
            self.ram_io + other.ram_io,
            self.net + other.net,
            self.net_io + other.net_io,
        )

    def matters(self):
        """Return True when at least one figure is not zero."""
        figures = (self.cpu, self.cpu_io, self.ram, self.ram_io,
                   self.hd, self.hd_io, self.net, self.net_io)
        return any(figure != 0 for figure in figures)
from snapshot import Snapshot
class User(dict):
    """A slapos user: a dict of snapshots plus a name and a path."""

    def __init__(self, name, path):
        self.name = str(name)
        self.path = str(path)

    def dump(self, path):
        """Append to `path` one line per stored snapshot that matters."""
        with open(path, 'a') as output:
            for snapshot in self.values():
                if not snapshot.matters():
                    continue
                output.write(repr(snapshot) + "\n")

    def dumpSummary(self, path):
        """Append to `path` the sum of all stored snapshots, if it matters."""
        summary = Snapshot(self.name)
        for snapshot in self.values():
            summary = summary + snapshot
        if not summary.matters():
            return
        with open(path, 'a') as output:
            output.write(repr(summary) + "\n")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment