risingwave_frontend/optimizer/plan_node/
stream_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::Field;
20use risingwave_common::types::DataType;
21use risingwave_pb::stream_plan::stream_node::PbNodeBody;
22use risingwave_pb::stream_plan::{PbStreamSource, SourceNode};
23
24use super::stream::prelude::*;
25use super::utils::{Distill, TableCatalogBuilder, childless_record};
26use super::{ExprRewritable, PlanBase, StreamNode, generic};
27use crate::TableCatalog;
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::column_names_pretty;
31use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
/// [`StreamSource`] represents a table/connector source at the very beginning of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSource {
    /// Stream plan base; [`StreamSource::new`] derives it from `core`.
    pub base: PlanBase<Stream>,
    /// The generic source node wrapped by this plan node. Its `catalog` is optional.
    pub(crate) core: generic::Source,
}
40
41impl StreamSource {
42    pub fn new(core: generic::Source) -> Self {
43        let base = PlanBase::new_stream_with_core(
44            &core,
45            Distribution::SomeShard,
46            core.catalog.as_ref().is_none_or(|s| s.append_only),
47            false,
48            WatermarkColumns::new(),
49            MonotonicityMap::new(),
50        );
51        Self { base, core }
52    }
53
54    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
55        self.core.catalog.clone()
56    }
57
58    fn infer_internal_table_catalog(&self) -> TableCatalog {
59        if !self.core.is_iceberg_connector() {
60            generic::Source::infer_internal_table_catalog(false)
61        } else {
62            // iceberg list node (singleton) stores last_snapshot (just 1 row, no pk)
63            let mut builder = TableCatalogBuilder::default();
64            builder.add_column(&Field {
65                data_type: DataType::Int64,
66                name: "last_snapshot".to_owned(),
67            });
68            builder.build(vec![], 0)
69        }
70    }
71}
72
// Plan-tree-node boilerplate for `StreamSource`, generated by the "leaf" variant of the macro
// (presumably a node without inputs — confirm against the macro definition).
impl_plan_tree_node_for_leaf! { StreamSource }
74
75impl Distill for StreamSource {
76    fn distill<'a>(&self) -> XmlNode<'a> {
77        let fields = if let Some(catalog) = self.source_catalog() {
78            let src = Pretty::from(catalog.name.clone());
79            let col = column_names_pretty(self.schema());
80            vec![("source", src), ("columns", col)]
81        } else {
82            vec![]
83        };
84        childless_record("StreamSource", fields)
85    }
86}
87
88impl StreamNode for StreamSource {
89    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
90        let source_catalog = self.source_catalog();
91        let source_inner = source_catalog.map(|source_catalog| {
92            let (with_properties, secret_refs) =
93                source_catalog.with_properties.clone().into_parts();
94            PbStreamSource {
95                source_id: source_catalog.id,
96                source_name: source_catalog.name.clone(),
97                state_table: Some(
98                    self.infer_internal_table_catalog()
99                        .with_id(state.gen_table_id_wrapped())
100                        .to_internal_table_prost(),
101                ),
102                info: Some(source_catalog.info.clone()),
103                row_id_index: self.core.row_id_index.map(|index| index as _),
104                columns: self
105                    .core
106                    .column_catalog
107                    .iter()
108                    .map(|c| c.to_protobuf())
109                    .collect_vec(),
110                with_properties,
111                rate_limit: source_catalog.rate_limit,
112                secret_refs,
113            }
114        });
115        PbNodeBody::Source(Box::new(SourceNode { source_inner }))
116    }
117}
118
// Empty impl: `StreamSource` relies entirely on the trait's provided defaults.
impl ExprRewritable for StreamSource {}
120
// Empty impl: `StreamSource` relies entirely on the trait's provided defaults.
impl ExprVisitable for StreamSource {}