Модель выполнения DataStore
Понимание модели ленивых вычислений DataStore — ключ к его эффективному использованию и достижению оптимальной производительности.
Отложенное вычисление
DataStore использует отложенное вычисление — операции не выполняются сразу, а записываются и компилируются в оптимизированные SQL-запросы. Выполнение происходит только тогда, когда результаты действительно нужны.
Пример: ленивые и жадные вычисления
from pathlib import Path
Path("sales.csv").write_text("""\
region,product,category,amount,quantity,price,date,order_id
East,Widget,Electronics,5200,10,120,2024-01-15,1001
West,Gadget,Electronics,800,5,160,2024-02-20,1002
East,Gizmo,Home,6500,3,100,2024-03-10,1003
North,Widget,Electronics,4500,6,150,2024-06-18,1004
West,Gadget,Electronics,2000,8,250,2024-09-14,1005
""")
from chdb import datastore as pd
ds = pd.read_csv("sales.csv")
# These operations are NOT executed yet
result = (ds
.filter(ds['amount'] > 1000) # Recorded, not executed
.select('region', 'amount') # Recorded, not executed
.groupby('region') # Recorded, not executed
.agg({'amount': 'sum'}) # Recorded, not executed
.sort('sum', ascending=False) # Recorded, not executed
)
# Still no execution - just building the query plan
print(result.to_sql())
# SELECT region, SUM(amount) AS sum
# FROM file('sales.csv', 'CSVWithNames')
# WHERE amount > 1000
# GROUP BY region
# ORDER BY sum DESC
# NOW execution happens
df = result.to_df() # <-- Triggers execution
Преимущества ленивых вычислений
- Оптимизация запросов: Несколько операций компилируются в один оптимизированный SQL‑запрос
- Проталкивание фильтров: Фильтры применяются на уровне источника данных
- Отсечение столбцов: Считываются только необходимые столбцы
- Отложенный выбор: Движок выполнения можно выбрать во время выполнения
- Анализ плана: Вы можете просмотреть и отладить запрос перед выполнением
Триггеры выполнения
Выполнение автоматически запускается, когда требуются фактические значения:
Автоматические триггеры
| Триггер | Пример | Описание |
|---|
print() / repr() | print(ds) | Отобразить результаты |
len() | len(ds) | Получить количество строк |
.columns | ds.columns | Получить имена столбцов |
.dtypes | ds.dtypes | Получить типы столбцов |
.shape | ds.shape | Получить размеры |
.index | ds.index | Получить индекс строк |
.values | ds.values | Получить массив NumPy |
| Iteration | for row in ds | Перебор строк |
to_df() | ds.to_df() | Преобразовать в pandas |
to_pandas() | ds.to_pandas() | Синоним to_df |
to_dict() | ds.to_dict() | Преобразовать в dict |
to_numpy() | ds.to_numpy() | Преобразовать в массив |
.equals() | ds.equals(other) | Сравнить объекты DataStore |
Примеры:
# All these trigger execution
print(ds) # Display
len(ds) # 1000
ds.columns # Index(['name', 'age', 'city'])
ds.shape # (1000, 3)
list(ds) # List of values
ds.to_df() # pandas DataFrame
Операции, которые выполняются лениво
| Operation | Returns | Description |
|---|
filter() | DataStore | Добавляет предложение WHERE |
select() | DataStore | Добавляет выбор столбцов |
sort() | DataStore | Добавляет ORDER BY |
groupby() | LazyGroupBy | Подготавливает GROUP BY |
join() | DataStore | Добавляет JOIN |
ds['col'] | ColumnExpr | Ссылка на столбец |
ds[['col1', 'col2']] | DataStore | Выбор столбцов |
Примеры:
# These do NOT trigger execution - they stay lazy
result = ds.filter(ds['age'] > 25) # Returns DataStore
result = ds.select('name', 'age') # Returns DataStore
result = ds['name'] # Returns ColumnExpr
result = ds.groupby('city') # Returns LazyGroupBy
Трёхфазное выполнение
Операции DataStore используют трёхфазную модель выполнения:
Этап 1: построение SQL-запроса (отложенное)
Операции, которые можно выразить в SQL, накапливаются:
result = (ds
.filter(ds['status'] == 'active') # WHERE
.select('user_id', 'amount') # SELECT
.groupby('user_id') # GROUP BY
.agg({'amount': 'sum'}) # SUM()
.sort('sum', ascending=False) # ORDER BY
.limit(10) # LIMIT
)
# All compiled into one SQL query
Фаза 2: Точка выполнения
Когда срабатывает триггер, накопленный SQL-запрос выполняется:
# Execution triggered here
df = result.to_df()
# The single optimized SQL query runs now
Фаза 3: операции с DataFrame (если есть)
Если после выполнения вы добавляете цепочку операций, выполняемых исключительно средствами pandas:
# Mixed operations
result = (ds
.filter(ds['amount'] > 100) # Phase 1: SQL
.to_df() # Phase 2: Execute
.pivot_table(...) # Phase 3: pandas
)
Просмотр планов выполнения
Используйте explain(), чтобы увидеть, что именно будет выполнено:
ds = pd.read_csv("sales.csv")
query = (ds
.filter(ds['amount'] > 1000)
.groupby('region')
.agg({'amount': ['sum', 'mean']})
)
# View execution plan
query.explain()
Вывод:
Pipeline:
1. Source: file('sales.csv', 'CSVWithNames')
2. Filter: amount > 1000
3. GroupBy: region
4. Aggregate: sum(amount), avg(amount)
Generated SQL:
SELECT region, SUM(amount) AS sum, AVG(amount) AS mean
FROM file('sales.csv', 'CSVWithNames')
WHERE amount > 1000
GROUP BY region
Используйте verbose=True, чтобы получить более подробные сведения:
query.explain(verbose=True)
Полную документацию см. в разделе Отладка: explain().
Кеширование
DataStore кеширует результаты выполнения, чтобы избежать повторных запросов.
Как устроено кэширование
from pathlib import Path
Path("data.csv").write_text("""\
name,age,city,salary,department
Alice,25,NYC,55000,Engineering
Bob,30,LA,65000,Product
Charlie,35,NYC,80000,Engineering
Diana,28,SF,70000,Design
Eve,42,NYC,95000,Product
""")
ds = pd.read_csv("data.csv")
result = ds.filter(ds['age'] > 25)
# First access - executes query
print(result.shape) # Executes and caches
# Second access - uses cache
print(result.columns) # Uses cached result
# Third access - uses cache
df = result.to_df() # Uses cached result
Инвалидация кэша
Кэш становится недействительным, когда операции модифицируют DataStore:
result = ds.filter(ds['age'] > 25)
print(result.shape) # Executes, caches
# New operation invalidates cache
result2 = result.filter(result['city'] == 'NYC')
print(result2.shape) # Re-executes (different query)
Ручное управление кэшем
# Clear cache
ds.clear_cache()
# Disable caching
from chdb.datastore.config import config
config.set_cache_enabled(False)
Смешивание операций SQL и Pandas
DataStore интеллектуально обрабатывает операции, сочетающие SQL и Pandas:
Операции, совместимые с SQL
Эти операции транслируются в SQL:
filter(), where()
select()
groupby(), agg()
sort(), orderby()
limit(), offset()
join(), union()
distinct()
- Операции над столбцами (арифметика, сравнение, строковые методы)
Операции только в pandas
Эти операции запускают выполнение и используют pandas:
apply() с пользовательскими функциями
pivot_table() со сложными агрегациями
stack(), unstack()
- Операции с выполненными объектами DataFrame
Гибридные конвейеры
# SQL phase
result = (ds
.filter(ds['amount'] > 100) # SQL
.groupby('category') # SQL
.agg({'amount': 'sum'}) # SQL
)
# Execution + pandas phase
result = (result
.to_df() # Execute SQL
.pivot_table(...) # pandas operation
)
Выбор движка выполнения
DataStore может выполнять операции с использованием различных движков:
Автоматический режим (по умолчанию)
from chdb.datastore.config import config
config.set_execution_engine('auto') # Default
# Automatically selects best engine per operation
Принудительный выбор движка chDB
config.set_execution_engine('chdb')
# All operations use ClickHouse SQL
Принудительное использование движка Pandas
config.set_execution_engine('pandas')
# All operations use pandas
Подробности см. в разделе Configuration: Execution Engine.
Хорошо: ранняя фильтрация
# Good: Filter in SQL, then aggregate
result = (ds
.filter(ds['date'] >= '2024-01-01') # Reduces data early
.groupby('category')
.agg({'amount': 'sum'})
)
Плохо: фильтровать поздно
# Bad: Aggregate all, then filter
result = (ds
.groupby('category')
.agg({'amount': 'sum'})
.to_df()
.query('sum > 1000') # Pandas filter after aggregation
)
Хорошо: выбирайте столбцы как можно раньше
# Good: Select columns in SQL
result = (ds
.select('user_id', 'amount', 'date')
.filter(ds['date'] >= '2024-01-01')
.groupby('user_id')
.agg({'amount': 'sum'})
)
Лучше так: пусть за вас работает SQL
# Good: Complex aggregation in SQL
result = (ds
.groupby('category')
.agg({
'amount': ['sum', 'mean', 'count'],
'quantity': 'sum'
})
.sort('sum', ascending=False)
.limit(10)
)
# One SQL query does everything
# Bad: Multiple separate queries
sums = ds.groupby('category')['amount'].sum().to_df()
means = ds.groupby('category')['amount'].mean().to_df()
# Two queries instead of one
Краткое изложение передовых практик
- Связывайте операции перед выполнением - Сформируйте полный запрос, затем выполните его один раз
- Фильтруйте как можно раньше - Уменьшайте объем данных на стороне источника
- Выбирайте только нужные столбцы - Исключение лишних столбцов улучшает производительность
- Используйте
explain() для понимания выполнения - Отлаживайте перед запуском
- Позвольте SQL обрабатывать агрегации - ClickHouse оптимизирован для этого
- Понимайте, что именно запускает выполнение - Избегайте случайного раннего выполнения
- Разумно используйте кэширование - Понимайте, когда кэш инвалидируется