fbpixel
Créer un Web Crawler avec Python

Créer un Web Crawler avec Python

Pour récolter des données sur internet, il est possible de créer un Web crawler ou Web scraping avec Python. Un robot d’exploration du Web est un outil qui permet d’extraire des données d’une ou plusieurs pages Web.

Configuration de l’environnement Python

Nous partons du principe que Python3 et pip sont installés sur votre machine. Vous pouvez également utiliser un environnement virtuel pour conserver un projet propre et maitriser les versions de librairies de votre web crawler Python.

Nous allons tout d’abord installer la librairie requests qui permet de faire des requêtes HTTP au serveur pour récupérer les données.

python -m pip install requests

Pour analyser et naviguer dans les données du Web, nous utilisons la librairie Beautiful Soup qui permet de travailler avec des scripts à balises comme le HTML ou le XML

python -m pip install beautifulsoup4

Enfin, nous installons la librairie Selenium qui permet d’automatiser les tâches d’un navigateur Web. Elle permet d’afficher des pages web dynamiques et de réaliser des actions sur l’interface. Cette librairie permet à elle seule de faire du Web scraping sur internet car elle peut travailler avec un site web dynamique qui fonctionne avec JavaScript.

python -m pip install selenium

Pour faire fonctionner Selenium avec Mozilla, vous aurez besoin de télécharger Geckodriver

Récupérer une page Web avec resquest

Imaginons que nous souhaitions récupérer les données techniques d’une carte Arduino, nous pouvons charger la page désirée avec requests et bs4

page = requests.get("https://docs.arduino.cc/hardware/uno-rev3/")
content = BeautifulSoup(page.text, 'html.parser')

En observant la structure de la page, vous pouvez repérer les balises, classes, identifiants ou textes qui vous intéressent. Dans cet exemple, nous récupérons

  • le nom de la carte
  • la description de la carte

N.B.: Vous pouvez retrouver la structure de la page web sur votre navigateur avec clique-droit sur la page puis “Inspecter”

import requests
from bs4 import BeautifulSoup

print("Starting Web Crawling ...")

#website to crawl
website="https://docs.arduino.cc/hardware/uno-rev3/"

#google search
#keywords = ["arduino","datasheet"]
#googlesearch = "https://www.google.com/search?client=firefox-b-d&q="
#search = "+".join(keywords)
#website = googlesearch+search

# get page
page = requests.get(website)

#extract html data
content = BeautifulSoup(page.text, 'html.parser')

# extract tags
h1_elms = content.find_all('h1')
print("Board : ",h1_elms)

#get element by class
description = content.find(class_="product-features__description").text
print("Description : ",description)
Starting Web Crawling ...
Board :  [<h1>UNO R3</h1>]
Description :  Arduino UNO is a microcontroller board based on the ATmega328P. It has 14 digital input/output pins (of which 6 can be used as PWM outputs), 6 analog inputs, a 16 MHz ceramic resonator, a USB connection, a power jack, an ICSP header and a reset button. It contains everything needed to support the microcontroller; simply connect it to a computer with a USB cable or power it with a AC-to-DC adapter or battery to get started. You can tinker with your UNO without worrying too much about doing something wrong, worst case scenario you can replace the chip for a few dollars and start over again.

On pourrait imaginer boucler cet opération sur une liste d’URL pour plusieurs cartes.

websites = [
    "https://docs.arduino.cc/hardware/uno-rev3/",
    "https://docs.arduino.cc/hardware/nano/",
    "https://docs.arduino.cc/hardware/mega-2560/",
    "https://docs.arduino.cc/hardware/leonardo/",
]

Avec cette méthode, on ne peut malheureusement pas charger la liste détaillé des spécification “Tech Specs” pour cela nous devons nous servir du navigateur.

Mettre en place un Web Crawler avec Selenium

Pour charger une page rien de plus facile

from selenium import webdriver

GECKOPATH = "PATH_TO_GECKO"
sys.path.append(GECKOPATH)

print("Starting Web Crawling ...")

#website to crawl
website="https://docs.arduino.cc/hardware/uno-rev3/"

#create browser handler
browser = webdriver.Firefox()
browser.get(website)

#browser.quit()

Validation des cookies

En affichant la page, vous aller certainement tomber sur la bannière de cookie qu’il faudra valider ou non pour continuer la navigation. Pour cela, il faut retrouver et cliquer sur le bouton “accepter”

def acceptcookies():
	"""class="iubenda-cs-accept-btn iubenda-cs-btn-primary"""
	browser.find_elements(By.CLASS_NAME,"iubenda-cs-accept-btn")[0].click()
acceptcookies()

Attente de chargement

Comme la page est affiché dans le navigateur, il faut un certain temps pour qu’elle charge les données et que toutes les balises soient affichées. Pour attendre le chargement, vous pouvez attendre un temps arbitraire

browser.implicitly_wait(10)

Ou attendre qu’une balise particulière soit présente comme le bouton d’acceptation des cookies

from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions

def waitForElement(locator, timeout ):
    elm = WebDriverWait(browser, timeout).until(expected_conditions.presence_of_element_located(locator))
    return elm

myElem =waitForElement((By.CLASS_NAME , 'iubenda-cs-accept-btn'),30)

N.B: Si vous rencontrez d’autre problème (élément inconnu , bouton non cliquable, etc.) dans le script alors qu’il n’y a pas de soucis sur la page Web, n’hésitez pas à utiliser la fonction time.sleep()

Chercher et appuyer sur un élément DOM

Pour afficher les spécifications techniques, le script doit cliquer sur l’onglet ‘Tech Specs’. Il faut donc trouver l’élément à partir du texte. Pour cela, il y a deux méthodes: tester le texte de l’élément ou utiliser Xpath

#get element by text 
btn_text = 'Tech Specs'
btn_elms = browser.find_elements(By.CLASS_NAME,'tabs')[0].find_elements(By.TAG_NAME,'button')
for btn in btn_elms:
     if btn.text == btn_text:
          btn.click()

spec_btn = browser.find_element(By.XPATH, "//*[contains(text(),'Tech Specs')]")
spec_btn.click()

Récupérer les données désirées

Une fois la page souhaitée chargée, vous pouvez récupérer les données

Soit toutes les données qui sont affichées sous forme de tableau

#get all rows and children
print("Tech specs")
print("-------------------------------------")

tr_elms = browser.find_elements(By.TAG_NAME,'tr')

for tr in tr_elms:
     th_elms = tr.find_elements(By.XPATH, '*')
     if len(th_elms)>1:
        print(th_elms[0].text, " : ", th_elms[1].text)

Soit une donnée spécifique

#get parent and siblings
print("Specific data")
print("-------------------------------------")
data_row = browser.find_element(By.XPATH, "//*[contains(text(),'Main Processor')]")
data = data_row.find_element(By.XPATH, "following-sibling::*[1]").text
print(data_row.text, " : ", data)

