Глубокое погружение в `asyncio.Semaphore` в Python: Контроль параллелизма с примерами

Глубокое погружение в `asyncio.Semaphore` в Python: Контроль параллелизма с примерами


Глубокое погружение в asyncio.Semaphore в Python: Контроль параллелизма с примерами

Объём: ~20,000 символов


1. Введение в асинхронность и проблему параллелизма

Асинхронное программирование в Python через asyncio позволяет эффективно выполнять I/O-операции без блокировок. Однако при работе с ограниченными ресурсами (сетевые соединения, API с rate-limiting, базы данных) неконтролируемый параллелизм вызывает проблемы:

  • Перегрузка серверов/API.
  • Исчерпание файловых дескрипторов.
  • Ошибки типа Too many open files.

Семафоры решают эти задачи, ограничивая количество одновременных операций.


2. Что такое семафор? Теория

Семафор — примитив синхронизации, управляющий доступом к общему ресурсу. Состоит из счётчика и очереди ожидания:

  • Счётчик: Максимальное число “разрешений” на доступ.
  • Принцип работы:
    • Корутина “захватывает” семафор (acquire()), уменьшая счётчик. Если счётчик = 0 — корутина ждёт.
    • После завершения операции корутина “освобождает” семафор (release()), увеличивая счётчик и пробуждая ждущие корутины.

В asyncio реализован через asyncio.Semaphore.


3. Создание и базовое использование asyncio.Semaphore

import asyncio

sem = asyncio.Semaphore(value=3)  # Максимум 3 одновременных операции

Контекстный менеджер async with (рекомендуемый способ):

async def task(id):
    async with sem:  # Автоматический acquire/release
        print(f"Задача {id} начала работу")
        await asyncio.sleep(1)
        print(f"Задача {id} завершилась")

Ручное управление:

async def task(id):
    await sem.acquire()  # Ждём разрешения
    try:
        print(f"Задача {id} начала работу")
        await asyncio.sleep(1)
    finally:
        sem.release()  # Важно! Даже при ошибке

4. Подробный пример: Ограничение запросов к API

Задача: Отправить 100 запросов к API с ограничением 5 одновременных запросов.

import aiohttp
import asyncio

async def fetch(url, sem, id):
    async with sem:
        async with aiohttp.ClientSession() as session:
            print(f"Запрос {id} начат")
            async with session.get(url) as response:
                data = await response.json()
                print(f"Запрос {id} завершён, статус: {response.status}")
                return data

async def main():
    sem = asyncio.Semaphore(5)
    urls = ["https://api.example.com/data"] * 100
    tasks = [
        fetch(url, sem, i)
        for i, url in enumerate(urls)
    ]
    results = await asyncio.gather(*tasks)

asyncio.run(main())

Как это работает:

  1. Создаётся семафор на 5 разрешений.
  2. 100 корутин fetch пытаются выполнить запрос.
  3. Только 5 корутин одновременно получают доступ к API.
  4. Как только одна завершается, следующая в очереди активируется.

5. Обработка ошибок и гарантия освобождения семафора

Проблема: Если в корутине возникает ошибка до вызова release(), семафор “застревает”.
Решение: Использовать try/finally или контекстный менеджер.

Пример с обработкой исключений:

async def safe_task(id):
    await sem.acquire()
    try:
        print(f"Задача {id} работает")
        await asyncio.sleep(0.5)
        if id == 3:
            raise ValueError("Ошибка в задаче 3!")
    finally:
        sem.release()  # Гарантированное освобождение

6. Динамическое изменение лимита семафора

Лимит семафора можно менять “на лету”:

sem = asyncio.Semaphore(3)
print(sem._value)  # 3

# Увеличить лимит до 5
sem._value = 5  # Внутренняя переменная, осторожно!

# Безопасный способ через наследование:
class DynamicSemaphore(asyncio.Semaphore):
    def set_limit(self, value):
        if value < 0:
            raise ValueError("Limit must be >= 0")
        self._value = value

