risingwave_frontend/optimizer/plan_node/
stream_table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::Field;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::types::DataType;
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::scan_range::ScanRange;
25use risingwave_common::util::sort_util::OrderType;
26use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};
27use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
28
29use super::stream::prelude::*;
30use super::utils::{Distill, childless_record};
31use super::{ExprRewritable, PlanBase, PlanNodeId, StreamNode, StreamPlanRef as PlanRef, generic};
32use crate::TableCatalog;
33use crate::catalog::ColumnId;
34use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
37use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
38use crate::scheduler::SchedulerResult;
39use crate::stream_fragmenter::BuildFragmentGraphState;
40
/// Stream plan node that scans a table, wrapping a [`generic::TableScan`] core.
///
/// Instances are built through [`StreamTableScan::new_with_stream_scan_type`] or
/// [`StreamTableScan::new_with_scan_range`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    /// Stream-convention plan base (schema, distribution, stream kind, ...).
    pub base: PlanBase<Stream>,
    /// The underlying generic table-scan description.
    core: generic::TableScan,
    /// Plan node id allocated from `core.ctx` at construction time; used as the
    /// operator id of the `BatchPlanNode` input emitted by `adhoc_to_stream_prost`.
    batch_plan_id: PlanNodeId,
    /// How the scan is performed. The constructor overrides it with
    /// `CrossDbSnapshotBackfill` when `core.cross_database()` is true.
    stream_scan_type: StreamScanType,
    /// Optional scan range over the primary key; shown by `distill` and serialized
    /// into the `StreamScanNode` protobuf.
    pk_scan_range: Option<ScanRange>,
}
52
53impl StreamTableScan {
54 pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
55 pub const EPOCH_COLUMN_NAME: &str = "epoch";
56 pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
57 pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
58 pub const VNODE_COLUMN_NAME: &str = "vnode";
59
60 pub fn new_with_stream_scan_type(
61 core: generic::TableScan,
62 stream_scan_type: StreamScanType,
63 ) -> Self {
64 Self::new_with_scan_range(core, stream_scan_type, None)
65 }
66
67 pub fn new_with_scan_range(
68 core: generic::TableScan,
69 stream_scan_type: StreamScanType,
70 pk_scan_range: Option<ScanRange>,
71 ) -> Self {
72 let batch_plan_id = core.ctx.next_plan_node_id();
73
74 let mut stream_scan_type = stream_scan_type;
75 if core.cross_database() {
76 assert_ne!(stream_scan_type, StreamScanType::UpstreamOnly);
77 stream_scan_type = StreamScanType::CrossDbSnapshotBackfill;
79 }
80
81 let distribution = {
82 match core.distribution_key() {
83 Some(distribution_key) => {
84 if distribution_key.is_empty() {
85 Distribution::Single
86 } else {
87 Distribution::UpstreamHashShard(distribution_key, core.table_catalog.id)
89 }
90 }
91 None => Distribution::SomeShard,
92 }
93 };
94
95 let base = PlanBase::new_stream_with_core(
96 &core,
97 distribution,
98 if core.append_only() {
99 StreamKind::AppendOnly
100 } else {
101 StreamKind::Retract
102 },
103 false,
104 core.watermark_columns(),
105 MonotonicityMap::new(),
106 );
107 Self {
108 base,
109 core,
110 batch_plan_id,
111 stream_scan_type,
112 pk_scan_range,
113 }
114 }
115
116 pub fn table_name(&self) -> &str {
117 self.core.table_name()
118 }
119
120 pub fn core(&self) -> &generic::TableScan {
121 &self.core
122 }
123
124 pub fn to_index_scan(
125 &self,
126 index_table_catalog: Arc<TableCatalog>,
127 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
128 function_mapping: &HashMap<FunctionCall, usize>,
129 stream_scan_type: StreamScanType,
130 ) -> StreamTableScan {
131 let logical_index_scan = self.core.to_index_scan(
132 index_table_catalog,
133 primary_to_secondary_mapping,
134 function_mapping,
135 );
136 logical_index_scan
137 .distribution_key()
138 .expect("distribution key of stream chain must exist in output columns");
139 StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
140 }
141
142 pub fn stream_scan_type(&self) -> StreamScanType {
143 self.stream_scan_type
144 }
145
146 pub fn pk_scan_range(&self) -> Option<&ScanRange> {
147 self.pk_scan_range.as_ref()
148 }
149
150 fn get_upstream_state_table(&self) -> &TableCatalog {
152 self.core.table_catalog.as_ref()
153 }
154
155 pub fn build_backfill_state_catalog(
199 &self,
200 state: &mut BuildFragmentGraphState,
201 stream_scan_type: StreamScanType,
202 ) -> TableCatalog {
203 let mut catalog_builder = TableCatalogBuilder::default();
204 let upstream_schema = &self.core.get_table_columns();
205
206 catalog_builder.add_column(&Field::with_name(
209 VirtualNode::RW_TYPE,
210 Self::VNODE_COLUMN_NAME,
211 ));
212 catalog_builder.add_order_column(0, OrderType::ascending());
213
214 match stream_scan_type {
215 StreamScanType::Chain
216 | StreamScanType::Rearrange
217 | StreamScanType::Backfill
218 | StreamScanType::UpstreamOnly
219 | StreamScanType::ArrangementBackfill => {
220 for col_order in self.core.primary_key() {
222 let col = &upstream_schema[col_order.column_index];
223 catalog_builder.add_column(&Field::from(&**col));
224 }
225
226 catalog_builder.add_column(&Field::with_name(
228 DataType::Boolean,
229 Self::BACKFILL_FINISHED_COLUMN_NAME,
230 ));
231
232 catalog_builder.add_column(&Field::with_name(
234 DataType::Int64,
235 Self::ROW_COUNT_COLUMN_NAME,
236 ));
237 }
238 StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
239 catalog_builder
241 .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));
242
243 catalog_builder.add_column(&Field::with_name(
245 DataType::Int64,
246 Self::ROW_COUNT_COLUMN_NAME,
247 ));
248
249 catalog_builder.add_column(&Field::with_name(
251 DataType::Boolean,
252 Self::IS_EPOCH_FINISHED_COLUMN_NAME,
253 ));
254
255 for col_order in self.core.primary_key() {
257 let col = &upstream_schema[col_order.column_index];
258 catalog_builder.add_column(&Field::from(&col.column_desc));
259 }
260 }
261 StreamScanType::Unspecified => {
262 unreachable!()
263 }
264 }
265
266 catalog_builder.set_vnode_col_idx(0);
268 catalog_builder.set_dist_key_in_pk(vec![0]);
269
270 let num_of_columns = catalog_builder.columns().len();
271 catalog_builder.set_value_indices((1..num_of_columns).collect_vec());
272
273 catalog_builder
274 .build(vec![0], 1)
275 .with_id(state.gen_table_id_wrapped())
276 }
277}
278
// Macro-generated plan-tree-node boilerplate for `StreamTableScan` in the `Stream` convention.
// NOTE(review): presumably marks the node as a leaf (no inputs), as the macro name
// suggests — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Stream, StreamTableScan }
280
281impl Distill for StreamTableScan {
282 fn distill<'a>(&self) -> XmlNode<'a> {
283 let verbose = self.base.ctx().is_explain_verbose();
284 let mut vec = Vec::with_capacity(4);
285 vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
286 vec.push(("columns", self.core.columns_pretty(verbose)));
287 if let Some(scan_range) = &self.pk_scan_range {
288 let mut parts = Vec::new();
289 let pk_cols = self.core.primary_key();
290 for (pk, datum) in pk_cols
292 .iter()
293 .take(scan_range.eq_conds.len())
294 .zip_eq_fast(scan_range.eq_conds.iter())
295 {
296 let field = &self.core.table_catalog.columns()[pk.column_index];
297 parts.push(format!("{} = {:?}", field.name(), datum));
298 }
299 let range_col_idx = scan_range.eq_conds.len();
301 if range_col_idx < pk_cols.len() {
302 use std::ops::Bound;
303 let field = &self.core.table_catalog.columns()[pk_cols[range_col_idx].column_index];
304 let fmt_bound_val = |v: &Vec<risingwave_common::types::Datum>| -> String {
305 v.first().map_or("NULL".to_owned(), |d| format!("{:?}", d))
306 };
307 match &scan_range.range.0 {
308 Bound::Included(v) => {
309 parts.push(format!("{} >= {}", field.name(), fmt_bound_val(v)))
310 }
311 Bound::Excluded(v) => {
312 parts.push(format!("{} > {}", field.name(), fmt_bound_val(v)))
313 }
314 Bound::Unbounded => {}
315 }
316 match &scan_range.range.1 {
317 Bound::Included(v) => {
318 parts.push(format!("{} <= {}", field.name(), fmt_bound_val(v)))
319 }
320 Bound::Excluded(v) => {
321 parts.push(format!("{} < {}", field.name(), fmt_bound_val(v)))
322 }
323 Bound::Unbounded => {}
324 }
325 }
326 if !parts.is_empty() {
327 vec.push(("pk_scan_range", Pretty::from(parts.join(" AND "))));
328 }
329 }
330
331 if verbose {
332 vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
333 let stream_key = IndicesDisplay {
334 indices: self.stream_key().unwrap_or_default(),
335 schema: self.base.schema(),
336 };
337 vec.push(("stream_key", stream_key.distill()));
338 let pk = IndicesDisplay {
339 indices: &self
340 .core
341 .primary_key()
342 .iter()
343 .map(|x| x.column_index)
344 .collect_vec(),
345 schema: &self.core.table_catalog.column_schema(),
346 };
347 vec.push(("pk", pk.distill()));
348 let dist = Pretty::display(&DistributionDisplay {
349 distribution: self.distribution(),
350 input_schema: self.base.schema(),
351 });
352 vec.push(("dist", dist));
353 }
354
355 childless_record("StreamTableScan", vec)
356 }
357}
358
impl StreamNode for StreamTableScan {
    /// Not supported for this node: always panics via `unreachable!`.
    ///
    /// Use `StreamTableScan::adhoc_to_stream_prost` instead, which builds the complete
    /// `PbStreamNode` (including its inputs) rather than only a node body.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        unreachable!(
            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
        )
    }
}
366
367impl StreamTableScan {
368 pub fn adhoc_to_stream_prost(
369 &self,
370 state: &mut BuildFragmentGraphState,
371 ) -> SchedulerResult<PbStreamNode> {
372 use risingwave_pb::stream_plan::*;
373
374 let stream_key = self
375 .stream_key()
376 .unwrap_or(&[])
377 .iter()
378 .map(|x| *x as u32)
379 .collect_vec();
380
381 let upstream_column_ids = match self.stream_scan_type {
383 StreamScanType::Backfill
385 | StreamScanType::ArrangementBackfill
386 | StreamScanType::SnapshotBackfill
387 | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
388 StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
389 self.core.output_column_ids()
390 }
391 StreamScanType::Unspecified => unreachable!(),
392 }
393 .iter()
394 .map(ColumnId::get_id)
395 .collect_vec();
396
397 let snapshot_schema = upstream_column_ids
399 .iter()
400 .map(|&id| {
401 let col = self
402 .core
403 .get_table_columns()
404 .iter()
405 .find(|c| c.column_id.get_id() == id)
406 .unwrap();
407 Field::from(&col.column_desc).to_prost()
408 })
409 .collect_vec();
410
411 let upstream_schema = snapshot_schema.clone();
412
413 let batch_plan_node = BatchPlanNode {
415 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
416 column_ids: upstream_column_ids.clone(),
417 };
418
419 let catalog = self
420 .build_backfill_state_catalog(state, self.stream_scan_type)
421 .to_internal_table_prost();
422
423 let output_indices = self
427 .core
428 .output_column_ids()
429 .iter()
430 .map(|i| {
431 upstream_column_ids
432 .iter()
433 .position(|&x| x == i.get_id())
434 .unwrap() as u32
435 })
436 .collect_vec();
437
438 let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
439 let upstream_table_catalog = self.get_upstream_state_table();
440 Some(upstream_table_catalog.to_internal_table_prost())
441 } else {
442 None
443 };
444
445 let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
446 vec![]
447 } else {
448 vec![
449 PbStreamNode {
452 node_body: Some(PbNodeBody::Merge(Default::default())),
453 identity: "Upstream".into(),
454 fields: upstream_schema,
455 stream_key: vec![], ..Default::default()
457 },
458 PbStreamNode {
460 node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
461 operator_id: self.batch_plan_id.to_stream_node_operator_id(),
462 identity: "BatchPlanNode".into(),
463 fields: snapshot_schema,
464 stream_key: vec![], input: vec![],
466 stream_kind: PbStreamKind::AppendOnly as i32,
467 },
468 ]
469 };
470
471 let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
472 table_id: self.core.table_catalog.id,
473 stream_scan_type: self.stream_scan_type as i32,
474 output_indices,
476 upstream_column_ids,
477 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
479 state_table: Some(catalog),
480 arrangement_table,
481 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
482 pk_scan_range: self.pk_scan_range.as_ref().map(|sr| sr.to_protobuf()),
483 ..Default::default()
484 }));
485
486 Ok(PbStreamNode {
487 fields: self.schema().to_prost(),
488 input,
489 node_body: Some(node_body),
490 stream_key,
491 operator_id: self.base.id().to_stream_node_operator_id(),
492 identity: self.distill_to_string(),
493 stream_kind: self.stream_kind().to_protobuf() as i32,
494 })
495 }
496}
497
498impl ExprRewritable<Stream> for StreamTableScan {
499 fn has_rewritable_expr(&self) -> bool {
500 true
501 }
502
503 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
504 let mut core = self.core.clone();
505 core.rewrite_exprs(r);
506 Self::new_with_scan_range(core, self.stream_scan_type, self.pk_scan_range.clone()).into()
507 }
508}
509
impl ExprVisitable for StreamTableScan {
    /// Visits the expressions of this node by delegating to the core table scan.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}