risingwave_frontend/optimizer/plan_node/
stream_source.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::{ColumnCatalog, Field};
20use risingwave_common::types::DataType;
21use risingwave_pb::stream_plan::stream_node::PbNodeBody;
22use risingwave_pb::stream_plan::{Columns, PbStreamSource, SourceNode};
23
24use super::stream::prelude::*;
25use super::utils::{Distill, TableCatalogBuilder, childless_record};
26use super::{ExprRewritable, PlanBase, StreamNode, generic};
27use crate::TableCatalog;
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::column_names_pretty;
31use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34/// [`StreamSource`] represents a table/connector source at the very beginning of the graph.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamSource {
37    pub base: PlanBase<Stream>,
38    pub(crate) core: generic::Source,
39    /// Downstream columns are used by list node to know which columns are needed.
40    /// For example, iceberg list node will use this info to plan files to read.
41    pub(crate) downstream_columns: Option<Vec<ColumnCatalog>>,
42}
43
44impl StreamSource {
45    pub fn new(core: generic::Source) -> Self {
46        let base = PlanBase::new_stream_with_core(
47            &core,
48            Distribution::SomeShard,
49            core.stream_kind(),
50            false,
51            WatermarkColumns::new(),
52            MonotonicityMap::new(),
53        );
54        Self {
55            base,
56            core,
57            downstream_columns: None,
58        }
59    }
60
61    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
62        self.core.catalog.clone()
63    }
64
65    fn infer_internal_table_catalog(&self) -> TableCatalog {
66        if !self.core.is_iceberg_connector() {
67            generic::Source::infer_internal_table_catalog(false)
68        } else {
69            // iceberg list node (singleton) stores last_snapshot (just 1 row, no pk)
70            let mut builder = TableCatalogBuilder::default();
71            builder.add_column(&Field {
72                data_type: DataType::Int64,
73                name: "last_snapshot".to_owned(),
74            });
75            builder.build(vec![], 0)
76        }
77    }
78}
79
80impl_plan_tree_node_for_leaf! { Stream, StreamSource }
81
82impl Distill for StreamSource {
83    fn distill<'a>(&self) -> XmlNode<'a> {
84        let fields = if let Some(catalog) = self.source_catalog() {
85            let src = Pretty::from(catalog.name.clone());
86            let col = column_names_pretty(self.schema());
87            vec![("source", src), ("columns", col)]
88        } else {
89            vec![]
90        };
91        childless_record("StreamSource", fields)
92    }
93}
94
95impl StreamNode for StreamSource {
96    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
97        let source_catalog = self.source_catalog();
98        let source_inner = source_catalog.map(|source_catalog| {
99            let (with_properties, secret_refs) =
100                source_catalog.with_properties.clone().into_parts();
101            PbStreamSource {
102                source_id: source_catalog.id,
103                source_name: source_catalog.name.clone(),
104                state_table: Some(
105                    self.infer_internal_table_catalog()
106                        .with_id(state.gen_table_id_wrapped())
107                        .to_internal_table_prost(),
108                ),
109                info: Some(source_catalog.info.clone()),
110                row_id_index: self.core.row_id_index.map(|index| index as _),
111                columns: self
112                    .core
113                    .column_catalog
114                    .iter()
115                    .map(|c| c.to_protobuf())
116                    .collect_vec(),
117                with_properties,
118                rate_limit: source_catalog.rate_limit,
119                secret_refs,
120                downstream_columns: self.downstream_columns.as_ref().map(|cols| Columns {
121                    columns: cols.iter().map(|c| c.to_protobuf()).collect_vec(),
122                }),
123            }
124        });
125        PbNodeBody::Source(Box::new(SourceNode { source_inner }))
126    }
127}
128
129impl ExprRewritable<Stream> for StreamSource {}
130
131impl ExprVisitable for StreamSource {}