      How To Use ThreadPoolExecutor in Python 3


      The author selected the COVID-19 Relief Fund to receive a donation as part of the Write for DOnations program.

      Introduction

      Python threads are a form of parallelism that allow your program to run multiple procedures at once. Parallelism in Python can also be achieved using multiple processes, but threads are particularly well suited to speeding up applications that involve significant amounts of I/O (input/output).

      Example I/O-bound operations include making web requests and reading data from files. In contrast to I/O-bound operations, CPU-bound operations (like performing math with the Python standard library) will not benefit much from Python threads.

      Python 3 includes the ThreadPoolExecutor utility for executing code in a pool of threads.

      In this tutorial, we will use ThreadPoolExecutor to make network requests expediently. We’ll define a function well suited for invocation within threads, use ThreadPoolExecutor to execute that function, and process results from those executions.

      For this tutorial, we’ll make network requests to check for the existence of Wikipedia pages.

      Note: The fact that I/O-bound operations benefit more from threads than CPU-bound operations is caused by an idiosyncrasy in Python called the global interpreter lock, which allows only one thread at a time to execute Python bytecode. If you’d like, you can learn more about Python’s global interpreter lock in the official Python documentation.

      Prerequisites

      To get the most out of this tutorial, it is recommended to have some familiarity with programming in Python and a local Python programming environment with requests installed.

      If you do not already have the requests package installed in your environment, you can install it with pip:

      • pip install --user requests==2.23.0
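
      If you want to confirm that requests is importable before you continue, a quick check (assuming a standard Python 3 installation on your PATH) is:

      • python -c "import requests; print(requests.__version__)"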

      Step 1 — Defining a Function to Execute in Threads

      Let’s start by defining a function that we’d like to execute with the help of threads.

      Using nano or your preferred text editor/development environment, you can open this file:

      • nano wiki_page_function.py

      For this tutorial, we’ll write a function that determines whether or not a Wikipedia page exists:

      wiki_page_function.py

      import requests
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      

      The get_wiki_page_existence function accepts two arguments: a URL to a Wikipedia page (wiki_page_url), and a timeout number of seconds to wait for a response from that URL.

      get_wiki_page_existence uses the requests package to make a web request to that URL. Depending on the status code of the HTTP response, a string is returned that describes whether or not the page exists. Different status codes represent different outcomes of an HTTP request. This procedure assumes that a 200 “success” status code means the Wikipedia page exists, and a 404 “not found” status code means the Wikipedia page does not exist.
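
      As an aside, requests.get downloads the entire page body. If all you need is an existence check, a HEAD request is a lighter-weight variation. The sketch below is not part of this tutorial’s file, and the function name wiki_page_exists_head is hypothetical:

      import requests
      
      def wiki_page_exists_head(wiki_page_url, timeout=10):
          # HEAD returns the same status code as GET but skips the response body.
          response = requests.head(url=wiki_page_url, timeout=timeout, allow_redirects=True)
          return response.status_code == 200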

      As described in the Prerequisites section, you’ll need the requests package installed to run this function.

      Let’s try running the function by adding the url and function call following the get_wiki_page_existence function:

      wiki_page_function.py

      . . .
      url = "https://en.wikipedia.org/wiki/Ocean"
      print(get_wiki_page_existence(wiki_page_url=url))
      

      Once you’ve added the code, save and close the file.

      If we run this code:

      • python wiki_page_function.py

      We’ll see output like the following:

      Output

      https://en.wikipedia.org/wiki/Ocean - exists

      Calling the get_wiki_page_existence function with a valid Wikipedia page returns a string that confirms the page does, in fact, exist.

      Warning: In general, it is not safe to share Python objects or state between threads without taking special care to avoid concurrency bugs. When defining a function to execute in a thread, it is best to define a function that performs a single job and does not share or publish state to other threads. get_wiki_page_existence is an example of such a function.
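
      If you ever do need to share state between threads, the usual approach is to guard it with a lock from the standard threading module. Here is a minimal sketch of the pattern; the pages_checked counter is a hypothetical example, not part of this tutorial’s code:

      import threading
      
      pages_checked = 0
      pages_checked_lock = threading.Lock()
      
      def record_check():
          global pages_checked
          # Only one thread at a time may execute this block,
          # so the increment cannot be corrupted by concurrent updates.
          with pages_checked_lock:
              pages_checked += 1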

      Step 2 — Using ThreadPoolExecutor to Execute a Function in Threads

      Now that we have a function well suited to invocation with threads, we can use ThreadPoolExecutor to perform multiple invocations of that function expediently.

      Let’s add the following code to your program in wiki_page_function.py:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      

      Let’s take a look at how this code works:

      • concurrent.futures is imported to give us access to ThreadPoolExecutor.
      • A with statement is used to create a ThreadPoolExecutor instance executor that will promptly clean up threads upon completion.
      • Four jobs are submitted to the executor: one for each of the URLs in the wiki_page_urls list.
      • Each call to submit returns a Future instance that is stored in the futures list.
      • The as_completed function yields each Future as its get_wiki_page_existence call completes, so we can print each result as soon as it is ready.

      If we run this program again, with the following command:

      • python wiki_page_function.py

      We’ll see output like the following:

      Output

      https://en.wikipedia.org/wiki/Island - exists
      https://en.wikipedia.org/wiki/Ocean - exists
      https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
      https://en.wikipedia.org/wiki/Shark - exists

      This output makes sense: three of the URLs are valid Wikipedia pages, and one of them, this_page_does_not_exist, is not. Note that your output may be ordered differently than shown here. The concurrent.futures.as_completed function in this example returns results as soon as they are available, regardless of what order the jobs were submitted in.
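
      If you would rather receive results in submission order, ThreadPoolExecutor also provides a map method. Below is a minimal sketch of the same four-URL check using it, assuming get_wiki_page_existence and wiki_page_urls are defined as above:

      import concurrent.futures
      
      with concurrent.futures.ThreadPoolExecutor() as executor:
          # map yields results in the order the URLs were given,
          # blocking until each result is ready.
          for result in executor.map(get_wiki_page_existence, wiki_page_urls):
              print(result)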

      Step 3 — Processing Exceptions From Functions Run in Threads

      In the previous step, get_wiki_page_existence successfully returned a value for all of our invocations. In this step, we’ll see that ThreadPoolExecutor can also raise exceptions generated in threaded function invocations.

      Let’s consider the following example code block:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(
                  executor.submit(
                      get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
                  )
              )
          for future in concurrent.futures.as_completed(futures):
              try:
                  print(future.result())
              except requests.ConnectTimeout:
                  print("ConnectTimeout.")
      

      This code block is nearly identical to the one we used in Step 2, but it has two key differences:

      • We now pass timeout=0.00001 to get_wiki_page_existence. Since the requests package won’t be able to complete its web request to Wikipedia in 0.00001 seconds, it will raise a ConnectTimeout exception.
      • We catch ConnectTimeout exceptions raised by future.result() and print out a string each time we do so.

      If we run the program again, we’ll see the following output:

      Output

      ConnectTimeout.
      ConnectTimeout.
      ConnectTimeout.
      ConnectTimeout.

      Four ConnectTimeout messages are printed—one for each of our four wiki_page_urls, since none of them were able to complete in 0.00001 seconds and each of the four get_wiki_page_existence calls raised the ConnectTimeout exception.
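
      One caveat worth noting: requests.ConnectTimeout only covers failures to establish a connection. A request that connects but then stalls while reading the response raises requests.ReadTimeout instead. If you want to handle both, you can catch their shared parent class, requests.exceptions.Timeout. A sketch of that broader except clause:

      # Inside the as_completed loop, a broader except clause:
      try:
          print(future.result())
      except requests.exceptions.Timeout:
          # requests.exceptions.Timeout covers both ConnectTimeout and ReadTimeout.
          print("Timed out.")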

      You’ve now seen that if a function call submitted to a ThreadPoolExecutor raises an exception, then that exception is re-raised normally when you call Future.result. Calling Future.result on all your submitted invocations ensures that your program won’t miss any exceptions raised from your threaded function.
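
      As an alternative to wrapping future.result() in try/except, the Future API also offers an exception() method, which waits for the call to finish and returns the raised exception, or None if the call succeeded, without re-raising it. A minimal sketch:

      for future in concurrent.futures.as_completed(futures):
          error = future.exception()
          if error is None:
              print(future.result())
          else:
              # The exception object can be inspected or logged without re-raising.
              print("Request failed:", error)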

      Step 4 — Comparing Execution Time With and Without Threads

      Now let’s verify that using ThreadPoolExecutor actually makes your program faster.

      First, let’s time get_wiki_page_existence if we run it without threads:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running without threads:")
      without_threads_start = time.time()
      for url in wiki_page_urls:
          print(get_wiki_page_existence(wiki_page_url=url))
      print("Without threads time:", time.time() - without_threads_start)
      

      In this code example, we call our get_wiki_page_existence function with fifty different Wikipedia page URLs, one by one. We use the time.time() function to print out the number of seconds it takes to run our program.
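
      Note that time.time() reads the wall clock, which is good enough for a rough comparison like this one. For more careful benchmarking, time.perf_counter() is generally preferred because it uses a monotonic, high-resolution clock intended for measuring elapsed time. A sketch of the substitution:

      import time
      
      start = time.perf_counter()
      # ... the work being timed ...
      print("Elapsed seconds:", time.perf_counter() - start)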

      If we run this code again as before, we’ll see output like the following:

      Output

      Running without threads:
      https://en.wikipedia.org/wiki/0 - exists
      https://en.wikipedia.org/wiki/1 - exists
      . . .
      https://en.wikipedia.org/wiki/48 - exists
      https://en.wikipedia.org/wiki/49 - exists
      Without threads time: 5.803015232086182

      Entries 2–47 in this output have been omitted for brevity.

      The number of seconds printed after Without threads time will be different when you run it on your machine—that’s OK, you are just getting a baseline number to compare with a solution that uses ThreadPoolExecutor. In this case, it was ~5.803 seconds.

      Let’s run the same fifty Wikipedia URLs through get_wiki_page_existence, but this time using ThreadPoolExecutor:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running threaded:")
      threaded_start = time.time()
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      print("Threaded time:", time.time() - threaded_start)
      

      The code is the same code we created in Step 2, only with the addition of some print statements that show us the number of seconds it takes to execute our code.

      If we run the program again, we’ll see the following:

      Output

      Running threaded:
      https://en.wikipedia.org/wiki/1 - exists
      https://en.wikipedia.org/wiki/0 - exists
      . . .
      https://en.wikipedia.org/wiki/48 - exists
      https://en.wikipedia.org/wiki/49 - exists
      Threaded time: 1.2201685905456543

      Again, the number of seconds printed after Threaded time will be different on your computer (as will the order of your output).

      You can now compare the execution time for fetching the fifty Wikipedia page URLs with and without threads.

      On the machine used in this tutorial, without threads took ~5.803 seconds, and with threads took ~1.220 seconds. Our program ran significantly faster with threads.
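
      The speedup you see depends partly on the size of the thread pool. ThreadPoolExecutor accepts a max_workers argument; in recent Python versions (3.8+) the default is min(32, os.cpu_count() + 4). For I/O-bound work like this, you can experiment with an explicit pool size. A sketch, reusing get_wiki_page_existence and wiki_page_urls from above:

      import concurrent.futures
      
      # 20 is an arbitrary illustrative pool size, not a recommendation.
      with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
          futures = [
              executor.submit(get_wiki_page_existence, wiki_page_url=url)
              for url in wiki_page_urls
          ]
          for future in concurrent.futures.as_completed(futures):
              print(future.result())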

      Conclusion

      In this tutorial, you have learned how to use the ThreadPoolExecutor utility in Python 3 to efficiently run code that is I/O bound. You created a function well suited to invocation within threads, learned how to retrieve both output and exceptions from threaded executions of that function, and observed the performance boost gained by using threads.

      From here you can learn more about other concurrency functions offered by the concurrent.futures module.


