From 2bf5077cfb78b5e7c4b6e09262ca2b3a44a79dc4 Mon Sep 17 00:00:00 2001 From: Roque <roqueporchetto@gmail.com> Date: Thu, 7 Sep 2017 10:51:18 +0200 Subject: [PATCH] testnode: move code from slapos.toolbox to testnode for communication with SlapOS master - The base of this code comes from some work done by Rafael in here: https://lab.nexedi.com/nexedi/slapos.toolbox/blob/master/slapos/agent/tester.py - Refactor in communication with slap objects for supply and request. - Added method to check if instance is requested. - Refactor in log calls. --- .../util/testnode/SlapOSMasterCommunicator.py | 688 +++++++++++++----- 1 file changed, 501 insertions(+), 187 deletions(-) diff --git a/erp5/util/testnode/SlapOSMasterCommunicator.py b/erp5/util/testnode/SlapOSMasterCommunicator.py index 6eefd2fcdb..952553b62f 100644 --- a/erp5/util/testnode/SlapOSMasterCommunicator.py +++ b/erp5/util/testnode/SlapOSMasterCommunicator.py @@ -1,206 +1,520 @@ +import datetime import json -import httplib -import urlparse +import sys +import traceback import time +#import feedparser +from uritemplate import expand -TIMEOUT = 30 +import slapos.slap +from slapos.slap import SoftwareProductCollection + +from slapos.slap.slap import ConnectionError +from requests.exceptions import HTTPError +from erp5.util.taskdistribution import SAFE_RPC_EXCEPTION_LIST + + +SOFTWARE_PRODUCT_NAMESPACE = "product." + + +SOFTWARE_STATE_UNKNOWN = "SOFTWARE_STATE_UNKNOWN" +SOFTWARE_STATE_INSTALLING = "SOFTWARE_STATE_INSTALLING" +SOFTWARE_STATE_INSTALLED = "SOFTWARE_STATE_INSTALLED" +SOFTWARE_STATE_DESTROYING = "SOFTWARE_STATE_DESTROYING" + +INSTANCE_STATE_UNKNOWN = "INSTANCE_STATE_UNKNOWN" +INSTANCE_STATE_STARTING = "INSTANCE_STATE_STARTING" +INSTANCE_STATE_STARTED = "started" +INSTANCE_STATE_STARTED_WITH_ERROR = "INSTANCE_STATE_STARTED_WITH_ERROR" +INSTANCE_STATE_STOPPING = "INSTANCE_STATE_STOPPING" +INSTANCE_STATE_STOPPED = "stopped" +INSTANCE_STATE_DESTROYING = "INSTANCE_STATE_DESTROYING" + +TESTER_STATE_INITIAL = "TESTER_STATE_INITIAL" +TESTER_STATE_NOTHING = "TESTER_STATE_NOTHING" +TESTER_STATE_SOFTWARE_INSTALLED = "TESTER_STATE_SOFTWARE_INSTALLED" +TESTER_STATE_INSTANCE_INSTALLED = "TESTER_STATE_INSTANCE_INSTALLED" +TESTER_STATE_INSTANCE_STARTED = "TESTER_STATE_INSTANCE_STARTED" +TESTER_STATE_INSTANCE_UNINSTALLED = "TESTER_STATE_INSTANCE_UNINSTALLED" + + +# Simple decorator to prevent raise due small +# network failures. +def retryOnNetworkFailure(func): + def wrapper(*args, **kwargs): + retry_time = 64 + while True: + try: + return func(*args, **kwargs) + except SAFE_RPC_EXCEPTION_LIST, e: + print 'Network failure: %s , %s' % (sys.exc_info(), e) + except HTTPError, e: + print 'Network failure: %s , %s' % (sys.exc_info(), e) + except ConnectionError, e: + print 'Network failure: %s , %s' % (sys.exc_info(), e) + except slapos.slap.ConnectionError, e: + print 'Network failure: %s , %s' % (sys.exc_info(), e) + + print 'Retry method %s in %i seconds' % (func, retry_time) + time.sleep(retry_time) + retry_time = min(retry_time*1.5, 640) + + + wrapper.__name__ = func.__name__ + wrapper.__doc__ = func.__doc__ + return wrapper -# TODO: News-> look list to get last news... (and not the first of the list) class SlapOSMasterCommunicator(object): - """ - Communication with slapos Master using Hateoas. - - collection: collection of data (hosting_subscription, instance, software_release...) - hosting_subscription: result of a request - instance(s): instance(s) related to an hosting_subscription - - usage: ex: - # Try to reuse same communicator, because initilization step may takes a lot of time - # due to listing of all instances (alive or not) related to the specified slapOS account. - communicator = SlapOSMasterCommunicator() + + + def __init__(self, slap, slap_supply, slap_order, url, logger ): + + self._logger = logger + self.slap = slap + self.slap_order = slap_order + self.slap_supply = slap_supply + self.hateoas_navigator = self.slap._hateoas_navigator + self.hosting_subscription_url = None + + if url is not None and \ + url.startswith(SOFTWARE_PRODUCT_NAMESPACE): + + product = SoftwareProductCollection(self._logger, self.slap) + try: + url = product.__getattr__(url[len(SOFTWARE_PRODUCT_NAMESPACE):]) + except AttributeError as e: + self._logger.warning('Error on get software release : %s ' % e.message) + + self.url = url + + @retryOnNetworkFailure + def _supply(self): + if self.computer_guid is None: + self._logger ('Nothing to supply for %s.' % (self.name)) + return None + self._logger('Supply %s@%s', self.url, self.computer_guid) + return self.slap_supply.supply(self.url, self.computer_guid) + + @retryOnNetworkFailure + def _request(self, state, instance_title=None, request_kw=None): + if instance_title is not None: + self.name = instance_title + if request_kw is not None: + if isinstance(request_kw, str) or \ + isinstance(request_kw, unicode): + self.request_kw = json.loads(request_kw) + else: + self.request_kw = request_kw + self._logger('Request %s@%s: %s', self.url, self.name, state) + self.latest_state = state + return self.slap_order.request( + software_release=self.url, + partition_reference=self.name, + state=state, + **self.request_kw) + + def isInstanceRequested(self, instance_title): + hateoas = getattr(self.slap, '_hateoas_navigator', None) + return instance_title in hateoas.getHostingSubscriptionDict() + + @retryOnNetworkFailure + def _hateoas_getComputer(self, reference): + + root_document = self.hateoas_navigator.getRootDocument() + search_url = root_document["_links"]['raw_search']['href'] + + getter_link = expand(search_url, { + "query": "reference:%s AND portal_type:Computer" % reference, + "select_list": ["relative_url"], + "limit": 1}) + + result = self.hateoas_navigator.GET(getter_link) + content_list = json.loads(result)['_embedded']['contents'] + + if len(content_list) == 0: + raise Exception('No Computer found.') + + computer_relative_url = content_list[0]["relative_url"] - # Print news related to 'TestScalability_21423104630420' all instances - instance_link_list = communicator._getRelatedInstanceLink('TestScalability_21423104630420') - for instance_link in instance_link_list: - news = communicator.getNewsFromInstanceLink(instance_link) - print news['news'] - """ - def __init__(self, certificate_path, key_path, log, - url): - # Create connection - api_scheme, api_netloc, api_path, api_query, api_fragment = urlparse.urlsplit(url) - self.log = log - self.certificate_path = certificate_path - self.key_path = key_path - self.url = url - self.connection = self._getConnection(self.certificate_path, self.key_path, self.url) - # Get master - master_link = {'href':api_path,'type':"application/vnd.slapos.org.hal+json; class=slapos.org.master"} - master = self._curl(master_link) - self.person_link = master['_links']['http://slapos.org/reg/me'] - # Get person related to specified key/certificate provided - person = self._curl(self.person_link) - self.personnal_collection_link = person['_links']['http://slapos.org/reg/hosting_subscription'] - # Get collection (of hosting subscriptions) - collection = self._curl(self.personnal_collection_link) - # XXX: This part may be extremly long (because here no hosting subscriptions - # has been visited) - self.hosting_subcriptions_dict = {} - self.visited_hosting_subcriptions_link_list = [] - self.log("SlapOSMasterCommunicator will read all hosting subscriptions entries, " - "it may take several time...") - self._update_hosting_subscription_informations() + getter_url = self.hateoas_navigator.getDocumentAndHateoas( + computer_relative_url) + + return json.loads(self.hateoas_navigator.GET(getter_url)) - def _getConnection(self,certificate_path, key_path, url): - api_scheme, api_netloc, api_path, api_query, api_fragment = urlparse.urlsplit(url) - #self.log("HTTPS Connection with: %s, cert=%s, key=%s" %(api_netloc,key_path,certificate_path)) - return httplib.HTTPSConnection(api_netloc, key_file=key_path, cert_file=certificate_path, timeout=TIMEOUT) + + @retryOnNetworkFailure + def getSoftwareInstallationList(self): + # XXX Move me to slap.py API - def _curl(self, link): - """ - 'link' must look like : {'href':url,'type':content_type} - """ - # Set timeout - import socket - socket.setdefaulttimeout(1.0*TIMEOUT) + computer = self._hateoas_getComputer(self.computer_guid) + + # Not a list ? + action = computer['_links']['action_object_slap'] + + if action.get('title') == 'getHateoasSoftwareInstallationList': + getter_link = action['href'] + else: + raise Exception('No Link found found.') + + result = self.hateoas_navigator.GET(getter_link) + return json.loads(result)['_links']['content'] + + + @retryOnNetworkFailure + def getSoftwareInstallationNews(self): + getter_link = None + for si in self.getSoftwareInstallationList(): + if si["title"] == self.url: + getter_link = si["href"] + break + + # We could not find the document, so it is probably too soon. + if getter_link is None: + return "" + + result = self.hateoas_navigator.GET(getter_link) + action_object_slap_list = json.loads(result)['_links']['action_object_slap'] + + for action in action_object_slap_list: + if action.get('title') == 'getHateoasNews': + getter_link = action['href'] + break + else: + raise Exception('getHateoasNews not found.') + + result = self.hateoas_navigator.GET(getter_link) + if len(json.loads(result)['news']) > 0: + return json.loads(result)['news'][0]["text"] + return "" + + @retryOnNetworkFailure + def getInstanceUrlList(self): + + if self.hosting_subscription_url is None: + hosting_subscription_dict = self.hateoas_navigator._hateoas_getHostingSubscriptionDict() + for hs in hosting_subscription_dict: + if hs['title'] == self.name: + self.hosting_subscription_url = hs['href'] + break + + if self.hosting_subscription_url is None: + return None + + return self.hateoas_navigator.getHateoasInstanceList( + self.hosting_subscription_url) + + @retryOnNetworkFailure + def getNewsFromInstance(self, url): + + result = self.hateoas_navigator.GET(url) + result = json.loads(result) + if result['_links'].get('action_object_slap', None) is None: + return None + + object_link = self.hateoas_navigator.hateoasGetLinkFromLinks( + result['_links']['action_object_slap'], 'getHateoasNews') - api_scheme, api_netloc, api_path, api_query, api_fragment = urlparse.urlsplit(link['href']) - max_retry = 10 - # Try to use existing conection + result = self.hateoas_navigator.GET(object_link) + return json.loads(result)['news'] + + @retryOnNetworkFailure + def getInformationFromInstance(self, url): + + result = self.hateoas_navigator.GET(url) + result = json.loads(result) + if result['_links'].get('action_object_slap', None) is None: + print result['links'] + return None + + object_link = self.hateoas_navigator.hateoasGetLinkFromLinks( + result['_links']['action_object_slap'], 'getHateoasInformation') + + result = self.hateoas_navigator.GET(object_link) + return json.loads(result) + + +class SoftwareReleaseTester(SlapOSMasterCommunicator): + deadline = None + latest_state = None + + def __init__(self, + name, + logger, + slap, + slap_order, + slap_supply, + url, # software release url + computer_guid=None, # computer for supply if desired + request_kw=None, # instance parameters, if instantiation + # testing is desired + software_timeout=3600, + instance_timeout=3600, + ): + + super(SoftwareReleaseTester, self).__init__( + slap, slap_supply, slap_order, url, logger) + + self.name = name + self.computer_guid = computer_guid + + if isinstance(request_kw, str) or \ + isinstance(request_kw, unicode): + self.request_kw = json.loads(request_kw) + else: + self.request_kw = request_kw + self.message_history = [] + + self.state = TESTER_STATE_INITIAL + self.transition_dict = { + # step function + # delay + # next_state + # software_state + # instance_state + TESTER_STATE_INITIAL: ( + lambda t: None, + None, + TESTER_STATE_NOTHING, + None, + None, + ), + TESTER_STATE_NOTHING: ( + lambda t: t._supply("available"), + int(software_timeout), + request_kw is None and TESTER_STATE_INSTANCE_UNINSTALLED or \ + TESTER_STATE_SOFTWARE_INSTALLED, + SOFTWARE_STATE_INSTALLED, + None, + ), + TESTER_STATE_SOFTWARE_INSTALLED: ( + lambda t: t._request("started"), + int(instance_timeout), + TESTER_STATE_INSTANCE_STARTED, + None, + INSTANCE_STATE_STARTED, + ), + TESTER_STATE_INSTANCE_STARTED: ( + lambda t: t._request("destroyed"), + int(1200), + TESTER_STATE_INSTANCE_UNINSTALLED, + None, + INSTANCE_STATE_STOPPED, + ), + TESTER_STATE_INSTANCE_UNINSTALLED: ( + lambda t: t._supply("destroyed"), + int(1200), + None, + None, + None, + ), + } + + def __repr__(self): + deadline = self.deadline + if deadline is not None: + deadline -= time.time() + deadline = '+%is' % (deadline, ) + return '<%s(state=%s, deadline=%s) at %x>' % ( + self.__class__.__name__, self.state, deadline, id(self)) + + def getInfo(self): + info = "" + info += "Software Release URL: %s\n" % (self.url) + if self.computer_guid is not None: + info += "Supply requested on: %s\n" % (self.computer_guid) + info += "Instance Requested (Parameters): %s\n" % self.request_kw + return info + + def getFormatedLastMessage(self): + if len(self.message_history) == 0: + return "No message" + + summary = "Summary about the test. Instance List and Status:\n" + message = "Last information about the tester:\n" + if self.message_history[-1] is not None: + message_list = self.message_history[-1] + for entry in message_list: + summary += "%s %s -> %s\n" % ( + entry['title'], entry["slave"] and "(slave)" or "", entry['state']) + for prop in entry: + if prop != "information": + message += "%s = %s\n" % (prop, json.dumps(entry[prop], indent=2)) + + message += "=== connection_dict === \n%s\n" % ( + json.dumps(entry["information"]["connection_dict"], indent=2)) + message += "\n" + message += "=== parameter_dict === \n%s\n" % ( + json.dumps(entry["information"]["parameter_dict"], indent=2)) + message += "\n" + message += "="*79 + message += "\n\n\n" + + return summary + message + + def _getSoftwareState(self): + if self.computer_guid is None: + return SOFTWARE_STATE_INSTALLED + + message = self.getSoftwareInstallationNews() + if message.startswith("#error no data found"): + return SOFTWARE_STATE_UNKNOWN + + if message.startswith('#access software release'): + return SOFTWARE_STATE_INSTALLED + + if message.startswith('#error'): + return SOFTWARE_STATE_INSTALLING + + return SOFTWARE_STATE_UNKNOWN + + @retryOnNetworkFailure + def getRSSEntryFromMonitoring(self, base_url): + if base_url is None: + return {} + + feed_url = base_url + '/monitor-public/rssfeed.html' + d = feedparser.parse(feed_url) + + if len(d.entries) > 0: + return {"date": d.entries[0].published, + "message": d.entries[0].description, + "title" : d.entries[0].title} + + return {} + + @retryOnNetworkFailure + def _getInstanceState(self): + latest_state = self.latest_state + self._logger('latest_state = %r', latest_state) + + if latest_state is None: + return INSTANCE_STATE_UNKNOWN + + message_list = [] try: - self.connection.request(method='GET', url=api_path, headers={'Accept': link['type']}, body="") - response = self.connection.getresponse() - return json.loads(response.read()) - # Create and use new connection + for instance in self.getInstanceUrlList(): + news = self.getNewsFromInstance(instance["href"]) + information = self.getInformationFromInstance(instance["href"]) + state = INSTANCE_STATE_UNKNOWN + monitor_information_dict = {} + + info_created_at = "-1" + is_slave = information['slave'] + if is_slave: + if (information["connection_dict"]) > 0: + state = INSTANCE_STATE_STARTED + else: + # not slave + instance_state = news[0] + if instance_state.get('created_at', '-1') != "-1": + # the following does NOT take TZ into account + created_at = datetime.datetime.strptime(instance_state['created_at'], + '%a, %d %b %Y %H:%M:%S %Z') + gmt_now = datetime.datetime(*time.gmtime()[:6]) + + info_created_at = '%s (%d)' % ( + instance_state['created_at'], (gmt_now - created_at).seconds) + + if instance_state['text'].startswith('#access'): + state = INSTANCE_STATE_STARTED + + if instance_state['text'].startswith('#access Instance correctly stopped'): + state = INSTANCE_STATE_STOPPED + + if instance_state['text'].startswith('#error'): + state = INSTANCE_STATE_STARTED_WITH_ERROR + + if state == INSTANCE_STATE_STARTED_WITH_ERROR: + # search for monitor url + monitor_v6_url = information["connection_dict"].get("monitor_v6_url") + try: + monitor_information_dict = self.getRSSEntryFromMonitoring(monitor_v6_url) + except Exception: + self._logger ('Unable to download promises for: %s' % (instance["title"])) + self._logger (traceback.format_exc()) + monitor_information_dict = {"message": "Unable to download"} + + message_list.append({ + 'title': instance["title"], + 'slave': is_slave, + 'news': news[0], + 'information': information, + 'monitor': monitor_information_dict, + 'state': state + }) + + except slapos.slap.ServerError: + self._logger ('Got an error requesting partition for ' + 'its state') + return INSTANCE_STATE_UNKNOWN except: - retry = 0 - # (re)Try several time to use new connection - while retry < max_retry: - try: - self.connection = self._getConnection(self.certificate_path, self.key_path, self.url) - self.connection.request(method='GET', url=api_path, headers={'Accept': link['type']}, body="") - response = self.connection.getresponse() - return json.loads(response.read()) - except: - self.log("SlapOSMasterCommunicator: Connection failed..") - retry += 1 - time.sleep(10) - self.log("SlapOSMasterCommunicator: All connection attempts failed after %d try.." %max_retry) - raise ValueError("SlapOSMasterCommunicator: Impossible to use connection") - - def _update_hosting_subscription_informations(self): - """ - Add all not already visited hosting_subcription - # Visit all hosting subscriptions and fill a dict containing all - # new hosting subscriptions. ( like: {hs1_title:hs1_link, hs2_title:hs2_link, ..} ) - # and a list of visited hosting_subsciption ( like: [hs1_link, hs2_link, ..] ) - """ - collection = self._curl(self.personnal_collection_link) - # For each hosting_subcription present in the collection - for hosting_subscription_link in collection['_links']['item']: - if hosting_subscription_link not in self.visited_hosting_subcriptions_link_list: - hosting_subscription = self._curl(hosting_subscription_link) - self.hosting_subcriptions_dict.update({hosting_subscription['title']:hosting_subscription_link}) - self.visited_hosting_subcriptions_link_list.append(hosting_subscription_link) - - def _getRelatedInstanceLink(self, hosting_subscription_title): - """ - Return a list of all related instance_url from an hosting_subscription_title - """ - # Update informations - self._update_hosting_subscription_informations() - # Get specified hosting_subscription - hosting_subscription_link = self.hosting_subcriptions_dict[hosting_subscription_title] - hosting_subscription = self._curl(hosting_subscription_link) - assert(hosting_subscription_title == hosting_subscription['title']) - # Get instance collection related to this hosting_subscription - instance_collection_link = hosting_subscription['_links']['http://slapos.org/reg/instance'] - instance_collection = self._curl(instance_collection_link) - related_instance_link_list = [] - # For each instance present in the collection - for instance in instance_collection['_links']['item']: - related_instance_link_list.append(instance) - return related_instance_link_list - - def getNewsFromInstanceLink(self, instance_link): - instance = self._curl(instance_link) - news_link = instance['_links']['http://slapos.org/reg/news'] - return self._curl(news_link) - - def isHostingSubsciptionStatusEqualTo(self, hosting_subscription_title, excepted_news_text): - """ - Return True if all related instance state are equal to status, - or False if not or if there is are no related instances. - """ - related_instance_link_list = _getRelatedInstanceLink(hosting_subscription_title) - # For each instance present in the collection - for instance_link in related_instance_link_list: - news = self.getNewsFromInstanceLink(instance_link) - if excepted_news_text != news['news'][0]['text']: - return False - return len(related_instance_link_list) > 0 - - def isInstanceReady(self, instance_link, status): - """ - Return True if instance status and instance news text ~looks corresponding. - ( use the matching of 'correctly' and 'Instance' and status ) - """ - # XXX: SlapOS Master doesn't store any "news" about slave instances. Assume true. - if self._curl(instance_link)['slave']: - return True - text = self.getNewsFromInstanceLink(instance_link)['news'][0]['text'] - return ('Instance' in text) and ('correctly' in text) and (status in text) - - # check if provided 'status' = status - def isHostingSubscriptionReady(self, hosting_subscription_title, status): - """ - Return True if all instance status and instance news text ~looks corresponding. - ( use the matching of 'correctly' and 'Instance' and status ). - """ - instance_link_list = self._getRelatedInstanceLink(hosting_subscription_title) - for instance_link in instance_link_list: - if not self.isInstanceReady(instance_link, status): - return False - return len(instance_link_list) > 0 + self._logger("ERROR getting instance state") + return INSTANCE_STATE_UNKNOWN + + started = 0 + stopped = 0 + self.message_history.append(message_list) + for instance in message_list: + if not instance['slave'] and \ + instance['state'] in (INSTANCE_STATE_UNKNOWN, INSTANCE_STATE_STARTED_WITH_ERROR): + return instance['state'] + elif not instance['slave'] and instance['state'] == INSTANCE_STATE_STARTED: + started = 1 + elif not instance['slave'] and instance['state'] == INSTANCE_STATE_STOPPED: + stopped = 1 + + if instance['slave'] and instance['state'] == INSTANCE_STATE_UNKNOWN: + return instance['state'] + + if started and stopped: + return INSTANCE_STATE_STOPPED + return INSTANCE_STATE_UNKNOWN - def isRegisteredHostingSubscription(self, hosting_subscription_title): - """ - Return True if the specified hosting_subscription is present on SlapOSMaster - """ - self._update_hosting_subscription_informations() - if self.hosting_subcriptions_dict.get(hosting_subscription_title): - return True - return False + if started: + return INSTANCE_STATE_STARTED - def getHostingSubscriptionDict(self): + if stopped: + return INSTANCE_STATE_STOPPED + + @retryOnNetworkFailure + def teardown(self): """ - Return the dict of hosting subcription. + Interrupt a running test sequence, putting it in idle state. """ - return self.hosting_subcriptions_dict + self._logger ('Invoking TearDown for %s@%s' % (self.url, self.name)) + if self.request_kw is not None: + self._request('destroyed') + if self.computer_guid is not None: + self._supply('destroyed') + self.state = TESTER_STATE_INSTANCE_UNINSTALLED - def getHostingSubscriptionInformationDict(self, title): + def tic(self, now): """ - Return a dict with informations about Hosting subscription + Check for missed deadlines (-> test failure), conditions for moving to + next state, and actually moving to next state (executing its payload). """ - related_instance_link_list = self._getRelatedInstanceLink(title) - related_instance_link = None - # Get root instance - for link in related_instance_link_list: - instance = self._curl(link) - if title == instance['title']: - related_instance_link = link - break - # Return information dict - if related_instance_link: - related_instance = self._curl(related_instance_link) - return { - 'title': related_instance['title'], - 'status': related_instance['status'], - 'software_url': related_instance['_links']['http://slapos.org/reg/release'], - 'software_type': related_instance['software_type'], - 'computer_guid': related_instance['sla']['computer_guid'] - } - else: - return None - \ No newline at end of file + self._logger ('[DEBUG] TIC') + deadline = self.deadline + + if deadline < now and deadline is not None: + raise TestTimeout(self.state) + + _, _, next_state, software_state, instance_state = self.transition_dict[ + self.state] + + if (software_state is None or + software_state == self._getSoftwareState()) and ( + instance_state is None or + instance_state == self._getInstanceState()): + + self._logger ('[DEBUG] Going to state %s (%r)', next_state, instance_state) + if next_state is None: + return None + + self.state = next_state + stepfunc, delay, _, _, _ = self.transition_dict[next_state] + self.deadline = now + delay + stepfunc(self) + return self.deadline + -- 2.30.9