risingwave_frontend/optimizer/plan_node/
stream_source_scan.rs1use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::Field;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::OrderType;
22use risingwave_pb::stream_plan::PbStreamNode;
23use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
24
25use super::stream::prelude::*;
26use super::utils::TableCatalogBuilder;
27use super::{PlanBase, PlanRef};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::{Distill, childless_record};
31use crate::optimizer::plan_node::{ExprRewritable, StreamNode, generic};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35use crate::{Explain, TableCatalog};
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct StreamSourceScan {
45 pub base: PlanBase<Stream>,
46 core: generic::Source,
47}
48
49impl_plan_tree_node_for_leaf! { StreamSourceScan }
50
51impl StreamSourceScan {
52 pub const BACKFILL_PROGRESS_COLUMN_NAME: &str = "backfill_progress";
53 pub const PARTITION_ID_COLUMN_NAME: &str = "partition_id";
54
55 pub fn new(core: generic::Source) -> Self {
56 let base = PlanBase::new_stream_with_core(
57 &core,
58 Distribution::SomeShard,
59 core.catalog.as_ref().is_none_or(|s| s.append_only),
60 false,
61 WatermarkColumns::new(),
62 MonotonicityMap::new(),
63 );
64
65 Self { base, core }
66 }
67
68 fn get_columns(&self) -> Vec<&str> {
69 self.core
70 .column_catalog
71 .iter()
72 .map(|column| column.name())
73 .collect()
74 }
75
76 pub fn source_catalog(&self) -> Rc<SourceCatalog> {
77 self.core
78 .catalog
79 .clone()
80 .expect("source scan should have source cataglog")
81 }
82
83 pub fn infer_internal_table_catalog() -> TableCatalog {
86 let mut builder = TableCatalogBuilder::default();
87
88 let key = Field {
89 data_type: DataType::Varchar,
90 name: Self::PARTITION_ID_COLUMN_NAME.to_owned(),
91 };
92 let value = Field {
93 data_type: DataType::Jsonb,
94 name: Self::BACKFILL_PROGRESS_COLUMN_NAME.to_owned(),
95 };
96
97 let ordered_col_idx = builder.add_column(&key);
98 builder.add_column(&value);
99 builder.add_order_column(ordered_col_idx, OrderType::ascending());
100 builder.build(vec![], 0)
102 }
103
104 pub fn adhoc_to_stream_prost(
105 &self,
106 state: &mut BuildFragmentGraphState,
107 ) -> SchedulerResult<PbStreamNode> {
108 use risingwave_pb::stream_plan::*;
109
110 let stream_key = self
111 .stream_key()
112 .unwrap_or_else(|| {
113 panic!(
114 "should always have a stream key in the stream plan but not, sub plan: {}",
115 PlanRef::from(self.clone()).explain_to_string()
116 )
117 })
118 .iter()
119 .map(|x| *x as u32)
120 .collect_vec();
121
122 let source_catalog = self.source_catalog();
123 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
124 let backfill = SourceBackfillNode {
125 upstream_source_id: source_catalog.id,
126 source_name: source_catalog.name.clone(),
127 state_table: Some(
128 Self::infer_internal_table_catalog()
129 .with_id(state.gen_table_id_wrapped())
130 .to_internal_table_prost(),
131 ),
132 info: Some(source_catalog.info.clone()),
133 row_id_index: self.core.row_id_index.map(|index| index as _),
134 columns: self
135 .core
136 .column_catalog
137 .iter()
138 .map(|c| c.to_protobuf())
139 .collect_vec(),
140 with_properties,
141 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
142 secret_refs,
143 };
144
145 let fields = self.schema().to_prost();
146 Ok(PbStreamNode {
148 fields: fields.clone(),
149 input: vec![
150 PbStreamNode {
152 node_body: Some(PbNodeBody::Merge(Default::default())),
153 identity: "Upstream".into(),
154 fields,
155 stream_key: vec![], ..Default::default()
157 },
158 ],
159 node_body: Some(PbNodeBody::SourceBackfill(Box::new(backfill))),
160 stream_key,
161 operator_id: self.base.id().0 as u64,
162 identity: self.distill_to_string(),
163 append_only: self.append_only(),
164 })
165 }
166}
167
168impl Distill for StreamSourceScan {
169 fn distill<'a>(&self) -> XmlNode<'a> {
170 let columns = self
171 .get_columns()
172 .iter()
173 .map(|ele| Pretty::from(ele.to_string()))
174 .collect();
175 let col = Pretty::Array(columns);
176 childless_record("StreamSourceScan", vec![("columns", col)])
177 }
178}
179
180impl ExprRewritable for StreamSourceScan {}
181
182impl ExprVisitable for StreamSourceScan {}
183
184impl StreamNode for StreamSourceScan {
185 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
186 unreachable!(
187 "stream source scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
188 )
189 }
190}