Résultat du crawling des spécifications

Starting Web Crawling ...
Page is ready!
Tech specs
-------------------------------------
Name  :  Arduino UNO R3
SKU  :  A000066
Built-in LED Pin  :  13
Digital I/O Pins  :  14
Analog input pins  :  6
PWM pins  :  6
UART  :  Yes
I2C  :  Yes
SPI  :  Yes
I/O Voltage  :  5V
Input voltage (nominal)  :  7-12V
DC Current per I/O Pin  :  20 mA
Power Supply Connector  :  Barrel Plug
Main Processor  :  ATmega328P 16 MHz
USB-Serial Processor  :  ATmega16U2 16 MHz
ATmega328P  :  2KB SRAM, 32KB FLASH, 1KB EEPROM
Weight  :  25 g
Width  :  53.4 mm
Length  :  68.6 mm
Specific data
-------------------------------------
Main Processor  :  ATmega328P 16 MHz
PS D:\Formation\Python\WebCrawler> 

Récupérer des données sur différentes pages

Une fois que vous maitrisez ces outils et avez une bonne idée des données à récupérer et de la structure des pages Web, vous pouvez scraper des données sur plusieurs pages. Dans ce dernier exemple, nous récupérons les données techniques de différentes cartes Arduino. Pour cela, nous créons une boucle qui va exécuter le code précédent sur une liste de site

websites = [
    "https://docs.arduino.cc/hardware/uno-rev3/",
    "https://docs.arduino.cc/hardware/nano/",
    "https://docs.arduino.cc/hardware/mega-2560/",
    "https://docs.arduino.cc/hardware/leonardo/",
]

import sys
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By

from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions

GECKOPATH = "D:\\AranaCorp\\Marketing\\Prospects"
sys.path.append(GECKOPATH)

print("Starting Web Crawling ...")

websites = [
    "https://docs.arduino.cc/hardware/uno-rev3/",
    "https://docs.arduino.cc/hardware/nano/",
    "https://docs.arduino.cc/hardware/mega-2560/",
    "https://docs.arduino.cc/hardware/leonardo/",
]

#create browser handler
browser = webdriver.Firefox()#Firefox(firefox_binary=binary)
def acceptcookies():
    #class="iubenda-cs-accept-btn iubenda-cs-btn-primary
    browser.find_elements(By.CLASS_NAME,"iubenda-cs-accept-btn")[0].click()
     
def waitForElement(locator, timeout ):
    elm = WebDriverWait(browser, timeout).until(expected_conditions.presence_of_element_located(locator))
    return elm

cookie_accepted=False
for website in websites:
    browser.get(website)
    time.sleep(2)

    if not cookie_accepted: #accept cookie once
        myElem =waitForElement((By.CLASS_NAME , 'iubenda-cs-accept-btn'),30)
        print("Page is ready!")
        acceptcookies()
        cookie_accepted = True
    else:
         myElem =waitForElement((By.CLASS_NAME , 'tabs__item'),30)
         
    #get board name
    name = browser.find_element(By.TAG_NAME,'h1').text

    #get tab Tech Specs
    btn_text = 'Tech Specs'
    spec_btn = WebDriverWait(browser, 20).until(expected_conditions.element_to_be_clickable((By.XPATH, "//*[contains(text(),'{}')]".format(btn_text))))
    spec_btn.click()
    #browser.execute_script("arguments[0].click();", spec_btn) #use script to click

    #get all rows and children
    print(name+" "+btn_text)
    print("-------------------------------------")

    tr_elms = browser.find_elements(By.TAG_NAME,'tr')

    for tr in tr_elms:
        th_elms = tr.find_elements(By.XPATH, '*')
        if len(th_elms)>1:
            print(th_elms[0].text, " : ", th_elms[1].text)


    #get parent and siblings
    print("Specific data")
    print("-------------------------------------")
    try:
        data_row = browser.find_element(By.XPATH, "//*[contains(text(),'Main Processor')]")
    except:
        data_row = browser.find_element(By.XPATH, "//*[contains(text(),'Processor')]")  
    data = data_row.find_element(By.XPATH, "following-sibling::*[1]").text
    print(data_row.text, " : ", data)

browser.quit()
Starting Web Crawling ...
Page is ready!
UNO R3 Tech Specs
-------------------------------------
Main Processor  :  ATmega328P 16 MHz
Nano Tech Specs
-------------------------------------
Processor  :  ATmega328 16 MHz
Mega 2560 Rev3 Tech Specs
-------------------------------------
Main Processor  :  ATmega2560 16 MHz
Leonardo Tech Specs
-------------------------------------
Processor  :  ATmega32U4 16 MHz

Combiner Selenium et BeautifulSoup

Il est possible de combiner les deux librairies afin de vous apporter toutes leurs fonctionnalités

from bs4 import BeautifulSoup
from selenium import webdriver

browser = webdriver.Firefox()
browser.get(website)

html = browser.page_source
content = BeautifulSoup(html, 'lxml')

browser.quit()

Applications

  • Automatiser des tâches de relevée de données sur le Web
  • Créer votre banque d’image pour l’entrainement de réseau de neurone
  • Trouver des prospect
  • Faire un étude de marché

Sources

Communication entre serveur et client WebSockets avec Python

Communication entre serveur et client WebSockets avec Python

Nous allons voir comment mettre en place une communication entre un serveur et un client en utilisant le protocole Websockets sous Python. WebSockets est un protocole de communication web simple et robuste permettant la communication en temps réel.

Installation de la librairie Websockets

Pour utiliser les WebSockets avec Python, nous installons le paquet nécessaire..

python3 -m pip install websockets

Nous utilisons aussi la librairie asyncio qui permet de faire de la programmation asynchrone pour le développement de serveur performant

Récupérer l’adresse IP du serveur

Comme dans toute communication internet, pour établir une connexion entre le client et le serveur, il faut connaître l’addresse IP du serveur.

Pour récupérer l’adresse IP de la machine serveur vous pouvez utiliser les commandes ipconfig (Windows) ou ifconfig/ ip addr (Linux) (ici: 192.168.1.59)

Il est aussi possible d’utiliser le paquet socket dans le script Python

def getIpAddress():
	import socket
	s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	s.connect(("8.8.8.8", 80))
	ipaddress = s.getsockname()[0]
	s.close()
	return ipaddress

ipaddress = getIpAddress()

Notez l’adresse, vous l’utiliserez dans le code client.

Code Python pour le serveur WebSocket

Pour lancer le serveur, nous définissons une fonction main qui va ouvrir le serveur sur le port 8765 et le faire tourner en boucle. Nous appelons aussi la fonction callback echo() qui va gérer la réception de message. Dans cet exemple, nous renvoyons le message tel quel au client.

#!/usr/bin/env python
# python3 -m pip install websockets

import asyncio
from websockets.server import serve

