Как писать асинхронный код, который не «блокирует» event-loop
Как писать асинхронный код, который не блокирует event-loop
Принципы работы event loop в Python (asyncio)
В основе любой асинхронной программы на Python лежит цикл событий (event loop). Именно он управляет выполнением всех задач, обработчиков событий и операций ввода-вывода. По сути, event loop запускает задания (корутины) по очереди и переключается между ними, когда текущая задача приостанавливается при ожидании результата (например, выполнив await) . Такой подход называется кооперативной многозадачностью: в каждый момент времени активно только одно задание, но при ожидании (await) цикл событий передаёт управление другим задачам или выполняет другие операции (сетевой ввод-вывод, запуск subprocess и т.п.) .
Важно понимать, что event loop работает в одном потоке (если явно не указано иное) и переключение задач происходит только в точках await. Если какая-то корутина выполняется без await (например, выполняя долгий расчёт или вызывая блокирующую функцию), она блокирует весь цикл событий и не даёт другим задачам выполняться параллельно . Поэтому ключевой принцип написания эффективного асинхронного кода – не допускать блокирующих операций внутри корутин, чтобы event loop мог свободно планировать другие задачи.
Типичные ошибки и антипаттерны, блокирующие event loop
Рассмотрим распространённые ошибки, из-за которых ваш асинхронный код может невольно “заморозить” цикл событий, и способы их избежать.
Блокирующие вызовы (например, time.sleep, ввод-вывод)
Проблема: Вызов обычных (синхронных) функций, которые приостанавливают выполнение, внутри асинхронного кода. Классический пример – использование time.sleep() вместо неблокирующего asyncio.sleep(). Аналогично, любые операции ввода-вывода (чтение файла, сетевой запрос через синхронную библиотеку) напрямую внутри корутины будут блокировать поток исполнения.
Если вызвать time.sleep(5) внутри корутины, выполнение полностью замрёт на 5 секунд – цикл событий не сможет выполнять другие задачи в это время . В контрасте, вызов await asyncio.sleep(5) приостановит только текущую корутину, позволив event loop выполнять другие задачи во время паузы . Ниже приведён пример:
# Плохой пример: блокирующий вызов time.sleep внутри async-функции
import asyncio, time
async def main():
print("Начало")
time.sleep(5) # Эта строка блокирует event loop на 5 секунд!
print("Завершение")
asyncio.run(main())
В этом фрагменте вызов time.sleep(5) блокирует весь поток программы, включая event loop, на 5 секунд. Ни одна другая асинхронная задача не может выполняться в этот период . Правильный подход – использовать неблокирующий сон из asyncio:
# Хороший пример: неблокирующая пауза с asyncio.sleep
import asyncio
async def main():
print("Начало")
await asyncio.sleep(5) # Не блокирует event loop, позволяет выполнить другие задачи
print("Завершение")
asyncio.run(main())
Здесь await asyncio.sleep(5) приостанавливает лишь корутину main, а сам цикл событий свободно выполняет другие задачи (если они есть) во время ожидания . Аналогично, любые операции ввода-вывода должны быть выполнены асинхронно. Например, вместо использования синхронной библиотеки для HTTP-запросов внутри корутины, такой как requests, следует применять асинхронные аналоги (aiohttp, httpx и т.д.) либо вызывать синхронную функцию через специальный механизм (об этом ниже).
# Плохой пример: синхронный HTTP-запрос внутри корутины
import requests
async def fetch_data():
response = requests.get("https://example.com/data") # блокирует event loop до получения ответа
return response.text
# Хороший пример: асинхронный HTTP-запрос с aiohttp
import aiohttp
async def fetch_data_async():
async with aiohttp.ClientSession() as session:
async with session.get("https://example.com/data") as resp:
return await resp.text()
В первом случае вызов requests.get выполнит сетевой запрос синхронно, блокируя весь event loop на время ожидания ответа. Во втором случае используется библиотека aiohttp, которая реализует запрос асинхронно – при выполнении await resp.text() цикл событий может переключиться на другие задачи, пока данные читаются из сети. Как правило, любая операция, занимающая заметное время (ожидание сети, диска, задержка и т.д.), должна выполняться через await. Если вы вызываете обычную функцию, которая просто “висит” и ничего не отдаёт в управление event loop, то на время её работы всё приложение приостанавливается .
CPU-bound задачи (тяжёлые расчёты)
Проблема: Запуск длительных вычислений, нагружающих CPU, внутри асинхронных функций. Например, обработка большого массива данных, рендеринг изображения, шифрование, сложные математические расчёты и т.п. в чистом Python – всё это потребляет процессорное время и, если выполняется в корутине без пауз, будет блокировать цикл событий. Даже если такие задачи не ожидают внешних ресурсов, они могут надолго занять поток Python, не давая планировщику переключиться на другие корутины.
Например, следующая корутина вычисляет сумму чисел и не содержит ни одного await:
async def compute_sum(n):
total = 0
for i in range(n):
total += i # длительный CPU--bound цикл
return total
async def main():
print("Computing...")
result = await compute_sum(10**7)
print("Result:", result)
asyncio.run(main())
Вызов await compute_sum(10**7) фактически сразу начинает исполнение функции compute_sum. Но поскольку внутри неё нет операций await, то на всё время выполнения цикла (который может занять сотни миллисекунд или секунды) event loop будет заблокирован. Другие задачи не получат управление, пока расчёт не завершится. По сути, длительная CPU-bound корутина ничем не отличается от вызова блокирующей функции – она монополизирует поток выполнения. В приведённом примере, если main() запускалась бы в веб-сервере, весь сервер не смог бы обрабатывать другие запросы, пока не закончится вычисление суммы.
Решение: выносить тяжёлые вычисления из главного потока event loop. Есть два подхода:
-
Разбить вычисление на куски и между ними явно уступать управление loop’у. Например, внутри цикла периодически вызывать await asyncio.sleep(0) или аналогичные короткие паузы, чтобы позволить планировщику выполнить другие задачи. Однако это не ускоряет сам расчёт и усложняет код – подходит лишь для относительно простых случаев.
-
Выполнить расчёт в отдельном потоке или процессе, используя asyncio-интерфейсы для запуска синхронных функций. Этот метод предпочтительнее, о нём подробно поговорим ниже (см. раздел “Интеграция синхронного кода”).
Главное – не пытаться выполнять долгую CPU-работу непосредственно в корутине. Асинхронность в Python основана на кооперативном переключении задач, и она эффективна в первую очередь для I/O-bound задач (ожидание ввода-вывода). Глобальная блокировка интерпретатора Python (GIL) не позволяет полноценно параллелить вычисления в нескольких потоках, поэтому длительная операция над CPU либо должна быть вынесена в отдельный процесс, либо разделена на части, между которыми есть await. В противном случае производительность приложения сильно пострадает: любая CPU-bound задача, запускаемая в event loop, задерживает выполнение всех остальных.
Небезопасные сторонние библиотеки (неасинхронный код)
Проблема: Использование в асинхронном коде сторонних библиотек или функций, которые не рассчитаны на асинхронность. Под “небезопасными” в данном контексте понимаются библиотеки, выполняющие продолжительные блокирующие операции без await. Классические примеры: обращение к базе данных через обычный драйвер (например, psycopg2 для PostgreSQL или стандартный SQLAlchemy без async), обращение к веб-сервисам через requests или urllib, чтение/запись файлов стандартными функциями open().read(), и т.д.
Если такая библиотека вызывается внутри вашего async-кода, то, как и в случаях выше, цикл событий будет заблокирован на всё время работы этой функции . Например, часто встречаемая ошибка – вызывать subprocess.run() или os.system() внутри асинхронной функции для выполнения внешней команды. Эти вызовы не знают ничего о asyncio и просто блокируют поток до завершения процесса.
Решение: по возможности, заменять такие библиотеки их асинхронными аналогами. В экосистеме Python сейчас есть множество asyncio-friendly библиотек:
-
Для HTTP-запросов и веб-сервисов: aiohttp (асинхронный HTTP клиент/сервер) или современная httpx в async-режиме . Эти библиотеки позволяют выполнять сетевые запросы без блокировки event loop.
-
Для работы с базами данных: существуют async-драйверы, например aiomysql (MySQL/MariaDB), aiopg или более продвинутый asyncpg для PostgreSQL . Они основаны на неблокирующих технологиях (например, asyncpg написан с использованием сокетов без блокировки и обеспечивает очень высокую производительность).
-
Для других протоколов и сервисов: aioredis для Redis , aiofiles для файловых операций , aiozmq для ZeroMQ, aiokafka для Apache Kafka, и т.д. Практически для любых популярных средств существуют асинхронные обёртки.
Если же готовой асинхронной версии библиотеки нет, то следует запускать её вызовы в отдельном потоке или процессе, чтобы не остановить главный event loop (об этом далее). Например, в сообществе Home Assistant, где активно используется asyncio, выявлены проблемные места: часто разработчики по ошибке вызывают стандартные функции вроде open() или os.listdir() в async-коде. Решение – выносить эти вызовы через исполнение в пуле потоков . В официальной документации Home Assistant прямо указано: если библиотека делает блокирующие операции (например, синхронно читает из сети или файловой системы), её надо либо заменить на асинхронную, либо обернуть вызовы с помощью loop.run_in_executor(...) .
Признак “небезопасной” библиотеки – отсутствие ключевого слова await при её вызове. Если вы используете библиотеку в асинхронном коде и видите, что она предоставляет асинхронный API (например, функции, определённые через async def, или возвращает объекты awaitable), то, скорее всего, она безопасна. Если же библиотека полностью синхронна, то она потенциально опасна для event loop. Всегда проверяйте документацию: многие популярные библиотеки обзавелись async-режимами (например, в sqlite3 с Python 3.9 появился параллельный драйвер в составе asyncio, а в psycopg3 для PostgreSQL есть асинхронный интерфейс). Использование нативно-асинхронных библиотек – залог того, что ваш код не будет неожиданно “тормозить” в продакшене.
Советы по использованию asyncio-friendly библиотек
Чтобы минимизировать риск блокировки цикла событий, придерживайтесь следующих практик при выборе инструментов:
-
Выбирайте библиотеки с поддержкой async/await. При работе с вебом, базами данных, файлами и прочими внешними ресурсами предпочитайте аналоги из семейства aio-libs или других проектов, специально созданных для asyncio. Например, aiohttp для HTTP , asyncpg для PostgreSQL , aiomysql для MySQL , Motor (на базе asyncio) для MongoDB, aio_pika для RabbitMQ и т.д. Эти библиотеки написаны так, чтобы не блокировать поток: под капотом они либо используют неблокирующий ввод-вывод, либо сами запускают тяжёлые части в отдельных потоках.
-
Обратите внимание на GIL и C-расширения. Некоторые синхронные библиотеки на C могут не блокировать GIL при выполнении (например, определённые вычислительные библиотеки на NumPy могут отдавать управление). Однако рассчитывать на это не стоит. Общий принцип: если библиотека не документирована как асинхронная, нужно считать, что она блокирует event loop. Даже если она выполняет CPU-bound работу в чистом C и теоретически параллелится, ваш цикл событий всё равно не сможет переключиться, пока вызов не завершится.
В целом, экосистема Python активно движется к поддержке асинхронности. Перед тем как писать свой код, стоит провести небольшое исследование: есть ли у используемой вами библиотеки асинхронный аналог? Использование asyncio-friendly инструментов сразу избавляет от множества проблем.
Безопасная интеграция синхронного кода (run_in_executor и др.)
Несмотря на наличие async-аналогов, бывают ситуации, когда нужно вызвать существующую синхронную функцию в асинхронном приложении. Это может быть собственный CPU-bound код, сторонняя библиотека без async API, или просто однократная операция, которую не имеет смысла переписывать на asyncio. В таких случаях нужно выполнить синхронный код в отдельном потоке или процессе, чтобы он не остановил цикл событий. Модуль asyncio предоставляет для этого удобные инструменты.
loop.run_in_executor – запуск в пуле потоков или процессов
Метод loop.run_in_executor(executor, func, *args) позволяет передать функции func на выполнение во внешний пул. Если executor указать как None, будет использован пул потоков по умолчанию (ThreadPoolExecutor), связанный с текущим loop. Этот вызов мгновенно возвращает объект Future, а указанная функция func начнёт выполняться в отдельном потоке (или процессе). Вы можете await этот Future, не блокируя основной поток. Пример использования:
import asyncio
def blocking_io(path):
# например, чтение большого файла синхронно
with open(path, 'r') as f:
return f.read()
async def main():
loop = asyncio.get_running_loop()
# выполним blocking_io в другом потоке, чтобы не блокировать loop
data = await loop.run_in_executor(None, blocking_io, "big_file.txt")
print(f"Прочитано байт: {len(data)}")
asyncio.run(main())
Когда выполняется loop.run_in_executor(None, blocking_io, ...), цикл событий отдаёт задачу blocking_io в пул потоков и сразу продолжает работать дальше. В нашем случае мы тут же делаем await на возвращённом будущем, поэтому main приостановится до получения результата, но сам event loop останется активным и сможет выполнять другие корутины, пока файл читается на фоне. Как только blocking_io завершится в отдельном потоке, результат вернётся и продолжится выполнение main.
Такой подход рекомендуется использовать для любых операций ввода-вывода, для которых нет готовых async-решений. Например, если вам нужно дернуть REST API через стороннюю библиотеку, не поддерживающую async, – вызовите её через run_in_executor. В документации Python прямо указывается: “для сетевого логирования или другого IO, которое может блокировать, используйте отдельный поток” . run_in_executor как раз реализует эту рекомендацию.
CPU-bound задачи и ProcessPoolExecutor. Отдельного упоминания заслуживает случай, когда вы offload-ите в executor тяжёлую CPU-нагрузку. По умолчанию run_in_executor(None, ...) использует поток, а в Python глобальная блокировка интерпретатора (GIL) не даст двум потокам выполнять Python-байткод параллельно. То есть, если вы выполняете чисто вычислительную функцию (например, вычисление цифр числа Пи) через ThreadPoolExecutor, то выигрыш в том, что event loop не блокируется, но сама задача всё равно выполняется последовательно с остальным Python-кодом, перегрызаясь за GIL. В таких случаях лучше использовать ProcessPoolExecutor – пул процессов, в котором каждая задача пойдёт в отдельный интерпретатор Python без общего GIL. Благо, loop.run_in_executor позволяет передать кастомный executor. Например:
from concurrent.futures import ProcessPoolExecutor
def heavy_calc(n):
... # некий CPU-интенсивный расчёт
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, heavy_calc, 10**7)
Здесь мы создаём пул процессов и явно передаём его в run_in_executor. Теперь heavy_calc выполнится в отдельном процессе Python – параллельно и независимо, используя все доступные ядра. Такой способ позволяет действительно распараллелить CPU-bound задачи и не только не блокировать event loop, но и ускорить общую работу программы при наличии нескольких ядер CPU. Практика показывает: если ваша асинхронная программа должна выполнять тяжёлые вычисления, выносите их в ProcessPool . Потоки тоже могут использоваться, если код частично отпускает GIL (например, вызывает C-расширения), но это частный случай. В общем случае – процессный пул надёжнее для CPU. Python 3.8+ предлагает удобный high-level API для подобных сценариев: модуль concurrent.futures и функция asyncio.to_thread (аналог run_in_executor для потоков). Но помните ограничение: asyncio.to_thread и ThreadPool хороши для IO-bound задач, а для CPU-bound лучше сразу ProcessPool .
Обработка результатов и исключений. Когда вы выносите работу в executor, результат функции доступен через await её Future, как показано выше. Если функция выбросит исключение, оно всплывёт как ошибка при await (точно так же, как если бы оно произошло в корутине). Это удобно – можно обернуть вызов в try/except прямо в async-коде. Также стоит учитывать, что чрезмерное количество потоков или процессов может создаст нагрузку на систему: по умолчанию asyncio ограничивает пул потоков 32 рабочими (или min(32, os.cpu_count() + 4) в более новых версиях) , что в большинстве случаев оптимально. Не забывайте, что запуск слишком многих задач сразу может ухудшить производительность из-за конкуренции за ресурсы.
Итак, алгоритм безопасной интеграции синхронного кода в asyncio-приложение таков:
-
Выявите потенциально блокирующие вызовы. Посмотрите, какие функции могут выполняться долго или ждать IO синхронно.
-
По возможности замените их на await-аналоги. Если библиотека предлагает async-вариант, применяйте его (например, await session.get(...) вместо requests.get()).
-
Если замены нет – используйте asyncio.to_thread или loop.run_in_executor. Оберните вызов в await asyncio.to_thread(func, *args), чтобы выполнить его в отдельном потоке . Это самый простой способ для ввода-вывода. Если задача CPU-bound и критична скорость, рассмотрите ProcessPool.
-
Обрабатывайте результат, как обычно, через await. Асинхронный код не усложняет обработку ошибок – вы можете ловить их try/except, объединять результаты через asyncio.gather и т.д., даже если внутри Future работал синхронный код.
Придерживаясь этих правил, вы сможете воспользоваться преимуществами существующих синхронных библиотек, не жертвуя отзывчивостью вашего приложения.
Инструменты диагностики проблем с event loop
Даже следуя лучшим практикам, бывает непросто сразу определить, что именно тормозит или блокирует цикл событий. К счастью, есть инструменты, помогающие диагностировать такие проблемы:
-
Режим отладки asyncio. Встроенный debug-режим asyncio выдаёт предупреждения, если какие-то задачи ведут себя подозрительно долго. Достаточно запустить цикл с параметром debug=True (например, asyncio.run(main(), debug=True) или установить в коде loop = asyncio.get_running_loop(); loop.set_debug(True)) . При этом asyncio начнёт логировать предупреждения, когда обнаружит, что выполнение колбэка или корутины заняло больше 0.1 секунды без переключения . Сообщение вида “Executing <Task …> took 0.109 seconds” указывает, что некая задача блокировала главный поток ~0.1 с. Это ценная подсказка: возможно, внутри этой задачи есть либо прямой блокирующий вызов, либо просто очень долгий участок работы без await . Вы можете настроить порог чувствительности (параметр loop.slow_callback_duration) или игнорировать эти предупреждения, но в общем случае они помогают выявить узкие места. Режим отладки также включает дополнительные проверки (например, предупреждает о забытых await и не закрытых ресурсах), поэтому крайне желательно включать его как минимум в период разработки .
-
aiomonitor. Это сторонний инструмент, позволяющий подключиться к запущенному event loop и интерактивно просмотреть, что в нём происходит. При использовании aiomonitor ваше приложение поднимает небольшой Telnet-сервер, через который можно удалённо выполнять команды диагностики . Например, команда tasks выведет список всех текущих задач и статусов, можно получить стек трейсы зависших корутин, отменять задачи и т.д. Фактически, aiomonitor даёт REPL-консоль, привязанную к вашему asyncio-приложению, что очень облегчает отладку зависаний. Если вы подозреваете, что event loop “подвис”, подключитесь через aiomonitor и посмотрите, какая корутина не отпускает управление. Часто сразу видно, что некая задача находится в состоянии Running слишком долго – это кандидат на проблему. Установка: pip install aiomonitor. Использование: открыть соединение (telnet host port) и в интерактивном режиме применять команды (help покажет доступные). aiomonitor – инструмент разработчиков asyncio, он легковесный и может даже применяться в продакшне для проверки состояния сервиса .
-
uvloop. Хотя uvloop не столько про диагностику, сколько про производительность, стоит упомянуть его как опцию. uvloop – это альтернативная реализация event loop для asyncio, написанная на основе библиотеки libuv (С++) . Она полностью совместима с asyncio API, достаточно установить и активировать её (uvloop.install() или использовать uvloop.run()). Главная причина попробовать uvloop – увеличение скорости работы: по данным авторов, uvloop ускоряет выполнение asyncio-задач в 2–4 раза . Если вы видите, что ваш event loop загружен большим числом задач и начинает проседать (например, много небольших задач со временем накладных расходов), замена на uvloop может улучшить ситуацию. Конечно, uvloop не решит проблемы блокирующего кода (если вы вызываете time.sleep, то хоть на uvloop, хоть на стандартном loop – все остановится одинаково). Однако он снижает общие издержки на переключение задач, обработку сетевых событий и т.д., что может чуть повысить “порог боли”, прежде чем лаги станут заметны. Многие фреймворки (например, веб-сервер Uvicorn) по умолчанию используют uvloop из соображений производительности. С точки зрения диагностики, uvloop также совместим с asyncio debug режимом и выводит те же предупреждения, просто работает быстрее.
Помимо перечисленных, существуют и другие утилиты: профилировщики, заточенные под async (например, yappi умеет профилировать асинхронные программы), утилиты типа asyncio-run-in-debug для интеграции с pdb, и даже специальные патчи, бросающие исключение при вызове заведомо блокирующих функций в корутине (например, проект BlockBuster monkey-patch’ит time.sleep и др., чтобы сразу выявлять такие ошибки) . Но в большинстве случаев вам хватит стандартного debug-режима и aiomonitor для живого анализа.
Заключение: краткие рекомендации и best practices
Написание эффективного асинхронного кода сводится к одному принципу: никогда не блокировать event loop. Подводя итог, вот список лучших практик, которые помогут этого добиться:
-
Каждая длительная операция должна await-иться. Если вы вызываете что-то, что может занять миллисекунды и более – убедитесь, что это await func() а не просто func(). Если нет возможности вызвать с await (т.е. функция синхронная), нужно вынести её из основного потока.
-
Не используйте блокирующие функции в корутинах. Вместо time.sleep, threading.Event.wait и подобных – используйте асинхронные варианты (await asyncio.sleep, ожидание asyncio.Event и т.д.) . Тоже касается чтения/записи файлов: либо aiofiles, либо run_in_executor для стандартных вызовов open().
-
Выбирайте async-совместимые библиотеки. HTTP-запросы через aiohttp/httpx, БД через aiomysql/asyncpg, Redis через aioredis, и т.д. – это избавит от множества проблем с блокировкой event loop .
-
Offload тяжелые задачи. Если нужно выполнить вычисление или синхронный вызов, который занимает много времени – запускайте его в отдельном потоке/процессе через asyncio.to_thread или loop.run_in_executor. При интенсивных CPU-вычислениях используйте ProcessPoolExecutor, чтобы обходить ограничения GIL .
-
Не злоупотребляйте параллелизмом внутри asyncio. Асинхронность позволяет легко запускать сотни задач, но помните, что все они разделяют один поток CPU. Планируйте количество одновременно работающих корутин (например, ограничивайте пул воркеров семафорами) – это поможет избежать ситуации, когда вы создаёте слишком много конкурирующих задач.
-
Отлаживайте и мониторьте. Включайте режим отладки asyncio в разработке – предупреждения о “непрогрессивных” задачах укажут на проблемные места . Используйте aiomonitor или аналогичные средства мониторинга в сложных приложениях, чтобы иметь возможность заглянуть внутрь вашего event loop и увидеть, что он делает в режиме реального времени.
-
Используйте uvloop для производительности (если это критично для вас). Замена event loop на более быстрый не решит логических ошибок, но позволит вашему приложению лучше масштабироваться при высокой нагрузке (сохраняя при этом все свойства asyncio) .
Следуя этим рекомендациям, опытный Python-разработчик сможет писать “вечнозелёный” асинхронный код, который остаётся эффективным и актуальным со временем. Главное – помнить о фундаментальных принципах asyncio: кооперативность, однопоточность, необходимость отдавать управление планировщику. Если ваш код дружелюбен к этим принципам, то и event-loop будет всегда отзывчивым, а приложение – масштабируемым и быстрым. Постоянно задавайте себе вопрос: “Не блокирую ли я случайно event loop этой строчкой кода?” – и тогда вы автоматизированно избежите большинства подводных камней асинхронного программирования. Ваш async-код будет действительно асинхронным, как и задумано.