risingwave_frontend/optimizer/plan_node/
stream_source.rs1use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::{ColumnCatalog, Field};
20use risingwave_common::types::DataType;
21use risingwave_pb::stream_plan::stream_node::PbNodeBody;
22use risingwave_pb::stream_plan::{Columns, PbStreamSource, SourceNode};
23
24use super::stream::prelude::*;
25use super::utils::{Distill, TableCatalogBuilder, childless_record};
26use super::{ExprRewritable, PlanBase, StreamNode, generic};
27use crate::TableCatalog;
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::column_names_pretty;
31use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamSource {
37 pub base: PlanBase<Stream>,
38 pub(crate) core: generic::Source,
39 pub(crate) downstream_columns: Option<Vec<ColumnCatalog>>,
42}
43
44impl StreamSource {
45 pub fn new(core: generic::Source) -> Self {
46 let base = PlanBase::new_stream_with_core(
47 &core,
48 Distribution::SomeShard,
49 core.stream_kind(),
50 false,
51 WatermarkColumns::new(),
52 MonotonicityMap::new(),
53 );
54 Self {
55 base,
56 core,
57 downstream_columns: None,
58 }
59 }
60
61 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
62 self.core.catalog.clone()
63 }
64
65 fn infer_internal_table_catalog(&self) -> TableCatalog {
66 if !self.core.is_iceberg_connector() {
67 generic::Source::infer_internal_table_catalog(false)
68 } else {
69 let mut builder = TableCatalogBuilder::default();
71 builder.add_column(&Field {
72 data_type: DataType::Int64,
73 name: "last_snapshot".to_owned(),
74 });
75 builder.build(vec![], 0)
76 }
77 }
78}
79
80impl_plan_tree_node_for_leaf! { Stream, StreamSource }
81
82impl Distill for StreamSource {
83 fn distill<'a>(&self) -> XmlNode<'a> {
84 let fields = if let Some(catalog) = self.source_catalog() {
85 let src = Pretty::from(catalog.name.clone());
86 let col = column_names_pretty(self.schema());
87 vec![("source", src), ("columns", col)]
88 } else {
89 vec![]
90 };
91 childless_record("StreamSource", fields)
92 }
93}
94
95impl StreamNode for StreamSource {
96 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
97 let source_catalog = self.source_catalog();
98 let source_inner = source_catalog.map(|source_catalog| {
99 let (with_properties, secret_refs) =
100 source_catalog.with_properties.clone().into_parts();
101 PbStreamSource {
102 source_id: source_catalog.id,
103 source_name: source_catalog.name.clone(),
104 state_table: Some(
105 self.infer_internal_table_catalog()
106 .with_id(state.gen_table_id_wrapped())
107 .to_internal_table_prost(),
108 ),
109 info: Some(source_catalog.info.clone()),
110 row_id_index: self.core.row_id_index.map(|index| index as _),
111 columns: self
112 .core
113 .column_catalog
114 .iter()
115 .map(|c| c.to_protobuf())
116 .collect_vec(),
117 with_properties,
118 rate_limit: source_catalog.rate_limit,
119 secret_refs,
120 downstream_columns: self.downstream_columns.as_ref().map(|cols| Columns {
121 columns: cols.iter().map(|c| c.to_protobuf()).collect_vec(),
122 }),
123 }
124 });
125 PbNodeBody::Source(Box::new(SourceNode { source_inner }))
126 }
127}
128
129impl ExprRewritable<Stream> for StreamSource {}
130
131impl ExprVisitable for StreamSource {}