use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, TableCatalogBuilder, childless_record};
use super::{
    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

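/// Materializes a stream into a table.
///
/// Used for `MATERIALIZED VIEW`, `TABLE`, and `INDEX` stream jobs. For
/// refreshable tables, it also carries the catalogs of the staging table and
/// the refresh progress table.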
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    table: TableCatalog,
    staging_table: Option<TableCatalog>,
    refresh_progress_table: Option<TableCatalog>,
}

impl StreamMaterialize {
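    /// Create a `StreamMaterialize` without staging or refresh progress tables.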
    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
        Self::new_with_staging_and_progress(input, table, None, None)
    }

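    /// Create a `StreamMaterialize`, optionally carrying the staging and
    /// refresh progress tables of a refreshable table. The stream kind of this
    /// node is derived from the input stream kind and the table's conflict
    /// behavior.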
    pub fn new_with_staging_and_progress(
        input: PlanRef,
        table: TableCatalog,
        staging_table: Option<TableCatalog>,
        refresh_progress_table: Option<TableCatalog>,
    ) -> Result<Self> {
        let kind = match table.conflict_behavior() {
            ConflictBehavior::NoCheck => {
                reject_upsert_input!(input, "Materialize without conflict handling")
            }

            ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
                StreamKind::AppendOnly => StreamKind::AppendOnly,
                StreamKind::Retract | StreamKind::Upsert => StreamKind::Retract,
            },
        };
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Ok(Self {
            base,
            input,
            table,
            staging_table,
            refresh_progress_table,
        })
    }

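    /// Create a materialize node for a `MATERIALIZED VIEW` or `INDEX`, deriving
    /// the columns and the table catalog from the optimized plan root.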
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let conflict_behavior = match input.stream_kind() {
            StreamKind::Retract | StreamKind::AppendOnly => ConflictBehavior::NoCheck,
            StreamKind::Upsert => ConflictBehavior::Overwrite,
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            vec![],
            None,
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
            false,
        )?;

        Self::new(input, table)
    }

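    /// Create a materialize node for a `TABLE`. Unlike [`Self::create`], the
    /// columns are passed in directly rather than derived from the input, so
    /// the column IDs are preserved. For refreshable tables, the staging and
    /// refresh progress table catalogs are derived as well.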
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name.clone(),
            database_id,
            schema_id,
            user_order_by.clone(),
            columns.clone(),
            definition.clone(),
            conflict_behavior,
            version_column_indices,
            Some(pk_column_indices.clone()),
            row_id_index,
            TableType::Table,
            Some(version.clone()),
            Cardinality::unknown(),
            retention_seconds,
            CreateType::Foreground,
            webhook_info.clone(),
            engine,
            refreshable,
        )?;

        let (staging_table, refresh_progress_table) = if refreshable {
            let staging = Some(Self::derive_staging_table_catalog(table.clone()));
            let progress = Some(Self::derive_refresh_progress_table_catalog(table.clone()));
            (staging, progress)
        } else {
            (None, None)
        };

        tracing::info!(
            table_name = %name,
            refreshable = %refreshable,
            has_staging_table = %staging_table.is_some(),
            has_progress_table = %refresh_progress_table.is_some(),
            "Creating StreamMaterialize with staging and progress table info"
        );

        Self::new_with_staging_and_progress(input, table, staging_table, refresh_progress_table)
    }

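    /// Rewrite the input to satisfy the distribution required by the table
    /// type, inserting an exchange if necessary.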
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

                    let is_stream_join = input.as_stream_hash_join().is_some()
                        || input.as_stream_temporal_join().is_some()
                        || input.as_stream_delta_join().is_some();

                    if is_stream_join {
                        return Ok(required_dist.stream_enforce(input));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::VectorIndex => {
                    unreachable!("VectorIndex should not be created by StreamMaterialize")
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.streaming_enforce_if_not_satisfies(input)
    }

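    /// Derive the catalog of the table to be materialized. If
    /// `pk_column_indices` is given (the `TABLE` case), it is used as the
    /// primary key directly; otherwise the primary key is derived from the
    /// input's stream key and the user-specified ordering.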
    #[expect(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Option<Vec<usize>>,
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        let read_prefix_len_hint = table_pk.len();

        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder,
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView
                | TableType::Index
                | TableType::Internal
                | TableType::VectorIndex => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None,
            refreshable,
            vector_index_info: None,
            cdc_table_type: None,
        })
    }

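    /// Derive the catalog of the staging table for a refreshable table. It
    /// keeps only the primary-key columns of the upstream table, with all
    /// column indices remapped accordingly.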
    fn derive_staging_table_catalog(
        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            columns,
            pk,
            stream_key,
            table_type: _,
            distribution_key,
            append_only,
            cardinality,
            owner,
            retention_seconds,
            fragment_id,
            dml_fragment_id: _,
            vnode_col_index,
            row_id_index,
            value_indices: _,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns: _,
            dist_key_in_pk,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            refreshable,
            vector_index_info,
            cdc_table_type,
        }: TableCatalog,
    ) -> TableCatalog {
        tracing::info!(
            table_name = %name,
            "Creating staging table for refreshable table"
        );

        assert!(row_id_index.is_none());
        assert!(retention_seconds.is_none());
        assert!(refreshable);

        let mut pk_col_indices = vec![];
        let mut pk_cols = vec![];
        for (i, col) in columns.iter().enumerate() {
            if pk.iter().any(|pk| pk.column_index == i) {
                pk_col_indices.push(i);
                pk_cols.push(col.clone());
            }
        }
        let mapping = ColIndexMapping::with_remaining_columns(&pk_col_indices, columns.len());

        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            value_indices: (0..pk_cols.len()).collect(),
            columns: pk_cols,
            pk: pk
                .iter()
                .map(|pk| ColumnOrder::new(mapping.map(pk.column_index), pk.order_type))
                .collect(),
            stream_key: mapping.try_map_all(stream_key).unwrap(),
            vnode_col_index: vnode_col_index.map(|i| mapping.map(i)),
            dist_key_in_pk: mapping.try_map_all(dist_key_in_pk).unwrap(),
            distribution_key: mapping.try_map_all(distribution_key).unwrap(),
            table_type: TableType::Internal,
            watermark_columns: FixedBitSet::new(),
            append_only,
            cardinality,
            owner,
            retention_seconds: None,
            fragment_id,
            dml_fragment_id: None,
            row_id_index: None,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            refreshable: false,
            vector_index_info,
            cdc_table_type,
        }
    }

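    /// Derive the catalog of the refresh progress table for a refreshable
    /// table. Its schema is `vnode | pos_<pk column>... | is_completed |
    /// processed_rows`, keyed and distributed by `vnode`.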
    fn derive_refresh_progress_table_catalog(table: TableCatalog) -> TableCatalog {
        tracing::debug!(
            table_name = %table.name,
            "Creating refresh progress table for refreshable table"
        );

        let mut columns = vec![ColumnCatalog {
            column_desc: risingwave_common::catalog::ColumnDesc::named(
                "vnode",
                0.into(),
                DataType::Int16,
            ),
            is_hidden: false,
        }];

        let mut col_index = 1;
        for pk_col in &table.pk {
            let upstream_col = &table.columns[pk_col.column_index];
            columns.push(ColumnCatalog {
                column_desc: risingwave_common::catalog::ColumnDesc::named(
                    format!("pos_{}", upstream_col.name()),
                    col_index.into(),
                    upstream_col.data_type().clone(),
                ),
                is_hidden: false,
            });
            col_index += 1;
        }

        for (name, data_type) in [
            ("is_completed", DataType::Boolean),
            ("processed_rows", DataType::Int64),
        ] {
            columns.push(ColumnCatalog {
                column_desc: risingwave_common::catalog::ColumnDesc::named(
                    name,
                    col_index.into(),
                    data_type,
                ),
                is_hidden: false,
            });
            col_index += 1;
        }

        let mut builder = TableCatalogBuilder::default();

        for column in &columns {
            builder.add_column(&(&column.column_desc).into());
        }

        builder.add_order_column(0, OrderType::ascending());
        builder.set_vnode_col_idx(0);
        builder.set_value_indices((0..columns.len()).collect());
        builder.set_dist_key_in_pk(vec![0]);

        builder.build(vec![0], 1)
    }

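    /// The catalog of the table to be materialized.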
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

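    /// The catalog of the staging table, if this is a refreshable table.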
    #[must_use]
    pub fn staging_table(&self) -> Option<&TableCatalog> {
        self.staging_table.as_ref()
    }

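    /// The catalog of the refresh progress table, if this is a refreshable table.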
    #[must_use]
    pub fn refresh_progress_table(&self) -> Option<&TableCatalog> {
        self.refresh_progress_table.as_ref()
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();

        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));

        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        }

        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new_with_staging_and_progress(
            input,
            self.table().clone(),
            self.staging_table.clone(),
            self.refresh_progress_table.clone(),
        )
        .unwrap();
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        tracing::debug!(
            table_name = %self.table().name(),
            refreshable = %self.table().refreshable,
            has_staging_table = %self.staging_table.is_some(),
            has_progress_table = %self.refresh_progress_table.is_some(),
            staging_table_name = ?self.staging_table.as_ref().map(|t| (&t.id, &t.name)),
            progress_table_name = ?self.refresh_progress_table.as_ref().map(|t| (&t.id, &t.name)),
            "Converting StreamMaterialize to protobuf"
        );

        let staging_table_prost = self
            .staging_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        let refresh_progress_table_prost = self
            .refresh_progress_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        PbNodeBody::Materialize(Box::new(MaterializeNode {
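            // The table id and catalog are left empty here; they are filled in
            // later (on the meta service) when the stream job is created.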
            table_id: 0,
            table: None,
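            // Catalogs of the staging and refresh progress tables; `None`
            // unless the table is refreshable.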
            staging_table: staging_table_prost,
            refresh_progress_table: refresh_progress_table_prost,

            column_orders: self
                .table()
                .pk()
                .iter()
                .copied()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}