def getIpAddress():
	import socket
	s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	s.connect(("8.8.8.8", 80))
	ipaddress = s.getsockname()[0]
	s.close()
	return ipaddress

ipaddress = getIpAddress()
port = 8765

async def echo(websocket):
	async for message in websocket:
		print("received from {}:{} : ".format(websocket.remote_address[0],websocket.remote_address[1]) + message)
		await websocket.send(message)

async def main():
	print("Server is activated on ws://{}:{}".format(ipaddress,port))
	#async with serve(echo, "localhost", 8765):
	async with serve(echo, "0.0.0.0", port):
		await asyncio.Future()  # run forever

asyncio.run(main())

N.B.: pour que le serveur soit visible sur le réseau extérieur à l’ordinateur vous devez spécifier l’adresse “0.0.0.0” à la place de “localhost”

Code Python pour le client WebSocket

Dans le code client, nous nous connectons au serveur en spécifiant l’adresse IP et le port. Puis, nous envoyons un message. Enfin on attend le message de réponse pour l’afficher.

#!/usr/bin/env python
# python3 -m pip install websockets

import asyncio
from websockets.sync.client import connect

def hello():
    #with connect("ws://localhost:8765") as websocket:
    with connect("ws://192.168.1.52:8765") as websocket:
    
        websocket.send("Hello world!")
        message = websocket.recv()
        print(f"Received from server : {message}")

hello()

Résultat

Dans un terminal, lancez d’abord le script serveur: python websocket_server.py

Dans un second terminal, lancez ensuite le script client ; python websocket_client.py

Échange de message JSON via WebSocket

Le format JSON(JavaScript Object Notation) est un format d’échange de donnée très populaire dans la communication web. NousPour échanger des données JSON entre serveur et client nous utilisons la libraire pré-installée json

Dans le script client, nous allons envoyer une requête au format JSON. Le serveur va recevoir cette requête vérifier les valeurs et envoyer une réponse avec le résultat de la validation.

Les fonctions à connaitre pour gérer le format JSON sont:

  • json.loads() pour passer d’un String à un dict
  • json.dumps() pour passer d’un dict à un String

D’autre format Python peuvent être converti en leur équivalent JSON

PythonJSON
dictobject
list, tuplearray
strstring
int, float, intnumber
Truetrue
Falsefalse
Nonenull

script client

#!/usr/bin/env python
# python3 -m pip install websockets
import json
import asyncio
from websockets.sync.client import connect

jsondata = {"type": "setParam", "param1": 30, "param2": 2.3}
jsonwrong = {"type": "setParam", "param1": 30, "param2": -2.3}
jsonunkn = {"type": "setData", "param1": 30, "param2": 2.3}

def hello():
	with connect("ws://192.168.1.52:8765") as websocket:
		websocket.send("Hello world!")
		message = websocket.recv()
		print(f"Received from server : {message}")

		websocket.send(json.dumps(jsondata))
		message = websocket.recv()
		print(f"Received from server : {message}")

		websocket.send(json.dumps(jsonwrong))
		message = websocket.recv()
		print(f"Received from server : {message}")
				
		websocket.send(json.dumps(jsonunkn))
		message = websocket.recv()
		print(f"Received from server : {message}")

hello()

script serveur

#!/usr/bin/env python
# python3 -m pip install websockets

import json
import asyncio
from websockets.server import serve

def getIpAddress():
	import socket
	s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	s.connect(("8.8.8.8", 80))
	ipaddress = s.getsockname()[0]
	s.close()
	return ipaddress

ipaddress = getIpAddress()
port = 8765

async def echo(websocket):
	async for message in websocket:
		print("received from {}:{} : ".format(websocket.remote_address[0],websocket.remote_address[1]) + message)
		
		if('{' not in message):
			await websocket.send(message)
		else:
			request = json.loads(message)
			answer = {}
			if(request['type'] == 'setParam'):
				answer['type'] = request['type']
				if(request['param1']<100 and request['param2']>1.2):
					answer['valid'] = True
					for key, val in request.items():
						print("\t"+key+": ", val)
				else:
					answer['valid'] = False
			else:
				answer['type'] = 'unknown'
				answer['valid'] = False
				
			await websocket.send(json.dumps(answer))

async def main():
	print("Server is activated on ws://{}:{}".format(ipaddress,port))
	#async with serve(echo, "localhost", 8765):
	async with serve(echo, "0.0.0.0", port):
		await asyncio.Future()  # run forever

asyncio.run(main())

Résultat

Sources

Créer un script Python sous ROS2

Créer un script Python sous ROS2

Dans ce tutoriel, nous allons voir comment créer et lancer des script Python sous ROS2. Vous pourrez ainsi créer vos propres noeuds et commencer à développer sous ROS.

Créer un espace de travail

Une bonne pratique pour développer sous ROS2 est de créer des workspaces dans lesquels on va installer les paquets désirés, séparés de l’installation principale.

Installation de colcon

sudo apt install python3-colcon-common-extensions

Créer le dossier

mkdir -p ~/tuto_ws/src
cd ~/tuto_ws

Copier ensuite un projet type, comme tutoriel contenant turtlesim

git clone https://github.com/ros/ros_tutorials.git -b iron

Puis construisez le projet à l’aide de la commande

colcon build

Pour charger votre workspace dans un nouveau terminal

source /opt/ros/iron/setup.bash
cd ~/ros2_ws
source install/local_setup.bash

N.B.: vous pouvez mettre les commandes suivantes dans un fichier ros_profile pour le source en une seule commande “source ros_profile”

Créer un Package python

ROS2 offre un outil simple pour créer l’architecture de fichiers d’un paquets Python ou C++. Vous devez alors spécifier au moins le nom du paquet (my_paquet). Il est possible également de donner le nom du nœud.

Dans le répertoire ~/ros2_sw/src, entrez la commande suivante

ros2 pkg create --build-type ament_python --license Apache-2.0 --node-name my_node my_package --license Apache-2.0

le script python se trouve alors dans le répertoire ~/ros2_ws/src/my_package/my_package

Une fois le paquet créé, vous pouvez ajouter autant de nœud que vous le souhaitez dans ce dossier.

Il faudra penser à mettre à jour les fichiers setup.py et package.xml avec les dépendances et points d’entrée

Installer les dépendances

Avant de compiler le projet, il est conseillé de résoudre les dépendances.

rosdep install -i --from-path src --rosdistro iron -y

Compiler le projet

Il est possible de compiler le projet entier ou de sélectionner un paquet en particulier

colcon build --packages_select my_package --symlink-install

N.B.: symlink-install permet de ne pas recompiler à chaque changement du script Python

Lancer le nœud

Une fois le paquet compilé, vous pouvez lancer le nœud à partir de la commande

ros2 run my_package my_node

Le code de base lors de la création d’un nœud

def main():
    print('Hi from my_package.')

if __name__ == '__main__':
    main()

