Neste projeto, vamos criar um monitor de comunicação de rede UDP usando Python (PyQt). Quando se desenvolve um projeto com Arduino, Raspberry Pi ou qualquer outro microcontrolador, é certamente necessário criar uma interface gráfica para gerir o sistema (depurar, observar medições, lançar acções, etc.). Existem muitas ferramentas disponíveis para criar interfaces gráficas. Neste projeto, vamos criar um monitor de comunicação de rede usando PyQt (PySide2).
Objetivo
Para este projeto, queremos criar uma interface gráfica em Python para Windows que se comporte como um monitor de comunicação UDP. Para fazer isso, precisaremos executar as seguintes funções
- Caixa de introdução do endereço IP
- Zona de entrada no porto
- Botão de ligação
- Área de escrita para encomenda
- Botão Enviar
- Consola que apresenta os dados recebidos
Recuperar o endereço IP da máquina
Na máquina recetora, obtenha o endereço IPv4 (no nosso caso, 192.168.1.67)
ipconfig #sur la machine windows
ou
ip addr #sur machine linux

Aplicação de monitorização da comunicação UDP
Primeiro, vamos criar a janela da aplicação, a que chamaremos AcApp e que será a base da nossa aplicação.
#!/usr/bin/python3
# -*-coding:Utf-8 -*
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside
class AcApp(QMainWindow):
def __init__(self,title='AcApp',mainFrame=QFrame):
super().__init__()
self.title=title
self.mainFrame=mainFrame()
self.initUI()
def initUI(self):
self.setCentralWidget(self.mainFrame)
#connect signals
self.mainFrame.debugSignal.connect(self.debugMsg)
#General configuration
#self.resize(self.width, self.height)
self.setWindowTitle(self.title)
self.setGeometry(300, 300, 850, 450)
#self.setWindowIcon(QIcon(__icon__))
#Debug bar
self.statusBar()
self.statusBar().showMessage('Display debug messages')
self.show()
def debugMsg(self,val):
self.statusBar().showMessage(val)
def main():
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(True)
ex = AcApp(__title__)
#ex = AcApp(__title__,EthernetInterface)
app.quit()
sys.exit(app.exec_())
if __name__ == '__main__':
main()

