1use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22 ColumnCatalog, ConflictBehavior, CreateType, Engine, StreamJobStatus, TableId,
23};
24use risingwave_common::hash::VnodeCount;
25use risingwave_common::id::FragmentId;
26use risingwave_common::types::DataType;
27use risingwave_common::util::column_index_mapping::ColIndexMapping;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
30use risingwave_pb::catalog::PbWebhookSourceInfo;
31use risingwave_pb::stream_plan::stream_node::PbNodeBody;
32
33use super::derive::derive_columns;
34use super::stream::prelude::*;
35use super::utils::{Distill, TableCatalogBuilder, childless_record};
36use super::{
37 ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
38};
39use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::Result;
42use crate::optimizer::StreamOptimizedLogicalPlanRoot;
43use crate::optimizer::plan_node::derive::derive_pk;
44use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
45use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
46use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
47use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
48use crate::stream_fragmenter::BuildFragmentGraphState;
49
/// Materializes a stream into a table, i.e. the root node of the streaming plan
/// for `CREATE TABLE` / `CREATE MATERIALIZED VIEW` / `CREATE INDEX` jobs.
/// Serialized as a `MaterializeNode` in the stream plan protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child plan whose output is materialized.
    input: PlanRef,
    /// Catalog of the table to materialize into.
    table: TableCatalog,
    /// Staging table used by the refresh procedure;
    /// `Some` only when `table.refreshable` is true.
    staging_table: Option<TableCatalog>,
    /// Per-vnode refresh progress bookkeeping table;
    /// `Some` only when `table.refreshable` is true.
    refresh_progress_table: Option<TableCatalog>,
}
62
63impl StreamMaterialize {
    /// Creates a materialize node without staging / refresh-progress tables,
    /// i.e. for non-refreshable jobs.
    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
        Self::new_with_staging_and_progress(input, table, None, None)
    }
67
    /// Creates a materialize node, optionally carrying the staging and
    /// refresh-progress tables of a refreshable table.
    ///
    /// Derives the output stream kind from the table's conflict behavior and
    /// builds the plan base from the input's physical properties.
    pub fn new_with_staging_and_progress(
        input: PlanRef,
        table: TableCatalog,
        staging_table: Option<TableCatalog>,
        refresh_progress_table: Option<TableCatalog>,
    ) -> Result<Self> {
        let kind = match table.conflict_behavior() {
            // Without conflict handling, an upsert input cannot be materialized
            // correctly; the macro rejects it (presumably returning an error) —
            // otherwise the input's stream kind is kept.
            ConflictBehavior::NoCheck => {
                reject_upsert_input!(input, "Materialize without conflict handling")
            }

            // With conflict handling, any non-append-only input is normalized
            // into a retract stream by the materialize executor.
            ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
                StreamKind::AppendOnly => StreamKind::AppendOnly,
                StreamKind::Retract | StreamKind::Upsert => StreamKind::Retract,
            },
        };
        // Schema and most physical properties are inherited from the input; the
        // stream key comes from the table catalog.
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Ok(Self {
            base,
            input,
            table,
            staging_table,
            refresh_progress_table,
        })
    }
107
    /// Creates a `StreamMaterialize` from an optimized logical plan root,
    /// deriving the table catalog from the plan itself. Used when the catalog is
    /// not supplied by the caller (e.g. `CREATE MATERIALIZED VIEW`).
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        // Enforce the required distribution first, then normalize element ids
        // before deriving the output columns.
        let input = Self::rewrite_input(input, user_distributed_by.clone(), table_type)?;
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        // Background DDL applies only to materialized views, and only when the
        // session enables it and the plan supports it.
        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        // An upsert input needs `Overwrite` conflict handling to be
        // materialized; other stream kinds need no conflict check.
        let conflict_behavior = match input.stream_kind() {
            StreamKind::Retract | StreamKind::AppendOnly => ConflictBehavior::NoCheck,
            StreamKind::Upsert => ConflictBehavior::Overwrite,
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_distributed_by,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            vec![], // no version columns
            None,   // derive pk from the plan
            None,   // no row-id column
            table_type,
            None, // no table version
            cardinality,
            retention_seconds,
            create_type,
            None, // no webhook info
            Engine::Hummock,
            false, // not refreshable
        )?;

        Self::new(input, table)
    }
