From 79a91fb4a1ffac3196e69ff7ab05f2e6c8d73ced Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A9rome=20Perrin?= <jerome@nexedi.com>
Date: Fri, 28 Dec 2018 03:42:09 +0100
Subject: [PATCH] grid,manager: close xmlrpc.ServerProxy sockets

by using it getSupervisorRPC in a context manager which on python 3
automatically closes.
This needs fix from https://github.com/Supervisor/supervisor/issues/1184
we assume that this fix is only on python 3.
On python2, we keep this same behavior of not closing socket explicitly
and leaving it to destructors.

Also re-raise a few error cases that were ignored.
---
 slapos/grid/SlapObject.py   | 95 ++++++++++++++++++++-----------------
 slapos/grid/svcbackend.py   | 39 ++++++++++-----
 slapos/manager/portredir.py | 12 ++---
 slapos/manager/prerm.py     | 20 ++++----
 4 files changed, 94 insertions(+), 72 deletions(-)

diff --git a/slapos/grid/SlapObject.py b/slapos/grid/SlapObject.py
index caa7cef84..dc35f0b25 100644
--- a/slapos/grid/SlapObject.py
+++ b/slapos/grid/SlapObject.py
@@ -708,14 +708,16 @@ class Partition(object):
     """Asks supervisord to start the instance. If this instance is not
     installed, we install it.
     """
-    supervisor = self.getSupervisorRPC()
     partition_id = self.computer_partition.getId()
     try:
-      supervisor.startProcessGroup(partition_id, False)
+      with self.getSupervisorRPC() as supervisor:
+        supervisor.startProcessGroup(partition_id, False)
     except xmlrpclib.Fault as exc:
       if exc.faultString.startswith('BAD_NAME:'):
         self.logger.info("Nothing to start on %s..." %
                          self.computer_partition.getId())
+      else:
+        raise
     else:
       self.logger.info("Requested start of %s..." % self.computer_partition.getId())
 
@@ -723,11 +725,13 @@ class Partition(object):
     """Asks supervisord to stop the instance."""
     partition_id = self.computer_partition.getId()
     try:
-      supervisor = self.getSupervisorRPC()
-      supervisor.stopProcessGroup(partition_id, False)
+      with self.getSupervisorRPC() as supervisor:
+        supervisor.stopProcessGroup(partition_id, False)
     except xmlrpclib.Fault as exc:
       if exc.faultString.startswith('BAD_NAME:'):
         self.logger.info('Partition %s not known in supervisord, ignoring' % partition_id)
+      else:
+        raise
     else:
       self.logger.info("Requested stop of %s..." % self.computer_partition.getId())
 
@@ -798,15 +802,18 @@ class Partition(object):
 
   def checkProcessesFromStateList(self, process_list, state_list):
     """Asks supervisord to check if one of the processes are in the state_list."""
-    supervisor = self.getSupervisorRPC()
     for process in process_list:
       try:
-        info = supervisor.getProcessInfo(process)
+        with self.getSupervisorRPC() as supervisor:
+          info = supervisor.getProcessInfo(process)
         if info['statename'] in state_list:
           return True
       except xmlrpclib.Fault as exc:
-        self.logger.debug("BAD process name: %r" % process)
-        continue
+        if exc.faultString.startswith('BAD_NAME:'):
+          self.logger.debug("BAD process name: %r" % process)
+          continue
+        else:
+          raise
     return False
 
   def cleanupFolder(self, folder_path):
@@ -832,43 +839,43 @@ class Partition(object):
     #       In future it will not be needed, as update command
     #       is going to be implemented on server side.
     self.logger.debug('Updating supervisord')
