// risingwave_frontend/optimizer/plan_node/stream_source.rs
use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::Field;
20use risingwave_common::types::DataType;
21use risingwave_pb::stream_plan::stream_node::PbNodeBody;
22use risingwave_pb::stream_plan::{PbStreamSource, SourceNode};
23
24use super::stream::prelude::*;
25use super::utils::{Distill, TableCatalogBuilder, childless_record};
26use super::{ExprRewritable, PlanBase, StreamNode, generic};
27use crate::TableCatalog;
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::column_names_pretty;
31use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
/// Stream plan node wrapping a [`generic::Source`].
///
/// `base` holds the stream plan properties derived from `core` at construction time
/// (see `StreamSource::new`); `core` is the generic source description itself.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSource {
    /// Stream-convention plan base built from `core`.
    pub base: PlanBase<Stream>,
    /// The generic source definition (optional catalog, column catalog, row-id index, ...).
    pub(crate) core: generic::Source,
}
40
41impl StreamSource {
42 pub fn new(core: generic::Source) -> Self {
43 let base = PlanBase::new_stream_with_core(
44 &core,
45 Distribution::SomeShard,
46 core.catalog.as_ref().is_none_or(|s| s.append_only),
47 false,
48 WatermarkColumns::new(),
49 MonotonicityMap::new(),
50 );
51 Self { base, core }
52 }
53
54 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
55 self.core.catalog.clone()
56 }
57
58 fn infer_internal_table_catalog(&self) -> TableCatalog {
59 if !self.core.is_iceberg_connector() {
60 generic::Source::infer_internal_table_catalog(false)
61 } else {
62 let mut builder = TableCatalogBuilder::default();
64 builder.add_column(&Field {
65 data_type: DataType::Int64,
66 name: "last_snapshot".to_owned(),
67 });
68 builder.build(vec![], 0)
69 }
70 }
71}
72
// Generates the plan-tree-node boilerplate for `StreamSource` through the leaf-node macro.
// NOTE(review): the macro body is not visible here; presumably it marks the node as having
// no inputs — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { StreamSource }
74
75impl Distill for StreamSource {
76 fn distill<'a>(&self) -> XmlNode<'a> {
77 let fields = if let Some(catalog) = self.source_catalog() {
78 let src = Pretty::from(catalog.name.clone());
79 let col = column_names_pretty(self.schema());
80 vec![("source", src), ("columns", col)]
81 } else {
82 vec![]
83 };
84 childless_record("StreamSource", fields)
85 }
86}
87
88impl StreamNode for StreamSource {
89 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
90 let source_catalog = self.source_catalog();
91 let source_inner = source_catalog.map(|source_catalog| {
92 let (with_properties, secret_refs) =
93 source_catalog.with_properties.clone().into_parts();
94 PbStreamSource {
95 source_id: source_catalog.id,
96 source_name: source_catalog.name.clone(),
97 state_table: Some(
98 self.infer_internal_table_catalog()
99 .with_id(state.gen_table_id_wrapped())
100 .to_internal_table_prost(),
101 ),
102 info: Some(source_catalog.info.clone()),
103 row_id_index: self.core.row_id_index.map(|index| index as _),
104 columns: self
105 .core
106 .column_catalog
107 .iter()
108 .map(|c| c.to_protobuf())
109 .collect_vec(),
110 with_properties,
111 rate_limit: source_catalog.rate_limit,
112 secret_refs,
113 }
114 });
115 PbNodeBody::Source(Box::new(SourceNode { source_inner }))
116 }
117}
118
// Empty impl: `StreamSource` overrides nothing and relies on `ExprRewritable`'s defaults.
// NOTE(review): presumably because this node carries no expressions to rewrite — confirm.
impl ExprRewritable for StreamSource {}
120
// Empty impl: `StreamSource` overrides nothing and relies on `ExprVisitable`'s defaults.
// NOTE(review): presumably because this node carries no expressions to visit — confirm.
impl ExprVisitable for StreamSource {}