174
175 #[allow(clippy::too_many_arguments)]
181 pub fn create_for_table(
182 input: PlanRef,
183 name: String,
184 database_id: DatabaseId,
185 schema_id: SchemaId,
186 user_distributed_by: RequiredDist,
187 user_order_by: Order,
188 columns: Vec<ColumnCatalog>,
189 definition: String,
190 conflict_behavior: ConflictBehavior,
191 version_column_indices: Vec<usize>,
192 pk_column_indices: Vec<usize>,
193 row_id_index: Option<usize>,
194 version: TableVersion,
195 retention_seconds: Option<NonZeroU32>,
196 webhook_info: Option<PbWebhookSourceInfo>,
197 engine: Engine,
198 refreshable: bool,
199 ) -> Result<Self> {
200 let input = Self::rewrite_input(input, user_distributed_by.clone(), TableType::Table)?;
201
202 let table = Self::derive_table_catalog(
203 input.clone(),
204 name.clone(),
205 database_id,
206 schema_id,
207 user_distributed_by,
208 user_order_by,
209 columns,
210 definition,
211 conflict_behavior,
212 version_column_indices,
213 Some(pk_column_indices),
214 row_id_index,
215 TableType::Table,
216 Some(version),
217 Cardinality::unknown(), retention_seconds,
219 CreateType::Foreground,
220 webhook_info,
221 engine,
222 refreshable,
223 )?;
224
225 let (staging_table, refresh_progress_table) = if refreshable {
227 let staging = Some(Self::derive_staging_table_catalog(table.clone()));
228 let progress = Some(Self::derive_refresh_progress_table_catalog(table.clone()));
229 (staging, progress)
230 } else {
231 (None, None)
232 };
233
234 tracing::info!(
235 table_name = %name,
236 refreshable = %refreshable,
237 has_staging_table = %staging_table.is_some(),
238 has_progress_table = %refresh_progress_table.is_some(),
239 "Creating StreamMaterialize with staging and progress table info"
240 );
241
242 Self::new_with_staging_and_progress(input, table, staging_table, refresh_progress_table)
243 }
244
245 fn rewrite_input(
247 input: PlanRef,
248 user_distributed_by: RequiredDist,
249 table_type: TableType,
250 ) -> Result<PlanRef> {
251 let required_dist = match input.distribution() {
252 Distribution::Single => RequiredDist::single(),
253 _ => match table_type {
254 TableType::Table => {
255 assert_matches!(
256 user_distributed_by,
257 RequiredDist::ShardByKey(_) | RequiredDist::ShardByExactKey(_)
258 );
259 user_distributed_by
260 }
261 TableType::MaterializedView => {
262 assert_matches!(user_distributed_by, RequiredDist::Any);
263 let required_dist =
265 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
266
267 let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
272 || matches!(input.as_stream_temporal_join(), Some(_join))
273 || matches!(input.as_stream_delta_join(), Some(_join));
274
275 if is_stream_join {
276 return Ok(required_dist.stream_enforce(input));
277 }
278
279 required_dist
280 }
281 TableType::Index => {
282 assert_matches!(
283 user_distributed_by,
284 RequiredDist::PhysicalDist(Distribution::HashShard(_))
285 );
286 user_distributed_by
287 }
288 TableType::VectorIndex => {
289 unreachable!("VectorIndex should not be created by StreamMaterialize")
290 }
291 TableType::Internal => unreachable!(),
292 },
293 };
294
295 required_dist.streaming_enforce_if_not_satisfies(input)
296 }
297
    /// Derives the table catalog for the materialized result.
    ///
    /// When `pk_column_indices` is `Some` (tables), the pk and stream key are
    /// taken from it directly; otherwise (materialized views / indexes) they are
    /// derived from the plan's order and distribution via `derive_pk`.
    #[expect(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Option<Vec<usize>>,
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        // All output columns are stored as values.
        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            // Caller-provided pk (tables): ascending order on each pk column,
            // and the pk doubles as the stream key.
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_distributed_by, user_order_by, &columns)
        };
        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            // Placeholder ids are filled in when the streaming job is created.
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: FragmentId::placeholder(),
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder,
            webhook_info,
            job_id: None,
            // Only user tables may use a non-default (non-Hummock) engine.
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView
                | TableType::Index
                | TableType::Internal
                | TableType::VectorIndex => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],
            refreshable,
            vector_index_info: None,
            cdc_table_type: None,
        })
    }