Au lancement ce code affichera simplement le text du print

Création d’un Publisher et d’un Subscriber pour Turtlesim

Nous allons voir dans cet exemple, comment créer un paquet avec deux nœuds, l’un venant contrôler la tortue, l’autre, venant lire la position.

Création du paquet

ros2 pkg create --build-type ament_python --license Apache-2.0 --node-name projects control_turtle --license Apache-2.0

Voici l’architecture de fichiers désiré

.
├── LICENSE
├── package.xml
├── projects
│   ├── control_turtle.py
│   └── read_turtle.py
├── setup.cfg
└── setup.py

Ajouter les scripts Python

  • Publisher control_turtle.py

On importe le type de message à publier Twist pour le contrôle de la vitesse de la tortue

On se connecte au topic en précisant le nom (‘/turtle1/cmd_vel’) et le type de donnée (Twist)

On crée un timer qui nous permettra de publier régulièrement sur le topic (create_timer)

Enfin on publie la vitesse lineaire et angulaire

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  control_turtle.py


import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

import os

# Node should have the same name as the node file
nodename= os.path.splitext(os.path.basename(__file__))[0]

class TurtleController(Node):

	def __init__(self):
		super().__init__(nodename)
		self.get_logger().info(nodename+" started")
		self.publisher_ = self.create_publisher(
			Twist,
			'/turtle1/cmd_vel',
			10)
		timer_period = 0.5  # seconds
		self.timer = self.create_timer(timer_period, self.timer_callback)
		self.i = 0

	def timer_callback(self):
		#msg = String()
		#msg.data = 'Hello World: %d' % self.i
		msg = Twist()
		msg.linear.x = 1.0
		msg.angular.z = 0.5
		self.publisher_.publish(msg)
		self.get_logger().info('Publishing: "%s"' % msg)
		self.i += 1


def main(args=None):
	rclpy.init(args=args)

	turtle_controller = TurtleController()
	try:
		rclpy.spin(turtle_controller)
		turtle_controller.destroy_node()
		rclpy.shutdown()
	except:
		turtle_controller.get_logger().info(nodename+" stopped")


if __name__ == '__main__':
	main()

  • Subscriber read_turtle.py

On importe le type de message à publier Pose

On se connecte au topic en précisant le nom (‘/turtle1/pose’) et le type de donnée (Pose)

On écrit la fonction d’écoute qui s’exécute lorsqu’une une nouvelle donnée est disponible sur le topic

On crée un timer qui nous permettra d’afficher les valeurs à la féquence désirée

Enfin on publie la vitesse lineaire et angulaire

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  control_turtle.py


import rclpy #rospy
from rclpy.node import Node
from turtlesim.msg import Pose


import os

# Node should have the same name as the node file
nodename= os.path.splitext(os.path.basename(__file__))[0]

class TurtleReader(Node):

	def __init__(self):
		super().__init__(nodename)
		self.get_logger().info(nodename+" started")
		self.subscription = self.create_subscription(
			Pose,
			'/turtle1/pose',
			self.listener_callback,
			10)
		self.subscription  # prevent unused variable warning
		self.timer = self.create_timer(2, self.timer_callback)
		self.msg = None
		
	def listener_callback(self, msg):
		self.msg = msg
		
	def timer_callback(self):
		self.get_logger().info("I read %s" % self.msg )
		


def main(args=None):
	rclpy.init(args=args)

	turtle_reader = TurtleReader()
	try:
		rclpy.spin(turtle_reader)
		turtle_reader.destroy_node()
		rclpy.shutdown()
	except:
		turtle_reader.get_logger().info(nodename+" stopped")





if __name__ == '__main__':
	main()

Modifier le fichier setup.py

Dans les points d’entrée du fichier setup.py, vous devez spécifier les noms des noeuds ainsi que la fonction à lancer (main)

entry_points={
        'console_scripts': [
                'control_turtle = projects.control_turtle:main',
                'read_turtle = projects.read_turtle:main',
        ],
},

Compiler le paquet

colcon build --packages_select projects --symlink-install

Lancer les noeuds

Pour lancer un noeud dans un terminal, vous devez sourcer le projet

source /opt/ros/iron/setup.bash
cd ~/ros2_ws
source install/local_setup.bash

Dans trois terminaux différents, lancez les commandes suivantes

ros2 run turtlesim turtlesim_node #terminal1
ros2 run projects read_turtle #terminal2
ros2 run projects control_turtle #terminal3

Résultat

Vous pouvez voir la tortue décrire la forme d’un cercle et la position évoluer dans la fentre du subscriber

Lancer un noeud avec des arguments

Il est possible de configurer les noeuds lors de leur exécution. Dans cet exemple, nous allons définir des paramètre d’entrée pour la vitesse linéaire et angulaire. Nous allons donc modifier le code du Publieur ci-dessus

Dans l’initialisation du noeud, nous déclarons les paramètres avec les valeurs par défaut

    self.declare_parameter('x', 1.0)
    self.declare_parameter('z', 0.5)

Nous pouvons ensuite récupérer la valeur de ces paramètres pour les stocker dans des variables que nous utilisons dans la fonction callback

    self.linx = self.get_parameter('linx').get_parameter_value().double_value
    self.angz = self.get_parameter('angz').get_parameter_value().double_value

Vous pouvez désormais lancer un noeud avec et sans paramètre

ros2 run projects circle_turtle #default param
ros2 run projects circle_turtle --ros-args -p linx:=0.5 -p angz:=1.0

Code complet

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  control_turtle.py
import rclpy #rospy
from rclpy.node import Node
from std_msgs.msg import String

from turtlesim.msg import Pose
from geometry_msgs.msg import Twist

import os

nodename= os.path.splitext(os.path.basename(__file__))[0]

class TurtleController(Node):

	def __init__(self):
		super().__init__(nodename)
		self.get_logger().info(nodename+" started")
		self.publisher_ = self.create_publisher(
			Twist,
			'/turtle1/cmd_vel',
			10)
			
		self.declare_parameter('linx', 1.0)
		self.declare_parameter('angz', 0.5)
		self.linx = self.get_parameter('linx').get_parameter_value().double_value
		self.angz = self.get_parameter('angz').get_parameter_value().double_value
			
		timer_period = 0.5  # seconds
		self.timer = self.create_timer(timer_period, self.timer_callback)
		self.i = 0

	def timer_callback(self):
		#msg = String()
		#msg.data = 'Hello World: %d' % self.i
		msg = Twist()
		msg.linear.x = self.linx
		msg.angular.z = self.angz
		self.publisher_.publish(msg)
		self.get_logger().info('Publishing: "%s"' % msg)
		self.i += 1


def main(args=None):
	#os.system('ros2 service call /reset std_srvs/srv/Empty') #reset pos
	rclpy.init(args=args)

	turtle_controller = TurtleController()
	try:
		rclpy.spin(turtle_controller)
		turtle_controller.destroy_node()
		rclpy.shutdown()
	except:
		turtle_controller.get_logger().info(nodename+" stopped")


if __name__ == '__main__':
	main()

Ouvrir les différents terminaux avec un script

Pour gagner du temps lors de votre développement, vous pouvez ouvrir les terminaux et lancer les noeuds à partir d’un seul script bash

#open terminal and run turtlesim
gnome-terminal -- $SHELL -c "source ros_profile && ros2 run turtlesim turtlesim_node;exec $SHELL"
gnome-terminal -- $SHELL -c "source ros_profile && ros2 run projects control_turtle;exec $SHELL"

Troubleshoot

  • SetupTools deprecation

Vérifier la version setuptools

python3
import setuptools
setuptools.__version__

‘59.6.0’

Installer pip

sudo apt install python3-pip

puis installer la version 58.2.0 de setuptools

python3 -m pip install setuptools==58.2.0

Sources

Communication UDP avec React Native

Communication UDP avec React Native

Dans ce tutoriel, nous allons mettre en place une communication avec le protocole UDP sur une application React Native. L’application React Native pourra se comporter comme Serveur ou comme Client UDP. Nous utilisons un ordinateur avec un script Python comme partenaire de communication.

Configuration de React Native

Premièrement, créer un projet React Native UDPTerminalApp

Installer les librairies udp et netinfo

npm install react-native-udp
npm install react-native-network-info

Description du code de l’application

Importation

Pour utiliser les librairies, nous importons tout d’abord leur code.

   import dgram from 'react-native-udp'
   import UdpSocket from 'react-native-udp/lib/types/UdpSocket.js';
   import { NetworkInfo } from 'react-native-network-info'

Composant principal

Puis nous créons le composant fonctionnel qui nous permettra de gérer la communication Serveur/Client UDP.

