risingwave_frontend/optimizer/plan_node/
stream_table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::Field;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::types::DataType;
23use risingwave_common::util::sort_util::OrderType;
24use risingwave_pb::stream_plan::stream_node::PbNodeBody;
25use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
26
27use super::stream::prelude::*;
28use super::utils::{Distill, childless_record};
29use super::{ExprRewritable, PlanBase, PlanNodeId, StreamNode, StreamPlanRef as PlanRef, generic};
30use crate::TableCatalog;
31use crate::catalog::ColumnId;
32use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
35use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
36use crate::scheduler::SchedulerResult;
37use crate::stream_fragmenter::BuildFragmentGraphState;
38
/// Stream plan node that scans a table described by a [`generic::TableScan`].
///
/// It cannot be serialized through `StreamNode::to_stream_prost_body`; use
/// `StreamTableScan::adhoc_to_stream_prost` instead, which also emits the node's inputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    pub base: PlanBase<Stream>,
    /// The logical table scan this node is built from.
    core: generic::TableScan,
    /// Plan node id reserved at construction time for the `BatchPlanNode` input emitted
    /// by `adhoc_to_stream_prost`.
    batch_plan_id: PlanNodeId,
    /// The scan strategy; it selects the backfill state-table layout and which upstream
    /// columns are read.
    stream_scan_type: StreamScanType,
}
49
50impl StreamTableScan {
51 pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
52 pub const EPOCH_COLUMN_NAME: &str = "epoch";
53 pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
54 pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
55 pub const VNODE_COLUMN_NAME: &str = "vnode";
56
57 pub fn new_with_stream_scan_type(
58 core: generic::TableScan,
59 stream_scan_type: StreamScanType,
60 ) -> Self {
61 let batch_plan_id = core.ctx.next_plan_node_id();
62
63 let distribution = {
64 match core.distribution_key() {
65 Some(distribution_key) => {
66 if distribution_key.is_empty() {
67 Distribution::Single
68 } else {
69 Distribution::UpstreamHashShard(distribution_key, core.table_catalog.id)
71 }
72 }
73 None => Distribution::SomeShard,
74 }
75 };
76
77 let base = PlanBase::new_stream_with_core(
78 &core,
79 distribution,
80 if core.append_only() {
81 StreamKind::AppendOnly
82 } else {
83 StreamKind::Retract
84 },
85 false,
86 core.watermark_columns(),
87 MonotonicityMap::new(),
88 );
89 Self {
90 base,
91 core,
92 batch_plan_id,
93 stream_scan_type,
94 }
95 }
96
97 pub fn table_name(&self) -> &str {
98 self.core.table_name()
99 }
100
101 pub fn core(&self) -> &generic::TableScan {
102 &self.core
103 }
104
105 pub fn to_index_scan(
106 &self,
107 index_table_catalog: Arc<TableCatalog>,
108 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
109 function_mapping: &HashMap<FunctionCall, usize>,
110 stream_scan_type: StreamScanType,
111 ) -> StreamTableScan {
112 let logical_index_scan = self.core.to_index_scan(
113 index_table_catalog,
114 primary_to_secondary_mapping,
115 function_mapping,
116 );
117 logical_index_scan
118 .distribution_key()
119 .expect("distribution key of stream chain must exist in output columns");
120 StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
121 }
122
123 pub fn stream_scan_type(&self) -> StreamScanType {
124 self.stream_scan_type
125 }
126
127 fn get_upstream_state_table(&self) -> &TableCatalog {
129 self.core.table_catalog.as_ref()
130 }
131
132 pub fn build_backfill_state_catalog(
176 &self,
177 state: &mut BuildFragmentGraphState,
178 stream_scan_type: StreamScanType,
179 ) -> TableCatalog {
180 let mut catalog_builder = TableCatalogBuilder::default();
181 let upstream_schema = &self.core.get_table_columns();
182
183 catalog_builder.add_column(&Field::with_name(
186 VirtualNode::RW_TYPE,
187 Self::VNODE_COLUMN_NAME,
188 ));
189 catalog_builder.add_order_column(0, OrderType::ascending());
190
191 match stream_scan_type {
192 StreamScanType::Chain
193 | StreamScanType::Rearrange
194 | StreamScanType::Backfill
195 | StreamScanType::UpstreamOnly
196 | StreamScanType::ArrangementBackfill => {
197 for col_order in self.core.primary_key() {
199 let col = &upstream_schema[col_order.column_index];
200 catalog_builder.add_column(&Field::from(&**col));
201 }
202
203 catalog_builder.add_column(&Field::with_name(
205 DataType::Boolean,
206 Self::BACKFILL_FINISHED_COLUMN_NAME,
207 ));
208
209 catalog_builder.add_column(&Field::with_name(
211 DataType::Int64,
212 Self::ROW_COUNT_COLUMN_NAME,
213 ));
214 }
215 StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
216 catalog_builder
218 .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));
219
220 catalog_builder.add_column(&Field::with_name(
222 DataType::Int64,
223 Self::ROW_COUNT_COLUMN_NAME,
224 ));
225
226 catalog_builder.add_column(&Field::with_name(
228 DataType::Boolean,
229 Self::IS_EPOCH_FINISHED_COLUMN_NAME,
230 ));
231
232 for col_order in self.core.primary_key() {
234 let col = &upstream_schema[col_order.column_index];
235 catalog_builder.add_column(&Field::from(&col.column_desc));
236 }
237 }
238 StreamScanType::Unspecified => {
239 unreachable!()
240 }
241 }
242
243 catalog_builder.set_vnode_col_idx(0);
245 catalog_builder.set_dist_key_in_pk(vec![0]);
246
247 let num_of_columns = catalog_builder.columns().len();
248 catalog_builder.set_value_indices((1..num_of_columns).collect_vec());
249
250 catalog_builder
251 .build(vec![0], 1)
252 .with_id(state.gen_table_id_wrapped())
253 }
254}
255
// NOTE(review): per the macro name, this presumably generates the plan-tree node
// plumbing for a leaf (input-less) `Stream` node — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Stream, StreamTableScan }
257
258impl Distill for StreamTableScan {
259 fn distill<'a>(&self) -> XmlNode<'a> {
260 let verbose = self.base.ctx().is_explain_verbose();
261 let mut vec = Vec::with_capacity(4);
262 vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
263 vec.push(("columns", self.core.columns_pretty(verbose)));
264
265 if verbose {
266 vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
267 let stream_key = IndicesDisplay {
268 indices: self.stream_key().unwrap_or_default(),
269 schema: self.base.schema(),
270 };
271 vec.push(("stream_key", stream_key.distill()));
272 let pk = IndicesDisplay {
273 indices: &self
274 .core
275 .primary_key()
276 .iter()
277 .map(|x| x.column_index)
278 .collect_vec(),
279 schema: &self.core.table_catalog.column_schema(),
280 };
281 vec.push(("pk", pk.distill()));
282 let dist = Pretty::display(&DistributionDisplay {
283 distribution: self.distribution(),
284 input_schema: self.base.schema(),
285 });
286 vec.push(("dist", dist));
287 }
288
289 childless_record("StreamTableScan", vec)
290 }
291}
292
293impl StreamNode for StreamTableScan {
294 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
295 unreachable!(
296 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
297 )
298 }
299}
300
301impl StreamTableScan {
302 pub fn adhoc_to_stream_prost(
303 &self,
304 state: &mut BuildFragmentGraphState,
305 ) -> SchedulerResult<PbStreamNode> {
306 use risingwave_pb::stream_plan::*;
307
308 let stream_key = self
309 .stream_key()
310 .unwrap_or(&[])
311 .iter()
312 .map(|x| *x as u32)
313 .collect_vec();
314
315 let upstream_column_ids = match self.stream_scan_type {
317 StreamScanType::Backfill
319 | StreamScanType::ArrangementBackfill
320 | StreamScanType::SnapshotBackfill
321 | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
322 StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
323 self.core.output_column_ids()
324 }
325 StreamScanType::Unspecified => unreachable!(),
326 }
327 .iter()
328 .map(ColumnId::get_id)
329 .collect_vec();
330
331 let snapshot_schema = upstream_column_ids
333 .iter()
334 .map(|&id| {
335 let col = self
336 .core
337 .get_table_columns()
338 .iter()
339 .find(|c| c.column_id.get_id() == id)
340 .unwrap();
341 Field::from(&col.column_desc).to_prost()
342 })
343 .collect_vec();
344
345 let upstream_schema = snapshot_schema.clone();
346
347 let batch_plan_node = BatchPlanNode {
349 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
350 column_ids: upstream_column_ids.clone(),
351 };
352
353 let catalog = self
354 .build_backfill_state_catalog(state, self.stream_scan_type)
355 .to_internal_table_prost();
356
357 let output_indices = self
361 .core
362 .output_column_ids()
363 .iter()
364 .map(|i| {
365 upstream_column_ids
366 .iter()
367 .position(|&x| x == i.get_id())
368 .unwrap() as u32
369 })
370 .collect_vec();
371
372 let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
373 let upstream_table_catalog = self.get_upstream_state_table();
374 Some(upstream_table_catalog.to_internal_table_prost())
375 } else {
376 None
377 };
378
379 let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
380 vec![]
381 } else {
382 vec![
383 PbStreamNode {
386 node_body: Some(PbNodeBody::Merge(Default::default())),
387 identity: "Upstream".into(),
388 fields: upstream_schema.clone(),
389 stream_key: vec![], ..Default::default()
391 },
392 PbStreamNode {
394 node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
395 operator_id: self.batch_plan_id.0 as u64,
396 identity: "BatchPlanNode".into(),
397 fields: snapshot_schema,
398 stream_key: vec![], input: vec![],
400 append_only: true,
401 },
402 ]
403 };
404
405 let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
406 table_id: self.core.table_catalog.id.table_id,
407 stream_scan_type: self.stream_scan_type as i32,
408 output_indices,
410 upstream_column_ids,
411 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
413 state_table: Some(catalog),
414 arrangement_table,
415 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
416 ..Default::default()
417 }));
418
419 Ok(PbStreamNode {
420 fields: self.schema().to_prost(),
421 input,
422 node_body: Some(node_body),
423 stream_key,
424 operator_id: self.base.id().0 as u64,
425 identity: self.distill_to_string(),
426 append_only: self.append_only(),
427 })
428 }
429}
430
431impl ExprRewritable<Stream> for StreamTableScan {
432 fn has_rewritable_expr(&self) -> bool {
433 true
434 }
435
436 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
437 let mut core = self.core.clone();
438 core.rewrite_exprs(r);
439 Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
440 }
441}
442
443impl ExprVisitable for StreamTableScan {
444 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
445 self.core.visit_exprs(v);
446 }
447}