Etiquetas: , , , ,

En este proyecto, vamos a crear un monitor de comunicación de red UDP usando Python (PyQt). Cuando desarrolles un proyecto con Arduino, Raspberry Pi o cualquier otro microcontrolador, seguramente necesitarás crear una interfaz gráfica para gestionar el sistema (depurar, observar medidas, lanzar acciones, etc.). Hay muchas herramientas disponibles para crear interfaces gráficas. En este proyecto, vamos a crear un monitor de comunicación de red utilizando PyQt(PySide2).

Objetivo

Para este proyecto, queremos crear una interfaz gráfica en Python para Windows que se comporte como un monitor de comunicaciones UDP. Para ello, necesitaremos realizar las siguientes funciones

  • Casilla de introducción de la dirección IP
  • Zona portuaria de entrada
  • Botón de conexión
  • Zona de escritura por encargo
  • Botón Enviar
  • Consola que muestra los datos recibidos

Recuperar la dirección IP de la máquina

En la máquina receptora, recupere la dirección IPv4 (en nuestro caso 192.168.1.67)

ipconfig #sur la machine windows

o

ip addr #sur machine linux
window-ipconfig-ip-subnet-gateway Desarrollo de un monitor UDP con Python

Aplicación de monitorización de comunicaciones UDP

En primer lugar, vamos a crear la ventana de la aplicación, que llamaremos AcApp y que será la base de nuestra aplicación.

#!/usr/bin/python3
# -*-coding:Utf-8 -*

import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

class AcApp(QMainWindow):

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):        
        self.setCentralWidget(self.mainFrame)
       
        #connect signals
        self.mainFrame.debugSignal.connect(self.debugMsg)
       
        #General configuration
        #self.resize(self.width, self.height)
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #self.setWindowIcon(QIcon(__icon__))
       
        #Debug bar
        self.statusBar()
        self.statusBar().showMessage('Display debug messages')
       
        self.show()

    def debugMsg(self,val):
        self.statusBar().showMessage(val)

def main():
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    ex = AcApp(__title__)
    #ex = AcApp(__title__,EthernetInterface)
    app.quit()
    sys.exit(app.exec_())

       
if __name__ == '__main__':
    main()
pyqt-acterminal Desarrollo de un monitor UDP con Python

Creación de widgets de gestión de comunicaciones UDP

Crearemos entonces una clase que contenga todos los Widgets que necesitamos para crear y gestionar el monitor de comunicación UDP (QLineEdit,QTextEdit,QButton, etc.). Este Widget se inicializará con la clase que gestionará la comunicación

class EthernetInterface(QFrame):
    debugSignal=pyqtSignal(str) #define debug signal   

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        self.grid=QGridLayot()
        self.setLayot(self.grid)
        self.defineWidgets()
        self.model=None 
        if self.model is not None: self.model.debugSignal.connect(self.read)
   
    def defineWidgets(self):
        #self.setStyleSheet("""QGropBox{backgrond-color:white;border: 1px solid green;border-radius: 4px;}
        #QGropBox::title {padding:1 5px;}""")
               
        #groopbox widget container
        self.grp=QGropBox(self)
        self.grp.setTitle("Connection Configuration")
       
        self.fields=QGridLayot()
        self.grp.setLayot(self.fields)
        self.grid.addWidget(self.grp,0,0)
       
        #Define widget UI
        #validator
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"   # Part of the regular expression
        # Regulare expression
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)  
       
        #label
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        #self.typeBox.setInputMask("0.0.0.0");
        self.typeBox.setValidator(ipValidator);
       
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))
       
        #btn
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
       
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)
       
        titlelbl=  QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")
           
        #row, column[, rowSpan=1[, columnSpan=1[
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
       
        self.fields.addWidget(self.button,0,4,1,1)
       
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
       
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)  
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)      

    def debug(self,msg):
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)
       
    def clicked(self):
        sender = self.sender()
        if sender.__class__.__name__=="QPushButton":
            self.debugSignal.emit(sender.text()+ " clicked")
        if sender.__class__.__name__=="QComboBox":
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        #self.desc.setText("")
        self.desc.clear()
        if self.model is not None:
            if self.button.text() == "Connect":
                self.desc.setText(">> trying to connect to address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))

                print("Started")
                self.button.setText("Stop")
                #self.model = EthModel()
                self.model.connec(self.typeBox.text(),int(self.baudBox.text()))
                self.model.start()
            else:
                self.model.quit_flag = True
                print("Stop sent")
                self.model.close()#self.model.wait()
                print("Stopped")
                self.button.setText("Connect")
                self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
                
   
    def read(self,msg):
        self.desc.setText(self.desc.toPlainText()+msg+"\n")
        self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum());
       
                   
    def send(self):
        if self.edit.text() != "":
            self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n")
           
            if self.model is not None:
                self.model.write(self.edit.text())

