One place for hosting & domains

      Использование ThreadPoolExecutor в Python 3


      Автор выбрал COVID-19 Relief Fund для получения пожертвования в рамках программы Write for DOnations.

      Введение

      Потоки в Python представляют собой форму параллельного программирования, позволяющую программе выполнять несколько процедур одновременно. Параллелизм в Python также можно реализовать посредством использования нескольких процессов, однако потоки особенно хорошо подходят для ускорения приложений, использующих существенные объемы ввода/вывода.

      Например, операции ввода-вывода включают отправку веб-запросов и чтение данных из файлов. В отличие от операций ввода вывода, операции процессора (например, математические операции со стандартной библиотекой Python) не становятся намного эффективнее при использовании потоков Python.

      В состав Python 3 входит утилита ThreadPoolExecutor для выполнения кода в потоке.

      В этом обучающем модуле мы используем ThreadPoolExecutor для ускоренной отправки сетевых запросов. Мы определим функцию, хорошо подходящую для вызова в потоках, используем ThreadPoolExecutor для выполнения этой функции и обработаем результаты выполнения.

      В этом обучающем модуле мы будем составлять сетевые запросы для проверки существования страниц на портале Wikipedia.

      Примечание. Тот факт, что операции ввода-вывода получают больше выгод от потоков, чем операции процессора, связан с использованием в Python глобальной блокировки интерпретатора, которая позволяет только одному потоку сохранять контроль над интерпретатором Python. Если хотите, вы можете узнать больше о глобальном блокировке интерпретатора Python в официальной документации по Python.

      Предварительные требования

      Для наиболее эффективного прохождения этого обучающего модуля требуется знакомство с программированием на Python и локальной средой программирования Python с requests.

      Необходимую информацию можно получить, пройдя следующие обучающие модули:

      • pip install --user requests==2.23.0

      Шаг 1 — Определение функции для выполнения в потоках

      Для начала определим функцию, которую мы хотим выполнить с помощью потоков.

      Откройте этот файл, используя nano или предпочитаемый текстовый редактор или среду разработки:

      • nano wiki_page_function.py

      Для этого обучающего модуля мы напишем функцию, проверяющую существование страницы на портале Wikipedia:

      wiki_page_function.py

      import requests
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      

      Функция get_wiki_page_existence принимает два аргумента: URL страницы Wikipedia (wiki_page_url) и timeout — количество секунд ожидания ответа от этого URL.

      get_wiki_page_existence использует пакет requests для отправки веб-запроса на этот URL. В зависимости от кода состояния ответа HTTP функция возвращает строку, описывающую наличие или отсутствие страницы. Разные коды состояния соответствуют разным результатам выполнения запроса HTTP. Эта процедура предполагает, что код состояния 200 (успех) означает, что страница Wikipedia существует, а код состояния 404 (не найдено) означает, что страница Wikipedia не существует.

      Как указывалось в разделе «Предварительные требования», для запуска этой функции должен быть установлен пакет requests.

      Попробуем запустить функцию, добавив url и вызов функции после функции get_wiki_page_existence:

      wiki_page_function.py

      . . .
      url = "https://en.wikipedia.org/wiki/Ocean"
      print(get_wiki_page_existence(wiki_page_url=url))
      

      После добавления кода сохраните и закройте файл.

      Если мы запустим этот код:

      • python wiki_page_function.py

      Результат будет выглядеть примерно следующим образом:

      Output

      https://en.wikipedia.org/wiki/Ocean - exists

      Вызов функции get_wiki_page_existence для существующей страницы Wikipedia возвращает строку, подтверждающую фактическое существование страницы.

      Предупреждение. Обычно небезопасно делать объекты или состояния Python доступными для всех потоков, не приняв особых мер для предотвращения ошибок параллельной обработки. При определении функции для выполнения в потоке лучше всего определить функцию, которая выполняет одну задачу и не делится своим состоянием с другими потоками. get_wiki_page_existence — хороший пример такой функции.

      Шаг 2 — Использование ThreadPoolExecutor для выполнения функции в потоках

      Теперь у нас есть функция, подходящая для вызова в потоках, и мы можем использовать ThreadPoolExecutor для многократного ускоренного вызова этой функции.

      Добавьте следующий выделенный код в свою программу в файле wiki_page_function.py:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      

      Посмотрим, как работает этот код:

      • concurrent.futures импортируется, чтобы предоставить нам доступ к ThreadPoolExecutor.
      • Выражение with используется для создания исполнительного блока экземпляра ThreadPoolExecutor, который будет быстро очищать потоки после выполнения.
      • Четыре задания отправляются в исполнительный блок: по одному для каждого URL из списка wiki_page_urls.
      • Каждый вызов submit возвращает экземпляр Future, хранящийся в списке futures.
      • Функция as_completed ожидает каждого вызова Future get_wiki_page_existence для выполнения, чтобы дать нам возможность распечатать результат.

      Если мы снова запустим эту программу с помощью следующей команды:

      • python wiki_page_function.py

      Результат будет выглядеть примерно следующим образом:

      Output

      https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

      Этот вывод имеет смысл: 3 адреса URL указывают на существующие страницы Wikipedia, а один из них this_page_does_not_exist не существует. Обратите внимание. что вывод может иметь другой порядок, отличающийся от показанного здесь. Функция concurrent.futures.as_completed в этом примере возвращает результаты сразу же, как только они становятся доступными, вне зависимости от порядка отправки заданий.

      Шаг 3 — Обработка исключений функций, выполняемых в потоках

      На предыдущем шаге функция get_wiki_page_existence успешно вернула значения во всех случаях вызова. На этом шаге мы увидим, что ThreadPoolExecutor также может выводить исключения при вызове функций в потоках.

      Рассмотрим в качестве примера следующий блок кода:

      wiki_page_function.py

      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      
      wiki_page_urls = [
          "https://en.wikipedia.org/wiki/Ocean",
          "https://en.wikipedia.org/wiki/Island",
          "https://en.wikipedia.org/wiki/this_page_does_not_exist",
          "https://en.wikipedia.org/wiki/Shark",
      ]
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(
                  executor.submit(
                      get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
                  )
              )
          for future in concurrent.futures.as_completed(futures):
              try:
                  print(future.result())
              except requests.ConnectTimeout:
                  print("ConnectTimeout.")
      

      Этот блок кода практически идентичен использованному нами на шаге 2, но имеет два важных отличия:

      • Теперь мы передаем аргумент timeout=0.00001 для функции get_wiki_page_existence. Поскольку пакет requests не может выполнить веб-запрос сайта Wikipedia за 0,00001 секунды, он выдаст исключение ConnectTimeout.
      • Мы собираем исключения ConnectTimeout, выдаваемые future.result(), и выводим строку в каждом таком случае.

      Если мы запустим программу снова, мы получим следующий результат:

      Output

      ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

      Выведено четыре сообщения ConnectTimeout, по одному для каждого из четырех значений wiki_page_urls, поскольку ни один запрос не мог быть выполнен за 0,00001 секунды, и каждый из четырех вызовов get_wiki_page_existence завершился исключением ConnectTimeout.

      Мы увидели, что если вызов функции, отправленный в ThreadPoolExecutor, завершается исключением, это исключение может быть выведено обычным образом посредством вызова Future.result. Вызов Future.result для всех вызванных функций гарантирует, что ваша программа не пропустит никаких исключений при выполнении функции в потоке.

      Шаг 4 — Сравнение времени исполнения с потоками и без потоков

      Убедимся, что использование ThreadPoolExecutor действительно ускоряет нашу программу.

      Вначале определим время выполнения функции get_wiki_page_existence при ее запуске без потоков:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running without threads:")
      without_threads_start = time.time()
      for url in wiki_page_urls:
          print(get_wiki_page_existence(wiki_page_url=url))
      print("Without threads time:", time.time() - without_threads_start)
      

      В этом пример кода мы вызываем функцию get_wiki_page_existence с пятьюдесятью разными URL страниц Wikipedia по одной. Мы используем функцию time.time() для вывода количества секунд выполнения нашей программы.

      Если мы запустим этот код снова, как и раньше, мы увидим следующий результат:

      Output

      Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

      Записи 2–47 в выводимых результатах пропущены для краткости.

      Количество секунд, выводимое после Without threads time, будет отличаться для вашего компьютера, и это нормально, ведь это просто базовое число для сравнения с получаемым при использовании ThreadPoolExecutor. В данном случае мы получили результат ~5,803 секунды.

      Теперь снова пропустим те же пятьдесят URL страниц Wikipedia через функцию get_wiki_page_existence, но в этот раз с использованием ThreadPoolExecutor:

      wiki_page_function.py

      import time
      import requests
      import concurrent.futures
      
      
      def get_wiki_page_existence(wiki_page_url, timeout=10):
          response = requests.get(url=wiki_page_url, timeout=timeout)
      
          page_status = "unknown"
          if response.status_code == 200:
              page_status = "exists"
          elif response.status_code == 404:
              page_status = "does not exist"
      
          return wiki_page_url + " - " + page_status
      wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
      
      print("Running threaded:")
      threaded_start = time.time()
      with concurrent.futures.ThreadPoolExecutor() as executor:
          futures = []
          for url in wiki_page_urls:
              futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
          for future in concurrent.futures.as_completed(futures):
              print(future.result())
      print("Threaded time:", time.time() - threaded_start)
      

      Это тот же самый код, который мы создали на шаге 2, только в него добавлены выражения print, показывающие время выполнения нашего кода в секундах.

      Если мы снова запустим программу, мы увидим следующий результат:

      Output

      Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

      Количество секунд после Threaded time на вашем компьютере будет отличаться (как и порядок вывода).

      Теперь вы можете сравнить время выполнения при доставке пятидесяти URL страниц Wikipedia с потоками и без потоков.

      На компьютере, использованном для этого обучающего модуля, выполнение операций без потоков заняло ~5,803 секунды, а с потоками — ~1,220 секунды. С потоками наша программа работала значительно быстрее.

      Заключение

      В этом обучающем модуле мы научились использовать утилиту ThreadPoolExecutor в Python 3 для эффективного выполнения кода, связанного с операциями ввода-вывода. Вы создали функцию, хорошо подходящую для вызова в потоках, научились получать результаты и исключения при выполнении этой фукнции в потоках и оценили прирост производительности, достигаемый за счет использования потоков.

      Далее вас могут заинтересовать другие функции параллельной обработки, доступные в модуле concurrent.futures.



      Source link