Подробное руководство по `asyncio.Condition` в Python

Подробное руководство по `asyncio.Condition` в Python


Подробное руководство по asyncio.Condition в Python

asyncio.Condition — мощный примитив синхронизации для асинхронных приложений, позволяющий корутинам эффективно взаимодействовать при изменении общих состояний. В отличие от простых блокировок, Condition реализует модель “ожидания-уведомления”, где задачи могут ожидать выполнения определенных условий и получать оповещения при их изменении.


Основные концепции

  1. Состояние (Condition)
    Логическое условие, связанное с общим ресурсом (например, “очередь пуста”, “буфер заполнен”).

  2. Блокировка (Lock)
    Встроенная блокировка, гарантирующая эксклюзивный доступ к состоянию. Все операции с Condition требуют захвата блокировки.

  3. Ожидание (Wait)
    Корутина освобождает блокировку и приостанавливается, пока не получит уведомление.

  4. Уведомление (Notify)
    Пробуждение одной или всех ожидающих задач при изменении состояния.


Ключевые методы

import asyncio

condition = asyncio.Condition()

# Захват блокировки
await condition.acquire()

# Освобождение блокировки
condition.release()

# Ожидание уведомления
await condition.wait()

# Ожидание выполнения предиката
await condition.wait_for(predicate)

# Пробуждение N ожидающих задач
condition.notify(N)

# Пробуждение всех ожидающих
condition.notify_all()

Пример 1: Очередь с ограниченной емкостью

Реализация производителя (producer) и потребителя (consumer), синхронизированных через Condition.

import asyncio

class BoundedBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.condition = asyncio.Condition()

    async def produce(self, item):
        async with self.condition:  # Автоматический захват блокировки
            # Ждем, пока не освободится место
            await self.condition.wait_for(lambda: len(self.buffer) < self.capacity)
            
            self.buffer.append(item)
            print(f"Добавлен: {item}, размер: {len(self.buffer)}")
            
            # Уведомляем потребителей
            self.condition.notify_all()

    async def consume(self):
        async with self.condition:
            # Ждем, пока в буфере есть элементы
            await self.condition.wait_for(lambda: len(self.buffer) > 0)
            
            item = self.buffer.pop(0)
            print(f"Извлечен: {item}, размер: {len(self.buffer)}")
            
            # Уведомляем производителей
            self.condition.notify_all()

Тестирование:

async def test_buffer():
    buffer = BoundedBuffer(3)
    
    producers = [
        asyncio.create_task(buffer.produce(i)) 
        for i in range(5)
    ]
    
    consumers = [
        asyncio.create_task(buffer.consume()) 
        for _ in range(2)
    ]
    
    await asyncio.gather(*producers, *consumers)

asyncio.run(test_buffer())

Вывод:

Добавлен: 0, размер: 1
Добавлен: 1, размер: 2
Добавлен: 2, размер: 3
Извлечен: 0, размер: 2
Добавлен: 3, размер: 3
Извлечен: 1, размер: 2
Добавлен: 4, размер: 3
Извлечен: 2, размер: 2
Извлечен: 3, размер: 1
Извлечен: 4, размер: 0

Пример 2: Барьер синхронизации

Задачи ожидают достижения общего состояния “готовности”.

class SyncBarrier:
    def __init__(self, count):
        self.count = count
        self.ready = 0
        self.condition = asyncio.Condition()

    async def wait(self):
        async with self.condition:
            self.ready += 1
            
            if self.ready < self.count:
                await self.condition.wait()  # Ожидаем остальных
            else:
                self.condition.notify_all()  # Запускаем все задачи

Использование:

async def worker(barrier, id):
    print(f"Задача {id} запущена")
    await asyncio.sleep(id)
    await barrier.wait()
    print(f"Задача {id} прошла барьер")

