use std::collections::HashMap;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_expr::ExprError;
use risingwave_pb::plan_common::JoinType;
use crate::binder::{BoundDistinct, BoundSelect};
use crate::error::{ErrorCode, Result};
use crate::expr::{
CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
SubqueryKind,
};
use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
pub use crate::optimizer::plan_node::LogicalFilter;
use crate::optimizer::plan_node::{
LogicalAgg, LogicalApply, LogicalDedup, LogicalOverWindow, LogicalProject, LogicalProjectSet,
LogicalTopN, LogicalValues, PlanAggCall, PlanRef,
};
use crate::optimizer::property::Order;
use crate::planner::Planner;
use crate::utils::{Condition, IndexSet};
use crate::OptimizerContextRef;
impl Planner {
pub(super) fn plan_select(
&mut self,
BoundSelect {
from,
where_clause,
mut select_items,
group_by,
mut having,
distinct,
..
}: BoundSelect,
extra_order_exprs: Vec<ExprImpl>,
order: &[ColumnOrder],
) -> Result<PlanRef> {
if distinct.is_distinct() && !extra_order_exprs.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(
"for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
)
.into());
}
select_items.extend(extra_order_exprs);
if let BoundDistinct::DistinctOn(exprs) = &distinct {
let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
exprs.iter().map(|expr| (expr.clone(), false)).collect();
let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
while uncovered_distinct_on_exprs_cnt > 0
&& let Some(order_expr) = order_iter.next()
{
match distinct_on_exprs.get_mut(order_expr) {
Some(has_been_covered) => {
if !*has_been_covered {
*has_been_covered = true;
uncovered_distinct_on_exprs_cnt -= 1;
}
}
None => {
return Err(ErrorCode::InvalidInputSyntax(
"the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
.into(),
)
.into());
}
}
}
}
let mut root = match from {
None => self.create_dummy_values(),
Some(t) => self.plan_relation(t)?,
};
if let Some(where_clause) = where_clause {
root = self.plan_where(root, where_clause)?;
}
let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
if !group_by.is_empty() || having.is_some() || has_agg_call {
(root, select_items, having) =
LogicalAgg::create(select_items, group_by, having, root)?;
}
if let Some(having) = having {
root = self.plan_where(root, having)?;
}
if select_items.iter().any(|e| e.has_subquery()) {
(root, select_items) = self.substitute_subqueries(root, select_items)?;
}
if select_items.iter().any(|e| e.has_window_function()) {
(root, select_items) = LogicalOverWindow::create(root, select_items)?;
}
let original_select_items_len = select_items.len();
let mut distinct_list_index_to_select_items_index = vec![];
if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
distinct_list_index_to_select_items_index.reserve(distinct_list.len());
let mut builder_index_to_select_items_index =
Vec::with_capacity(original_select_items_len);
let mut input_proj_builder = ProjectBuilder::default();
for (select_item_index, select_item) in select_items.iter().enumerate() {
let builder_index = input_proj_builder
.add_expr(select_item)
.map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
if builder_index >= builder_index_to_select_items_index.len() {
debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
builder_index_to_select_items_index.push(select_item_index);
}
}
for distinct_expr in distinct_list {
let builder_index = input_proj_builder
.add_expr(distinct_expr)
.map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
if builder_index >= builder_index_to_select_items_index.len() {
debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
select_items.push(distinct_expr.clone());
builder_index_to_select_items_index.push(select_items.len() - 1);
}
distinct_list_index_to_select_items_index
.push(builder_index_to_select_items_index[builder_index]);
}
}
let need_restore_select_items = select_items.len() > original_select_items_len;
root = LogicalProjectSet::create(root, select_items);
if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
root = if order.is_empty() {
LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
} else {
LogicalTopN::new(
root,
1,
0,
false,
Order::new(order.to_vec()),
distinct_list_index_to_select_items_index,
)
.into()
};
}
if need_restore_select_items {
root = LogicalProject::with_core(Project::with_out_col_idx(
root,
0..original_select_items_len,
))
.into();
}
if let BoundDistinct::Distinct = distinct {
let fields = root.schema().fields();
let group_key = if let Some(field) = fields.first()
&& field.name == "projected_row_id"
{
(1..fields.len()).collect()
} else {
(0..fields.len()).collect()
};
root = Agg::new(vec![], group_key, root).into();
}
Ok(root)
}
fn create_dummy_values(&self) -> PlanRef {
LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
}
fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
let ge = FunctionCall::new(
ExprType::GreaterThanOrEqual,
vec![
InputRef::new(0, DataType::Int64).into(),
ExprImpl::literal_int(1),
],
)
.unwrap();
Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
}
pub(super) fn plan_where(
&mut self,
mut input: PlanRef,
where_clause: ExprImpl,
) -> Result<PlanRef> {
if !where_clause.has_subquery() {
return Ok(LogicalFilter::create_with_expr(input, where_clause));
}
let (subquery_conjunctions, not_subquery_conjunctions, others) =
Condition::with_expr(where_clause)
.group_by::<_, 3>(|expr| match expr {
ExprImpl::Subquery(_) => 0,
ExprImpl::FunctionCall(func_call)
if func_call.func_type() == ExprType::Not
&& matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
{
1
}
_ => 2,
})
.into_iter()
.next_tuple()
.unwrap();
for expr in subquery_conjunctions {
self.handle_exists_and_in(expr, false, &mut input)?;
}
for expr in not_subquery_conjunctions {
let not = expr.into_function_call().unwrap();
let (_, expr) = not.decompose_as_unary();
self.handle_exists_and_in(expr, true, &mut input)?;
}
if others.always_true() {
Ok(input)
} else {
let (input, others) = self.substitute_subqueries(input, others.conjunctions)?;
Ok(LogicalFilter::create(
input,
Condition {
conjunctions: others,
},
))
}
}
fn handle_exists_and_in(
&mut self,
expr: ExprImpl,
negated: bool,
input: &mut PlanRef,
) -> Result<()> {
let join_type = if negated {
JoinType::LeftAnti
} else {
JoinType::LeftSemi
};
let correlated_id = self.ctx.next_correlated_id();
let mut subquery = expr.into_subquery().unwrap();
let correlated_indices =
subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
let output_column_type = subquery.query.data_types()[0].clone();
let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
let on = match subquery.kind {
SubqueryKind::Existential => ExprImpl::literal_bool(true),
SubqueryKind::In(left_expr) => {
let right_expr = InputRef::new(input.schema().len(), output_column_type);
FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
}
kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
};
*input = Self::create_apply(
correlated_id,
correlated_indices,
input.clone(),
right_plan,
on,
join_type,
false,
);
Ok(())
}
pub(super) fn substitute_subqueries(
&mut self,
mut root: PlanRef,
mut exprs: Vec<ExprImpl>,
) -> Result<(PlanRef, Vec<ExprImpl>)> {
struct SubstituteSubQueries {
input_col_num: usize,
subqueries: Vec<Subquery>,
correlated_indices_collection: Vec<Vec<usize>>,
correlated_ids: Vec<CorrelatedId>,
ctx: OptimizerContextRef,
}
impl ExprRewriter for SubstituteSubQueries {
fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
let correlated_id = self.ctx.next_correlated_id();
self.correlated_ids.push(correlated_id);
let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
self.input_col_num += 1;
self.correlated_indices_collection.push(
subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
);
self.subqueries.push(subquery);
input_ref
}
}
let mut rewriter = SubstituteSubQueries {
input_col_num: root.schema().len(),
subqueries: vec![],
correlated_indices_collection: vec![],
correlated_ids: vec![],
ctx: self.ctx.clone(),
};
exprs = exprs
.into_iter()
.map(|e| rewriter.rewrite_expr(e))
.collect();
for ((subquery, correlated_indices), correlated_id) in rewriter
.subqueries
.into_iter()
.zip_eq_fast(rewriter.correlated_indices_collection)
.zip_eq_fast(rewriter.correlated_ids)
{
let return_type = subquery.return_type();
let subroot = self.plan_query(subquery.query)?;
let right = match subquery.kind {
SubqueryKind::Scalar => subroot.into_unordered_subplan(),
SubqueryKind::UpdateSet => {
let plan = subroot.into_unordered_subplan();
let all_input_refs = plan
.schema()
.data_types()
.into_iter()
.enumerate()
.map(|(i, data_type)| InputRef::new(i, data_type).into())
.collect::<Vec<_>>();
let call =
FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
LogicalProject::create(plan, vec![call.into()])
}
SubqueryKind::Existential => {
self.create_exists(subroot.into_unordered_subplan())?
}
SubqueryKind::Array => subroot.into_array_agg()?,
_ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
};
root = Self::create_apply(
correlated_id,
correlated_indices,
root,
right,
ExprImpl::literal_bool(true),
JoinType::LeftOuter,
true,
);
}
Ok((root, exprs))
}
fn create_apply(
correlated_id: CorrelatedId,
correlated_indices: Vec<usize>,
left: PlanRef,
right: PlanRef,
on: ExprImpl,
join_type: JoinType,
max_one_row: bool,
) -> PlanRef {
LogicalApply::create(
left,
right,
join_type,
Condition::with_expr(on),
correlated_id,
correlated_indices,
max_one_row,
)
}
}