Setting up an MQTT server with Mosquitto

To test and use the MQTT protocol, you can install an MQTT server with Mosquitto on a Windows or Linux computer. A common application is to install Mosquitto on a Raspberry Pi and use it as an MQTT server for IoT and home automation.

MQTT protocol description

MQTT (Message Queuing Telemetry Transport) is a lightweight publish/subscribe communication protocol designed for exchanging small messages over networks with long delays and low bandwidth.

The protocol relies on an MQTT server (broker) to which clients connect. Clients can publish messages on a topic or subscribe to it. The broker forwards each message published on a topic to all the clients subscribed to that topic.
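To make the publish/subscribe idea concrete, here is a minimal sketch of a toy in-memory "broker" in Python. It is not how Mosquitto is implemented: there is no network, no QoS and no wildcard handling, and the class name TinyBroker is purely illustrative.

```python
from collections import defaultdict


class TinyBroker:
    """Toy in-memory stand-in for an MQTT broker (no network, no QoS)."""

    def __init__(self):
        # topic name -> list of callbacks registered by subscribers
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # deliver the payload to every client subscribed to this exact topic
        for callback in self._subscribers[topic]:
            callback(topic, payload)


received = []
broker = TinyBroker()
broker.subscribe("test_topic", lambda topic, payload: received.append((topic, payload)))
broker.publish("test_topic", "Hello World!")  # delivered to the subscriber
broker.publish("other_topic", "ignored")      # nobody subscribed: dropped
print(received)
```

The publisher never needs to know who the subscribers are: it only names a topic, and the broker does the routing.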

Mosquitto is an open-source MQTT server that makes it easy to use the MQTT protocol between different devices connected to the same network.

Installing the MQTT server under Linux

On Linux, Mosquitto can be installed using the following commands

sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients

Once the service has been installed, it can be managed with the following commands

sudo systemctl stop mosquitto   #stop
sudo systemctl start mosquitto  #start
sudo systemctl restart mosquitto #restart
sudo systemctl status mosquitto #check the status

Server configuration is performed using the following files

sudo nano /etc/mosquitto/mosquitto.conf
sudo nano /etc/mosquitto/conf.d/default.conf

Example of a configuration file

 # Place your local configuration in /etc/mosquitto/conf.d/
 #
 # A full description of the configuration file is at
 # /usr/share/doc/mosquitto/examples/mosquitto.conf.example

 pid_file /var/run/mosquitto/mosquitto.pid

 persistence true
 persistence_location /var/lib/mosquitto/

 log_dest file /var/log/mosquitto/mosquitto.log

 port 1883
 allow_anonymous true
 
 include_dir /etc/mosquitto/conf.d
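If you want to check the settings from a script, the "key value" layout of this file is easy to read. The sketch below is a simplified reader written for this example, not Mosquitto's own parser: it assumes one option per line, ignores include_dir processing, and keeps only the last value when an option is repeated.

```python
def parse_mosquitto_conf(text):
    """Return the options of a mosquitto.conf-style text as a dictionary."""
    options = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition(" ")
        options[key] = value.strip()
    return options


sample = """
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example
persistence true
log_dest file /var/log/mosquitto/mosquitto.log
port 1883
allow_anonymous true
"""

conf = parse_mosquitto_conf(sample)
print("port:", conf["port"], "| anonymous access:", conf["allow_anonymous"])
```

On a real installation, you would pass the content of /etc/mosquitto/mosquitto.conf to the function instead of the sample text.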

Installing the MQTT server under Windows

On Windows, download and install Mosquitto

Once installed, enter the following command in a command prompt to start the service

mosquitto

To check that the service has been launched:

netstat -an | findstr 1883
C:\Users\ADMIN>netstat -an | findstr 1883
  TCP    127.0.0.1:1883         0.0.0.0:0              LISTENING
  TCP    [::1]:1883             [::]:0                 LISTENING
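The same check can be done from Python on either operating system. This is a minimal sketch using only the standard socket module; the function name is_port_open is an assumption made for this example, and a successful TCP connection only shows that something is listening on the port, not that it is an MQTT broker.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False  # refused, unreachable or timed out


print("Something is listening on port 1883:", is_port_open("localhost", 1883))
```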

The server configuration file is located in the installation folder

C:\Program Files\mosquitto\mosquitto.conf

Test MQTT

You can also use mosquitto directly from the command line. In a terminal, enter the following command

mosquitto_sub -h localhost -t test_topic

In another terminal

mosquitto_pub -h localhost -t test_topic -m "Hello World!"

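N.B.: Start the subscriber before the publisher. Otherwise the message is published before anyone is subscribed and the subscriber never receives it.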

Test MQTT with Python

In the following example, we’ll use Mosquitto with Python to exchange MQTT messages between two Python scripts. First install the paho-mqtt library:

python -m pip install paho-mqtt

Subscriber Python script

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt #import library
import time

MQTT_BROKER = "localhost"   #broker address: IP of the Raspberry Pi or simply localhost
MQTT_TOPIC = "test_channel" #name of the topic

messageReceived=False

# callback called when client receives a CONNACK response
def on_connect(client, userdata, flags, rc):
    if rc==0:
        client.subscribe(MQTT_TOPIC)
        print("subscribe to {}".format(MQTT_TOPIC))
    else:
        print("bad connection {}".format(rc))

# callback called when a PUBLISH message is received
def on_message(client, userdata, msg):
    global messageReceived
    print(msg.topic+" "+str(msg.payload.decode("utf-8")))
    messageReceived=True

# written for the paho-mqtt 1.x API; with paho-mqtt 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER)
client.loop_forever() #blocks forever and processes incoming messages; use it if you don't need any further code

"""
client.loop_start()  #use this instead if you want to run more code here
delay=0.001
counter=120/delay #2min
while messageReceived==False and counter>0:
	time.sleep(delay)
	counter-=1
client.loop_stop()
"""

Python publisher script

To publish on a topic, simply specify the server address (MQTT_BROKER) and the topic name (MQTT_TOPIC).

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt #import library
import time

MQTT_BROKER = "localhost"
MQTT_TOPIC = "test_channel"

client = mqtt.Client("pyScript")
client.connect(MQTT_BROKER)
msg="Hello World!!"
client.publish(MQTT_TOPIC,msg)
print("Published {} over MQTT".format(msg))
counter=0
while counter<10:
	counter+=1
	client.publish(MQTT_TOPIC,"counter : {}".format(counter))
	print("Published counter : {}".format(counter))
	time.sleep(0.001)
client.disconnect()	

Send structured data with JSON

Python's json module is a handy library for storing and exchanging data in JSON format.

Script to send a JSON

The data to send is simply built as a Python dictionary

import paho.mqtt.client as mqtt #import library
import time
import json

MQTT_BROKER = "localhost"
MQTT_TOPIC = "test_channel"

client = mqtt.Client("10.3.141.1")
client.connect(MQTT_BROKER)
json_data = {}
json_data['msg'] = "hello World"
json_data['index'] = 12
json_data['value'] = 49.3
json_data['list'] = ["alpha","bravo","charlie"]

client.publish(MQTT_TOPIC,json.dumps(json_data)) #json.dumps produces valid JSON text (double quotes)

print("Published json over MQTT")
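A dictionary has to be turned into text before it can travel in an MQTT payload. The short sketch below, which needs no broker, shows the round trip with json.dumps and json.loads, and why str() is not a substitute: it produces Python's own representation with single quotes, which a JSON parser rejects.

```python
import json

data = {"msg": "hello World", "index": 12, "value": 49.3,
        "list": ["alpha", "bravo", "charlie"]}

payload = json.dumps(data)      # valid JSON text, strings in double quotes
restored = json.loads(payload)  # back to a Python dictionary
print("round trip OK:", restored == data)

# str() gives Python's repr of the dictionary, which is not valid JSON
try:
    json.loads(str(data))
    repr_is_valid_json = True
except json.JSONDecodeError:
    repr_is_valid_json = False
print("str(dict) is valid JSON:", repr_is_valid_json)
```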

Script to receive a JSON

In the reception script, we add the decoding of the JSON payload into a dictionary

	if "{" in msgrec: #decode json
		try:
			data = json.loads(msgrec)
		except json.JSONDecodeError:
			data = json.loads(msgrec.replace("'",'"')) #payload sent with str(dict)
		for key in data:
			print("{} : {}".format(key,data[key]))

Complete reception script

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt #import library
import time
import json

MQTT_BROKER = "localhost"   #broker address: IP of the Raspberry Pi or simply localhost
MQTT_TOPIC = "test_channel" #name of the topic

messageReceived=False

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
	if rc==0:
		client.subscribe(MQTT_TOPIC)
		print("subscribe to {}".format(MQTT_TOPIC))
	else:
		print("bad connection {}".format(rc))

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
	global messageReceived
	msgrec=str(msg.payload.decode("utf-8"))
	print(msg.topic+" "+msgrec)
	
	if "{" in msgrec: #decode json
		try:
			data = json.loads(msgrec)
		except json.JSONDecodeError:
			data = json.loads(msgrec.replace("'",'"')) #payload sent with str(dict)
		for key in data:
			print("{} : {}".format(key,data[key]))
	
	messageReceived=True

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER)
client.loop_forever()

Developing a UDP monitor with Python

In this project, we’re going to create a UDP network communication monitor using Python (PyQt). When developing a project with Arduino, Raspberry Pi or any other microcontroller, you’ll certainly need to create a graphical interface for system management (debugging, observing measurements, launching actions, etc.). There are many tools available for creating graphical interfaces. Here, we use PyQt (PySide2).

Objective

For this project, we want to create a graphical interface in Python, under Windows, that behaves like a UDP communication monitor. To do this, the interface needs the following elements:

  • IP address entry field
  • Port input field
  • Connection button
  • Input area for the command to send
  • Send button
  • Console displaying received data

Retrieve the machine’s IP address

On the receiving machine, retrieve the IPv4 address (in our case 192.168.1.67)

ipconfig #on a Windows machine

or

ip addr #on a Linux machine
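The address can also be obtained from Python, which is convenient for pre-filling the IP field of the interface. The following is a best-effort sketch: the helper name get_local_ip and the target address 192.168.1.1 (the gateway used later in this article) are assumptions, and machines with several network interfaces may report a different address than the one you expect.

```python
import socket


def get_local_ip(probe_address="192.168.1.1"):
    """Best-effort guess of the IPv4 address used to reach the local network."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # connect() on a UDP socket sends no packet; it only selects a route
        sock.connect((probe_address, 9))
        return sock.getsockname()[0]
    except OSError:
        return "127.0.0.1"  # no usable route: fall back to the loopback address
    finally:
        sock.close()


print(get_local_ip())
```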

UDP communication monitor application

First of all, we’re going to create the application window, which we’ll call AcApp and which will be the basis of our application.

#!/usr/bin/python3
# -*-coding:Utf-8 -*

import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

__title__="ACTerminal" #window title used in main()

class AcApp(QMainWindow):

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):        
        self.setCentralWidget(self.mainFrame)
       
        #connect signals (a plain QFrame, the default, has no debugSignal)
        if hasattr(self.mainFrame,"debugSignal"):
            self.mainFrame.debugSignal.connect(self.debugMsg)
       
        #General configuration
        #self.resize(self.width, self.height)
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #self.setWindowIcon(QIcon(__icon__))
       
        #Debug bar
        self.statusBar()
        self.statusBar().showMessage('Display debug messages')
       
        self.show()

    def debugMsg(self,val):
        self.statusBar().showMessage(val)

def main():
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    ex = AcApp(__title__)
    #ex = AcApp(__title__,EthernetInterface)
    app.quit()
    sys.exit(app.exec_())

       
if __name__ == '__main__':
    main()

Creating UDP communication management widgets

We will then create a class containing all the widgets we need to create and manage the UDP communication monitor (QLineEdit, QTextEdit, QPushButton, etc.). This widget will be initialised with the class that manages the communication.

class EthernetInterface(QFrame):
    debugSignal=pyqtSignal(str) #define debug signal   

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        self.grid=QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        self.model=None 
        if self.model is not None: self.model.debugSignal.connect(self.read)
   
    def defineWidgets(self):
        #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;}
        #QGroupBox::title {padding:1 5px;}""")
               
        #groupbox widget container
        self.grp=QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
       
        self.fields=QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp,0,0)
       
        #Define widget UI
        #validator
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"   # Part of the regular expression
        # Regular expression
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)  
       
        #label
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        #self.typeBox.setInputMask("0.0.0.0");
        self.typeBox.setValidator(ipValidator);
       
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))
       
        #btn
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
       
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)
       
        titlelbl=  QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")
           
        #row, column[, rowSpan=1[, columnSpan=1[
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
       
        self.fields.addWidget(self.button,0,4,1,1)
       
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
       
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)  
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)      

    def debug(self,msg):
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)
       
    def clicked(self):
        sender = self.sender()
        if sender.__class__.__name__=="QPushButton":
            self.debugSignal.emit(sender.text()+ " clicked")
        if sender.__class__.__name__=="QComboBox":
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        #self.desc.setText("")
        self.desc.clear()
        if self.model is not None:
            if self.button.text() == "Connect":
                self.desc.setText(">> trying to connect to address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))

                print("Started")
                self.button.setText("Stop")
                #self.model = EthModel()
                self.model.connec(self.typeBox.text(),int(self.baudBox.text()))
                self.model.start()
            else:
                self.model.quit_flag = True
                print("Stop sent")
                self.model.close()#self.model.wait()
                print("Stopped")
                self.button.setText("Connect")
                self.desc.setText(">> disconnected from address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
                
   
    def read(self,msg):
        self.desc.setText(self.desc.toPlainText()+msg+"\n")
        self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum());
       
                   
    def send(self):
        if self.edit.text() != "":
            self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n")
           
            if self.model is not None:
                self.model.write(self.edit.text())

Replace ex = AcApp(__title__) with ex = AcApp(__title__,EthernetInterface) in the main() function

Once our widgets are in place, we’re going to start using them.
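The IP validator defined in defineWidgets can be checked outside Qt. The sketch below reuses the same pattern with Python's re module, so you can see which strings the input field accepts. The function name is_valid_ipv4 is only for this example; Qt's validator additionally accepts partial input while the user is typing, which this check does not reproduce.

```python
import re

# same sub-pattern as ipRange in the widget: a number from 0 to 255
IP_RANGE = r"(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"
IP_REGEX = re.compile(r"^" + r"\.".join([IP_RANGE] * 4) + r"$")


def is_valid_ipv4(text):
    """Return True if text is a complete dotted IPv4 address."""
    return IP_REGEX.match(text) is not None


for candidate in ("192.168.1.67", "127.0.0.1", "256.1.1.1", "192.168.1"):
    print(candidate, is_valid_ipv4(candidate))
```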

Creating a QThread to manage UDP communication

In order not to block the graphical interface when packets are received or sent, we’re going to create a QThread to manage UDP communication.

import socket

#Ethernet connection
class EthModel(QThread):
    """Handle Ethernet connection with remote device and connect to interface EthernetInterface"""
    debugSignal=pyqtSignal(str) #define debug signal   
    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""

    def run(self):
        while True:
            if not self.quit_flag:
                self.read()
                #time.sleep(1)
            else:
                self.close()
                break

        self.quit()
        #self.wait()
                
    def connec(self,addr='192.168.1.10',port=7):
        # Create a UDP/IP socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #works with esp8266 udp client
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.sock.bind((addr,port))
        # Listen for incoming connections
        #self.sock.listen(1) #stream
        
        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True
	
    def read(self):
        while self._isRunning:
            #data = self.sock.recv(1024)
            try:
                data, addr = self.sock.recvfrom(1024)
                print(data)
            except OSError: #raised when the socket is closed from the interface
                print("socket closed")
                data=False

            if not data:
                print("no data > break loop")
                break
            #self.sock.sendto("received OK".encode('utf-8'),addr)
            if self.msgToEmit!="":
                self.sock.sendto(self.msgToEmit.encode('utf-8'),addr)
                self.msgToEmit="" #clear message
            self.debugSignal.emit(str(data))


    def write(self,msg):
        self.msgToEmit=msg

    def close(self):
        self._isRunning=False
        self.sock.close()
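The principle of the thread above, receiving in the background so that the rest of the program stays responsive, can be tried without Qt. The following is a minimal sketch using threading from the standard library instead of QThread. The class name UdpListener is hypothetical, and it uses a socket timeout to check a stop flag regularly, which is a different stopping strategy from closing the socket as EthModel does.

```python
import socket
import threading
import time


class UdpListener(threading.Thread):
    """Receive UDP datagrams in a background thread until stop() is called."""

    def __init__(self, addr="127.0.0.1", port=0):
        super().__init__(daemon=True)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind((addr, port))   # port 0 lets the OS pick a free port
        self.sock.settimeout(0.2)      # wake up regularly to check the flag
        self.port = self.sock.getsockname()[1]
        self.received = []
        self._stop_event = threading.Event()

    def run(self):
        while not self._stop_event.is_set():
            try:
                data, _ = self.sock.recvfrom(1024)
            except socket.timeout:
                continue               # nothing received, check the flag again
            self.received.append(data.decode("utf-8", errors="replace"))
        self.sock.close()

    def stop(self):
        self._stop_event.set()
        self.join()


listener = UdpListener()
listener.start()
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
    client.sendto(b"42", ("127.0.0.1", listener.port))

# the main thread stays free; here it simply waits for the datagram
deadline = time.monotonic() + 2.0
while not listener.received and time.monotonic() < deadline:
    time.sleep(0.05)
listener.stop()
print(listener.received)
```

In the Qt version, the list is replaced by the debugSignal signal, which hands each message over to the interface thread.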

Python UDP client code

To test your interface, you can run a code on the same computer using address 127.0.0.1 (local address). The following code will create a socket and send the value of a counter.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import time
HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024



counter=0
while True:
	# Create a UDP socket at client side
	with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
		client.settimeout(0.5) #the monitor only replies when a message has been entered and sent
		# Send to server using created UDP socket
		client.sendto(str.encode(str(counter)), (HOST,PORT))

		try:
			data,addr= client.recvfrom(bufferSize)
			msg = "Message from Server {}".format(data)
			print(msg)
		except OSError:
			pass #no reply within the timeout, or monitor not started
	counter+=1
	time.sleep(0.1)

ESP8266 UDP client code

The ESP8266 code for UDP communication is quite simple. We set up network communication using the UDP protocol and send the value given by the millis() function.

#include <ESP8266WiFi.h>
#include <WiFiUdp.h>

WiFiUDP udp;
char packetBuffer[256];
unsigned int localPort = 9999;
const char *ssid = "******";
const char *password = "******";

// Set your Static IP address
IPAddress local_IP(192, 168, 1, 80);
IPAddress gateway(192, 168, 1, 1);
IPAddress subnet(255, 255, 0, 0);

// Set destination IP address
const char *destaddr = "192.168.1.67";
unsigned int destPort = 8888;

void setup() {
  Serial.begin(115200);
  WiFi.config(local_IP, gateway, subnet);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  udp.begin(localPort);
  Serial.print(F("UDP Client : ")); Serial.println(WiFi.localIP());
}

void loop() {
  int packetSize = udp.parsePacket();
  Serial.print(" Received packet from : "); Serial.println(udp.remoteIP());
  Serial.print(" Size : "); Serial.println(packetSize);
  Serial.print(" Data : ");
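  packetBuffer[0] = '\0'; //reset buffer
  if (packetSize) {
    int len = udp.read(packetBuffer, sizeof(packetBuffer) - 1);
    if (len > 0) packetBuffer[len] = '\0'; //terminate the string before printing
    Serial.print(packetBuffer);
    
    udp.flush();
  }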
  Serial.println("\n");
  delay(500);
  Serial.print("[Client Connected] ");
  Serial.println(WiFi.localIP());
  udp.beginPacket(destaddr, destPort);
  udp.write("Send millis: ");
  char buf[20];
  unsigned long testID = millis();
  sprintf(buf, "%lu", testID);
  Serial.print(" Sent : "); Serial.println(buf);
  udp.write(buf);
  udp.write("\r\n");
  udp.endPacket();
}

N.B.: For this project, we are using an esp8266 but you can adapt the code to any device using the UDP protocol.

Results

To use the Python client code, enter the IP address 127.0.0.1

To test communication with the ESP8266, use the IP address of your computer (here, 192.168.1.67).

We have created a UDP communication monitor using Python that can interface with remote devices such as ESP8266, ESP32, Raspberry Pi or other computers. You can now enhance the interface to suit your needs.

Complete code

#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022

@author: X.Wiedmer

AC windows
Define the application window
"""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

import socket
import time

"""
App configuration
"""
__title__="ACTerminal"
__version__="v0.1"

HOST = '192.168.1.67'
PORT = 8888

#Ethernet connection
class EthModel(QThread):
    """Handle Ethernet connection with remote device and connect to interface EthernetInterface"""
    debugSignal=pyqtSignal(str) #define debug signal   
    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""

    def run(self):
        while True:
            if not self.quit_flag:
                self.read()
                #time.sleep(1)
            else:
                self.close()
                break

        self.quit()
        self.exit()
        #self.wait()
                
    def connec(self,addr='192.168.1.10',port=7):
        # Create a UDP/IP socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #works with esp8266 udp client
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.sock.bind((addr,port))
        # Listen for incoming connections
        #self.sock.listen(1) #stream
        
        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True
	
    def read(self):
        while self._isRunning:
            #data = self.sock.recv(1024)
            try:
                data, addr = self.sock.recvfrom(1024)
                print(data)
            except OSError: #raised when the socket is closed from the interface
                print("socket closed")
                data=False

            if not data:
                print("no data > break loop")
                break
            #self.sock.sendto("received OK".encode('utf-8'),addr)
            if self.msgToEmit!="":
                self.sock.sendto(self.msgToEmit.encode('utf-8'),addr)
                self.msgToEmit="" #clear message
            self.debugSignal.emit(str(data))


    def write(self,msg):
        self.msgToEmit=msg

    def close(self):
        self._isRunning=False
        self.sock.close()

#define GUI    
class EthernetInterface(QFrame):
    debugSignal=pyqtSignal(str) #define debug signal   

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        self.grid=QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        self.model=EthModel()#self.model=None
        self.model.debugSignal.connect(self.read)
   
    def defineWidgets(self):
        #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;}
        #QGroupBox::title {padding:1 5px;}""")
               
        #groupbox widget container
        self.grp=QGroupBox(self)
        self.grp.setTitle("Connection Configuration")
       
        self.fields=QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp,0,0)
       
        #Define widget UI
        #validator
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"   # Part of the regular expression
        # Regular expression
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)  
       
        #label
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        #self.typeBox.setInputMask("0.0.0.0");
        self.typeBox.setValidator(ipValidator);
       
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))
       
        #btn
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
       
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)
       
        titlelbl=  QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")
           
        #row, column[, rowSpan=1[, columnSpan=1[
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
       
        self.fields.addWidget(self.button,0,4,1,1)
       
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
       
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)  
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)      

    def debug(self,msg):
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)
       
    def clicked(self):
        sender = self.sender()
        if sender.__class__.__name__=="QPushButton":
            self.debugSignal.emit(sender.text()+ " clicked")
        if sender.__class__.__name__=="QComboBox":
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        #self.desc.setText("")
        self.desc.clear()
        if self.model is not None:
            if self.button.text() == "Connect":
                self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(self.typeBox.text(),self.baudBox.text()))

                print("Started")
                self.button.setText("Stop")
                self.model.connec(self.typeBox.text(),int(self.baudBox.text()))
                self.model.start()
            else:
                self.model.quit_flag = True
                print("Stop sent")
                self.model.close()#self.model.wait()
                print("Stopped")
                self.button.setText("Connect")
                self.desc.setText(">> disconnected from address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
                
   
    def read(self,msg):
        self.desc.setText(self.desc.toPlainText()+msg+"\n")
        self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum());
       
                   
    def send(self):
        if self.edit.text() != "":
            self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n")
           
            if self.model is not None:
                self.model.write(self.edit.text())

# Generic app container     
class AcApp(QMainWindow):

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):        
        self.setCentralWidget(self.mainFrame)
       
        #connect signals
        self.mainFrame.debugSignal.connect(self.debugMsg)
       
        #General configuration
        #self.resize(self.width, self.height)
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #self.setWindowIcon(QIcon(__icon__))
       
        #Debug bar
        self.statusBar()
        self.statusBar().showMessage('Display debug messages')
       
        self.show()

    def debugMsg(self,val):
        self.statusBar().showMessage(val)
   


       
def main():
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    #app.setStyleSheet(open("style.txt").read()); #set style according to file 
    #ex = AcApp(__title__)
    ex = AcApp(__title__,EthernetInterface)
    app.quit()
    sys.exit(app.exec_())

       
if __name__ == '__main__':
    main()

Stream video between two machines with FFMPEG

In this tutorial, we’ll look at how to send a video stream from one machine to another using FFMPEG. Sending data between two devices, whatever it may be, is one of the major issues in connected objects (IoT). If you’d like to create a connected camera that sends video to a remote machine, this article should interest you.

Installing ffmpeg

On a Linux machine

sudo apt-get install ffmpeg

On a Windows machine, please follow this procedure.

N.B.: if needed on a Linux machine, also install the v4l-utils package: sudo apt-get install v4l-utils

Get the information you need to stream video

To create a video stream from one machine to another, the machines must be connected to the same network (Wifi or Ethernet).

First, you need to obtain the IP address of the client machine (which receives the stream)

In a terminal, enter

ifconfig #on a Linux machine
ip addr #if ifconfig is not available

or

ipconfig #on a Windows machine

You should obtain an address of the form 192.168.X.X (in our case 192.168.1.67)

Next, identify the video device to stream from. To list all the video inputs on a Linux machine

ls -l /dev/video*
lsusb #list usb devices
v4l2-ctl --list-devices #list video devices

These commands should help you identify the video device. In our case, /dev/video5

To list video devices on a Windows machine, you can use ffmpeg

ffmpeg -list_devices true -f dshow -i dummy

In our case, video="USB2.0 HD UVC WebCam"

Retrieve options and check formats and resolutions accepted by the camera

on Linux

v4l2-ctl -d /dev/video0 --list-formats

on Windows

ffmpeg -f dshow -list_options true -i video="USB2.0 HD UVC WebCam"

Once this information has been noted, we can configure the streaming.

Creating a video stream with ffmpeg

To create a video stream, you need a sender (server) and a receiver (client), each started from a terminal on one of the two computers.

  • Linux server side
ffmpeg -re -f v4l2 -i /dev/video5 -r 10 -f mpegts udp://192.168.1.67:8554?pkt_size=1316
  • Windows server side
ffmpeg -f dshow -i video="USB2.0 HD UVC WebCam" -r 10 -f mpegts udp://192.168.1.67:8554?pkt_size=1316
  • client side
ffplay udp://127.0.0.1:8554

N.B.: You can test streaming on a single machine using the IP address of the machine on which you are working and using two terminals, one for the server and one for the client.
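When you test several devices or frame rates, it is less error-prone to build the command from parameters than to edit the string by hand. Here is a minimal sketch that reproduces the UDP commands above as an argument list. The function name and its parameters are assumptions made for this example, and the -re option of the Linux command is left out for simplicity.

```python
import shlex


def build_udp_stream_command(device, client_ip, port=8554, framerate=10,
                             input_format="v4l2", pkt_size=1316):
    """Return the ffmpeg argument list for an MPEG-TS stream sent over UDP."""
    return [
        "ffmpeg",
        "-f", input_format,      # v4l2 on Linux, dshow on Windows
        "-i", device,            # e.g. /dev/video5 or video=USB2.0 HD UVC WebCam
        "-r", str(framerate),
        "-f", "mpegts",
        "udp://{}:{}?pkt_size={}".format(client_ip, port, pkt_size),
    ]


cmd = build_udp_stream_command("/dev/video5", "192.168.1.67")
print(shlex.join(cmd))  # shell-quoted version, handy for copy and paste
# To actually start streaming (ffmpeg must be installed):
# import subprocess; subprocess.run(cmd, check=True)
```

Passing a list to subprocess.run avoids shell quoting issues with the "?" in the URL and with device names that contain spaces.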

To retrieve video statistics you can use the ffprobe command

ffprobe udp://127.0.0.1:8554

or, for more details,

ffprobe -show_format -show_streams udp://127.0.0.1:8554
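To use these statistics in a script, ffprobe can print them as JSON with the -print_format json option. The sketch below builds such a command and extracts a few fields from the parsed result. The function names are hypothetical, and the fields read (codec_name, width, height, r_frame_rate) are only those needed for this example.

```python
import json
import subprocess


def build_ffprobe_command(url):
    """ffprobe arguments asking for format and stream details as JSON."""
    return ["ffprobe", "-v", "error", "-print_format", "json",
            "-show_format", "-show_streams", url]


def summarize_streams(probe):
    """Extract codec, size and frame rate from a parsed ffprobe result."""
    summary = []
    for stream in probe.get("streams", []):
        summary.append({
            "codec": stream.get("codec_name"),
            "size": (stream.get("width"), stream.get("height")),
            "frame_rate": stream.get("r_frame_rate"),
        })
    return summary


def probe_url(url):
    """Run ffprobe (it must be installed) and return the stream summary."""
    result = subprocess.run(build_ffprobe_command(url),
                            capture_output=True, text=True, check=True)
    return summarize_streams(json.loads(result.stdout))


print(" ".join(build_ffprobe_command("udp://127.0.0.1:8554")))
```

With a stream running, probe_url("udp://127.0.0.1:8554") returns one dictionary per stream found.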

Reduce video stream reception time between two machines

There is a delay of around 15 seconds between video transmission and reception. It may be useful to reduce this delay depending on the application.

On the server side, the parameters that affect transmission speed are:

  • image size (e.g. -s 800x600)
  • codec format conversion (e.g. -c:v libx264)
  • number of frames per second or framerate (e.g. -r 10)

Try to play with these parameters to reduce the delay while maintaining sufficient quality.
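To get a feel for how much image size and framerate weigh on the stream, you can estimate the amount of raw video data produced per second. This is only an order-of-magnitude sketch: it assumes uncompressed YUV 4:2:0 frames (1.5 bytes per pixel), whereas the encoder sends far less. The relative effect of each parameter is nevertheless the same.

```python
def raw_bitrate_mbps(width, height, framerate, bytes_per_pixel=1.5):
    """Uncompressed video data rate in Mbit/s (1.5 bytes/pixel = YUV 4:2:0)."""
    bytes_per_second = width * height * bytes_per_pixel * framerate
    return bytes_per_second * 8 / 1_000_000


for (width, height), fps in (((1280, 720), 30), ((800, 600), 10)):
    rate = raw_bitrate_mbps(width, height, fps)
    print("{}x{} at {} fps: {:.1f} Mbit/s before compression".format(width, height, fps, rate))
```

Going from 1280x720 at 30 fps to 800x600 at 10 fps divides the raw data to process by almost six.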

ffmpeg -re -thread_queue_size 64 -s 800x600 -i

On the client side, there are various options that you can test from the documentation included in this discussion.

ffplay -fflags nobuffer -probesize 32 -sync ext -vf setpts=0 udp:

By modifying the options of the ffmpeg and ffplay commands, you can reduce the video stream delay from 15 to around 2 seconds.

Creating an HTTP stream with ffmpeg

It is possible to create an HTTP video server using VLC. The advantage of this method is that the video stream can be transmitted to any device.

  • Linux server side
ffmpeg -input_format h264 -video_size 1280x720 -framerate 30 -i /dev/video0 -vcodec copy -f mpegts -|vlc -I dummy - --sout='#std{access=http,mux=ts,dst=:8554}'
  • client side, you need to specify the IP address of the server
ffplay http://192.168.1.9:8554

Creating an RTSP stream with ffmpeg

It is also possible to create an RTSP video server using VLC. As with HTTP, the video stream can then be read by any device that supports the protocol.

  • Linux server side
ffmpeg -input_format h264 -video_size 1280x720 -framerate 30 -i /dev/video0 -vcodec copy -f mpegts -|vlc -I dummy - --sout='#rtp{sdp=rtsp://:8554/}' --sout-all --sout-keep
  • client side, you need to specify the IP address of the server
ffplay rtsp://192.168.1.9:8554/

Python script to start streaming

Here is a Python script that will allow you to test the different streaming methods as well as the ffmpeg parameters.

import subprocess

#common ffmpeg capture options
capture="ffmpeg -input_format h264 -video_size 1280x720 -framerate 30 -i /dev/video0 -vcodec copy -f mpegts "

#UDP
#input client ip address here 192.168.1.9
stream_udp=capture+"udp://192.168.1.9:8554?pkt_size=1316"
#on client side ffplay udp://127.0.0.1:8554

#HTTP
stream_http=capture+"-|vlc -I dummy - --sout='#std{access=http,mux=ts,dst=:8554}'"
#on client side ffplay http://192.168.1.73:8554

#RTSP
stream_rtsp=capture+"-|vlc -I dummy - --sout='#rtp{sdp=rtsp://:8554/}' --sout-all --sout-keep"
#on client side ffplay rtsp://192.168.1.73:8554/ # don't forget / at the end

stream_video=stream_rtsp #select the streaming method to test

try:
	subprocess.call(stream_video,shell=True)
except KeyboardInterrupt: #stop streaming with Ctrl+C
	pass
print("Done")
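The client-side comments in the script follow a simple rule that can be written down as a function. This is an illustrative sketch: client_url is a hypothetical helper, not part of ffmpeg or VLC, and it only encodes the three cases described in this article.

```python
def client_url(protocol, server_ip, port=8554):
    """Return the URL to pass to ffplay on the client for a given protocol."""
    protocol = protocol.lower()
    if protocol == "udp":
        # the server pushes packets to the client, which listens locally
        return "udp://127.0.0.1:{}".format(port)
    if protocol == "http":
        return "http://{}:{}".format(server_ip, port)
    if protocol == "rtsp":
        # the trailing slash is required
        return "rtsp://{}:{}/".format(server_ip, port)
    raise ValueError("unknown protocol: {}".format(protocol))

for proto in ("udp", "http", "rtsp"):
    print("ffplay", client_url(proto, "192.168.1.73"))
```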

Bonus: View FFMPEG’s video stream with OpenCV

Once the video stream has been established between the two machines, OpenCV can be used to read and display images from the UDP stream.

import cv2

def main(args):

	#cap = cv2.VideoCapture(0) #default camera
	cap = cv2.VideoCapture('udp://127.0.0.1:8554')
	#cap = cv2.VideoCapture('http://192.168.1.9:8554')
	#cap = cv2.VideoCapture('rtsp://192.168.1.9:8554/')

	while(True):
		ret, frame = cap.read()
		if ret: #necessary if packet is lost
			frame=cv2.resize(frame, (800, 600)) 
			cv2.imshow('Capturing',frame)
			
		if cv2.waitKey(1) & 0xFF == ord('q'): #press q to stop capturing
			break

	cap.release()
	cv2.destroyAllWindows()
	return 0

if __name__ == '__main__':
    import sys
    sys.exit(main(sys.argv))

Bonus2: Show time on video

To display the time on the video, we’re going to use ffmpeg’s drawtext filter, which takes several text parameters:

  • fontfile: font used
  • fontsize: text size
  • fontcolor: text color
"drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.ttf: text='%{localtime\:%T}': fontsize=24: fontcolor=white@0.8: x=7: y=460"

The filter is applied with the -vf option. Since ffmpeg cannot combine a filter with stream copy (-vcodec copy), the video has to be re-encoded, here with libx264. Here is the complete command:

ffmpeg -input_format h264 -video_size 1280x720 -framerate 30 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.ttf: text='%{localtime\:%T}': fontsize=24: fontcolor=white@0.8: x=7: y=460" -vcodec libx264 -f mpegts udp://192.168.1.9:8554?pkt_size=1316

In Python

#raw string so that the backslash in \: reaches ffmpeg unchanged; libx264 because a filter cannot be used with -vcodec copy
stream_cmd=r"""ffmpeg -input_format h264 -video_size 1280x720 -framerate 30 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.ttf: text='%{localtime\:%T}': fontsize=24: fontcolor=white@0.8: x=7: y=460" -vcodec libx264 -f mpegts udp://192.168.1.9:8554?pkt_size=1316"""

This is handy for imitating a CCTV camera or for comparing timestamps from different cameras.
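Since the filter string is long, it can be convenient to assemble it from its parameters. The sketch below assumes a hypothetical helper named drawtext_filter; it only reproduces the option names and values used in the command above and does not call ffmpeg.

```python
def drawtext_filter(fontfile, text, fontsize=24, fontcolor="white@0.8", x=7, y=460):
    """Assemble the value passed to ffmpeg's -vf option for the drawtext filter."""
    options = [
        "fontfile={}".format(fontfile),
        "text='{}'".format(text),
        "fontsize={}".format(fontsize),
        "fontcolor={}".format(fontcolor),
        "x={}".format(x),
        "y={}".format(y),
    ]
    return "drawtext=" + ": ".join(options)

FONT = "/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.ttf"
# raw string: the backslash before the colon must reach ffmpeg unchanged
vf = drawtext_filter(FONT, r"%{localtime\:%T}")
print(vf)
```

The resulting string still has to be wrapped in double quotes when it is inserted into a shell command.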

Creating a scrolling object list with PyQt

We’ll look at how to develop a graphic object that displays a scrollable and selectable list of objects. This object can be used to create modular graphical interfaces.

Creating a QScrollArea object

To create a scrolling list of objects, we’ll use the QScrollArea object, which, as its name suggests, creates an area with scrollbars.

We create an object that inherits from QScrollArea and integrates a list of checkboxes (QCheckBox). We use a vertical layout to arrange the object list.

class ListContainer(QScrollArea):
	changeItem=pyqtSignal(list)
	def __init__(self,items=None, parent=None):
		super(ListContainer, self).__init__(parent)
		self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
		self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn)
		self.setWidgetResizable(True)

		self.listItem=items
		if self.listItem is None: #default: 20 placeholder items
			self.listItem=[0]*20
		self.listState=[False]*len(self.listItem) #checked state of each item
		self.itemChk=[] #QCheckBox widgets
		self.initUI()
		
	def initUI(self):
		container=QWidget()
		self.setWidget(container)
		layout = QVBoxLayout(container)
		
		for i,s in enumerate(self.listItem):
			self.itemChk.append(QCheckBox("Objet"+str(i)))
			self.itemChk[i].setChecked(False)
			layout.addWidget(self.itemChk[i])

N.B.: In this example we’re using QCheckBox objects, but you can use any QWidget you like.

To retrieve the state of the checkboxes, we add a changeChk function, which we connect to the stateChanged signal of each QCheckBox.
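The bookkeeping done by changeChk can be sketched without Qt. In this illustration, update_states is a hypothetical stand-in for the slot: sender_label plays the role of self.sender().text(), and state is the integer sent by stateChanged (0 for unchecked, 2 for checked).

```python
UNCHECKED, CHECKED = 0, 2  # integer values sent by QCheckBox.stateChanged

def update_states(labels, states, sender_label, state):
    """Update the state list for the checkbox whose label matches the sender."""
    checked = state > 0
    for i, label in enumerate(labels):
        if label == sender_label:
            states[i] = checked
    return states

labels = ["Objet{}".format(i) for i in range(4)]
states = [False] * len(labels)
update_states(labels, states, "Objet2", CHECKED)
print(states)  # [False, False, True, False]
```

In the real widget, the updated list is then emitted through the changeItem signal.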

We can then add this code directly to a Qt application. Here’s the code to test your ListContainer object

#from PyQt6.QtWidgets import (QWidget, QSlider, QLineEdit, QLabel, QPushButton, QScrollArea,QApplication,
#                             QHBoxLayout, QVBoxLayout, QMainWindow)
#from PyQt6.QtCore import Qt, QSize
#from PyQt6 import QtWidgets, uic

from PySide6.QtWidgets import (QWidget, QSlider, QLineEdit, QLabel, QPushButton, QScrollArea,QApplication,
                             QHBoxLayout, QVBoxLayout, QMainWindow, QFrame, QCheckBox)
from PySide6.QtCore import Qt, QSize, Signal, Slot
pyqtSignal = Signal
pyqtSlot = Slot

import sys


class ListContainer(QScrollArea):
	changeItem=pyqtSignal(list)
	def __init__(self,items=None, parent=None):
		super(ListContainer, self).__init__(parent)
		self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
		self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn)
		self.setWidgetResizable(True)
		self.listItem=items
		if self.listItem is None:
			self.listItem=[0]*20
		self.listState=[False]*len(self.listItem)
		self.itemChk=[]
		self.initUI()
		
	def initUI(self):
		container=QWidget()
		self.setWidget(container)
		layout = QVBoxLayout(container)
		
		for i,s in enumerate(self.listItem):
			self.itemChk.append(QCheckBox("Objet"+str(i)))
			self.itemChk[i].setChecked(False)
			self.itemChk[i].stateChanged.connect(self.changeChk)
			layout.addWidget(self.itemChk[i])
	
	def changeChk(self,state):
		checked=state>0 #stateChanged sends 0 when the box is unchecked
		print("{} : {}".format(self.sender().text(),checked))
		for i,s in enumerate(self.itemChk):
			if s.text() == self.sender().text():
				self.listState[i]=checked
		self.changeItem.emit(self.listState)
				

def main():
    app = QApplication(sys.argv)
    main = ListContainer()
    main.show()
    sys.exit(app.exec())

if __name__ == '__main__':
    main()

Example of using the scrolling object list

As an example, let’s take the Display a signal with PyQtGraph project, in which we’ll integrate the ListContainer object. The list of QCheckBox objects will enable us to select the signals to be displayed on the graph.

In the SignalContainer object, we use the numpy.random package to generate signals with a random sign, noise level, waveform and color.

		self.time = np.linspace(0,1,1000)*10
		for i,d in enumerate(self.data):
			x = np.random.choice([-1, 1]) #random sign
			s = np.random.rand()*0.2 #random scale
			fun=np.random.choice([np.sin, np.cos]) #random function
			self.data[i]=x*fun(self.time) + np.random.normal(scale=s, size=len(self.time))
			r=np.random.randint(255) #random color
			g=np.random.randint(255)
			b=np.random.randint(255)
			style = np.random.choice([Qt.DashLine, Qt.SolidLine, Qt.DotLine, Qt.DashDotLine])
			symbol = np.random.choice(['','+','o','x'])
			
			self.pen.append(mkPen(color=(r, g, b), width=3, style=Qt.DashLine))	#line style
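The same kind of random test signals can be generated with the standard library alone, which is convenient for trying the idea without NumPy. This is a sketch under assumptions: make_signals and its parameters are illustrative names, and random.gauss replaces np.random.normal.

```python
import math
import random

def make_signals(count, samples=1000, span=10.0, seed=None):
    """Return a time axis and `count` noisy sine or cosine signals."""
    rng = random.Random(seed)
    t = [span * k / (samples - 1) for k in range(samples)]  # like np.linspace(0,1,samples)*span
    signals = []
    for _ in range(count):
        sign = rng.choice([-1, 1])               # random sign
        scale = rng.random() * 0.2               # random noise level
        fun = rng.choice([math.sin, math.cos])   # random function
        signals.append([sign * fun(x) + rng.gauss(0, scale) for x in t])
    return t, signals

t, signals = make_signals(3, seed=42)
print(len(t), len(signals), len(signals[0]))
```

Each signal is built as its own list, so modifying one signal never affects the others.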

We then create a setSignal function that takes the list of states as an argument and updates the graph according to the signals to be displayed.

	@pyqtSlot(list)			
	def setSignal(self,states):
		print(states)
		self.sigstate=states
		
		#update graph
		self.graphWidget.clear()
		#self.graphWidget.plot(self.time, self.data, name = "signal",pen=self.pen,symbol='+', symbolSize=5, symbolBrush='w')
		for i,data in enumerate(self.data):
			if self.sigstate[i]: #display signal
				self.graphWidget.plot(self.time, self.data[i],name = "signal"+str(i),pen=self.pen[i])

We connect the setSignal function to the ListContainer object’s changeItem signal, so that the function is called each time the states of the checkboxes are updated.

		self.select.changeItem.connect(self.setSignal)

Here’s the complete code for a PyQt application with a scrolling object list for selecting the signals to be displayed with PyQtGraph:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import cv2
import sys
#from PyQt5.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication
#from PyQt5.QtCore import QThread, Qt, pyqtSignal, pyqtSlot
#from PyQt5.QtGui import QImage, QPixmap

from pyqtgraph import PlotWidget, mkPen

from PySide6.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication, QVBoxLayout, QHBoxLayout, QLineEdit, QPushButton, QCheckBox, QScrollArea, QSplitter
from PySide6.QtCore import QThread, Qt, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
pyqtSignal = Signal #convert pyqt to pyside
pyqtSlot = Slot

import numpy as np
import time

		
class ListContainer(QScrollArea):
	changeItem=pyqtSignal(list)
	def __init__(self,items=None, parent=None):
		super(ListContainer, self).__init__(parent)
		self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
		self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn)
		self.setWidgetResizable(True)
		self.listItem=items
		if self.listItem is None: #default items, set before computing the state list
			self.listItem=[0]*50
		self.listState=[False]*len(self.listItem)
		self.itemChk=[]
		self.initUI()
		
	def initUI(self):
		container=QWidget()
		self.setWidget(container)
		layout = QVBoxLayout(container)
		
		for i,s in enumerate(self.listItem):
			self.itemChk.append(QCheckBox("Signal"+str(i)))
			self.itemChk[i].setChecked(False)
			self.itemChk[i].stateChanged.connect(self.changeChk)
			layout.addWidget(self.itemChk[i])
	
	def changeChk(self,state):
		checked=state>0 #stateChanged sends 0 when the box is unchecked
		print("{} : {}".format(self.sender().text(),checked))
		for i,s in enumerate(self.itemChk):
			if s.text() == self.sender().text():
				self.listState[i]=checked
		self.changeItem.emit(self.listState)
				
		

class SignalContainer(QWidget):
	changeParam = pyqtSignal(dict)
	
	def __init__(self):
		super().__init__()
		self.title = 'Signal'
		self.span=10
		self.time = [0]*1000
		self.data = [[0]*1000]*20#[[0]*1000,[0]*1000,[0]*1000]
		self.pen=[]
		
		self.time = np.linspace(0,1, 1000)*10
		for i,d in enumerate(self.data):
			x = np.random.choice([-1, 1]) #random sign
			s = np.random.rand()*0.2 #random scale
			fun=np.random.choice([np.sin, np.cos]) #random function
			self.data[i]=x*fun(self.time) + np.random.normal(scale=s, size=len(self.time))
			r=np.random.randint(255) #random color
			g=np.random.randint(255)
			b=np.random.randint(255)
			style = np.random.choice([Qt.DashLine, Qt.SolidLine, Qt.DotLine, Qt.DashDotLine])
			symbol = np.random.choice(['','+','o','x'])
			
			self.pen.append(mkPen(color=(r, g, b), width=3, style=Qt.DashLine))	#line style

		self.sigstate=[False]*len(self.data) #one state per signal
		self.initUI()
		
	def initUI(self):
		self.setWindowTitle(self.title)
		self.resize(800, 400)
		self.mainLayout = QHBoxLayout()
		self.setLayout(self.mainLayout)
		self.splitter=QSplitter()
		self.mainLayout.addWidget(self.splitter)
		
		self.select=ListContainer(["signal{}".format(i) for i in range(len(self.data))])
		self.select.changeItem.connect(self.setSignal)
		self.splitter.addWidget(self.select)
		
		self.signalLayout=QVBoxLayout()
		
		# create widget
		self.graphWidget = PlotWidget()
		self.signalLayout.addWidget(self.graphWidget)
		#self.mainLayout.addLayout(self.signalLayout)
		self.splitter.addWidget(self.graphWidget)
        
		#tune plots
		self.graphWidget.setBackground((50,50,50,220))   # RGBA 		#background
		self.graphWidget.setTitle("Signal(t)", color="w", size="20pt")	#add title
		styles = {'color':'r', 'font-size':'20px'}						#add label style
		self.graphWidget.setLabel('left', 'signal [SI]', **styles)			#add ylabel
		self.graphWidget.setLabel('bottom', 'time [s]', **styles)			#add xlabel
		self.graphWidget.showGrid(x=True, y=True)						#add grid
		self.graphWidget.addLegend()									#add legend
		self.graphWidget.setXRange(0, self.span, padding=0)
		self.graphWidget.setYRange(-2, 2, padding=0.1)
        
		#plot data
		#self.graphWidget.plot(self.time, self.data[0],name = "signal",pen=self.pen,symbol='+', symbolSize=5, symbolBrush='w')
		
	@pyqtSlot(list)			
	def setSignal(self,states):
		print(states)
		self.sigstate=states
		
		#update graph
		self.graphWidget.clear()
		#self.graphWidget.plot(self.time, self.data,name = "signal",pen=self.pen,symbol='+', symbolSize=5, symbolBrush='w')
		for i,data in enumerate(self.data):
			if self.sigstate[i]: #display signal
				self.graphWidget.plot(self.time, self.data[i],name = "signal"+str(i),pen=self.pen[i])
		
import signal #close signal with Ctrl+C
signal.signal(signal.SIGINT, signal.SIG_DFL)
if __name__ == '__main__':
		app = QApplication(sys.argv)
		ex = SignalContainer()
		ex.show()
		
		sys.exit(app.exec())

Displaying a signal in PyQt with PyQtGraph

In this tutorial, we’ll look at how to observe a time signal in graphical form with PyQt using PyQtGraph. If you’re creating graphical interfaces, it can be useful to display data as curves, like on an oscilloscope, rather than as scrolling numbers.

Installation

  • PyQt (or PySide)
pip install PyQt5

or

pip install pyside6
  • PyQtGraph
pip install pyqtgraph

Code for displaying a simple curve with PyQtGraph

To display a curve with PyQtGraph, simply add a PlotWidget object to a PyQt QWidget object.

import cv2
import sys
#from PyQt5.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication
#from PyQt5.QtCore import QThread, Qt, pyqtSignal, pyqtSlot
#from PyQt5.QtGui import QImage, QPixmap

from pyqtgraph import PlotWidget, plot

from PySide6.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication, QVBoxLayout
from PySide6.QtCore import QThread, Qt, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
pyqtSignal = Signal #convert pyqt to pyside
pyqtSlot = Slot

class SignalContainer(QWidget):
	def __init__(self):
		super().__init__()
		self.title = 'Signal'
		self.initUI()

	def initUI(self):
		self.setWindowTitle(self.title)
		#self.resize(1200, 800)
		layout = QVBoxLayout()
		self.setLayout(layout)
		
		# create widget
		self.graphWidget = PlotWidget()
		layout.addWidget(self.graphWidget)
		
		#plot data
		time = [1,2,3,4,5,6,7,8,9,10]
		data = [30,32,34,32,33,31,29,32,35,45]
		self.graphWidget.plot(time, data)

if __name__ == '__main__':
	
		app = QApplication(sys.argv)
		ex = SignalContainer()
		ex.show()
		sys.exit(app.exec())

Code to create a time signal

To make this curve live, we’re going to create a QThread object so as not to block the application, which will allow us to create a sinusoidal signal that will evolve over time. At each iteration, we’ll send an update using the changeData signal.

class Thread(QThread):
	changeData = pyqtSignal(float,float)

	def run(self):
		self.isRunning=True
		
		self.time = 0
		self.data = 0
		f = 1.
		w = 2. * np.pi * f
		while self.isRunning:
			self.time+=0.01
			self.data=2*np.sin(w*self.time)
			
			self.changeData.emit(self.time,self.data)
			time.sleep(0.01)
			
				
	def stop(self):
		self.isRunning=False
		self.quit()
		self.terminate()
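The signal computed in the run loop can be checked outside of Qt with a plain generator. This is a minimal sketch: sine_samples is a hypothetical name, and the default values mirror the ones used in the thread (amplitude 2, frequency 1 Hz, step 0.01 s).

```python
import math
from itertools import islice

def sine_samples(amp=2.0, freq=1.0, ts=0.01):
    """Yield (time, value) pairs of a sine wave, one per sampling step."""
    t = 0.0
    w = 2.0 * math.pi * freq  # angular frequency
    while True:
        t += ts
        yield t, amp * math.sin(w * t)

# first quarter period: the value rises from about 0 to the amplitude
samples = list(islice(sine_samples(), 25))
print(samples[0], samples[-1])
```

In the thread, each pair is sent to the interface with changeData.emit instead of being stored in a list.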

PyQt application code

To display the curve in an application, we’ll create a PlotWidget graph in a QWidget, which will instantiate the QThread and plot the curve. The curve will be updated each time the changeData signal is received, using the setData function.

  • setData function
@pyqtSlot(float,float)
def setData(self, t,d):
    #append data
    self.time.append(t)
    self.data.append(d)
    #remove first item
    self.time.pop(0)
    self.data.pop(0)

    #update graph
    self.graphWidget.clear()
    self.graphWidget.plot(self.time, self.data)
  • signal changeData
self.th.changeData.connect(self.setData)
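The append and pop(0) pair in setData implements a fixed-length sliding window. One possible alternative, shown here as a sketch and not as the method used in this tutorial, is collections.deque with maxlen, which drops the oldest item automatically. RollingBuffer is a hypothetical class name.

```python
from collections import deque

class RollingBuffer:
    """Keep only the most recent `size` (time, data) samples."""

    def __init__(self, size=100):
        self.time = deque([0.0] * size, maxlen=size)
        self.data = deque([0.0] * size, maxlen=size)

    def push(self, t, d):
        # appending to a full deque discards the oldest element
        self.time.append(t)
        self.data.append(d)

buf = RollingBuffer(size=3)
for k in range(1, 5):
    buf.push(k * 0.01, k)
print(list(buf.time), list(buf.data))
```

The deques can be converted with list() before being passed to the plot function.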

Complete time signal display code

import cv2
import sys
#from PyQt5.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication
#from PyQt5.QtCore import QThread, Qt, pyqtSignal, pyqtSlot
#from PyQt5.QtGui import QImage, QPixmap

from pyqtgraph import PlotWidget, plot

from PySide6.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication, QVBoxLayout
from PySide6.QtCore import QThread, Qt, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
pyqtSignal = Signal
pyqtSlot = Slot

import numpy as np
import time

class Thread(QThread):
	changeData = pyqtSignal(float,float)

	def run(self):
		self.isRunning=True
		
		self.time = 0
		self.data = 0
		f = 1.
		w = 2. * np.pi * f
		while self.isRunning:
			self.time+=0.01
			self.data=2*np.sin(w*self.time)
			
			self.changeData.emit(self.time,self.data)
			time.sleep(0.01)
			
				
	def stop(self):
		self.isRunning=False
		self.quit()
		self.terminate()
	

class SignalContainer(QWidget):
	def __init__(self):
		super().__init__()
		self.title = 'Signal'
		self.time = [0]*100
		self.data = [0]*100
		self.initUI()
		
	@pyqtSlot(float,float)
	def setData(self, t,d):
		#append data
		self.time.append(t)
		self.data.append(d)
		#remove first item
		self.time.pop(0)
		self.data.pop(0)
		
		#update graph
		self.graphWidget.clear()
		self.graphWidget.plot(self.time, self.data)

	def initUI(self):
		self.setWindowTitle(self.title)
		#self.resize(1200, 800)
		layout = QVBoxLayout()
		self.setLayout(layout)
		
		# create widget
		self.graphWidget = PlotWidget()
		layout.addWidget(self.graphWidget)
		
		#plot data
		#self.time = [1,2,3,4,5,6,7,8,9,10]
		#self.data = [30,32,34,32,33,31,29,32,35,45]
		self.graphWidget.plot(self.time, self.data)
		self.th = Thread(self)
		self.th.changeData.connect(self.setData)
		self.th.start()
 
import signal #close signal with Ctrl+C
signal.signal(signal.SIGINT, signal.SIG_DFL)

if __name__ == '__main__':
		app = QApplication(sys.argv)
		ex = SignalContainer()
		ex.show()
		app.aboutToQuit.connect(ex.th.stop) #stop qthread when closing window
		
		sys.exit(app.exec())
		
		

Thanks to PyQtGraph, we can see a window appear with the signal scrolling through the PyQt interface just like on an oscilloscope.

PlotWidget style configuration

There are a number of options for configuring the style of the graph (color, legend, labels, etc.):

  • pen style self.pen = mkPen()
  • set background color self.graphWidget.setBackground
  • add a title self.graphWidget.setTitle
  • add labels on the axes self.graphWidget.setLabel
  • show grid self.graphWidget.showGrid
  • add a legend self.graphWidget.addLegend
		#tune plots
		self.pen = mkPen(color=(255, 0, 0), width=3, style=Qt.DashLine)	#line style
		self.graphWidget.setBackground((50,50,50,220))   # RGBA 		#background
		self.graphWidget.setTitle("Signal(t)", color="w", size="20pt")	#add title
		styles = {'color':'r', 'font-size':'20px'}						#add label style
		self.graphWidget.setLabel('left', 'signal [SI]', **styles)			#add ylabel
		self.graphWidget.setLabel('bottom', 'time [s]', **styles)			#add xlabel
		self.graphWidget.showGrid(x=True, y=True)						#add grid
		self.graphWidget.addLegend()									#add legend
		
		self.graphWidget.setYRange(-2, 2, padding=0.1)
		
		#plot data
		self.graphWidget.plot(self.time, self.data,name = "signal",pen=self.pen,symbol='+', symbolSize=5, symbolBrush='w')

 

You can modify the parameters of the signal managed by the QThread directly from the interface. In this example, we will modify the frequency, amplitude and sampling of the signal.

To do this, we are going to create three fields and a button that will allow us to configure the signal.

N.B.: note that each field is connected to the setParam function by the returnPressed signal, which detects the Enter key.

		#create param
		self.amplbl = QLabel("Ampl")
		self.amp=QLineEdit("2")
		self.amp.returnPressed.connect(self.setParam)
		
		self.freqlbl = QLabel("Freq")
		self.freq=QLineEdit("1")
		self.freq.returnPressed.connect(self.setParam)
		
		self.samplbl = QLabel("Ts")
		self.samp=QLineEdit("0.02")
		self.samp.returnPressed.connect(self.setParam)
		
		self.conf = QPushButton("Configure")
		self.conf.clicked.connect(self.setParam)

When a configuration is changed, the setParam function is executed. The function emits the changeParam signal with a dictionary as argument

def setParam(self):
    if self.amp.text()!='' and self.freq.text()!=''  and self.samp.text()!='':
        if float(self.samp.text())>0:
            d={"amp":float(self.amp.text()),"freq":float(self.freq.text()),"samp":float(self.samp.text())}
            self.changeParam.emit(d)
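The checks made in setParam can be isolated in a function that is easy to test without a window. This sketch goes slightly further than the code above, as an assumption about what is desirable: it also rejects text that is not a number, which would otherwise raise a ValueError in float(). parse_params is a hypothetical name.

```python
def parse_params(amp_text, freq_text, samp_text):
    """Return the parameter dictionary, or None if a field is invalid."""
    try:
        amp = float(amp_text)
        freq = float(freq_text)
        samp = float(samp_text)
    except ValueError:
        # empty or non-numeric field
        return None
    if samp <= 0:
        # the sampling period must be strictly positive
        return None
    return {"amp": amp, "freq": freq, "samp": samp}

print(parse_params("2", "1", "0.02"))
print(parse_params("2", "", "0.02"))
```

In the widget, the dictionary would be emitted with changeParam only when the function does not return None.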

The changeParam signal connects to the QThread() setParam function in the SignalContainer definition.

self.th.changeData.connect(self.setData) #reception
self.changeParam.connect(self.th.setParam) #emission

In the QThread object, we add a setParam function which updates the signal parameters

@pyqtSlot(dict)
def setParam(self,param):
    self.amp=param["amp"]
    self.freq=param["freq"]
    self.samp=max(0.0001,param["samp"])

We can then modify the signal from the PyQt interface and display it using PyQtGraph

Complete code

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import cv2
import sys
#from PyQt5.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication
#from PyQt5.QtCore import QThread, Qt, pyqtSignal, pyqtSlot
#from PyQt5.QtGui import QImage, QPixmap

from pyqtgraph import PlotWidget, mkPen

from PySide6.QtWidgets import  QMainWindow, QWidget, QLabel, QApplication, QVBoxLayout, QHBoxLayout, QLineEdit, QPushButton
from PySide6.QtCore import QThread, Qt, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
pyqtSignal = Signal
pyqtSlot = Slot

import numpy as np
import time

class Thread(QThread):
	changeData = pyqtSignal(float,float)
	
	def __init__(self,a):
		super(Thread,self).__init__()
		self.amp=2
		self.freq=1
		self.samp=0.02
		self.time = 0
		self.data = 0

	def run(self):
		self.isRunning=True
		
		
		while self.isRunning:
			self.time+=self.samp
			self.data=self.amp*np.sin(2. * np.pi * self.freq *self.time)
			
			self.changeData.emit(self.time,self.data)
			time.sleep(0.1)
					
	def stop(self):
		self.isRunning=False
		self.quit()
		self.terminate()
	
	@pyqtSlot(dict)
	def setParam(self,param):
		self.amp=param["amp"]
		self.freq=param["freq"]
		self.samp=max(0.0001,param["samp"])
		
		
	

class SignalContainer(QWidget):
	changeParam = pyqtSignal(dict)
	
	def __init__(self):
		super().__init__()
		self.title = 'Signal'
		self.span=10
		self.time = [0]*1000
		self.data = [0]*1000
		self.initUI()
		
	@pyqtSlot(float,float)
	def setData(self, t,d):
		#append data
		self.time.append(t)
		self.data.append(d)
		#remove first item
		self.time.pop(0)
		self.data.pop(0)
		
		#update graph
		self.graphWidget.clear()
		self.graphWidget.plot(self.time, self.data,name = "signal",pen=self.pen,symbol='+', symbolSize=5, symbolBrush='w')
		if self.time[-1]>self.span:
			self.graphWidget.setXRange(self.time[-1]-self.span, self.time[-1], padding=0)
		self.graphWidget.setYRange(min(-2,min(self.data)), max(2,max(self.data)), padding=0.1)

	def initUI(self):
		self.setWindowTitle(self.title)
		self.resize(800, 400)
		layout = QVBoxLayout()
		self.setLayout(layout)
		
		#create param
		self.amplbl = QLabel("Ampl")
		self.amp=QLineEdit("2")
		self.amp.returnPressed.connect(self.setParam)
		
		self.freqlbl = QLabel("Freq")
		self.freq=QLineEdit("1")
		self.freq.returnPressed.connect(self.setParam)
		
		self.samplbl = QLabel("Ts")
		self.samp=QLineEdit("0.02")
		self.samp.returnPressed.connect(self.setParam)
		
		self.conf = QPushButton("Configure")
		self.conf.clicked.connect(self.setParam)
		
		hlayo = QHBoxLayout()
		hlayo.addWidget(self.amplbl)
		hlayo.addWidget(self.amp)
		hlayo.addWidget(self.freqlbl)
		hlayo.addWidget(self.freq)
		hlayo.addWidget(self.samplbl)
		hlayo.addWidget(self.samp)
		hlayo.addWidget(self.conf)
		
		
		layout.addLayout(hlayo)
		
		# create widget
		self.graphWidget = PlotWidget()
		layout.addWidget(self.graphWidget)
		
		#tune plots
		self.pen = mkPen(color=(255, 0, 0), width=3, style=Qt.DashLine)	#line style
		self.graphWidget.setBackground((50,50,50,220))   # RGBA 		#background
		self.graphWidget.setTitle("Signal(t)", color="w", size="20pt")	#add title
		styles = {'color':'r', 'font-size':'20px'}						#add label style
		self.graphWidget.setLabel('left', 'signal [SI]', **styles)			#add ylabel
		self.graphWidget.setLabel('bottom', 'time [s]', **styles)			#add xlabel
		self.graphWidget.showGrid(x=True, y=True)						#add grid
		self.graphWidget.addLegend()									#add legend
		self.graphWidget.setXRange(0, self.span, padding=0)
		self.graphWidget.setYRange(-2, 2, padding=0.1)
        
        #plot data
		self.graphWidget.plot(self.time, self.data,name = "signal",pen=self.pen,symbol='+', symbolSize=5, symbolBrush='w')
		
		#manage thread
		self.th = Thread(self)
		self.amp.setText(str(self.th.amp))
		self.freq.setText(str(self.th.freq))
		self.samp.setText(str(self.th.samp))
		self.th.changeData.connect(self.setData) #reception
		self.changeParam.connect(self.th.setParam) #emission
		self.th.start()
		
	def setParam(self):
		if self.amp.text()!='' and self.freq.text()!=''  and self.samp.text()!='':
			if float(self.samp.text())>0:
				d={"amp":float(self.amp.text()),"freq":float(self.freq.text()),"samp":float(self.samp.text())}
				self.changeParam.emit(d)
	
import signal #close signal with Ctrl+C
signal.signal(signal.SIGINT, signal.SIG_DFL)

if __name__ == '__main__':
		app = QApplication(sys.argv)
		ex = SignalContainer()
		ex.show()
		app.aboutToQuit.connect(ex.th.stop) #stop qthread when closing window
		
		sys.exit(app.exec())
		
		

Sources