-    supervisor = self.getSupervisorRPC()
-    # took from supervisord.supervisorctl.do_update
-    result = supervisor.reloadConfig()
-    added, changed, removed = result[0]
-
-    for gname in removed:
-      results = supervisor.stopProcessGroup(gname)
-      fails = [res for res in results
-               if res['status'] == xmlrpc.Faults.FAILED]
-      if fails:
-        self.logger.warning('Problem while stopping process %r, will try later' % gname)
-      else:
+    with self.getSupervisorRPC() as supervisor:
+      # took from supervisord.supervisorctl.do_update
+      result = supervisor.reloadConfig()
+      added, changed, removed = result[0]
+
+      for gname in removed:
+        results = supervisor.stopProcessGroup(gname)
+        fails = [res for res in results
+                 if res['status'] == xmlrpc.Faults.FAILED]
+        if fails:
+          self.logger.warning('Problem while stopping process %r, will try later' % gname)
+        else:
+          self.logger.info('Stopped %r' % gname)
+        for i in range(0, 10):
+          # Some process may be still running, be nice and wait for them to be stopped.
+          try:
+            supervisor.removeProcessGroup(gname)
+            break
+          except:
+            if i == 9:
+              raise
+            time.sleep(1)
+
+        self.logger.info('Removed %r' % gname)
+
+      for gname in changed:
+        results = supervisor.stopProcessGroup(gname)
         self.logger.info('Stopped %r' % gname)
-      for i in range(0, 10):
-        # Some process may be still running, be nice and wait for them to be stopped.
-        try:
-          supervisor.removeProcessGroup(gname)
-          break
-        except:
-          if i == 9:
-            raise
-          time.sleep(1)
-
-      self.logger.info('Removed %r' % gname)
-
-    for gname in changed:
-      results = supervisor.stopProcessGroup(gname)
-      self.logger.info('Stopped %r' % gname)
-
-      supervisor.removeProcessGroup(gname)
-      supervisor.addProcessGroup(gname)
-      self.logger.info('Updated %r' % gname)
-
-    for gname in added:
-      supervisor.addProcessGroup(gname)
-      self.logger.info('Updated %r' % gname)
-    self.logger.debug('Supervisord updated')
+
+        supervisor.removeProcessGroup(gname)
+        supervisor.addProcessGroup(gname)
+        self.logger.info('Updated %r' % gname)
+
+      for gname in added:
+        supervisor.addProcessGroup(gname)
+        self.logger.info('Updated %r' % gname)
+      self.logger.debug('Supervisord updated')
 
   def _set_ownership(self, path):
     """
diff --git a/slapos/grid/svcbackend.py b/slapos/grid/svcbackend.py
index 241d87235..668680cee 100644
--- a/slapos/grid/svcbackend.py
+++ b/slapos/grid/svcbackend.py
@@ -36,6 +36,7 @@ import stat
 import sys
 import time
 from six.moves import xmlrpc_client as xmlrpclib
+import contextlib
 
 from slapos.grid.utils import (createPrivateDirectory, SlapPopen, updateFile)
 from slapos.util import bytes2str
@@ -43,13 +44,25 @@ from slapos.util import bytes2str
 from supervisor import xmlrpc, states
 
 
+@contextlib.contextmanager
 def getSupervisorRPC(socket):
+  """Get a supervisor XML-RPC connection.
+
+  Use in a context manager for proper closing of sockets.
+  """
   supervisor_transport = xmlrpc.SupervisorTransport('', '',
       'unix://' + socket)
   server_proxy = xmlrpclib.ServerProxy('http://127.0.0.1',
       supervisor_transport)
-  return getattr(server_proxy, 'supervisor')
 
+  # python3's xmlrpc is a closing context manager, python2 is not and cannot be
+  # just used as a context manager as it would call __enter__ and __exit__ on
+  # XML-RPC.
+  if sys.version_info.major == 2:
+    yield server_proxy.supervisor
+  else:
+    with server_proxy as s:
+      yield s.supervisor
 
 def _getSupervisordSocketPath(instance_root):
   return os.path.join(instance_root, 'supervisord.socket')
@@ -116,13 +129,13 @@ def _updateWatchdog(socket):
   Then, when running slapgrid, the real watchdog configuration is generated.
   We thus need to reload watchdog configuration if needed and start it.
   """
