risingwave_frontend/optimizer/plan_node/generic/
dedup.rs1use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::{FieldDisplay, Schema};
17
18use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
19use crate::OptimizerContextRef;
20use crate::optimizer::plan_node::utils::childless_record;
21use crate::optimizer::property::FunctionalDependencySet;
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct Dedup<PlanRef> {
25 pub input: PlanRef,
26 pub dedup_cols: Vec<usize>,
28}
29
30impl<PlanRef: GenericPlanRef> Dedup<PlanRef> {
31 pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
32 debug_assert!(
33 dedup_cols.iter().all(|&idx| idx < input.schema().len()),
34 "Invalid dedup keys {:?} input schema size = {}",
35 &dedup_cols,
36 input.schema().len()
37 );
38 Dedup { input, dedup_cols }
39 }
40
41 fn dedup_cols_pretty<'a>(&self) -> Pretty<'a> {
42 Pretty::Array(
43 self.dedup_cols
44 .iter()
45 .map(|i| FieldDisplay(self.input.schema().fields.get(*i).unwrap()))
46 .map(|fd| Pretty::display(&fd))
47 .collect(),
48 )
49 }
50}
51
52impl<PlanRef: GenericPlanRef> DistillUnit for Dedup<PlanRef> {
53 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
54 childless_record(name, vec![("dedup_cols", self.dedup_cols_pretty())])
55 }
56}
57
58impl<PlanRef: GenericPlanRef> GenericPlanNode for Dedup<PlanRef> {
59 fn schema(&self) -> Schema {
60 self.input.schema().clone()
61 }
62
63 fn stream_key(&self) -> Option<Vec<usize>> {
64 Some(self.dedup_cols.clone())
65 }
66
67 fn ctx(&self) -> OptimizerContextRef {
68 self.input.ctx()
69 }
70
71 fn functional_dependency(&self) -> FunctionalDependencySet {
72 self.input.functional_dependency().clone()
73 }
74}