Sustituya ex = AcApp(__title__) por ex = AcApp(__title__,EthernetInterface) en la función main()

pyqt-acterminal-udp Desarrollo de un monitor UDP con Python

Una vez colocados nuestros widgets, vamos a empezar a utilizarlos.

Creación de un QThread para gestionar la comunicación UDP

Para no bloquear la interfaz gráfica cuando se reciben o envían paquetes, vamos a crear un QThread para gestionar la comunicación UDP.

#Ethernet connection
class EthModel(QThread):
    """Handle Ethernet connexion with remote device and connect to interface EthInterface"""
    debugSignal=pyqtSignal(str) #define debug signal   
    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""

    def run(self):
        while True:
            if not self.quit_flag:
                self.read()
                #time.sleep(1)
            else:
                self.close()
                break

        self.quit()
        #self.wait()
                
    def connec(self,addr='192.168.1.10',port=7):
        # Create a UDP/IP socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.sock.bind((addr,port))
        # Listen for incoming connections
        #self.sock.listen(1) #stream
        
        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True
	
    def read(self):
        while self._isRunning:
            #data = self.sock.recv(1024)
            try:
                data, addr = self.sock.recvfrom(1024)
                print(data)
            except:
                print("socket closed")
                data=False

            if not data:
                print("no data > break loop")
                break
            #self.sock.sendto("received OK".encode('utf-8'),addr)
            if self.msgToEmit!="":
                self.sock.sendto(self.msgToEmit.encode('utf-8'),addr)
                self.msgToEmit="" #clear message
            self.debugSignal.emit(str(data))


    def write(self,msg):
        self.msgToEmit=msg

    def close(self):
        self._isRunning=False
        self.sock.close()

Cliente UDP Código Python

Para probar su interfaz, puede ejecutar un código en el mismo ordenador utilizando la dirección 127.0.0.1 (dirección local). El siguiente código creará un socket y enviará el valor de un contador.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import time
HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024



conter=0
while True:
	# Create a UDP socket at client side
	with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
		# Send to server using created UDP socket
		client.sendto(str.encode(str(conter)), (HOST,PORT))

		data,addr= client.recvfrom(bufferSize)
		msg = "Message from Server {}".format(data)

		print(msg)
	conter+=1
	time.sleep(0.1)

Código ESP8266 Cliente UDP

El código del ESP8266 para la comunicación UDP es bastante sencillo. Definimos la comunicación de red utilizando el protocolo UDP y devolvemos el valor dado por la función millis().

#include <ESP8266WiFi.h>
#include <WiFiUdp.h>

WiFiUDP udp;
char packetBuffer[256];
unsigned int localPort = 9999;
const char *ssid = "******";
const char *password = "******";

// Set yor Static IP address
IPAddress local_IP(192, 168, 1, 80);
IPAddress gateway(192, 168, 1, 1);
IPAddress subnet(255, 255, 0, 0);

// Set destination IP address
const char *destaddr = "192.168.1.67";
unsigned int destPort = 8888;

