risingwave_frontend/optimizer/plan_node/generic/
union.rs1use pretty_xmlish::{Pretty, StrAssocArr};
16use risingwave_common::catalog::Schema;
17
18use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
19use crate::optimizer::optimizer_context::OptimizerContextRef;
20use crate::optimizer::property::FunctionalDependencySet;
21
22#[derive(Debug, Clone, PartialEq, Eq, Hash)]
25pub struct Union<PlanRef> {
26 pub all: bool,
27 pub inputs: Vec<PlanRef>,
28 pub source_col: Option<usize>,
32}
33
34impl<PlanRef: GenericPlanRef> GenericPlanNode for Union<PlanRef> {
35 fn schema(&self) -> Schema {
36 let mut schema = self.inputs[0].schema().clone();
37 if let Some(source_col) = self.source_col {
38 schema.fields[source_col].name = "$src".to_owned();
39 schema
40 } else {
41 schema
42 }
43 }
44
45 fn stream_key(&self) -> Option<Vec<usize>> {
46 let mut pk_indices = vec![];
48 for input in &self.inputs {
49 for pk in input.stream_key()? {
50 if !pk_indices.contains(pk) {
51 pk_indices.push(*pk);
52 }
53 }
54 }
55 if let Some(source_col) = self.source_col {
56 pk_indices.push(source_col)
57 }
58 Some(pk_indices)
59 }
60
61 fn ctx(&self) -> OptimizerContextRef {
62 self.inputs[0].ctx()
63 }
64
65 fn functional_dependency(&self) -> FunctionalDependencySet {
66 FunctionalDependencySet::new(self.inputs[0].schema().len())
67 }
68}
69
70impl<PlanRef: GenericPlanRef> Union<PlanRef> {
71 pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
72 vec![("all", Pretty::debug(&self.all))]
73 }
74}
75impl_distill_unit_from_fields!(Union, GenericPlanRef);