risingwave_frontend/optimizer/plan_node/
stream_table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::Field;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::types::DataType;
23use risingwave_common::util::sort_util::OrderType;
24use risingwave_pb::stream_plan::stream_node::PbNodeBody;
25use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
26
27use super::stream::prelude::*;
28use super::utils::{Distill, childless_record};
29use super::{ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode, generic};
30use crate::TableCatalog;
31use crate::catalog::ColumnId;
32use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
35use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
36use crate::scheduler::SchedulerResult;
37use crate::stream_fragmenter::BuildFragmentGraphState;
38
/// Streaming scan over an upstream table.
///
/// When lowered via `adhoc_to_stream_prost`, this node emits its own inputs (an `Upstream`
/// merge node and a `BatchPlanNode` that reads the table snapshot), except for
/// `StreamScanType::CrossDbSnapshotBackfill`, which emits none. `stream_scan_type` selects
/// the scan/backfill strategy.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTableScan {
    pub base: PlanBase<Stream>,
    /// Logical table-scan description (upstream table, output columns, primary key, ...).
    core: generic::TableScan,
    /// Plan-node id reserved at construction time; used as the operator id of the
    /// snapshot-reading `BatchPlanNode` input.
    batch_plan_id: PlanNodeId,
    /// Scan/backfill strategy this node lowers to.
    stream_scan_type: StreamScanType,
}
49
50impl StreamTableScan {
51 pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
52 pub const EPOCH_COLUMN_NAME: &str = "epoch";
53 pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
54 pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
55 pub const VNODE_COLUMN_NAME: &str = "vnode";
56
57 pub fn new_with_stream_scan_type(
58 core: generic::TableScan,
59 stream_scan_type: StreamScanType,
60 ) -> Self {
61 let batch_plan_id = core.ctx.next_plan_node_id();
62
63 let distribution = {
64 match core.distribution_key() {
65 Some(distribution_key) => {
66 if distribution_key.is_empty() {
67 Distribution::Single
68 } else {
69 Distribution::UpstreamHashShard(distribution_key, core.table_desc.table_id)
71 }
72 }
73 None => Distribution::SomeShard,
74 }
75 };
76
77 let base = PlanBase::new_stream_with_core(
78 &core,
79 distribution,
80 core.append_only(),
81 false,
82 core.watermark_columns(),
83 MonotonicityMap::new(),
84 );
85 Self {
86 base,
87 core,
88 batch_plan_id,
89 stream_scan_type,
90 }
91 }
92
93 pub fn table_name(&self) -> &str {
94 &self.core.table_name
95 }
96
97 pub fn core(&self) -> &generic::TableScan {
98 &self.core
99 }
100
101 pub fn to_index_scan(
102 &self,
103 index_name: &str,
104 index_table_catalog: Arc<TableCatalog>,
105 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
106 function_mapping: &HashMap<FunctionCall, usize>,
107 stream_scan_type: StreamScanType,
108 ) -> StreamTableScan {
109 let logical_index_scan = self.core.to_index_scan(
110 index_name,
111 index_table_catalog,
112 primary_to_secondary_mapping,
113 function_mapping,
114 );
115 logical_index_scan
116 .distribution_key()
117 .expect("distribution key of stream chain must exist in output columns");
118 StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
119 }
120
121 pub fn stream_scan_type(&self) -> StreamScanType {
122 self.stream_scan_type
123 }
124
125 fn get_upstream_state_table(&self) -> &TableCatalog {
127 self.core.table_catalog.as_ref()
128 }
129
130 pub fn build_backfill_state_catalog(
174 &self,
175 state: &mut BuildFragmentGraphState,
176 stream_scan_type: StreamScanType,
177 ) -> TableCatalog {
178 let mut catalog_builder = TableCatalogBuilder::default();
179 let upstream_schema = &self.core.get_table_columns();
180
181 catalog_builder.add_column(&Field::with_name(
184 VirtualNode::RW_TYPE,
185 Self::VNODE_COLUMN_NAME,
186 ));
187 catalog_builder.add_order_column(0, OrderType::ascending());
188
189 match stream_scan_type {
190 StreamScanType::Chain
191 | StreamScanType::Rearrange
192 | StreamScanType::Backfill
193 | StreamScanType::UpstreamOnly
194 | StreamScanType::ArrangementBackfill => {
195 for col_order in self.core.primary_key() {
197 let col = &upstream_schema[col_order.column_index];
198 catalog_builder.add_column(&Field::from(col));
199 }
200
201 catalog_builder.add_column(&Field::with_name(
203 DataType::Boolean,
204 Self::BACKFILL_FINISHED_COLUMN_NAME,
205 ));
206
207 catalog_builder.add_column(&Field::with_name(
209 DataType::Int64,
210 Self::ROW_COUNT_COLUMN_NAME,
211 ));
212 }
213 StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
214 catalog_builder
216 .add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));
217
218 catalog_builder.add_column(&Field::with_name(
220 DataType::Int64,
221 Self::ROW_COUNT_COLUMN_NAME,
222 ));
223
224 catalog_builder.add_column(&Field::with_name(
226 DataType::Boolean,
227 Self::IS_EPOCH_FINISHED_COLUMN_NAME,
228 ));
229
230 for col_order in self.core.primary_key() {
232 let col = &upstream_schema[col_order.column_index];
233 catalog_builder.add_column(&Field::from(col));
234 }
235 }
236 StreamScanType::Unspecified => {
237 unreachable!()
238 }
239 }
240
241 catalog_builder.set_vnode_col_idx(0);
243 catalog_builder.set_dist_key_in_pk(vec![0]);
244
245 let num_of_columns = catalog_builder.columns().len();
246 catalog_builder.set_value_indices((1..num_of_columns).collect_vec());
247
248 catalog_builder
249 .build(vec![0], 1)
250 .with_id(state.gen_table_id_wrapped())
251 }
252}
253
// Plan-tree boilerplate for `StreamTableScan`. Presumably the leaf (input-less) variant,
// matching that the struct holds no plan-level inputs — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { StreamTableScan }
255
256impl Distill for StreamTableScan {
257 fn distill<'a>(&self) -> XmlNode<'a> {
258 let verbose = self.base.ctx().is_explain_verbose();
259 let mut vec = Vec::with_capacity(4);
260 vec.push(("table", Pretty::from(self.core.table_name.clone())));
261 vec.push(("columns", self.core.columns_pretty(verbose)));
262
263 if verbose {
264 vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
265 let stream_key = IndicesDisplay {
266 indices: self.stream_key().unwrap_or_default(),
267 schema: self.base.schema(),
268 };
269 vec.push(("stream_key", stream_key.distill()));
270 let pk = IndicesDisplay {
271 indices: &self
272 .core
273 .primary_key()
274 .iter()
275 .map(|x| x.column_index)
276 .collect_vec(),
277 schema: &self.core.table_catalog.column_schema(),
278 };
279 vec.push(("pk", pk.distill()));
280 let dist = Pretty::display(&DistributionDisplay {
281 distribution: self.distribution(),
282 input_schema: self.base.schema(),
283 });
284 vec.push(("dist", dist));
285 }
286
287 childless_record("StreamTableScan", vec)
288 }
289}
290
291impl StreamNode for StreamTableScan {
292 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
293 unreachable!(
294 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
295 )
296 }
297}
298
299impl StreamTableScan {
300 pub fn adhoc_to_stream_prost(
301 &self,
302 state: &mut BuildFragmentGraphState,
303 ) -> SchedulerResult<PbStreamNode> {
304 use risingwave_pb::stream_plan::*;
305
306 let stream_key = self
307 .stream_key()
308 .unwrap_or(&[])
309 .iter()
310 .map(|x| *x as u32)
311 .collect_vec();
312
313 let upstream_column_ids = match self.stream_scan_type {
315 StreamScanType::Backfill
317 | StreamScanType::ArrangementBackfill
318 | StreamScanType::SnapshotBackfill
319 | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
320 StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
321 self.core.output_column_ids()
322 }
323 StreamScanType::Unspecified => unreachable!(),
324 }
325 .iter()
326 .map(ColumnId::get_id)
327 .collect_vec();
328
329 let snapshot_schema = upstream_column_ids
331 .iter()
332 .map(|&id| {
333 let col = self
334 .core
335 .get_table_columns()
336 .iter()
337 .find(|c| c.column_id.get_id() == id)
338 .unwrap();
339 Field::from(col).to_prost()
340 })
341 .collect_vec();
342
343 let upstream_schema = snapshot_schema.clone();
344
345 let batch_plan_node = BatchPlanNode {
347 table_desc: Some(self.core.table_desc.try_to_protobuf()?),
348 column_ids: upstream_column_ids.clone(),
349 };
350
351 let catalog = self
352 .build_backfill_state_catalog(state, self.stream_scan_type)
353 .to_internal_table_prost();
354
355 let output_indices = self
359 .core
360 .output_column_ids()
361 .iter()
362 .map(|i| {
363 upstream_column_ids
364 .iter()
365 .position(|&x| x == i.get_id())
366 .unwrap() as u32
367 })
368 .collect_vec();
369
370 let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
371 let upstream_table_catalog = self.get_upstream_state_table();
372 Some(upstream_table_catalog.to_internal_table_prost())
373 } else {
374 None
375 };
376
377 let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
378 vec![]
379 } else {
380 vec![
381 PbStreamNode {
384 node_body: Some(PbNodeBody::Merge(Default::default())),
385 identity: "Upstream".into(),
386 fields: upstream_schema.clone(),
387 stream_key: vec![], ..Default::default()
389 },
390 PbStreamNode {
392 node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
393 operator_id: self.batch_plan_id.0 as u64,
394 identity: "BatchPlanNode".into(),
395 fields: snapshot_schema,
396 stream_key: vec![], input: vec![],
398 append_only: true,
399 },
400 ]
401 };
402
403 let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
404 table_id: self.core.table_desc.table_id.table_id,
405 stream_scan_type: self.stream_scan_type as i32,
406 output_indices,
408 upstream_column_ids,
409 table_desc: Some(self.core.table_desc.try_to_protobuf()?),
411 state_table: Some(catalog),
412 arrangement_table,
413 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
414 ..Default::default()
415 }));
416
417 Ok(PbStreamNode {
418 fields: self.schema().to_prost(),
419 input,
420 node_body: Some(node_body),
421 stream_key,
422 operator_id: self.base.id().0 as u64,
423 identity: self.distill_to_string(),
424 append_only: self.append_only(),
425 })
426 }
427}
428
429impl ExprRewritable for StreamTableScan {
430 fn has_rewritable_expr(&self) -> bool {
431 true
432 }
433
434 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
435 let mut core = self.core.clone();
436 core.rewrite_exprs(r);
437 Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
438 }
439}
440
441impl ExprVisitable for StreamTableScan {
442 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
443 self.core.visit_exprs(v);
444 }
445}