void setup() {
  Serial.begin(115200);
  WiFi.config(local_IP, gateway, subnet);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  udp.begin(localPort);
  Serial.print(F("UDP Client : ")); Serial.println(WiFi.localIP());
}

void loop() {
  int packetSize = udp.parsePacket();
  Serial.print(" Received packet from : "); Serial.println(udp.remoteIP());
  Serial.print(" Size : "); Serial.println(packetSize);
  Serial.print(" Data : ");
  packetBuffer[0] = '0'; //reset buffer
  if (packetSize) {
    int len = udp.read(packetBuffer, 256);
    Serial.print(packetBuffer);
    
    udp.flush();
  }
  Serial.println("\n");
  delay(500);
  Serial.print("[Client Connected] ");
  Serial.println(WiFi.localIP());
  udp.beginPacket(destaddr, destPort);
  udp.write("Send millis: ");
  char buf[20];
  unsigned long testID = millis();
  sprintf(buf, "%lu", testID);
  Serial.print(" Sent : "); Serial.println(buf);
  udp.write(buf);
  udp.write("\r\n");
  udp.endPacket();
}

N.B.: Para este proyecto, estamos utilizando un esp8266, pero puedes adaptar el código a cualquier dispositivo que utilice el protocolo UDP.

Resultados

Para utilizar el código de cliente Python, introduzca la dirección IP 127.0.0.1

Para probar la comunicación con el ESP8266, utilice la dirección IP de su ordenador (aquí, 192.168.1.67).

pyqt-acterminal-udp-result Desarrollo de un monitor UDP con Python
arduino-serial-udp-result Desarrollo de un monitor UDP con Python

Hemos creado un monitor de comunicación UDP usando Python que puede interactuar con dispositivos remotos como ESP8266, ESP32, Raspberry Pi u otros ordenadores. Ahora puedes mejorar la interfaz para adaptarla a tus necesidades.

Código completo

#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022

@author: X.Wiedmer

