Подробное руководство по `asyncio.Condition` в Python
Подробное руководство по asyncio.Condition в Python
asyncio.Condition — мощный примитив синхронизации для асинхронных приложений, позволяющий корутинам эффективно взаимодействовать при изменении общих состояний. В отличие от простых блокировок, Condition реализует модель “ожидания-уведомления”, где задачи могут ожидать выполнения определенных условий и получать оповещения при их изменении.
Основные концепции
-
Состояние (Condition)
Логическое условие, связанное с общим ресурсом (например, “очередь пуста”, “буфер заполнен”). -
Блокировка (Lock)
Встроенная блокировка, гарантирующая эксклюзивный доступ к состоянию. Все операции сConditionтребуют захвата блокировки. -
Ожидание (Wait)
Корутина освобождает блокировку и приостанавливается, пока не получит уведомление. -
Уведомление (Notify)
Пробуждение одной или всех ожидающих задач при изменении состояния.
Ключевые методы
import asyncio
condition = asyncio.Condition()
# Захват блокировки
await condition.acquire()
# Освобождение блокировки
condition.release()
# Ожидание уведомления
await condition.wait()
# Ожидание выполнения предиката
await condition.wait_for(predicate)
# Пробуждение N ожидающих задач
condition.notify(N)
# Пробуждение всех ожидающих
condition.notify_all()
Пример 1: Очередь с ограниченной емкостью
Реализация производителя (producer) и потребителя (consumer), синхронизированных через Condition.
import asyncio
class BoundedBuffer:
def __init__(self, capacity):
self.capacity = capacity
self.buffer = []
self.condition = asyncio.Condition()
async def produce(self, item):
async with self.condition: # Автоматический захват блокировки
# Ждем, пока не освободится место
await self.condition.wait_for(lambda: len(self.buffer) < self.capacity)
self.buffer.append(item)
print(f"Добавлен: {item}, размер: {len(self.buffer)}")
# Уведомляем потребителей
self.condition.notify_all()
async def consume(self):
async with self.condition:
# Ждем, пока в буфере есть элементы
await self.condition.wait_for(lambda: len(self.buffer) > 0)
item = self.buffer.pop(0)
print(f"Извлечен: {item}, размер: {len(self.buffer)}")
# Уведомляем производителей
self.condition.notify_all()
Тестирование:
async def test_buffer():
buffer = BoundedBuffer(3)
producers = [
asyncio.create_task(buffer.produce(i))
for i in range(5)
]
consumers = [
asyncio.create_task(buffer.consume())
for _ in range(2)
]
await asyncio.gather(*producers, *consumers)
asyncio.run(test_buffer())
Вывод:
Добавлен: 0, размер: 1
Добавлен: 1, размер: 2
Добавлен: 2, размер: 3
Извлечен: 0, размер: 2
Добавлен: 3, размер: 3
Извлечен: 1, размер: 2
Добавлен: 4, размер: 3
Извлечен: 2, размер: 2
Извлечен: 3, размер: 1
Извлечен: 4, размер: 0
Пример 2: Барьер синхронизации
Задачи ожидают достижения общего состояния “готовности”.
class SyncBarrier:
def __init__(self, count):
self.count = count
self.ready = 0
self.condition = asyncio.Condition()
async def wait(self):
async with self.condition:
self.ready += 1
if self.ready < self.count:
await self.condition.wait() # Ожидаем остальных
else:
self.condition.notify_all() # Запускаем все задачи
Использование:
async def worker(barrier, id):
print(f"Задача {id} запущена")
await asyncio.sleep(id)
await barrier.wait()
print(f"Задача {id} прошла барьер")
async def main():
barrier = SyncBarrier(3)
tasks = [asyncio.create_task(worker(barrier, i)) for i in range(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
Вывод:
Задача 0 запущена
Задача 1 запущена
Задача 2 запущена
... (пауза) ...
Задача 2 прошла барьер
Задача 1 прошла барьер
Задача 0 прошла барьер
Пример 3: Управление доступом к ресурсу
Ограничение одновременного доступа к ресурсу с проверкой состояния.
class ResourceManager:
def __init__(self):
self.resources = {"A": True, "B": True} # Ресурсы свободны
self.condition = asyncio.Condition()
async def acquire(self, resource_id):
async with self.condition:
# Ждем, пока ресурс освободится
await self.condition.wait_for(lambda: self.resources.get(resource_id))
self.resources[resource_id] = False # Захватываем ресурс
print(f"Ресурс {resource_id} захвачен")
async def release(self, resource_id):
async with self.condition:
self.resources[resource_id] = True
print(f"Ресурс {resource_id} освобожден")
self.condition.notify_all() # Уведомляем ожидающих
Особенности работы
-
Автоматическое управление блокировкой
Использованиеasync with conditionгарантирует корректный захват/освобождение блокировки. -
Предикаты в
wait_for()
Условие проверяется только после захвата блокировки, что гарантирует актуальность данных. -
Liveness-гарантии
notify()пробуждает минимум одну задачу (но порядок не гарантирован).notify_all()пробуждает все задачи, конкурирующие за блокировку.
-
Защита от ложных пробуждений
Всегда используйтеwait_for()с предикатом вместо цикла сwait().
Типичные ошибки
-
Уведомление без изменения состояния
# Ошибка: notify() без изменения условия async with condition: condition.notify_all() # Задачи проснутся, но условие не изменилось! -
Долгий предикат
Предикат должен быть быстрой функцией без await, иначе блокируется общий ресурс. -
Потеря уведомлений
Вызовnotify()должен происходить после изменения состояния, связанного с предикатом.
Паттерны использования
-
Очереди с приоритетами
Задачи ожидают не только наличия данных, но и выполнения условий (например, “первый элемент очереди — высокий приоритет”). -
Реактивное программирование
Автоматическое обновления кэша при изменении данных:async def cache_updater(cache, condition): async with condition: await condition.wait_for(has_updates) cache.refresh() -
Координация распределенных задач
Синхронизация этапов в ETL-пайплайне или распределенных вычислениях.
Сравнение с другими примитивами
| Примитив | Назначение | Особенности |
|---|---|---|
Condition | Ожидание сложных условий | + Гибкость, - Сложность реализации |
Event | Ожидание единичного события | + Простота, - Нет проверки состояния |
Semaphore | Ограничение доступа N потоков | - Нет связи с состоянием данных |
Queue | Обмен данными через буфер | Встроенная реализация Producer/Consumer |
Производительность и оптимизация
-
Используйте
notify(N)вместоnotify_all()
Если достаточно пробудить часть задач для обработки изменения. -
Минимизируйте время удержания блокировки
Выносите долгие операции (сетевые запросы, вычисления) за пределыasync with condition. -
Избегайте вложенных условий
Вместо:async with condition_a: async with condition_b: # Риск взаимоблокировки!Используйте единый
Conditionдля связанных состояний.
Заключение
asyncio.Condition — ключевой инструмент для сложной координации асинхронных задач. Он сочетает безопасность блокировок с гибкостью реактивной модели “ожидание-уведомление”. Правильное применение этого примитива позволяет:
- Реализовать эффективные шаблоны Producer/Consumer
- Синхронизировать распределенные этапы вычислений
- Управлять доступом к ресурсам на основе состояний
- Избегать активного ожидания (busy-waiting)
Освоение Condition критически важно для создания высоконагруженных асинхронных систем с гарантированной согласованностью данных.