Работа с Apache Cassandra в Python: Полное руководство
Работа с Apache Cassandra в Python: Полное руководство
Введение в Cassandra
Apache Cassandra — распределенная NoSQL СУБД, разработанная для обработки больших объемов данных с высокой доступностью и линейной масштабируемостью. Ее архитектура без единой точки отказа и поддержка мульти-датацентров делают Cassandra идеальной для задач, требующих отказоустойчивости: IoT, аналитики в реальном времени, журналирования и рекомендательных систем.
1. Установка и настройка Cassandra
Шаг 1: Установка Cassandra
# Для Ubuntu/Debian
sudo apt install openjdk-11-jdk
echo "deb https://downloads.apache.org/cassandra/debian 40x main" | sudo tee -a /etc/apt/sources.list
sudo apt update
sudo apt install cassandra
# Запуск службы
sudo systemctl start cassandra
sudo systemctl status cassandra
Шаг 2: Проверка работы через CQLSH
cqlsh
SELECT cluster_name FROM system.local;
Ожидаемый вывод: Test Cluster.
2. Установка Python-драйвера
Используйте официальный драйвер cassandra-driver:
pip install cassandra-driver
Драйвер поддерживает:
- Асинхронные запросы
- Подготовленные выражения
- Автоматическую балансировку нагрузки
- Пакетные операции
3. Подключение к Cassandra из Python
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
auth_provider = PlainTextAuthProvider(username='user', password='pass')
cluster = Cluster(['127.0.0.1'], auth_provider=auth_provider, port=9042)
session = cluster.connect()
# Проверка подключения
rows = session.execute("SELECT release_version FROM system.local")
print(f"Cassandra version: {rows.one()[0]}")
4. Создание Keyspace и таблиц
Keyspace — пространство ключей (аналог базы данных).
session.execute("""
CREATE KEYSPACE IF NOT EXISTS shop
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 1
}
""")
session.set_keyspace('shop')
# Создание таблицы продуктов
session.execute("""
CREATE TABLE products (
product_id UUID PRIMARY KEY,
name TEXT,
price DECIMAL,
category TEXT,
stock INT
)
""")
5. CRUD-операции
Вставка данных
from cassandra.query import BatchStatement
from uuid import uuid4
insert_query = session.prepare("""
INSERT INTO products (product_id, name, price, category, stock)
VALUES (?, ?, ?, ?, ?)
""")
# Одиночная вставка
product_id = uuid4()
session.execute(insert_query, (product_id, "Laptop", 999.99, "Electronics", 10))
# Пакетная вставка
batch = BatchStatement()
batch.add(insert_query, (uuid4(), "Phone", 699.99, "Electronics", 20))
batch.add(insert_query, (uuid4(), "Tablet", 399.99, "Electronics", 15))
session.execute(batch)
Чтение данных
select_query = "SELECT * FROM products WHERE category = ?"
prepared = session.prepare(select_query)
rows = session.execute(prepared, ["Electronics"])
for row in rows:
print(f"{row.name}: ${row.price}, Stock: {row.stock}")
Обновление и удаление
# Обновление
session.execute(
"UPDATE products SET stock = ? WHERE product_id = ?",
(5, product_id)
)
# Удаление
session.execute(
"DELETE FROM products WHERE product_id = ?",
(product_id,)
)
6. Индексы и запросы
Cassandra требует фильтрацию по первичному ключу. Для поиска по другим полям создайте вторичный индекс:
session.execute("CREATE INDEX IF NOT EXISTS ON products (category)")
Пример запроса с индексом:
rows = session.execute("SELECT * FROM products WHERE category = 'Electronics'")
7. Асинхронные операции
Используйте execute_async для неблокирующих запросов:
from cassandra.concurrent import execute_concurrent
query = "INSERT INTO products (product_id, name) VALUES (?, ?)"
prepared = session.prepare(query)
products = [(uuid4(), f"Product {i}") for i in range(100)]
# Параллельное выполнение
execute_concurrent(
session,
prepared,
products,
concurrency=10
)
8. Обработка ошибок и повторные попытки
from cassandra.query import RetryPolicy
class CustomRetryPolicy(RetryPolicy):
def on_read_timeout(self, query, consistency, required_responses, received_responses, **kwargs):
return self.RETRY
cluster = Cluster(retry_policy=CustomRetryPolicy())
9. Пример приложения: Мониторинг склада
def add_product(name, price, category, stock):
session.execute(
prepared_insert,
(uuid4(), name, price, category, stock)
)
def restock_alert(threshold=5):
rows = session.execute("SELECT name, stock FROM products")
for row in rows:
if row.stock < threshold:
print(f"ALERT: Low stock for {row.name}! Current: {row.stock}")
# Добавление товара
add_product("Headphones", 129.99, "Audio", 3)
# Проверка остатков
restock_alert()
10. Оптимизация производительности
- Тюнинг запросов: Используйте
ALLOW FILTERINGтолько для админ-задач. - Пакетная вставка: Группируйте операции в
BatchStatement. - Профилирование: Включать через
session.execute(query, trace=True). - Настройка пула:
cluster = Cluster( max_connections_per_host=10, max_requests_per_connection=1000 )
11. Лучшие практики
- Моделирование данных: Спроектируйте таблицы под конкретные запросы (Query-Driven Design).
- Репликация: Для продакшена используйте
NetworkTopologyStrategyвместоSimpleStrategy. - Консистентность: Выбирайте уровень согласованности (например,
QUORUMдля баланса). - Tombstones: Избегайте частых удалений, чтобы не создавать “могильные камни”.
Заключение
Cassandra в сочетании с Python предоставляет мощный инструмент для работы с Big Data. Ее сила — в горизонтальной масштабируемости и отказоустойчивости. Ключевые этапы работы:
- Корректная настройка кластера
- Оптимальное моделирование таблиц
- Использование подготовленных запросов
- Пакетная обработка данных
- Мониторинг производительности
Примеры кода из статьи можно интегрировать в микросервисы, аналитические пайплайны или системы реального времени. Для углубленного изучения изучите документацию DataStax и исходный код драйвера на GitHub.
Полезные ресурсы:
Статья демонстрирует базовые и продвинутые техники работы с Cassandra в Python, достаточные для старта в коммерческих проектах.