risingwave_frontend/planner/
delete.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16
17use super::Planner;
18use crate::binder::BoundDelete;
19use crate::error::Result;
20use crate::optimizer::plan_node::{LogicalDelete, LogicalProject, generic};
21use crate::optimizer::property::{Order, RequiredDist};
22use crate::optimizer::{LogicalPlanRef as PlanRef, LogicalPlanRoot, PlanRoot};
23use crate::utils::ColIndexMapping;
24
25impl Planner {
26    pub(super) fn plan_delete(&mut self, delete: BoundDelete) -> Result<LogicalPlanRoot> {
27        let table_catalog = &delete.table.table_catalog;
28        let dml_output_indices: Vec<usize> = table_catalog
29            .columns()
30            .iter()
31            .enumerate()
32            .filter_map(|(i, c)| c.can_dml().then_some(i))
33            .collect();
34        let table_to_dml = ColIndexMapping::with_remaining_columns(
35            &dml_output_indices,
36            table_catalog.columns().len(),
37        );
38        let pk_indices: Vec<usize> = table_catalog
39            .pk()
40            .iter()
41            .map(|order| {
42                table_to_dml
43                    .try_map(order.column_index)
44                    .expect("pk column should be part of dml input schema")
45            })
46            .collect();
47
48        let scan = self.plan_base_table(&delete.table)?;
49        let input = if let Some(expr) = delete.selection {
50            self.plan_where(scan, expr)?
51        } else {
52            scan
53        };
54        let input = if dml_output_indices.len() != table_catalog.columns().len() {
55            LogicalProject::with_out_col_idx(input, dml_output_indices.iter().copied()).into()
56        } else {
57            input
58        };
59        let returning = !delete.returning_list.is_empty();
60        let mut plan: PlanRef = LogicalDelete::from(generic::Delete::new(
61            input,
62            delete.table_name.clone(),
63            delete.table_id,
64            delete.table_version_id,
65            pk_indices,
66            returning,
67        ))
68        .into();
69
70        if returning {
71            plan = LogicalProject::create(plan, delete.returning_list);
72        }
73
74        // For delete, frontend will only schedule one task so do not need this to be single.
75        let dist = RequiredDist::Any;
76        let mut out_fields = FixedBitSet::with_capacity(plan.schema().len());
77        out_fields.insert_range(..);
78        let out_names = if returning {
79            delete.returning_schema.expect("If returning list is not empty, should provide returning schema in BoundDelete.").names()
80        } else {
81            plan.schema().names()
82        };
83
84        let root = PlanRoot::new_with_logical_plan(plan, dist, Order::any(), out_fields, out_names);
85        Ok(root)
86    }
87}