const UDPTerminal = () =>  {
  const [isServer, setIsServer] = useState(false);
  const [connectionStatus, setConnectionStatus] = useState('');
  const [socket, setSocket] = useState<UdpSocket>();
  const [ipAddress, setIpAddress] = useState('');
  const [ipServer, setIpServer] = React.useState('');
  const [messageText, setMessageText] = useState("");
  const [messageToSend, setMessageToSend] = useState("Hello from server");
  const [receivedMessage, setReceivedMessage] = useState("");
  • isServer est True si l’application se comporte comme un serveur et false si elle se comporte comme un client
  • connectionStatus affiche le status de la connexion
  • socket contient la variable socket de la communication
  • ipAddress contient l’adresse IP de l’appareil Android sur le réseau
  • ipServer contient l’adresse IP entrée sur l’interface
  • messageText contient le texte tapé sur l’interface
  • messageToSend contient le message à envoyer au client ou au serveur
  • receivedMessage contient les messages reçu depuis le client ou le serveur

Gestion de la communication UDP

Le code qui permet la gestion de la communication UDP, que ce soit en mode serveur ou client, est entièrement contenu dans un hook useEffect

  useEffect(() => {
    const fetchIpAddress = async () => {
      const ip = await NetworkInfo.getIPV4Address();
      setIpAddress(ip);
      console.log("ip adresses ; ", ip)
    };

    fetchIpAddress();

    if (isServer) {
      // Configure app as server
      const server = dgram.createSocket('udp4');

      server.on('message', (data, rinfo) => {
        
        server.send(messageToSend, undefined, undefined, rinfo?.port, rinfo?.address, (error) => {
          if (error) {
            console.log('Error sending message:', error);
          } else {
            console.log('Message sent correctly');
          }
        });
        console.log('Received message:', data.toString());
        setReceivedMessage(receivedMessage => receivedMessage+data.toString()+"\n")
      });

      server.on('listening', () => {
        console.log('Server listening on port:', server.address().port);
        setConnectionStatus(`Server listening on port ${server.address().port}`);
      });
      try{
        setReceivedMessage("");
        server.bind(8888);
        setSocket(server);
      }catch(error){
        console.log("error binding server",error);
      }
      
    } else {
      setConnectionStatus(`Server disconnected`);
      // Configure app as client
      const client = dgram.createSocket('udp4');
      try{
        setReceivedMessage("");
        client.bind(8887);
        setSocket(client);
      }catch(error){
        console.log("error binding client",error);
      }
      
    }

    return () => {
      socket && socket.close();
    };
  }, [isServer, messageToSend, messageText]);

La fonction fetchIpAdress permet de récupérer l’adresse IP de l’appareil.

En mode serveur

  • nous créons le socket dgram.createSocket
  • lorsqu’un message est reçu: nous envoyons messageToSend au client
  • Puis nous affichons le message reçu
  • nous lions le serveur au port 8888

En mode client

  • nous créons le socket dgram.createSocket
  • nous lions le client au port 8887

Dans la fonction suivante

  • nous envoyons le message messageToSend au serveur
  • puis nous attendons la réponse que nous affichons dans receivedMessage

Fonction envoyer message

Nous définissons également une fonction qui permet de mettre à jour le message envoyé en mode Server et d’envoyer le message en mode Client.

  const sendMessage = () => {
    setMessageToSend(messageText);
    if (isServer) return;

    const client = socket;
    console.log("send message to "+ipServer)
    client.send(messageToSend, undefined, undefined, 8888, ipServer, (error) => {
      if (error) {
        console.log('Error sending message:', error);
      } else {
        console.log('Message sent correctly');
      }
    });
    client.on('message', async (message: { toString: () => string; }) => {
      setReceivedMessage(receivedMessage+message.toString()+"\n")
    });
  };

Définition de l’affichage

Enfin l’interface graphique de l’application est défini comme suit:

  • Le titre de l’application
  • L’adresse IP de l’appareil
  • Un bouton pour sélectionner le mode client ou serveur
  • Un encart permettant d’éditer le message à envoyer
  • Un bouton d’envoi
  • Un encart pour spécifier l’adresse IP du serveur en mode client
  • Une zone de texte pour afficher les messages reçu
return (
    <View style={styles.mainBody}>
      <Text
        style={styles.mainTitle}>
        AC UDP Terminal
      </Text>
      <ScrollView>

      <View style={styles.deviceItem}>
                      <View>
                        <Text style={styles.deviceName}>{ipAddress}</Text>
                        <Text style={styles.deviceInfo}>{connectionStatus}</Text>
                      </View>
                      <TouchableOpacity
                        onPress={() => setIsServer(!isServer)}
                        style={styles.deviceButton}>
                        <Text
                          style={styles.buttonText}>
                          {isServer ? 'Server' : 'Client'}
                        </Text>
                      </TouchableOpacity>
                    </View>


      <View
              style={styles.inputBar}>        
              <TextInput
                style={styles.textInput}
                placeholder="Enter a message"
                value={messageText}
                onChangeText={(text) =>    setMessageText(text)
                }
              />
              <TouchableOpacity
                        onPress={() => sendMessage()
                        }
                        style={[styles.sendButton]}>
                        <Text
                          style={styles.buttonText}>
                          SEND
                        </Text>
                      </TouchableOpacity>
        </View>

      <TextInput
        placeholder="Server IP"
        onChangeText={setIpServer}
        value={ipServer}
      />
      <Text>Received Message:</Text>
              <TextInput
              editable = {false}
              multiline
              numberOfLines={20}
              maxLength={300}
              style={styles.textOutput} >
                    {receivedMessage}
              </TextInput>
      </ScrollView>
    </View>
  );

N.B.: le style de l’interface est défini dans le fichier /src/styles/styles.jsx

React-Native UDP Client – Python UDP Server

Dans cet exemple, nous configurons l’application comme client et l’ordinateur comme serveur. Nous allons envoyer un message au serveur, depuis l’application, qui renverra une réponse au client.

Python UDP Server Code

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket

localIP     = "0.0.0.0" #"127.0.0.1"
localPort   = 8888
bufferSize  = 1024

msgFromServer       = "Hello UDP Client"
bytesToSend         = str.encode(msgFromServer)

# Create a datagram socket
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
 
# Bind to address and ip
UDPServerSocket.bind((localIP, localPort))
print("UDP server up and listening on port {}".format(localPort))

# Listen for incoming datagrams
while(True):

	data,addr = UDPServerSocket.recvfrom(bufferSize)
	print("{} > {}".format(addr,data))

	# Sending a reply to client
	UDPServerSocket.sendto(bytesToSend, addr)

N.B.: Pour écouter sur toutes les adresses locales nous utilisons 0.0.0.0. Si vous souhaitez tester la communication entre client et serveur sur le même ordinateur vous pouvez utiliser l’adresse 127.0.0.1

Sur l’application, dans l’encart ServerIp, entrez l’adresse IP de l’ordinateur (ici: 192.168.1.70). Modifiez ensuite le message à envoyer puis appuyer sur le bouton “SEND”.

Python terminal

UDP server up and listening on port 8888
('192.168.1.5', 8887) > b'Hello server'
('192.168.1.5', 8887) > b'Hello server'

Sur le terminal Python, nous recevons le message “Hello server” envoyé depuis l’application. Le serveur UDP renvoie “Hello UDP Client”

React-Native UDP Server – Python UDP Client

Ici, nous configurons l’application comme serveur et l’ordinateur comme client. Dans le code Python, nous entrons l’adresse IP du serveur.

Python UDP Client

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import time
HOST = "192.168.1.5" #Server IP #"127.0.0.1" local address
PORT = 8888
bufferSize = 1024

counter=0
while True:
	# Create a UDP socket at client side
	with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
		# Send to server using created UDP socket
		client.sendto(str.encode("client counter : "+str(counter)), (HOST,PORT))
		data,addr= client.recvfrom(bufferSize)
		msg = "{} > {}".format(addr,data)

		print(msg)
	counter+=1
	time.sleep(1)

Python terminal

('192.168.1.5', 8888) > b'Hello client '
('192.168.1.5', 8888) > b'Hello client '

Code complet de l’application

App.tsx

/**
 * npm install react-native-udp
 * npm install react-native-network-info
 * 
 */

import React, {useState, useEffect} from 'react';
import {   
  View, 
  ScrollView, 
  Text,
  TextInput,
  PermissionsAndroid,
  Platform, 
  TouchableOpacity } from 'react-native';
   import dgram from 'react-native-udp'
   import UdpSocket from 'react-native-udp/lib/types/UdpSocket.js';
   import { NetworkInfo } from 'react-native-network-info'
   import {styles} from "./src/styles/styles.jsx"


const UDPTerminal = () =>  {
  const [isServer, setIsServer] = useState(false);
  const [connectionStatus, setConnectionStatus] = useState('');
  const [socket, setSocket] = useState<UdpSocket>();
  const [ipAddress, setIpAddress] = useState('');
  const [ipServer, setIpServer] = React.useState('');
  const [messageText, setMessageText] = useState("");
  const [messageToSend, setMessageToSend] = useState("Hello from server");
  const [receivedMessage, setReceivedMessage] = useState("");

  useEffect(() => {
    const fetchIpAddress = async () => {
      const ip = await NetworkInfo.getIPV4Address();
      setIpAddress(ip);
      console.log("ip adresses ; ", ip)
    };

    fetchIpAddress();

    if (isServer) {
      // Configure app as server
      const server = dgram.createSocket('udp4');

      server.on('message', (data, rinfo) => {
        
        server.send(messageToSend, undefined, undefined, rinfo?.port, rinfo?.address, (error) => {
          if (error) {
            console.log('Error sending message:', error);
          } else {
            console.log('Message sent correctly');
          }
        });
        console.log('Received message:', data.toString());
		if(receivedMessage.length>300){
          setReceivedMessage("");
        }
        setReceivedMessage(receivedMessage => receivedMessage+data.toString()+"\n")
      });

      server.on('listening', () => {
        console.log('Server listening on port:', server.address().port);
        setConnectionStatus(`Server listening on port ${server.address().port}`);
      });
      try{
        setReceivedMessage("");
        server.bind(8888);
        setSocket(server);
      }catch(error){
        console.log("error binding server",error);
      }
      
    } else {
      setConnectionStatus(`Server disconnected`);
      // Configure app as client
      const client = dgram.createSocket('udp4');
      try{
        setReceivedMessage("");
        client.bind(8887);
        setSocket(client);
      }catch(error){
        console.log("error binding client",error);
      }
      
    }

    return () => {
      socket && socket.close();
    };
  }, [isServer, messageToSend, messageText]);

  const sendMessage = () => {
    setMessageToSend(messageText);
    if (isServer) return;

    const client = socket;
    console.log("send message to "+ipServer)
    client.send(messageToSend, undefined, undefined, 8888, ipServer, (error) => {
      if (error) {
        console.log('Error sending message:', error);
      } else {
        console.log('Message sent correctly');
      }
    });
    client.on('message', async (message: { toString: () => string; }) => {
	  if(receivedMessage.length>300){
          setReceivedMessage("");
      }
      setReceivedMessage(receivedMessage+message.toString()+"\n")
    });
  };

  return (
    <View style={styles.mainBody}>
      <Text
        style={styles.mainTitle}>
        AC UDP Terminal
      </Text>
      <ScrollView>

      <View style={styles.deviceItem}>
                      <View>
                        <Text style={styles.deviceName}>{ipAddress}</Text>
                        <Text style={styles.deviceInfo}>{connectionStatus}</Text>
                      </View>
                      <TouchableOpacity
                        onPress={() => setIsServer(!isServer)}
                        style={styles.deviceButton}>
                        <Text
                          style={styles.buttonText}>
                          {isServer ? 'Server' : 'Client'}
                        </Text>
                      </TouchableOpacity>
                    </View>


      <View
              style={styles.inputBar}>        
              <TextInput
                style={styles.textInput}
                placeholder="Enter a message"
                value={messageText}
                onChangeText={(text) =>    setMessageText(text)
                }
              />
              <TouchableOpacity
                        onPress={() => sendMessage()
                        }
                        style={[styles.sendButton]}>
                        <Text
                          style={styles.buttonText}>
                          SEND
                        </Text>
                      </TouchableOpacity>
        </View>

      <TextInput
        placeholder="Server IP"
        onChangeText={setIpServer}
        value={ipServer}
      />
      <Text>Received Message:</Text>
              <TextInput
              editable = {false}
              multiline
              numberOfLines={20}
              maxLength={300}
              style={styles.textOutput} >
                    {receivedMessage}
              </TextInput>
      </ScrollView>
    </View>
  );
}

export default UDPTerminal;

style.jsx

/* ./src/styles/styles.jsx */
import {StyleSheet, Dimensions} from 'react-native';
/**
 * links to palette
 *  https://colorhunt.co/palette/161616346751c84b31ecdbba
 * 
*/
//parameters
let BACKGROUND_COLOR = "#161616"; //191A19
let BUTTON_COLOR = "#346751"; //1E5128
let ERROR_COLOR = "#C84B31"; //4E9F3D
let TEXT_COLOR = "#ECDBBA"; //D8E9A8

const windowHeight = Dimensions.get('window').height;
export const styles = StyleSheet.create({
  mainBody: {
    flex: 1,
    justifyContent: 'center',
    height: windowHeight,
    color:TEXT_COLOR,
    backgroundColor: BACKGROUND_COLOR,
  },

  mainTitle:{
    color: TEXT_COLOR,
    fontSize: 30,
    textAlign: 'center',
    borderBottomWidth: 2,
    borderBottomColor: ERROR_COLOR,
  },

  scanButton: {
    backgroundColor: BUTTON_COLOR,
    paddingBottom: 10,
    paddingTop: 10,
    borderRadius: 10,
    margin: 2,
    marginBottom: 10,
    marginTop: 10,
    
    paddingHorizontal: 20,
    fontWeight: 'bold', 
    fontSize: 12,
  },

  buttonText: {
    color: TEXT_COLOR,
    fontWeight: 'bold',
    fontSize: 12,
	  textAlign: 'center',
  },
  
  noDevicesText: {
    color: TEXT_COLOR,
    textAlign: 'center',
    marginTop: 10,
    fontStyle: 'italic',
  },
  deviceItem: {
    flexDirection: 'row',
    justifyContent: 'space-between',
    marginBottom: 2,
  },
  deviceName: {
    color: TEXT_COLOR,
    fontSize: 14,
    fontWeight: 'bold',
  },
  deviceInfo: {
    color: TEXT_COLOR,
    fontSize: 10,
  },
  deviceButton: {
    backgroundColor: BUTTON_COLOR,
    padding: 10,
    borderRadius: 10,
    margin: 2,
    paddingHorizontal: 20,
    fontWeight: 'bold', 
    fontSize: 12,
  },

  inputBar:{
    flexDirection: 'row',
    justifyContent: 'space-between',
    margin: 5,
  },

  textInput:{
    backgroundColor: '#888888',
    margin: 2,
    borderRadius: 15,
    flex:3,
  },

  sendButton: {
  backgroundColor: BUTTON_COLOR,
  padding: 15,
  borderRadius: 15,
  margin: 2,
  paddingHorizontal: 20,
  },

  textOutput:{
    backgroundColor: '#333333',
    margin: 10,
    borderRadius: 2,
    borderWidth: 1,
    borderColor: '#EEEEEE',
    textAlignVertical: 'top',
  }

});

Applications

  • Pilotez votre projet Arduino, ESP32 ou Raspberry Pi avec une application

Sources

Streams vidéo synchronisés avec OpenCV et Multithreading

Streams vidéo synchronisés avec OpenCV et Multithreading

Nous allons voir dans ce tutoriel comment obtenir des streams vidéo synchronisés avec Python et OpenCV. Une des problématiques du streaming vidéo est d’émettre et d’acquérir des signaux vidéo de qualité et si possible avec le moins de délai possible. La capacité de synchroniser des flux vidéo est de pouvoir traité leurs données simultanément, comme pour la reconnaissance d’objets.

Pré-requis: Streaming vidéo entre deux machines

Matériel

  • Ordinateur avec Python et OpenCV
  • 2 sources vidéo (fichier, stream, webcam, etc.)
  • Une connexion internet ou ethernet

Pour ce tutoriel, j’utilise deux Orange Pi Zero, qui génère les streams vidéo à partir de caméras USB (ArduCam), connectés sur le réseau de l’ordinateur via un switch Ethernet.

Émission des streams

Pour créer le stream à partir du flux vidéo de la caméra, nous utilisons FFMPEG. Nous utilisons le protocole UDP dans lequel nous spécifions l’adresse IP de l’ordinateur et le port sur lequel se trouve le streaming.

Flux vidéo 0

ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8553?pkt_size=1316

Flux vidéo 1

ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8554?pkt_size=1316

Pour tester le flux vidéo, vous pouvez utiliser la commande ffplay

ffplay upd://127.0.0.1:8553 #video streaming 0
ffplay upd://127.0.0.1:8554 #video streaming 1

Dans le tutoriel, nous utiliserons le filtre drawtext qui permet de rajouter du texte sur la vidéo. Cela nous permet d’afficher l’heure et d’observer facilement le délai.

-vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7"

N.B.: il est possible de mettre la commande ffmpeg dans un script Python

Lancement des commandes via SSH

Pour des raisons de simplicité, nous lançons les commandes ffmpeg à partir du script Python via SSH. Pour cela, nous utilisons la librairie paramiko

import socket
import paramiko

#computer ip address
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"IP Address: {ip_address}")
#ip_address= "192.168.1.70"