403
    /// Derives the staging table catalog for a refreshable table by projecting
    /// the base table catalog onto its pk columns.
    ///
    /// The full catalog is destructured field-by-field so that adding a field to
    /// `TableCatalog` forces this function to be revisited.
    fn derive_staging_table_catalog(
        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            columns,
            pk,
            stream_key,
            table_type: _,
            distribution_key,
            append_only,
            cardinality,
            owner,
            retention_seconds,
            fragment_id,
            dml_fragment_id: _,
            vnode_col_index,
            row_id_index,
            value_indices: _,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns: _,
            dist_key_in_pk,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            clean_watermark_indices,
            refreshable,
            vector_index_info,
            cdc_table_type,
        }: TableCatalog,
    ) -> TableCatalog {
        tracing::info!(
            table_name = %name,
            "Creating staging table for refreshable table"
        );

        // Refreshable tables have a user-defined pk (no row-id column) and no
        // retention; this must only be called on a refreshable base catalog.
        assert!(row_id_index.is_none());
        assert!(retention_seconds.is_none());
        assert!(refreshable);

        // Collect the pk columns, preserving their relative order in the base
        // table's column list.
        let mut pk_col_indices = vec![];
        let mut pk_cols = vec![];
        for (i, col) in columns.iter().enumerate() {
            if pk.iter().any(|pk| pk.column_index == i) {
                pk_col_indices.push(i);
                pk_cols.push(col.clone());
            }
        }
        // Maps base-table column indices to staging-table column indices.
        let mapping = ColIndexMapping::with_remaining_columns(&pk_col_indices, columns.len());

        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            value_indices: (0..pk_cols.len()).collect(),
            columns: pk_cols,
            // Remap every index-based field through the pk projection.
            pk: pk
                .iter()
                .map(|pk| ColumnOrder::new(mapping.map(pk.column_index), pk.order_type))
                .collect(),
            stream_key: mapping.try_map_all(stream_key).unwrap(),
            vnode_col_index: vnode_col_index.map(|i| mapping.map(i)),
            dist_key_in_pk: mapping.try_map_all(dist_key_in_pk).unwrap(),
            distribution_key: mapping.try_map_all(distribution_key).unwrap(),
            table_type: TableType::Internal,
            // Watermarks do not apply to the staging table.
            watermark_columns: FixedBitSet::new(),
            append_only,
            cardinality,
            owner,
            retention_seconds: None,
            fragment_id,
            dml_fragment_id: None,
            row_id_index: None,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            clean_watermark_indices,
            // The staging table itself is not refreshable.
            refreshable: false,
            vector_index_info,
            cdc_table_type,
        }
    }