7. Очередь ожидания и приоритеты

Семафор использует FIFO-очередь (первый пришёл — первый вышел). Приоритеты задаются через отдельную логику:

# Пример с приоритетными задачами
high_priority_tasks = []
low_priority_tasks = []

async def prioritized_task(id, priority):
    if priority == "high":
        high_priority_tasks.append(id)
    else:
        low_priority_tasks.append(id)
    
    await sem.acquire()
    try:
        print(f"Выполняется задача {id} ({priority})")
        await asyncio.sleep(1)
    finally:
        sem.release()

8. Семафор vs Библютоки (BoundedSemaphore)

  • Semaphore: Число release() может превысить начальное значение.
  • BoundedSemaphore: Выбрасывает ValueError при превышении лимита.
sem = asyncio.BoundedSemaphore(2)
await sem.acquire()
sem.release()
sem.release()  # ValueError: Semaphore released too many times

Рекомендация: Используйте BoundedSemaphore для избежания логических ошибок.


9. Реальные кейсы использования

  1. Парсинг веб-страниц: Ограничение параллельных запросов к сайту.
  2. Работа с базами данных: Контроль соединений с СУБД.
  3. Интеграция с API с rate-limiting: 100 запросов в минуту.
  4. Управление памятью: Ограничение одновременной обработки больших файлов.

10. Пример: Семафоры в цепочках корутин

Семафоры можно передавать между корутинами для координации:

async def worker(sem, data):
    async with sem:
        processed = await process_data(data)
        await send_to_db(processed)  # Внутри тоже может быть семафор!

async def process_data(data):
    ...

async def send_to_db(data):
    async with db_sem:  # Другой семафор для БД
        ...

11. Отладка и мониторинг

Проверка состояния семафора:

print("Свободные слоты:", sem._value)  # Текущий счётчик
print("Ожидающие задачи:", len(sem._waiters))  # Корутины в очереди

Визуализация через логи:

async def task(id):
    print(f"[DEBUG] Задача {id} ждёт семафора (свободно: {sem._value})")
    async with sem:
        print(f"[DEBUG] Задача {id} получила семафор (свободно: {sem._value})")
        ...

12. Ограничения и лучшие практики

  • Не блокируйте семафор надолго: Используйте только для I/O, не для CPU-bound операций.
  • Избегайте вложенных семафоров: Риск взаимоблокировок (deadlocks).
  • Тестируйте под нагрузкой: Убедитесь, что лимит оптимален.
  • Комбинируйте с другими примитивами: asyncio.Event, asyncio.Lock для сложных сценариев.

13. Полный пример: Скачивание файлов с ограничением

import aiohttp
import asyncio
import os

async def download_file(url, sem, folder, id):
    async with sem:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                filename = url.split("/")[-1]
                path = os.path.join(folder, filename)
                with open(path, "wb") as f:
                    while chunk := await response.content.read(1024):
                        f.write(chunk)
                print(f"Файл {id} скачан: {filename}")

async def main():
    sem = asyncio.Semaphore(10)  # 10 параллельных загрузок
    urls = [
        "https://example.com/file1.zip",
        "https://example.com/file2.jpg",
        # ... 100 ссылок
    ]
    tasks = [
        download_file(url, sem, "downloads", i)
        for i, url in enumerate(urls)
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())

14. Заключение

asyncio.Semaphore — ключевой инструмент для:

  • Контроля параллелизма в асинхронных приложениях.
  • Предотвращения перегрузки ресурсов.
  • Реализации сложных сценариев синхронизации.

Главные правила:

  • Всегда используйте async with для гарантии освобождения.
  • Выбирайте лимит на основе тестов.
  • Для жёстких ограничений применяйте BoundedSemaphore.

Используя семафоры, вы создаёте отказоустойчивые и эффективные асинхронные системы, готовые к высоким нагрузкам.