ssh0 = paramiko.SSHClient()
ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh1 = paramiko.SSHClient()
ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh0.connect("192.168.1.32", username="root", password="root")
ssh1.connect("192.168.1.33", username="root", password="root")

#stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553)
#stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554)
stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553)
stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554)

ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1)
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)

N.B.: ne pas oublié de fermer la connexion ssh en fin de programme ssh0.close(), ssh1.close()

Réception des streams non-synchronisés

Pour réceptionner les flux vidéo, nous utilisons OpenCV avec Python.

Nous ouvrons d’abord les deux streams vidéo, que nous lisons dans une boucle tant qu’ils sont ouvert

cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")

Pour des raisons pratique, nous concaténons l’image dans une même fenêtre en nous assurant qu’elles ont des dimensions compatibles

frame0 =cv2.resize(frame0, (640,480))
frame1 =cv2.resize(frame1, (640,480))
Hori = np.concatenate((frame0, frame1), axis=1)

Enfin nous affichons l’heure locale de l’ordinateur pour comparaisons

Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)

Voici le code complet pour la capture des streams vidéo non-synchronisés

N.B.: Ce code fonctionne une fois que les commandes ffmpeg ont été lancé sur chaque machine

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import cv2
import datetime

#<add code to run ffmpeg command via ssh>

cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")

