risingwave_frontend/optimizer/plan_node/stream_table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Field;
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode, generic};
use crate::TableCatalog;
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamTableScan` is a virtual plan node representing a stream table scan. It will be converted
/// into a stream scan + merge node (for the upstream materialize) + batch table scan when building
/// the `MView` creation request.
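///
/// A rough sketch of the converted fragment (illustrative only; the inputs are
/// omitted entirely for cross-DB snapshot backfill):
///
/// ```text
/// StreamScan
/// ├── Merge      (upstream updates, filled in by the meta service)
/// └── BatchPlan  (snapshot read)
/// ```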
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    pub base: PlanBase<Stream>,
    core: generic::TableScan,
    batch_plan_id: PlanNodeId,
    stream_scan_type: StreamScanType,
}

impl StreamTableScan {
    pub fn new_with_stream_scan_type(
        core: generic::TableScan,
        stream_scan_type: StreamScanType,
    ) -> Self {
        let batch_plan_id = core.ctx.next_plan_node_id();

        let distribution = {
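            // For example (hypothetical): if the table is hash-distributed on columns
            // that all appear in this scan's output, we get `UpstreamHashShard`; an
            // empty distribution key means a single-partition table, hence `Single`;
            // and if the key is not fully covered by the output, we only know the
            // data lives on some shard, hence `SomeShard`.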
            match core.distribution_key() {
                Some(distribution_key) => {
                    if distribution_key.is_empty() {
                        Distribution::Single
                    } else {
                        // See also `BatchSeqScan::clone_with_dist`.
                        Distribution::UpstreamHashShard(distribution_key, core.table_desc.table_id)
                    }
                }
                None => Distribution::SomeShard,
            }
        };

        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            core.append_only(),
            false,
            core.watermark_columns(),
            MonotonicityMap::new(),
        );
        Self {
            base,
            core,
            batch_plan_id,
            stream_scan_type,
        }
    }

    pub fn table_name(&self) -> &str {
        &self.core.table_name
    }

    pub fn core(&self) -> &generic::TableScan {
        &self.core
    }

    pub fn to_index_scan(
        &self,
        index_name: &str,
        index_table_catalog: Arc<TableCatalog>,
        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
        function_mapping: &HashMap<FunctionCall, usize>,
        stream_scan_type: StreamScanType,
    ) -> StreamTableScan {
        let logical_index_scan = self.core.to_index_scan(
            index_name,
            index_table_catalog,
            primary_to_secondary_mapping,
            function_mapping,
        );
        logical_index_scan
            .distribution_key()
            .expect("distribution key of stream chain must exist in output columns");
        StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
    }

    pub fn stream_scan_type(&self) -> StreamScanType {
        self.stream_scan_type
    }

    // TODO: Add a note to the reviewer about safety, due to the `generic::TableScan` limitation.
    fn get_upstream_state_table(&self) -> &TableCatalog {
        self.core.table_catalog.as_ref()
    }

    /// Build catalog for backfill state.
    ///
    /// When `stream_scan_type` is not `StreamScanType::SnapshotBackfill` or
    /// `StreamScanType::CrossDbSnapshotBackfill`:
    ///
    /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
    ///
    /// key:    | vnode |
    /// value:  | pk ... | `backfill_finished` | `row_count` |
    ///
    /// When we update the backfill progress, we update it for all vnodes.
    ///
    /// `pk` refers to the upstream pk that we use to track the backfill progress.
    ///
    /// `vnode` is the corresponding vnode of the upstream's distribution key.
    ///         It should also match the vnode of the backfill executor.
    ///
    /// `backfill_finished` is a boolean that indicates whether backfill is done.
    ///
    /// `row_count` is the number of rows backfilled per executor.
    ///             We used to track this in memory only,
    ///             but for backfill persistence we also have to persist it.
    ///
    /// FIXME(kwannoel):
    /// - Across all vnodes, the values are the same.
    /// - e.g.
    ///   | vnode | pk ...   | `backfill_finished` | `row_count` |
    ///   | 1002  | Int64(1) | t                   | 10          |
    ///   | 1003  | Int64(1) | t                   | 10          |
    ///   | 1004  | Int64(1) | t                   | 10          |
    ///
    /// Eventually we should track progress per vnode, to support scaling with both mview and
    /// the corresponding `no_shuffle_backfill`.
    /// However, this is not high priority, since we are working on supporting arrangement
    /// backfill, which already has this capability.
    ///
    /// When `stream_scan_type` is `StreamScanType::SnapshotBackfill` or
    /// `StreamScanType::CrossDbSnapshotBackfill`:
    ///
    /// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ... |
    ///
    /// key:    | vnode |
    /// value:  | `epoch` | `row_count` | `is_epoch_finished` | pk ... |
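    ///
    /// For example (hypothetical values), a `SnapshotBackfill` state row might be:
    ///
    /// | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...   |
    /// | 1002  | 5678    | 10          | t                   | Int64(1) |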
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        stream_scan_type: StreamScanType,
    ) -> TableCatalog {
        let mut catalog_builder = TableCatalogBuilder::default();
        let upstream_schema = &self.core.get_table_columns();

        // We use the vnode as the primary key in the state table.
        // If `Distribution::Single`, the vnode will just be `VirtualNode::default()`.
        catalog_builder.add_column(&Field::with_name(VirtualNode::RW_TYPE, "vnode"));
        catalog_builder.add_order_column(0, OrderType::ascending());

        match stream_scan_type {
            StreamScanType::Chain
            | StreamScanType::Rearrange
            | StreamScanType::Backfill
            | StreamScanType::UpstreamOnly
            | StreamScanType::ArrangementBackfill => {
                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(col));
                }

                // `backfill_finished` column
                catalog_builder
                    .add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
            }
            StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
                // `epoch` column
                catalog_builder.add_column(&Field::with_name(DataType::Int64, "epoch"));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));

                // `is_epoch_finished` column
                catalog_builder
                    .add_column(&Field::with_name(DataType::Boolean, "is_epoch_finished"));

                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(col));
                }
            }
            StreamScanType::Unspecified => {
                unreachable!()
            }
        }

        // Reuse the state store pk (vnode) as the vnode column as well.
        catalog_builder.set_vnode_col_idx(0);
        catalog_builder.set_dist_key_in_pk(vec![0]);

        let num_of_columns = catalog_builder.columns().len();
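        // Everything except the `vnode` key column is stored in the value; e.g. with a
        // (hypothetical) layout `[vnode, pk.., backfill_finished, row_count]`, the
        // value indices are `[1..n)`.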
        catalog_builder.set_value_indices((1..num_of_columns).collect_vec());

        catalog_builder
            .build(vec![0], 1)
            .with_id(state.gen_table_id_wrapped())
    }
}

impl_plan_tree_node_for_leaf! { StreamTableScan }

impl Distill for StreamTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name.clone())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
            let stream_key = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("stream_key", stream_key.distill()));
            let pk = IndicesDisplay {
                indices: &self
                    .core
                    .primary_key()
                    .iter()
                    .map(|x| x.column_index)
                    .collect_vec(),
                schema: &self.core.table_catalog.column_schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamTableScan", vec)
    }
}

impl StreamNode for StreamTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamTableScan {
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or(&[])
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

        // The required columns from the table (both scan and upstream).
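        // e.g. (hypothetical): with output columns `[v1]` and pk `[id]`, the backfill
        // variants need `[v1, id]`, while the non-backfill variants need just `[v1]`.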
        let upstream_column_ids = match self.stream_scan_type {
            // For backfill, we additionally need the primary key columns.
            StreamScanType::Backfill
            | StreamScanType::ArrangementBackfill
            | StreamScanType::SnapshotBackfill
            | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
            StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
                self.core.output_column_ids()
            }
            StreamScanType::Unspecified => unreachable!(),
        }
        .iter()
        .map(ColumnId::get_id)
        .collect_vec();

        // The schema of the snapshot read stream.
        let snapshot_schema = upstream_column_ids
            .iter()
            .map(|&id| {
                let col = self
                    .core
                    .get_table_columns()
                    .iter()
                    .find(|c| c.column_id.get_id() == id)
                    .unwrap();
                Field::from(col).to_prost()
            })
            .collect_vec();

        let upstream_schema = snapshot_schema.clone();

        // TODO: snapshot read of upstream mview
        let batch_plan_node = BatchPlanNode {
            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
            column_ids: upstream_column_ids.clone(),
        };

        let catalog = self
            .build_backfill_state_catalog(state, self.stream_scan_type)
            .to_internal_table_prost();

        // For backfill, we first read pk + output columns from the upstream, and then
        // project only the output columns to the downstream. `output_indices` maps each
        // output column id to its position in `upstream_column_ids` for that projection.
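        // e.g. (hypothetical): `upstream_column_ids = [2, 0]` with output column ids
        // `[0]` yields `output_indices = [1]`.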
        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();

        let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
            let upstream_table_catalog = self.get_upstream_state_table();
            Some(upstream_table_catalog.to_internal_table_prost())
        } else {
            None
        };

        let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
            vec![]
        } else {
            vec![
                // Upstream updates
                // The merge node body will be filled by the `ActorBuilder` on the meta service.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: upstream_schema.clone(),
                    stream_key: vec![], // not used
                    ..Default::default()
                },
                // Snapshot read
                PbStreamNode {
                    node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
                    operator_id: self.batch_plan_id.0 as u64,
                    identity: "BatchPlanNode".into(),
                    fields: snapshot_schema,
                    stream_key: vec![], // not used
                    input: vec![],
                    append_only: true,
                },
            ]
        };

        let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
            table_id: self.core.table_desc.table_id.table_id,
            stream_scan_type: self.stream_scan_type as i32,
            // The column indices need to be forwarded to the downstream
            output_indices,
            upstream_column_ids,
            // The table desc used by the backfill executor
            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
            state_table: Some(catalog),
            arrangement_table,
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            ..Default::default()
        }));

        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input,
            node_body: Some(node_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            append_only: self.append_only(),
        })
    }
}

impl ExprRewritable for StreamTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
    }
}

impl ExprVisitable for StreamTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}