Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rust_decimal = { version = "1" }
serde = { version = "1", features = ["derive", "rc"] }
kite_sql_serde_macros = { version = "0.1.0", path = "kite_sql_serde_macros" }
siphasher = { version = "1", features = ["serde"] }
sqlparser = { version = "0.34", features = ["serde"] }
sqlparser = { version = "0.61", features = ["serde"] }
thiserror = { version = "1" }
typetag = { version = "0.2" }
ulid = { version = "1", features = ["serde"] }
Expand Down
10 changes: 3 additions & 7 deletions src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,14 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
let return_orderby = if !orderbys.is_empty() {
let mut return_orderby = vec![];
for orderby in orderbys {
let OrderByExpr {
expr,
asc,
nulls_first,
} = orderby;
let OrderByExpr { expr, options, .. } = orderby;
let mut expr = self.bind_expr(expr)?;
self.visit_column_agg_expr(&mut expr)?;

return_orderby.push(SortField::new(
expr,
asc.is_none_or(|asc| asc),
nulls_first.unwrap_or(false),
options.asc.is_none_or(|asc| asc),
options.nulls_first.unwrap_or(false),
));
}
Some(return_orderby)
Expand Down
17 changes: 12 additions & 5 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sqlparser::ast::{AlterTableOperation, ObjectName};

use std::sync::Arc;

use super::{is_valid_identifier, Binder};
use super::{attach_span_if_absent, is_valid_identifier, Binder};
use crate::binder::lower_case_name;
use crate::errors::DatabaseError;
use crate::planner::operator::alter_table::add_column::AddColumnOperator;
Expand All @@ -43,13 +43,15 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
column_keyword: _,
if_not_exists,
column_def,
..
} => {
let plan = TableScanOperator::build(table_name.clone(), table, true)?;
let column = self.bind_column(column_def, None)?;

if !is_valid_identifier(column.name()) {
return Err(DatabaseError::InvalidColumn(
"illegal column naming".to_string(),
return Err(attach_span_if_absent(
DatabaseError::invalid_column("illegal column naming".to_string()),
column_def,
));
}
LogicalPlan::new(
Expand All @@ -62,12 +64,17 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
)
}
AlterTableOperation::DropColumn {
column_name,
column_names,
if_exists,
..
} => {
let plan = TableScanOperator::build(table_name.clone(), table, true)?;
let column_name = column_name.value.clone();
if column_names.len() != 1 {
return Err(DatabaseError::UnsupportedStmt(
"only dropping a single column is supported".to_string(),
));
}
let column_name = column_names[0].value.clone();

LogicalPlan::new(
Operator::DropColumn(DropColumnOperator {
Expand Down
18 changes: 10 additions & 8 deletions src/binder/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,25 @@ use crate::planner::{Childrens, LogicalPlan};
use crate::storage::Transaction;
use crate::types::index::IndexType;
use crate::types::value::DataValue;
use sqlparser::ast::{ObjectName, OrderByExpr};
use sqlparser::ast::{IndexColumn, ObjectName};
use std::sync::Arc;

impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A> {
pub(crate) fn bind_create_index(
&mut self,
table_name: &ObjectName,
name: &ObjectName,
exprs: &[OrderByExpr],
name: Option<&ObjectName>,
index_columns: &[IndexColumn],
if_not_exists: bool,
is_unique: bool,
) -> Result<LogicalPlan, DatabaseError> {
let table_name: Arc<str> = lower_case_name(table_name)?.into();
let index_name = lower_case_name(name)?;
let index_name = name
.ok_or(DatabaseError::InvalidIndex)
.and_then(lower_case_name)?;
let ty = if is_unique {
IndexType::Unique
} else if exprs.len() == 1 {
} else if index_columns.len() == 1 {
IndexType::Normal
} else {
IndexType::Composite
Expand All @@ -52,11 +54,11 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
Source::Table(table) => TableScanOperator::build(table_name.clone(), table, true)?,
Source::View(view) => LogicalPlan::clone(&view.plan),
};
let mut columns = Vec::with_capacity(exprs.len());
let mut columns = Vec::with_capacity(index_columns.len());

for expr in exprs {
for index_column in index_columns {
// TODO: Expression Index
match self.bind_expr(&expr.expr)? {
match self.bind_expr(&index_column.column.expr)? {
ScalarExpression::ColumnRef { column, .. } => columns.push(column),
expr => {
return Err(DatabaseError::UnsupportedStmt(format!(
Expand Down
89 changes: 51 additions & 38 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{is_valid_identifier, Binder};
use super::{attach_span_if_absent, is_valid_identifier, Binder};
use crate::binder::lower_case_name;
use crate::catalog::{ColumnCatalog, ColumnDesc};
use crate::errors::DatabaseError;
Expand All @@ -24,7 +24,7 @@ use crate::storage::Transaction;
use crate::types::value::DataValue;
use crate::types::LogicalType;
use itertools::Itertools;
use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, TableConstraint};
use sqlparser::ast::{ColumnDef, ColumnOption, Expr, IndexColumn, ObjectName, TableConstraint};
use std::collections::HashSet;
use std::sync::Arc;

Expand All @@ -40,8 +40,9 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
let table_name: Arc<str> = lower_case_name(name)?.into();

if !is_valid_identifier(&table_name) {
return Err(DatabaseError::InvalidTable(
"illegal table naming".to_string(),
return Err(attach_span_if_absent(
DatabaseError::invalid_table("illegal table naming".to_string()),
name,
));
}
{
Expand All @@ -53,8 +54,9 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
return Err(DatabaseError::DuplicateColumn(col_name.clone()));
}
if !is_valid_identifier(col_name) {
return Err(DatabaseError::InvalidColumn(
"illegal column naming".to_string(),
return Err(attach_span_if_absent(
DatabaseError::invalid_column("illegal column naming".to_string()),
col,
));
}
}
Expand All @@ -66,27 +68,15 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
.try_collect()?;
for constraint in constraints {
match constraint {
TableConstraint::Unique {
columns: column_names,
is_primary,
..
} => {
for (i, column_name) in column_names
.iter()
.map(|ident| ident.value.to_lowercase())
.enumerate()
{
if let Some(column) = columns
.iter_mut()
.find(|column| column.name() == column_name)
{
if *is_primary {
column.desc_mut().set_primary(Some(i));
} else {
column.desc_mut().set_unique(true);
}
}
}
TableConstraint::PrimaryKey(primary) => {
Self::bind_constraint(&mut columns, &primary.columns, |i, desc| {
desc.set_primary(Some(i))
})?;
}
TableConstraint::Unique(unique) => {
Self::bind_constraint(&mut columns, &unique.columns, |_, desc| {
desc.set_unique()
})?;
}
constraint => {
return Err(DatabaseError::UnsupportedStmt(format!(
Expand All @@ -97,8 +87,11 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
}

if columns.iter().filter(|col| col.desc().is_primary()).count() == 0 {
return Err(DatabaseError::InvalidTable(
"the primary key field must exist and have at least one".to_string(),
return Err(attach_span_if_absent(
DatabaseError::invalid_table(
"the primary key field must exist and have at least one".to_string(),
),
name,
));
}

Expand All @@ -112,6 +105,29 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
))
}

fn bind_constraint<F: Fn(usize, &mut ColumnDesc)>(
table_columns: &mut [ColumnCatalog],
exprs: &[IndexColumn],
fn_constraint: F,
) -> Result<(), DatabaseError> {
for (i, index_column) in exprs.iter().enumerate() {
let Expr::Identifier(ident) = &index_column.column.expr else {
return Err(DatabaseError::UnsupportedStmt(
"only identifier columns are supported in `PRIMARY KEY/UNIQUE`".to_string(),
));
};
let column_name = ident.value.to_lowercase();

if let Some(column) = table_columns
.iter_mut()
.find(|column| column.name() == column_name)
{
fn_constraint(i, column.desc_mut())
}
}
Ok(())
}

pub fn bind_column(
&mut self,
column_def: &ColumnDef,
Expand All @@ -130,16 +146,13 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
match &option_def.option {
ColumnOption::Null => nullable = true,
ColumnOption::NotNull => nullable = false,
ColumnOption::Unique { is_primary, .. } => {
if *is_primary {
column_desc.set_primary(column_index);
nullable = false;
// Skip other options when using primary key
break;
} else {
column_desc.set_unique(true);
}
ColumnOption::PrimaryKey(_) => {
column_desc.set_primary(column_index);
nullable = false;
// Skip other options when using primary key
break;
}
ColumnOption::Unique(_) => column_desc.set_unique(),
ColumnOption::Default(expr) => {
let mut expr = self.bind_expr(expr)?;

Expand Down
64 changes: 39 additions & 25 deletions src/binder/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::planner::{Childrens, LogicalPlan};
use crate::storage::Transaction;
use crate::types::value::DataValue;
use itertools::Itertools;
use sqlparser::ast::{Ident, ObjectName, Query};
use sqlparser::ast::{ObjectName, Query, ViewColumnDef};
use std::sync::Arc;
use ulid::Ulid;

Expand All @@ -32,41 +32,55 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
&mut self,
or_replace: &bool,
name: &ObjectName,
columns: &[Ident],
columns: &[ViewColumnDef],
query: &Query,
) -> Result<LogicalPlan, DatabaseError> {
fn projection_exprs(
view_name: &Arc<str>,
mapping_schema: &[ColumnRef],
column_names: impl Iterator<Item = String>,
) -> Vec<ScalarExpression> {
column_names
.enumerate()
.map(|(i, column_name)| {
let mapping_column = &mapping_schema[i];
let mut column = ColumnCatalog::new(
column_name,
mapping_column.nullable(),
mapping_column.desc().clone(),
);
column.set_ref_table(view_name.clone(), Ulid::new(), true);

ScalarExpression::Alias {
expr: Box::new(ScalarExpression::column_expr(mapping_column.clone())),
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(
ColumnRef::from(column),
))),
}
})
.collect_vec()
}

let view_name: Arc<str> = lower_case_name(name)?.into();
let mut plan = self.bind_query(query)?;

let mapping_schema = plan.output_schema();

let exprs = if columns.is_empty() {
Box::new(
let exprs: Vec<ScalarExpression> = if columns.is_empty() {
projection_exprs(
&view_name,
mapping_schema,
mapping_schema
.iter()
.map(|column| column.name().to_string()),
) as Box<dyn Iterator<Item = String>>
)
} else {
Box::new(columns.iter().map(lower_ident)) as Box<dyn Iterator<Item = String>>
}
.enumerate()
.map(|(i, column_name)| {
let mapping_column = &mapping_schema[i];
let mut column = ColumnCatalog::new(
column_name,
mapping_column.nullable(),
mapping_column.desc().clone(),
);
column.set_ref_table(view_name.clone(), Ulid::new(), true);

ScalarExpression::Alias {
expr: Box::new(ScalarExpression::column_expr(mapping_column.clone())),
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(ColumnRef::from(
column,
)))),
}
})
.collect_vec();
projection_exprs(
&view_name,
mapping_schema,
columns.iter().map(|column| lower_ident(&column.name)),
)
};
plan = self.bind_project(plan, exprs)?;

Ok(LogicalPlan::new(
Expand Down
2 changes: 1 addition & 1 deletion src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
let mut table_alias = None;
let mut alias_idents = None;

if let Some(TableAlias { name, columns }) = alias {
if let Some(TableAlias { name, columns, .. }) = alias {
table_alias = Some(name.value.to_lowercase().into());
alias_idents = Some(columns);
}
Expand Down
Loading
Loading