One place for hosting & domains

      ThreadPoolExecutor

      Verwenden von ThreadPoolExecutor in Python 3


      Der Autor hat den COVID-19 Relief Fund dazu ausgewählt, eine Spende im Rahmen des Programms Write for DOnations zu erhalten.

      Einführung

      Python-Threads stellen eine Form von Parallelismus dar, mit der Ihr Programm verschiedene Operationen gleichzeitig ausführen kann. Parallelismus in Python lässt sich auch durch Verwendung mehrerer Prozesse erzielen; Threads eignen sich jedoch besonders gut für die Beschleunigung von Anwendungen, die hohe I/O-Leistung benötigen.

      Beispiel: I/O-gerichtete Operationen umfassen die Erstellung von Webanfragen und das Lesen von Daten aus Dateien. Im Gegensatz zu I/O-gerichteten Operationen werden CPU-gerichtete Operationen (wie die Ausführung von Berechnungen mit der Python-Standardbibliothek) von Python-Threads nur wenig profitieren.

      Python 3 enthält das Dienstprogramm ThreadPoolExecutor zur Ausführung von Code in einem Thread.

      In diesem Tutorial werden wir ThreadPoolExecutor verwenden, um zügige Netzwerkanfragen zu erstellen. Wir werden eine Funktion definieren, die für Aufrufe innerhalb von Threads geeignet ist, ThreadPoolExecutor zur Ausführung dieser Funktion nutzen und Ergebnisse aus diesen Ausführungen verarbeiten.

      In diesem Tutorial werden wir Netzwerkanfragen stellen, um die Existenz von Wikipedia-Seiten zu überprüfen.

      Anmerkung: Die Tatsache, dass I/O-gerichtete Operationen mehr von Threads profitieren als I/O-orientierte Operationen, hängt mit einer Eigenart von Python zusammen, die_ global interpreter loc_k genannt wird. Wenn Sie möchten, können Sie in der offiziellen Python-Dokumentation mehr über „global interpreter lock“ von Python erfahren.

      Voraussetzungen

      Für eine optimale Nutzung des Tutorials empfiehlt sich Vertrautheit mit der Programmierung in Python und einer lokalen Python-Programmierumgebung mit installiertem requests-Paket.

      Sie können für die notwendigen Hintergrundinformationen diese Tutorials durchsehen:

      • pip install --user requests==2.23.0

      Schritt 1 — Definieren einer Funktion zur Ausführung in Threads

      Definieren wir zunächst eine Funktion, die wir mithilfe von Threads ausführen möchten.

      Mit nano oder Ihrem bevorzugten Texteditor/Ihrer bevorzugten Entwicklungsumgebung können Sie diese Datei öffnen:

      • nano wiki_page_function.py

      In diesem Tutorial werden wir eine Funktion schreiben, die ermittelt, ob eine Wikipedia-Seite vorhanden ist oder nicht:

      wiki_page_function.py

      import requests
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      

      Die Funktion get_wiki_page_existence akzeptiert zwei Argumente: eine URL zu einer Wikipedia-Seite (wiki_page_url) und eine timeout-Anzahl von Sekunden, während der auf eine Antwort von dieser URL gewartet werden soll.

      get_wiki_page_existence nutzt das requests-Paket, um eine Webanfrage an diese URL zu stellen. Je nach Statuscode der HTTP-Antwort wird eine Zeichenfolge zurückgegeben, die beschreibt, ob die Seite vorhanden ist oder nicht. Verschiedene Statuscodes stellen verschiedene Ergebnisse einer HTTP-Anfrage dar. Hier gehen wir davon aus, dass ein 200-Statuscode („Erfolg“) bedeutet, dass die Wikipedia-Seite existiert, und ein 404-Statuscode („Nicht gefunden“) bedeutet, dass die Wikipedia-Seite nicht existiert.

      Wie im Abschnitt zu den Voraussetzungen beschrieben, benötigen Sie das installierte requests-Paket, um diese Funktion ausführen zu können.

      Versuchen wir, die Funktion auszuführen, indem wir die url und den Funktionsaufruf nach der Funktion get_wiki_page_existence hinzufügen:

      wiki_page_function.py

      . . .
      url = "https://en.wikipedia.org/wiki/Ocean"
      print(get_wiki_page_existence(wiki_page_url=url))
      

      Nachdem Sie den Code hinzugefügt haben, speichern und schließen Sie die Datei.

      Wenn wir diesen Code ausführen:

      • python wiki_page_function.py

      Erhalten wir eine Ausgabe wie die folgende:

      Output

      https://en.wikipedia.org/wiki/Ocean - exists

      Bei Aufruf der Funktion get_wiki_page_existence mit einer gültigen Wikipedia-Seite wird eine Zeichenfolge zurückgegeben, die bestätigt, dass die Seite tatsächlich existiert.

      Achtung: Im Allgemeinen ist es nicht sicher, Python-Objekte oder -Status zwischen Threads zu teilen, ohne sorgfältig darauf zu achten, dass keine Parallelitätsfehler auftreten. Wenn Sie eine Funktion definieren, die in einem Thread ausgeführt werden soll, ist es am besten, eine Funktion festzulegen, die einen einzelnen Auftrag ausführt und den Status nicht an andere Threads weitergibt oder veröffentlicht. get_wiki_page_existence ist ein Beispiel für eine solche Funktion.

      Schritt 2 — Verwenden von ThreadPoolExecutor zur Ausführung einer Funktion in Threads

      Nachdem wir nun über eine Funktion verfügen, die sich in Threads aufrufen lässt, können wir ThreadPoolExecutor verwenden, um zügig mehrere Aufrufe dieser Funktion auszuführen.

      Fügen Sie Ihrem Programm in wiki_page_function.py den folgenden hervorgehobenen Code hinzu:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      

      Werfen wir einen Blick auf die Funktionsweise dieses Codes:

      • concurrent.futures wird importiert, um uns Zugriff auf ThreadPoolExecutor zu gewähren.
      • Eine with-Anweisung dient der Erstellung eines ThreadPoolExecutor-Instanz-Executors, der Threads unmittelbar nach dem Abschluss bereinigt.
      • Vier Aufträge werden dem Executor übergeben: einer für jede der URLs in der Liste wiki_page_urls.
      • Jeder Aufruf an submit gibt eine Future-Instanz zurück, die in der futures-Liste gespeichert ist.
      • Die Funktion as_completed wartet, bis jeder Future get_wiki_page_existence-Aufruf abgeschlossen ist, damit wir das Ergebnis ausgeben können.

      Wenn wir dieses Programm mit dem folgenden Befehl erneut ausführen:

      • python wiki_page_function.py

      Erhalten wir eine Ausgabe wie die folgende:

      Output

      https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

      Diese Ausgabe ergibt Sinn: drei der URLs sind gültige Wikipedia-Seiten, eine nicht (this_page_does_not_exist). Beachten Sie, dass Ihre Ausgabe eine andere Reihenfolge aufweisen kann als diese Ausgabe. Die Funktion concurrent.futures.as_completed in diesem Beispiel gibt Ergebnisse zurück, sobald sie verfügbar sind. Dabei ist es egal, in welcher Reihenfolge die Aufträge übermittelt wurden.

      Schritt 3 — Vearbeiten von Ausnahmen bei Funktionsausführungen in Threads

      Im vorherigen Schritt hat get_wiki_page_existence bei allen unseren Aufrufen erfolgreich einen Wert zurückgegeben. In diesem Schritt sehen wir, dass ThreadPoolExecutor auch Ausnahmen auslösen kann, die in Threaded-Funktionsaufrufen generiert werden.

      Betrachten wir den folgenden beispielhaften Codeblock:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(
                  executor.submit(
                      get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
                  )
              )
          for future in concurrent.futures.as_completed(futures):
              try:
                  print(future.result())
              except requests.ConnectTimeout:
                  print("ConnectTimeout.")
      

      Dieser Codeblock ist fast identisch mit dem, den wir in Schritt 2 verwendet haben; er weist jedoch zwei wichtige Unterschiede auf:

      • Wir übergeben nun timeout=0.00001 an get_wiki_page_existence. Da das requests-Paket seine Webanfrage an Wikipedia in 0,00001 Sekunden nicht abschließen kann, wird eine ConnectTimeout-Ausnahme ausgelöst.
      • Wir erfassen ConnectTimeout-Ausnahmen, die durch future.result() ausgelöst werden, und drucken dabei jedes Mal eine Zeichenfolge aus.

      Wenn wir das Programm erneut ausführen, sehen wir die folgende Ausgabe:

      Output

      ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

      Vier ConnectTimeout-Nachrichten werden ausgegeben (eine für jede unserer vier wiki_page_urls), da keine davon in 0,00001 Sekunden abgeschlossen werden konnte und jede der vier get_wiki_page_existence-Aufrufe eine ConnectTimeout-Ausnahme ausgelöst hat.

      Sie haben gesehen, dass wenn ein Funktionsaufruf an einen ThreadPoolExecutor eine Ausnahme auslöst, diese Ausnahme normalerweise durch Aufruf von Future.result ausgelöst werden kann. Ein Aufruf von Future.result bei all Ihren übermittelten Aufrufen stellt sicher, dass Ihr Programm keine Ausnahmen verpasst, die von Ihrer Threaded-Funktion ausgelöst werden.

      Schritt 4 — Vergleichen der Ausführungszeit mit und ohne Threads

      Überprüfen wir nun, ob die Verwendung von ThreadPoolExecutor Ihr Programm tatsächlich schneller macht.

      Lassen Sie uns zunächst die Ausführung von get_wiki_page_existence ohne Threads messen:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running without threads:")
      without_threads_start = time.time()
      for url in wiki_page_urls:
          print(get_wiki_page_existence(wiki_page_url=url))
      print("Without threads time:", time.time() - without_threads_start)
      

      Im Codebeispiel rufen wir unsere get_wiki_page_existence-Funktion mit fünfzig verschiedenen URLs von Wikipedia-Seiten hintereinander auf. Wir verwenden die Funktion time.time(), um die Anzahl der Sekunden auszugeben, die für die Ausführung unseres Programms benötigt wurde.

      Wenn wir diesen Code wie zuvor erneut ausführen, erhalten wir eine Ausgabe wie die folgende:

      Output

      Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

      Einträge 2 bis 47 in dieser Ausgabe wurden der Kürze halber ausgelassen.

      Die Anzahl der Sekunden, die nach Without threads time (Zeit ohne Threads) ausgegeben wird, wird sich bei Ausführung auf Ihrem Computer unterscheiden. Das ist in Ordnung; Sie erhalten einfach eine Baseline-Zahl, die Sie mit einer Lösung vergleichen können, die ThreadPoolExecutor nutzt. In diesem Fall waren es ~5,803 Sekunden.

      Führen wir nun die gleichen fünfzig Wikipedia-URLs über get_wiki_page_existence aus, diesmal jedoch mit ThreadPoolExecutor:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running threaded:")
      threaded_start = time.time()
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      print("Threaded time:", time.time() - threaded_start)
      

      Der Code ist der gleiche Code, den wir in Schritt 2 erstellt haben; diesmal enthält er jedoch zusätzlich einige Druckanweisungen, um die Anzahl der Sekunden anzuzeigen, die zur Ausführung unseres Codes benötigt wurden.

      Wenn wir das Programm erneut ausführen, erhalten wir die folgende Ausgabe:

      Output

      Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

      Auch die Anzahl der Sekunden, die nach Threaded time (Zeit mit Threads) ausgegeben wird, wird sich auf Ihrem Computer unterscheiden (ebenso die Reihenfolge Ihrer Ausgabe).

      Jetzt können Sie die Ausführungszeit beim Abrufen der fünfzig URLs von Wikipedia-Seiten mit und ohne Threads miteinander vergleichen.

      Auf dem in diesem Tutorial verwendeten Rechner dauerte es ohne Threads ~5,803 Sekunden; mit Threads waren es ~1,220 Sekunden. Unser Programm lief mit Threads also deutlich schneller.

      Zusammenfassung

      In diesem Tutorial haben Sie erfahren, wie Sie das Dienstprogramm ThreadPoolExecutor in Python 3 verwenden können, um I/O-gerichteten Code effizient auszuführen. Sie haben eine Funktion erstellt, die sich für Aufrufe innerhalb von Threads eignet, gelernt, wie man sowohl Ausgaben als auch Ausnahmen von Threaded-Ausführungen dieser Funktion abruft, und den Leistungsschub beobachten können, der durch Verwendung von Threads entsteht.

      Nun können Sie mehr über andere Parallelitätsfunktionen des concurrent.futures-Moduls erfahren.



      Source link

      Cómo usar ThreadPoolExecutor en Python 3


      El autor seleccionó el COVID-19 Relief Fund para que reciba una donación como parte del programa Write for DOnations.

      Introducción

      Los subprocesos de Python son una especie de paralelismo que le permiten a su programa ejecutar varios procedimientos a la vez. Este paralelismo en Python también se puede lograr utilizando varios procesos, pero los subprocesos son particularmente adecuados para acelerar las aplicaciones que implican una cantidad considerable de E/S (entrada/salida).

      Algunos ejemplos de operaciones limitadas por las E/S son la realización de solicitudes web y la lectura de datos de archivos. A diferencia de las operaciones limitadas por las E/S, las operaciones limitadas por la CPU (como realizar cálculos matemáticos con la biblioteca estándar de Python) no se benefician mucho de los subprocesos de Python.

      Python 3 incluye la utilidad ThreadPoolExecutor para ejecutar código en subprocesos.

      En este tutorial, utilizaremos ThreadPoolExecutor para realizar solicitudes de red de forma rápida. Definiremos una función que se pueda invocar desde subprocesos, utilizaremos ThreadPoolExecutor para ejecutar esa función y procesaremos los resultados de esas ejecuciones.

      Para los fines de este tutorial, realizaremos solicitudes de red para verificar la existencia de páginas de Wikipedia.

      Nota: El hecho de que las operaciones limitadas por las E/S se beneficien más de los subprocesos que las limitadas por la CPU se debe a una idiosincrasia de Python denominada bloqueo global de intérpretes. Si desea obtener más información sobre el bloqueo global de intérpretes de Python, puede consultar la documentación oficial de Python.

      Requisitos previos

      Para aprovechar este tutorial al máximo, es recomendable tener algunos conocimientos de programación en Python y un entorno de programación local de Python con requests instalado.

      Puede revisar estos tutoriales para encontrar la información básica necesaria:

      • pip install --user requests==2.23.0

      Paso 1: Definir una función que se ejecute en subprocesos

      Vamos a comenzar por definir una función que nos gustaría ejecutar con la ayuda de subprocesos.

      Puede abrir este archivo usando nano o el editor de texto o entorno de desarrollo que prefiera:

      • nano wiki_page_function.py

      Para los fines de este tutorial, vamos a escribir una función que determine si una página de Wikipedia existe o no:

      wiki_page_function.py

      import requests
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      

      La función get_wiki_page_existence acepta dos argumentos: una URL a una página de Wikipedia (wiki_page_url) y un timeout que indica la cantidad de segundos que se debe esperar una respuesta de esa URL.

      get_wiki_page_existence utiliza el paquete requests para realizar una solicitud web a esa URL. Se devuelve una cadena que indica si la página existe o no dependiendo del código de estado de response, la respuesta HTTP. Los diferentes códigos de estado representan los distintos resultados que puede tener una solicitud HTTP. En este procedimiento, se asume que un código de estado 200, “correcto”, indica que la página de Wikipedia existe y un código de estado 404, “no encontrada”, que la página de Wikipedia no existe.

      Como se describe en la sección Requisitos previos, deberá tener el paquete requests instalado para poder ejecutar esta función.

      Intentemos ejecutar la función añadiendo la url y la invocación a la función después de la función get_wiki_page_existence:

      wiki_page_function.py

      . . .
      url = "https://en.wikipedia.org/wiki/Ocean"
      print(get_wiki_page_existence(wiki_page_url=url))
      

      Una vez que haya añadido el código, guarde y cierre el archivo.

      Si ejecutamos este código:

      • python wiki_page_function.py

      Veremos un resultado como el siguiente:

      Output

      https://en.wikipedia.org/wiki/Ocean - exists

      La invocación de la función get_wiki_page_existence con una página de Wikipedia válida devuelve una cadena que confirma que la página efectivamente existe.

      Advertencia: En general, no es seguro compartir objetos o estados de Python entre subprocesos sin tener especial cuidado para evitar errores de simultaneidad. A la hora de definir una función que se ejecute en un subproceso, lo mejor es definir una que realice una sola tarea y no publique ni comparta su estado con otros subprocesos. La función get_wiki_page_existence es un ejemplo de una función con estas características.

      Paso 2: Usar ThreadPoolExecutor para ejecutar una función en subprocesos

      Ahora que tenemos una función que se puede invocar con subprocesos, podemos usar ThreadPoolExecutor para invocar esa función varias veces de forma rápida.

      Agregue el siguiente código resaltado a su programa en wiki_page_function.py:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      

      Veamos cómo funciona este código:

      • concurrent.futures se importa para darnos acceso a ThreadPoolExecutor.
      • Se utiliza una instrucción with para crear un executor de una instancia de ThreadPoolExecutor que limpia de forma rápida los subprocesos al completarse.
      • Se envían (submit) cuatro tareas al executor para cada una de las URL de la lista wiki_page_urls.
      • Cada invocación a submit devuelve una instancia Future que se almacena en la lista de futures.
      • La función as_completed espera que se complete cada invocación a get_wiki_page_existence de Future para que podamos imprimir su resultado.

      Si volvemos a ejecutar este programa con el siguiente comando:

      • python wiki_page_function.py

      Veremos un resultado como el siguiente:

      Output

      https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

      Este resultado es lógico: tres de las URL son páginas de Wikipedia válidas y una de ellas, this_page_does_not_exist, no lo es. Tenga en cuenta que su resultado puede tener un orden distinto a este. La función concurrent.futures.as_completed de este ejemplo devuelve resultados tan pronto estén disponibles, independientemente del orden en que se presentaron las tareas.

      Paso 3: Procesar excepciones de funciones ejecutadas en subprocesos

      En el paso anterior, get_wiki_page_existence devolvió correctamente un valor para todas nuestras invocaciones. En este paso, veremos que ThreadPoolExecutor también puede crear excepciones generadas en invocaciones de funciones con subprocesos.

      Consideremos el siguiente bloque de código de ejemplo:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(
                  executor.submit(
                      get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
                  )
              )
          for future in concurrent.futures.as_completed(futures):
              try:
                  print(future.result())
              except requests.ConnectTimeout:
                  print("ConnectTimeout.")
      

      Este bloque de código es casi idéntico al que utilizamos en el paso 2, pero tiene dos diferencias clave:

      • Ahora, pasamos timeout=0.00001 a get_wiki_page_existence. Como el paquete requests no puede completar su solicitud web a Wikipedia en 0.00001 segundos, creará una excepción ConnectTimeout.
      • Tomamos las excepciones ConnectTimeout que generó future.result() e imprimimos una cadena cada vez que lo hacemos.

      Si volvemos a ejecutar el programa, veremos el siguiente resultado:

      Output

      ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

      Se imprimen cuatro mensajes de ConnectTimeout (uno para cada una de nuestras cuatro wiki_page_urls), dado que no se pudo completar ninguna de ellas en 0.00001 segundos y cada una de las cuatro invocaciones a get_wiki_page_existence generó la excepción ConnectTimeout.

      Aprendió que si la invocación a una función enviada a ThreadPoolExecutor crea una excepción, esa excepción se puede generar normalmente al invocar Future.result. Invocar Future.result en todas sus invocaciones enviadas garantiza que su programa no omita ninguna excepción que se haya generado a partir de su función de subprocesos.

      Paso 4: Comparar el tiempo de ejecución con y sin subprocesos

      Ahora, vamos a verificar que usar ThreadPoolExecutor efectivamente hace que su programa sea más rápido.

      Primero, vamos a medir el tiempo de ejecución de get_wiki_page_existence sin subprocesos:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running without threads:")
      without_threads_start = time.time()
      for url in wiki_page_urls:
          print(get_wiki_page_existence(wiki_page_url=url))
      print("Without threads time:", time.time() - without_threads_start)
      

      En el código de ejemplo, invocamos nuestra función get_wiki_page_existence con cincuenta URL de páginas de Wikipedia distintas, una por una. Usamos la función time.time() para imprimir la cantidad de segundos que toma la ejecución de nuestro programa.

      Si volvemos a ejecutar este código como antes, veremos un resultado similar al siguiente:

      Output

      Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

      Se omitieron las entradas 2 a 47 de este resultado para mayor brevedad.

      La cantidad de segundos que se imprima después de Without threads time será distinta cuando lo ejecute en su máquina, lo que es normal, dado que solo está recibiendo un número de referencia para realizar una comparación con una solución que utiliza ThreadPoolExecutor. En este caso, tomó ~5.803 segundos.

      Ejecutemos las mismas cincuenta URL de Wikipedia a través de get_wiki_page_existence, pero, esta vez, utilizando ThreadPoolExecutor:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running threaded:")
      threaded_start = time.time()
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      print("Threaded time:", time.time() - threaded_start)
      

      El código es el mismo que el que creamos en el Paso 2, solo agregamos algunas instrucciones de impresión que nos muestran la cantidad de segundos que toma la ejecución de nuestro código.

      Si volvemos a ejecutar el programa, veremos lo siguiente:

      Output

      Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

      Nuevamente, la cantidad de segundos que se imprime después de Threaded time será diferente en su computadora (al igual que el orden de su resultado).

      Ahora, puede comparar el tiempo de ejecución de la búsqueda de las cincuenta URL de páginas de Wikipedia con y sin subprocesos.

      En la máquina utilizada en este tutorial, la ejecución sin subprocesos llevó ~5.803 segundos y con subprocesos, ~1.220 segundos. Nuestro programa se ejecutó considerablemente más rápido con subprocesos.

      Conclusión

      En este tutorial, aprendió a usar la utilidad ThreadPoolExecutor de Python 3 para ejecutar de forma eficiente código limitado por las E/S. Creó una función que se puede invocar desde subprocesos, aprendió a recuperar resultados y excepciones de ejecuciones de esa función y notó la mejora de desempeño que se obtiene al utilizar subprocesos.

      Ahora, puede obtener más información sobre otras funciones de simultaneidad que ofrece el módulo concurrent.futures.



      Source link

      Comment utiliser ThreadPoolExecutor en Python 3


      L’auteur a choisi le COVID-19 Relief Fund pour recevoir un don dans le cadre du programme Write for DOnations.

      Introduction

      Les threads Python sont une forme de parallélisme qui permet à votre programme d’exécuter plusieurs procédures à la fois. Le parallélisme en Python peut également être réalisé en utilisant des processus multiples, mais les threads sont particulièrement bien adaptés pour accélérer les applications qui impliquent des quantités importantes d’entrées/sorties (input/output).

      Les opérations liées aux entrées/sorties comprennent, par exemple, les requêtes web et la lecture des données des fichiers. Contrairement aux opérations liées aux entrées/sorties, les opérations liées au CPU (comme l’exécution de calculs mathématiques avec la bibliothèque standard Python) ne bénéficieront pas beaucoup des threads Python.

      Python 3 inclut l’utilitaire ThreadPoolExecutor pour exécuter du code dans un thread.

      Au cours de ce tutoriel, nous utiliserons ThreadPoolExecutor pour effectuer rapidement des requêtes réseau. Nous allons définir une fonction bien adaptée à l’invocation dans les threads, utiliser ThreadPoolExecutor pour exécuter cette fonction, et traiter les résultats de ces exécutions.

      Pour ce tutoriel, nous allons faire des requêtes réseau pour vérifier l’existence de pages Wikipédia.

      Note : le fait que les opérations liées aux entrées/sorties bénéficient davantage des threads que les opérations liées au CPU est causé par une idiosyncrasie en Python appelée verrouillage global de l’interpréteur. Si vous le souhaitez, vous pouvez en apprendre davantage sur le verrouillage global de l’interpréteur de Python dans la documentation officielle de Python.

      Conditions préalables

      Pour tirer le meilleur parti de ce tutoriel, il est recommandé de se familiariser avec la programmation en Python et d’avoir un environnement de programmation Python local avec des requêtes installé.

      Vous pouvez consulter ces tutoriels pour obtenir les informations de base nécessaires :

      • pip install --user requests==2.23.0

      Étape 1 — Définition d’une fonction à exécuter en threads

      Commençons par définir une fonction que nous aimerions exécuter à l’aide de threads.

      En utilisant nano ou votre éditeur de texte/environnement de développement préféré, vous pouvez ouvrir ce fichier :

      • nano wiki_page_function.py

      Pour ce tutoriel, nous allons écrire une fonction qui détermine si une page Wikipédia existe ou non :

      wiki_page_function.py

      import requests
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      

      La fonction get_wiki_page_existence accepte deux arguments : une URL vers une page Wikipédia (wiki_page_url), et un timeout de quelques secondes pour obtenir une réponse de cette URL.

      get_wiki_page_existence utilise le paquet requests pour faire une requête web à cette URL. En fonction du code d’état de la response HTTP, une chaîne de caractères qui décrit si la page existe ou non est renvoyée. Les différents codes d’état représentent les différents résultats d’une requête HTTP. Cette procédure suppose qu’un code d’état 200 “réussi” signifie que la page Wikipédia existe, et qu’un code d’état 404 “non trouvé” signifie que la page Wikipédia n’existe pas.

      Comme décrit dans la section Prérequis, vous aurez besoin du paquet requests installé pour exécuter cette fonction.

      Essayons d’exécuter la fonction en ajoutant l’url et l’appel de fonction après la fonction get_wiki_page_existence :

      wiki_page_function.py

      . . .
      url = "https://en.wikipedia.org/wiki/Ocean"
      print(get_wiki_page_existence(wiki_page_url=url))
      

      Une fois que vous avez ajouté le code, enregistrez et fermez le fichier.

      Si nous exécutons ce code :

      • python wiki_page_function.py

      Nous verrons une sortie comme celle-ci :

      Output

      https://en.wikipedia.org/wiki/Ocean - exists

      L’appel de la fonction get_wiki_page_existence avec une page Wikipédia valide renvoie une chaîne de caractères qui confirme que la page existe bel et bien.

      Avertissement : en général, il n’est pas sûr de partager des objets ou des états Python entre les threads sans prendre un soin particulier pour éviter les bogues de concurrence. Lors de la définition d’une fonction à exécuter dans un thread, il est préférable de définir une fonction qui effectue un seul travail et qui ne partage ni ne publie l’état à d’autres threads. get_wiki_page_existence est un exemple d’une telle fonction.

      Étape 2 — Utilisation de ThreadPoolExecutor pour exécuter une fonction dans Threads

      Maintenant que nous disposons d’une fonction bien adaptée à l’invocation avec des threads, nous pouvons utiliser ThreadPoolExecutor pour effectuer de multiples invocations de cette fonction de manière opportune.

      Ajoutons le code surligné suivant à votre programme dans wiki_page_function.py :

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      

      Voyons comment ce code fonctionne :

      • concurrent.futures est importé pour nous donner accès à ThreadPoolExecutor.
      • Un énoncé with est utilisé pour créer un executord’instance ThreadPoolExecutor qui nettoiera rapidement les threads dès leur achèvement.
      • Quatre emplois sont submitted à l’executor : un pour chacune des URL de la liste wiki_page_urls.
      • Chaque appel à submit renvoie une instance Future qui est stockée dans la liste futures.
      • La fonction as_completed attend que chaque appel Future get_wiki_page_existence soit terminé pour que nous puissions imprimer son résultat.

      Si nous exécutons à nouveau ce programme, avec la commande suivante :

      • python wiki_page_function.py

      Nous verrons une sortie comme celle-ci :

      Output

      https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

      Cette sortie est logique : 3 des URLs sont des pages Wikipédia valides, et l’une d’entre elles, this_page_does_not_exist, ne l’est pas. Notez que votre sortie peut être ordonnée différemment de cette sortie. Dans cet exemple, la fonction concurrent.futures.as_completed renvoie les résultats dès qu’ils sont disponibles, quel que soit l’ordre dans lequel les emplois ont été soumis.

      Étape 3 — Traitement des exceptions aux fonctions exécutées dans Threads

      Au cours de l’étape précédente, get_wiki_page_existence a réussi à retourner une valeur pour toutes nos invocations. Dans cette étape, nous verrons que ThreadPoolExecutor peut également lever les exceptions générées dans les invocations de fonctions threadées.

      Considérons l’exemple de bloc de code suivant :

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(
                  executor.submit(
                      get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
                  )
              )
          for future in concurrent.futures.as_completed(futures):
              try:
                  print(future.result())
              except requests.ConnectTimeout:
                  print("ConnectTimeout.")
      

      Ce bloc de code est presque identique à celui que nous avons utilisé à l’étape 2, mais il présente deux différences essentielles :

      • Nous passons maintenant timeout=0.00001 à get_wiki_page_existence. Comme le paquet requests ne pourra pas terminer sa requête web à Wikipedia en 0,00001 seconde, cela entraînera une exception ConnectTimeout.
      • Nous attrapons les exceptions ConnectTimeout soulevées par future.result() et imprimons une chaîne de caractères à chaque fois que nous le faisons.

      Si nous relançons le programme, nous obtiendrons la sortie suivante :

      Output

      ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

      Quatre messages ConnectTimeout sont imprimés – un pour chacun de nos quatre wiki_page_urls, car aucun d’entre eux n’a pu être terminé en 0.00001 seconde et chacun des quatre appels get_wiki_page_existence a soulevé l’exception ConnectTimeout.

      Vous avez maintenant vu que si un appel de fonction soumis à un ThreadPoolExecutor soulève une exception, cette exception peut être soulevée normalement en appelant Future.result. Appeler Future.result sur toutes vos invocations soumises garantit que votre programme ne manquera aucune exception soulevée par votre fonction threadée.

      Étape 4 — Comparaison du temps d’exécution avec et sans threads

      Maintenant, vérifions que l’utilisation de ThreadPoolExecutor rend réellement votre programme plus rapide.

      Tout d’abord, chronométrons get_wiki_page_existence si nous le faisons fonctionner sans threads :

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running without threads:")
      without_threads_start = time.time()
      for url in wiki_page_urls:
          print(get_wiki_page_existence(wiki_page_url=url))
      print("Without threads time:", time.time() - without_threads_start)
      

      Dans l’exemple de code, nous appelons notre fonction get_wiki_page_existence avec cinquante URL de pages Wikipédia différentes, une par une. Nous utilisons la fonction time.time() pour imprimer le nombre de secondes qu’il faut pour exécuter notre programme.

      Si nous exécutons à nouveau ce code comme auparavant, nous obtiendrons la sortie suivante :

      Output

      Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

      Les entrées 2–47 dans cette sortie ont été omises par concision.

      Le nombre de secondes affiché après Without threads time (Temps écoulé sans threads) sera différent lorsque vous l’exécuterez sur votre machine – ce n’est pas grave, vous obtenez juste un nombre de base à comparer avec une solution qui utilise ThreadPoolExecutor. Dans ce cas, il était de ~5.803 secondes.

      Exécutons les mêmes cinquante URLs de Wikipédia dans get_wiki_page_existence, mais cette fois en utilisant ThreadPoolExecutor :

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running threaded:")
      threaded_start = time.time()
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      print("Threaded time:", time.time() - threaded_start)
      

      Le code est le même que celui que nous avons créé à l’étape 2, mais avec l’ajout de quelques énoncés imprimés qui nous indiquent le nombre de secondes qu’il faut pour exécuter notre code.

      Si nous relançons le programme, nous obtiendrons la sortie suivante :

      Output

      Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

      Là encore, le nombre de secondes imprimées après Threaded time (temps threadé écoulé) sera différent sur votre ordinateur (ainsi que l’ordre de votre sortie).

      Vous pouvez maintenant comparer le temps d’exécution pour récupérer les cinquante URLs des pages Wikipédia avec et sans threads.

      Sur la machine utilisée dans ce tutoriel, l’exécution sans threads a pris ~5.803 secondes, et celle avec threads a pris ~1.220 secondes. Notre programme a fonctionné beaucoup plus rapidement avec les threads.

      Conclusion

      Dans ce tutoriel, vous avez appris à utiliser l’utilitaire ThreadPoolExecutor en Python 3 pour exécuter efficacement du code lié aux entrées/sorties. Vous avez créé une fonction bien adaptée à l’invocation dans les threads, appris comment récupérer à la fois la sortie et les exceptions des exécutions threadées de cette fonction, et observé l’augmentation des performances obtenue en utilisant les threads.

      De là, vous pouvez en savoir plus sur les autres fonctions de concurrence offertes par le module concurrent.futures.



      Source link