Criação de widgets de gestão de comunicações UDP
Em seguida, criamos uma classe que contém todos os Widgets necessários para criar e gerir o monitor de comunicação UDP (QLineEdit, QTextEdit, QButton, etc.). Este Widget será inicializado com a classe que vai gerir a comunicação
class EthernetInterface(QFrame):
debugSignal=pyqtSignal(str) #define debug signal
def __init__(self,parent=None):
super(EthernetInterface,self).__init__(parent)
self.grid=QGridLayout()
self.setLayout(self.grid)
self.defineWidgets()
self.model=None
if self.model is not None: self.model.debugSignal.connect(self.read)
def defineWidgets(self):
#self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;}
#QGroupBox::title {padding:1 5px;}""")
#grooupbox widget container
self.grp=QGroupBox(self)
self.grp.setTitle("Connection Configuration")
self.fields=QGridLayout()
self.grp.setLayout(self.fields)
self.grid.addWidget(self.grp,0,0)
#Define widget UI
#validator
ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression
# Regulare expression
ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
ipValidator = QRegExpValidator(ipRegex, self)
#label
self.selectlbl = QLabel("IP Address:")
self.typeBox = QLineEdit(HOST)
#self.typeBox.setInputMask("0.0.0.0");
self.typeBox.setValidator(ipValidator);
self.baudlbl = QLabel("Port:")
self.baudBox = QLineEdit("{}".format(PORT))
#btn
self.button = QPushButton("Connect")
self.button.clicked.connect(self.clicked)
self.button.clicked.connect(self.connec)
sendBtn = QPushButton("send")
sendBtn.clicked.connect(self.clicked)
sendBtn.clicked.connect(self.send)
titlelbl= QLabel("Enter")
self.edit = QLineEdit("")
sentlbl=QLabel("Sent")
self.sent = QTextEdit("")
desclbl=QLabel("Console")
self.desc = QTextEdit("")
#row, column[, rowSpan=1[, columnSpan=1[
self.fields.addWidget(self.selectlbl,0,0,1,1)
self.fields.addWidget(self.typeBox,0,1,1,1)
self.fields.addWidget(self.baudlbl,0,2,1,1)
self.fields.addWidget(self.baudBox,0,3,1,1)
self.fields.addWidget(self.button,0,4,1,1)
self.fields.addWidget(titlelbl,1,0,1,1)
self.fields.addWidget(self.edit,1,1,1,3)
self.fields.addWidget(sendBtn,1,4,1,1)
self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
self.fields.addWidget(self.sent,2,1,1,3)
self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
self.fields.addWidget(self.desc,3,1,1,3)
def debug(self,msg):
sender = self.sender()
self.debugSignal.emit(sender.__class__.__name__+" : "+msg)
def clicked(self):
sender = self.sender()
if sender.__class__.__name__=="QPushButton":
self.debugSignal.emit(sender.text()+ " clicked")
if sender.__class__.__name__=="QComboBox":
self.debugSignal.emit(sender.currentText()+ " selected")
def connec(self):
#self.desc.setText("")
self.desc.clear()
if self.model is not None:
if self.button.text() == "Connect":
self.desc.setText(">> trying to connect to address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
print("Started")
self.button.setText("Stop")
#self.model = EthModel()
self.model.connec(self.typeBox.text(),int(self.baudBox.text()))
self.model.start()
else:
self.model.quit_flag = True
print("Stop sent")
self.model.close()#self.model.wait()
print("Stopped")
self.button.setText("Connect")
self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
def read(self,msg):
self.desc.setText(self.desc.toPlainText()+msg+"\n")
self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum());
def send(self):
if self.edit.text() != "":
self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n")
if self.model is not None:
self.model.write(self.edit.text())
Substituir ex = AcApp(__title__) por ex = AcApp(__title__,EthernetInterface) na função main()

Assim que os nossos widgets estiverem no sítio, podemos começar a utilizá-los.
Criar uma QThread para gerir a comunicação UDP
Para não bloquear a GUI quando os pacotes são recebidos ou enviados, vamos criar uma QThread que irá gerir a comunicação UDP
#Ethernet connection
class EthModel(QThread):
"""Handle Ethernet connexion with remote device and connect to interface EthInterface"""
debugSignal=pyqtSignal(str) #define debug signal
def __init__(self):
super(EthModel, self).__init__()
self.quit_flag = False
self.msgToEmit=""
def run(self):
while True:
if not self.quit_flag:
self.read()
#time.sleep(1)
else:
self.close()
break
self.quit()
#self.wait()
def connec(self,addr='192.168.1.10',port=7):
# Create a UDP/IP socket
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.sock.bind((addr,port))
# Listen for incoming connections
#self.sock.listen(1) #stream
print('starting up on {} port {}'.format(addr,port))
self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
self.quit_flag = False
self._isRunning=True
def read(self):
while self._isRunning:
#data = self.sock.recv(1024)
try:
data, addr = self.sock.recvfrom(1024)
print(data)
except:
print("socket closed")
data=False
if not data:
print("no data > break loop")
break
#self.sock.sendto("received OK".encode('utf-8'),addr)
if self.msgToEmit!="":
self.sock.sendto(self.msgToEmit.encode('utf-8'),addr)
self.msgToEmit="" #clear message
self.debugSignal.emit(str(data))
def write(self,msg):
self.msgToEmit=msg
def close(self):
self._isRunning=False
self.sock.close()
Código Python do cliente UDP
Para testar a sua interface, pode executar um código no mesmo computador utilizando o endereço 127.0.0.1 (endereço local). O código a seguir criará um socket e enviará o valor de um contador.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import time
HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024
counter=0
while True:
# Create a UDP socket at client side
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
# Send to server using created UDP socket
client.sendto(str.encode(str(counter)), (HOST,PORT))
data,addr= client.recvfrom(bufferSize)
msg = "Message from Server {}".format(data)
print(msg)
counter+=1
time.sleep(0.1)
Código Cliente UDP ESP8266
O código do ESP8266 para a comunicação UDP é bastante simples. Definimos a comunicação em rede utilizando o protocolo UDP e devolvemos o valor dado pela função millis().
#include <ESP8266WiFi.h> #include <WiFiUdp.h> WiFiUDP udp; char packetBuffer[256]; unsigned int localPort = 9999; const char *ssid = "******"; const char *password = "******"; // Set your Static IP address IPAddress local_IP(192, 168, 1, 80); IPAddress gateway(192, 168, 1, 1); IPAddress subnet(255, 255, 0, 0); // Set destination IP address const char *destaddr = "192.168.1.67"; unsigned int destPort = 8888; void setup() { Serial.begin(115200); WiFi.config(local_IP, gateway, subnet); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } udp.begin(localPort); Serial.print(F("UDP Client : ")); Serial.println(WiFi.localIP()); } void loop() { int packetSize = udp.parsePacket(); Serial.print(" Received packet from : "); Serial.println(udp.remoteIP()); Serial.print(" Size : "); Serial.println(packetSize); Serial.print(" Data : "); packetBuffer[0] = '0'; //reset buffer if (packetSize) { int len = udp.read(packetBuffer, 256); Serial.print(packetBuffer); udp.flush(); } Serial.println("\n"); delay(500); Serial.print("[Client Connected] "); Serial.println(WiFi.localIP()); udp.beginPacket(destaddr, destPort); udp.write("Send millis: "); char buf[20]; unsigned long testID = millis(); sprintf(buf, "%lu", testID); Serial.print(" Sent : "); Serial.println(buf); udp.write(buf); udp.write("\r\n"); udp.endPacket(); }
N.B.: Para este projeto, utilizamos um esp8266, mas pode adaptar o código a qualquer dispositivo que utilize o protocolo UDP.
Resultados
Para utilizar o código de cliente Python, introduza o endereço IP 127.0.0.1
Para testar a comunicação com o ESP8266, utilize o endereço IP do seu computador (aqui, 192.168.1.67)


