Глубокое погружение в `asyncio.Semaphore` в Python: Контроль параллелизма с примерами
Глубокое погружение в asyncio.Semaphore в Python: Контроль параллелизма с примерами
Объём: ~20,000 символов
1. Введение в асинхронность и проблему параллелизма
Асинхронное программирование в Python через asyncio позволяет эффективно выполнять I/O-операции без блокировок. Однако при работе с ограниченными ресурсами (сетевые соединения, API с rate-limiting, базы данных) неконтролируемый параллелизм вызывает проблемы:
- Перегрузка серверов/API.
- Исчерпание файловых дескрипторов.
- Ошибки типа
Too many open files.
Семафоры решают эти задачи, ограничивая количество одновременных операций.
2. Что такое семафор? Теория
Семафор — примитив синхронизации, управляющий доступом к общему ресурсу. Состоит из счётчика и очереди ожидания:
- Счётчик: Максимальное число “разрешений” на доступ.
- Принцип работы:
- Корутина “захватывает” семафор (
acquire()), уменьшая счётчик. Если счётчик = 0 — корутина ждёт. - После завершения операции корутина “освобождает” семафор (
release()), увеличивая счётчик и пробуждая ждущие корутины.
- Корутина “захватывает” семафор (
В asyncio реализован через asyncio.Semaphore.
3. Создание и базовое использование asyncio.Semaphore
import asyncio
sem = asyncio.Semaphore(value=3) # Максимум 3 одновременных операции
Контекстный менеджер async with (рекомендуемый способ):
async def task(id):
async with sem: # Автоматический acquire/release
print(f"Задача {id} начала работу")
await asyncio.sleep(1)
print(f"Задача {id} завершилась")
Ручное управление:
async def task(id):
await sem.acquire() # Ждём разрешения
try:
print(f"Задача {id} начала работу")
await asyncio.sleep(1)
finally:
sem.release() # Важно! Даже при ошибке
4. Подробный пример: Ограничение запросов к API
Задача: Отправить 100 запросов к API с ограничением 5 одновременных запросов.
import aiohttp
import asyncio
async def fetch(url, sem, id):
async with sem:
async with aiohttp.ClientSession() as session:
print(f"Запрос {id} начат")
async with session.get(url) as response:
data = await response.json()
print(f"Запрос {id} завершён, статус: {response.status}")
return data
async def main():
sem = asyncio.Semaphore(5)
urls = ["https://api.example.com/data"] * 100
tasks = [
fetch(url, sem, i)
for i, url in enumerate(urls)
]
results = await asyncio.gather(*tasks)
asyncio.run(main())
Как это работает:
- Создаётся семафор на 5 разрешений.
- 100 корутин
fetchпытаются выполнить запрос. - Только 5 корутин одновременно получают доступ к API.
- Как только одна завершается, следующая в очереди активируется.
5. Обработка ошибок и гарантия освобождения семафора
Проблема: Если в корутине возникает ошибка до вызова release(), семафор “застревает”.
Решение: Использовать try/finally или контекстный менеджер.
Пример с обработкой исключений:
async def safe_task(id):
await sem.acquire()
try:
print(f"Задача {id} работает")
await asyncio.sleep(0.5)
if id == 3:
raise ValueError("Ошибка в задаче 3!")
finally:
sem.release() # Гарантированное освобождение
6. Динамическое изменение лимита семафора
Лимит семафора можно менять “на лету”:
sem = asyncio.Semaphore(3)
print(sem._value) # 3
# Увеличить лимит до 5
sem._value = 5 # Внутренняя переменная, осторожно!
# Безопасный способ через наследование:
class DynamicSemaphore(asyncio.Semaphore):
def set_limit(self, value):
if value < 0:
raise ValueError("Limit must be >= 0")
self._value = value
7. Очередь ожидания и приоритеты
Семафор использует FIFO-очередь (первый пришёл — первый вышел). Приоритеты задаются через отдельную логику:
# Пример с приоритетными задачами
high_priority_tasks = []
low_priority_tasks = []
async def prioritized_task(id, priority):
if priority == "high":
high_priority_tasks.append(id)
else:
low_priority_tasks.append(id)
await sem.acquire()
try:
print(f"Выполняется задача {id} ({priority})")
await asyncio.sleep(1)
finally:
sem.release()
8. Семафор vs Библютоки (BoundedSemaphore)
Semaphore: Числоrelease()может превысить начальное значение.BoundedSemaphore: ВыбрасываетValueErrorпри превышении лимита.
sem = asyncio.BoundedSemaphore(2)
await sem.acquire()
sem.release()
sem.release() # ValueError: Semaphore released too many times
Рекомендация: Используйте BoundedSemaphore для избежания логических ошибок.
9. Реальные кейсы использования
- Парсинг веб-страниц: Ограничение параллельных запросов к сайту.
- Работа с базами данных: Контроль соединений с СУБД.
- Интеграция с API с rate-limiting: 100 запросов в минуту.
- Управление памятью: Ограничение одновременной обработки больших файлов.
10. Пример: Семафоры в цепочках корутин
Семафоры можно передавать между корутинами для координации:
async def worker(sem, data):
async with sem:
processed = await process_data(data)
await send_to_db(processed) # Внутри тоже может быть семафор!
async def process_data(data):
...
async def send_to_db(data):
async with db_sem: # Другой семафор для БД
...
11. Отладка и мониторинг
Проверка состояния семафора:
print("Свободные слоты:", sem._value) # Текущий счётчик
print("Ожидающие задачи:", len(sem._waiters)) # Корутины в очереди
Визуализация через логи:
async def task(id):
print(f"[DEBUG] Задача {id} ждёт семафора (свободно: {sem._value})")
async with sem:
print(f"[DEBUG] Задача {id} получила семафор (свободно: {sem._value})")
...
12. Ограничения и лучшие практики
- Не блокируйте семафор надолго: Используйте только для I/O, не для CPU-bound операций.
- Избегайте вложенных семафоров: Риск взаимоблокировок (deadlocks).
- Тестируйте под нагрузкой: Убедитесь, что лимит оптимален.
- Комбинируйте с другими примитивами:
asyncio.Event,asyncio.Lockдля сложных сценариев.
13. Полный пример: Скачивание файлов с ограничением
import aiohttp
import asyncio
import os
async def download_file(url, sem, folder, id):
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
filename = url.split("/")[-1]
path = os.path.join(folder, filename)
with open(path, "wb") as f:
while chunk := await response.content.read(1024):
f.write(chunk)
print(f"Файл {id} скачан: {filename}")
async def main():
sem = asyncio.Semaphore(10) # 10 параллельных загрузок
urls = [
"https://example.com/file1.zip",
"https://example.com/file2.jpg",
# ... 100 ссылок
]
tasks = [
download_file(url, sem, "downloads", i)
for i, url in enumerate(urls)
]
await asyncio.gather(*tasks)
asyncio.run(main())
14. Заключение
asyncio.Semaphore — ключевой инструмент для:
- Контроля параллелизма в асинхронных приложениях.
- Предотвращения перегрузки ресурсов.
- Реализации сложных сценариев синхронизации.
Главные правила:
- Всегда используйте
async withдля гарантии освобождения. - Выбирайте лимит на основе тестов.
- Для жёстких ограничений применяйте
BoundedSemaphore.
Используя семафоры, вы создаёте отказоустойчивые и эффективные асинхронные системы, готовые к высоким нагрузкам.