risingwave_frontend/optimizer/plan_node/
stream_source_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::Field;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::OrderType;
22use risingwave_pb::stream_plan::PbStreamNode;
23use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
24
25use super::stream::prelude::*;
26use super::utils::TableCatalogBuilder;
27use super::{PlanBase, PlanRef};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::{Distill, childless_record};
31use crate::optimizer::plan_node::{ExprRewritable, StreamNode, generic};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35use crate::{Explain, TableCatalog};
36
/// `StreamSourceScan` scans from a *shared source*. It forwards data from the upstream [`StreamSource`],
/// and also backfills data from the external source.
///
/// Unlike [`StreamSource`], which is a leaf node in the stream graph, `StreamSourceScan` is converted to `merge -> backfill`
///
/// [`StreamSource`]: super::StreamSource
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSourceScan {
    /// Shared stream plan-node metadata (schema, stream key, distribution, ...).
    pub base: PlanBase<Stream>,
    /// The logical source this node scans; holds the source catalog and columns.
    core: generic::Source,
}

// Treated as a leaf in the *optimizer* plan tree: the upstream `Merge` input
// only appears later, when `adhoc_to_stream_prost` builds the fragment graph.
impl_plan_tree_node_for_leaf! { StreamSourceScan }
50
51impl StreamSourceScan {
52    pub const BACKFILL_PROGRESS_COLUMN_NAME: &str = "backfill_progress";
53    pub const PARTITION_ID_COLUMN_NAME: &str = "partition_id";
54
55    pub fn new(core: generic::Source) -> Self {
56        let base = PlanBase::new_stream_with_core(
57            &core,
58            Distribution::SomeShard,
59            core.catalog.as_ref().is_none_or(|s| s.append_only),
60            false,
61            WatermarkColumns::new(),
62            MonotonicityMap::new(),
63        );
64
65        Self { base, core }
66    }
67
68    fn get_columns(&self) -> Vec<&str> {
69        self.core
70            .column_catalog
71            .iter()
72            .map(|column| column.name())
73            .collect()
74    }
75
76    pub fn source_catalog(&self) -> Rc<SourceCatalog> {
77        self.core
78            .catalog
79            .clone()
80            .expect("source scan should have source cataglog")
81    }
82
83    /// The state is different from but similar to `StreamSource`.
84    /// Refer to [`generic::Source::infer_internal_table_catalog`] for more details.
85    pub fn infer_internal_table_catalog() -> TableCatalog {
86        let mut builder = TableCatalogBuilder::default();
87
88        let key = Field {
89            data_type: DataType::Varchar,
90            name: Self::PARTITION_ID_COLUMN_NAME.to_owned(),
91        };
92        let value = Field {
93            data_type: DataType::Jsonb,
94            name: Self::BACKFILL_PROGRESS_COLUMN_NAME.to_owned(),
95        };
96
97        let ordered_col_idx = builder.add_column(&key);
98        builder.add_column(&value);
99        builder.add_order_column(ordered_col_idx, OrderType::ascending());
100        // Hacky: read prefix hint is 0, because we need to scan all data in the state table.
101        builder.build(vec![], 0)
102    }
103
104    pub fn adhoc_to_stream_prost(
105        &self,
106        state: &mut BuildFragmentGraphState,
107    ) -> SchedulerResult<PbStreamNode> {
108        use risingwave_pb::stream_plan::*;
109
110        let stream_key = self
111            .stream_key()
112            .unwrap_or_else(|| {
113                panic!(
114                    "should always have a stream key in the stream plan but not, sub plan: {}",
115                    PlanRef::from(self.clone()).explain_to_string()
116                )
117            })
118            .iter()
119            .map(|x| *x as u32)
120            .collect_vec();
121
122        let source_catalog = self.source_catalog();
123        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
124        let backfill = SourceBackfillNode {
125            upstream_source_id: source_catalog.id,
126            source_name: source_catalog.name.clone(),
127            state_table: Some(
128                Self::infer_internal_table_catalog()
129                    .with_id(state.gen_table_id_wrapped())
130                    .to_internal_table_prost(),
131            ),
132            info: Some(source_catalog.info.clone()),
133            row_id_index: self.core.row_id_index.map(|index| index as _),
134            columns: self
135                .core
136                .column_catalog
137                .iter()
138                .map(|c| c.to_protobuf())
139                .collect_vec(),
140            with_properties,
141            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
142            secret_refs,
143        };
144
145        let fields = self.schema().to_prost();
146        // plan: merge -> backfill
147        Ok(PbStreamNode {
148            fields: fields.clone(),
149            input: vec![
150                // The merge node body will be filled by the `ActorBuilder` on the meta service.
151                PbStreamNode {
152                    node_body: Some(PbNodeBody::Merge(Default::default())),
153                    identity: "Upstream".into(),
154                    fields,
155                    stream_key: vec![], // not used
156                    ..Default::default()
157                },
158            ],
159            node_body: Some(PbNodeBody::SourceBackfill(Box::new(backfill))),
160            stream_key,
161            operator_id: self.base.id().0 as u64,
162            identity: self.distill_to_string(),
163            append_only: self.append_only(),
164        })
165    }
166}
167
168impl Distill for StreamSourceScan {
169    fn distill<'a>(&self) -> XmlNode<'a> {
170        let columns = self
171            .get_columns()
172            .iter()
173            .map(|ele| Pretty::from(ele.to_string()))
174            .collect();
175        let col = Pretty::Array(columns);
176        childless_record("StreamSourceScan", vec![("columns", col)])
177    }
178}
179
// Marker impl relying on the trait's default methods; this node exposes no
// expressions of its own to rewrite.
impl ExprRewritable for StreamSourceScan {}
181
// Marker impl relying on the trait's default methods; this node exposes no
// expressions of its own to visit.
impl ExprVisitable for StreamSourceScan {}
183
impl StreamNode for StreamSourceScan {
    /// Not supported: this node expands into a `merge -> backfill` pair of
    /// stream nodes, which the single-body trait method cannot express. Use
    /// [`StreamSourceScan::adhoc_to_stream_prost`] instead; any call here is a
    /// programming error.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
        unreachable!(
            "stream source scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}