risingwave_frontend/planner/
set_operation.rs1use risingwave_common::catalog::PROJECTED_ROW_ID_COLUMN_NAME;
16use risingwave_common::util::column_index_mapping::ColIndexMapping;
17
18use crate::binder::{BoundSetExpr, BoundSetOperation};
19use crate::error::Result;
20use crate::optimizer::plan_node::generic::GenericPlanRef;
21use crate::optimizer::plan_node::{
22 LogicalExcept, LogicalIntersect, LogicalPlanRef as PlanRef, LogicalProject, LogicalUnion,
23};
24use crate::planner::Planner;
25
26impl Planner {
27 fn strip_projected_row_id(plan: PlanRef) -> PlanRef {
31 let schema = plan.schema();
32 if let Some(field) = schema.fields.first()
33 && field.name == PROJECTED_ROW_ID_COLUMN_NAME
34 {
35 let len = schema.len();
36 LogicalProject::with_out_col_idx(plan, 1..len).into()
37 } else {
38 plan
39 }
40 }
41
42 pub(super) fn plan_set_operation(
43 &mut self,
44 op: BoundSetOperation,
45 all: bool,
46 corresponding_col_indices: Option<(ColIndexMapping, ColIndexMapping)>,
47 left: BoundSetExpr,
48 right: BoundSetExpr,
49 ) -> Result<PlanRef> {
50 let left = self.plan_set_expr(left, vec![], &[])?;
51 let right = self.plan_set_expr(right, vec![], &[])?;
52
53 let (left, right) = if let Some((mapping_l, mapping_r)) = corresponding_col_indices {
55 (
56 LogicalProject::with_mapping(left, mapping_l).into(),
57 LogicalProject::with_mapping(right, mapping_r).into(),
58 )
59 } else {
60 (left, right)
61 };
62
63 let left = Self::strip_projected_row_id(left);
66 let right = Self::strip_projected_row_id(right);
67
68 match op {
69 BoundSetOperation::Union => Ok(LogicalUnion::create(all, vec![left, right])),
70 BoundSetOperation::Intersect => Ok(LogicalIntersect::create(all, vec![left, right])),
71 BoundSetOperation::Except => Ok(LogicalExcept::create(all, vec![left, right])),
72 }
73 }
74}