Commit 4a8f846f authored by Jérome Perrin's avatar Jérome Perrin

software/mosquitto/test: fix flaky test

This test is using two connection one with a client to subscribe to a
topic and wait for message and another one with publish.single to
publish to the topic.
The test was failing from time to time because the publish might have
happened after the client was subscribed.

Refactor the test to use `loop` on the client to have more control
and be able to wait for the client to be subscribed using the
`on_subscribe` callback.

The test is also factorized, instead of having the same test twice for
IPv4 and IPv6, we pass the host as parameter.
parent 8bff11cc
[instance-profile]
filename = instance.cfg.in
md5sum = a6061e8bea111d96c10223f9b201ecc0
md5sum = 136afc6a9b8ce14757d5931f5930de66
import os
import time
import pathlib
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))
pathlib.Path(__file__).parent.parent / "software.cfg")
class TestMosquitto(SlapOSInstanceTestCase):
class TestMosquitto(SlapOSInstanceTestCase):
"""
Test if mosquitto service can publish and subscribe
to specific topics with custom authentication ...
"""
def on_connect(self, client, userdata, flags, rc):
client.subscribe("test")
self.code = rc
def test_ipv4(self):
self._test(self.computer_partition.getConnectionParameterDict()["ipv4"])
def on_message(self, client, userdata, msg):
self.topic = msg.topic
self.payload = str(msg.payload.decode())
def test_ipv6(self):
self._test(self.computer_partition.getConnectionParameterDict()["ipv6"])
def test_topic_ipv4(self):
host = self.computer_partition.getConnectionParameterDict()["ipv4"]
def _test(self, host):
username = self.computer_partition.getConnectionParameterDict()["username"]
password = self.computer_partition.getConnectionParameterDict()["password"]
port = int(self.computer_partition.getConnectionParameterDict()["port"])
topic = "test"
payload = "Hello, World!"
client = mqtt.Client()
client.on_connect = self.on_connect
client.on_message = self.on_message
client.username_pw_set(username=f"{username}", password=f"{password}")
client.connect(f"{host}", 1883, 10)
client.loop_start()
publish.single(
topic=topic,
payload=payload,
hostname=f"{host}",
auth={ "username": f"{username}", "password": f"{password}" }
)
time.sleep(10)
client.loop_stop()
self.assertEqual(self.code, 0)
self.assertEqual(self.topic, topic)
client.enable_logger(self.logger)
def test_payload_ipv4(self):
host = self.computer_partition.getConnectionParameterDict()["ipv4"]
username = self.computer_partition.getConnectionParameterDict()["username"]
password = self.computer_partition.getConnectionParameterDict()["password"]
topic = "test"
payload = "Hello, World!"
client = mqtt.Client()
client.on_connect = self.on_connect
client.on_message = self.on_message
client.username_pw_set(username=f"{username}", password=f"{password}")
client.connect(f"{host}", 1883, 10)
def on_connect(client, userdata, flags, rc):
client.subscribe(topic)
client.loop_start()
client.on_connect = on_connect
def on_subscribe(client, userdata, mid, granted_qos, properties=None):
# once our client is subscribed, publish from another connection
publish.single(
topic=topic,
payload=payload,
hostname=f"{host}",
auth={ "username": f"{username}", "password": f"{password}" }
hostname=host,
auth={"username": username, "password": password},
)
time.sleep(10)
client.loop_stop()
client.on_subscribe = on_subscribe
self.assertEqual(self.code, 0)
self.assertEqual(self.payload, payload)
def test_topic_ipv6(self):
host = self.computer_partition.getConnectionParameterDict()["ipv6"]
username = self.computer_partition.getConnectionParameterDict()["username"]
password = self.computer_partition.getConnectionParameterDict()["password"]
topic = "test"
payload = "Hello, World!"
def on_message(client, userdata, msg):
self.topic = msg.topic
self.payload = str(msg.payload.decode())
client = mqtt.Client()
client.on_connect = self.on_connect
client.on_message = self.on_message
client.username_pw_set(username=f"{username}", password=f"{password}")
client.connect(f"{host}", 1883, 10)
client.on_message = on_message
client.loop_start()
client.username_pw_set(username=username, password=password)
client.connect(host, port)
publish.single(
topic=topic,
payload=payload,
hostname=f"{host}",
auth={ "username": f"{username}", "password": f"{password}" }
)
time.sleep(10)
client.loop_stop()
self.topic = None # will be set by on_message
max_retries = 100 # give up after this number of iterations
for _ in range(max_retries):
client.loop()
if self.topic is not None:
break
self.assertEqual(self.code, 0)
self.assertEqual(self.topic, topic)
def test_payload_ipv6(self):
host = self.computer_partition.getConnectionParameterDict()["ipv6"]
username = self.computer_partition.getConnectionParameterDict()["username"]
password = self.computer_partition.getConnectionParameterDict()["password"]
topic = "test"
payload = "Hello, World!"
client = mqtt.Client()
client.on_connect = self.on_connect
client.on_message = self.on_message
client.username_pw_set(username=f"{username}", password=f"{password}")
client.connect(f"{host}", 1883, 10)
client.loop_start()
publish.single(
topic=topic,
payload=payload,
hostname=f"{host}",
auth={ "username": f"{username}", "password": f"{password}" }
)
time.sleep(10)
client.loop_stop()
self.assertEqual(self.code, 0)
self.assertEqual(self.payload, payload)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment