risingwave_frontend/optimizer/plan_node/generic/
dedup.rs1use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::{FieldDisplay, Schema};
17
18use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
19use crate::OptimizerContextRef;
20use crate::optimizer::plan_node::utils::childless_record;
21use crate::optimizer::property::FunctionalDependencySet;
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct Dedup<PlanRef> {
25 pub input: PlanRef,
26 pub dedup_cols: Vec<usize>,
28}
29
30impl<PlanRef: GenericPlanRef> Dedup<PlanRef> {
31 pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> Dedup<OtherPlanRef> {
32 Dedup {
33 input,
34 dedup_cols: self.dedup_cols.clone(),
35 }
36 }
37
38 pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
39 debug_assert!(
40 dedup_cols.iter().all(|&idx| idx < input.schema().len()),
41 "Invalid dedup keys {:?} input schema size = {}",
42 &dedup_cols,
43 input.schema().len()
44 );
45 Dedup { input, dedup_cols }
46 }
47
48 fn dedup_cols_pretty<'a>(&self) -> Pretty<'a> {
49 Pretty::Array(
50 self.dedup_cols
51 .iter()
52 .map(|i| FieldDisplay(self.input.schema().fields.get(*i).unwrap()))
53 .map(|fd| Pretty::display(&fd))
54 .collect(),
55 )
56 }
57}
58
59impl<PlanRef: GenericPlanRef> DistillUnit for Dedup<PlanRef> {
60 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
61 childless_record(name, vec![("dedup_cols", self.dedup_cols_pretty())])
62 }
63}
64
65impl<PlanRef: GenericPlanRef> GenericPlanNode for Dedup<PlanRef> {
66 fn schema(&self) -> Schema {
67 self.input.schema().clone()
68 }
69
70 fn stream_key(&self) -> Option<Vec<usize>> {
71 Some(self.dedup_cols.clone())
72 }
73
74 fn ctx(&self) -> OptimizerContextRef {
75 self.input.ctx()
76 }
77
78 fn functional_dependency(&self) -> FunctionalDependencySet {
79 self.input.functional_dependency().clone()
80 }
81}