risingwave_frontend/optimizer/plan_node/stream_table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Field;
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, PlanNodeId, StreamNode, StreamPlanRef as PlanRef, generic};
use crate::TableCatalog;
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted
/// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView`
/// creation request.
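///
/// Illustrative shape of the converted fragment in the common backfill case (cross-db snapshot
/// backfill has no local inputs); see `adhoc_to_stream_prost` below:
///
/// ```text
/// StreamScan
/// ├── Merge      (upstream Materialize updates)
/// └── BatchPlan  (snapshot read of the upstream table)
/// ```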
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    pub base: PlanBase<Stream>,
    core: generic::TableScan,
    batch_plan_id: PlanNodeId,
    stream_scan_type: StreamScanType,
}

impl StreamTableScan {
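    // Column names of the backfill state table; see `build_backfill_state_catalog` below.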
    pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
    pub const EPOCH_COLUMN_NAME: &str = "epoch";
    pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
    pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
    pub const VNODE_COLUMN_NAME: &str = "vnode";

    pub fn new_with_stream_scan_type(
        core: generic::TableScan,
        stream_scan_type: StreamScanType,
    ) -> Self {
        let batch_plan_id = core.ctx.next_plan_node_id();

        let distribution = {
            match core.distribution_key() {
                Some(distribution_key) => {
                    if distribution_key.is_empty() {
                        Distribution::Single
                    } else {
                        // See also `BatchSeqScan::clone_with_dist`.
                        Distribution::UpstreamHashShard(distribution_key, core.table_catalog.id)
                    }
                }
                None => Distribution::SomeShard,
            }
        };

        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            if core.append_only() {
                StreamKind::AppendOnly
            } else {
                StreamKind::Retract
            },
            false,
            core.watermark_columns(),
            MonotonicityMap::new(),
        );
        Self {
            base,
            core,
            batch_plan_id,
            stream_scan_type,
        }
    }

    pub fn table_name(&self) -> &str {
        self.core.table_name()
    }

    pub fn core(&self) -> &generic::TableScan {
        &self.core
    }

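    /// Convert this table scan into a scan over the given index table, remapping primary-table
    /// columns through `primary_to_secondary_mapping` and indexed expressions through
    /// `function_mapping`. The resulting index scan must still expose its distribution key in
    /// the output columns (asserted below).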
    pub fn to_index_scan(
        &self,
        index_table_catalog: Arc<TableCatalog>,
        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
        function_mapping: &HashMap<FunctionCall, usize>,
        stream_scan_type: StreamScanType,
    ) -> StreamTableScan {
        let logical_index_scan = self.core.to_index_scan(
            index_table_catalog,
            primary_to_secondary_mapping,
            function_mapping,
        );
        logical_index_scan
            .distribution_key()
            .expect("distribution key of stream chain must exist in output columns");
        StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
    }

    pub fn stream_scan_type(&self) -> StreamScanType {
        self.stream_scan_type
    }

    // TODO: Add note to reviewer about safety, because of `generic::TableScan` limitation.
    fn get_upstream_state_table(&self) -> &TableCatalog {
        self.core.table_catalog.as_ref()
    }

    /// Build catalog for backfill state
    ///
    /// When `stream_scan_type` is not `StreamScanType::SnapshotBackfill`:
    ///
    /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
    ///
    /// key:    | vnode |
    /// value:  | pk ... | `backfill_finished` | `row_count` |
    ///
    /// When we update the backfill progress,
    /// we update it for all vnodes.
    ///
    /// `pk` refers to the upstream pk which we use to track the backfill progress.
    ///
    /// `vnode` is the corresponding vnode of the upstream's distribution key.
    ///         It should also match the vnode of the backfill executor.
    ///
    /// `backfill_finished` is a boolean which just indicates if backfill is done.
    ///
    /// `row_count` is a count of rows which indicates the # of rows per executor.
    ///             We used to track this in memory.
    ///             But for backfill persistence we have to also persist it.
    ///
    /// FIXME(kwannoel):
    /// - Across all vnodes, the values are the same.
    /// - e.g.
    ///   | vnode | pk ...  | `backfill_finished` | `row_count` |
    ///   | 1002 | Int64(1) | t                   | 10          |
    ///   | 1003 | Int64(1) | t                   | 10          |
    ///   | 1004 | Int64(1) | t                   | 10          |
    ///
    /// Eventually we should track progress per vnode, to support scaling with both mview and
    /// the corresponding `no_shuffle_backfill`.
    /// However this is not high priority, since we are working on supporting arrangement backfill,
    /// which already has this capability.
    ///
    ///
    /// When `stream_scan_type` is `StreamScanType::SnapshotBackfill`:
    ///
    /// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...
    ///
    /// key:    | vnode |
    /// value:  | `epoch` | `row_count` | `is_epoch_finished` | pk ...
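    ///
    /// An illustrative snapshot-backfill state row (values are hypothetical):
    ///
    /// | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...   |
    /// | 1002  | 65536   | 10          | t                   | Int64(1) |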
    pub fn build_backfill_state_catalog(
        &self,
        state: &mut BuildFragmentGraphState,
        stream_scan_type: StreamScanType,
    ) -> TableCatalog {
        let mut catalog_builder = TableCatalogBuilder::default();
        let upstream_schema = &self.core.get_table_columns();

        // We use vnode as primary key in state table.
        // If `Distribution::Single`, vnode will just be `VirtualNode::default()`.
        catalog_builder.add_column(&Field::with_name(
            VirtualNode::RW_TYPE,
            Self::VNODE_COLUMN_NAME,
        ));
        catalog_builder.add_order_column(0, OrderType::ascending());

        match stream_scan_type {
            StreamScanType::Chain
            | StreamScanType::Rearrange
            | StreamScanType::Backfill
            | StreamScanType::UpstreamOnly
            | StreamScanType::ArrangementBackfill => {
                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(&col.column_desc));
                }

                // `backfill_finished` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Boolean,
                    Self::BACKFILL_FINISHED_COLUMN_NAME,
                ));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Int64,
                    Self::ROW_COUNT_COLUMN_NAME,
                ));
            }
            StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
                // `epoch` column
                catalog_builder
                    .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));

                // `row_count` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Int64,
                    Self::ROW_COUNT_COLUMN_NAME,
                ));

                // `is_epoch_finished` column
                catalog_builder.add_column(&Field::with_name(
                    DataType::Boolean,
                    Self::IS_EPOCH_FINISHED_COLUMN_NAME,
                ));

                // pk columns
                for col_order in self.core.primary_key() {
                    let col = &upstream_schema[col_order.column_index];
                    catalog_builder.add_column(&Field::from(&col.column_desc));
                }
            }
            StreamScanType::Unspecified => {
                unreachable!()
            }
        }

        // Reuse the state store pk (vnode) as the vnode as well.
        catalog_builder.set_vnode_col_idx(0);
        catalog_builder.set_dist_key_in_pk(vec![0]);

        let num_of_columns = catalog_builder.columns().len();
        catalog_builder.set_value_indices((1..num_of_columns).collect_vec());

        catalog_builder
            .build(vec![0], 1)
            .with_id(state.gen_table_id_wrapped())
    }
}

impl_plan_tree_node_for_leaf! { Stream, StreamTableScan }

impl Distill for StreamTableScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if verbose {
            vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
            let stream_key = IndicesDisplay {
                indices: self.stream_key().unwrap_or_default(),
                schema: self.base.schema(),
            };
            vec.push(("stream_key", stream_key.distill()));
            let pk = IndicesDisplay {
                indices: &self
                    .core
                    .primary_key()
                    .iter()
                    .map(|x| x.column_index)
                    .collect_vec(),
                schema: &self.core.table_catalog.column_schema(),
            };
            vec.push(("pk", pk.distill()));
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("dist", dist));
        }

        childless_record("StreamTableScan", vec)
    }
}

impl StreamNode for StreamTableScan {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}

impl StreamTableScan {
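    /// Builds the `PbStreamNode` for this scan directly instead of via
    /// `StreamNode::to_stream_prost_body`, since its inputs (the upstream `Merge` and the
    /// snapshot-read `BatchPlan`) are assembled here rather than derived from plan children.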
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;

        let stream_key = self
            .stream_key()
            .unwrap_or(&[])
            .iter()
            .map(|x| *x as u32)
            .collect_vec();

        // The required columns from the table (both scan and upstream).
        let upstream_column_ids = match self.stream_scan_type {
            // For backfill, we additionally need the primary key columns.
            StreamScanType::Backfill
            | StreamScanType::ArrangementBackfill
            | StreamScanType::SnapshotBackfill
            | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
            StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
                self.core.output_column_ids()
            }
            StreamScanType::Unspecified => unreachable!(),
        }
        .iter()
        .map(ColumnId::get_id)
        .collect_vec();

        // The schema of the snapshot read stream
        let snapshot_schema = upstream_column_ids
            .iter()
            .map(|&id| {
                let col = self
                    .core
                    .get_table_columns()
                    .iter()
                    .find(|c| c.column_id.get_id() == id)
                    .unwrap();
                Field::from(&col.column_desc).to_prost()
            })
            .collect_vec();

        let upstream_schema = snapshot_schema.clone();

        // TODO: snapshot read of upstream mview
        let batch_plan_node = BatchPlanNode {
            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
            column_ids: upstream_column_ids.clone(),
        };

        let catalog = self
            .build_backfill_state_catalog(state, self.stream_scan_type)
            .to_internal_table_prost();

        // For backfill, we first read pk + output columns from the upstream.
        // From that wider column set we then need to project only `output_indices` for the
        // downstream; the `output_indices` computed here refers to that projection.
        let output_indices = self
            .core
            .output_column_ids()
            .iter()
            .map(|i| {
                upstream_column_ids
                    .iter()
                    .position(|&x| x == i.get_id())
                    .unwrap() as u32
            })
            .collect_vec();
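        // Illustrative (hypothetical values): if `upstream_column_ids` is [2, 0, 1] and the
        // output column ids are [0, 1], then `output_indices` is [1, 2].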

        let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
            let upstream_table_catalog = self.get_upstream_state_table();
            Some(upstream_table_catalog.to_internal_table_prost())
        } else {
            None
        };

        let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
            vec![]
        } else {
            vec![
                // Upstream updates
                // The merge node body will be filled by the `ActorBuilder` on the meta service.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields: upstream_schema.clone(),
                    stream_key: vec![], // not used
                    ..Default::default()
                },
                // Snapshot read
                PbStreamNode {
                    node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
                    operator_id: self.batch_plan_id.0 as u64,
                    identity: "BatchPlanNode".into(),
                    fields: snapshot_schema,
                    stream_key: vec![], // not used
                    input: vec![],
                    append_only: true,
                },
            ]
        };

        let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
            table_id: self.core.table_catalog.id.table_id,
            stream_scan_type: self.stream_scan_type as i32,
            // The column indices need to be forwarded to the downstream
            output_indices,
            upstream_column_ids,
            // The table desc used by backfill executor
            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
            state_table: Some(catalog),
            arrangement_table,
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            ..Default::default()
        }));

        Ok(PbStreamNode {
            fields: self.schema().to_prost(),
            input,
            node_body: Some(node_body),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            append_only: self.append_only(),
        })
    }
}

impl ExprRewritable<Stream> for StreamTableScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
    }
}

impl ExprVisitable for StreamTableScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}