// risingwave_frontend/optimizer/plan_node/stream_table_scan.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::Field;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::types::DataType;
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::scan_range::ScanRange;
25use risingwave_common::util::sort_util::OrderType;
26use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};
27use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
28
29use super::stream::prelude::*;
30use super::utils::{Distill, childless_record};
31use super::{ExprRewritable, PlanBase, PlanNodeId, StreamNode, StreamPlanRef as PlanRef, generic};
32use crate::TableCatalog;
33use crate::catalog::ColumnId;
34use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
37use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
38use crate::scheduler::SchedulerResult;
39use crate::stream_fragmenter::BuildFragmentGraphState;
40
41/// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted
42/// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView`
43/// creation request.
44#[derive(Debug, Clone, PartialEq, Eq, Hash)]
45pub struct StreamTableScan {
46    pub base: PlanBase<Stream>,
47    core: generic::TableScan,
48    batch_plan_id: PlanNodeId,
49    stream_scan_type: StreamScanType,
50    pk_scan_range: Option<ScanRange>,
51}
52
53impl StreamTableScan {
54    pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
55    pub const EPOCH_COLUMN_NAME: &str = "epoch";
56    pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
57    pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
58    pub const VNODE_COLUMN_NAME: &str = "vnode";
59
60    pub fn new_with_stream_scan_type(
61        core: generic::TableScan,
62        stream_scan_type: StreamScanType,
63    ) -> Self {
64        Self::new_with_scan_range(core, stream_scan_type, None)
65    }
66
67    pub fn new_with_scan_range(
68        core: generic::TableScan,
69        stream_scan_type: StreamScanType,
70        pk_scan_range: Option<ScanRange>,
71    ) -> Self {
72        let batch_plan_id = core.ctx.next_plan_node_id();
73
74        let mut stream_scan_type = stream_scan_type;
75        if core.cross_database() {
76            assert_ne!(stream_scan_type, StreamScanType::UpstreamOnly);
77            // Force rewrite scan type to cross-db scan
78            stream_scan_type = StreamScanType::CrossDbSnapshotBackfill;
79        }
80
81        let distribution = {
82            match core.distribution_key() {
83                Some(distribution_key) => {
84                    if distribution_key.is_empty() {
85                        Distribution::Single
86                    } else {
87                        // See also `BatchSeqScan::clone_with_dist`.
88                        Distribution::UpstreamHashShard(distribution_key, core.table_catalog.id)
89                    }
90                }
91                None => Distribution::SomeShard,
92            }
93        };
94
95        let base = PlanBase::new_stream_with_core(
96            &core,
97            distribution,
98            if core.append_only() {
99                StreamKind::AppendOnly
100            } else {
101                StreamKind::Retract
102            },
103            false,
104            core.watermark_columns(),
105            MonotonicityMap::new(),
106        );
107        Self {
108            base,
109            core,
110            batch_plan_id,
111            stream_scan_type,
112            pk_scan_range,
113        }
114    }
115
116    pub fn table_name(&self) -> &str {
117        self.core.table_name()
118    }
119
120    pub fn core(&self) -> &generic::TableScan {
121        &self.core
122    }
123
124    pub fn to_index_scan(
125        &self,
126        index_table_catalog: Arc<TableCatalog>,
127        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
128        function_mapping: &HashMap<FunctionCall, usize>,
129        stream_scan_type: StreamScanType,
130    ) -> StreamTableScan {
131        let logical_index_scan = self.core.to_index_scan(
132            index_table_catalog,
133            primary_to_secondary_mapping,
134            function_mapping,
135        );
136        logical_index_scan
137            .distribution_key()
138            .expect("distribution key of stream chain must exist in output columns");
139        StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
140    }
141
142    pub fn stream_scan_type(&self) -> StreamScanType {
143        self.stream_scan_type
144    }
145
146    pub fn pk_scan_range(&self) -> Option<&ScanRange> {
147        self.pk_scan_range.as_ref()
148    }
149
150    // TODO: Add note to reviewer about safety, because of `generic::TableScan` limitation.
151    fn get_upstream_state_table(&self) -> &TableCatalog {
152        self.core.table_catalog.as_ref()
153    }
154
155    /// Build catalog for backfill state
156    ///
157    /// When `stream_scan_type` is not `StreamScanType::SnapshotBackfill`:
158    ///
159    /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
160    ///
161    /// key:    | vnode |
162    /// value:  | pk ... | `backfill_finished` | `row_count` |
163    ///
164    /// When we update the backfill progress,
165    /// we update it for all vnodes.
166    ///
167    /// `pk` refers to the upstream pk which we use to track the backfill progress.
168    ///
169    /// `vnode` is the corresponding vnode of the upstream's distribution key.
170    ///         It should also match the vnode of the backfill executor.
171    ///
172    /// `backfill_finished` is a boolean which just indicates if backfill is done.
173    ///
174    /// `row_count` is a count of rows which indicates the # of rows per executor.
175    ///             We used to track this in memory.
176    ///             But for backfill persistence we have to also persist it.
177    ///
178    /// FIXME(kwannoel):
179    /// - Across all vnodes, the values are the same.
180    /// - e.g.
181    ///   | vnode | pk ...  | `backfill_finished` | `row_count` |
182    ///   | 1002 | Int64(1) | t                   | 10          |
183    ///   | 1003 | Int64(1) | t                   | 10          |
184    ///   | 1003 | Int64(1) | t                   | 10          |
185    ///
186    /// Eventually we should track progress per vnode, to support scaling with both mview and
187    /// the corresponding `no_shuffle_backfill`.
188    /// However this is not high priority, since we are working on supporting arrangement backfill,
189    /// which already has this capability.
190    ///
191    ///
192    /// When `stream_scan_type` is `StreamScanType::SnapshotBackfill`:
193    ///
194    /// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...
195    ///
196    /// key:    | vnode |
197    /// value:  | `epoch` | `row_count` | `is_epoch_finished` | pk ...
198    pub fn build_backfill_state_catalog(
199        &self,
200        state: &mut BuildFragmentGraphState,
201        stream_scan_type: StreamScanType,
202    ) -> TableCatalog {
203        let mut catalog_builder = TableCatalogBuilder::default();
204        let upstream_schema = &self.core.get_table_columns();
205
206        // We use vnode as primary key in state table.
207        // If `Distribution::Single`, vnode will just be `VirtualNode::default()`.
208        catalog_builder.add_column(&Field::with_name(
209            VirtualNode::RW_TYPE,
210            Self::VNODE_COLUMN_NAME,
211        ));
212        catalog_builder.add_order_column(0, OrderType::ascending());
213
214        match stream_scan_type {
215            StreamScanType::Chain
216            | StreamScanType::Rearrange
217            | StreamScanType::Backfill
218            | StreamScanType::UpstreamOnly
219            | StreamScanType::ArrangementBackfill => {
220                // pk columns
221                for col_order in self.core.primary_key() {
222                    let col = &upstream_schema[col_order.column_index];
223                    catalog_builder.add_column(&Field::from(&**col));
224                }
225
226                // `backfill_finished` column
227                catalog_builder.add_column(&Field::with_name(
228                    DataType::Boolean,
229                    Self::BACKFILL_FINISHED_COLUMN_NAME,
230                ));
231
232                // `row_count` column
233                catalog_builder.add_column(&Field::with_name(
234                    DataType::Int64,
235                    Self::ROW_COUNT_COLUMN_NAME,
236                ));
237            }
238            StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
239                // `epoch` column
240                catalog_builder
241                    .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));
242
243                // `row_count` column
244                catalog_builder.add_column(&Field::with_name(
245                    DataType::Int64,
246                    Self::ROW_COUNT_COLUMN_NAME,
247                ));
248
249                // `is_finished` column
250                catalog_builder.add_column(&Field::with_name(
251                    DataType::Boolean,
252                    Self::IS_EPOCH_FINISHED_COLUMN_NAME,
253                ));
254
255                // pk columns
256                for col_order in self.core.primary_key() {
257                    let col = &upstream_schema[col_order.column_index];
258                    catalog_builder.add_column(&Field::from(&col.column_desc));
259                }
260            }
261            StreamScanType::Unspecified => {
262                unreachable!()
263            }
264        }
265
266        // Reuse the state store pk (vnode) as the vnode as well.
267        catalog_builder.set_vnode_col_idx(0);
268        catalog_builder.set_dist_key_in_pk(vec![0]);
269
270        let num_of_columns = catalog_builder.columns().len();
271        catalog_builder.set_value_indices((1..num_of_columns).collect_vec());
272
273        catalog_builder
274            .build(vec![0], 1)
275            .with_id(state.gen_table_id_wrapped())
276    }
277}
278
279impl_plan_tree_node_for_leaf! { Stream, StreamTableScan }
280
281impl Distill for StreamTableScan {
282    fn distill<'a>(&self) -> XmlNode<'a> {
283        let verbose = self.base.ctx().is_explain_verbose();
284        let mut vec = Vec::with_capacity(4);
285        vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
286        vec.push(("columns", self.core.columns_pretty(verbose)));
287        if let Some(scan_range) = &self.pk_scan_range {
288            let mut parts = Vec::new();
289            let pk_cols = self.core.primary_key();
290            // Display eq_conds
291            for (pk, datum) in pk_cols
292                .iter()
293                .take(scan_range.eq_conds.len())
294                .zip_eq_fast(scan_range.eq_conds.iter())
295            {
296                let field = &self.core.table_catalog.columns()[pk.column_index];
297                parts.push(format!("{} = {:?}", field.name(), datum));
298            }
299            // Display range bounds on the next column
300            let range_col_idx = scan_range.eq_conds.len();
301            if range_col_idx < pk_cols.len() {
302                use std::ops::Bound;
303                let field = &self.core.table_catalog.columns()[pk_cols[range_col_idx].column_index];
304                let fmt_bound_val = |v: &Vec<risingwave_common::types::Datum>| -> String {
305                    v.first().map_or("NULL".to_owned(), |d| format!("{:?}", d))
306                };
307                match &scan_range.range.0 {
308                    Bound::Included(v) => {
309                        parts.push(format!("{} >= {}", field.name(), fmt_bound_val(v)))
310                    }
311                    Bound::Excluded(v) => {
312                        parts.push(format!("{} > {}", field.name(), fmt_bound_val(v)))
313                    }
314                    Bound::Unbounded => {}
315                }
316                match &scan_range.range.1 {
317                    Bound::Included(v) => {
318                        parts.push(format!("{} <= {}", field.name(), fmt_bound_val(v)))
319                    }
320                    Bound::Excluded(v) => {
321                        parts.push(format!("{} < {}", field.name(), fmt_bound_val(v)))
322                    }
323                    Bound::Unbounded => {}
324                }
325            }
326            if !parts.is_empty() {
327                vec.push(("pk_scan_range", Pretty::from(parts.join(" AND "))));
328            }
329        }
330
331        if verbose {
332            vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
333            let stream_key = IndicesDisplay {
334                indices: self.stream_key().unwrap_or_default(),
335                schema: self.base.schema(),
336            };
337            vec.push(("stream_key", stream_key.distill()));
338            let pk = IndicesDisplay {
339                indices: &self
340                    .core
341                    .primary_key()
342                    .iter()
343                    .map(|x| x.column_index)
344                    .collect_vec(),
345                schema: &self.core.table_catalog.column_schema(),
346            };
347            vec.push(("pk", pk.distill()));
348            let dist = Pretty::display(&DistributionDisplay {
349                distribution: self.distribution(),
350                input_schema: self.base.schema(),
351            });
352            vec.push(("dist", dist));
353        }
354
355        childless_record("StreamTableScan", vec)
356    }
357}
358
359impl StreamNode for StreamTableScan {
360    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
361        unreachable!(
362            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
363        )
364    }
365}
366
367impl StreamTableScan {
368    pub fn adhoc_to_stream_prost(
369        &self,
370        state: &mut BuildFragmentGraphState,
371    ) -> SchedulerResult<PbStreamNode> {
372        use risingwave_pb::stream_plan::*;
373
374        let stream_key = self
375            .stream_key()
376            .unwrap_or(&[])
377            .iter()
378            .map(|x| *x as u32)
379            .collect_vec();
380
381        // The required columns from the table (both scan and upstream).
382        let upstream_column_ids = match self.stream_scan_type {
383            // For backfill, we additionally need the primary key columns.
384            StreamScanType::Backfill
385            | StreamScanType::ArrangementBackfill
386            | StreamScanType::SnapshotBackfill
387            | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
388            StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
389                self.core.output_column_ids()
390            }
391            StreamScanType::Unspecified => unreachable!(),
392        }
393        .iter()
394        .map(ColumnId::get_id)
395        .collect_vec();
396
397        // The schema of the snapshot read stream
398        let snapshot_schema = upstream_column_ids
399            .iter()
400            .map(|&id| {
401                let col = self
402                    .core
403                    .get_table_columns()
404                    .iter()
405                    .find(|c| c.column_id.get_id() == id)
406                    .unwrap();
407                Field::from(&col.column_desc).to_prost()
408            })
409            .collect_vec();
410
411        let upstream_schema = snapshot_schema.clone();
412
413        // TODO: snapshot read of upstream mview
414        let batch_plan_node = BatchPlanNode {
415            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
416            column_ids: upstream_column_ids.clone(),
417        };
418
419        let catalog = self
420            .build_backfill_state_catalog(state, self.stream_scan_type)
421            .to_internal_table_prost();
422
423        // For backfill, we first read pk + output_indices from upstream.
424        // On this, we need to further project `output_indices` to the downstream.
425        // This `output_indices` refers to that.
426        let output_indices = self
427            .core
428            .output_column_ids()
429            .iter()
430            .map(|i| {
431                upstream_column_ids
432                    .iter()
433                    .position(|&x| x == i.get_id())
434                    .unwrap() as u32
435            })
436            .collect_vec();
437
438        let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
439            let upstream_table_catalog = self.get_upstream_state_table();
440            Some(upstream_table_catalog.to_internal_table_prost())
441        } else {
442            None
443        };
444
445        let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
446            vec![]
447        } else {
448            vec![
449                // Upstream updates
450                // The merge node body will be filled by the `ActorBuilder` on the meta service.
451                PbStreamNode {
452                    node_body: Some(PbNodeBody::Merge(Default::default())),
453                    identity: "Upstream".into(),
454                    fields: upstream_schema,
455                    stream_key: vec![], // not used
456                    ..Default::default()
457                },
458                // Snapshot read
459                PbStreamNode {
460                    node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
461                    operator_id: self.batch_plan_id.to_stream_node_operator_id(),
462                    identity: "BatchPlanNode".into(),
463                    fields: snapshot_schema,
464                    stream_key: vec![], // not used
465                    input: vec![],
466                    stream_kind: PbStreamKind::AppendOnly as i32,
467                },
468            ]
469        };
470
471        let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
472            table_id: self.core.table_catalog.id,
473            stream_scan_type: self.stream_scan_type as i32,
474            // The column indices need to be forwarded to the downstream
475            output_indices,
476            upstream_column_ids,
477            // The table desc used by backfill executor
478            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
479            state_table: Some(catalog),
480            arrangement_table,
481            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
482            pk_scan_range: self.pk_scan_range.as_ref().map(|sr| sr.to_protobuf()),
483            ..Default::default()
484        }));
485
486        Ok(PbStreamNode {
487            fields: self.schema().to_prost(),
488            input,
489            node_body: Some(node_body),
490            stream_key,
491            operator_id: self.base.id().to_stream_node_operator_id(),
492            identity: self.distill_to_string(),
493            stream_kind: self.stream_kind().to_protobuf() as i32,
494        })
495    }
496}
497
498impl ExprRewritable<Stream> for StreamTableScan {
499    fn has_rewritable_expr(&self) -> bool {
500        true
501    }
502
503    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
504        let mut core = self.core.clone();
505        core.rewrite_exprs(r);
506        Self::new_with_scan_range(core, self.stream_scan_type, self.pk_scan_range.clone()).into()
507    }
508}
509
510impl ExprVisitable for StreamTableScan {
511    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
512        self.core.visit_exprs(v);
513    }
514}