risingwave_frontend/planner/
set_operation.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::PROJECTED_ROW_ID_COLUMN_NAME;
16use risingwave_common::util::column_index_mapping::ColIndexMapping;
17
18use crate::binder::{BoundSetExpr, BoundSetOperation};
19use crate::error::Result;
20use crate::optimizer::plan_node::generic::GenericPlanRef;
21use crate::optimizer::plan_node::{
22    LogicalExcept, LogicalIntersect, LogicalPlanRef as PlanRef, LogicalProject, LogicalUnion,
23};
24use crate::planner::Planner;
25
26impl Planner {
27    /// Strip the hidden `projected_row_id` column if present as the first column.
28    /// This is necessary because `LogicalProjectSet` adds this hidden column for set-returning
29    /// functions like `unnest()`, but it should not participate in set operations.
30    fn strip_projected_row_id(plan: PlanRef) -> PlanRef {
31        let schema = plan.schema();
32        if let Some(field) = schema.fields.first()
33            && field.name == PROJECTED_ROW_ID_COLUMN_NAME
34        {
35            let len = schema.len();
36            LogicalProject::with_out_col_idx(plan, 1..len).into()
37        } else {
38            plan
39        }
40    }
41
42    pub(super) fn plan_set_operation(
43        &mut self,
44        op: BoundSetOperation,
45        all: bool,
46        corresponding_col_indices: Option<(ColIndexMapping, ColIndexMapping)>,
47        left: BoundSetExpr,
48        right: BoundSetExpr,
49    ) -> Result<PlanRef> {
50        let left = self.plan_set_expr(left, vec![], &[])?;
51        let right = self.plan_set_expr(right, vec![], &[])?;
52
53        // Map the corresponding columns
54        let (left, right) = if let Some((mapping_l, mapping_r)) = corresponding_col_indices {
55            (
56                LogicalProject::with_mapping(left, mapping_l).into(),
57                LogicalProject::with_mapping(right, mapping_r).into(),
58            )
59        } else {
60            (left, right)
61        };
62
63        // Strip hidden `projected_row_id` column from both sides to ensure schema alignment.
64        // This column is added by `LogicalProjectSet` for set-returning functions like `unnest()`.
65        let left = Self::strip_projected_row_id(left);
66        let right = Self::strip_projected_row_id(right);
67
68        match op {
69            BoundSetOperation::Union => Ok(LogicalUnion::create(all, vec![left, right])),
70            BoundSetOperation::Intersect => Ok(LogicalIntersect::create(all, vec![left, right])),
71            BoundSetOperation::Except => Ok(LogicalExcept::create(all, vec![left, right])),
72        }
73    }
74}