-  supervisor = getSupervisorRPC(socket)
-  if supervisor.getProcessInfo('watchdog')['state'] not in states.RUNNING_STATES:
-    # XXX workaround for https://github.com/Supervisor/supervisor/issues/339
-    # In theory, only reloadConfig is needed.
-    supervisor.removeProcessGroup('watchdog')
-    supervisor.reloadConfig()
-    supervisor.addProcessGroup('watchdog')
+  with getSupervisorRPC(socket) as supervisor:
+    if supervisor.getProcessInfo('watchdog')['state'] not in states.RUNNING_STATES:
+      # XXX workaround for https://github.com/Supervisor/supervisor/issues/339
+      # In theory, only reloadConfig is needed.
+      supervisor.removeProcessGroup('watchdog')
+      supervisor.reloadConfig()
+      supervisor.addProcessGroup('watchdog')
 
 def launchSupervisord(instance_root, logger,
                       supervisord_additional_argument_list=None):
@@ -132,13 +145,15 @@ def launchSupervisord(instance_root, logger,
     trynum = 1
     while trynum < 6:
       try:
-        supervisor = getSupervisorRPC(socket)
-        status = supervisor.getState()
+        with getSupervisorRPC(socket) as supervisor:
+          status = supervisor.getState()
       except xmlrpclib.Fault as e:
         if e.faultCode == 6 and e.faultString == 'SHUTDOWN_STATE':
           logger.info('Supervisor in shutdown procedure, will check again later.')
           trynum += 1
           time.sleep(2 * trynum)
+        else:
+          raise
       except Exception:
         # In case if there is problem with connection, assume that supervisord
         # is not running and try to run it
@@ -187,8 +202,8 @@ def launchSupervisord(instance_root, logger,
     while trynum < 6:
       try:
         socketlib.setdefaulttimeout(current_timeout)
-        supervisor = getSupervisorRPC(socket)
-        status = supervisor.getState()
+        with getSupervisorRPC(socket) as supervisor:
+          status = supervisor.getState()
         if status['statename'] == 'RUNNING' and status['statecode'] == 1:
           return
         logger.warning('Wrong status name %(statename)r and code '
diff --git a/slapos/manager/portredir.py b/slapos/manager/portredir.py
index 777246efa..09cd0a294 100644
--- a/slapos/manager/portredir.py
+++ b/slapos/manager/portredir.py
@@ -167,13 +167,13 @@ class Manager(object):
     partition.writeSupervisorConfigurationFile()
 
     # Start processes
-    supervisord = partition.getSupervisorRPC()
-    for program in socat_programs:
-      process_name = '{}:{}'.format(group_id, program['name'])
-      status = supervisord.getProcessInfo(process_name)
+    with partition.getSupervisorRPC() as supervisor:
+      for program in socat_programs:
+        process_name = '{}:{}'.format(group_id, program['name'])
+        status = supervisor.getProcessInfo(process_name)
 
-      if status['start'] == 0:
-        supervisord.startProcess(process_name, False)
+        if status['start'] == 0:
+          supervisor.startProcess(process_name, False)
 
   def report(self, partition):
     """Method called at `slapos node report` phase.
diff --git a/slapos/manager/prerm.py b/slapos/manager/prerm.py
index 37c5228a5..d53bb28ac 100644
--- a/slapos/manager/prerm.py
+++ b/slapos/manager/prerm.py
@@ -80,16 +80,16 @@ class Manager(object):
       partition.writeSupervisorConfigurationFile()
 
       # check the state of all process, if the process is not started yes, start it
-      supervisord = partition.getSupervisorRPC()
-      process_list_string = ""
-      for name in wrapper_list:
-        process_name = '-'.join([partition_id, group_suffix]) + ':' + name
-        process_list_string += '%s\n' % process_name
-        status = supervisord.getProcessInfo(process_name)
-        if status['start'] == 0:
-          # process is not started yet
-          logger.info("Starting pre-delete process %r..." % name)
-          supervisord.startProcess(process_name, False)
+      with partition.getSupervisorRPC() as supervisor:
+        process_list_string = ""
+        for name in wrapper_list:
+          process_name = '-'.join([partition_id, group_suffix]) + ':' + name
+          process_list_string += '%s\n' % process_name
+          status = supervisor.getProcessInfo(process_name)
+          if status['start'] == 0:
+            # process is not started yet
+            logger.info("Starting pre-delete process %r..." % name)
+            supervisor.startProcess(process_name, False)
 
       # ask to slapgrid to check theses scripts before destroy partition
       with open(wait_filepath, 'w') as f:
-- 
2.30.9