I have a Python project that I want to test. I already have unit tests written with unittest,
but I also need integration tests.
For that, I have two applications: the real one that I have to test, and a "testing" application that sends requests to the first one, waits for the response, and then compares it with the expected result.
That way I will be able to check that the application responds correctly to the requests.
For the moment, I have what I described above, but it lives in main.py (not in a dedicated test file). Also, the comparison is only done with the print function, so that I can see that it works. But I need to execute these tests and get the results in a conventional format, like JUnit XML.
How can I write, run, and get the results of these tests?
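To make the goal concrete, here is a minimal sketch of the shape of test I am after, written with unittest. `send_request_and_wait` is a hypothetical stand-in for "send a request to the real application and wait for its response"; here it just adds the two parameters locally, which is an assumption made only so the sketch runs on its own. The class name and the expected values are illustrative too.

```python
import io
import unittest


def send_request_and_wait(param1, param2):
    # Hypothetical stand-in: the real version would send the request to the
    # application under test and block until its response arrives.
    # Assumption for this sketch only: the response is the sum of the inputs.
    return {"result": param1 + param2}


class ModuleIntegrationTest(unittest.TestCase):
    def test_result_matches_expected(self):
        # Replaces the print-based comparison with a real assertion.
        response = send_request_and_wait(2, 3)
        self.assertEqual(response["result"], 5)

    def test_response_has_result_field(self):
        response = send_request_and_wait(0, 0)
        self.assertIn("result", response)


def run_suite():
    """Load and run the test case, returning the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ModuleIntegrationTest)
    # The runner's text report goes to an in-memory buffer instead of stderr.
    runner = unittest.TextTestRunner(stream=io.StringIO(), verbosity=2)
    return runner.run(suite)


if __name__ == "__main__":
    outcome = run_suite()
    print("tests run:", outcome.testsRun, "success:", outcome.wasSuccessful())
```

Tests written this way can also be collected by pytest, and `pytest --junitxml=results.xml` writes a JUnit-style XML report; whether that fits a module running inside IoT Edge is the part I am unsure about.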
EDIT
I'm developing an Azure IoT Edge module, and I'm using routes to wire the modules together. Here is the code of the testing module, where I need to execute the tests:
import random
import time
import sys
import iothub_client
import json
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError


# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
    print("Confirmation[%d] received for message with result = %s" % (user_context, result))


# receive_message_callback is invoked when an incoming message arrives on INPUT queue
def receive_message_callback(message, hubManager):
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    message_text = message_buffer[:size].decode('utf-8')
    data = json.loads(message_text)
    result = data["result"]
    print("expected_result: %d; result: %d ==> %r" % (EXPECTED_RESULT, result, EXPECTED_RESULT == result))


class HubManager(object):
    def __init__(self, protocol=IoTHubTransportProvider.MQTT):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)
        self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
        # sets the callback when a message arrives on INPUT queue.
        self.client.set_message_callback(INPUT, receive_message_callback, self)

    # Forwards the message received onto the next stage in the process.
    def forward_event_to_output(self, outputQueueName, event, send_context):
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)


def main(protocol):
    try:
        hub_manager = HubManager(protocol)
        # Send request
        message = "{\"param1\": %d,\"param2\": %d}" % (PARAM_1, PARAM_2)
        msg_txt_formatted = IoTHubMessage(message)
        hub_manager.forward_event_to_output(OUTPUT, msg_txt_formatted, 0)
        while True:
            time.sleep(1)
    except IoTHubError as iothub_error:
        print("Unexpected error %s from IoTHub" % iothub_error)
        return
    except KeyboardInterrupt:
        print("IoTHubModuleClient sample stopped")


if __name__ == '__main__':
    main(PROTOCOL)
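One direction I can imagine, sketched below under assumptions: instead of printing inside `receive_message_callback`, the callback could put the parsed result on a `queue.Queue`, and a test could block on that queue with a timeout before asserting. `FakeMessage`, `RESPONSES`, and `wait_for_result` are hypothetical names introduced only for this sketch; `FakeMessage` imitates just the `get_bytearray()` method of `IoTHubMessage` so the code runs without the SDK.

```python
import json
import queue

# Results parsed by the callback are handed to the waiting test through this queue.
RESPONSES = queue.Queue()


class FakeMessage(object):
    """Hypothetical stand-in for IoTHubMessage, exposing only get_bytearray()."""

    def __init__(self, text):
        self._payload = bytearray(text, "utf-8")

    def get_bytearray(self):
        return self._payload


def receive_message_callback(message, user_context):
    """Parse the incoming JSON and queue its "result" field instead of printing it."""
    message_text = message.get_bytearray().decode("utf-8")
    RESPONSES.put(json.loads(message_text)["result"])


def wait_for_result(timeout_seconds=5.0):
    """Block until the callback delivers a result; raises queue.Empty on timeout."""
    return RESPONSES.get(timeout=timeout_seconds)


if __name__ == "__main__":
    # Simulate the SDK invoking the callback with a response message.
    receive_message_callback(FakeMessage('{"result": 42}'), None)
    print(wait_for_result())
```

A test method could then send the request, call `wait_for_result()`, and compare the value with `assertEqual`, so that a missing response shows up as a failure (via `queue.Empty`) rather than a hang in the `while True` loop.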