Criámos um monitor de comunicação UDP utilizando Python que pode interagir com dispositivos remotos como o ESP8266, ESP32, Raspberry Pi ou outros computadores. Pode agora melhorar a interface de acordo com as suas necessidades
Código completo
#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022
@author: X.Wiedmer
AC windows
Define the application window
"""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside
import socket
import time
"""
App configuration
"""
__title__="ACTerminal"
__version__="v0.1"
HOST = '192.168.1.67'
PORT = 8888
#Ethernet connection
class EthModel(QThread):
"""Handle Ethernet connexion with remote device and connect to interface EthInterface"""
debugSignal=pyqtSignal(str) #define debug signal
def __init__(self):
super(EthModel, self).__init__()
self.quit_flag = False
self.msgToEmit=""
def run(self):
while True:
if not self.quit_flag:
self.read()
#time.sleep(1)
else:
self.close()
break
self.quit()
self.exit()
#self.wait()
def connec(self,addr='192.168.1.10',port=7):
# Create a UDP/IP socket
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.sock.bind((addr,port))
# Listen for incoming connections
#self.sock.listen(1) #stream
print('starting up on {} port {}'.format(addr,port))
self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
self.quit_flag = False
self._isRunning=True
def read(self):
while self._isRunning:
#data = self.sock.recv(1024)
try:
data, addr = self.sock.recvfrom(1024)
print(data)
except:
print("socket closed")
data=False
if not data:
print("no data > break loop")
break
#self.sock.sendto("received OK".encode('utf-8'),addr)
if self.msgToEmit!="":
self.sock.sendto(self.msgToEmit.encode('utf-8'),addr)
self.msgToEmit="" #clear message
self.debugSignal.emit(str(data))
def write(self,msg):
self.msgToEmit=msg
def close(self):
self._isRunning=False
self.sock.close()
#define GUI
class EthernetInterface(QFrame):
debugSignal=pyqtSignal(str) #define debug signal
def __init__(self,parent=None):
super(EthernetInterface,self).__init__(parent)
self.grid=QGridLayout()
self.setLayout(self.grid)
self.defineWidgets()
self.model=EthModel()#self.model=None
self.model.debugSignal.connect(self.read)
def defineWidgets(self):
#self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;}
#QGroupBox::title {padding:1 5px;}""")
#grooupbox widget container
self.grp=QGroupBox(self)
self.grp.setTitle("Connection Configuration")
self.fields=QGridLayout()
self.grp.setLayout(self.fields)
self.grid.addWidget(self.grp,0,0)
#Define widget UI
#validator
ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression
# Regulare expression
ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
ipValidator = QRegExpValidator(ipRegex, self)
#label
self.selectlbl = QLabel("IP Address:")
self.typeBox = QLineEdit(HOST)
#self.typeBox.setInputMask("0.0.0.0");
self.typeBox.setValidator(ipValidator);
self.baudlbl = QLabel("Port:")
self.baudBox = QLineEdit("{}".format(PORT))
#btn
self.button = QPushButton("Connect")
self.button.clicked.connect(self.clicked)
self.button.clicked.connect(self.connec)
sendBtn = QPushButton("send")
sendBtn.clicked.connect(self.clicked)
sendBtn.clicked.connect(self.send)
titlelbl= QLabel("Enter")
self.edit = QLineEdit("")
sentlbl=QLabel("Sent")
self.sent = QTextEdit("")
desclbl=QLabel("Console")
self.desc = QTextEdit("")
#row, column[, rowSpan=1[, columnSpan=1[
self.fields.addWidget(self.selectlbl,0,0,1,1)
self.fields.addWidget(self.typeBox,0,1,1,1)
self.fields.addWidget(self.baudlbl,0,2,1,1)
self.fields.addWidget(self.baudBox,0,3,1,1)
self.fields.addWidget(self.button,0,4,1,1)
self.fields.addWidget(titlelbl,1,0,1,1)
self.fields.addWidget(self.edit,1,1,1,3)
self.fields.addWidget(sendBtn,1,4,1,1)
self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
self.fields.addWidget(self.sent,2,1,1,3)
self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
self.fields.addWidget(self.desc,3,1,1,3)
def debug(self,msg):
sender = self.sender()
self.debugSignal.emit(sender.__class__.__name__+" : "+msg)
def clicked(self):
sender = self.sender()
if sender.__class__.__name__=="QPushButton":
self.debugSignal.emit(sender.text()+ " clicked")
if sender.__class__.__name__=="QComboBox":
self.debugSignal.emit(sender.currentText()+ " selected")
def connec(self):
#self.desc.setText("")
self.desc.clear()
if self.model is not None:
if self.button.text() == "Connect":
self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(self.typeBox.text(),self.baudBox.text()))
print("Started")
self.button.setText("Stop")
self.model.connec(self.typeBox.text(),int(self.baudBox.text()))
self.model.start()
else:
self.model.quit_flag = True
print("Stop sent")
self.model.close()#self.model.wait()
print("Stopped")
self.button.setText("Connect")
self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
def read(self,msg):
self.desc.setText(self.desc.toPlainText()+msg+"\n")
self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum());
def send(self):
if self.edit.text() != "":
self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n")
if self.model is not None:
self.model.write(self.edit.text())
# Generic app container
class AcApp(QMainWindow):
def __init__(self,title='AcApp',mainFrame=QFrame):
super().__init__()
self.title=title
self.mainFrame=mainFrame()
self.initUI()
def initUI(self):
self.setCentralWidget(self.mainFrame)
#connect signals
self.mainFrame.debugSignal.connect(self.debugMsg)
#General configuration
#self.resize(self.width, self.height)
self.setWindowTitle(self.title)
self.setGeometry(300, 300, 850, 450)
#self.setWindowIcon(QIcon(__icon__))
#Debug bar
self.statusBar()
self.statusBar().showMessage('Display debug messages')
self.show()
def debugMsg(self,val):
self.statusBar().showMessage(val)
def main():
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(True)
#app.setStyleSheet(open("style.txt").read()); #set style according to file
#ex = AcApp(__title__)
ex = AcApp(__title__,EthernetInterface)
app.quit()
sys.exit(app.exec_())
if __name__ == '__main__':
main()