523
524 fn derive_refresh_progress_table_catalog(table: TableCatalog) -> TableCatalog {
528 tracing::debug!(
529 table_name = %table.name,
530 "Creating refresh progress table for refreshable table"
531 );
532
533 let mut columns = vec![ColumnCatalog {
536 column_desc: risingwave_common::catalog::ColumnDesc::named(
537 "vnode",
538 0.into(),
539 DataType::Int16,
540 ),
541 is_hidden: false,
542 }];
543
544 let mut col_index = 1;
546 for pk_col in &table.pk {
547 let upstream_col = &table.columns[pk_col.column_index];
548 columns.push(ColumnCatalog {
549 column_desc: risingwave_common::catalog::ColumnDesc::named(
550 format!("pos_{}", upstream_col.name()),
551 col_index.into(),
552 upstream_col.data_type().clone(),
553 ),
554 is_hidden: false,
555 });
556 col_index += 1;
557 }
558
559 for (name, data_type) in [
561 ("is_completed", DataType::Boolean),
562 ("processed_rows", DataType::Int64),
563 ] {
564 columns.push(ColumnCatalog {
565 column_desc: risingwave_common::catalog::ColumnDesc::named(
566 name,
567 col_index.into(),
568 data_type,
569 ),
570 is_hidden: false,
571 });
572 col_index += 1;
573 }
574
575 let mut builder = TableCatalogBuilder::default();
576
577 for column in &columns {
579 builder.add_column(&(&column.column_desc).into());
580 }
581
582 builder.add_order_column(0, OrderType::ascending());
584 builder.set_vnode_col_idx(0);
585 builder.set_value_indices((0..columns.len()).collect());
586 builder.set_dist_key_in_pk(vec![0]);
587
588 builder.build(vec![0], 1)
589 }
590
    /// The catalog of the table to be materialized.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    /// Staging table catalog; `Some` only for refreshable tables.
    #[must_use]
    pub fn staging_table(&self) -> Option<&TableCatalog> {
        self.staging_table.as_ref()
    }

    /// Refresh-progress table catalog; `Some` only for refreshable tables.
    #[must_use]
    pub fn refresh_progress_table(&self) -> Option<&TableCatalog> {
        self.refresh_progress_table.as_ref()
    }

    /// Name of the table to be materialized.
    pub fn name(&self) -> &str {
        self.table.name()
    }
612}
613
614impl Distill for StreamMaterialize {
615 fn distill<'a>(&self) -> XmlNode<'a> {
616 let table = self.table();
617
618 let column_names = (table.columns.iter())
619 .map(|col| col.name_with_hidden().to_string())
620 .map(Pretty::from)
621 .collect();
622
623 let stream_key = (table.stream_key().iter())
624 .map(|&k| table.columns[k].name().to_owned())
625 .map(Pretty::from)
626 .collect();
627
628 let pk_columns = (table.pk.iter())
629 .map(|o| table.columns[o.column_index].name().to_owned())
630 .map(Pretty::from)
631 .collect();
632 let mut vec = Vec::with_capacity(5);
633 vec.push(("columns", Pretty::Array(column_names)));
634 vec.push(("stream_key", Pretty::Array(stream_key)));
635 vec.push(("pk_columns", Pretty::Array(pk_columns)));
636 let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
637
638 vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
639
640 let watermark_columns = &self.base.watermark_columns();
641 if self.base.watermark_columns().n_indices() > 0 {
642 let watermark_column_names = watermark_columns
644 .indices()
645 .map(|i| table.columns()[i].name_with_hidden().to_string())
646 .map(Pretty::from)
647 .collect();
648 vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
649 };
650 childless_record("StreamMaterialize", vec)
651 }
652}
653
impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new_with_staging_and_progress(
            input,
            self.table().clone(),
            self.staging_table.clone(),
            self.refresh_progress_table.clone(),
        )
        .unwrap();
        // Sanity check: replacing the input must not change the schema's data
        // types or the stream key.
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}
679
// Generates the generic `PlanTreeNode` implementation from `PlanTreeNodeUnary`.
impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }
681
impl StreamNode for StreamMaterialize {
    /// Serializes this node into a `MaterializeNode` protobuf body.
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        tracing::debug!(
            table_name = %self.table().name(),
            refreshable = %self.table().refreshable,
            has_staging_table = %self.staging_table.is_some(),
            has_progress_table = %self.refresh_progress_table.is_some(),
            staging_table_name = ?self.staging_table.as_ref().map(|t| (&t.id, &t.name)),
            progress_table_name = ?self.refresh_progress_table.as_ref().map(|t| (&t.id, &t.name)),
            "Converting StreamMaterialize to protobuf"
        );

        // Assign real (generated) table ids to the auxiliary tables before
        // serializing them.
        let staging_table_prost = self
            .staging_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        let refresh_progress_table_prost = self
            .refresh_progress_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // NOTE(review): placeholder id and `None` catalog — presumably
            // filled in later in the DDL pipeline; confirm where.
            table_id: 0.into(),
            table: None,

            staging_table: staging_table_prost,
            refresh_progress_table: refresh_progress_table_prost,

            column_orders: self
                .table()
                .pk()
                .iter()
                .copied()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}
726
// This node embeds no expressions, so the default no-op implementations suffice.
impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}