risingwave_frontend/planner/
insert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16
17use crate::binder::BoundInsert;
18use crate::error::Result;
19use crate::optimizer::plan_node::{
20    LogicalInsert, LogicalPlanRef as PlanRef, LogicalProject, generic,
21};
22use crate::optimizer::property::{Order, RequiredDist};
23use crate::optimizer::{LogicalPlanRoot, PlanRoot};
24use crate::planner::Planner;
25
26impl Planner {
27    pub(super) fn plan_insert(&mut self, insert: BoundInsert) -> Result<LogicalPlanRoot> {
28        let mut input = self.plan_query(insert.source)?.into_unordered_subplan();
29        if !insert.cast_exprs.is_empty() {
30            input = LogicalProject::create(input, insert.cast_exprs);
31        }
32        let returning = !insert.returning_list.is_empty();
33        let mut plan: PlanRef = LogicalInsert::new(generic::Insert::new(
34            input,
35            insert.table_name.clone(),
36            insert.table_id,
37            insert.table_version_id,
38            insert.table_visible_columns,
39            insert.column_indices,
40            insert.default_columns,
41            insert.row_id_index,
42            returning,
43        ))
44        .into();
45        // If containing RETURNING, add one logicalproject node
46        if returning {
47            plan = LogicalProject::create(plan, insert.returning_list);
48        }
49        // For insert, frontend will only schedule one task so do not need this to be single.
50        let dist = RequiredDist::Any;
51        let mut out_fields = FixedBitSet::with_capacity(plan.schema().len());
52        out_fields.insert_range(..);
53        let out_names = if returning {
54            insert.returning_schema.expect("If returning list is not empty, should provide returning schema in BoundInsert.").names()
55        } else {
56            plan.schema().names()
57        };
58        let root = PlanRoot::new_with_logical_plan(plan, dist, Order::any(), out_fields, out_names);
59        Ok(root)
60    }
61}