use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, TableCatalogBuilder, childless_record};
use super::{
    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
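/// `StreamMaterialize` materializes the input stream into a table.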
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    input: PlanRef,
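    /// Catalog of the table to be materialized.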
    table: TableCatalog,
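    /// Staging table catalog, present only for refreshable tables.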
    staging_table: Option<TableCatalog>,
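    /// Refresh progress table catalog, present only for refreshable tables.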
    refresh_progress_table: Option<TableCatalog>,
}

impl StreamMaterialize {
    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
        Self::new_with_staging_and_progress(input, table, None, None)
    }

    pub fn new_with_staging_and_progress(
        input: PlanRef,
        table: TableCatalog,
        staging_table: Option<TableCatalog>,
        refresh_progress_table: Option<TableCatalog>,
    ) -> Result<Self> {
        let kind = match table.conflict_behavior() {
            ConflictBehavior::NoCheck => {
                reject_upsert_input!(input, "Materialize without conflict handling")
            }

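            // With conflict handling enabled, the materialize executor normalizes an
            // upsert input into a retract output stream.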
            ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
                StreamKind::AppendOnly => StreamKind::AppendOnly,
                StreamKind::Retract | StreamKind::Upsert => StreamKind::Retract,
            },
        };
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Ok(Self {
            base,
            input,
            table,
            staging_table,
            refresh_progress_table,
        })
    }

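    /// Create a materialize node for a `MATERIALIZED VIEW` or `INDEX`, deriving the
    /// columns and the table catalog from the optimized stream plan root.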
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by.clone(), table_type)?;
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

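        // An upsert input requires conflict handling on the stream key, so default to
        // `Overwrite`; retract and append-only inputs need no conflict check.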
        let conflict_behavior = match input.stream_kind() {
            StreamKind::Retract | StreamKind::AppendOnly => ConflictBehavior::NoCheck,
            StreamKind::Upsert => ConflictBehavior::Overwrite,
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_distributed_by,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            vec![], // version_column_indices
            None,   // pk_column_indices
            None,   // row_id_index
            table_type,
            None, // version
            cardinality,
            retention_seconds,
            create_type,
            None, // webhook_info
            Engine::Hummock,
            false, // refreshable
        )?;

        Self::new(input, table)
    }

    #[allow(clippy::too_many_arguments)]
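    /// Create a materialize node for a `CREATE TABLE` statement, with user-specified
    /// columns, primary key, and conflict behavior. For refreshable tables, the staging
    /// and refresh progress table catalogs are derived as well.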
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by.clone(), TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name.clone(),
            database_id,
            schema_id,
            user_distributed_by,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), // the cardinality of a table is unknown at planning time
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
            refreshable,
        )?;

        let (staging_table, refresh_progress_table) = if refreshable {
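            // A refreshable table additionally needs a staging table and a per-vnode
            // refresh progress table.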
            let staging = Some(Self::derive_staging_table_catalog(table.clone()));
            let progress = Some(Self::derive_refresh_progress_table_catalog(table.clone()));
            (staging, progress)
        } else {
            (None, None)
        };

        tracing::info!(
            table_name = %name,
            refreshable = %refreshable,
            has_staging_table = %staging_table.is_some(),
            has_progress_table = %refresh_progress_table.is_some(),
            "Creating StreamMaterialize with staging and progress table info"
        );

        Self::new_with_staging_and_progress(input, table, staging_table, refresh_progress_table)
    }

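    /// Rewrite the input plan to satisfy the distribution required by the table to be
    /// materialized, inserting an exchange if necessary.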
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
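                    // Shard by the stream key, so that rows with the same stream key
                    // land on the same node.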
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

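                    // For stream join inputs, always enforce this distribution with an
                    // exchange, even if the join output would already satisfy it.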
                    let is_stream_join = input.as_stream_hash_join().is_some()
                        || input.as_stream_temporal_join().is_some()
                        || input.as_stream_delta_join().is_some();

                    if is_stream_join {
                        return Ok(required_dist.stream_enforce(input));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::VectorIndex => {
                    unreachable!("VectorIndex should not be created by StreamMaterialize")
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.streaming_enforce_if_not_satisfies(input)
    }

    #[expect(clippy::too_many_arguments)]
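    /// Derive the [`TableCatalog`] for the table to be materialized.
    ///
    /// If `pk_column_indices` is given (for `CREATE TABLE`), it is used as the table pk
    /// directly; otherwise the pk is derived from the plan's order and distribution.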
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Option<Vec<usize>>,
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_distributed_by, user_order_by, &columns)
        };
        let read_prefix_len_hint = table_pk.len();

        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // to be filled in later
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView
                | TableType::Index
                | TableType::Internal
                | TableType::VectorIndex => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None,
            refreshable,
            vector_index_info: None,
            cdc_table_type: None,
        })
    }

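    /// Derive the staging table catalog for a refreshable table. The staging table keeps
    /// only the primary-key columns of the base table, with all index-based fields
    /// remapped accordingly.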
    fn derive_staging_table_catalog(
        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            columns,
            pk,
            stream_key,
            table_type: _,
            distribution_key,
            append_only,
            cardinality,
            owner,
            retention_seconds,
            fragment_id,
            dml_fragment_id: _,
            vnode_col_index,
            row_id_index,
            value_indices: _,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns: _,
            dist_key_in_pk,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            refreshable,
            vector_index_info,
            cdc_table_type,
        }: TableCatalog,
    ) -> TableCatalog {
        tracing::info!(
            table_name = %name,
            "Creating staging table for refreshable table"
        );

        assert!(row_id_index.is_none());
        assert!(retention_seconds.is_none());
        assert!(refreshable);

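        // Keep only the pk columns of the base table, and build a mapping from the
        // original column indices to the compacted ones.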
        let mut pk_col_indices = vec![];
        let mut pk_cols = vec![];
        for (i, col) in columns.iter().enumerate() {
            if pk.iter().any(|pk| pk.column_index == i) {
                pk_col_indices.push(i);
                pk_cols.push(col.clone());
            }
        }
        let mapping = ColIndexMapping::with_remaining_columns(&pk_col_indices, columns.len());

        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            value_indices: (0..pk_cols.len()).collect(),
            columns: pk_cols,
            pk: pk
                .iter()
                .map(|pk| ColumnOrder::new(mapping.map(pk.column_index), pk.order_type))
                .collect(),
            stream_key: mapping.try_map_all(stream_key).unwrap(),
            vnode_col_index: vnode_col_index.map(|i| mapping.map(i)),
            dist_key_in_pk: mapping.try_map_all(dist_key_in_pk).unwrap(),
            distribution_key: mapping.try_map_all(distribution_key).unwrap(),
            table_type: TableType::Internal,
            watermark_columns: FixedBitSet::new(),
            append_only,
            cardinality,
            owner,
            retention_seconds: None,
            fragment_id,
            dml_fragment_id: None,
            row_id_index: None,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            refreshable: false,
            vector_index_info,
            cdc_table_type,
        }
    }

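    /// Derive the refresh progress table catalog for a refreshable table.
    ///
    /// Schema: `vnode | pos_<pk column>... | is_completed | processed_rows`, with `vnode`
    /// serving as both the primary key and the distribution key.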
    fn derive_refresh_progress_table_catalog(table: TableCatalog) -> TableCatalog {
        tracing::debug!(
            table_name = %table.name,
            "Creating refresh progress table for refreshable table"
        );

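        // Column 0: the vnode owning this progress row.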
        let mut columns = vec![ColumnCatalog {
            column_desc: risingwave_common::catalog::ColumnDesc::named(
                "vnode",
                0.into(),
                DataType::Int16,
            ),
            is_hidden: false,
        }];

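        // One `pos_*` column per upstream pk column.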
        let mut col_index = 1;
        for pk_col in &table.pk {
            let upstream_col = &table.columns[pk_col.column_index];
            columns.push(ColumnCatalog {
                column_desc: risingwave_common::catalog::ColumnDesc::named(
                    format!("pos_{}", upstream_col.name()),
                    col_index.into(),
                    upstream_col.data_type().clone(),
                ),
                is_hidden: false,
            });
            col_index += 1;
        }

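        // Refresh progress metadata columns.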
        for (name, data_type) in [
            ("is_completed", DataType::Boolean),
            ("processed_rows", DataType::Int64),
        ] {
            columns.push(ColumnCatalog {
                column_desc: risingwave_common::catalog::ColumnDesc::named(
                    name,
                    col_index.into(),
                    data_type,
                ),
                is_hidden: false,
            });
            col_index += 1;
        }

        let mut builder = TableCatalogBuilder::default();

        for column in &columns {
            builder.add_column(&(&column.column_desc).into());
        }

        // `vnode` serves as the order key, vnode column, and distribution key.
        builder.add_order_column(0, OrderType::ascending());
        builder.set_vnode_col_idx(0);
        builder.set_value_indices((0..columns.len()).collect());
        builder.set_dist_key_in_pk(vec![0]);

        builder.build(vec![0], 1)
    }

    /// Get a reference to the materialize node's table catalog.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    /// Get the staging table catalog, if this materializes a refreshable table.
    #[must_use]
    pub fn staging_table(&self) -> Option<&TableCatalog> {
        self.staging_table.as_ref()
    }

    /// Get the refresh progress table catalog, if this materializes a refreshable table.
    #[must_use]
    pub fn refresh_progress_table(&self) -> Option<&TableCatalog> {
        self.refresh_progress_table.as_ref()
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key().iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();
        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));
        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();

        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        }
        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new_with_staging_and_progress(
            input,
            self.table().clone(),
            self.staging_table.clone(),
            self.refresh_progress_table.clone(),
        )
        .unwrap();
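        // The new input must produce the same schema (data types) and stream key.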
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        tracing::debug!(
            table_name = %self.table().name(),
            refreshable = %self.table().refreshable,
            has_staging_table = %self.staging_table.is_some(),
            has_progress_table = %self.refresh_progress_table.is_some(),
            staging_table_name = ?self.staging_table.as_ref().map(|t| (&t.id, &t.name)),
            progress_table_name = ?self.refresh_progress_table.as_ref().map(|t| (&t.id, &t.name)),
            "Converting StreamMaterialize to protobuf"
        );

        let staging_table_prost = self
            .staging_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        let refresh_progress_table_prost = self
            .refresh_progress_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // The table id and catalog are not known in the frontend; leave them as
            // placeholders to be filled in when the catalog is created.
            table_id: 0,
            table: None,
            staging_table: staging_table_prost,
            refresh_progress_table: refresh_progress_table_prost,

            column_orders: self
                .table()
                .pk()
                .iter()
                .copied()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}