AC windows
Define the application window
"""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

import socket
import time

"""
App configuration
"""
__title__="ACTerminal"
__version__="v0.1"

HOST = '192.168.1.67'
PORT = 8888

#Ethernet connection
class EthModel(QThread):
    """Handle Ethernet connexion with remote device and connect to interface EthInterface"""
    debugSignal=pyqtSignal(str) #define debug signal   
    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit=""

    def run(self):
        while True:
            if not self.quit_flag:
                self.read()
                #time.sleep(1)
            else:
                self.close()
                break

        self.quit()
        self.exit()
        #self.wait()
                
    def connec(self,addr='192.168.1.10',port=7):
        # Create a UDP/IP socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.sock.bind((addr,port))
        # Listen for incoming connections
        #self.sock.listen(1) #stream
        
        print('starting up on {} port {}'.format(addr,port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr,port))
        self.quit_flag = False
        self._isRunning=True
	
    def read(self):
        while self._isRunning:
            #data = self.sock.recv(1024)
            try:
                data, addr = self.sock.recvfrom(1024)
                print(data)
            except:
                print("socket closed")
                data=False

            if not data:
                print("no data &gt; break loop")
                break
            #self.sock.sendto("received OK".encode('utf-8'),addr)
            if self.msgToEmit!="":
                self.sock.sendto(self.msgToEmit.encode('utf-8'),addr)
                self.msgToEmit="" #clear message
            self.debugSignal.emit(str(data))


    def write(self,msg):
        self.msgToEmit=msg

    def close(self):
        self._isRunning=False
        self.sock.close()

#define GUI    
class EthernetInterface(QFrame):
    debugSignal=pyqtSignal(str) #define debug signal   

    def __init__(self,parent=None):
        super(EthernetInterface,self).__init__(parent)
        self.grid=QGridLayot()
        self.setLayot(self.grid)
        self.defineWidgets()
        self.model=EthModel()#self.model=None
        self.model.debugSignal.connect(self.read)
   
    def defineWidgets(self):
        #self.setStyleSheet("""QGropBox{backgrond-color:white;border: 1px solid green;border-radius: 4px;}
        #QGropBox::title {padding:1 5px;}""")
               
        #groopbox widget container
        self.grp=QGropBox(self)
        self.grp.setTitle("Connection Configuration")
       
        self.fields=QGridLayot()
        self.grp.setLayot(self.fields)
        self.grid.addWidget(self.grp,0,0)
       
        #Define widget UI
        #validator
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"   # Part of the regular expression
        # Regulare expression
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)  
       
        #label
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        #self.typeBox.setInputMask("0.0.0.0");
        self.typeBox.setValidator(ipValidator);
       
        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))
       
        #btn
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)
       
        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)
       
        titlelbl=  QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl=QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl=QLabel("Console")
        self.desc = QTextEdit("")
           
        #row, column[, rowSpan=1[, columnSpan=1[
        self.fields.addWidget(self.selectlbl,0,0,1,1)
        self.fields.addWidget(self.typeBox,0,1,1,1)
        self.fields.addWidget(self.baudlbl,0,2,1,1)
        self.fields.addWidget(self.baudBox,0,3,1,1)
       
        self.fields.addWidget(self.button,0,4,1,1)
       
        self.fields.addWidget(titlelbl,1,0,1,1)
        self.fields.addWidget(self.edit,1,1,1,3)
        self.fields.addWidget(sendBtn,1,4,1,1)
       
        self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.sent,2,1,1,3)  
        self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.desc,3,1,1,3)      

    def debug(self,msg):
        sender = self.sender()
        self.debugSignal.emit(sender.__class__.__name__+" : "+msg)
       
    def clicked(self):
        sender = self.sender()
        if sender.__class__.__name__=="QPushButton":
            self.debugSignal.emit(sender.text()+ " clicked")
        if sender.__class__.__name__=="QComboBox":
            self.debugSignal.emit(sender.currentText()+ " selected")

    def connec(self):
        #self.desc.setText("")
        self.desc.clear()
        if self.model is not None:
            if self.button.text() == "Connect":
                self.desc.setText("&gt;&gt; trying to connect to address {} on port {} ...\n".format(self.typeBox.text(),self.baudBox.text()))

                print("Started")
                self.button.setText("Stop")
                self.model.connec(self.typeBox.text(),int(self.baudBox.text()))
                self.model.start()
            else:
                self.model.quit_flag = True
                print("Stop sent")
                self.model.close()#self.model.wait()
                print("Stopped")
                self.button.setText("Connect")
                self.desc.setText("&gt;&gt; deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text()))
                
   
    def read(self,msg):
        self.desc.setText(self.desc.toPlainText()+msg+"\n")
        self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum());
       
                   
    def send(self):
        if self.edit.text() != "":
            self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n")
           
            if self.model is not None:
                self.model.write(self.edit.text())

# Generic app container     
class AcApp(QMainWindow):

    def __init__(self,title='AcApp',mainFrame=QFrame):
        super().__init__()
        self.title=title
        self.mainFrame=mainFrame()
        self.initUI()

    def initUI(self):        
        self.setCentralWidget(self.mainFrame)
       
        #connect signals
        self.mainFrame.debugSignal.connect(self.debugMsg)
       
        #General configuration
        #self.resize(self.width, self.height)
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #self.setWindowIcon(QIcon(__icon__))
       
        #Debug bar
        self.statusBar()
        self.statusBar().showMessage('Display debug messages')
       
        self.show()

    def debugMsg(self,val):
        self.statusBar().showMessage(val)
   


       
def main():
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    #app.setStyleSheet(open("style.txt").read()); #set style according to file 
    #ex = AcApp(__title__)
    ex = AcApp(__title__,EthernetInterface)
    app.quit()
    sys.exit(app.exec_())

       
if __name__ == '__main__':
    main()

Fuentes