// Source path: risingwave_frontend/optimizer/plan_node/stream_table_scan.rs

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Field;
use risingwave_common::hash::VirtualNode;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};

use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode, generic};
use crate::TableCatalog;
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43pub struct StreamTableScan {
44 pub base: PlanBase<Stream>,
45 core: generic::TableScan,
46 batch_plan_id: PlanNodeId,
47 stream_scan_type: StreamScanType,
48}
49
50impl StreamTableScan {
51 pub fn new_with_stream_scan_type(
52 core: generic::TableScan,
53 stream_scan_type: StreamScanType,
54 ) -> Self {
55 let batch_plan_id = core.ctx.next_plan_node_id();
56
57 let distribution = {
58 match core.distribution_key() {
59 Some(distribution_key) => {
60 if distribution_key.is_empty() {
61 Distribution::Single
62 } else {
63 Distribution::UpstreamHashShard(distribution_key, core.table_desc.table_id)
65 }
66 }
67 None => Distribution::SomeShard,
68 }
69 };
70
71 let base = PlanBase::new_stream_with_core(
72 &core,
73 distribution,
74 core.append_only(),
75 false,
76 core.watermark_columns(),
77 MonotonicityMap::new(),
78 );
79 Self {
80 base,
81 core,
82 batch_plan_id,
83 stream_scan_type,
84 }
85 }
86
87 pub fn table_name(&self) -> &str {
88 &self.core.table_name
89 }
90
91 pub fn core(&self) -> &generic::TableScan {
92 &self.core
93 }
94
95 pub fn to_index_scan(
96 &self,
97 index_name: &str,
98 index_table_catalog: Arc<TableCatalog>,
99 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
100 function_mapping: &HashMap<FunctionCall, usize>,
101 stream_scan_type: StreamScanType,
102 ) -> StreamTableScan {
103 let logical_index_scan = self.core.to_index_scan(
104 index_name,
105 index_table_catalog,
106 primary_to_secondary_mapping,
107 function_mapping,
108 );
109 logical_index_scan
110 .distribution_key()
111 .expect("distribution key of stream chain must exist in output columns");
112 StreamTableScan::new_with_stream_scan_type(logical_index_scan, stream_scan_type)
113 }
114
115 pub fn stream_scan_type(&self) -> StreamScanType {
116 self.stream_scan_type
117 }
118
119 fn get_upstream_state_table(&self) -> &TableCatalog {
121 self.core.table_catalog.as_ref()
122 }
123
124 pub fn build_backfill_state_catalog(
168 &self,
169 state: &mut BuildFragmentGraphState,
170 stream_scan_type: StreamScanType,
171 ) -> TableCatalog {
172 let mut catalog_builder = TableCatalogBuilder::default();
173 let upstream_schema = &self.core.get_table_columns();
174
175 catalog_builder.add_column(&Field::with_name(VirtualNode::RW_TYPE, "vnode"));
178 catalog_builder.add_order_column(0, OrderType::ascending());
179
180 match stream_scan_type {
181 StreamScanType::Chain
182 | StreamScanType::Rearrange
183 | StreamScanType::Backfill
184 | StreamScanType::UpstreamOnly
185 | StreamScanType::ArrangementBackfill => {
186 for col_order in self.core.primary_key() {
188 let col = &upstream_schema[col_order.column_index];
189 catalog_builder.add_column(&Field::from(col));
190 }
191
192 catalog_builder
194 .add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
195
196 catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
198 }
199 StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
200 catalog_builder.add_column(&Field::with_name(DataType::Int64, "epoch"));
202
203 catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
205
206 catalog_builder
208 .add_column(&Field::with_name(DataType::Boolean, "is_epoch_finished"));
209
210 for col_order in self.core.primary_key() {
212 let col = &upstream_schema[col_order.column_index];
213 catalog_builder.add_column(&Field::from(col));
214 }
215 }
216 StreamScanType::Unspecified => {
217 unreachable!()
218 }
219 }
220
221 catalog_builder.set_vnode_col_idx(0);
223 catalog_builder.set_dist_key_in_pk(vec![0]);
224
225 let num_of_columns = catalog_builder.columns().len();
226 catalog_builder.set_value_indices((1..num_of_columns).collect_vec());
227
228 catalog_builder
229 .build(vec![0], 1)
230 .with_id(state.gen_table_id_wrapped())
231 }
232}
233
234impl_plan_tree_node_for_leaf! { StreamTableScan }
235
236impl Distill for StreamTableScan {
237 fn distill<'a>(&self) -> XmlNode<'a> {
238 let verbose = self.base.ctx().is_explain_verbose();
239 let mut vec = Vec::with_capacity(4);
240 vec.push(("table", Pretty::from(self.core.table_name.clone())));
241 vec.push(("columns", self.core.columns_pretty(verbose)));
242
243 if verbose {
244 vec.push(("stream_scan_type", Pretty::debug(&self.stream_scan_type)));
245 let stream_key = IndicesDisplay {
246 indices: self.stream_key().unwrap_or_default(),
247 schema: self.base.schema(),
248 };
249 vec.push(("stream_key", stream_key.distill()));
250 let pk = IndicesDisplay {
251 indices: &self
252 .core
253 .primary_key()
254 .iter()
255 .map(|x| x.column_index)
256 .collect_vec(),
257 schema: &self.core.table_catalog.column_schema(),
258 };
259 vec.push(("pk", pk.distill()));
260 let dist = Pretty::display(&DistributionDisplay {
261 distribution: self.distribution(),
262 input_schema: self.base.schema(),
263 });
264 vec.push(("dist", dist));
265 }
266
267 childless_record("StreamTableScan", vec)
268 }
269}
270
271impl StreamNode for StreamTableScan {
272 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
273 unreachable!(
274 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
275 )
276 }
277}
278
279impl StreamTableScan {
280 pub fn adhoc_to_stream_prost(
281 &self,
282 state: &mut BuildFragmentGraphState,
283 ) -> SchedulerResult<PbStreamNode> {
284 use risingwave_pb::stream_plan::*;
285
286 let stream_key = self
287 .stream_key()
288 .unwrap_or(&[])
289 .iter()
290 .map(|x| *x as u32)
291 .collect_vec();
292
293 let upstream_column_ids = match self.stream_scan_type {
295 StreamScanType::Backfill
297 | StreamScanType::ArrangementBackfill
298 | StreamScanType::SnapshotBackfill
299 | StreamScanType::CrossDbSnapshotBackfill => self.core.output_and_pk_column_ids(),
300 StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
301 self.core.output_column_ids()
302 }
303 StreamScanType::Unspecified => unreachable!(),
304 }
305 .iter()
306 .map(ColumnId::get_id)
307 .collect_vec();
308
309 let snapshot_schema = upstream_column_ids
311 .iter()
312 .map(|&id| {
313 let col = self
314 .core
315 .get_table_columns()
316 .iter()
317 .find(|c| c.column_id.get_id() == id)
318 .unwrap();
319 Field::from(col).to_prost()
320 })
321 .collect_vec();
322
323 let upstream_schema = snapshot_schema.clone();
324
325 let batch_plan_node = BatchPlanNode {
327 table_desc: Some(self.core.table_desc.try_to_protobuf()?),
328 column_ids: upstream_column_ids.clone(),
329 };
330
331 let catalog = self
332 .build_backfill_state_catalog(state, self.stream_scan_type)
333 .to_internal_table_prost();
334
335 let output_indices = self
339 .core
340 .output_column_ids()
341 .iter()
342 .map(|i| {
343 upstream_column_ids
344 .iter()
345 .position(|&x| x == i.get_id())
346 .unwrap() as u32
347 })
348 .collect_vec();
349
350 let arrangement_table = if self.stream_scan_type == StreamScanType::ArrangementBackfill {
351 let upstream_table_catalog = self.get_upstream_state_table();
352 Some(upstream_table_catalog.to_internal_table_prost())
353 } else {
354 None
355 };
356
357 let input = if self.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill {
358 vec![]
359 } else {
360 vec![
361 PbStreamNode {
364 node_body: Some(PbNodeBody::Merge(Default::default())),
365 identity: "Upstream".into(),
366 fields: upstream_schema.clone(),
367 stream_key: vec![], ..Default::default()
369 },
370 PbStreamNode {
372 node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))),
373 operator_id: self.batch_plan_id.0 as u64,
374 identity: "BatchPlanNode".into(),
375 fields: snapshot_schema,
376 stream_key: vec![], input: vec![],
378 append_only: true,
379 },
380 ]
381 };
382
383 let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode {
384 table_id: self.core.table_desc.table_id.table_id,
385 stream_scan_type: self.stream_scan_type as i32,
386 output_indices,
388 upstream_column_ids,
389 table_desc: Some(self.core.table_desc.try_to_protobuf()?),
391 state_table: Some(catalog),
392 arrangement_table,
393 rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
394 ..Default::default()
395 }));
396
397 Ok(PbStreamNode {
398 fields: self.schema().to_prost(),
399 input,
400 node_body: Some(node_body),
401 stream_key,
402 operator_id: self.base.id().0 as u64,
403 identity: self.distill_to_string(),
404 append_only: self.append_only(),
405 })
406 }
407}
408
409impl ExprRewritable for StreamTableScan {
410 fn has_rewritable_expr(&self) -> bool {
411 true
412 }
413
414 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
415 let mut core = self.core.clone();
416 core.rewrite_exprs(r);
417 Self::new_with_stream_scan_type(core, self.stream_scan_type).into()
418 }
419}
420
421impl ExprVisitable for StreamTableScan {
422 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
423 self.core.visit_exprs(v);
424 }
425}