// risingwave_frontend/optimizer/plan_node/generic/changelog.rs
use pretty_xmlish::{Str, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18
19use super::{DistillUnit, GenericPlanNode};
20use crate::OptimizerContextRef;
21use crate::optimizer::plan_node::stream::prelude::GenericPlanRef;
22use crate::optimizer::plan_node::utils::childless_record;
23use crate::optimizer::property::FunctionalDependencySet;
24use crate::utils::ColIndexMappingRewriteExt;
25
/// Name of the `Int16` column that `ChangeLog::schema` appends when `need_op` is set.
pub const CHANGELOG_OP: &str = "changelog_op";
/// Name of the `Serial` column that `ChangeLog::schema` appends when
/// `need_changelog_row_id` is set. `ChangeLog::stream_key` reports that column as the key.
pub const _CHANGELOG_ROW_ID: &str = "_changelog_row_id";
/// Core description of a changelog plan node.
///
/// The output keeps every column of `input` in its original position and may append
/// up to two extra columns, controlled by the flags below.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
    /// The plan whose columns are passed through.
    pub input: PlanRef,
    /// When `true`, the output schema gains an `Int16` column named `changelog_op`.
    pub need_op: bool,
    /// When `true`, the output schema gains a trailing `Serial` column named
    /// `_changelog_row_id`, and that column becomes the stream key.
    pub need_changelog_row_id: bool,
}
37impl<PlanRef: GenericPlanRef> DistillUnit for ChangeLog<PlanRef> {
38 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
39 childless_record(name, vec![])
40 }
41}
42impl<PlanRef: GenericPlanRef> ChangeLog<PlanRef> {
43 pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
44 ChangeLog {
45 input,
46 need_op,
47 need_changelog_row_id,
48 }
49 }
50
51 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
52 let mut map = vec![None; self.input.schema().len()];
53 (0..self.input.schema().len()).for_each(|i| map[i] = Some(i));
54 ColIndexMapping::new(map, self.schema().len())
55 }
56}
57impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
58 fn schema(&self) -> Schema {
59 let mut fields = self.input.schema().fields.clone();
60 if self.need_op {
61 fields.push(Field::with_name(
62 risingwave_common::types::DataType::Int16,
63 CHANGELOG_OP,
64 ));
65 }
66 if self.need_changelog_row_id {
67 fields.push(Field::with_name(
68 risingwave_common::types::DataType::Serial,
69 _CHANGELOG_ROW_ID,
70 ));
71 }
72 Schema::new(fields)
73 }
74
75 fn stream_key(&self) -> Option<Vec<usize>> {
76 if self.need_changelog_row_id {
77 let keys = vec![self.schema().len() - 1];
78 Some(keys)
79 } else {
80 None
81 }
82 }
83
84 fn ctx(&self) -> OptimizerContextRef {
85 self.input.ctx()
86 }
87
88 fn functional_dependency(&self) -> FunctionalDependencySet {
89 let i2o = self.i2o_col_mapping();
90 i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
91 }
92}