risingwave_frontend/optimizer/plan_node/
stream_table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::Field;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::types::DataType;
23use risingwave_common::util::sort_util::OrderType;
24use risingwave_pb::stream_plan::stream_node::{PbNodeBody, PbStreamKind};
25use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
26
27use super::stream::prelude::*;
28use super::utils::{Distill, childless_record};
29use super::{ExprRewritable, PlanBase, PlanNodeId, StreamNode, StreamPlanRef as PlanRef, generic};
30use crate::TableCatalog;
31use crate::catalog::ColumnId;
32use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
35use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
36use crate::scheduler::SchedulerResult;
37use crate::stream_fragmenter::BuildFragmentGraphState;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43pub struct StreamTableScan {
44 pub base: PlanBase<Stream>,
45 core: generic::TableScan,
46 batch_plan_id: PlanNodeId,
47 stream_scan_type: StreamScanType,
48}
49
50impl StreamTableScan {
51 pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
52 pub const EPOCH_COLUMN_NAME: &str = "epoch";
53 pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
54 pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
55 pub const VNODE_COLUMN_NAME: &str = "vnode";
56
57 pub fn new_with_stream_scan_type(
58 core: generic::TableScan,
59 stream_scan_type: StreamScanType,
60 ) -> Self {
61 let batch_plan_id = core.ctx.next_plan_node_id();
62
63 let mut stream_scan_type = stream_scan_type;
64 if core.cross_database() {
65 assert_ne!(stream_scan_type, StreamScanType::UpstreamOnly);
66 stream_scan_type = StreamScanType::CrossDbSnapshotBackfill;
68 }
69
70 let distribution = {
71 match core.distribution_key() {
72 Some(distribution_key) => {
73 if distribution_key.is_empty() {
74 Distribution::Single
75 } else {
76 Distribution::UpstreamHashShard(distribution_key, core.table_catalog.id)
78 }
79 }
80 None => Distribution::SomeShard,
81 }
82 };
83
84 let base = PlanBase::new_stream_with_core(
85 &core,
86 distribution,
87 if core.append_only() {
88 StreamKind::AppendOnly
89 } else {
90 StreamKind::Retract
91 },
92 false,
93 core.watermark_columns(),
94 MonotonicityMap::new(),
95 );
96 Self {
97 base,
98 core,
99 batch_plan_id,
100 stream_scan_type,
101 }
102 }
103
104 pub fn table_name(&self) -> &str {
105 self.core.table_name()
106 }
107
108 pub fn core(&self) -> &generic::TableScan {
109 &self.core
110 }
111
112 pub fn to_index_scan(
113 &self,
114 index_table_catalog: Arc<TableCatalog>,
115 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
116 function_mapping: &HashMap<FunctionCall, usize>,
117 stream_scan_type: StreamScanType,
118 ) -> StreamTableScan {
119 let logical_index_scan = self.core.to_index_scan(
120 index_table_catalog,
121 primary_to_secondary_mapping,
122 function_mapping,
123 );
124 logical_index_scan
125 .distribution_key()
126 .expect("distribution key of stream chain must exist in output columns");
127 StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
128 }
129
130 pub fn stream_scan_type(&self) -> StreamScanType {
131 self.stream_scan_type
132 }
133
134 fn get_upstream_state_table(&self) -> &TableCatalog {
136 self.core.table_catalog.as_ref()
137 }
138
139 pub fn build_backfill_state_catalog(
183 &self,
184 state: &mut BuildFragmentGraphState,
185 stream_scan_type: StreamScanType,
186 ) -> TableCatalog {
187 let mut catalog_builder = TableCatalogBuilder::default();
188 let upstream_schema = &self.core.get_table_columns();
189
190 catalog_builder.add_column(&Field::with_name(
193 VirtualNode::RW_TYPE,
194 Self::VNODE_COLUMN_NAME,
195 ));
196 catalog_builder.add_order_column(0, OrderType::ascending());
197
198 match stream_scan_type {
199 StreamScanType::Chain
200 | StreamScanType::Rearrange
201 | StreamScanType::Backfill
202 | StreamScanType::UpstreamOnly
203 | StreamScanType::ArrangementBackfill => {
204 for col_order in self.core.primary_key() {
206 let col = &upstream_schema[col_order.column_index];
207 catalog_builder.add_column(&Field::from(&**col));
208 }
209
210 catalog_builder.add_column(&Field::with_name(
212 DataType::Boolean,
213 Self::BACKFILL_FINISHED_COLUMN_NAME,
214 ));
215
216 catalog_builder.add_column(&Field::with_name(
218 DataType::Int64,
219 Self::ROW_COUNT_COLUMN_NAME,
220 ));
221 }
222 StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
223 catalog_builder
225 .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));
226
227 catalog_builder.add_column(&Field::with_name(
229 DataType::Int64,
230 Self::ROW_COUNT_COLUMN_NAME,
231 ));
232
233 catalog_builder.add_column(&Field::with_name(
235 DataType::Boolean,
236 Self::IS_EPOCH_FINISHED_COLUMN_NAME,
237 ));
238
239 for col_order in self.core.primary_key() {
241 let col = &upstream_schema[col_order.column_index];
242 catalog_builder.add_column(&Field::from(&col.column_desc));
243 }
244 }
245 StreamScanType::Unspecified => {
246 unreachable!()
247 }
248 }
249
250 catalog_builder.set_vnode_col_idx(0);
252 catalog_builder.set_dist_key_in_pk(vec![0]);
253
254 let num_of_columns = catalog_builder.columns().len();
255 catalog_builder.set_value_indices((1..num_of_columns).collect_vec());
256
257 catalog_builder
258 .build(vec![0], 1)
259 .with_id(state.gen_table_id_wrapped())
260 }
261}
262
263impl_plan_tree_node_for_leaf! { Stream, StreamTableScan }
264
265impl Distill for StreamTableScan {
266 fn distill<'a>(&self) -> XmlNode<'a> {
267 let verbose = self.base.ctx().is_explain_verbose();
268 let mut vec = Vec::with_capacity(4);
269 vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
270 vec.push(("columns", self.core.columns_pretty(verbose)));
271
272 if verbose {
273 vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
274 let stream_key = IndicesDisplay {
275 indices: self.stream_key().unwrap_or_default(),
276 schema: self.base.schema(),
277 };
278 vec.push(("stream_key", stream_key.distill()));
279 let pk = IndicesDisplay {
280 indices: &self
281 .core
282 .primary_key()
283 .iter()
284 .map(|x| x.column_index)
285 .collect_vec(),
286 schema: &self.core.table_catalog.column_schema(),
287 };
288 vec.push(("pk", pk.distill()));
289 let dist = Pretty::display(&DistributionDisplay {
290 distribution: self.distribution(),
291 input_schema: self.base.schema(),
292 });
293 vec.push(("dist", dist));
294 }
295
296 childless_record("StreamTableScan", vec)
297 }
298}
299
300impl StreamNode for StreamTableScan {
301 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
302 unreachable!(
303 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
304 )
305 }
306}
307
308impl StreamTableScan {
309 pub fn adhoc_to_stream_prost(
310 &self,
311 state: &mut BuildFragmentGraphState,
312 ) -> SchedulerResult<PbStreamNode> {
313 use risingwave_pb::stream_plan::*;
314
315 let stream_key = self
316 .stream_key()
317 .unwrap_or(&[])
318 .iter()
319 .map(|x| *x as u32)
320 .collect_vec();
321
322 let upstream_column_ids = match self.stream_scan_type {
324 StreamScanType::Backfill
326 | StreamScanType::ArrangementBackfill
327 | StreamScanType::SnapshotBackfill
328 | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
329 StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
330 self.core.output_column_ids()
331 }
332 StreamScanType::Unspecified => unreachable!(),
333 }
334 .iter()
335 .map(ColumnId::get_id)
336 .collect_vec();
337
338 let snapshot_schema = upstream_column_ids
340 .iter()
341 .map(|&id| {
342 let col = self
343 .core
344 .get_table_columns()
345 .iter()
346 .find(|c| c.column_id.get_id() == id)
347 .unwrap();
348 Field::from(&col.column_desc).to_prost()
349 })
350 .collect_vec();
351
352 let upstream_schema = snapshot_schema.clone();
353
354 let batch_plan_node = BatchPlanNode {
356 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
357 column_ids: upstream_column_ids.clone(),
358 };
359
360 let catalog = self
361 .build_backfill_state_catalog(state, self.stream_scan_type)
362 .to_internal_table_prost();
363
364 let output_indices = self
368 .core
369 .output_column_ids()
370 .iter()
371 .map(|i| {
372 upstream_column_ids
373 .iter()
374 .position(|&x| x == i.get_id())
375 .unwrap() as u32
376 })
377 .collect_vec();
378
379 let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
380 let upstream_table_catalog = self.get_upstream_state_table();
381 Some(upstream_table_catalog.to_internal_table_prost())
382 } else {
383 None
384 };
385
386 let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
387 vec![]
388 } else {
389 vec![
390 PbStreamNode {
393 node_body: Some(PbNodeBody::Merge(Default::default())),
394 identity: "Upstream".into(),
395 fields: upstream_schema.clone(),
396 stream_key: vec![], ..Default::default()
398 },
399 PbStreamNode {
401 node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
402 operator_id: self.batch_plan_id.0 as u64,
403 identity: "BatchPlanNode".into(),
404 fields: snapshot_schema,
405 stream_key: vec![], input: vec![],
407 stream_kind: PbStreamKind::AppendOnly as i32,
408 },
409 ]
410 };
411
412 let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
413 table_id: self.core.table_catalog.id.table_id,
414 stream_scan_type: self.stream_scan_type as i32,
415 output_indices,
417 upstream_column_ids,
418 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
420 state_table: Some(catalog),
421 arrangement_table,
422 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
423 ..Default::default()
424 }));
425
426 Ok(PbStreamNode {
427 fields: self.schema().to_prost(),
428 input,
429 node_body: Some(node_body),
430 stream_key,
431 operator_id: self.base.id().0 as u64,
432 identity: self.distill_to_string(),
433 stream_kind: self.stream_kind().to_protobuf() as i32,
434 })
435 }
436}
437
438impl ExprRewritable<Stream> for StreamTableScan {
439 fn has_rewritable_expr(&self) -> bool {
440 true
441 }
442
443 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
444 let mut core = self.core.clone();
445 core.rewrite_exprs(r);
446 Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
447 }
448}
449
450impl ExprVisitable for StreamTableScan {
451 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
452 self.core.visit_exprs(v);
453 }
454}