Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion scripts/cache_data.json
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,9 @@
"name": "polars",
"children": [
"polars.DataFrame",
"polars.LazyFrame"
"polars.LazyFrame",
"polars.col",
"polars.lit"
],
"required": false
},
Expand Down Expand Up @@ -808,5 +810,17 @@
"full_path": "typing.Union",
"name": "Union",
"children": []
},
"polars.col": {
"type": "attribute",
"full_path": "polars.col",
"name": "col",
"children": []
},
"polars.lit": {
"type": "attribute",
"full_path": "polars.lit",
"name": "lit",
"children": []
}
}
2 changes: 2 additions & 0 deletions scripts/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@

polars.DataFrame
polars.LazyFrame
polars.col
polars.lit

import duckdb
import duckdb.filesystem
Expand Down
5 changes: 3 additions & 2 deletions src/duckdb_py/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# this is used for clang-tidy checks
add_library(python_arrow OBJECT arrow_array_stream.cpp arrow_export_utils.cpp
pyarrow_filter_pushdown.cpp)
add_library(
python_arrow OBJECT arrow_array_stream.cpp arrow_export_utils.cpp
polars_filter_pushdown.cpp pyarrow_filter_pushdown.cpp)

target_link_libraries(python_arrow PRIVATE _duckdb_dependencies)
64 changes: 64 additions & 0 deletions src/duckdb_py/arrow/arrow_array_stream.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "duckdb_python/arrow/arrow_array_stream.hpp"
#include "duckdb_python/arrow/polars_filter_pushdown.hpp"
#include "duckdb_python/arrow/pyarrow_filter_pushdown.hpp"

#include "duckdb_python/pyconnection/pyconnection.hpp"
Expand Down Expand Up @@ -66,6 +67,55 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
py::handle arrow_obj_handle(factory->arrow_object);
auto arrow_object_type = factory->cached_arrow_type;

if (arrow_object_type == PyArrowObjectType::PolarsLazyFrame) {
py::object lf = py::reinterpret_borrow<py::object>(arrow_obj_handle);

auto filters = parameters.filters;
bool filters_pushed = false;

// Translate DuckDB filters to Polars expressions and push into the lazy plan
if (filters && !filters->filters.empty()) {
try {
auto filter_expr = PolarsFilterPushdown::TransformFilter(
*filters, parameters.projected_columns.projection_map, parameters.projected_columns.filter_to_col,
factory->client_properties);
if (!filter_expr.is(py::none())) {
lf = lf.attr("filter")(filter_expr);
filters_pushed = true;
}
} catch (...) {
// Fallback: DuckDB handles filtering post-scan
}
}

// If no filters were pushed and we have a cached Arrow table, reuse it. This avoids re-reading from source and
// re-converting on repeated unfiltered scans.
py::object arrow_table;
if (!filters_pushed && factory->cached_arrow_table.ptr() != nullptr) {
arrow_table = factory->cached_arrow_table;
} else {
arrow_table = lf.attr("collect")().attr("to_arrow")();
// Cache only unfiltered results (filtered results are partial)
if (!filters_pushed) {
factory->cached_arrow_table = arrow_table;
}
}

// Apply column projection
auto &column_list = parameters.projected_columns.columns;
if (!column_list.empty()) {
arrow_table = arrow_table.attr("select")(py::cast(column_list));
}

auto capsule_obj = arrow_table.attr("__arrow_c_stream__")();
auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
auto stream = capsule.get_pointer<struct ArrowArrayStream>();
auto res = make_uniq<ArrowArrayStreamWrapper>();
res->arrow_array_stream = *stream;
stream->release = nullptr;
return res;
}

