risingwave_frontend/optimizer/plan_node/generic/
changelog.rs1use pretty_xmlish::{Str, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18
19use super::{DistillUnit, GenericPlanNode};
20use crate::OptimizerContextRef;
21use crate::optimizer::plan_node::stream::prelude::GenericPlanRef;
22use crate::optimizer::plan_node::utils::childless_record;
23use crate::optimizer::property::FunctionalDependencySet;
24use crate::utils::ColIndexMappingRewriteExt;
25
/// Name of the `Int16` column that `ChangeLog::schema` appends to the input
/// columns when `need_op` is set.
pub const CHANGELOG_OP: &str = "changelog_op";
/// Name of the `Serial` column that `ChangeLog::schema` appends last when
/// `need_changelog_row_id` is set; `ChangeLog::stream_key` reports that column
/// as the stream key.
pub const _CHANGELOG_ROW_ID: &str = "_changelog_row_id";
/// Core description of a changelog plan node, generic over the plan reference
/// type used for its input.
///
/// The output schema is the input schema followed by up to two extra columns,
/// each controlled by one of the flags below.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
    /// The plan whose output this node consumes.
    pub input: PlanRef,
    /// When `true`, the output schema gains an `Int16` column named
    /// `changelog_op` right after the input columns.
    pub need_op: bool,
    /// When `true`, the output schema gains a trailing `Serial` column named
    /// `_changelog_row_id`, and that column is the node's stream key. When
    /// `false`, the node reports no stream key.
    pub need_changelog_row_id: bool,
}
37impl<PlanRef: GenericPlanRef> DistillUnit for ChangeLog<PlanRef> {
38 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
39 childless_record(name, vec![])
40 }
41}
42impl<PlanRef: GenericPlanRef> ChangeLog<PlanRef> {
43 pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
44 ChangeLog {
45 input,
46 need_op,
47 need_changelog_row_id,
48 }
49 }
50
51 pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> ChangeLog<OtherPlanRef> {
52 ChangeLog {
53 input,
54 need_op: self.need_op,
55 need_changelog_row_id: self.need_changelog_row_id,
56 }
57 }
58
59 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
60 let mut map = vec![None; self.input.schema().len()];
61 (0..self.input.schema().len()).for_each(|i| map[i] = Some(i));
62 ColIndexMapping::new(map, self.schema().len())
63 }
64}
65impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
66 fn schema(&self) -> Schema {
67 let mut fields = self.input.schema().fields.clone();
68 if self.need_op {
69 fields.push(Field::with_name(
70 risingwave_common::types::DataType::Int16,
71 CHANGELOG_OP,
72 ));
73 }
74 if self.need_changelog_row_id {
75 fields.push(Field::with_name(
76 risingwave_common::types::DataType::Serial,
77 _CHANGELOG_ROW_ID,
78 ));
79 }
80 Schema::new(fields)
81 }
82
83 fn stream_key(&self) -> Option<Vec<usize>> {
84 if self.need_changelog_row_id {
85 let keys = vec![self.schema().len() - 1];
86 Some(keys)
87 } else {
88 None
89 }
90 }
91
92 fn ctx(&self) -> OptimizerContextRef {
93 self.input.ctx()
94 }
95
96 fn functional_dependency(&self) -> FunctionalDependencySet {
97 let i2o = self.i2o_col_mapping();
98 i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
99 }
100}