async def main():
    barrier = SyncBarrier(3)
    tasks = [asyncio.create_task(worker(barrier, i)) for i in range(3)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Вывод:

Задача 0 запущена
Задача 1 запущена
Задача 2 запущена
... (пауза) ...
Задача 2 прошла барьер
Задача 1 прошла барьер
Задача 0 прошла барьер

Пример 3: Управление доступом к ресурсу

Ограничение одновременного доступа к ресурсу с проверкой состояния.

class ResourceManager:
    def __init__(self):
        self.resources = {"A": True, "B": True}  # Ресурсы свободны
        self.condition = asyncio.Condition()

    async def acquire(self, resource_id):
        async with self.condition:
            # Ждем, пока ресурс освободится
            await self.condition.wait_for(lambda: self.resources.get(resource_id))
            
            self.resources[resource_id] = False  # Захватываем ресурс
            print(f"Ресурс {resource_id} захвачен")

    async def release(self, resource_id):
        async with self.condition:
            self.resources[resource_id] = True
            print(f"Ресурс {resource_id} освобожден")
            self.condition.notify_all()  # Уведомляем ожидающих

Особенности работы

  1. Автоматическое управление блокировкой
    Использование async with condition гарантирует корректный захват/освобождение блокировки.

  2. Предикаты в wait_for()
    Условие проверяется только после захвата блокировки, что гарантирует актуальность данных.

  3. Liveness-гарантии

    • notify() пробуждает минимум одну задачу (но порядок не гарантирован).
    • notify_all() пробуждает все задачи, конкурирующие за блокировку.
  4. Защита от ложных пробуждений
    Всегда используйте wait_for() с предикатом вместо цикла с wait().


Типичные ошибки

  1. Уведомление без изменения состояния

    # Ошибка: notify() без изменения условия
    async with condition:
        condition.notify_all()  # Задачи проснутся, но условие не изменилось!
  2. Долгий предикат
    Предикат должен быть быстрой функцией без await, иначе блокируется общий ресурс.

  3. Потеря уведомлений
    Вызов notify() должен происходить после изменения состояния, связанного с предикатом.


Паттерны использования

  1. Очереди с приоритетами
    Задачи ожидают не только наличия данных, но и выполнения условий (например, “первый элемент очереди — высокий приоритет”).

  2. Реактивное программирование
    Автоматическое обновления кэша при изменении данных:

    async def cache_updater(cache, condition):
        async with condition:
            await condition.wait_for(has_updates)
            cache.refresh()
  3. Координация распределенных задач
    Синхронизация этапов в ETL-пайплайне или распределенных вычислениях.


Сравнение с другими примитивами

ПримитивНазначениеОсобенности
ConditionОжидание сложных условий+ Гибкость, - Сложность реализации
EventОжидание единичного события+ Простота, - Нет проверки состояния
SemaphoreОграничение доступа N потоков- Нет связи с состоянием данных
QueueОбмен данными через буферВстроенная реализация Producer/Consumer

Производительность и оптимизация

  1. Используйте notify(N) вместо notify_all()
    Если достаточно пробудить часть задач для обработки изменения.

  2. Минимизируйте время удержания блокировки
    Выносите долгие операции (сетевые запросы, вычисления) за пределы async with condition.

  3. Избегайте вложенных условий
    Вместо:

    async with condition_a:
        async with condition_b:  # Риск взаимоблокировки!

    Используйте единый Condition для связанных состояний.


Заключение

asyncio.Condition — ключевой инструмент для сложной координации асинхронных задач. Он сочетает безопасность блокировок с гибкостью реактивной модели “ожидание-уведомление”. Правильное применение этого примитива позволяет:

  • Реализовать эффективные шаблоны Producer/Consumer
  • Синхронизировать распределенные этапы вычислений
  • Управлять доступом к ресурсам на основе состояний
  • Избегать активного ожидания (busy-waiting)

Освоение Condition критически важно для создания высоконагруженных асинхронных систем с гарантированной согласованностью данных.