Exécuter la boucle loop du robot dans une tâche en parallèle en Python

Dans les articles précédents, à partir du moment où le robot est lancé par l'invocation de la méthode run(), le seul moyen de l'arrêter est de procéder à une interruption brutale du programme, soit en cliquant le petit carré rouge de la barre de debug de VSCode, soit en cliquant le bouton retour de la brique EV3.
Pourtant il est légitime de vouloir contrôler l'arrêt du robot par programmation.
Le présent article, présente une astuce pour conserver le contrôle du robot en exécutant la boucle sans fin dans une tâche en parallèle.

Description du problème

Lorsqu'on examine le programme de l'article intitulé « Programmation objet d'un robot MindStrom en Pyhton », la cause évidente de l'impossibilité d'interrompre le programme est la boucle sans fin while True:. En effet, la constante True de Python satisfait définitivement la condition de l'instruction while. Et celle-ci boucle à l'infini.
Intuitivement, il est facile de remplacer la constante True par une variable _is_running qui serait initialisée à True au démarrage du programme et que l'on pourrait remettre à False dans une méthode stop() comme le suggère le code ci-dessous :
#!/usr/bin/env python3

import time
from ev3dev2.motor import MoveTank, OUTPUT_B, OUTPUT_C
from ev3dev2.sensor.lego import InfraredSensor
from ev3dev2.sensor import INPUT_4


class IRControlledTank():

    DEFAULT_MOTORS = MoveTank(OUTPUT_B, OUTPUT_C)
    DEFAULT_IRSENSOR = InfraredSensor(INPUT_4)
    DEFAULT_SPEED = 50

    def __init__(self, motors=DEFAULT_MOTORS,
            irsensor=DEFAULT_IRSENSOR,
            speed=DEFAULT_SPEED):
        self._motors = motors
        self._irsensor = irsensor
        self._speed = speed
        self._is_running = False

    def run(self):
        self.setup()
        while self._is_running:
            self.loop()

    def start(self):
        self._is_running = True
        self.run()

    def stop(self):
        self._is_running = False

    def setup(self):
        pass

    def loop(self):
        speed_left, speed_right = 0, 0
        if self._irsensor.top_left(1):
            speed_left = self._speed
        if self._irsensor.bottom_left(1):
            speed_left = -self._speed
        if self._irsensor.top_right(1):
            speed_right = self._speed
        if self._irsensor.bottom_right(1):
            speed_right = -self._speed
        self._motors.on(speed_left, speed_right)


if __name__ == '__main__':
    robot = IRControlledTank()
    robot.start()
    time.sleep(180)
    robot.stop()
A la fin du programme, on espère, grâce à l'instruction time.sleep(180), que le programme s'interrompt après trois minute, puisqu'une invocation à la méthode stop() du robot est effectuée.
En fait, ni l'instruction time.sleep(180), ni l'instruction robot.stop() ne pourront être exécutées. Un fois la boucle lancée, jamais elles ne seront atteintes. Il faudrait attendre que l'instruction robot.start() se termine. Ce qui est impossible, puisque rien ne vient modifier la variable _is_running dans l'exécution de la boucle et que celle-ci de donnera pas la main pour pouvoir invoquer la méthode stop().
Le seul moyen de sortir de cette impasse est d'exécuter la boucle dans une tâche parallèle. Ce qui est un pratique courante en robotique.

Programmation de la boucle du robot dans une tâche parallèle

A l'examen du code ci-dessous, la tâche du robot correspond à la fonction run(). Il suffit donc de faire en sorte que celle-ci soit lancée dans une tâche parallèle. En python, cela est relativement simple comme le montre le code ci-dessous :

Codage

#!/usr/bin/env python3

#1#
import time
from threading import Thread
from ev3dev2.motor import MoveTank, OUTPUT_B, OUTPUT_C
from ev3dev2.sensor.lego import InfraredSensor
from ev3dev2.sensor import INPUT_4


