Commit dd98131b authored by Gabriel Monnerat's avatar Gabriel Monnerat

merge master into slave_instance branch

parents 144605b4 6d3bed94
......@@ -3,6 +3,7 @@ develop = .
parts =
slapos
pyflakes
test
find-links =
http://www.nexedi.org/static/packages/source/slapos.buildout/
......@@ -44,5 +45,10 @@ eggs =
rstctl
interpreter = python
[test]
recipe = zc.recipe.testrunner
eggs =
slapos.core
[versions]
zc.buildout = 1.5.3-dev-SlapOS-005
......@@ -29,6 +29,13 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions
from Products.ERP5.Document.Item import Item
from lxml import etree
import collections
class DisconnectedSoftwareTree(Exception):
pass
class CyclicSoftwareTree(Exception):
pass
class SoftwareInstance(Item):
"""
......@@ -60,3 +67,44 @@ class SoftwareInstance(Item):
value = element.text
result_dict[key] = value
return result_dict
security.declareProtected(Permissions.AccessContentsInformation,
'checkNotCyclic')
def checkNotCyclic(self, graph):
# see http://neopythonic.blogspot.com/2009/01/detecting-cycles-in-directed-graph.html
todo = set(graph.keys())
while todo:
node = todo.pop()
stack = [node]
while stack:
top = stack[-1]
for node in graph[top]:
if node in stack:
raise CyclicSoftwareTree
if node in todo:
stack.append(node)
todo.remove(node)
break
else:
node = stack.pop()
return True
security.declareProtected(Permissions.AccessContentsInformation,
'checkConnected')
def checkConnected(self, graph, root):
size = len(graph)
visited = set()
to_crawl = collections.deque(graph[root])
while to_crawl:
current = to_crawl.popleft()
if current in visited:
continue
visited.add(current)
node_children = set(graph[current])
to_crawl.extend(node_children - visited)
# add one to visited, as root won't be visited, only children
# this is false positive in case of cyclic graphs, but they are
# anyway wrong in Software Instance trees
if size != len(visited) + 1:
raise DisconnectedSoftwareTree
return True
......@@ -69,9 +69,12 @@ if is_slave == True:\n
else:\n
software_instance_portal_type = "Software Instance"\n
\n
# Get root software instance\n
# graph allows to "simulate" tree change after requested operation\n
graph = {}\n
# Get root software instance and create initial graph\n
predecessor_software_instance = software_instance\n
while (predecessor_software_instance is not None):\n
graph[predecessor_software_instance.getUid()] = predecessor_software_instance.getPredecessorUidList()\n
root_software_instance = predecessor_software_instance\n
predecessor_software_instance = predecessor_software_instance.getPredecessorRelatedValue(\n
portal_type="Software Instance")\n
......@@ -89,6 +92,11 @@ request_software_instance = software_instance.portal_catalog.getResultValue(\n
root_uid=root_software_instance.getUid(),\n
)\n
\n
# above query does not find root software instance, but as this case is easy\n
# to find, just check if such request does not come and raise\n
if root_software_instance.getTitle() == requested_partition_reference:\n
raise ValueError(\'It is disallowed to request root software instance\')\n
\n
if (request_software_instance is None):\n
if (portal.portal_activities.countMessageWithTag(tag) > 0):\n
# The software instance is already under creation but can not be fetched from catalog\n
......@@ -134,6 +142,8 @@ else:\n
predecessor_software_instance.edit(\n
predecessor_uid_list=predecessor_uid_list,\n
activate_kw={\'tag\': tag},)\n
graph[predecessor_software_instance.getUid()] = predecessor_uid_list\n
\n
if state == \'started\':\n
request_software_instance.startRequested()\n
request_software_instance.activate(after_tag=tag).requestStartComputerPartition()\n
......@@ -142,8 +152,17 @@ else:\n
request_software_instance.activate(after_tag=tag).requestStopComputerPartition()\n
else:\n
raise ValueError(\'State %r is not supported\' % state)\n
\n
predecessor_list = software_instance.getPredecessorList() + [request_software_instance.getRelativeUrl()]\n
# Add requested software instance to graph if does not exists there yet\n
if not request_software_instance.getUid() in graph:\n
graph[request_software_instance.getUid()] = []\n
\n
# update graph to reflect requested operation\n
graph[software_instance.getUid()] = software_instance.getPredecessorUidList() + [request_software_instance.getUid()]\n
\n
# check if all elements are still connected\n
software_instance.checkNotCyclic(graph)\n
software_instance.checkConnected(graph, root_software_instance.getUid())\n
\n
software_instance.edit(\n
predecessor_list=predecessor_list,\n
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ZopePageTemplate" module="Products.PageTemplates.ZopePageTemplate"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_text</string> </key>
<value> <unicode encoding="cdata"><![CDATA[
<tal:block\n
xmlns:tal="http://xml.zope.org/namespaces/tal"\n
xmlns:metal="http://xml.zope.org/namespaces/metal"\n
xmlns:i18n="http://xml.zope.org/namespaces/i18n"\n
tal:define="field_id here/getId;\n
form_id python: here.getForm().id;\n
selection_name here/getSelectionName;\n
selection here/getSelection;\n
real_context here/getContext;\n
global portal_url_string here/getPortalUrlString;\n
context_url real_context/absolute_url;\n
md5_string here/getMD5Checksum;\n
hide_rows_on_no_search_criterion here/isHideRowsOnNoSearchCriterion;\n
is_domain_tree_mode here/isDomainTreeMode;\n
is_report_tree_mode here/isReportTreeMode;\n
global is_domain_tree_supported here/isDomainTreeSupported;\n
global is_report_tree_supported here/isReportTreeSupported;\n
global is_gadget_mode request/is_gadget_mode | nothing;\n
show_select_column here/showSelectColumn;\n
show_anchor_column here/showAnchorColumn;\n
show_search_line here/showSearchLine;\n
is_web_mode real_context/isWebMode | nothing;\n
is_dialog_mode request/dialog_mode | nothing;\n
display_style_list here/getDisplayStyleList;\n
list_style here/getListboxDisplayStyle;\n
global_search_column here/getGlobalSearchColumn;\n
global_search_column_script string:Base_doSelect;\n
show_global_search python: global_search_column not in (\'\', None);\n
line_list here/query;\n
listbox_max_lines python: int(here.getMaxLineNumber());\n
total_line python: int(here.total_size);\n
current_page python: int(here.current_page) + 1;\n
current_page_max python: listbox_max_lines * current_page;\n
current_page_start python: (listbox_max_lines * (current_page - 1)) + 1;\n
current_page_stop python: (total_line < current_page_max) and total_line or current_page_max;\n
form_url string:${context_url}/${form_id};\n
need_pagination python: total_line > listbox_max_lines;\n
show_list_style_selection python: len(display_style_list) > 1;\n
show_listbox_tree_mode_selection python: not is_gadget_mode and \n
(is_domain_tree_supported or is_report_tree_supported);\n
show_list_action_link python: here.field.get_value(\'list_action\');\n
page_navigation_template python: request.get(\'page_navigation_template\', here.getPageNavigationTemplate());\n
is_slider_mode python: \'Slider\' in page_navigation_template;\n
is_default_listbox_field python: field_id==\'listbox\';\n
field_prefix python: \'\';">\n
\n
<!-- Define hidden input. -->\n
<input type="hidden" \n
name="list_selection_name" \n
value="default" \n
tal:attributes="value selection_name" />\n
<input type="hidden" \n
name="list_selection_name" \n
value="default"\n
tal:attributes="value selection_name;\n
name string:${field_id}_list_selection_name" />\n
<input tal:condition="md5_string" \n
type="hidden" \n
name="md5_object_uid_list" \n
value="checksum" \n
tal:attributes="value md5_string" />\n
<input tal:condition="form_id" \n
type="hidden" \n
name="form_id" \n
tal:attributes="value form_id" \n
tal:replace="nothing"/>\n
<input tal:condition="field_id" \n
type="hidden" \n
name="field_id" \n
tal:attributes="value field_id" \n
tal:replace="nothing"/>\n
\n
<tal:block tal:condition="is_gadget_mode">\n
<tal:block tal:define="global box_relative_url python: request.get(\'box_relative_url\', \'\');\n
global box python: real_context.restrictedTraverse(box_relative_url); \n
global box_id python: \'%s_content\' %box_relative_url.replace(\'/\', \'_\');\n
global dom_id python: request.get(\'dom_id\',None) or box_id;\n
global field_prefix string:${box_id}_">\n
<input tal:condition="python:form_id" \n
type="hidden" \n
name="gadget_form_id"\n
tal:attributes="value form_id" />\n
</tal:block>\n
</tal:block>\n
\n
<div class="listbox-container">\n
\n
<div class="listbox-tree">\n
\n
<!-- Domain Report Tree mode -->\n
<div class="listbox-domain-tree-container" \n
tal:condition="is_domain_tree_mode">\n
<tal:block tal:define="selected_domain_path here/getSelectedDomainPath">\n
\n
<!-- Select domain node -->\n
<select name="domain_root_url"\n
tal:attributes="onChange string:submitAction(this.form, \'${context_url}/setDomainRoot\')">\n
<tal:block tal:repeat="c here/getDomainRootList">\n
<option value="base_domain"\n
tal:define="path python: c[0]; title python: c[1]"\n
tal:attributes="selected python: path == selected_domain_path; value path"\n
tal:content="title"\n
i18n:translate="" i18n:domain="ui"/>\n
</tal:block>\n
</select>\n
\n
<!-- Domain node contents -->\n
<table cellpadding="0"\n
summary="This table contains the domain tree"\n
class="listbox-table-domain-tree"\n
tal:attributes="class string:${field_id}-table-domain-tree"\n
tal:define="report_tree_list python: here.makeReportTreeList(report_path = selected_domain_path, unfolded_list = selection.getDomainList(), is_report_opened = False, sort_on=((\'int_index\', \'ASC\'),));\n
total_depth python: max([report_tree.depth for report_tree in report_tree_list] + [-1])">\n
<tr tal:repeat="report_tree report_tree_list">\n
<tal:block tal:repeat="i python: range(report_tree.depth)">\n
<td width="12" nowrap="nowrap">&nbsp;</td>\n
</tal:block>\n
<td colspan="1" \n
class="listbox-table-domain-tree-cell"\n
tal:attributes="colspan python: total_depth - report_tree.depth + 1">\n
<button type="submit"\n
name="foldDomain:method"\n
class="tree-open"\n
tal:condition="report_tree/is_open"\n
tal:content="report_tree/obj/getCompactTranslatedTitle"\n
tal:attributes="value string:${report_tree/domain_url}.${report_tree/depth}"/>\n
<button type="submit"\n
name="unfoldDomain:method"\n
class="tree-closed"\n
tal:condition="not: report_tree/is_open"\n
tal:content="report_tree/obj/getCompactTranslatedTitle"\n
tal:attributes="value string:${report_tree/domain_url}.${report_tree/depth}"/>\n
</td>\n
</tr>\n
</table>\n
</tal:block>\n
</div>\n
</div>\n
\n
<div class="listbox-content" \n
tal:attributes="class python: test(not is_domain_tree_mode, \'listbox-content maximal-width\', \'listbox-content listbox-content-fixed-width\')">\n
\n
<div class="listbox-head">\n
\n
<div class="listbox-head-spacer"></div>\n
\n
<div class="listbox-head-content">\n
\n
<!-- Listbox head (in left) -->\n
<div class="listbox-head-title">\n
\n
<!-- List tree mode choice -->\n
<div class="listbox-header-box"\n
tal:condition="python: show_listbox_tree_mode_selection and not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_tree_mode_selection"/>\n
</div>\n
\n
<!-- Listbox title -->\n
<div class="listbox-header-box">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_title"/>\n
</div>\n
\n
<!-- Number of rows in ERP5 mode -->\n
<div class="listbox-header-box"\n
tal:condition="python: not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_number_of_records"/>\n
</div>\n
\n
<!-- List style display mode -->\n
<div class="listbox-header-box"\n
tal:condition="python: show_list_style_selection and not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_display_style_selection"/>\n
</div>\n
\n
</div>\n
\n
<!-- Listbox nagivation (in right) -->\n
<div class="listbox-head-navigation">\n
\n
<!--Show search result in web mode-->\n
<div class="listbox-header-box"\n
tal:condition="python: is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_number_of_records"/>\n
</div>\n
\n
<!--Page navigation -->\n
<div class="listbox-header-box"\n
tal:condition="python: need_pagination and not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_navigation"/>\n
</div>\n
\n
</div>\n
\n
\n
</div>\n
\n
\n
</div>\n
\n
<div class="listbox-body">\n
<table class="listbox"\n
tal:attributes="class python: \'listbox %s %s-%s\' %(field_id, field_id, list_style)"> \n
<thead>\n
<!--Column title -->\n
<tr class="listbox-label-line"> \n
\n
<!--Report tree-->\n
<th tal:condition="is_report_tree_mode"\n
class="listbox-table-report-tree-selection-cell">\n
<select name="report_root_url"\n
tal:attributes="onChange string:submitAction(this.form, \'${context_url}/setReportRoot\')">\n
<tal:block tal:repeat="c here/getReportRootList">\n
<option value="base_domain"\n
tal:define="path python: c[0]; title python: c[1]"\n
tal:attributes="selected python: path == here.getSelectedReportPath(); value path"\n
tal:content="title" i18n:domain="ui" i18n:translate="">Domain</option>\n
</tal:block>\n
</select>\n
</th>\n
\n
<!-- Anchor cell -->\n
<th class="listbox-table-anchor-cell" tal:condition="show_anchor_column">&nbsp;</th>\n
\n
<!-- Select cell -->\n
<th tal:condition="python: show_select_column"\n
class="listbox-table-select-cell">\n
\n
<input class="listbox-check-all"\n
type="image"\n
name="checkAll:method" value="1"\n
alt="Check All" title="Check All"\n
tal:attributes="name string:${field_id}_checkAll:method;\n
src string:${portal_url_string}/images/checkall.png"\n
i18n:domain="ui" i18n:attributes="title" />\n
&nbsp;\n
<input class="listbox-uncheck-all"\n
type="image" \n
name="uncheckAll:method" value="1"\n
alt="Uncheck All" title="Uncheck All"\n
tal:attributes="src string:${portal_url_string}/images/decheckall.png;\n
name string:${field_id}_uncheckAll:method;"\n
i18n:domain="ui" i18n:attributes="title" /> \n
\n
</th>\n
\n
<!-- Label column row -->\n
<tal:block tal:repeat="value here/getLabelValueList">\n
<tal:block tal:define="sql python: value[0];\n
title python: value[1];\n
sort_order python: value[2]">\n
\n
<th tal:condition="sql" class="listbox-table-header-cell"\n
tal:define="bt_class python: sort_order==\'ascending\' and \'sort-button sort-button-asc\' \n
or sort_order == \'descending\' and \'sort-button sort-button-desc\' \n
or \'sort-button\';\n
bt_title python: sort_order==\'ascending\' and \'Ascending Display\'\n
or sort_order==\'descending\' and \'Descending Display\'\n
or \'Sort\';\n
listbox_field_id string:${field_id}.${sql};"> \n
<!-- Button in normal view -->\n
<button tal:condition="not:is_gadget_mode"\n
type="submit"\n
name="setSelectionQuickSortOrder:method"\n
tal:attributes="title title;\n
value listbox_field_id;\n
class bt_class;"\n
i18n:domain="ui" i18n:attributes="title">\n
<span i18n:translate="" i18n:domain="ui" tal:content="title"/>\n
</button>\n
\n
<!-- Button in gadget mode -->\n
<button tal:condition="is_gadget_mode" \n
tal:define ="params python: {\'setSelectionQuickSortOrder:method\':listbox_field_id};"\n
type="button" \n
tal:attributes="title title; \n
onclick python: real_context.KnowledgePad_generateAjaxCall(context_url+\'/\'+form_id,box,dom_id,params);\n
class bt_class;"\n
i18n:domain="ui" i18n:attributes="title">\n
<span i18n:translate="" i18n:domain="ui" tal:content="title"/>\n
</button>\n
\n
<!-- Icon showing sort order -->\n
<img src="images/transparent-image.gif"\n
tal:attributes="alt bt_title;\n
title bt_title;\n
class bt_class;\n
src string:${portal_url_string}/images/transparent-image.gif"\n
i18n:domain="ui" i18n:attributes="title;alt" />\n
\n
</th>\n
\n
<th class="listbox-table-header-cell"\n
tal:condition="not: sql" \n
tal:content="title" \n
i18n:domain="ui" i18n:translate=""/>\n
\n
</tal:block>\n
</tal:block>\n
</tr>\n
\n
<!--Search column input -->\n
<tr tal:condition="python: show_search_line or is_report_tree_mode"\n
class="listbox-search-line">\n
\n
<!--Report Tree -->\n
<tal:block tal:condition="is_report_tree_mode">\n
<th class="listbox-table-report-tree-selection-cell"\n
colspan="1"\n
tal:attributes="colspan python: show_search_line and 1 or (len(here.getSelectedColumnList()) + show_select_column + show_anchor_column + 1)"\n
tal:define="selection_index here/getSelectionIndex;\n
index python: selection_index is not None and \'&amp;selection_index=%s\' % selection_index or \'\';\n
is_report_opened python: int(not here.getSelection().isReportOpened());\n
requested_selection_name here/getRequestedSelectionName;\n
url here/getUrl;\n
report_depth python: selection.getParams().get(\'report_depth\', request.get(\'report_depth\', 0))">\n
<tal:block tal:repeat="i python: range(0, 6)">&nbsp;\n
<a href="?selection_name=default&amp;selection_index=0&amp;report_depth:int=0"\n
tal:attributes="href string:${url}?selection_name=${requested_selection_name}${index}&amp;report_depth:int=${i};\n
class python: test(i==report_depth, \'selected\', \'\');"\n
tal:content="i"/>\n
</tal:block>&nbsp;-&nbsp;\n
<a \n
href="?selection_name=default&amp;selection_index=0&amp;is_report_opened:int=0"\n
tal:attributes="href string:${url}?selection_name=${requested_selection_name}${index}&amp;is_report_opened:int=${is_report_opened}"\n
tal:content="python: is_report_opened and \'Show\' or \'Hide\'"\n
i18n:domain="ui" i18n:translate="">Show</a>\n
</th>\n
</tal:block>\n
\n
<!--Anchor cell -->\n
<th class="listbox-table-anchor-cell" tal:condition="show_anchor_column">&nbsp;</th>\n
\n
<!--Select cell -->\n
<th tal:condition="show_select_column"\n
class="listbox-table-select-cell">\n
<input class="listbox-select-action" type="image"\n
title="Action" alt="Action" name="Base_doSelect:method"\n
tal:attributes="class string:${field_id}-select-action;\n
src string:${portal_url_string}/images/exec16.png"\n
i18n:domain="ui" i18n:attributes="title" />\n
</th>\n
\n
<!-- Real search columns headers -->\n
<th class="listbox-table-filter-cell"\n
tal:condition="show_search_line"\n
tal:repeat="value here/getSearchValueList">\n
<tal:block tal:define="alias python: value[0];\n
param python: value[1];\n
search_field python: value[2]"\n
tal:condition="alias">\n
<!-- Render search field -->\n
<tal:block tal:condition="python: search_field is not None"\n
tal:replace="structure python: search_field.render(value=param, key=alias)"/>\n
\n
<tal:block tal:condition="python: search_field is None">\n
<input tal:condition="python: not is_gadget_mode" \n
size="5"\n
type="text" \n
tal:attributes="name string:${field_id}_${alias}; \n
value param"\n
onkeypress="submitFormOnEnter(event, this.form, \'Base_doSelect\');"/>\n
<!-- Search for gadget mode -->\n
<input tal:condition="python: is_gadget_mode" \n
tal:define ="params python: {alias:\'this.value\'};"\n
size="8"\n
type="text" \n
tal:attributes=\'value python: selection.getParams().get(alias,"");\n
onkeypress python:"if(event.keyCode==13){" + real_context.KnowledgePad_generateAjaxCall(context_url+"/"+form_id,box,dom_id,params).replace("\\"this.value\\"","this.value")+ "return false;;}"\'/>\n
</tal:block>\n
</tal:block>\n
</th>\n
</tr>\n
</thead>\n
\n
<!-- Stats -->\n
<tfoot tal:condition="python:here.showStat() and not hide_rows_on_no_search_criterion">\n
\n
<tr class="listbox_stat_line"\n
tal:attributes="class string:${field_id}_stat_line listbox-stat-line">\n
<td tal:condition="is_report_tree_mode" >&nbsp;</td>\n
<td class="listbox-table-anchor-cell" tal:condition="show_anchor_column">&nbsp;</td>\n
<td class="listbox-table-select-cell" tal:condition="show_select_column">&nbsp;</td>\n
<tal:block tal:repeat="value here/getStatValueList">\n
<td align="left"\n
class="listbox-table-data-cell"\n
tal:define="original_value python: value[0]; processed_value python: value[1]"\n
tal:content="structure processed_value" />\n
</tal:block>\n
</tr>\n
</tfoot>\n
\n
<tbody>\n
\n
<!-- Render listbox data-->\n
<tal:block tal:condition="line_list"\n
tal:define="checked_uid_set here/getCheckedUidSet">\n
<tr tal:repeat="line line_list" \n
tal:attributes=" \n
class python: line.getRowCSSClassName() or \'%s %s\' %(\'%s-data-line-%s\' %(field_id, repeat[\'line\'].index) ,test(repeat[\'line\'].index % 2, \'DataB\', \'DataA\'));">\n
\n
<tal:block tal:define="render_result line/render">\n
\n
<!--Report tree column -->\n
<td tal:condition="is_report_tree_mode"\n
class="listbox-table-report-tree-selection-cell" \n
tal:define="section_name python: line.getDomainTitle()">\n
<a tal:condition="section_name"\n
tal:define="method_id python: line.isOpen() and \'foldReport\' or \'unfoldReport\'"\n
tal:attributes="href string:${method_id}?report_url=${line/getDomainUrl}&amp;form_id=${form_id}&amp;list_selection_name=${selection_name};\n
class python:test(line.isOpen(), \'tree-open\', \'tree-closed\');\n
style python:\'white-space: nowrap;; margin-left: %spx\' % (line.getDepth() * 15)"\n
tal:content="section_name"/>\n
</td>\n
\n
<!--Anchor cell -->\n
<td tal:condition="show_anchor_column"\n
class="listbox-table-anchor-cell">\n
<!--Use [0][4] ? :(-->\n
<a href="#" tal:attributes="href python:render_result[0][4]">\n
<img src="document_icon.gif" alt="document" \n
tal:attributes="src string:${portal_url_string}/images/line_clickable.png" />\n
</a>\n
</td>\n
\n
<!--Select cell -->\n
<td tal:condition="show_select_column"\n
class="listbox-table-select-cell">\n
<input tal:condition="python: not line.isSummary()"\n
type="radio" id="listbox_cb_1" name="uids:list"\n
tal:attributes="checked python: line.getUid() in checked_uid_set;\n
value line/getUid;\n
id string:${field_id}_cb_${line/getUid}" /> \n
</td>\n
\n
<!-- Data cells -->\n
<tal:block tal:repeat="value render_result">\n
<td class="listbox-table-data-cell"\n
tal:define="html python: value[0];">\n
<input tal:condition="not: repeat/value/index"\n
type="hidden" value="1" name="listbox_uid:list"\n
tal:attributes="value python: line.getUid() or \'\';\n
name string:${field_prefix}${field_id}_uid:list" />\n
<tal:block tal:replace="structure html"/>\n
</td>\n
</tal:block>\n
</tal:block>\n
</tr>\n
</tal:block>\n
\n
<!-- Hide row on no search criterion-->\n
<tr tal:condition="hide_rows_on_no_search_criterion"\n
class="listbox_missing_search_criterion">\n
<td tal:attributes="colspan python: len(here.getSearchValueList()) + 1">\n
<span i18n:translate="" i18n:domain="ui">\n
To display actual content, please fill in one or more search criterion.\n
</span>\n
</td>\n
</tr>\n
\n
<!-- No results. -->\n
<tr tal:condition="python: total_line == 0 and not hide_rows_on_no_search_criterion">\n
<td tal:attributes="colspan python: len(here.getSearchValueList()) + 1"\n
class="listbox-table-no-result-row">\n
<span i18n:translate="" i18n:domain="ui">\n
No result.\n
</span>\n
</td>\n
</tr>\n
</tbody>\n
\n
</table>\n
</div>\n
\n
<div class="listbox-footer">\n
\n
<!-- List tree mode choice -->\n
<div class="listbox-footer-box"\n
tal:condition="python: show_listbox_tree_mode_selection and is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_tree_mode_selection"/>\n
</div>\n
\n
<!-- List style display mode -->\n
<div class="listbox-footer-box"\n
tal:condition="python: show_list_style_selection and is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_display_style_selection"/>\n
</div>\n
\n
<!-- Full text search -->\n
<div class="listbox-footer-box"\n
tal:condition="show_global_search">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_global_search"/>\n
</div>\n
\n
<!--Page navigation in web mode floating in right (slider) or whole width (text) -->\n
<div class="listbox-footer-box"\n
tal:condition="python: need_pagination and is_web_mode"\n
tal:attributes="style python: test(is_slider_mode, \'float:right\', \'width:100%\')">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_navigation"/>\n
</div>\n
\n
</div>\n
</div>\n
</div>\n
\n
</tal:block>
]]></unicode> </value>
</item>
<item>
<key> <string>content_type</string> </key>
<value> <string>text/html</string> </value>
</item>
<item>
<key> <string>expand</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ListBox_asHTML</string> </value>
</item>
<item>
<key> <string>output_encoding</string> </key>
<value> <string>iso-8859-15</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <unicode></unicode> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -391,11 +391,12 @@ class TestVifibSlapWebService(testVifibMixin):
self.assertEqual(1, len(software_instance_list))
software_instance = software_instance_list[0]
sequence.edit(
software_instance_uid=software_instance.getUid(),
software_instance_reference=software_instance.getReference(),
hosting_subscription_uid=software_instance.getAggregateRelatedValue(
portal_type='Sale Order Line').getAggregateValue(
portal_type='Hosting Subscription').getUid())
root_software_instance_title=software_title,
software_instance_uid=software_instance.getUid(),
software_instance_reference=software_instance.getReference(),
hosting_subscription_uid=software_instance.getAggregateRelatedValue(
portal_type='Sale Order Line').getAggregateValue(
portal_type='Hosting Subscription').getUid())
def stepSetSelectedComputerPartition(self, sequence, **kw):
"""Sets in sequence computer partition parameters related to current
......@@ -1255,6 +1256,16 @@ class TestVifibSlapWebService(testVifibMixin):
def stepSelectRequestedReferenceChildrenBChild(self, sequence, **kw):
sequence.edit(requested_reference='children_b_child')
def stepSelectRequestedReferenceRootSoftwareInstanceTitle(self, sequence,
**kw):
sequence.edit(requested_reference=sequence['root_software_instance_title'])
def stepSelectRequestedReferenceB(self, sequence, **kw):
sequence.edit(requested_reference='b')
def stepSelectRequestedReferenceC(self, sequence, **kw):
sequence.edit(requested_reference='c')
def stepSelectEmptyRequestedParameterDict(self, sequence, **kw):
sequence.edit(requested_parameter_dict=None)
......@@ -3136,6 +3147,11 @@ class TestVifibSlapWebService(testVifibMixin):
def stepCheckSoftwareInstanceAndRelatedComputerPartition(self,
sequence, **kw):
self.stepCheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck(sequence, **kw)
self._checkSoftwareInstanceAndRelatedPartition(software_instance)
def stepCheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck(self,
sequence, **kw):
software_instance_uid = sequence['software_instance_uid']
software_instance = self.portal.portal_catalog.getResultValue(
uid=software_instance_uid)
......@@ -3143,7 +3159,6 @@ class TestVifibSlapWebService(testVifibMixin):
predecessor_value_list = software_instance.getPredecessorValueList()
self.assertEqual(1, len(predecessor_value_list))
self._checkSoftwareInstanceAndRelatedPartition(software_instance)
sequence.edit(
requested_software_instance_uid=predecessor_value_list[0].getUid(),
requested_software_instance_reference=predecessor_value_list[0].getReference())
......@@ -8726,12 +8741,42 @@ class TestVifibSlapWebService(testVifibMixin):
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def stepStoreCurrentSoftwareInstanceUidBufferA(self, sequence, **kw):
sequence['buffer_a_software_instance_uid'] = sequence['software_instance_uid']
def stepStoreCurrentSoftwareInstanceUidBufferB(self, sequence, **kw):
sequence['buffer_b_software_instance_uid'] = sequence['software_instance_uid']
def stepStoreCurrentComputerUidBufferA(self, sequence, **kw):
sequence['buffer_a_computer_uid'] = sequence['computer_uid']
def stepStoreCurrentComputerUidBufferB(self, sequence, **kw):
sequence['buffer_b_computer_uid'] = sequence['computer_uid']
def stepRestoreSoftwareInstanceUidFromBufferA(self, sequence, **kw):
sequence['software_instance_uid'] = sequence['buffer_a_software_instance_uid']
def stepRestoreSoftwareInstanceUidFromBufferB(self, sequence, **kw):
sequence['software_instance_uid'] = sequence['buffer_b_software_instance_uid']
def stepRestoreComputerUidFromBufferA(self, sequence, **kw):
sequence['computer_uid'] = sequence['buffer_a_computer_uid']
def stepRestoreComputerUidFromBufferB(self, sequence, **kw):
sequence['computer_uid'] = sequence['buffer_b_computer_uid']
def stepStoreCurrentComputerReferenceBufferA(self, sequence, **kw):
sequence['buffer_a_computer_reference'] = sequence['computer_reference']
def stepStoreCurrentComputerReferenceBufferB(self, sequence, **kw):
sequence['buffer_b_computer_reference'] = sequence['computer_reference']
def stepStoreCurrentComputerPartitionUidBufferA(self, sequence, **kw):
sequence['buffer_a_computer_partition_uid'] = sequence['computer_partition_uid']
def stepStoreCurrentComputerPartitionUidBufferB(self, sequence, **kw):
sequence['buffer_b_computer_partition_uid'] = sequence['computer_partition_uid']
def stepStoreCurrentComputerPartitionReferenceBufferA(self, sequence, **kw):
sequence['buffer_a_computer_partition_reference'] = sequence['computer_partition_reference']
......@@ -8744,6 +8789,12 @@ class TestVifibSlapWebService(testVifibMixin):
def stepRestoreComputerReferenceFromBufferB(self, sequence, **kw):
sequence['computer_reference'] = sequence['buffer_b_computer_reference']
def stepRestoreComputerPartitionUidFromBufferA(self, sequence, **kw):
sequence['computer_partition_uid'] = sequence['buffer_a_computer_partition_uid']
def stepRestoreComputerPartitionUidFromBufferB(self, sequence, **kw):
sequence['computer_partition_uid'] = sequence['buffer_b_computer_partition_uid']
def stepRestoreComputerPartitionReferenceFromBufferA(self, sequence, **kw):
sequence['computer_partition_reference'] = sequence['buffer_a_computer_partition_reference']
......@@ -8755,31 +8806,149 @@ class TestVifibSlapWebService(testVifibMixin):
If software instance originated on computer comes from another computer it
shall be possible to sucesfully destroy it.
Test is done in a way to trigger unstable Assignor role calculation
on Hosting Subscription which leads to unavailability of Software Instances
from one computer to another.
"""
sequence_list = SequenceList()
sequence_string = self.prepare_install_requested_computer_partition_sequence_string + \
"""
sequence_string = """
# Prepare software release shared by both Computers
LoginTestVifibDeveloper
SelectNewSoftwareReleaseUri
CreateSoftwareRelease
Tic
SubmitSoftwareRelease
Tic
CreateSoftwareProduct
Tic
ValidateSoftwareProduct
Tic
SetSoftwareProductToSoftwareRelease
PublishByActionSoftwareRelease
Logout
# Create first computer
LoginTestVifibAdmin
CreateComputer
Tic
Logout
SlapLoginCurrentComputer
FormatComputer
Tic
SlapLogout
StoreCurrentComputerReferenceBufferA
StoreCurrentComputerUidBufferA
# Install software on first computer
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
# Now request and instantiate this software release on first computer
LoginTestVifibCustomer
PersonRequestSoftwareInstance
Tic
Logout
LoginDefaultUser
ConfirmOrderedSaleOrderActiveSense
Tic
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
StoreCurrentComputerPartitionReferenceBufferA
""" + \
self.prepare_formated_computer + \
"""
StoreCurrentComputerPartitionUidBufferA
StoreCurrentSoftwareInstanceUidBufferA
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListConfirmed
Logout
# Start it..
SlapLoginCurrentComputer
SoftwareInstanceAvailable
Tic
SlapLogout
LoginDefaultUser
SetSelectedComputerPartition
CheckComputerPartitionInstanceSetupSalePackingListStopped
CheckComputerPartitionInstanceHostingSalePackingListConfirmed
Logout
SlapLoginCurrentComputer
SoftwareInstanceStarted
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceHostingSalePackingListStarted
Logout
# ...stop it...
LoginDefaultUser
RequestSoftwareInstanceStop
Tic
Logout
LoginDefaultUser
CheckComputerPartitionInstanceHostingSalePackingListStopped
Logout
SlapLoginCurrentComputer
SoftwareInstanceStopped
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceHostingSalePackingListDelivered
Logout
# ...and request destruction
LoginDefaultUser
RequestSoftwareInstanceDestroy
Tic
Logout
LoginDefaultUser
CheckComputerPartitionInstanceCleanupSalePackingListConfirmed
Logout
# Now prepare second computer
LoginTestVifibAdmin
CreateComputer
Tic
Logout
SlapLoginCurrentComputer
FormatComputer
Tic
SlapLogout
StoreCurrentComputerReferenceBufferB
StoreCurrentComputerPartitionReferenceBufferB
StoreCurrentComputerUidBufferB
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
StoreCurrentComputerReferenceBufferB
StoreCurrentComputerUidBufferB
# Now request self software release from one computer to another
RestoreComputerReferenceFromBufferA
RestoreComputerPartitionReferenceFromBufferA
RestoreComputerUidFromBufferA
RestoreSoftwareInstanceUidFromBufferA
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
......@@ -8791,55 +8960,72 @@ class TestVifibSlapWebService(testVifibMixin):
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartition
CheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
StoreCurrentComputerPartitionReferenceBufferB
StoreCurrentComputerPartitionUidBufferB
StoreCurrentSoftwareInstanceUidBufferB
RestoreComputerReferenceFromBufferB
RestoreComputerPartitionReferenceFromBufferB
RestoreComputerUidFromBufferB
# Start the requested software instance...
SlapLoginCurrentComputer
SoftwareInstanceBuilding
SoftwareInstanceAvailable
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListStarted
SetSelectedComputerPartition
CheckComputerPartitionInstanceSetupSalePackingListStopped
CheckComputerPartitionInstanceHostingSalePackingListConfirmed
Logout
SlapLoginCurrentComputer
SoftwareInstanceAvailable
SoftwareInstanceStarted
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListStopped
CheckComputerPartitionInstanceHostingSalePackingListConfirmed
CheckComputerPartitionInstanceHostingSalePackingListStarted
Logout
# ...and stop it
LoginDefaultUser
RequestSoftwareInstanceStop
Tic
Logout
LoginDefaultUser
CheckComputerPartitionInstanceHostingSalePackingListStopped
Logout
SlapLoginCurrentComputer
SoftwareInstanceStarted
SoftwareInstanceStopped
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceHostingSalePackingListStarted
SetCurrentSoftwareInstanceRequester
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
CheckComputerPartitionInstanceHostingSalePackingListDelivered
Logout
LoginTestVifibCustomer
# Now request destruction of second software instance...
LoginDefaultUser
RequestSoftwareInstanceDestroy
Tic
Logout
......@@ -8848,8 +9034,24 @@ class TestVifibSlapWebService(testVifibMixin):
CheckComputerPartitionInstanceCleanupSalePackingListConfirmed
Logout
RestoreComputerReferenceFromBufferA
# ...and destroy it
SlapLoginCurrentComputer
SoftwareInstanceDestroyed
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceCleanupSalePackingListDelivered
CheckComputerPartitionIsFree
Logout
# Time to switch back to first software instance and destroy it
RestoreComputerPartitionReferenceFromBufferA
RestoreComputerPartitionUidFromBufferA
RestoreSoftwareInstanceUidFromBufferA
RestoreComputerReferenceFromBufferA
RestoreComputerUidFromBufferA
SlapLoginCurrentComputer
SoftwareInstanceDestroyed
......@@ -8863,8 +9065,737 @@ class TestVifibSlapWebService(testVifibMixin):
"""
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_bug_hosting_subscription_assignor_role_instability(self):
"""Show instability issue of Assignor role on Hosting Subscription
Related to fact when Hosting Subscription is associated to
Software Instances deployed on many computers"""
raise NotImplementedError
def test_bug_destruction_with_unfinished_packing_list(self):
"""Proves that even if some packing lists are not fully delivered
it is possible to destroy software instance"""
sequence_list = SequenceList()
sequence_string = """
# Prepare software release
LoginTestVifibDeveloper
SelectNewSoftwareReleaseUri
CreateSoftwareRelease
Tic
SubmitSoftwareRelease
Tic
CreateSoftwareProduct
Tic
ValidateSoftwareProduct
Tic
SetSoftwareProductToSoftwareRelease
PublishByActionSoftwareRelease
Logout
# Create first computer
LoginTestVifibAdmin
CreateComputer
Tic
Logout
SlapLoginCurrentComputer
FormatComputer
Tic
SlapLogout
StoreCurrentComputerReferenceBufferA
StoreCurrentComputerUidBufferA
# Install software on first computer
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
# Now request and instantiate this software release on first computer
LoginTestVifibCustomer
PersonRequestSoftwareInstance
Tic
Logout
LoginDefaultUser
ConfirmOrderedSaleOrderActiveSense
Tic
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListConfirmed
Logout
# Request destruction...
LoginDefaultUser
RequestSoftwareInstanceDestroy
Tic
Logout
LoginDefaultUser
CheckComputerPartitionInstanceCleanupSalePackingListConfirmed
Logout
# ...and destroy it
SlapLoginCurrentComputer
SoftwareInstanceDestroyed
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceCleanupSalePackingListDelivered
CheckComputerPartitionIsFree
Logout
"""
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def stepDirectRequestComputerPartitionRaisesDisconnectedSoftwareTree(self,
sequence, **kw):
software_instance = self.portal.portal_catalog.getResultValue(
uid = sequence['software_instance_uid'])
requested_reference = sequence['requested_reference']
from erp5.document.SoftwareInstance import DisconnectedSoftwareTree
self.assertRaises(DisconnectedSoftwareTree,
software_instance.requestSoftwareInstance,
software_release=sequence['software_release_uri'],
software_type=sequence['requested_reference'],
partition_reference=sequence['requested_reference'],
shared=False,
instance_xml=self.minimal_correct_xml,
sla_xml=self.minimal_correct_xml,
state='started'
)
def test_bug_orhpaned_software_instance(self):
"""Check that no orphaned Software Instances would be created
In below scenario system shall behave like mentioned:
OpenOrder.request(SR, A) | SR(A)
A.request(SR, B) | SR(A) <- SR(B)
B.request(SR, C) | SR(A) <- SR(B) <- SR(C)
C.request(SR, B) raises immediately, because the result would be:
SR(A)
SR(B) <- SR(C)
do B would become root of orphaned tree.
"""
# Setup sufficient amount of CP
self.computer_partition_amount = 3
sequence_list = SequenceList()
sequence_string = """
# Prepare software release
LoginTestVifibDeveloper
SelectNewSoftwareReleaseUri
CreateSoftwareRelease
Tic
SubmitSoftwareRelease
Tic
CreateSoftwareProduct
Tic
ValidateSoftwareProduct
Tic
SetSoftwareProductToSoftwareRelease
PublishByActionSoftwareRelease
Logout
# Create the computer
LoginTestVifibAdmin
CreateComputer
Tic
Logout
SlapLoginCurrentComputer
FormatComputer
Tic
SlapLogout
StoreCurrentComputerReferenceBufferA
StoreCurrentComputerUidBufferA
# Install the software release
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
# Create Software Instance A (originates from Open Order)
LoginTestVifibCustomer
PersonRequestSoftwareInstance
Tic
Logout
LoginDefaultUser
ConfirmOrderedSaleOrderActiveSense
Tic
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListConfirmed
Logout
# From root request B
SelectRequestedReferenceB
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
SlapLogout
SlapLoginCurrentSoftwareInstance
RequestComputerPartition
Tic
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
# From B request C
SelectRequestedReferenceC
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
SlapLogout
SlapLoginCurrentSoftwareInstance
RequestComputerPartition
Tic
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
# Try to: from C request B and prove that it raises
SelectRequestedReferenceB
LoginDefaultUser # login as superuser in order to work in erp5
DirectRequestComputerPartitionRaisesDisconnectedSoftwareTree
"""
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def stepDirectRequestComputerPartitionRaisesCyclicSoftwareTree(self,
sequence, **kw):
software_instance = self.portal.portal_catalog.getResultValue(
uid = sequence['software_instance_uid'])
requested_reference = sequence['requested_reference']
from erp5.document.SoftwareInstance import CyclicSoftwareTree
self.assertRaises(CyclicSoftwareTree,
software_instance.requestSoftwareInstance,
software_release=sequence['software_release_uri'],
software_type=sequence['requested_reference'],
partition_reference=sequence['requested_reference'],
shared=False,
instance_xml=self.minimal_correct_xml,
sla_xml=self.minimal_correct_xml,
state='started'
)
def test_bug_cyclic_software_instance(self):
"""Check that no cyclic Software Instance trees would be created
In below scenario system shall behave like mentioned:
OpenOrder.request(SR, A) | SR(A)
A.request(SR, B) | SR(A) <- SR(B)
B.request(SR, A) | SR(A) <- SR(B) <- SR(C)
C.request(SR, B) raises immediately, because the result would be:
SR(A)
SR(B) <-> SR(C)
so B and C would be cyclic
"""
# Setup sufficient amount of CP
self.computer_partition_amount = 3
sequence_list = SequenceList()
sequence_string = """
# Prepare software release
LoginTestVifibDeveloper
SelectNewSoftwareReleaseUri
CreateSoftwareRelease
Tic
SubmitSoftwareRelease
Tic
CreateSoftwareProduct
Tic
ValidateSoftwareProduct
Tic
SetSoftwareProductToSoftwareRelease
PublishByActionSoftwareRelease
Logout
# Create the computer
LoginTestVifibAdmin
CreateComputer
Tic
Logout
SlapLoginCurrentComputer
FormatComputer
Tic
SlapLogout
StoreCurrentComputerReferenceBufferA
StoreCurrentComputerUidBufferA
# Install the software release
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
# Create Software Instance A (originates from Open Order)
LoginTestVifibCustomer
PersonRequestSoftwareInstance
Tic
Logout
LoginDefaultUser
ConfirmOrderedSaleOrderActiveSense
Tic
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListConfirmed
Logout
# From root request B
SelectRequestedReferenceB
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
SlapLogout
SlapLoginCurrentSoftwareInstance
RequestComputerPartition
Tic
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
# From B request C
SelectRequestedReferenceC
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
SlapLogout
SlapLoginCurrentSoftwareInstance
RequestComputerPartition
Tic
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
# Try to: from C request B and prove that it raises
SelectRequestedReferenceB
LoginDefaultUser # login as superuser in order to work in erp5
DirectRequestComputerPartitionRaisesCyclicSoftwareTree
"""
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def stepDirectRequestComputerPartitionRaisesValueError(self,
sequence, **kw):
software_instance = self.portal.portal_catalog.getResultValue(
uid = sequence['software_instance_uid'])
requested_reference = sequence['requested_reference']
self.assertRaises(ValueError,
software_instance.requestSoftwareInstance,
software_release=sequence['software_release_uri'],
software_type=sequence['requested_reference'],
partition_reference=sequence['requested_reference'],
shared=False,
instance_xml=self.minimal_correct_xml,
sla_xml=self.minimal_correct_xml,
state='started'
)
def test_bug_cyclic_software_instance_small_tree(self):
"""Check that no cyclic Software Instance trees would be created
In below scenario system shall behave like mentioned:
OpenOrder.request(SR, A) | SR(A)
A.request(SR, B) | SR(A) <- SR(B)
B.request(SR, A) raises immediately, because the result would be:
SR(A) <-> SR(B)
so B and A would be cyclic
"""
# Setup sufficient amount of CP
self.computer_partition_amount = 2
sequence_list = SequenceList()
sequence_string = """
# Prepare software release
LoginTestVifibDeveloper
SelectNewSoftwareReleaseUri
CreateSoftwareRelease
Tic
SubmitSoftwareRelease
Tic
CreateSoftwareProduct
Tic
ValidateSoftwareProduct
Tic
SetSoftwareProductToSoftwareRelease
PublishByActionSoftwareRelease
Logout
# Create the computer
LoginTestVifibAdmin
CreateComputer
Tic
Logout
SlapLoginCurrentComputer
FormatComputer
Tic
SlapLogout
StoreCurrentComputerReferenceBufferA
StoreCurrentComputerUidBufferA
# Install the software release
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
# Create Software Instance A (originates from Open Order)
LoginTestVifibCustomer
PersonRequestSoftwareInstance
Tic
Logout
LoginDefaultUser
ConfirmOrderedSaleOrderActiveSense
Tic
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListConfirmed
Logout
# From root request B
SelectRequestedReferenceB
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
SlapLogout
SlapLoginCurrentSoftwareInstance
RequestComputerPartition
Tic
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartitionNoPackingListCheck
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
# Try to: From B request root
SelectRequestedReferenceRootSoftwareInstanceTitle
LoginDefaultUser # login as superuser in order to work in erp5
DirectRequestComputerPartitionRaisesValueError
"""
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
########################################
# Software Instance graph helpers
########################################
def _test_si_tree(self):
software_instance = self.portal.software_instance_module.newContent(
portal_type='Software Instance')
self.checkConnected = software_instance.checkConnected
self.checkNotCyclic = software_instance.checkNotCyclic
def test_si_tree_simple_connected(self):
"""Graph of one element is connected
A
"""
self._test_si_tree()
graph = {'A': []}
root = 'A'
self.assertEqual(True, self.checkConnected(graph, root))
def test_si_tree_simple_list_connected(self):
"""Graph of list is connected
B->C->A
"""
self._test_si_tree()
graph = {'A': [], 'B': ['C'], 'C': ['A']}
root = 'B'
self.assertEqual(True, self.checkConnected(graph, root))
def test_si_tree_complex_connected(self):
"""Tree is connected
B --> A
\-> C --> D
\-> E --> F
"""
self._test_si_tree()
graph = {
'A': [],
'B': ['A', 'C'],
'C': ['D', 'E'],
'D': [],
'E': ['F'],
'F': [],
}
root = 'B'
self.assertEqual(True, self.checkConnected(graph, root))
def test_si_tree_simple_list_disconnected(self):
"""Two lists are disconnected
A->B
C
"""
self._test_si_tree()
graph = {'A': ['B'], 'B': [], 'C': []}
root = 'A'
from erp5.document.SoftwareInstance import DisconnectedSoftwareTree
self.assertRaises(DisconnectedSoftwareTree, self.checkConnected, graph,
root)
# For now limitation of implementation gives false positive
@expectedFailure
def test_si_tree_cyclic_connected(self):
"""Cyclic is connected
A<->B
"""
self._test_si_tree()
graph = {'A': ['B'], 'B': ['A']}
root = 'B'
self.assertEqual(True, self.checkConnected(graph, root))
def test_si_tree_cyclic_disconnected(self):
"""Two trees, where one is cyclic are disconnected
B --> A
\-> H
C --> D --> G
^ \-> E --> F \
\------------/
"""
self._test_si_tree()
graph = {
'A': [],
'B': ['A', 'H'],
'C': ['D', 'E'],
'D': ['G'],
'E': ['F'],
'F': ['C'],
'G': [],
'H': [],
}
root = 'B'
from erp5.document.SoftwareInstance import DisconnectedSoftwareTree
self.assertRaises(DisconnectedSoftwareTree, self.checkConnected, graph,
root)
def test_si_tree_simple_not_cyclic(self):
"""Graph of one element is not cyclic
A
"""
self._test_si_tree()
graph = {'A': []}
self.assertEqual(True, self.checkNotCyclic(graph))
def test_si_tree_simple_list_not_cyclic(self):
"""Graph of list is not cyclic
B->C->A
"""
self._test_si_tree()
graph = {'A': [], 'B': ['C'], 'C': ['A']}
self.assertEqual(True, self.checkNotCyclic(graph))
def test_si_tree_simple_list_cyclic(self):
"""Graph of cyclic list is cyclic
B->C->A-\
^-------/
"""
self._test_si_tree()
graph = {'A': ['B'], 'B': ['C'], 'C': ['A']}
from erp5.document.SoftwareInstance import CyclicSoftwareTree
self.assertRaises(CyclicSoftwareTree, self.checkNotCyclic, graph)
def test_si_tree_simple_list_cyclic(self):
"""Graph of cyclic list is cyclic
B->C->D->A-\
^-------/
"""
self._test_si_tree()
graph = {'A': ['C'], 'B': ['C'], 'C': ['D'], 'D': ['A']}
from erp5.document.SoftwareInstance import CyclicSoftwareTree
self.assertRaises(CyclicSoftwareTree, self.checkNotCyclic, graph)
def test_si_tree_complex_not_cyclic(self):
"""Tree is not cyclic
B --> A
\-> C --> D
\-> E --> F
"""
self._test_si_tree()
graph = {
'A': [],
'B': ['A', 'C'],
'C': ['D', 'E'],
'D': [],
'E': ['F'],
'F': [],
}
self.assertEqual(True, self.checkNotCyclic(graph))
def test_si_tree_complex_cyclic(self):
"""Tree is not cyclic
B --> A
\-> C --> D
^ \-> E --> F -\
\-------------/
"""
self._test_si_tree()
graph = {
'A': [],
'B': ['A', 'C'],
'C': ['D', 'E'],
'D': [],
'E': ['F'],
'F': ['C'],
}
from erp5.document.SoftwareInstance import CyclicSoftwareTree
self.assertRaises(CyclicSoftwareTree, self.checkNotCyclic, graph)
def test_si_tree_simple_list_disconnected_not_cyclic(self):
"""Two lists are disconnected
A->B
C
"""
self._test_si_tree()
graph = {'A': ['B'], 'B': [], 'C': []}
self.assertEqual(True, self.checkNotCyclic(graph))
def test_si_tree_cyclic(self):
"""Cyclic is connected
A<->B
"""
self._test_si_tree()
graph = {'A': ['B'], 'B': ['A']}
from erp5.document.SoftwareInstance import CyclicSoftwareTree
self.assertRaises(CyclicSoftwareTree, self.checkNotCyclic, graph)
def test_si_tree_cyclic_disconnected_cyclic(self):
"""Two trees, where one is cyclic are disconnected
B --> A
\-> H
C --> D --> G
^ \-> E --> F \
\------------/
"""
self._test_si_tree()
graph = {
'A': [],
'B': ['A', 'H'],
'C': ['D', 'E'],
'D': ['G'],
'E': ['F'],
'F': ['C'],
'G': [],
'H': ['A'],
}
from erp5.document.SoftwareInstance import CyclicSoftwareTree
self.assertRaises(CyclicSoftwareTree, self.checkNotCyclic, graph)
########################################
# Other tests
########################################
......
......@@ -57,4 +57,5 @@ setup(name=name,
'slapproxy = slapos.proxy:main',
]
},
test_suite="slapos.tests",
)
......@@ -27,13 +27,13 @@
##############################################################################
from optparse import OptionParser, Option
from xml_marshaller import xml_marshaller
from pwd import getpwnam
import ConfigParser
import grp
import logging
import netaddr
import netifaces
import os
import pwd
import random
import slapos.slap as slap
import socket
......@@ -317,7 +317,7 @@ class Computer:
slapsoft.path = self.software_root
if alter_user:
slapsoft.create()
slapsoft_pw = getpwnam(slapsoft.name)
slapsoft_pw = pwd.getpwnam(slapsoft.name)
os.chown(self.software_root, slapsoft_pw.pw_uid, slapsoft_pw.pw_gid)
os.chmod(self.software_root, 0755)
......@@ -415,7 +415,7 @@ class Partition:
if not os.path.exists(self.path):
os.mkdir(self.path, 0750)
if alter_user:
owner_pw = getpwnam(owner.name)
owner_pw = pwd.getpwnam(owner.name)
os.chown(self.path, owner_pw.pw_uid, owner_pw.pw_gid)
os.chmod(self.path, 0750)
......@@ -457,7 +457,7 @@ class User:
user_parameter_list.extend(['-G', ','.join(self.additional_group_list)])
user_parameter_list.append(self.name)
try:
getpwnam(self.name)
pwd.getpwnam(self.name)
except KeyError:
callAndRead(['useradd'] + user_parameter_list)
else:
......@@ -475,7 +475,7 @@ class User:
"""
try:
getpwnam(self.name)
pwd.getpwnam(self.name)
return True
except KeyError:
......@@ -572,7 +572,7 @@ class Tap:
owner_id = int(open(check_file).read().strip())
except Exception:
pass
if (owner_id is None) or (owner_id != getpwnam(owner.name).pw_uid):
if (owner_id is None) or (owner_id != pwd.getpwnam(owner.name).pw_uid):
callAndRead(['tunctl', '-t', self.name, '-u', owner.name])
callAndRead(['ip', 'link', 'set', self.name, 'up'])
......@@ -584,7 +584,7 @@ class Tap:
class Bridge:
"Bridge represent a bridge on the system"
def __init__(self, name, ipv4_local_network, ipv6_interface):
def __init__(self, name, ipv4_local_network, ipv6_interface=None):
"""
Attributes:
name: String, the name of the bridge
......@@ -846,11 +846,14 @@ class Parser(OptionParser):
help="Shall slapformat alter network configuration [default: True]"),
])
def check_args(self):
def check_args(self, args):
"""
Check arguments
"""
(options, args) = self.parse_args()
if args:
(options, args) = self.parse_args(list(args))
else:
(options, args) = self.parse_args()
if len(args) != 1:
self.error("Incorrect number of arguments")
return options, args[0]
......@@ -1093,17 +1096,17 @@ class Config:
self.computer_xml = os.path.abspath(self.computer_xml)
def main():
def main(*args):
"Run default configuration."
global os
global callAndRead
global getpwnam
global pwd
real_callAndRead = callAndRead
usage = "usage: %s [options] CONFIGURATION_FILE" % sys.argv[0]
try:
# Parse arguments
options, configuration_file_path = Parser(usage=usage).check_args()
options, configuration_file_path = Parser(usage=usage).check_args(args)
config = Config()
config.setConfig(options, configuration_file_path)
os = OS(config)
......@@ -1125,7 +1128,7 @@ def main():
pw_uid = 12345
pw_gid = 54321
return result
getpwnam = fake_getpwnam
pwd.getpwnam = fake_getpwnam
else:
dry_callAndRead = real_callAndRead
if config.verbose:
......
......@@ -337,7 +337,7 @@ class Slapgrid(object):
computer_partition_list = self.computer.getComputerPartitionList()
except socket.error as error:
self.logger.fatal(error)
sys.exit(1)
raise
return computer_partition_list
def processSoftwareReleaseList(self):
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Mocked httplib
"""
from urlparse import urlparse
__all__ = []
def log(message):
"""Need to be overridden to get a proper logger
"""
pass
class HTTPConnection(object):
scheme = 'http'
def _callback(self, path, method, body, headers):
"""To get it works properly, you need to override
HTTPConnection._callback. This method received the instance, the path,
method and request body as parameter, and it has to return a tuple with
headers dictionary and body response string.
@param self object instance reference
@param URL the parsed URL
@param method the http method
@param body the request body
@param headers the request headers
@return tuple containing status integer, headers dictionary and body
response"""
return (0, {}, '', )
def __init__(self, host, port=None, strict=None,
timeout=None, source_address=None):
self.host = host
self.port = port
self.strict = strict
self.timeout = timeout
self.source_address = source_address
self.__response = None
def request(self, method, url, body=None, headers=None):
status, headers, body = self._callback(url, method, body, headers)
self.__response = HTTPResponse('HTTP/1.1', 200, 'OK', body, headers)
def getresponse(self):
response = self.__response
self.__response = None
return response
def set_debuglevel(self, level):
pass
def set_tunnel(self, host, port=None, headers=None):
pass
def connect(self):
pass
def close(self):
pass
def putrequest(self, request, selector, skip_host=None,
skip_accept_encoding=None):
pass
def putheader(self, *args):
pass
def endheaders(self):
pass
def send(self, data):
pass
class HTTPSConnection(HTTPConnection):
def __init__(self, host, port=None, key_file=None,
cert_file=None, strict=None, timeout=None,
source_address=None):
super().__init__(self, host, port, strict, timeout,
source_address)
pass
class HTTPResponse(object):
def __init__(self, version, status, reason, content, headers=()):
self.version = version
self.status = status
self.reason = reason
self.__headers = headers
self.__content = content
def read(self, amt=None):
result = None
if amt is None:
result = self.__content
self.__content = ''
else:
end = max(amt, len(self.__content))
result = self.__content[:end]
del self.__content[:end]
return result
def getheader(self, name, default=None):
pass
def getheaders(self):
pass
import logging
import slapos.format
import unittest
import netaddr
# for mocking
import grp
import netifaces
import os
import pwd
import time
USER_LIST = []
GROUP_LIST = []
INTERFACE_DICT = {}
class FakeConfig:
pass
class TestLoggerHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.bucket = []
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.bucket.append(record.msg)
class FakeCallAndRead:
def __init__(self):
self.external_command_list = []
def __call__(self, argument_list, raise_on_error=True):
retval = 0, 'UP'
global INTERFACE_DICT
if 'useradd' in argument_list:
global USER_LIST
USER_LIST.append(argument_list[-1])
elif 'groupadd' in argument_list:
global GROUP_LIST
GROUP_LIST.append(argument_list[-1])
elif argument_list[:3] == ['ip', 'addr', 'add']:
ip, interface = argument_list[3], argument_list[5]
if ':' not in ip:
netmask = netaddr.strategy.ipv4.int_to_str(
netaddr.strategy.ipv4.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][2].append({'addr': ip, 'netmask': netmask})
else:
netmask = netaddr.strategy.ipv6.int_to_str(
netaddr.strategy.ipv6.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][10].append({'addr': ip, 'netmask': netmask})
# stabilise by mangling ip to just ip string
argument_list[3] = 'ip/%s' % netmask
elif argument_list[:3] == ['ip', 'addr', 'list']:
retval = 0, str(INTERFACE_DICT)
self.external_command_list.append(' '.join(argument_list))
return retval
class LoggableWrapper:
def __init__(self, logger, name):
self.__logger = logger
self.__name = name
def __call__(self, *args, **kwargs):
arg_list = [repr(x) for x in args] + [
'%s=%r' % (x, y) for x, y in kwargs.iteritems()]
self.__logger.debug('%s(%s)' % (self.__name, ', '.join(arg_list)))
class TimeMock:
@classmethod
def sleep(self, seconds):
return
class GrpMock:
@classmethod
def getgrnam(self, name):
global GROUP_LIST
if name in GROUP_LIST:
return True
raise KeyError
class PwdMock:
@classmethod
def getpwnam(self, name):
global USER_LIST
if name in USER_LIST:
class result:
pw_uid = 0
pw_gid = 0
return result
raise KeyError
class NetifacesMock:
@classmethod
def ifaddresses(self, name):
global INTERFACE_DICT
if name in INTERFACE_DICT:
return INTERFACE_DICT[name]
raise ValueError
@classmethod
def interfaces(self):
global INTERFACE_DICT
return INTERFACE_DICT.keys()
class SlapformatMixin(unittest.TestCase):
# keep big diffs
maxDiff = None
def patchNetifaces(self):
self.netifaces = NetifacesMock()
self.saved_netifaces = dict()
for fake in vars(NetifacesMock):
self.saved_netifaces[fake] = getattr(netifaces, fake, None)
setattr(netifaces, fake, getattr(self.netifaces, fake))
def restoreNetifaces(self):
for name, original_value in self.saved_netifaces.items():
setattr(netifaces, name, original_value)
del self.saved_netifaces
def patchPwd(self):
self.saved_pwd = dict()
for fake in vars(PwdMock):
self.saved_pwd[fake] = getattr(pwd, fake, None)
setattr(pwd, fake, getattr(PwdMock, fake))
def restorePwd(self):
for name, original_value in self.saved_pwd.items():
setattr(pwd, name, original_value)
del self.saved_pwd
def patchTime(self):
self.saved_time = dict()
for fake in vars(TimeMock):
self.saved_time[fake] = getattr(time, fake, None)
setattr(time, fake, getattr(TimeMock, fake))
def restoreTime(self):
for name, original_value in self.saved_time.items():
setattr(time, name, original_value)
del self.saved_time
def patchGrp(self):
self.saved_grp = dict()
for fake in vars(GrpMock):
self.saved_grp[fake] = getattr(grp, fake, None)
setattr(grp, fake, getattr(GrpMock, fake))
def restoreGrp(self):
for name, original_value in self.saved_grp.items():
setattr(grp, name, original_value)
del self.saved_grp
def patchOs(self, logger):
self.saved_os = dict()
for fake in ['mkdir', 'chown', 'chmod', 'makedirs']:
self.saved_os[fake] = getattr(os, fake, None)
f = LoggableWrapper(logger, fake)
setattr(os, fake, f)
def restoreOs(self):
for name, original_value in self.saved_os.items():
setattr(os, name, original_value)
del self.saved_os
def setUp(self):
config = FakeConfig()
config.dry_run = True
config.verbose = True
logger = logging.getLogger('testcatch')
logger.setLevel(logging.DEBUG)
self.test_result = TestLoggerHandler()
logger.addHandler(self.test_result)
config.logger = logger
self.partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
global USER_LIST
USER_LIST = []
global GROUP_LIST
GROUP_LIST = []
global INTERFACE_DICT
INTERFACE_DICT = {}
self.real_callAndRead = slapos.format.callAndRead
self.fakeCallAndRead = FakeCallAndRead()
slapos.format.callAndRead = self.fakeCallAndRead
self.patchOs(logger)
self.patchGrp()
self.patchTime()
self.patchPwd()
self.patchNetifaces()
def tearDown(self):
self.restoreOs()
self.restoreGrp()
self.restoreTime()
self.restorePwd()
self.restoreNetifaces()
slapos.format.callAndRead = self.real_callAndRead
class TestComputer(SlapformatMixin):
def test_getAddress_empty_computer(self):
computer = slapos.format.Computer('computer')
self.assertEqual(computer.getAddress(), {'netmask': None, 'addr': None})
def test_construct_empty(self):
computer = slapos.format.Computer('computer')
computer.construct()
raise NotImplementedError
def test_construct_empty_prepared(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -s /bin/false -G slapsoft testuser',
'tunctl -t tap -u testuser',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'tunctl -t tap -u root',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -s /bin/false -G slapsoft testuser',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
class TestPartition(SlapformatMixin):
def test_createPath(self):
global USER_LIST
USER_LIST = ['testuser']
self.partition.createPath()
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chown('/part_path', 0, 0)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
def test_createPath_no_alter_user(self):
self.partition.createPath(False)
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
class TestUser(SlapformatMixin):
def test_create(self):
user = slapos.format.User('doesnotexistsyet')
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/false '\
'doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_additional_groups(self):
user = slapos.format.User('doesnotexistsyet', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/false -G '\
'additionalgroup1,additionalgroup2 doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_group_exists(self):
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'useradd -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists_additional_groups(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/false -G '\
'additionalgroup1,additionalgroup2 testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_group_exists(self):
global USER_LIST
USER_LIST = ['testuser']
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'usermod -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_isAvailable(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
self.assertTrue(user.isAvailable())
def test_isAvailable_notAvailable(self):
user = slapos.format.User('doesnotexistsyet')
self.assertFalse(user.isAvailable())
if __name__ == '__main__':
unittest.main()
from slapos.grid import slapgrid
import httplib
import logging
import os
import shutil
import signal
import slapos.slap.slap
import socket
import sys
import tempfile
import time
import unittest
import urlparse
import xml_marshaller
class BasicMixin:
def assertSortedListEqual(self, list1, list2, msg=None):
self.assertListEqual(sorted(list1), sorted(list2), msg)
def setUp(self):
self._tempdir = tempfile.mkdtemp()
self.software_root = os.path.join(self._tempdir, 'software')
self.instance_root = os.path.join(self._tempdir, 'instance')
if getattr(self, 'master_url', None) is None:
self.master_url = 'http://127.0.0.1:0/'
self.computer_id = 'computer'
self.supervisord_socket = os.path.join(self._tempdir, 'supervisord.sock')
self.supervisord_configuration_path = os.path.join(self._tempdir,
'supervisord')
self.usage_report_periodicity = 1
self.buildout = None
logging.basicConfig(level=logging.DEBUG)
self.grid = slapgrid.Slapgrid(self.software_root, self.instance_root,
self.master_url, self.computer_id, self.supervisord_socket,
self.supervisord_configuration_path, self.usage_report_periodicity,
self.buildout)
def tearDown(self):
# XXX: Hardcoded pid, as it is not configurable in slapos
svc = os.path.join(self.instance_root, 'var', 'run', 'supervisord.pid')
if os.path.exists(svc):
try:
pid = int(open(svc).read().strip())
except ValueError:
pass
else:
os.kill(pid, signal.SIGTERM)
shutil.rmtree(self._tempdir, True)
class TestBasicSlapgridCP(BasicMixin, unittest.TestCase):
def test_no_software_root(self):
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_instance_root(self):
os.mkdir(self.software_root)
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_master(self):
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertRaises(socket.error, self.grid.processComputerPartitionList)
class MasterMixin(BasicMixin):
def _patchHttplib(self):
"""Overrides httplib"""
import mock.httplib
self.saved_httplib = dict()
for fake in vars(mock.httplib):
self.saved_httplib[fake] = getattr(httplib, fake, None)
setattr(httplib, fake, getattr(mock.httplib, fake))
def _unpatchHttplib(self):
"""Restores httplib overriding"""
import httplib
for name, original_value in self.saved_httplib.items():
setattr(httplib, name, original_value)
del self.saved_httplib
def setUp(self):
self._patchHttplib()
BasicMixin.setUp(self)
def tearDown(self):
self._unpatchHttplib()
BasicMixin.tearDown(self)
class TestSlapgridCPWithMaster(MasterMixin, unittest.TestCase):
def test_nothing_to_do(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse(path.lstrip('/'))
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == 'getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
slap_computer._computer_partition_list = []
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['etc', 'var'])
self.assertSortedListEqual(os.listdir(self.software_root), [])
def test_one_partition(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'stopped'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
partition_path = os.path.join(self.instance_root, '0')
os.mkdir(partition_path, 0750)
software_hash = slapos.grid.utils.getSoftwareUrlHash('http://sr/')
srdir = os.path.join(self.software_root, software_hash)
os.mkdir(srdir)
open(os.path.join(srdir, 'template.cfg'), 'w').write(
"""[buildout]""")
srbindir = os.path.join(srdir, 'bin')
os.mkdir(srbindir)
open(os.path.join(srbindir, 'buildout'), 'w').write("""#!/bin/sh
touch worked""")
os.chmod(os.path.join(srbindir, 'buildout'), 0755)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
partition = os.path.join(self.instance_root, '0')
self.assertSortedListEqual(os.listdir(partition), ['worked',
'buildout.cfg'])
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
def test_one_partition_started(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'started'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
partition_path = os.path.join(self.instance_root, '0')
os.mkdir(partition_path, 0750)
software_hash = slapos.grid.utils.getSoftwareUrlHash('http://sr/')
srdir = os.path.join(self.software_root, software_hash)
os.mkdir(srdir)
open(os.path.join(srdir, 'template.cfg'), 'w').write(
"""[buildout]""")
srbindir = os.path.join(srdir, 'bin')
os.mkdir(srbindir)
open(os.path.join(srbindir, 'buildout'), 'w').write("""#!/bin/sh
touch worked &&
mkdir -p etc/run &&
echo "#!/bin/sh" > etc/run/wrapper &&
echo "while :; do echo "Working\\nWorking\\n" ; done" >> etc/run/wrapper &&
chmod 755 etc/run/wrapper
""")
os.chmod(os.path.join(srbindir, 'buildout'), 0755)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
self.assertSortedListEqual(os.listdir(partition_path), ['.0_wrapper.log',
'worked', 'buildout.cfg', 'etc'])
tries = 10
wrapper_log = os.path.join(partition_path, '.0_wrapper.log')
while tries > 0:
tries -= 1
if os.path.getsize(wrapper_log) > 0:
break
time.sleep(0.2)
self.assertTrue('Working' in open(wrapper_log, 'r').read())
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
def test_one_partition_started_stopped(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'started'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
partition_path = os.path.join(self.instance_root, '0')
os.mkdir(partition_path, 0750)
software_hash = slapos.grid.utils.getSoftwareUrlHash('http://sr/')
srdir = os.path.join(self.software_root, software_hash)
os.mkdir(srdir)
open(os.path.join(srdir, 'template.cfg'), 'w').write(
"""[buildout]""")
srbindir = os.path.join(srdir, 'bin')
os.mkdir(srbindir)
open(os.path.join(srbindir, 'buildout'), 'w').write("""#!/bin/sh
touch worked &&
mkdir -p etc/run &&
(
cat <<'HEREDOC'
#!%(python)s
import signal
def handler(signum, frame):
print 'Signal handler called with signal', signum
raise SystemExit
signal.signal(signal.SIGTERM, handler)
while True:
print "Working"
HEREDOC
)> etc/run/wrapper &&
chmod 755 etc/run/wrapper
""" % dict(python = sys.executable))
os.chmod(os.path.join(srbindir, 'buildout'), 0755)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
self.assertSortedListEqual(os.listdir(partition_path), ['.0_wrapper.log',
'worked', 'buildout.cfg', 'etc'])
wrapper_log = os.path.join(partition_path, '.0_wrapper.log')
tries = 10
while tries > 0:
tries -= 1
if os.path.getsize(wrapper_log) > 0:
break
time.sleep(0.2)
last_size = os.path.getsize(wrapper_log)
self.assertTrue('Working' in open(wrapper_log, 'r').read())
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'stopped'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
self.assertSortedListEqual(os.listdir(partition_path), ['.0_wrapper.log',
'.0_wrapper.log.1', 'worked', 'buildout.cfg', 'etc'])
tries = 10
expected_text = 'Signal handler called with signal 15'
while tries > 0:
tries -= 1
found = expected_text in open(wrapper_log, 'r').read()
if found:
break
time.sleep(0.2)
self.assertTrue(found)
def test_one_partition_stopped_started(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'stopped'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
partition_path = os.path.join(self.instance_root, '0')
os.mkdir(partition_path, 0750)
software_hash = slapos.grid.utils.getSoftwareUrlHash('http://sr/')
srdir = os.path.join(self.software_root, software_hash)
os.mkdir(srdir)
open(os.path.join(srdir, 'template.cfg'), 'w').write(
"""[buildout]""")
srbindir = os.path.join(srdir, 'bin')
os.mkdir(srbindir)
open(os.path.join(srbindir, 'buildout'), 'w').write("""#!/bin/sh
touch worked &&
mkdir -p etc/run &&
echo "#!/bin/sh" > etc/run/wrapper &&
echo "while :; do echo "Working\\nWorking\\n" ; done" >> etc/run/wrapper &&
chmod 755 etc/run/wrapper
""")
os.chmod(os.path.join(srbindir, 'buildout'), 0755)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
partition = os.path.join(self.instance_root, '0')
self.assertSortedListEqual(os.listdir(partition), ['worked', 'etc',
'buildout.cfg'])
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'started'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
partition = os.path.join(self.instance_root, '0')
self.assertSortedListEqual(os.listdir(partition), ['.0_wrapper.log',
'worked', 'etc', 'buildout.cfg'])
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
tries = 10
wrapper_log = os.path.join(partition_path, '.0_wrapper.log')
while tries > 0:
tries -= 1
if os.path.getsize(wrapper_log) > 0:
break
time.sleep(0.2)
self.assertTrue('Working' in open(wrapper_log, 'r').read())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment