test_cli.py 44 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
##############################################################################
#
# Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

28
import json
29 30
import logging
import pprint
31
import unittest
32
import tempfile
33 34 35
import shutil
import socket
import textwrap
36
import sys
37 38 39
import os
import pkg_resources

Bryton Lacquement's avatar
Bryton Lacquement committed
40
from contextlib import contextmanager
41
from mock import patch, create_autospec
42
import mock
Bryton Lacquement's avatar
Bryton Lacquement committed
43
from slapos.util import sqlite_connect, bytes2str
44
from slapos.slap.slap import DEFAULT_SOFTWARE_TYPE
45

46 47
import slapos.cli.console
import slapos.cli.entry
48
import slapos.cli.info
49
import slapos.cli.list
50 51 52
import slapos.cli.computer_info
import slapos.cli.computer_list
import slapos.cli.computer_token
53
import slapos.cli.supervisorctl
54
import slapos.cli.request
Bryton Lacquement's avatar
Bryton Lacquement committed
55
from slapos.cli.proxy_show import do_show, StringIO
56 57 58
from slapos.cli.cache_binarysr import do_lookup as cache_binarysr_do_lookup
from slapos.cli.cache_url import do_lookup as cache_url_do_lookup
from slapos.cli.cache_pypi import do_lookup as cache_pypi_do_lookup
59
from slapos.client import ClientConfig
60
from slapos.slap import SoftwareProductCollection
61
import slapos.grid.svcbackend
62
import slapos.proxy
63 64
import slapos.slap

65 66
import supervisor.supervisorctl

67
signature_certificate_list = """-----BEGIN CERTIFICATE-----
68 69
MIIB9jCCAV+gAwIBAgIJAKRvzcy7OH0UMA0GCSqGSIb3DQEBBQUAMBMxETAPBgNV
BAMMCENPTVAtNzcyMCAXDTEyMDgxMDE1NDI1MVoYDzIxMTIwNzE3MTU0MjUxWjAT
70 71
MREwDwYDVQQDDAhDT01QLTc3MjCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA
o7aipd6MbnuGDeR1UJUjuMLQUariAyQ2l2ZDS6TfOwjHiPw/mhzkielgk73kqN7A
72 73 74
sUREx41eTcYCXzTq3WP3xCLE4LxLg1eIhd4nwNHj8H18xR9aP0AGjo4UFl5BOMa1
mwoyBt3VtfGtUmb8whpeJgHhqrPPxLoON+i6fIbXDaUCAwEAAaNQME4wHQYDVR0O
BBYEFEfjy3OopT2lOksKmKBNHTJE2hFlMB8GA1UdIwQYMBaAFEfjy3OopT2lOksK
75
mKBNHTJE2hFlMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAaNRx6YN2
76 77 78 79 80 81
M/p3R8/xS6zvH1EqJ3FFD7XeAQ52WuQnKSREzuw0dsw12ClxjcHiQEFioyTiTtjs
5pW18Ry5Ie7iFK4cQMerZwWPxBodEbAteYlRsI6kePV7Gf735Y1RpuN8qZ2sYL6e
x2IMeSwJ82BpdEI5niXxB+iT0HxhmR+XaMI=
-----END CERTIFICATE-----
"""

82 83 84 85 86 87 88
def raiseNotFoundError(*args, **kwargs):
  """Raise ``slapos.slap.NotFoundError``, ignoring every argument received.

  The catch-all signature lets it stand in for any callable, e.g. as a mock
  ``side_effect``.
  """
  raise slapos.slap.NotFoundError

class CliMixin(unittest.TestCase):
  """Common fixture for the CLI test cases of this module."""

  def setUp(self):
    """Prepare the objects handed to the CLI entry points under test.

    Sets:
      self.logger: autospec of ``logging.Logger``, so logging calls can be
        asserted on.
      self.local: dict with a ``slap`` instance and the matching
        ``SoftwareProductCollection``, passed as the ``local`` argument.
      self.conf: autospec of ``ClientConfig``.
      self.sign_cert_list: the module-level signature certificate.
    """
    slap = slapos.slap.slap()
    self.logger = create_autospec(logging.Logger)
    self.local = {'slap': slap, 'product': SoftwareProductCollection(self.logger, slap)}
    self.conf = create_autospec(ClientConfig)
    self.sign_cert_list = signature_certificate_list
92

93
class TestCliCacheBinarySr(CliMixin):
  """Return code and logged output of ``cache_binarysr_do_lookup``.

  NOTE(review): the lookups are given real shacache.org URLs, so these tests
  presumably need network access -- confirm.
  """

  test_url = "https://lab.nexedi.com/nexedi/slapos/raw/1.0.102/software/slaprunner/software.cfg"

  def test_cached_binary(self):
    """A known software URL returns 0 and logs the table of binaries."""
    self.assertEqual(0, cache_binarysr_do_lookup(
        self.logger,
        cache_dir="http://dir.shacache.org",
        cache_url="http://shacache.org",
        software_url=self.test_url,
        signature_certificate_list=self.sign_cert_list))

    self.logger.info.assert_any_call('Software URL: %s', self.test_url)
    self.logger.info.assert_any_call('MD5:          %s', 'cccdc51a07e8c575c880f2d70dd4d458')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------')
    self.logger.info.assert_any_call(u'    multiarch     distribution version    id   compatible? verified? ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------')
    self.logger.info.assert_any_call(u' x86_64-linux-gnu CentOS Linux 7.5.1804  Core       no        yes    ')
    self.logger.info.assert_any_call(u' x86_64-linux-gnu    Ubuntu     18.04   bionic      no        yes    ')
    # Omit some lines as they may fail depending on the OS
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------')

  def test_uncached_binary(self):
    """An unknown software URL returns 1 and logs a critical error."""
    self.assertEqual(1, cache_binarysr_do_lookup(
        self.logger,
        cache_dir="http://dir.shacache.org",
        cache_url="http://shacache.org",
        software_url="this_is_uncached_url",
        signature_certificate_list=self.sign_cert_list))

    self.logger.critical.assert_any_call(
        'Error while looking object %s', 'this_is_uncached_url', exc_info=True)

  def test_bad_cache_dir(self):
    """A wrong ``cache_dir`` returns 1 and logs a critical error."""
    self.assertEqual(1, cache_binarysr_do_lookup(
        self.logger,
        cache_dir="http://xxx.shacache.org",
        cache_url="http://shacache.org",
        software_url=self.test_url,
        signature_certificate_list=self.sign_cert_list))

    self.logger.critical.assert_any_call(
        'Error while looking object %s',
        self.test_url,
        exc_info=True)

  def test_unverified_signature(self):
    """With no certificate every entry is reported as not verified.

    ``machine_info_tuple`` is patched to a debian 8.10 machine so that the
    "compatible?" column is the same wherever the test runs.
    """
    with mock.patch(
        'slapos.grid.networkcache.machine_info_tuple',
        return_value=('x86_64-linux-gnu', ('debian', '8.10', ''))):
      self.assertEqual(0, cache_binarysr_do_lookup(
          self.logger,
          cache_dir="http://dir.shacache.org",
          cache_url="http://shacache.org",
          software_url=self.test_url,
          signature_certificate_list=""))

    self.logger.info.assert_any_call('Software URL: %s', self.test_url)
    self.logger.info.assert_any_call('MD5:          %s', 'cccdc51a07e8c575c880f2d70dd4d458')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------')
    self.logger.info.assert_any_call(u'    multiarch     distribution version    id   compatible? verified? ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------')
    self.logger.info.assert_any_call(u' x86_64-linux-gnu CentOS Linux 7.5.1804  Core       no         no    ')
    self.logger.info.assert_any_call(u' x86_64-linux-gnu    Ubuntu     18.04   bionic      no         no    ')
    self.logger.info.assert_any_call(u' x86_64-linux-gnu    debian      8.10              yes         no    ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------')

161

162
class TestCliCacheUrl(CliMixin):
  """Return code and logged output of ``cache_url_do_lookup``.

  NOTE(review): the lookups are given real shacache.org URLs, so these tests
  presumably need network access -- confirm.
  """

  test_url = "https://ftp.gnu.org/gnu/aspell/aspell-0.60.7.tar.gz"
  cache_dir = "http://dir.shacache.org"

  def test_cached_url(self):
    """Without a certificate the cached entry is listed as not signed."""
    self.assertEqual(0, cache_url_do_lookup(
        self.logger,
        cache_dir=self.cache_dir,
        cache_url="http://shacache.org",
        url=self.test_url,
        signature_certificate_list=""))

    self.logger.info.assert_any_call('Software source URL: %s', self.test_url)
    self.logger.info.assert_any_call('SHADIR URL: %s/%s\n', self.cache_dir, "file-urlmd5:f213fcd8e97aa729f685b8cb71b976a7")
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call(u'                         url                                                                                      sha512                                                              signed ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call(u' https://ftp.gnu.org/gnu/aspell/aspell-0.60.7.tar.gz 6f5fcd1c29164ee18f205594b66f382b51d19b17686293a931ca92c1442d3f7228627ca7d604d860551d0d367ac34dfb2ae34170a844f51e84e390fb1edc4535 False  ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------')

  def test_cached_signed_url(self):
    """With the signature certificate the cached entry is listed as signed."""
    self.assertEqual(0, cache_url_do_lookup(
        self.logger,
        cache_dir=self.cache_dir,
        cache_url="http://shacache.org",
        url=self.test_url,
        signature_certificate_list=signature_certificate_list))

    self.logger.info.assert_any_call('Software source URL: %s', self.test_url)
    self.logger.info.assert_any_call('SHADIR URL: %s/%s\n', self.cache_dir, "file-urlmd5:f213fcd8e97aa729f685b8cb71b976a7")
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call(u'                         url                                                                                      sha512                                                              signed ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call(u' https://ftp.gnu.org/gnu/aspell/aspell-0.60.7.tar.gz 6f5fcd1c29164ee18f205594b66f382b51d19b17686293a931ca92c1442d3f7228627ca7d604d860551d0d367ac34dfb2ae34170a844f51e84e390fb1edc4535  True  ')
    self.logger.info.assert_any_call(u'---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------')

  def test_uncached_url(self):
    """An unknown URL returns 1 and reports that nothing was found."""
    self.assertEqual(1, cache_url_do_lookup(
        self.logger,
        cache_dir=self.cache_dir,
        cache_url="http://shacache.org",
        url="this_is_uncached_url",
        signature_certificate_list=""))

    self.logger.info.assert_any_call('Object not found in cache.')

  def test_bad_cache_dir(self):
    """A wrong ``cache_dir`` returns 1 and logs a critical error."""
    self.assertEqual(1, cache_url_do_lookup(
        self.logger,
        cache_dir="http://xxx.shacache.org",
        cache_url="http://shacache.org",
        url=self.test_url,
        signature_certificate_list=""))

    self.logger.critical.assert_any_call('Error while looking object %s', self.test_url, exc_info=True)

class TestCliCachePypi(CliMixin):
  """Return code and logged output of ``cache_pypi_do_lookup``.

  NOTE(review): the lookups are given real shacache.org URLs, so these tests
  presumably need network access -- confirm.
  """

  egg_name = "pytz"
  egg_version = "2016.10"
  cache_dir = "http://dir.shacache.org"

  def test_cached_pypi(self):
    """Without a certificate the cached egg is listed as not signed."""
    self.assertEqual(0, cache_pypi_do_lookup(
        self.logger,
        cache_dir=self.cache_dir,
        cache_url="http://shacache.org",
        name=self.egg_name,
        version=self.egg_version,
        signature_certificate_list=""))

    self.logger.info.assert_any_call('Python egg %s version %s', self.egg_name, self.egg_version)
    self.logger.info.assert_any_call('SHADIR URL: %s/%s\n', self.cache_dir, 'pypi:{}={}'.format(self.egg_name, self.egg_version))
    self.logger.info.assert_any_call('----------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call('        basename                                                                     sha512                                                              signed ')
    self.logger.info.assert_any_call('----------------------------------------------------------------------------------------------------------------------------------------------------------------')
    # assert_any_call only checks that a matching call exists, so asserting
    # this row once is enough.
    self.logger.info.assert_any_call(' pytz-2016.10-py2.7.egg e072d146c42cb2efde946eef1d37ce6f8cb8eec6f6f928f6d57bb5312578bfa0031dcdbd816318015d765886bb64c02e1772adf7309142bc80324a4155b4ae8b False  ')
    self.logger.info.assert_any_call('----------------------------------------------------------------------------------------------------------------------------------------------------------------')

  def test_cached_signed_url(self):
    """With the certificate, both a signed and an unsigned row are logged."""
    self.assertEqual(0, cache_pypi_do_lookup(
        self.logger,
        cache_dir=self.cache_dir,
        cache_url="http://shacache.org",
        name=self.egg_name,
        version=self.egg_version,
        signature_certificate_list=signature_certificate_list))

    self.logger.info.assert_any_call('Python egg %s version %s', self.egg_name, self.egg_version)
    self.logger.info.assert_any_call('SHADIR URL: %s/%s\n', self.cache_dir, 'pypi:{}={}'.format(self.egg_name, self.egg_version))
    self.logger.info.assert_any_call('----------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call('        basename                                                                     sha512                                                              signed ')
    self.logger.info.assert_any_call('----------------------------------------------------------------------------------------------------------------------------------------------------------------')
    self.logger.info.assert_any_call(' pytz-2016.10-py2.7.egg e072d146c42cb2efde946eef1d37ce6f8cb8eec6f6f928f6d57bb5312578bfa0031dcdbd816318015d765886bb64c02e1772adf7309142bc80324a4155b4ae8b  True  ')
    self.logger.info.assert_any_call(' pytz-2016.10-py2.7.egg e072d146c42cb2efde946eef1d37ce6f8cb8eec6f6f928f6d57bb5312578bfa0031dcdbd816318015d765886bb64c02e1772adf7309142bc80324a4155b4ae8b False  ')
    self.logger.info.assert_any_call('----------------------------------------------------------------------------------------------------------------------------------------------------------------')

  def test_uncached_url(self):
    """An unknown egg name returns 1 and reports that nothing was found."""
    self.assertEqual(1, cache_pypi_do_lookup(
        self.logger,
        cache_dir=self.cache_dir,
        cache_url="http://shacache.org",
        name="not-existing-egg",
        version=self.egg_version,
        signature_certificate_list=""))

    self.logger.info.assert_any_call('Object not found in cache.')

  def test_bad_cache_dir(self):
    """A wrong ``cache_dir`` returns 1 and logs a critical error."""
    self.assertEqual(1, cache_pypi_do_lookup(
        self.logger,
        cache_dir="http://xxx.shacache.org",
        cache_url="http://shacache.org",
        name=self.egg_name,
        version=self.egg_version,
        signature_certificate_list=""))

    self.logger.critical.assert_any_call('Error while looking egg %s version %s', self.egg_name, self.egg_version, exc_info=True)
285

286
class TestCliProxy(CliMixin):
  """Tests for ``slapos.proxy._generateSoftwareProductListFromString``."""

  def test_generateSoftwareProductListFromString(self):
    """
    Test that generateSoftwareProductListFromString correctly parses a parameter
    coming from the configuration file.
    """
    software_product_list_string = """
product1 url1
product2 url2"""
    software_release_url_list = {
        'product1': 'url1',
        'product2': 'url2',
    }
    self.assertEqual(
        slapos.proxy._generateSoftwareProductListFromString(
            software_product_list_string),
        software_release_url_list
    )

  def test_generateSoftwareProductListFromString_emptyString(self):
    """An empty string is parsed into an empty dict."""
    self.assertEqual(
        slapos.proxy._generateSoftwareProductListFromString(''),
        {}
    )

311 312 313 314 315 316 317 318 319
class TestCliProxyShow(CliMixin):
  """Tests for ``do_show`` run against a database loaded from a SQL dump."""

  def setUp(self):
    """Create a temporary database file and configure ``self.conf`` for it."""
    super(TestCliProxyShow, self).setUp()
    self.db_file = tempfile.NamedTemporaryFile(delete=False, suffix='.db')
    self.db_file.close()
    self.conf.database_uri = self.db_file.name
    self.conf.logger = self.logger

    # load database
    schema = bytes2str(pkg_resources.resource_string(
        'slapos.tests',
        os.path.join('test_slapproxy', 'database_dump_version_current.sql')))
    db = sqlite_connect(self.db_file.name)
    db.cursor().executescript(schema)
    db.commit()
    # do not keep a connection open on a file that tearDown removes
    db.close()

    # by default we simulate being invoked with "show all" arguments
    self.conf.computers = True
    self.conf.software = True
    self.conf.partitions = True
    self.conf.slaves = True
    self.conf.params = True
    self.conf.network = True

  def tearDown(self):
    """Remove the temporary database file created by ``setUp``."""
    super(TestCliProxyShow, self).tearDown()
    os.remove(self.db_file.name)

  def test_proxy_show(self):
    """``do_show`` logs softwares and parameters on its own logger."""
    logger = create_autospec(logging.Logger)
    with mock.patch(
        'slapos.cli.proxy_show.logging.getLogger',
        return_value=logger):
      do_show(self.conf)

    # installed softwares are listed
    logger.info.assert_any_call(
        '      /srv/slapgrid/slappart8/srv/runner/project/slapos/software/erp5/software.cfg          slaprunner        available    287375f0cba269902ba1bc50242839d7 ')

    # instance parameters are listed
    # _ parameter is json formatted
    logger.info.assert_any_call(
        '    %s = %s',
        '_',
        '{\n  "monitor-base-url": "",\n  "url": "memcached://10.0.30.235:2003/"\n}')

    # other parameters are displayed as simple string
    logger.info.assert_any_call(
        '    %s = %s',
        'url',
        'http://10.0.30.235:4444/wd/hub')

    # if _ cannot be decoded as json, it is displayed "as is"
    logger.info.assert_any_call(
        '    %s = %s',
        '_',
        u'Ahah this is not json \U0001f61c ')
    # Nothing was output on application logger, because this command uses
    # its own logger.
    self.logger.info.assert_not_called()

  def test_proxy_show_displays_on_stdout(self):
    """When stdout is not a tty the listing is written to stdout."""
    # we patch logging to make sure our messages are output, even with test
    # runners like pytest which allow disabling output
    with patch.object(sys, 'stdout', StringIO()) as stdout, \
        patch.object(sys, 'stderr', StringIO()) as stderr, \
        patch('logging.Logger.isEnabledFor', return_value=True):
      do_show(self.conf)

    # 287375f0cba269902ba1bc50242839d7 is the hash of an installed software
    # in our setup database
    self.assertIn('287375f0cba269902ba1bc50242839d7', stdout.getvalue())
    self.assertEqual('', stderr.getvalue())

  def test_proxy_show_use_pager(self):
    """When stdout is a tty the listing goes through ``$PAGER`` instead."""
    # use a pager that just outputs to a file.
    tmp = tempfile.NamedTemporaryFile(delete=False)
    tmp.close()  # only the name is needed, the pager command writes the file
    self.addCleanup(os.unlink, tmp.name)

    # we patch logging to make sure our messages are output, even with test
    # runners like pytest which allow disabling output.
    # PAGER is set through patch.dict so that the environment is restored
    # when the block exits and does not leak into other tests.
    with patch.object(sys, 'stdout', StringIO()) as stdout, \
        patch.object(sys, 'stderr', StringIO()) as stderr, \
        patch('logging.Logger.isEnabledFor', return_value=True), \
        patch.dict(os.environ, {'PAGER': 'cat > {}'.format(tmp.name)}):
      stdout.isatty = lambda *args: True
      do_show(self.conf)

    self.assertEqual('', stdout.getvalue())
    self.assertEqual('', stderr.getvalue())
    # our pager was set to output to this temporary file
    with open(tmp.name, 'r') as f:
      self.assertIn('287375f0cba269902ba1bc50242839d7', f.read())
405

406

407 408 409 410 411 412 413 414 415 416 417
class TestCliBoot(CliMixin):
  """Tests for the ``slapos node boot`` command."""

  def test_node_boot(self):
    """Nominal boot: wait for network, start partition, format, then bang.

    NOTE(review): this test patches ``slapos.cli.boot._startComputerPartition``
    while ``test_boot_failure`` patches ``_startComputerPartitionList`` --
    confirm both names exist in ``slapos.cli.boot``.
    """
    # initialize instance root, with a timestamp in a partition
    temp_dir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, temp_dir)
    instance_root = os.path.join(temp_dir, 'instance')
    partition_base_name = 'partition'
    os.mkdir(instance_root)
    os.mkdir(os.path.join(instance_root, partition_base_name + '1'))
    timestamp = os.path.join(
        instance_root, partition_base_name + '1', '.timestamp')
    requested_state_path = os.path.join(instance_root,
                                        partition_base_name + '1',
                                        '.requested_state')
    with open(timestamp, 'w') as f:
      f.write("1578552471")
    with open(requested_state_path, 'w') as f:
      f.write("started")

    # make a config file using this instance root
    with tempfile.NamedTemporaryFile(mode='w') as slapos_conf:
      slapos_conf.write(
          textwrap.dedent(
              """\
              [slapos]
              instance_root = {instance_root}
              master_url = https://slap.vifib.com/

              [slapformat]
              partition_base_name = {partition_base_name}
              interface_name = interface_name_from_config
              """).format(
                  instance_root=instance_root,
                  partition_base_name=partition_base_name))
      slapos_conf.flush()

      # run slapos node boot
      app = slapos.cli.entry.SlapOSApp()
      with patch('slapos.cli.boot.check_root_user', return_value=True) as check_root_user,\
          patch('slapos.cli.boot.SlapOSApp') as SlapOSApp,\
          patch('slapos.cli.boot.ConfigCommand.config_path', return_value=slapos_conf.name), \
          patch(
              'slapos.cli.boot.netifaces.ifaddresses',
              return_value={socket.AF_INET6: ({'addr': '2000::1'},),},) as ifaddresses,\
          patch('slapos.cli.boot._startComputerPartition', return_value=None) as start_partition,\
          patch('slapos.cli.boot.launchSupervisord', return_value=None),\
          patch('slapos.cli.boot._ping_hostname', return_value=1) as _ping_hostname:
        app.run(('node', 'boot'))

      # boot command runs as root
      check_root_user.assert_called_once()
      # Computer partition was started during boot
      start_partition.assert_called_once()
      # it waits for interface to have an IPv6 address
      ifaddresses.assert_called_once_with('interface_name_from_config')
      # then ping master hostname to wait for connectivity
      _ping_hostname.assert_called_once_with('slap.vifib.com')
      # then format and bang
      SlapOSApp().run.assert_any_call(['node', 'format', '--now', '--local', '--verbose'])
      SlapOSApp().run.assert_any_call(['node', 'bang', '-m', 'Reboot'])

      # timestamp files have been removed
      self.assertFalse(os.path.exists(timestamp),
              timestamp)

  def test_boot_failure(self):
    """Boot retries until the network is up and format/bang succeed."""
    # In this test, the network interfaces will not have
    # IP address at the beginning, the global IPv6 address only appears later,
    # then format and bang commands will fail twice each.
    # `slapos node boot` command retries on failures.

    app = slapos.cli.entry.SlapOSApp()
    net1 = {socket.AF_INET: ({'addr': '127.0.0.1'},),}
    net2 = {socket.AF_INET: ({'addr': '127.0.0.1'},), socket.AF_INET6: ({'addr': 'fe80::1'},),}
    net3 = {socket.AF_INET: ({'addr': '127.0.0.1'},), socket.AF_INET6: ({'addr': 'fe80::1'}, {'addr': '2000::1'},),}
    with patch('slapos.cli.boot.check_root_user', return_value=True) as check_root_user,\
        patch('slapos.cli.boot.sleep') as sleep,\
        patch('slapos.cli.boot.netifaces.ifaddresses',
              side_effect=[net1, net2, net3]),\
        patch('slapos.cli.boot._ping_hostname', return_value=0),\
        patch('slapos.cli.boot._startComputerPartitionList', return_value=None) as start_partition,\
        patch('slapos.cli.format.check_root_user', return_value=True),\
        patch('slapos.cli.format.logging.FileHandler', return_value=logging.NullHandler()),\
        patch('slapos.cli.bang.check_root_user', return_value=True),\
        patch('slapos.cli.format.do_format', side_effect=[Exception, Exception, None]) as do_format,\
        patch('slapos.cli.bang.do_bang', side_effect=[Exception, Exception, None]) as do_bang:

      app.run(('node', 'boot'))

    check_root_user.assert_called_once()
    start_partition.assert_called_once()

    # two failures then one success, for each command
    self.assertEqual(do_format.call_count, 3)
    self.assertEqual(do_bang.call_count, 3)

    # between retries of ping, we sleep 5 seconds
    # between retries of bang, we sleep 15 seconds
    # NOTE(review): the four 15 seconds sleeps presumably cover the two format
    # failures plus the two bang failures -- confirm against slapos.cli.boot.
    self.assertEqual(sleep.mock_calls, [mock.call(5)]*2 + [mock.call(15)]*4)

    # we have only one logger on the console
    from slapos.cli import coloredlogs
    self.assertEqual(
        len([
            h for h in logging.getLogger('').handlers
            if isinstance(h, coloredlogs.ColoredStreamHandler)
        ]), 1)

512

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
class TestCliNode(CliMixin):
  """Tests for the ``slapos node software|instance|prune`` commands.

  Root check, pid file writing and the underlying processing are all patched;
  each test only checks how the command wires them together.
  """

  def test_node_software(self):
    """slapos node software command
    """
    app = slapos.cli.entry.SlapOSApp()

    software_release = mock.MagicMock()
    software_release.getState = mock.Mock(return_value='available')
    software_release.getURI = mock.Mock(return_value='http://example.org/software.cfg')
    software_release.building = mock.Mock()

    computer = mock.MagicMock()
    computer.getSoftwareReleaseList = mock.Mock(return_value=[software_release])

    software = mock.MagicMock()
    from slapos.grid.slapgrid import Slapgrid
    from slapos.slap.slap import slap
    with patch('slapos.cli.slapgrid.check_root_user', return_value=True) as checked_root_user, \
         patch('slapos.cli.slapgrid.setRunning') as write_pid_file, \
         patch.object(Slapgrid, 'checkEnvironmentAndCreateStructure') as checkEnvironmentAndCreateStructure, \
         patch.object(slap, 'registerComputer', return_value=computer) as registerComputer, \
         patch('slapos.grid.slapgrid.Software', return_value=software) as Software, \
         patch('slapos.grid.slapgrid.open') as _open:

      app.run(('node', 'software'))

      checked_root_user.assert_called_once()
      write_pid_file.assert_called_once_with(
          logger=mock.ANY,
          pidfile='/opt/slapos/slapgrid-sr.pid')
      checkEnvironmentAndCreateStructure.assert_called_once()
      registerComputer.assert_called_once()

      # exactly one Software object must have been constructed
      software_constructor_call, = Software.call_args_list
      self.assertEqual('http://example.org/software.cfg', software_constructor_call[1]['url'])
      # by default software are not built in debug mode
      self.assertFalse(software_constructor_call[1]['buildout_debug'])

      software.install.assert_called_once()

  def test_node_instance(self):
    """slapos node instance command
    """
    app = slapos.cli.entry.SlapOSApp()

    from slapos.grid.slapgrid import Slapgrid
    with patch('slapos.cli.slapgrid.check_root_user', return_value=True) as checked_root_user, \
         patch('slapos.cli.slapgrid.setRunning') as write_pid_file, \
         patch.object(Slapgrid, 'processComputerPartitionList') as processComputerPartitionList:

      app.run(('node', 'instance'))

      checked_root_user.assert_called_once()
      write_pid_file.assert_called_once_with(
          logger=mock.ANY,
          pidfile='/opt/slapos/slapgrid-cp.pid')
      processComputerPartitionList.assert_called_once()

  def test_node_prune(self):
    """slapos node prune command
    """
    app = slapos.cli.entry.SlapOSApp()

    with patch('slapos.cli.prune.check_root_user', return_value=True) as checked_root_user, \
         patch('slapos.cli.prune.setRunning') as write_pid_file, \
         patch('slapos.cli.prune.merged_options', return_value={
            'shared_part_list': 'something',
            'root_check': 'true',
            'pidfile_software': 'pidfile_software.pid',
         }), \
         patch('slapos.cli.prune.do_prune') as do_prune:

      app.run(('node', 'prune'))

      checked_root_user.assert_called_once()
      write_pid_file.assert_called_once_with(
          logger=mock.ANY,
          pidfile='pidfile_software.pid')
      do_prune.assert_called_once()

594

595 596 597
class TestCliList(CliMixin):
  """Tests for ``slapos.cli.list.do_list``."""

  def test_list(self):
    """
    Test "slapos service list" command output.
    """
    return_value = {
      'instance1': slapos.slap.SoftwareInstance(_title='instance1', _software_release_url='SR1'),
      'instance2': slapos.slap.SoftwareInstance(_title='instance2', _software_release_url='SR2'),
    }
    with patch.object(slapos.slap.slap, 'getOpenOrderDict', return_value=return_value):
      slapos.cli.list.do_list(self.logger, None, self.local)

    self.logger.info.assert_any_call('%s %s', 'instance1', 'SR1')
    self.logger.info.assert_any_call('%s %s', 'instance2', 'SR2')

  def test_emptyList(self):
    """With no open order, a single "No existing service." line is logged."""
    with patch.object(slapos.slap.slap, 'getOpenOrderDict', return_value={}):
      slapos.cli.list.do_list(self.logger, None, self.local)

    self.logger.info.assert_called_once_with('No existing service.')

@patch.object(slapos.slap.slap, 'registerOpenOrder', return_value=slapos.slap.OpenOrder())
class TestCliInfo(CliMixin):
  """Tests for ``slapos.cli.info.do_info``.

  The class decorator patches ``slap.registerOpenOrder`` for every test
  method; the resulting mock is received as the extra ``_`` argument.
  """

  def test_info(self, _):
    """
    Test "slapos service info" command output.
    """
    self.conf.reference = 'instance1'
    instance = slapos.slap.SoftwareInstance(
        _software_release_url='SR1',
        _requested_state = 'mystate',
        _connection_dict = {'myconnectionparameter': 'value1'},
        _parameter_dict = {'myinstanceparameter': 'value2'}
    )
    with patch.object(slapos.slap.OpenOrder, 'getInformation', return_value=instance):
      slapos.cli.info.do_info(self.logger, self.conf, self.local)

    self.logger.info.assert_any_call('Software Release URL: %s', instance._software_release_url)
    self.logger.info.assert_any_call('Instance state: %s', instance._requested_state)
    self.logger.info.assert_any_call(pprint.pformat(instance._parameter_dict))
    self.logger.info.assert_any_call(pprint.pformat(instance._connection_dict))

  def test_unknownReference(self, _):
    """
    Test "slapos service info" command output in case reference
    of service is not known.
    """
    self.conf.reference = 'instance1'
    with patch.object(slapos.slap.OpenOrder, 'getInformation', side_effect=raiseNotFoundError):
      slapos.cli.info.do_info(self.logger, self.conf, self.local)

    self.logger.warning.assert_called_once_with('Instance %s does not exist.', self.conf.reference)

649 650 651
class TestCliComputerList(CliMixin):
  """Tests for slapos.cli.computer_list.do_list."""

  def _list_computers(self, computer_dict):
    """Run do_list while getComputerDict returns ``computer_dict``."""
    with patch.object(slapos.slap.slap, 'getComputerDict',
                      return_value=computer_dict):
      slapos.cli.computer_list.do_list(self.logger, None, self.local)

  def test_computer_list(self):
    """
    Test "slapos computer list" command output.
    """
    computers = (('computer1', 'COMP-1'), ('computer2', 'COMP-0'))
    self._list_computers({
        title: slapos.slap.hateoas.TempDocument(
            title=title, _reference=reference)
        for title, reference in computers
    })

    # One log line is expected per computer: reference, then title.
    for title, reference in computers:
      self.logger.info.assert_any_call('%s %s', reference, title)

  def test_computer_emptyList(self):
    """
    Test "slapos computer list" command output when there is no computer.
    """
    self._list_computers({})

    self.logger.info.assert_called_once_with('No existing computer.')

@patch.object(slapos.slap.slap, 'registerComputer', return_value=slapos.slap.Computer("COMP-1"))
class TestComputerCliInfo(CliMixin):
  """Tests for slapos.cli.computer_info.do_info.

  The class decorator patches ``registerComputer``; each test method
  receives the resulting mock as its extra (unused) ``_`` argument.
  """

  def test_computer_info(self, _):
    """
    Test "slapos computer info" command output.
    """
    self.conf.reference = 'COMP-1'
    computer = slapos.slap.Computer("COMP-1")
    computer._reference = "COMP-1"
    computer._title = "computer1"
    with patch.object(slapos.slap.Computer, 'getInformation',
                      return_value=computer):
      slapos.cli.computer_info.do_info(self.logger, self.conf, self.local)

    expected_log_calls = (
        ('Computer Reference: %s', computer._reference),
        ('Computer Title    : %s', computer._title),
    )
    for template, value in expected_log_calls:
      self.logger.info.assert_any_call(template, value)

  def test_computer_unknownReference(self, _):
    """
    Test "slapos computer info" command output in case reference
    of computer is not known.
    """
    self.conf.reference = 'COMP-0'
    with patch.object(slapos.slap.Computer, 'getInformation',
                      side_effect=raiseNotFoundError):
      slapos.cli.computer_info.do_info(self.logger, self.conf, self.local)

    self.logger.warning.assert_called_once_with(
        'Computer %s does not exist.', self.conf.reference)

@patch.object(slapos.slap.slap, 'registerToken', return_value=slapos.slap.Token())
class TestComputerCliToken(CliMixin):
  """Tests for slapos.cli.computer_token.do_token.

  The class decorator patches ``registerToken``; the test method receives
  the resulting mock as its extra (unused) ``_`` argument.
  """

  def test_computer_token(self, _):
    """
    Test "slapos computer token" command output.
    """
    expected_token = "1234567-90"
    with patch.object(slapos.slap.Token, 'request',
                      return_value=expected_token):
      slapos.cli.computer_token.do_token(self.logger, self.conf, self.local)

    # The value returned by Token.request must be logged as is.
    self.logger.info.assert_any_call('Computer token: %s', expected_token)
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728

@patch.object(supervisor.supervisorctl, 'main')
class TestCliSupervisorctl(CliMixin):
  """Tests for slapos.cli.supervisorctl.do_supervisorctl.

  ``supervisor.supervisorctl.main`` is patched for every test; the mock is
  received as the extra (unused) ``_`` argument.
  """

  def _run_supervisorctl_status(self, last_argument):
    """Run ``do_supervisorctl`` for "status" with launchSupervisord mocked.

    ``last_argument`` is passed as the fourth positional argument of
    ``do_supervisorctl``. The launchSupervisord mock is returned together
    with the instance root that was used.
    """
    root = '/foo/bar'
    with patch.object(slapos.grid.svcbackend, 'launchSupervisord') as launcher:
      slapos.cli.supervisorctl.do_supervisorctl(
          self.logger, root, ['status'], last_argument)
    return launcher, root

  def test_allow_supervisord_launch(self, _):
    """
    Test that "slapos node supervisorctl" tries to launch supervisord
    """
    launcher, root = self._run_supervisorctl_status(False)
    launcher.assert_any_call(instance_root=root, logger=self.logger)

  def test_forbid_supervisord_launch(self, _):
    """
    Test that "slapos node supervisorctl" does not try to launch supervisord
    """
    launcher, _root = self._run_supervisorctl_status(True)
    launcher.assert_not_called()

729 730 731

class TestCliConsole(unittest.TestCase):
  """Tests for "slapos console", fed either from stdin or from a script."""

  # Script requesting an instance and printing one of its parameters.
  default_script = """\
print(request('software_release', 'instance').getInstanceParameterDict()['parameter_name'])
"""

  @contextmanager
  def _test_console(self):
    """Provide what is needed to run the console command in a test.

    Yields a tuple ``(app, config_file_name, request_mock, stdout)`` where
    ``request_mock`` replaces ``OpenOrder.request`` and ``stdout`` is the
    ``StringIO`` installed as ``sys.stdout`` for the duration of the context.
    """
    test_id = self.id()
    partition = slapos.slap.ComputerPartition(
        'computer_%s' % test_id, 'partition_%s' % test_id)
    partition._parameter_dict = {'parameter_name': 'parameter_value'}
    with patch.object(slapos.slap.OpenOrder, 'request',
                      return_value=partition) as request_mock, \
         patch.object(sys, 'stdout', StringIO()) as captured_stdout, \
         tempfile.NamedTemporaryFile() as cfg:
      cfg.write(b'[slapos]\nmaster_url=null\n')
      cfg.flush()
      console_app = slapos.cli.entry.SlapOSApp()
      yield console_app, cfg.name, request_mock, captured_stdout

  def test_console_interactive(self):
    """The default script read from stdin is executed by the console."""
    fake_stdin = StringIO(self.default_script)
    with self._test_console() as (app, cfg_name, request_mock, out), \
         patch.object(sys, 'stdin', fake_stdin):
      app.run(('console', '--cfg', cfg_name))
      self.assertIn('parameter_value', out.getvalue())
      request_mock.assert_called_once_with('software_release', 'instance')

  def test_console_interactive_no__file__(self):
    """``__file__`` is not defined for code read from stdin."""
    script = '''if 1:
      try:
        print('FAIL __file__ is set to', __file__)
      except NameError:
        print('OK __file__ is not set')

      '''
    fake_stdin = StringIO(script)
    with self._test_console() as (app, cfg_name, _, out), \
         patch.object(sys, 'stdin', fake_stdin):
      app.run(('console', '--cfg', cfg_name))
      self.assertIn('OK __file__ is not set', out.getvalue())

  def test_console_script(self):
    """The default script given as a file argument is executed."""
    with self._test_console() as (app, cfg_name, request_mock, out), \
         tempfile.NamedTemporaryFile('w') as script_file:
      script_file.write(self.default_script)
      script_file.flush()
      app.run(('console', '--cfg', cfg_name, script_file.name))
      self.assertIn('parameter_value', out.getvalue())
      request_mock.assert_called_once_with('software_release', 'instance')

  def test_console_script__file__(self):
    """``__file__`` is the script path when running a script file."""
    with self._test_console() as (app, cfg_name, _, out), \
         tempfile.NamedTemporaryFile('w') as script_file:
      script_template = '''if 1:
        if __file__ == %r:
          print('OK __file__ is set to script')
        else:
          print('FAIL __file__ is set to', __file__)
      '''
      script_file.write(script_template % script_file.name)
      script_file.flush()
      app.run(('console', '--cfg', cfg_name, script_file.name))
      self.assertIn('OK __file__ is set to script', out.getvalue())
795 796 797 798 799 800 801 802 803 804 805 806


class TestCliComplete(CliMixin):
  """Tests for the "slapos complete" command."""

  def _run_complete(self, argv):
    """Run the application with ``argv`` and return what it printed.

    ``sys.stdout`` is replaced during the run and the exit status is
    checked to be 0.
    """
    with patch.object(sys, 'stdout', StringIO()) as captured:
      exit_status = slapos.cli.entry.SlapOSApp().run(argv)
      self.assertEqual(exit_status, 0)
    return captured.getvalue()

  def test_complete_bash(self):
    """Without option, the printed completion script uses COMPREPLY."""
    self.assertIn('COMPREPLY', self._run_complete(['complete']))

  def test_complete_fish(self):
    """With --shell=fish, the printed completion script targets fish."""
    self.assertIn(
        '__fish_seen_subcommand_from',
        self._run_complete(['complete', '--shell=fish']))
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853


class TestCliRequest(CliMixin):
  """Tests for slapos.cli.request: option parsing and do_request."""

  def _configure_request(self, software_url):
    """Fill ``self.conf`` with the request options shared by the tests."""
    conf = self.conf
    conf.reference = 'instance reference'
    conf.software_url = software_url
    conf.parameters = {'key': 'value'}
    conf.parameters_file = None
    conf.node = {'computer_guid': 'COMP-1234'}
    conf.type = None
    conf.state = None
    conf.slave = False

  def test_parse_option_dict(self):
    """parse_option_dict turns ``key=value`` items into a dict."""
    parse = slapos.cli.request.parse_option_dict

    accepted = (
        (['foo=bar', 'a=b'], {'foo': 'bar', 'a': 'b'}),
        # corner cases
        (['a=a=b'], {'a': 'a=b'}),
        (['a=a\nb'], {'a': 'a\nb'}),
        ([], {}),
    )
    for option_list, expected in accepted:
      self.assertEqual(parse(option_list), expected)

    rejected = (
        ['a'],  # malformed option = assignment
        ['a=b', 'a=c'],  # duplicated key
    )
    for option_list in rejected:
      with self.assertRaises(ValueError):
        parse(option_list)

  def test_request(self):
    """do_request forwards the configuration to OpenOrder.request."""
    self._configure_request('software URL')

    open_order = mock.create_autospec(slapos.slap.OpenOrder)
    with patch.object(slapos.slap.slap, 'registerOpenOrder',
                      return_value=open_order):
      slapos.cli.request.do_request(self.logger, self.conf, self.local)

    open_order.request.assert_called_once_with(
        software_release='software URL',
        partition_reference='instance reference',
        partition_parameter_kw={'key': 'value'},
        software_type=None,
        filter_kw={'computer_guid': 'COMP-1234'},
        state=None,
        shared=False,
    )

    self.logger.info.assert_any_call(
        'Requesting %s as instance of %s...',
        'instance reference',
        'software URL',
    )

  def test_request_json_in_xml_published_parameters(self):
    """Check what is logged for a software described as json-in-xml.

    The connection dict returned by the request holds its parameters as a
    JSON string under the ``_`` key; the expected log shows them decoded.
    """
    tmpdir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, tmpdir)
    software_description = {
        "name": "Test Software",
        "description": "Dummy software for Test",
        "serialisation": "json-in-xml",
        "software-type": {
            DEFAULT_SOFTWARE_TYPE: {
                "title": "Default",
                "description": "Default type",
                "request": "instance-default-input-schema.json",
                "response": "instance-default-output-schema.json",
                "index": 0
            },
        }
    }
    with open(os.path.join(tmpdir, 'software.cfg.json'), 'w') as f:
      json.dump(software_description, f)

    self._configure_request(os.path.join(tmpdir, 'software.cfg'))

    test_id = self.id()
    partition = slapos.slap.ComputerPartition(
        'computer_%s' % test_id, 'partition_%s' % test_id)
    partition._requested_state = 'started'
    partition._connection_dict = {'_': json.dumps({'foo': 'bar'})}

    open_order = mock.create_autospec(slapos.slap.OpenOrder)
    open_order.request.return_value = partition
    with patch.object(slapos.slap.slap, 'registerOpenOrder',
                      return_value=open_order):
      slapos.cli.request.do_request(self.logger, self.conf, self.local)

    open_order.request.assert_called_once()

    expected_log_calls = [
        mock.call('Requesting %s as instance of %s...', self.conf.reference,
                  self.conf.software_url),
        mock.call('Instance requested.\nState is : %s.', 'started'),
        mock.call('Connection parameters of instance are:'),
        mock.call("{'foo': 'bar'}"),
        mock.call('You can rerun the command to get up-to-date information.'),
    ]
    self.assertEqual(self.logger.info.mock_calls, expected_log_calls)

908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001

class TestCliRequestParametersFileJson(CliMixin):
  """Request with --parameter-file, with a .json file.
  """
  # Value expected as ``partition_parameter_kw`` of OpenOrder.request.
  expected_partition_parameter_kw = {'foo': ['bar']}

  def _makeParameterFile(self):
    """Write the parameters to a temporary .json file and return its path.

    Removal of the file is registered as a cleanup of the test case.
    """
    # The context manager closes the handle once the content is written.
    # With delete=False, closing does not remove the file, so the returned
    # path stays usable until the os.unlink cleanup runs.
    with tempfile.NamedTemporaryFile(
        suffix='.json', mode='w', delete=False) as f:
      self.addCleanup(os.unlink, f.name)
      f.write(textwrap.dedent('''\
      {
        "foo": ["bar"]
      }
      '''))
    return f.name

  def test_request_parameters_file(self):
    """do_request reads the parameters from ``conf.parameters_file``."""
    self.conf.reference = 'instance reference'
    self.conf.software_url = 'software URL'
    self.conf.parameters = None
    f = open(self._makeParameterFile())
    # Cleanups run in reverse order: the file is closed before being removed.
    self.addCleanup(f.close)
    self.conf.parameters_file = f
    self.conf.node = {'computer_guid': 'COMP-1234'}
    self.conf.type = None
    self.conf.state = None
    self.conf.slave = False

    with patch.object(
        slapos.slap.slap,
        'registerOpenOrder',
        return_value=mock.create_autospec(slapos.slap.OpenOrder)) as registerOpenOrder:
      slapos.cli.request.do_request(self.logger, self.conf, self.local)

    registerOpenOrder().request.assert_called_once_with(
        software_release='software URL',
        partition_reference='instance reference',
        partition_parameter_kw=self.expected_partition_parameter_kw,
        software_type=None,
        filter_kw={'computer_guid': 'COMP-1234'},
        state=None,
        shared=False,
    )
    self.logger.info.assert_any_call(
        'Requesting %s as instance of %s...',
        'instance reference',
        'software URL',
    )


class TestCliRequestParametersFileJsonJsonInXMLSerialisation(
    TestCliRequestParametersFileJson):
  """Request with --parameter-file, with a .json file, for a software whose
  serialisation is json-in-xml. The parameters are then expected to be sent
  serialised as {'_': json.dumps(params)}.
  """
  expected_partition_parameter_kw = {'_': '{"foo": ["bar"]}'}

  def test_request_parameters_file(self):
    """Run the inherited test with getSerialisation forced to json-in-xml."""
    inherited_test = super(
        TestCliRequestParametersFileJsonJsonInXMLSerialisation,
        self).test_request_parameters_file
    serialisation_patch = mock.patch(
        'slapos.cli.request.SoftwareReleaseSchema.getSerialisation',
        return_value='json-in-xml')
    with serialisation_patch:
      inherited_test()


class TestCliRequestParametersFileJsonJsonInXMLSerialisationAlreadySerialised(
    TestCliRequestParametersFileJson):
  """Request with --parameter-file, with a .json file and a software using
  json-in-xml for serialisation and parameters already serialised with
  {'_': json.dumps(params)}. In that case, parameters are not serialized one
  more time.
  """
  expected_partition_parameter_kw = {"_": "{\"foo\": [\"bar\"]}"}

  def _makeParameterFile(self):
    """Write already serialised parameters to a temporary .json file.

    Returns the path of the file; its removal is registered as a cleanup of
    the test case.
    """
    # The context manager closes the handle once the content is written.
    # With delete=False, closing does not remove the file, so the returned
    # path stays usable until the os.unlink cleanup runs.
    with tempfile.NamedTemporaryFile(
        suffix='.json', mode='w', delete=False) as f:
      self.addCleanup(os.unlink, f.name)
      f.write(textwrap.dedent(r'''
      {"_": "{\"foo\": [\"bar\"]}"}
    '''))
    return f.name

  def test_request_parameters_file(self):
    """Run the inherited test with getSerialisation forced to json-in-xml."""
    with mock.patch(
        'slapos.cli.request.SoftwareReleaseSchema.getSerialisation',
        return_value='json-in-xml'):
      super(
          TestCliRequestParametersFileJsonJsonInXMLSerialisationAlreadySerialised,
          self).test_request_parameters_file()


1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015
class TestCliRequestParametersFileYaml(TestCliRequestParametersFileJson):
  """Request with --parameter-file, with a .yaml file. This behaves like json.
  """
  def _makeParameterFile(self):
    """Write the parameters to a temporary .yaml file and return its path.

    Removal of the file is registered as a cleanup of the test case.
    """
    # The context manager closes the handle once the content is written.
    # With delete=False, closing does not remove the file, so the returned
    # path stays usable until the os.unlink cleanup runs.
    with tempfile.NamedTemporaryFile(
        suffix='.yaml', mode='w', delete=False) as f:
      self.addCleanup(os.unlink, f.name)
      f.write(textwrap.dedent('''\
      foo:
      - bar
    '''))
    return f.name


1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030
class TestCliRequestParametersFileXml(TestCliRequestParametersFileJson):
  """Request with --parameter-file, with a .xml file
  """
  expected_partition_parameter_kw = {'foo': 'bar'}
  def _makeParameterFile(self):
    """Write the parameters to a temporary .xml file and return its path.

    Removal of the file is registered as a cleanup of the test case.
    """
    # The context manager closes the handle once the content is written.
    # With delete=False, closing does not remove the file, so the returned
    # path stays usable until the os.unlink cleanup runs.
    with tempfile.NamedTemporaryFile(
        suffix='.xml', mode='w', delete=False) as f:
      # Registered before writing, so the file is removed even if the
      # write fails.
      self.addCleanup(os.unlink, f.name)
      f.write(textwrap.dedent('''\
      <?xml version="1.0" encoding="utf-8"?>
      <instance>
          <parameter id="foo">bar</parameter>
      </instance>
    '''))
    return f.name