class IRControlledTank():

    DEFAULT_MOTORS = MoveTank(OUTPUT_B, OUTPUT_C)
    DEFAULT_IRSENSOR = InfraredSensor(INPUT_4)
    DEFAULT_SPEED = 50

    #2#
    def __init__(self, motors=DEFAULT_MOTORS, irsensor=DEFAULT_IRSENSOR, speed=DEFAULT_SPEED):
        self._motors = motors
        self._irsensor = irsensor
        self._speed = speed
        self._is_running = False

    def run(self):
        self.setup()
        while self._is_running:
            self.loop()

    #3#
    def start(self):
        self._is_running = True
        task = Thread(target=self.run)
        task.start()

    #4#
    def stop(self):
        self._is_running = False

    def setup(self):
        pass

    def loop(self):
        speed_left, speed_right = 0, 0
        if self._irsensor.top_left(1):
            speed_left = self._speed
        if self._irsensor.bottom_left(1):
            speed_left = -self._speed
        if self._irsensor.top_right(1):
            speed_right = self._speed
        if self._irsensor.bottom_right(1):
            speed_right = -self._speed
        self._motors.on(speed_left, speed_right)


if __name__ == '__main__':
    robot = IRControlledTank()
    #5#
    robot.start()
    time.sleep(180)
    robot.stop()

Explications

  1. Comme la classe Thread est utilisée pour lancer la boucle en parallèle, celle-ci doit être importée à partir du module  threading.
  2. Une variable d'instance _is_running est ajoutée dans la fonction de construction __init__(). A la construction elle est initialisée à False.
  3. Dans la méthode start(), on positionne _is_running à True. Puis on instancie un objet Thread en passant en paramètre la méthode run() qui sera exécutée en parallèle. Puis l'objet Thread est lancé en invoquant dessus la méthode start(). A partir de ce moment, la méthode run() est exécutée et comme elle contient la boucle et que la variable _is_running est initialisée à True, elle va fonctionner en boucle jusqu'à ce que la variable _is_running soit à nouveau remise à False. en invoquant la méthode stop() par exemple.
  4. Pour arrêter le robot, il suffit d'invoquer la méthode stop(). Celle-ci positionne à False la variable _is_running. La condition de la boucle n'étant pas satisfaite, la tâche run() s'achève et le robot s'arrête.
  5. Après avoir été instancié, le robot est démarré en invoquant la méthode start(). Cette méthode crée une tâche dans laquelle la méthode run() (qui contient la boucle) est exécutée en parallèle. Puis la méthode start() étant achevée, celle-ci donne la main immédiatement à l'instruction suivante (alors que la boucle est toujours en exécution). Invoquer immédiatement après la méthode stop() va arrêter le robot sans que celui-ci ait eu le temps de fonctionner. C'est pour cela que l'on invoque le méthode sleep(180) pendant trois minutes (180 secondes). Pendant ces trois minutes, le robot va fonctionner puis, la méthode stop() étant invoquée, va s'arrêter et le programme va se terminer normalement.

Conclusion

Ce dernier article présente l'exécution de la boucle du robot dans une seule tâche en parallèle. Mais, pour des robots plus sophistiqués, il sera sans doute nécessaire d'exécuter plusieurs boucles en parallèle. Un futur article présentera un squelette de robot permettant de contrôler plusieurs tâches.
Cet article conclut la série d'articles permettant de s’approprier la programmation objet d'un robot Lego MindStorm EV3 en Python sur la base du robot Explorer présenté dans l'article « Construction du robot Explorateur ». D'autres articles seront publiés pour exploiter ce robot dans des contextes différents comme l'exploration, l'apprentissage géographique, ou les relevés topographiques.

Commentaires

Posts les plus consultés de ce blog

Connecter ev3dev2 à Internet en WiFi

Connecter Visual Studio Code à un robot MindStorm EV3 avec ev3dev-browser

Installer les modules EV3DEV2 sur Python