while cap0.isOpened() and cap1.isOpened():
	# Capture frame-by-frame
	ret0, frame0 = cap0.read()
	ret1, frame1 = cap1.read()

	# Get current date and time  
	now=datetime.datetime.now()
	date_time = now.strftime("%H:%M:%S")
	font = cv2.FONT_HERSHEY_SIMPLEX

	# write the date time in the video frame
	#frame0 = cv2.putText(frame0, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
	#frame1 = cv2.putText(frame1, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
	
	#if (ret0):
	#	# Display the resulting frame
	#	cv2.imshow('Cam 0', frame0)

	#if (ret1):
	#	# Display the resulting frame
	#	cv2.imshow('Cam 1', frame1)

	if (ret0 and ret1):
		frame0 =cv2.resize(frame0, (640,480))
		frame1 =cv2.resize(frame1, (640,480))
		Hori = np.concatenate((frame0, frame1), axis=1)
		Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
		cv2.imshow('Cam 0&1', Hori)

	if cv2.waitKey(1) & 0xFF == ord('q'):
		break

#release captures
cap0.release()
cap1.release()
cv2.destroyAllWindows()

Nous observons un retard d’une seconde entre les deux flux vidéo. Ceci n’est pas acceptable si nous souhaitons traité les images de manière synchronisée.

Capture de streams avec Multithreading

La solution la plus simple est de dédier des threads à la capture des images. Pour cela nous utilisons le paquet threading. Nous allons créer une classe VideoStream qui va gérer la lecture du stream dans son propre thread

class VideoStream:
	def __init__(self, src=0):
		self.cap = cv2.VideoCapture(src)
		self.ret, self.frame = self.cap.read()
		self.started = False
		self.read_lock = Lock()
		
	def start(self):
		if self.started:
			return None
		self.started = True
		self.thread = Thread(target=self.update, args=())
		self.thread.start()
		return self
		
	def update(self):
		while self.started:
			ret,frame = self.cap.read()
			self.read_lock.acquire()
			self.ret, self.frame = ret,frame
			self.read_lock.release()
	
	def isOpened(self):
		return self.cap.isOpened()
				
	def read(self):
		self.read_lock.acquire()
		ret = self.ret
		frame = self.frame.copy()
		self.read_lock.release()
		return ret, frame
		
	def release(self):
		self.started = False
		self.thread.join()
		
	def __exit__(self, exc_type, exc_value, traceback):
		self.cap.release()

Nous pouvons ensuite instancier nos deux objets cap0 et cap1

cap0 = VideoStream("udp://127.0.0.1:8553").start()
cap1 = VideoStream("udp://127.0.0.1:8554").start()


while cap0.isOpened() and cap1.isOpened():
	ret0, frame0 = cap0.read()
	ret1, frame1 = cap1.read()

	# Get current date and time  
	#date_time = str(datetime.datetime.now())
	now=datetime.datetime.now()
	date_time = now.strftime("%H:%M:%S")
	font = cv2.FONT_HERSHEY_SIMPLEX
	
	if ret0 and ret1:
		frame0 =cv2.resize(frame0, (640,480))
		frame1 =cv2.resize(frame1, (640,480))
		Hori = np.concatenate((frame0, frame1), axis=1)
		Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
		cv2.imshow('Cam 0&1', Hori)
			
	if cv2.waitKey(1) & 0xFF == ord('q'):
		break
#release capture
cap0.release()
cap1.release()
cv2.destroyAllWindows()

Les deux flux vidéos sont maintenant synchronisés et il est possible de les enregistrer dans un fichier vidéo ou de faire du traitement d’image sur les deux streams en même temps.

Code complet la réception de streams vidéo synchronisés

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pip install opencv-python
# pip install paramiko

import numpy as np
import cv2
import paramiko
import time
import datetime
from threading import Thread, Lock


#computer ip address

import socket
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"IP Address: {ip_address}")
#ip_address= "192.168.1.70"

ssh0 = paramiko.SSHClient()
ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh0.connect("192.168.1.32", username="root", password="root")
ssh1 = paramiko.SSHClient()
ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh1.connect("192.168.1.33", username="root", password="root")

#python
#stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553)
#stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554)

#ffmpeg
stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553)
stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554)

#manual
#ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8553?pkt_size=1316
#ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8554?pkt_size=1316

ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1)
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
#print(ssh0_stdout.read().decode())
#print(ssh1_stdout.read().decode())


class VideoStream:
	def __init__(self, src=0):
		self.cap = cv2.VideoCapture(src)
		self.ret, self.frame = self.cap.read()
		self.started = False
		self.read_lock = Lock()
		
	def start(self):
		if self.started:
			return None
		self.started = True
		self.thread = Thread(target=self.update, args=())
		self.thread.start()
		return self
		
	def update(self):
		while self.started:
			ret,frame = self.cap.read()
			self.read_lock.acquire()
			self.ret, self.frame = ret,frame
			self.read_lock.release()
	
	def isOpened(self):
		return self.cap.isOpened()
				
	def read(self):
		self.read_lock.acquire()
		ret = self.ret
		frame = self.frame.copy()
		self.read_lock.release()
		return ret, frame
		
	def release(self):
		self.started = False
		self.thread.join()
		
	def __exit__(self, exc_type, exc_value, traceback):
		self.cap.release()





print("waiting for response...")


#cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
#cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
cap0 = VideoStream("udp://127.0.0.1:8553").start()
cap1 = VideoStream("udp://127.0.0.1:8554").start()


while cap0.isOpened() and cap1.isOpened():
	ret0, frame0 = cap0.read()
	ret1, frame1 = cap1.read()

	# Get current date and time  
	#date_time = str(datetime.datetime.now())
	now=datetime.datetime.now()
	date_time = now.strftime("%H:%M:%S")
	font = cv2.FONT_HERSHEY_SIMPLEX
	
	if ret0 and ret1:
		frame0 =cv2.resize(frame0, (640,480))
		frame1 =cv2.resize(frame1, (640,480))
		Hori = np.concatenate((frame0, frame1), axis=1)
		Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
		cv2.imshow('Cam 0&1', Hori)
			
	if cv2.waitKey(1) & 0xFF == ord('q'):
		break

#release capture
cap0.release()
cap1.release()
cv2.destroyAllWindows()
#kill ffmpeg
ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command("killall ffmpeg")
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command("killall ffmpeg")
#close ssh
ssh0.close()
ssh1.close()

Applications

  • Réseau de caméra de surveillance CCTV
  • Reconnaissance d’objet sur des streams vidéos synchronisés

Sources