Работа с Apache Cassandra в Python: Полное руководство

Работа с Apache Cassandra в Python: Полное руководство


Работа с Apache Cassandra в Python: Полное руководство

Введение в Cassandra
Apache Cassandra — распределенная NoSQL СУБД, разработанная для обработки больших объемов данных с высокой доступностью и линейной масштабируемостью. Ее архитектура без единой точки отказа и поддержка мульти-датацентров делают Cassandra идеальной для задач, требующих отказоустойчивости: IoT, аналитики в реальном времени, журналирования и рекомендательных систем.


1. Установка и настройка Cassandra

Шаг 1: Установка Cassandra

# Для Ubuntu/Debian
sudo apt install openjdk-11-jdk
echo "deb https://downloads.apache.org/cassandra/debian 40x main" | sudo tee -a /etc/apt/sources.list
sudo apt update
sudo apt install cassandra

# Запуск службы
sudo systemctl start cassandra
sudo systemctl status cassandra

Шаг 2: Проверка работы через CQLSH

cqlsh
SELECT cluster_name FROM system.local;

Ожидаемый вывод: Test Cluster.


2. Установка Python-драйвера

Используйте официальный драйвер cassandra-driver:

pip install cassandra-driver

Драйвер поддерживает:

  • Асинхронные запросы
  • Подготовленные выражения
  • Автоматическую балансировку нагрузки
  • Пакетные операции

3. Подключение к Cassandra из Python

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='user', password='pass')
cluster = Cluster(['127.0.0.1'], auth_provider=auth_provider, port=9042)
session = cluster.connect()

# Проверка подключения
rows = session.execute("SELECT release_version FROM system.local")
print(f"Cassandra version: {rows.one()[0]}")

4. Создание Keyspace и таблиц

Keyspace — пространство ключей (аналог базы данных).

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS shop 
    WITH replication = {
        'class': 'SimpleStrategy', 
        'replication_factor': 1
    }
""")

session.set_keyspace('shop')

# Создание таблицы продуктов
session.execute("""
    CREATE TABLE products (
        product_id UUID PRIMARY KEY,
        name TEXT,
        price DECIMAL,
        category TEXT,
        stock INT
    )
""")

5. CRUD-операции

Вставка данных

from cassandra.query import BatchStatement
from uuid import uuid4

insert_query = session.prepare("""
    INSERT INTO products (product_id, name, price, category, stock)
    VALUES (?, ?, ?, ?, ?)
""")

# Одиночная вставка
product_id = uuid4()
session.execute(insert_query, (product_id, "Laptop", 999.99, "Electronics", 10))

# Пакетная вставка
batch = BatchStatement()
batch.add(insert_query, (uuid4(), "Phone", 699.99, "Electronics", 20))
batch.add(insert_query, (uuid4(), "Tablet", 399.99, "Electronics", 15))
session.execute(batch)

Чтение данных

select_query = "SELECT * FROM products WHERE category = ?"
prepared = session.prepare(select_query)
rows = session.execute(prepared, ["Electronics"])

for row in rows:
    print(f"{row.name}: ${row.price}, Stock: {row.stock}")

Обновление и удаление

# Обновление
session.execute(
    "UPDATE products SET stock = ? WHERE product_id = ?",
    (5, product_id)
)

# Удаление
session.execute(
    "DELETE FROM products WHERE product_id = ?", 
    (product_id,)
)

6. Индексы и запросы

Cassandra требует фильтрацию по первичному ключу. Для поиска по другим полям создайте вторичный индекс:

session.execute("CREATE INDEX IF NOT EXISTS ON products (category)")

Пример запроса с индексом:

rows = session.execute("SELECT * FROM products WHERE category = 'Electronics'")

7. Асинхронные операции

Используйте execute_async для неблокирующих запросов:

from cassandra.concurrent import execute_concurrent

query = "INSERT INTO products (product_id, name) VALUES (?, ?)"
prepared = session.prepare(query)
products = [(uuid4(), f"Product {i}") for i in range(100)]

# Параллельное выполнение
execute_concurrent(
    session, 
    prepared, 
    products,
    concurrency=10
)

8. Обработка ошибок и повторные попытки

from cassandra.query import RetryPolicy

class CustomRetryPolicy(RetryPolicy):
    def on_read_timeout(self, query, consistency, required_responses, received_responses, **kwargs):
        return self.RETRY

cluster = Cluster(retry_policy=CustomRetryPolicy())

9. Пример приложения: Мониторинг склада

def add_product(name, price, category, stock):
    session.execute(
        prepared_insert, 
        (uuid4(), name, price, category, stock)
    )

def restock_alert(threshold=5):
    rows = session.execute("SELECT name, stock FROM products")
    for row in rows:
        if row.stock < threshold:
            print(f"ALERT: Low stock for {row.name}! Current: {row.stock}")

# Добавление товара
add_product("Headphones", 129.99, "Audio", 3)

# Проверка остатков
restock_alert()

10. Оптимизация производительности

  • Тюнинг запросов: Используйте ALLOW FILTERING только для админ-задач.
  • Пакетная вставка: Группируйте операции в BatchStatement.
  • Профилирование: Включать через session.execute(query, trace=True).
  • Настройка пула:
    cluster = Cluster(
        max_connections_per_host=10,
        max_requests_per_connection=1000
    )

11. Лучшие практики

  • Моделирование данных: Спроектируйте таблицы под конкретные запросы (Query-Driven Design).
  • Репликация: Для продакшена используйте NetworkTopologyStrategy вместо SimpleStrategy.
  • Консистентность: Выбирайте уровень согласованности (например, QUORUM для баланса).
  • Tombstones: Избегайте частых удалений, чтобы не создавать “могильные камни”.

Заключение
Cassandra в сочетании с Python предоставляет мощный инструмент для работы с Big Data. Ее сила — в горизонтальной масштабируемости и отказоустойчивости. Ключевые этапы работы:

  1. Корректная настройка кластера
  2. Оптимальное моделирование таблиц
  3. Использование подготовленных запросов
  4. Пакетная обработка данных
  5. Мониторинг производительности

Примеры кода из статьи можно интегрировать в микросервисы, аналитические пайплайны или системы реального времени. Для углубленного изучения изучите документацию DataStax и исходный код драйвера на GitHub.

Полезные ресурсы:

Статья демонстрирует базовые и продвинутые техники работы с Cassandra в Python, достаточные для старта в коммерческих проектах.