if (arrow_object_type == PyArrowObjectType::PyCapsuleInterface || arrow_object_type == PyArrowObjectType::Table) {
py::object capsule_obj = arrow_obj_handle.attr("__arrow_c_stream__")();
auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
Expand Down Expand Up @@ -190,6 +240,20 @@ void PythonTableArrowArrayStreamFactory::GetSchema(uintptr_t factory_ptr, ArrowS
py::handle arrow_obj_handle(factory->arrow_object);

auto type = factory->cached_arrow_type;
if (type == PyArrowObjectType::PolarsLazyFrame) {
// head(0).collect().to_arrow() gives the Arrow-exported schema (e.g. large_string) without materializing data.
// collect_schema() would give Polars-native types (e.g. string_view) that don't match the actual export.
const auto empty_arrow = arrow_obj_handle.attr("head")(0).attr("collect")().attr("to_arrow")();
const auto schema_capsule = empty_arrow.attr("schema").attr("__arrow_c_schema__")();
const auto capsule = py::reinterpret_borrow<py::capsule>(schema_capsule);
const auto arrow_schema = capsule.get_pointer<struct ArrowSchema>();
factory->cached_schema = *arrow_schema;
arrow_schema->release = nullptr;
factory->schema_cached = true;
schema.arrow_schema = factory->cached_schema;
schema.arrow_schema.release = nullptr;
return;
}
if (type == PyArrowObjectType::PyCapsuleInterface || type == PyArrowObjectType::Table) {
// Get __arrow_c_schema__ if it exists
if (py::hasattr(arrow_obj_handle, "__arrow_c_schema__")) {
Expand Down
161 changes: 161 additions & 0 deletions src/duckdb_py/arrow/polars_filter_pushdown.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#include "duckdb_python/arrow/polars_filter_pushdown.hpp"

#include "duckdb/planner/filter/in_filter.hpp"
#include "duckdb/planner/filter/optional_filter.hpp"
#include "duckdb/planner/filter/conjunction_filter.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"
#include "duckdb/planner/filter/struct_filter.hpp"
#include "duckdb/planner/table_filter.hpp"

#include "duckdb_python/pyconnection/pyconnection.hpp"
#include "duckdb_python/python_objects.hpp"

namespace duckdb {

//! Translates one DuckDB table filter into a Polars expression evaluated against `col_expr`.
//!
//! \param filter             The DuckDB filter node to translate (may contain nested child filters).
//! \param col_expr           Polars expression the filter applies to (a column, or a struct field of one).
//! \param client_properties  Passed to PythonObject::FromValue when converting constants to Python objects.
//! \return The Polars expression, or py::none() when this filter has no translation here.
static py::object TransformFilterRecursive(TableFilter &filter, py::object col_expr,
                                           const ClientProperties &client_properties) {
	auto &import_cache = *DuckDBPyConnection::ImportCache();

	switch (filter.filter_type) {
	case TableFilterType::CONSTANT_COMPARISON: {
		auto &constant_filter = filter.Cast<ConstantFilter>();
		auto &constant = constant_filter.constant;
		auto &constant_type = constant.type();

		// Check for NaN
		bool is_nan = false;
		if (constant_type.id() == LogicalTypeId::FLOAT) {
			is_nan = Value::IsNan(constant.GetValue<float>());
		} else if (constant_type.id() == LogicalTypeId::DOUBLE) {
			is_nan = Value::IsNan(constant.GetValue<double>());
		}

		if (is_nan) {
			// Comparisons against a NaN constant are rewritten explicitly, treating NaN as equal to itself and
			// greater than every other value.
			switch (constant_filter.comparison_type) {
			case ExpressionType::COMPARE_EQUAL:
			case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
				return col_expr.attr("is_nan")();
			case ExpressionType::COMPARE_LESSTHAN:
			case ExpressionType::COMPARE_NOTEQUAL:
				return col_expr.attr("is_nan")().attr("__invert__")();
			case ExpressionType::COMPARE_GREATERTHAN:
				// Nothing is greater than NaN
				return import_cache.polars.lit()(false);
			case ExpressionType::COMPARE_LESSTHANOREQUALTO:
				// Every non-NULL value is <= NaN, but a comparison against NULL never matches. A literal `true`
				// would let NULL rows through, so select exactly the non-NULL rows instead.
				return col_expr.attr("is_not_null")();
			default:
				return py::none();
			}
		}

		// Convert DuckDB Value to Python object
		auto py_value = PythonObject::FromValue(constant, constant_type, client_properties);

		switch (constant_filter.comparison_type) {
		case ExpressionType::COMPARE_EQUAL:
			return col_expr.attr("__eq__")(py_value);
		case ExpressionType::COMPARE_LESSTHAN:
			return col_expr.attr("__lt__")(py_value);
		case ExpressionType::COMPARE_GREATERTHAN:
			return col_expr.attr("__gt__")(py_value);
		case ExpressionType::COMPARE_LESSTHANOREQUALTO:
			return col_expr.attr("__le__")(py_value);
		case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
			return col_expr.attr("__ge__")(py_value);
		case ExpressionType::COMPARE_NOTEQUAL:
			return col_expr.attr("__ne__")(py_value);
		default:
			return py::none();
		}
	}
	case TableFilterType::IS_NULL: {
		return col_expr.attr("is_null")();
	}
	case TableFilterType::IS_NOT_NULL: {
		return col_expr.attr("is_not_null")();
	}
	case TableFilterType::CONJUNCTION_AND: {
		auto &and_filter = filter.Cast<ConjunctionAndFilter>();
		py::object expression = py::none();
		for (idx_t i = 0; i < and_filter.child_filters.size(); i++) {
			auto child_expression = TransformFilterRecursive(*and_filter.child_filters[i], col_expr, client_properties);
			if (child_expression.is(py::none())) {
				// An untranslatable child of an AND is dropped; the remaining children are still combined
				continue;
			}
			if (expression.is(py::none())) {
				expression = std::move(child_expression);
			} else {
				expression = expression.attr("__and__")(child_expression);
			}
		}
		return expression;
	}
	case TableFilterType::CONJUNCTION_OR: {
		auto &or_filter = filter.Cast<ConjunctionOrFilter>();
		py::object expression = py::none();
		for (idx_t i = 0; i < or_filter.child_filters.size(); i++) {
			auto child_expression = TransformFilterRecursive(*or_filter.child_filters[i], col_expr, client_properties);
			if (child_expression.is(py::none())) {
				// Can't skip children in OR: dropping one branch would reject rows that branch accepts,
				// so the whole OR is reported as untranslatable
				return py::none();
			}
			if (expression.is(py::none())) {
				expression = std::move(child_expression);
			} else {
				expression = expression.attr("__or__")(child_expression);
			}
		}
		return expression;
	}
	case TableFilterType::STRUCT_EXTRACT: {
		auto &struct_filter = filter.Cast<StructFilter>();
		// Narrow the expression to the struct field, then translate the child filter against that field
		auto child_col = col_expr.attr("struct").attr("field")(struct_filter.child_name);
		return TransformFilterRecursive(*struct_filter.child_filter, child_col, client_properties);
	}
	case TableFilterType::IN_FILTER: {
		auto &in_filter = filter.Cast<InFilter>();
		py::list py_values;
		for (const auto &value : in_filter.values) {
			py_values.append(PythonObject::FromValue(value, value.type(), client_properties));
		}
		return col_expr.attr("is_in")(py_values);
	}
	case TableFilterType::OPTIONAL_FILTER: {
		auto &optional_filter = filter.Cast<OptionalFilter>();
		if (!optional_filter.child_filter) {
			return py::none();
		}
		return TransformFilterRecursive(*optional_filter.child_filter, col_expr, client_properties);
	}
	default:
		// We skip DYNAMIC_FILTER, EXPRESSION_FILTER, BLOOM_FILTER
		return py::none();
	}
}

//! Builds a single Polars predicate from a set of per-column DuckDB table filters.
//!
//! Each entry of `filter_collection.filters` is translated against `polars.col(<name>)`, where the name is looked
//! up in `columns` by the entry's key. Translated predicates are chained with `__and__`; entries that translate
//! to py::none() are left out. Returns py::none() when no entry produced a predicate.
//! `filter_to_col` is not read by this implementation.
py::object PolarsFilterPushdown::TransformFilter(const TableFilterSet &filter_collection,
                                                 unordered_map<idx_t, string> &columns,
                                                 const unordered_map<idx_t, idx_t> &filter_to_col,
                                                 const ClientProperties &client_properties) {
	auto &import_cache = *DuckDBPyConnection::ImportCache();

	py::object combined = py::none();
	for (auto &entry : filter_collection.filters) {
		// Resolve the column name first, then wrap it in a polars.col(...) expression
		auto &name = columns[entry.first];
		auto polars_column = import_cache.polars.col()(name);

		auto predicate = TransformFilterRecursive(*entry.second, polars_column, client_properties);
		if (predicate.is(py::none())) {
			// Nothing translatable for this column
			continue;
		}
		// The first predicate seeds the result; later ones are AND-ed onto it
		combined = combined.is(py::none()) ? std::move(predicate) : combined.attr("__and__")(predicate);
	}
	return combined;
}

} // namespace duckdb
19 changes: 18 additions & 1 deletion src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ class Table : public py::object {

} // namespace pyarrow

enum class PyArrowObjectType { Invalid, Table, Scanner, Dataset, PyCapsule, PyCapsuleInterface, MessageReader };
//! Tags the kind of Python object a PythonTableArrowArrayStreamFactory wraps (stored as `cached_arrow_type`)
enum class PyArrowObjectType {
Invalid,
Table,
Scanner,
Dataset,
PyCapsule,
PyCapsuleInterface,
MessageReader,
//! A Polars LazyFrame; the stream factory collects it and converts the result to Arrow when producing a stream
PolarsLazyFrame
};

void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches);

Expand All @@ -66,6 +75,10 @@ class PythonTableArrowArrayStreamFactory {
}

~PythonTableArrowArrayStreamFactory() {
if (cached_arrow_table.ptr() != nullptr) {
py::gil_scoped_acquire acquire;
cached_arrow_table = py::object();
}
if (cached_schema.release) {
cached_schema.release(&cached_schema);
}
Expand All @@ -84,6 +97,10 @@ class PythonTableArrowArrayStreamFactory {
const ClientProperties client_properties;
const PyArrowObjectType cached_arrow_type;

//! Cached Arrow table from an unfiltered .collect().to_arrow() on a LazyFrame.
//! Avoids re-reading from source and re-converting on repeated scans without filters.
py::object cached_arrow_table;

private:
ArrowSchema cached_schema;
bool schema_cached = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb_python/arrow/polars_filter_pushdown.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/common/unordered_map.hpp"
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/main/client_properties.hpp"
#include "duckdb_python/pybind11/pybind_wrapper.hpp"

namespace duckdb {

//! Translates DuckDB table filters into Polars expressions
struct PolarsFilterPushdown {
//! Converts the filters in `filter_collection` into one Polars expression.
//! `columns` maps the key of each filter entry to the column name used to build the Polars column expression.
//! `client_properties` is used when converting DuckDB constants to Python objects.
//! The result may be py::none() when there is nothing to push.
static py::object TransformFilter(const TableFilterSet &filter_collection, unordered_map<idx_t, string> &columns,
const unordered_map<idx_t, idx_t> &filter_to_col,
const ClientProperties &client_properties);
};

} // namespace duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ struct PolarsCacheItem : public PythonImportCacheItem {
static constexpr const char *Name = "polars";

public:
PolarsCacheItem() : PythonImportCacheItem("polars"), DataFrame("DataFrame", this), LazyFrame("LazyFrame", this) {
PolarsCacheItem()
: PythonImportCacheItem("polars"), DataFrame("DataFrame", this), LazyFrame("LazyFrame", this), col("col", this),
lit("lit", this) {
}
~PolarsCacheItem() override {
}

PythonImportCacheItem DataFrame;
PythonImportCacheItem LazyFrame;
PythonImportCacheItem col;
PythonImportCacheItem lit;

protected:
bool IsRequired() const override final {
Expand Down
4 changes: 1 addition & 3 deletions src/duckdb_py/python_replacement_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ unique_ptr<TableRef> PythonReplacementScan::TryReplacementObject(const py::objec
CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table,
*context.db);
} else if (PolarsDataFrame::IsLazyFrame(entry)) {
auto materialized = entry.attr("collect")();
auto arrow_dataset = materialized.attr("to_arrow")();
CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table,
CreateArrowScan(name, entry, *table_function, children, client_properties, PyArrowObjectType::PolarsLazyFrame,
*context.db);
} else if ((arrow_type = DuckDBPyConnection::GetArrowType(entry)) != PyArrowObjectType::Invalid &&
!(arrow_type == PyArrowObjectType::MessageReader && !relation)) {
Expand Down
Loading
Loading