risingwave_frontend/optimizer/plan_node/
stream_source_scan.rs
1use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::Field;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::OrderType;
22use risingwave_pb::stream_plan::PbStreamNode;
23use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
24
25use super::stream::prelude::*;
26use super::utils::TableCatalogBuilder;
27use super::{PlanBase, PlanRef};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::{Distill, childless_record};
31use crate::optimizer::plan_node::{ExprRewritable, StreamNode, generic};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35use crate::{Explain, TableCatalog};
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct StreamSourceScan {
45 pub base: PlanBase<Stream>,
46 core: generic::Source,
47}
48
49impl_plan_tree_node_for_leaf! { StreamSourceScan }
50
51impl StreamSourceScan {
52 pub fn new(core: generic::Source) -> Self {
53 let base = PlanBase::new_stream_with_core(
54 &core,
55 Distribution::SomeShard,
56 core.catalog.as_ref().is_none_or(|s| s.append_only),
57 false,
58 WatermarkColumns::new(),
59 MonotonicityMap::new(),
60 );
61
62 Self { base, core }
63 }
64
65 fn get_columns(&self) -> Vec<&str> {
66 self.core
67 .column_catalog
68 .iter()
69 .map(|column| column.name())
70 .collect()
71 }
72
73 pub fn source_catalog(&self) -> Rc<SourceCatalog> {
74 self.core
75 .catalog
76 .clone()
77 .expect("source scan should have source cataglog")
78 }
79
80 pub fn infer_internal_table_catalog() -> TableCatalog {
83 let mut builder = TableCatalogBuilder::default();
84
85 let key = Field {
86 data_type: DataType::Varchar,
87 name: "partition_id".to_owned(),
88 };
89 let value = Field {
90 data_type: DataType::Jsonb,
91 name: "backfill_progress".to_owned(),
92 };
93
94 let ordered_col_idx = builder.add_column(&key);
95 builder.add_column(&value);
96 builder.add_order_column(ordered_col_idx, OrderType::ascending());
97 builder.build(vec![], 0)
99 }
100
101 pub fn adhoc_to_stream_prost(
102 &self,
103 state: &mut BuildFragmentGraphState,
104 ) -> SchedulerResult<PbStreamNode> {
105 use risingwave_pb::stream_plan::*;
106
107 let stream_key = self
108 .stream_key()
109 .unwrap_or_else(|| {
110 panic!(
111 "should always have a stream key in the stream plan but not, sub plan: {}",
112 PlanRef::from(self.clone()).explain_to_string()
113 )
114 })
115 .iter()
116 .map(|x| *x as u32)
117 .collect_vec();
118
119 let source_catalog = self.source_catalog();
120 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
121 let backfill = SourceBackfillNode {
122 upstream_source_id: source_catalog.id,
123 source_name: source_catalog.name.clone(),
124 state_table: Some(
125 Self::infer_internal_table_catalog()
126 .with_id(state.gen_table_id_wrapped())
127 .to_internal_table_prost(),
128 ),
129 info: Some(source_catalog.info.clone()),
130 row_id_index: self.core.row_id_index.map(|index| index as _),
131 columns: self
132 .core
133 .column_catalog
134 .iter()
135 .map(|c| c.to_protobuf())
136 .collect_vec(),
137 with_properties,
138 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
139 secret_refs,
140 };
141
142 let fields = self.schema().to_prost();
143 Ok(PbStreamNode {
145 fields: fields.clone(),
146 input: vec![
147 PbStreamNode {
149 node_body: Some(PbNodeBody::Merge(Default::default())),
150 identity: "Upstream".into(),
151 fields,
152 stream_key: vec![], ..Default::default()
154 },
155 ],
156 node_body: Some(PbNodeBody::SourceBackfill(Box::new(backfill))),
157 stream_key,
158 operator_id: self.base.id().0 as u64,
159 identity: self.distill_to_string(),
160 append_only: self.append_only(),
161 })
162 }
163}
164
165impl Distill for StreamSourceScan {
166 fn distill<'a>(&self) -> XmlNode<'a> {
167 let columns = self
168 .get_columns()
169 .iter()
170 .map(|ele| Pretty::from(ele.to_string()))
171 .collect();
172 let col = Pretty::Array(columns);
173 childless_record("StreamSourceScan", vec![("columns", col)])
174 }
175}
176
177impl ExprRewritable for StreamSourceScan {}
178
179impl ExprVisitable for StreamSourceScan {}
180
181impl StreamNode for StreamSourceScan {
182 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
183 unreachable!(
184 "stream source scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
185 )
186 }
187}