risingwave_frontend/optimizer/plan_node/stream_table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Field;
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};
use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, PlanNodeId, StreamNode, StreamPlanRef as PlanRef, generic};
use crate::TableCatalog;
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamTableScan` is a virtual plan node representing a stream table scan. It is converted
/// into a stream scan + merge node (for upstream materialize) + batch table scan when building
/// the `MView` creation request.
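///
/// A rough sketch of the fragment shape this typically expands into (see
/// `adhoc_to_stream_prost`; cross-db snapshot backfill has no local inputs):
///
/// ```text
/// StreamScan
/// ├── Merge      (upstream updates; body filled in by the meta service)
/// └── BatchPlan  (snapshot read of the upstream table)
/// ```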
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    pub base: PlanBase<Stream>,
    core: generic::TableScan,
    batch_plan_id: PlanNodeId,
    stream_scan_type: StreamScanType,
}

impl StreamTableScan {
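    // Names of the metadata columns in the backfill state table; see
    // `build_backfill_state_catalog` below for how the full schema is assembled.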
    pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
    pub const EPOCH_COLUMN_NAME: &str = "epoch";
    pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
    pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
    pub const VNODE_COLUMN_NAME: &str = "vnode";

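    /// Creates a `StreamTableScan` with the given scan type. For a cross-database upstream
    /// table, the scan type is forcibly rewritten to `CrossDbSnapshotBackfill`.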
    pub fn new_with_stream_scan_type(
        core: generic::TableScan,
        stream_scan_type: StreamScanType,
    ) -> Self {
        let batch_plan_id = core.ctx.next_plan_node_id();

        let mut stream_scan_type = stream_scan_type;
        if core.cross_database() {
            assert_ne!(stream_scan_type, StreamScanType::UpstreamOnly);
            // Force rewrite scan type to cross-db scan
            stream_scan_type = StreamScanType::CrossDbSnapshotBackfill;
        }

        let distribution = {
            match core.distribution_key() {
                Some(distribution_key) => {
                    if distribution_key.is_empty() {
                        Distribution::Single
                    } else {
                        // See also `BatchSeqScan::clone_with_dist`.
                        Distribution::UpstreamHashShard(distribution_key, core.table_catalog.id)
                    }
                }
                None => Distribution::SomeShard,
            }
        };

        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            if core.append_only() {
                StreamKind::AppendOnly
            } else {
                StreamKind::Retract
            },
            false,
            core.watermark_columns(),
            MonotonicityMap::new(),
        );
        Self {
            base,
            core,
            batch_plan_id,
            stream_scan_type,
        }
    }

    pub fn table_name(&self) -> &str {
        self.core.table_name()
    }

    pub fn core(&self) -> &generic::TableScan {
        &self.core
    }

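    /// Rewrites this scan into a scan over the given index table, remapping columns and
    /// expressions of the primary table through the provided mappings.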
    pub fn to_index_scan(
        &self,
        index_table_catalog: Arc<TableCatalog>,
        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
        function_mapping: &HashMap<FunctionCall, usize>,
        stream_scan_type: StreamScanType,
    ) -> StreamTableScan {
        let logical_index_scan = self.core.to_index_scan(
            index_table_catalog,
            primary_to_secondary_mapping,
            function_mapping,
        );
        logical_index_scan
            .distribution_key()
            .expect("distribution key of stream chain must exist in output columns");
        StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
    }

    pub fn stream_scan_type(&self) -> StreamScanType {
        self.stream_scan_type
    }

    // TODO: Add note to reviewer about safety, because of `generic::TableScan` limitation.
    fn get_upstream_state_table(&self) -> &TableCatalog {
        self.core.table_catalog.as_ref()
    }

    /// Build the catalog for the backfill state table.
    ///
    /// When `stream_scan_type` is neither `StreamScanType::SnapshotBackfill` nor
    /// `StreamScanType::CrossDbSnapshotBackfill`:
    ///
    /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
    ///
    /// key:    | vnode |
    /// value:  | pk ... | `backfill_finished` | `row_count` |
    ///
    /// When we update the backfill progress,
    /// we update it for all vnodes.
    ///
    /// `pk` refers to the upstream pk which we use to track the backfill progress.
    ///
    /// `vnode` is the corresponding vnode of the upstream's distribution key.
    ///         It should also match the vnode of the backfill executor.
    ///
    /// `backfill_finished` is a boolean indicating whether backfill is done.
    ///
    /// `row_count` is the number of rows backfilled per executor.
    ///             We used to track this only in memory,
    ///             but for backfill persistence it has to be persisted as well.
    ///
    /// FIXME(kwannoel):
    /// - Across all vnodes, the values are the same.
    /// - e.g.
    ///   | vnode | pk ...   | `backfill_finished` | `row_count` |
    ///   | 1002  | Int64(1) | t                   | 10          |
    ///   | 1003  | Int64(1) | t                   | 10          |
    ///   | 1004  | Int64(1) | t                   | 10          |
    ///
    /// Eventually we should track progress per vnode, to support scaling with both mview and
    /// the corresponding `no_shuffle_backfill`.
    /// However, this is not high priority, since we are working on supporting arrangement backfill,
    /// which already has this capability.
    ///
    /// When `stream_scan_type` is `StreamScanType::SnapshotBackfill` or
    /// `StreamScanType::CrossDbSnapshotBackfill`:
    ///
    /// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...
    ///
    /// key:    | vnode |
    /// value:  | `epoch` | `row_count` | `is_epoch_finished` | pk ...
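    ///
    /// e.g. (illustrative values)
    ///
    /// | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...   |
    /// | 1002  | 100     | 10          | t                   | Int64(1) |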
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        stream_scan_type: StreamScanType,
    ) -> TableCatalog {
        let mut catalog_builder = TableCatalogBuilder::default();
        let upstream_schema = &self.core.get_table_columns();

        // We use the vnode as the primary key in the state table.
        // If `Distribution::Single`, vnode will just be `VirtualNode::default()`.
        catalog_builder.add_column(&Field::with_name(
            VirtualNode::RW_TYPE,
            Self::VNODE_COLUMN_NAME,
        ));
        catalog_builder.add_order_column(0, OrderType::ascending());

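        // The remaining (value) columns depend on the backfill flavor.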
        match stream_scan_type {
            StreamScanType::Chain
            | StreamScanType::Rearrange
            | StreamScanType::Backfill
            | StreamScanType::UpstreamOnly
            | StreamScanType::ArrangementBackfill => {
                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(&**col));
                }

                // `backfill_finished` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Boolean,
                    Self::BACKFILL_FINISHED_COLUMN_NAME,
                ));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Int64,
                    Self::ROW_COUNT_COLUMN_NAME,
                ));
            }
            StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
                // `epoch` column
                catalog_builder
                    .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Int64,
                    Self::ROW_COUNT_COLUMN_NAME,
                ));

                // `is_epoch_finished` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Boolean,
                    Self::IS_EPOCH_FINISHED_COLUMN_NAME,
                ));

                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(&col.column_desc));
                }
            }
            StreamScanType::Unspecified => {
                unreachable!()
            }
        }

        // The state table's pk column (vnode) also serves as its vnode column and distribution key.
        catalog_builder.set_vnode_col_idx(0);
        catalog_builder.set_dist_key_in_pk(vec![0]);

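        // Everything except the vnode key column is stored as a value column.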
        let num_of_columns = catalog_builder.columns().len();
        catalog_builder.set_value_indices((1..num_of_columns).collect_vec());

        catalog_builder
            .build(vec![0], 1)
            .with_id(state.gen_table_id_wrapped())
    }
}

impl_plan_tree_node_for_leaf! { Stream, StreamTableScan }

impl Distill for StreamTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
            let stream_key = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("stream_key", stream_key.distill()));
            let pk = IndicesDisplay {
                indices: &self
                    .core
                    .primary_key()
                    .iter()
                    .map(|x| x.column_index)
                    .collect_vec(),
                schema: &self.core.table_catalog.column_schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamTableScan", vec)
    }
}

impl StreamNode for StreamTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamTableScan {
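    /// Builds the `PbStreamNode` for this scan directly. The stream scan fragment needs extra
    /// inputs (an upstream merge node and a batch snapshot plan) that the generic
    /// `StreamNode::to_stream_prost_body` path cannot express, hence this ad-hoc method.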
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or(&[])
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

        // The required columns from the table (both scan and upstream).
        let upstream_column_ids = match self.stream_scan_type {
            // For backfill, we additionally need the primary key columns.
            StreamScanType::Backfill
            | StreamScanType::ArrangementBackfill
            | StreamScanType::SnapshotBackfill
            | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
            StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
                self.core.output_column_ids()
            }
            StreamScanType::Unspecified => unreachable!(),
        }
        .iter()
        .map(ColumnId::get_id)
        .collect_vec();

        // The schema of the snapshot read stream
        let snapshot_schema = upstream_column_ids
            .iter()
            .map(|&id| {
                let col = self
                    .core
                    .get_table_columns()
                    .iter()
                    .find(|c| c.column_id.get_id() == id)
                    .unwrap();
                Field::from(&col.column_desc).to_prost()
            })
            .collect_vec();

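        // The upstream (merge) side delivers the same columns as the snapshot side, so it
        // shares the snapshot schema.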
        let upstream_schema = snapshot_schema.clone();

        // TODO: snapshot read of upstream mview
        let batch_plan_node = BatchPlanNode {
            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
            column_ids: upstream_column_ids.clone(),
        };

        let catalog = self
            .build_backfill_state_catalog(state, self.stream_scan_type)
            .to_internal_table_prost();

        // For backfill, we first read pk + `output_indices` columns from the upstream.
        // We then need to project them back down to just `output_indices` for the downstream;
        // this mapping is that projection.
        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();

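        // For arrangement backfill, the executor also reads the upstream arrangement (the
        // upstream table's state table), so its catalog is shipped as well.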
        let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
            let upstream_table_catalog = self.get_upstream_state_table();
            Some(upstream_table_catalog.to_internal_table_prost())
        } else {
            None
        };

        let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
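            // The upstream table lives in another database, so this fragment has no upstream
            // merge input or local snapshot input.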
            vec![]
        } else {
            vec![
                // Upstream updates
                // The merge node body will be filled by the `ActorBuilder` on the meta service.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: upstream_schema.clone(),
                    stream_key: vec![], // not used
                    ..Default::default()
                },
                // Snapshot read
                PbStreamNode {
                    node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
                    operator_id: self.batch_plan_id.0 as u64,
                    identity: "BatchPlanNode".into(),
                    fields: snapshot_schema,
                    stream_key: vec![], // not used
                    input: vec![],
                    stream_kind: PbStreamKind::AppendOnly as i32,
                },
            ]
        };

        let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
            table_id: self.core.table_catalog.id.table_id,
            stream_scan_type: self.stream_scan_type as i32,
            // The column indices need to be forwarded to the downstream
            output_indices,
            upstream_column_ids,
            // The table desc used by backfill executor
            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
            state_table: Some(catalog),
            arrangement_table,
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            ..Default::default()
        }));

        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input,
            node_body: Some(node_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            stream_kind: self.stream_kind().to_protobuf() as i32,
        })
    }
}

impl ExprRewritable<Stream> for StreamTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
    }
}

impl ExprVisitable for StreamTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}