Создание многокомпонентного веб-приложения на aiohttp: подробное руководство
Создание многокомпонентного веб-приложения на aiohttp: подробное руководство
Введение в aiohttp и асинхронный веб-разработку
aiohttp — это мощная библиотека Python для асинхронной работы с HTTP, которая позволяет создавать высокопроизводительные веб-серверы и клиенты. В отличие от традиционных синхронных фреймворков, aiohttp использует возможности async/await для обработки тысяч одновременных соединений с минимальными затратами ресурсов.
В этой статье мы подробно разберем, как создать многокомпонентное веб-приложение на aiohttp, где различные компоненты работают вместе, обеспечивая полноценную функциональность.
Архитектура многокомпонентного приложения
Перед тем как перейти к коду, давайте рассмотрим типичную архитектуру aiohttp-приложения:
- Основное приложение (Application) - центральный компонент
- Маршрутизатор (Router) - обработка URL-путей
- Обработчики запросов (Request Handlers) - бизнес-логика
- Middleware - промежуточное ПО для обработки запросов/ответов
- Фоновые задачи - асинхронные процессы
- Подключение к базам данных - работа с persistence-слоем
Создание базовой структуры приложения
Начнем с создания базовой структуры нашего приложения:
from aiohttp import web
import aiohttp_jinja2
import jinja2
import asyncpg
import logging
async def create_app():
# Создаем экземпляр приложения
app = web.Application()
# Настраиваем логирование
logging.basicConfig(level=logging.INFO)
# Инициализируем компоненты
await setup_database(app)
setup_routes(app)
setup_middleware(app)
setup_template_engine(app)
setup_background_tasks(app)
return app
if __name__ == '__main__':
web.run_app(create_app(), port=8080)
Компонент 1: Маршрутизация и обработчики
Создадим несколько обработчиков для различных URL:
def setup_routes(app):
# Добавляем маршруты
app.router.add_get('/', handle_index)
app.router.add_get('/users', handle_users)
app.router.add_get('/users/{id}', handle_user_detail)
app.router.add_post('/users', create_user)
app.router.add_get('/api/data', api_handler)
# Статические файлы
app.router.add_static('/static/', path='static', name='static')
async def handle_index(request):
return web.Response(text="Добро пожаловать на главную страницу!")
async def handle_users(request):
# Получаем подключение к БД из приложения
pool = request.app['db_pool']
async with pool.acquire() as connection:
users = await connection.fetch('SELECT * FROM users')
# Рендерим шаблон с данными
context = {'users': users}
return aiohttp_jinja2.render_template('users.html', request, context)
async def handle_user_detail(request):
user_id = request.match_info['id']
pool = request.app['db_pool']
async with pool.acquire() as connection:
user = await connection.fetchrow(
'SELECT * FROM users WHERE id = $1', int(user_id)
)
if not user:
return web.json_response({'error': 'User not found'}, status=404)
return web.json_response(dict(user))
async def create_user(request):
data = await request.post()
# Валидация данных...
pool = request.app['db_pool']
async with pool.acquire() as connection:
await connection.execute(
'INSERT INTO users(name, email) VALUES($1, $2)',
data['name'], data['email']
)
return web.Response(text="Пользователь создан", status=201)
async def api_handler(request):
# Пример API обработчика
return web.json_response({'status': 'ok', 'data': [1, 2, 3]})
Компонент 2: Подключение к базе данных
Реализуем компонент для работы с PostgreSQL с помощью asyncpg:
async def setup_database(app):
# Создаем пул подключений к БД
pool = await asyncpg.create_pool(
host='localhost',
port=5432,
user='your_username',
password='your_password',
database='your_database',
min_size=5,
max_size=20
)
app['db_pool'] = pool
# Закрываем пул при остановке приложения
async def close_database(app):
await pool.close()
app.on_cleanup.append(close_database)
# Создаем таблицы, если они не существуют
async with pool.acquire() as connection:
await connection.execute('''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
Компонент 3: Middleware для аутентификации и логирования
Middleware позволяет перехватывать запросы и ответы для выполнения общих задач:
def setup_middleware(app):
# Middleware для логирования
@web.middleware
async def logging_middleware(request, handler):
logger = logging.getLogger('aiohttp.access')
start_time = time.time()
# Продолжаем обработку запроса
response = await handler(request)
# Логируем информацию о запросе
processing_time = time.time() - start_time
logger.info(
f"{request.method} {request.path} "
f"processed in {processing_time:.3f}s, "
f"status: {response.status}"
)
return response
# Middleware для аутентификации
@web.middleware
async def auth_middleware(request, handler):
# Пропускаем аутентификацию для некоторых путей
if request.path in ['/login', '/static/', '/public']:
return await handler(request)
# Проверяем аутентификацию
token = request.headers.get('Authorization', None)
if not token or not validate_token(token):
return web.json_response(
{'error': 'Unauthorized'}, status=401
)
# Добавляем информацию о пользователе в запрос
request['user'] = await get_user_from_token(token)
return await handler(request)
# Добавляем middleware в приложение
app.middlewares.append(logging_middleware)
app.middlewares.append(auth_middleware)
async def validate_token(token):
# Здесь должна быть реальная проверка токена
return token == 'secret_token'
async def get_user_from_token(token):
# Здесь должен быть реальный код получения пользователя
return {'id': 1, 'name': 'Test User'}
Компонент 4: Шаблонизатор Jinja2
Настроим рендеринг HTML-шаблонов:
def setup_template_engine(app):
# Настраиваем Jinja2
aiohttp_jinja2.setup(
app,
loader=jinja2.FileSystemLoader('templates')
)
# Добавляем глобальные переменные в шаблоны
aiohttp_jinja2.get_env(app).globals.update(
app_version='1.0.0',
site_name='My Awesome Site'
)
Создадим простой шаблон templates/users.html:
<!DOCTYPE html>
<html>
<head>
<title>Пользователи</title>
</head>
<body>
<h1>Список пользователей</h1>
<ul>
{% for user in users %}
<li>{{ user.name }} ({{ user.email }})</li>
{% endfor %}
</ul>
</body>
</html>
Компонент 5: Фоновые задачи
aiohttp позволяет запускать фоновые задачи, которые работают параллельно с обработкой запросов:
def setup_background_tasks(app):
# Задача для периодической очистки кэша
async def cache_cleaner(app):
while True:
await asyncio.sleep(3600) # Каждый час
# Логика очистки кэша
logging.info("Cleaning cache...")
# Задача для отправки уведомлений
async def notification_sender(app):
while True:
await asyncio.sleep(300) # Каждые 5 минут
# Логика отправки уведомлений
logging.info("Sending notifications...")
# Запускаем задачи при старте приложения
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
async def start_background_tasks(app):
# Создаем задачи и сохраняем их в приложении
app['cache_cleaner'] = asyncio.create_task(cache_cleaner(app))
app['notification_sender'] = asyncio.create_task(notification_sender(app))
async def cleanup_background_tasks(app):
# Отменяем задачи при остановке приложения
app['cache_cleaner'].cancel()
app['notification_sender'].cancel()
# Ждем завершения задач
await app['cache_cleaner']
await app['notification_sender']
Компонент 6: Обработка ошибок
Создадим единый механизм обработки ошибок:
def setup_error_handlers(app):
# Обработка 404 ошибки
async def handle_404(request, response):
return aiohttp_jinja2.render_template(
'404.html', request, {}, status=404
)
# Обработка 500 ошибки
async def handle_500(request, response):
return aiohttp_jinja2.render_template(
'500.html', request, {}, status=500
)
# Регистрируем обработчики ошибок
app.middlewares.append(error_pages_middleware)
@web.middleware
async def error_pages_middleware(request, handler):
try:
response = await handler(request)
if response.status == 404:
return await handle_404(request, response)
return response
except web.HTTPException as ex:
if ex.status == 404:
return await handle_404(request, ex)
raise
except Exception:
return await handle_500(request, None)
Компонент 7: Веб-сокеты для реального времени
Добавим поддержку веб-сокетов для функций реального времени:
def setup_websocket_endpoints(app):
app.router.add_get('/ws', websocket_handler)
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# Регистрируем подключение
request.app['websockets'].add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
# Обрабатываем сообщение
await handle_websocket_message(msg.data, ws, request.app)
elif msg.type == aiohttp.WSMsgType.ERROR:
logging.error(f'WebSocket error: {ws.exception()}')
finally:
# Удаляем подключение при закрытии
request.app['websockets'].remove(ws)
return ws
async def handle_websocket_message(data, ws, app):
# Обрабатываем полученное сообщение
try:
message = json.loads(data)
if message['type'] == 'chat':
# Рассылаем сообщение всем подключенным клиентам
for client in app['websockets']:
if client is not ws and not client.closed:
await client.send_json({
'type': 'chat',
'user': 'Anonymous',
'message': message['text']
})
except json.JSONDecodeError:
logging.error("Invalid JSON received over WebSocket")
async def start_websocket_storage(app):
# Инициализируем хранилище для веб-сокет подключений
app['websockets'] = set()
async def cleanup_websocket_storage(app):
# Закрываем все подключения при остановке приложения
for ws in set(app['websockets']):
await ws.close(code=1001, message='Server shutdown')
Компонент 8: API с версионированием и документацией
Создадим структурированный API с поддержкой версионирования:
def setup_api_routes(app):
# Создаем подмаршрутизатор для API
api_v1 = web.RouteTableDef()
@api_v1.get('/users')
async def api_get_users(request):
pool = request.app['db_pool']
async with pool.acquire() as connection:
users = await connection.fetch('SELECT * FROM users')
return web.json_response([dict(user) for user in users])
@api_v1.get('/users/{id}')
async def api_get_user(request):
user_id = request.match_info['id']
pool = request.app['db_pool']
async with pool.acquire() as connection:
user = await connection.fetchrow(
'SELECT * FROM users WHERE id = $1', int(user_id)
)
if not user:
return web.json_response({'error': 'User not found'}, status=404)
return web.json_response(dict(user))
@api_v1.post('/users')
async def api_create_user(request):
data = await request.json()
# Валидация данных
if not data.get('name') or not data.get('email'):
return web.json_response(
{'error': 'Name and email are required'}, status=400
)
pool = request.app['db_pool']
async with pool.acquire() as connection:
try:
await connection.execute(
'INSERT INTO users(name, email) VALUES($1, $2)',
data['name'], data['email']
)
except asyncpg.UniqueViolationError:
return web.json_response(
{'error': 'Email already exists'}, status=409
)
return web.json_response({'status': 'created'}, status=201)
# Подключаем API маршруты с префиксом /api/v1
app.router.add_routes(api_v1, prefix='/api/v1')
Интеграция всех компонентов
Теперь модифицируем функцию создания приложения для интеграции всех компонентов:
async def create_app():
app = web.Application()
# Настройка логирования
logging.basicConfig(level=logging.INFO)
app['logger'] = logging.getLogger('aiohttp')
# Инициализация всех компонентов
await setup_database(app)
await start_websocket_storage(app)
setup_routes(app)
setup_api_routes(app)
setup_middleware(app)
setup_template_engine(app)
setup_error_handlers(app)
setup_background_tasks(app)
setup_websocket_endpoints(app)
# Обработчики для управления жизненным циклом
app.on_startup.append(start_background_tasks)
app.on_startup.append(start_websocket_storage)
app.on_cleanup.append(cleanup_background_tasks)
app.on_cleanup.append(cleanup_websocket_storage)
return app
Запуск приложения
Создадим точку входа для запуска нашего приложения:
if __name__ == '__main__':
# Запускаем приложение
app = create_app()
web.run_app(app, host='0.0.0.0', port=8080)
Для production-окружения рекомендуется использовать ASGI-сервер, такой как Gunicorn с Uvicorn workers:
gunicorn myapp:create_app --bind 0.0.0.0:8080 --worker-class aiohttp.GunicornWebWorker
Тестирование приложения
Напишем базовые тесты для нашего приложения:
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
class MyAppTestCase(AioHTTPTestCase):
async def get_application(self):
return await create_app()
@unittest_run_loop
async def test_index(self):
resp = await self.client.request("GET", "/")
assert resp.status == 200
text = await resp.text()
assert "Добро пожаловать" in text
@unittest_run_loop
async def test_users_api(self):
resp = await self.client.request("GET", "/api/v1/users")
assert resp.status == 200
data = await resp.json()
assert isinstance(data, list)
Заключение
Мы создали полнофункциональное многокомпонентное веб-приложение на aiohttp, которое включает:
- Маршрутизацию с поддержкой REST API
- Подключение к базе данных с пулом соединений
- Middleware для аутентификации и логирования
- Шаблонизацию с Jinja2
- Фоновые задачи для периодических операций
- Обработку ошибок с пользовательскими страницами
- Веб-сокеты для работы в реальном времени
- Структурированный API с версионированием
Такой подход позволяет создавать масштабируемые и поддерживаемые приложения, где каждый компонент отвечает за свою конкретную задачу. aiohttp предоставляет все необходимые инструменты для построения высокопроизводительных асинхронных веб-приложений на Python.
Дальнейшие шаги для развития приложения могут включать добавление кэширования, более сложную систему аутентификации, интеграцию с очередями сообщений и контейнеризацию приложения с помощью Docker.