Работа с RabbitMQ в Python: полное руководство
Работа с RabbitMQ в Python: полное руководство
RabbitMQ — популярный брокер сообщений, реализующий протокол AMQP. Он используется для асинхронной обработки задач, интеграции микросервисов и распределения нагрузки. В этой статье вы узнаете, как работать с RabbitMQ в Python.
1. Установка RabbitMQ
Локальная установка
-
Linux (Ubuntu/Debian):
sudo apt-get install rabbitmq-server sudo systemctl start rabbitmq-server -
MacOS (через Homebrew):
brew install rabbitmq brew services start rabbitmq
Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Веб-интерфейс доступен по адресу http://localhost:15672 (логин: guest, пароль: guest).
2. Установка Python-клиента
Библиотека pika — основной инструмент для работы с RabbitMQ:
pip install pika
3. Основные концепции
- Producer (издатель): Отправляет сообщения в очередь.
- Consumer (потребитель): Читает сообщения из очереди.
- Exchange (топик): Маршрутизирует сообщения в очереди.
- Queue (очередь): Хранит сообщения до их обработки.
- Binding: Правила, связывающие exchange с очередью.
4. Простой пример: “Hello, World!”
Издатель (producer.py)
import pika
# Подключение к локальному серверу
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди 'hello'
channel.queue_declare(queue='hello')
# Отправка сообщения
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello, RabbitMQ!'
)
print("Сообщение отправлено")
connection.close()
Потребитель (consumer.py)
import pika
def callback(ch, method, properties, body):
print(f"Получено сообщение: {body.decode()}")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print("Ожидание сообщений...")
channel.start_consuming()
5. Work Queues: Распределение задач
Используется для балансировки нагрузки между воркерами.
Издатель (task_producer.py)
import sys
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=sys.argv[1],
properties=pika.BasicProperties(delivery_mode=2) # Сообщение сохраняется на диске
)
Потребитель (worker.py)
channel.queue_declare(queue='task_queue', durable=True) # Дублируемая очередь
def callback(ch, method, properties, body):
print(f"Обработка: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтверждение выполнения
channel.basic_qos(prefetch_count=1) # Обрабатывать по одному сообщению за раз
channel.basic_consume(queue='task_queue', on_message_callback=callback)
6. Сериализация данных
Для передачи сложных объектов используйте JSON:
import json
data = {'id': 1, 'task': 'process_image'}
channel.basic_publish(
exchange='',
routing_key='tasks',
body=json.dumps(data).encode()
)
# На стороне потребителя:
message = json.loads(body.decode())
7. Топики и маршрутизация
Объявление топика
channel.exchange_declare(exchange='logs', exchange_type='topic')
Издатель с ключом маршрутизации
channel.basic_publish(
exchange='logs',
routing_key='user.notification',
body='Новое уведомление'
)
Потребитель с привязкой к ключу
channel.queue_bind(exchange='logs', queue='notifications', routing_key='user.*')
8. RPC (Remote Procedure Call)
Пример реализации запроса-ответа:
# Клиент
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=callback_queue),
body='10'
)
# Сервер
def on_request(ch, method, props, body):
response = int(body) * 2
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
9. Лучшие практики
-
Надежность:
- Используйте
delivery_mode=2для сохранения сообщений на диске. - Включайте подтверждение (
basic_ack) после обработки.
- Используйте
-
Балансировка:
- Ограничивайте
prefetch_count, чтобы избежать перегрузки воркеров.
- Ограничивайте
-
Обработка ошибок:
- Добавляйте retry-логику для повторной отправки сообщений.
- Используйте Dead Letter Exchanges для проблемных сообщений.
-
Мониторинг:
- Включайте RabbitMQ Management Plugin для наблюдения за очередями.
10. Заключение
RabbitMQ предоставляет гибкий инструмент для асинхронной коммуникации между компонентами системы. Используя Python и библиотеку pika, вы можете легко реализовать:
- Распределенные задачи.
- Сложные маршрутизации.
- RPC-взаимодействия.
Итоговый пример:
import pika
# Издатель
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(exchange='', routing_key='test', body='Hello, World!')
connection.close()
# Потребитель
def callback(ch, method, properties, body):
print(f"Получено: {body.decode()}")
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Удачной работы с очередями! 🐇