risingwave_frontend/planner/
delete.rs1use fixedbitset::FixedBitSet;
16
17use super::Planner;
18use crate::binder::BoundDelete;
19use crate::error::Result;
20use crate::optimizer::plan_node::{LogicalDelete, LogicalProject, generic};
21use crate::optimizer::property::{Order, RequiredDist};
22use crate::optimizer::{LogicalPlanRef as PlanRef, LogicalPlanRoot, PlanRoot};
23use crate::utils::ColIndexMapping;
24
25impl Planner {
26 pub(super) fn plan_delete(&mut self, delete: BoundDelete) -> Result<LogicalPlanRoot> {
27 let table_catalog = &delete.table.table_catalog;
28 let dml_output_indices: Vec<usize> = table_catalog
29 .columns()
30 .iter()
31 .enumerate()
32 .filter_map(|(i, c)| c.can_dml().then_some(i))
33 .collect();
34 let table_to_dml = ColIndexMapping::with_remaining_columns(
35 &dml_output_indices,
36 table_catalog.columns().len(),
37 );
38 let pk_indices: Vec<usize> = table_catalog
39 .pk()
40 .iter()
41 .map(|order| {
42 table_to_dml
43 .try_map(order.column_index)
44 .expect("pk column should be part of dml input schema")
45 })
46 .collect();
47
48 let scan = self.plan_base_table(&delete.table)?;
49 let input = if let Some(expr) = delete.selection {
50 self.plan_where(scan, expr)?
51 } else {
52 scan
53 };
54 let input = if dml_output_indices.len() != table_catalog.columns().len() {
55 LogicalProject::with_out_col_idx(input, dml_output_indices.iter().copied()).into()
56 } else {
57 input
58 };
59 let returning = !delete.returning_list.is_empty();
60 let mut plan: PlanRef = LogicalDelete::from(generic::Delete::new(
61 input,
62 delete.table_name.clone(),
63 delete.table_id,
64 delete.table_version_id,
65 pk_indices,
66 returning,
67 ))
68 .into();
69
70 if returning {
71 plan = LogicalProject::create(plan, delete.returning_list);
72 }
73
74 let dist = RequiredDist::Any;
76 let mut out_fields = FixedBitSet::with_capacity(plan.schema().len());
77 out_fields.insert_range(..);
78 let out_names = if returning {
79 delete.returning_schema.expect("If returning list is not empty, should provide returning schema in BoundDelete.").names()
80 } else {
81 plan.schema().names()
82 };
83
84 let root = PlanRoot::new_with_logical_plan(plan, dist, Order::any(), out_fields, out_names);
85 Ok(root)
86 }
87}