risingwave_frontend/optimizer/plan_node/stream_source_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::Field;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::OrderType;
22use risingwave_pb::stream_plan::PbStreamNode;
23use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
24
25use super::stream::prelude::*;
26use super::utils::TableCatalogBuilder;
27use super::{PlanBase, PlanRef};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::{Distill, childless_record};
31use crate::optimizer::plan_node::{ExprRewritable, StreamNode, generic};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35use crate::{Explain, TableCatalog};
36
37/// `StreamSourceScan` scans from a *shared source*. It forwards data from the upstream [`StreamSource`],
38/// and also backfills data from the external source.
39///
40/// Unlike [`StreamSource`], which is a leaf node in the stream graph, `StreamSourceScan` is converted to `merge -> backfill`
41///
42/// [`StreamSource`]:super::StreamSource
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct StreamSourceScan {
45    pub base: PlanBase<Stream>,
46    core: generic::Source,
47}
48
49impl_plan_tree_node_for_leaf! { StreamSourceScan }
50
51impl StreamSourceScan {
52    pub fn new(core: generic::Source) -> Self {
53        let base = PlanBase::new_stream_with_core(
54            &core,
55            Distribution::SomeShard,
56            core.catalog.as_ref().is_none_or(|s| s.append_only),
57            false,
58            WatermarkColumns::new(),
59            MonotonicityMap::new(),
60        );
61
62        Self { base, core }
63    }
64
65    fn get_columns(&self) -> Vec<&str> {
66        self.core
67            .column_catalog
68            .iter()
69            .map(|column| column.name())
70            .collect()
71    }
72
73    pub fn source_catalog(&self) -> Rc<SourceCatalog> {
74        self.core
75            .catalog
76            .clone()
77            .expect("source scan should have source cataglog")
78    }
79
80    /// The state is different from but similar to `StreamSource`.
81    /// Refer to [`generic::Source::infer_internal_table_catalog`] for more details.
82    pub fn infer_internal_table_catalog() -> TableCatalog {
83        let mut builder = TableCatalogBuilder::default();
84
85        let key = Field {
86            data_type: DataType::Varchar,
87            name: "partition_id".to_owned(),
88        };
89        let value = Field {
90            data_type: DataType::Jsonb,
91            name: "backfill_progress".to_owned(),
92        };
93
94        let ordered_col_idx = builder.add_column(&key);
95        builder.add_column(&value);
96        builder.add_order_column(ordered_col_idx, OrderType::ascending());
97        // Hacky: read prefix hint is 0, because we need to scan all data in the state table.
98        builder.build(vec![], 0)
99    }
100
101    pub fn adhoc_to_stream_prost(
102        &self,
103        state: &mut BuildFragmentGraphState,
104    ) -> SchedulerResult<PbStreamNode> {
105        use risingwave_pb::stream_plan::*;
106
107        let stream_key = self
108            .stream_key()
109            .unwrap_or_else(|| {
110                panic!(
111                    "should always have a stream key in the stream plan but not, sub plan: {}",
112                    PlanRef::from(self.clone()).explain_to_string()
113                )
114            })
115            .iter()
116            .map(|x| *x as u32)
117            .collect_vec();
118
119        let source_catalog = self.source_catalog();
120        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
121        let backfill = SourceBackfillNode {
122            upstream_source_id: source_catalog.id,
123            source_name: source_catalog.name.clone(),
124            state_table: Some(
125                Self::infer_internal_table_catalog()
126                    .with_id(state.gen_table_id_wrapped())
127                    .to_internal_table_prost(),
128            ),
129            info: Some(source_catalog.info.clone()),
130            row_id_index: self.core.row_id_index.map(|index| index as _),
131            columns: self
132                .core
133                .column_catalog
134                .iter()
135                .map(|c| c.to_protobuf())
136                .collect_vec(),
137            with_properties,
138            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
139            secret_refs,
140        };
141
142        let fields = self.schema().to_prost();
143        // plan: merge -> backfill
144        Ok(PbStreamNode {
145            fields: fields.clone(),
146            input: vec![
147                // The merge node body will be filled by the `ActorBuilder` on the meta service.
148                PbStreamNode {
149                    node_body: Some(PbNodeBody::Merge(Default::default())),
150                    identity: "Upstream".into(),
151                    fields,
152                    stream_key: vec![], // not used
153                    ..Default::default()
154                },
155            ],
156            node_body: Some(PbNodeBody::SourceBackfill(Box::new(backfill))),
157            stream_key,
158            operator_id: self.base.id().0 as u64,
159            identity: self.distill_to_string(),
160            append_only: self.append_only(),
161        })
162    }
163}
164
165impl Distill for StreamSourceScan {
166    fn distill<'a>(&self) -> XmlNode<'a> {
167        let columns = self
168            .get_columns()
169            .iter()
170            .map(|ele| Pretty::from(ele.to_string()))
171            .collect();
172        let col = Pretty::Array(columns);
173        childless_record("StreamSourceScan", vec![("columns", col)])
174    }
175}
176
177impl ExprRewritable for StreamSourceScan {}
178
179impl ExprVisitable for StreamSourceScan {}
180
181impl StreamNode for StreamSourceScan {
182    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
183        unreachable!(
184            "stream source scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
185        )
186    }
187}