Asyncio ensure_future в Python: Подробное руководство с примерами

Asyncio ensure_future в Python: Подробное руководство с примерами


Asyncio ensure_future в Python: Подробное руководство с примерами

Asyncio — это мощная библиотека Python для написания параллельного кода с использованием синтаксиса async/await. Одной из ключевых возможностей asyncio является запуск корутин в качестве задач (Task), которые выполняются конкурентно в цикле событий. В этой статье мы глубоко погрузимся в функцию ensure_future, изучим её назначение, особенности и практическое применение.

Что такое ensure_future?

asyncio.ensure_future — это функция, которая преобразует объект корутины в объект Task, планируя его выполнение в цикле событий. Если переданный объект уже является Future или Task, он возвращается без изменений.

Основное назначение

  • Преобразование корутин в задачи для параллельного выполнения
  • Обеспечение совместимости между разными типами асинхронных объектов
  • Планирование выполнения в цикле событий

Базовое использование

Рассмотрим простой пример использования ensure_future:

import asyncio

async def simple_coroutine():
    print("Начало корутины")
    await asyncio.sleep(1)
    print("Корутина завершена")

async def main():
    # Создаем задачу из корутины
    task = asyncio.ensure_future(simple_coroutine())
    
    # Задача уже запланирована на выполнение
    # Ждем завершения задачи
    await task

# Запуск основной программы
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

ensure_future vs create_task

В Python 3.7+ появилась функция asyncio.create_task(), которая рекомендуется для создания задач. Однако между ними есть важные различия:

async def example():
    coro = some_coroutine()
    
    # Рекомендуемый способ в Python 3.7+
    task1 = asyncio.create_task(coro)
    
    # Альтернативный способ (работает в более старых версиях)
    task2 = asyncio.ensure_future(coro)
    
    # ensure_future может принимать любые awaitable объекты
    future = asyncio.Future()
    task3 = asyncio.ensure_future(future)  # Возвращает тот же future

Ключевые различия:

  • create_task принимает только корутины
  • ensure_future принимает корутины, Future и другие awaitable объекты
  • create_task доступен только в Python 3.7+

Практические примеры использования

Параллельное выполнение задач

import asyncio
import time

async def task(name, duration):
    print(f"Задача '{name}' началась")
    await asyncio.sleep(duration)
    print(f"Задача '{name}' завершена после {duration} сек")
    return f"Результат {name}"

async def main():
    start_time = time.time()
    
    # Создаем несколько задач
    task1 = asyncio.ensure_future(task("A", 2))
    task2 = asyncio.ensure_future(task("B", 1))
    task3 = asyncio.ensure_future(task("C", 3))
    
    # Ждем завершения всех задач
    results = await asyncio.gather(task1, task2, task3)
    print(f"Все задачи завершены. Результаты: {results}")
    
    end_time = time.time()
    print(f"Общее время выполнения: {end_time - start_time:.2f} сек")

asyncio.run(main())

Обработка исключений в задачах

import asyncio

async def risky_task(name):
    print(f"Задача '{name}' выполняется")
    await asyncio.sleep(1)
    if name == "B":
        raise ValueError("Искусственная ошибка в задаче B")
    return f"Успех {name}"

async def main():
    tasks = [
        asyncio.ensure_future(risky_task("A")),
        asyncio.ensure_future(risky_task("B")),
        asyncio.ensure_future(risky_task("C"))
    ]
    
    try:
        # Ждем завершения всех задач
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print("Результаты:", results)
    except Exception as e:
        print(f"Поймано исключение: {e}")
    
    # Альтернативный подход - обработка для каждой задачи
    for task in tasks:
        try:
            result = await task
            print(f"Результат задачи: {result}")
        except Exception as e:
            print(f"Ошибка в задаче: {e}")

asyncio.run(main())

Ожидание с таймаутом

import asyncio

async def long_running_task():
    print("Долгая задача началась")
    await asyncio.sleep(5)
    print("Долгая задача завершена")
    return "Готово"

async def main():
    # Создаем задачу
    task = asyncio.ensure_future(long_running_task())
    
    try:
        # Ждем с таймаутом
        result = await asyncio.wait_for(task, timeout=2.0)
        print(f"Результат: {result}")
    except asyncio.TimeoutError:
        print("Задача не завершилась в срок")
        
        # Проверяем состояние задачи
        if not task.done():
            print("Задача все еще выполняется")
            # Можно отменить задачу
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                print("Задача была отменена")

asyncio.run(main())

Работа с результатами задач

Получение результатов по завершении

import asyncio

async def calculate_square(x):
    await asyncio.sleep(1)
    return x * x

