risingwave_frontend/optimizer/plan_node/stream_table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Field;
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode, generic};
use crate::TableCatalog;
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamTableScan` is a virtual plan node representing a stream table scan. It is converted into
/// a stream scan + merge node (for the upstream materialize) + batch table scan when building the
/// `MView` creation request.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    pub base: PlanBase<Stream>,
    core: generic::TableScan,
    batch_plan_id: PlanNodeId,
    stream_scan_type: StreamScanType,
}

impl StreamTableScan {
    pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
    pub const EPOCH_COLUMN_NAME: &str = "epoch";
    pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
    pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
    pub const VNODE_COLUMN_NAME: &str = "vnode";

    pub fn new_with_stream_scan_type(
        core: generic::TableScan,
        stream_scan_type: StreamScanType,
    ) -> Self {
        let batch_plan_id = core.ctx.next_plan_node_id();

        let distribution = {
            match core.distribution_key() {
                Some(distribution_key) => {
                    if distribution_key.is_empty() {
                        Distribution::Single
                    } else {
                        // See also `BatchSeqScan::clone_with_dist`.
                        Distribution::UpstreamHashShard(distribution_key, core.table_desc.table_id)
                    }
                }
                None => Distribution::SomeShard,
            }
        };
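        // For illustration (hypothetical layout): a table hash-distributed by `id`, with `id` at
        // index 0 of the scan output, yields `UpstreamHashShard([0], table_id)`; if the
        // distribution key cannot be derived from the output columns, we fall back to `SomeShard`.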

        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            core.append_only(),
            false,
            core.watermark_columns(),
            MonotonicityMap::new(),
        );
        Self {
            base,
            core,
            batch_plan_id,
            stream_scan_type,
        }
    }

    pub fn table_name(&self) -> &str {
        &self.core.table_name
    }

    pub fn core(&self) -> &generic::TableScan {
        &self.core
    }

    pub fn to_index_scan(
        &self,
        index_name: &str,
        index_table_catalog: Arc<TableCatalog>,
        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
        function_mapping: &HashMap<FunctionCall, usize>,
        stream_scan_type: StreamScanType,
    ) -> StreamTableScan {
        let logical_index_scan = self.core.to_index_scan(
            index_name,
            index_table_catalog,
            primary_to_secondary_mapping,
            function_mapping,
        );
        logical_index_scan
            .distribution_key()
            .expect("distribution key of stream chain must exist in output columns");
        StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
    }

    pub fn stream_scan_type(&self) -> StreamScanType {
        self.stream_scan_type
    }

    // TODO: Add a note to reviewers about safety, due to a `generic::TableScan` limitation.
    fn get_upstream_state_table(&self) -> &TableCatalog {
        self.core.table_catalog.as_ref()
    }

    /// Build the catalog for the backfill state.
    ///
    /// When `stream_scan_type` is not `StreamScanType::SnapshotBackfill`:
    ///
    /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
    ///
    /// key:    | vnode |
    /// value:  | pk ... | `backfill_finished` | `row_count` |
    ///
    /// When we update the backfill progress, we update it for all vnodes.
    ///
    /// `pk` refers to the upstream pk which we use to track the backfill progress.
    ///
    /// `vnode` is the corresponding vnode of the upstream's distribution key.
    ///         It should also match the vnode of the backfill executor.
    ///
    /// `backfill_finished` is a boolean indicating whether backfill is done.
    ///
    /// `row_count` is the number of rows backfilled per executor.
    ///             We used to track this in memory,
    ///             but for backfill persistence we also have to persist it.
    ///
    /// FIXME(kwannoel):
    /// - Across all vnodes, the values are the same.
    /// - e.g.
    ///   | vnode | pk ...   | `backfill_finished` | `row_count` |
    ///   | 1002  | Int64(1) | t                   | 10          |
    ///   | 1003  | Int64(1) | t                   | 10          |
    ///   | 1004  | Int64(1) | t                   | 10          |
    ///
    /// Eventually we should track progress per vnode, to support scaling with both mview and
    /// the corresponding `no_shuffle_backfill`.
    /// However this is not high priority, since we are working on supporting arrangement backfill,
    /// which already has this capability.
    ///
    /// When `stream_scan_type` is `StreamScanType::SnapshotBackfill`:
    ///
    /// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...
    ///
    /// key:    | vnode |
    /// value:  | `epoch` | `row_count` | `is_epoch_finished` | pk ...
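    ///
    /// For illustration, a snapshot-backfill state row might look like (hypothetical values):
    ///
    /// | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...   |
    /// | 1002  | 65536   | 10          | t                   | Int64(1) |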
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        stream_scan_type: StreamScanType,
    ) -> TableCatalog {
        let mut catalog_builder = TableCatalogBuilder::default();
        let upstream_schema = &self.core.get_table_columns();

        // We use the vnode as the primary key of the state table.
        // If `Distribution::Single`, the vnode will just be `VirtualNode::default()`.
        catalog_builder.add_column(&Field::with_name(
            VirtualNode::RW_TYPE,
            Self::VNODE_COLUMN_NAME,
        ));
        catalog_builder.add_order_column(0, OrderType::ascending());

        match stream_scan_type {
            StreamScanType::Chain
            | StreamScanType::Rearrange
            | StreamScanType::Backfill
            | StreamScanType::UpstreamOnly
            | StreamScanType::ArrangementBackfill => {
                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(col));
                }

                // `backfill_finished` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Boolean,
                    Self::BACKFILL_FINISHED_COLUMN_NAME,
                ));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Int64,
                    Self::ROW_COUNT_COLUMN_NAME,
                ));
            }
            StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
                // `epoch` column
                catalog_builder
                    .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Int64,
                    Self::ROW_COUNT_COLUMN_NAME,
                ));

                // `is_epoch_finished` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Boolean,
                    Self::IS_EPOCH_FINISHED_COLUMN_NAME,
                ));

                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(col));
                }
            }
            StreamScanType::Unspecified => {
                unreachable!()
            }
        }

        // The state table's pk (vnode) also serves as its vnode column and distribution key.
        catalog_builder.set_vnode_col_idx(0);
        catalog_builder.set_dist_key_in_pk(vec![0]);

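        // Everything except the leading vnode key column is part of the value.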
        let num_of_columns = catalog_builder.columns().len();
        catalog_builder.set_value_indices((1..num_of_columns).collect_vec());

        catalog_builder
            .build(vec![0], 1)
            .with_id(state.gen_table_id_wrapped())
    }
}

impl_plan_tree_node_for_leaf! { StreamTableScan }

impl Distill for StreamTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name.clone())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
            let stream_key = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("stream_key", stream_key.distill()));
            let pk = IndicesDisplay {
                indices: &self
                    .core
                    .primary_key()
                    .iter()
                    .map(|x| x.column_index)
                    .collect_vec(),
                schema: &self.core.table_catalog.column_schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamTableScan", vec)
    }
}

impl StreamNode for StreamTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamTableScan {
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or(&[])
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

        // The columns required from the table (for both the snapshot scan and the upstream side).
        let upstream_column_ids = match self.stream_scan_type {
            // For backfill, we additionally need the primary key columns.
            StreamScanType::Backfill
            | StreamScanType::ArrangementBackfill
            | StreamScanType::SnapshotBackfill
            | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
            StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
                self.core.output_column_ids()
            }
            StreamScanType::Unspecified => unreachable!(),
        }
        .iter()
        .map(ColumnId::get_id)
        .collect_vec();
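        // For illustration (hypothetical schema): a backfill scan outputting columns [v1, v2]
        // over a table with primary key [id] yields the column ids of [v1, v2, id] here.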

        // The schema of the snapshot read stream.
        let snapshot_schema = upstream_column_ids
            .iter()
            .map(|&id| {
                let col = self
                    .core
                    .get_table_columns()
                    .iter()
                    .find(|c| c.column_id.get_id() == id)
                    .unwrap();
                Field::from(col).to_prost()
            })
            .collect_vec();

        let upstream_schema = snapshot_schema.clone();

        // TODO: snapshot read of upstream mview
        let batch_plan_node = BatchPlanNode {
            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
            column_ids: upstream_column_ids.clone(),
        };

        let catalog = self
            .build_backfill_state_catalog(state, self.stream_scan_type)
            .to_internal_table_prost();

        // For backfill, we first read pk + output columns from the upstream.
        // From that, we need to further project `output_indices` to the downstream.
        // This `output_indices` maps each output column id to its position in
        // `upstream_column_ids`.
        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();
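        // For illustration, continuing the hypothetical schema above: with output columns
        // [v1, v2] and `upstream_column_ids` = [v1, v2, id], `output_indices` is [0, 1].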

        let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
            let upstream_table_catalog = self.get_upstream_state_table();
            Some(upstream_table_catalog.to_internal_table_prost())
        } else {
            None
        };
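        // The arrangement table above is the upstream table's own state table; arrangement
        // backfill reads the upstream snapshot through it.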

        let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
            vec![]
        } else {
            vec![
                // Upstream updates
                // The merge node body will be filled by the `ActorBuilder` on the meta service.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: upstream_schema.clone(),
                    stream_key: vec![], // not used
                    ..Default::default()
                },
                // Snapshot read
                PbStreamNode {
                    node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
                    operator_id: self.batch_plan_id.0 as u64,
                    identity: "BatchPlanNode".into(),
                    fields: snapshot_schema,
                    stream_key: vec![], // not used
                    input: vec![],
                    append_only: true,
                },
            ]
        };
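        // For cross-db snapshot backfill the upstream lives in another database, so there is no
        // local merge or batch-plan input to attach here.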

        let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
            table_id: self.core.table_desc.table_id.table_id,
            stream_scan_type: self.stream_scan_type as i32,
            // The column indices to be forwarded to the downstream.
            output_indices,
            upstream_column_ids,
            // The table desc used by the backfill executor.
            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
            state_table: Some(catalog),
            arrangement_table,
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            ..Default::default()
        }));

        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input,
            node_body: Some(node_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            append_only: self.append_only(),
        })
    }
}

impl ExprRewritable for StreamTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
    }
}

impl ExprVisitable for StreamTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}