async def main():
    # Создаем несколько задач
    numbers = [1, 2, 3, 4, 5]
    tasks = [asyncio.ensure_future(calculate_square(n)) for n in numbers]
    
    # Ждем завершения каждой задачи и обрабатываем результат
    for task in asyncio.as_completed(tasks):
        result = await task
        print(f"Получен результат: {result}")
    
    # Альтернативно: получить все результаты сразу
    results = await asyncio.gather(*tasks)
    print("Все результаты:", results)

asyncio.run(main())

Использование callback-функций

import asyncio

def handle_result(future):
    try:
        result = future.result()
        print(f"Callback: Задача завершена с результатом {result}")
    except Exception as e:
        print(f"Callback: Задача завершена с ошибкой {e}")

async def successful_task():
    await asyncio.sleep(1)
    return "Успех"

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Что-то пошло не так")

async def main():
    # Создаем задачи и добавляем callback
    task1 = asyncio.ensure_future(successful_task())
    task1.add_done_callback(handle_result)
    
    task2 = asyncio.ensure_future(failing_task())
    task2.add_done_callback(handle_result)
    
    # Ждем завершения задач
    await asyncio.sleep(2)

asyncio.run(main())

Продвинутые техники

Цепочки задач

import asyncio

async def first_task():
    print("Первая задача началась")
    await asyncio.sleep(1)
    print("Первая задача завершена")
    return 10

async def second_task(x):
    print("Вторая задача началась")
    await asyncio.sleep(1)
    print("Вторая задача завершена")
    return x * 2

async def third_task(x):
    print("Третья задача началась")
    await asyncio.sleep(1)
    print("Третья задача завершена")
    return x + 5

async def main():
    # Создаем цепочку задач
    first = asyncio.ensure_future(first_task())
    
    # Создаем задачу, которая зависит от результата первой
    async def chained_tasks():
        result1 = await first
        result2 = await asyncio.ensure_future(second_task(result1))
        result3 = await asyncio.ensure_future(third_task(result2))
        return result3
    
    # Запускаем цепочку
    final_result = await chained_tasks()
    print(f"Финальный результат: {final_result}")

asyncio.run(main())

Ограничение количества одновременных задач

import asyncio
from asyncio import Semaphore

async def limited_task(name, semaphore):
    async with semaphore:
        print(f"Задача '{name}' начала выполнение")
        await asyncio.sleep(2)
        print(f"Задача '{name}' завершила выполнение")
        return name

async def main():
    # Ограничиваем количество одновременных задач до 2
    semaphore = Semaphore(2)
    
    # Создаем задачи
    tasks = [
        asyncio.ensure_future(limited_task(f"Task-{i}", semaphore))
        for i in range(5)
    ]
    
    # Ждем завершения всех задач
    results = await asyncio.gather(*tasks)
    print("Все задачи завершены:", results)

asyncio.run(main())

Обработка отмены задач

import asyncio

async def cancellable_task():
    try:
        print("Задача началась")
        for i in range(10):
            print(f"Шаг {i}")
            await asyncio.sleep(0.5)
        return "Завершено"
    except asyncio.CancelledError:
        print("Задача была отменена")
        raise
    finally:
        print("Задача завершает работу")

async def main():
    # Создаем задачу
    task = asyncio.ensure_future(cancellable_task())
    
    # Даем задаче немного поработать
    await asyncio.sleep(2)
    
    # Отменяем задачу
    task.cancel()
    
    try:
        # Пытаемся получить результат
        result = await task
        print(f"Результат: {result}")
    except asyncio.CancelledError:
        print("Задача была успешно отменена")

asyncio.run(main())

Лучшие практики и советы

  1. Используйте create_task вместо ensure_future для корутин (в Python 3.7+)
  2. Всегда обрабатывайте исключения в задачах
  3. Используйте таймауты для避免 бесконечного ожидания
  4. Отменяйте ненужные задачи для освобождения ресурсов
  5. Ограничивайте количество одновременных задач при работе с ресурсами

Заключение

asyncio.ensure_future — это мощный инструмент для работы с асинхронными задачами в Python. Понимание его работы и правильное применение позволяет эффективно использовать возможности асинхронного программирования, создавать отзывчивые и производительные приложения.

Несмотря на появление более специализированной функции create_task, ensure_future остается важным инструментом, особенно при работе с различными типами awaitable-объектов и в коде, который должен быть совместим с разными версиями Python.

Помните, что с большой силой приходит и большая ответственность — всегда тщательно управляйте жизненным циклом задач, обрабатывайте исключения и обеспечивайте корректное завершение работы ваших асинхронных приложений.