use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, StreamJobStatus, TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::id::FragmentId;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, TableCatalogBuilder, childless_record};
use super::{
    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

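/// `StreamMaterialize` materializes the result of its input stream into a table, backing a
/// `MATERIALIZED VIEW`, a `TABLE`, or an `INDEX`.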
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of the materialize plan node.
    input: PlanRef,
    /// Catalog of the table to be materialized.
    table: TableCatalog,
    /// Staging table used during refresh, if the table is refreshable.
    staging_table: Option<TableCatalog>,
    /// Per-vnode refresh progress table, if the table is refreshable.
    refresh_progress_table: Option<TableCatalog>,
}

impl StreamMaterialize {
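    /// Create a `StreamMaterialize` without staging or refresh progress tables.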
    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
        Self::new_with_staging_and_progress(input, table, None, None)
    }

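    /// Create a `StreamMaterialize` with optional staging and refresh progress tables, deriving
    /// the stream kind of this node from the input and the table's conflict behavior.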
    pub fn new_with_staging_and_progress(
        input: PlanRef,
        table: TableCatalog,
        staging_table: Option<TableCatalog>,
        refresh_progress_table: Option<TableCatalog>,
    ) -> Result<Self> {
        let kind = match table.conflict_behavior() {
            ConflictBehavior::NoCheck => {
                reject_upsert_input!(input, "Materialize without conflict handling")
            }

            ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
                StreamKind::AppendOnly => StreamKind::AppendOnly,
                StreamKind::Retract | StreamKind::Upsert => StreamKind::Retract,
            },
        };
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Ok(Self {
            base,
            input,
            table,
            staging_table,
            refresh_progress_table,
        })
    }

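    /// Create a materialize node from an optimized logical plan root, for `MATERIALIZED VIEW`
    /// and `INDEX` creation.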
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by.clone(), table_type)?;
        let input = reorganize_elements_id(input);
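        // Derive the catalog columns from the rewritten input schema and the output names.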
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

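        // Derive the conflict behavior from the input stream kind: an upsert input relies on
        // the materialize operator to resolve conflicts on the primary key.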
        let conflict_behavior = match input.stream_kind() {
            StreamKind::Retract | StreamKind::AppendOnly => ConflictBehavior::NoCheck,
            StreamKind::Upsert => ConflictBehavior::Overwrite,
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_distributed_by,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            vec![],
            None,
            vec![],
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
            false,
        )?;

        Self::new(input, table)
    }

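    /// Create a materialize node for a `TABLE`, wiring in table-specific options such as the
    /// user-defined primary key, conflict behavior, and (for refreshable tables) the staging
    /// and refresh progress tables.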
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Vec<usize>,
        ttl_watermark_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by.clone(), TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name.clone(),
            database_id,
            schema_id,
            user_distributed_by,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            Some(pk_column_indices),
            ttl_watermark_indices,
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(),
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
            refreshable,
        )?;

        let (staging_table, refresh_progress_table) = if refreshable {
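            // Refreshable tables get a staging table plus a per-vnode progress table.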
            let staging = Some(Self::derive_staging_table_catalog(table.clone()));
            let progress = Some(Self::derive_refresh_progress_table_catalog(table.clone()));
            (staging, progress)
        } else {
            (None, None)
        };

        tracing::info!(
            table_name = %name,
            refreshable = %refreshable,
            has_staging_table = %staging_table.is_some(),
            has_progress_table = %refresh_progress_table.is_some(),
            "Creating StreamMaterialize with staging and progress table info"
        );

        Self::new_with_staging_and_progress(input, table, staging_table, refresh_progress_table)
    }

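    /// Rewrite the input to satisfy the distribution required by the table to be created.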
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::ShardByKey(_) | RequiredDist::ShardByExactKey(_)
                    );
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
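                    // Shard the materialized view by its stream key, so that rows with the
                    // same key always land on the same node.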
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

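                    // For a materialized view directly on top of a stream join, always enforce
                    // an exchange even if the distribution already satisfies the requirement,
                    // presumably to decouple the materialized state from the join operator.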
                    let is_stream_join = input.as_stream_hash_join().is_some()
                        || input.as_stream_temporal_join().is_some()
                        || input.as_stream_delta_join().is_some();

                    if is_stream_join {
                        return Ok(required_dist.stream_enforce(input));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::VectorIndex => {
                    unreachable!("VectorIndex should not be created by StreamMaterialize")
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.streaming_enforce_if_not_satisfies(input)
    }

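    /// Derive the [`TableCatalog`] of the table to be materialized.
    ///
    /// If `pk_column_indices` is `Some` (for tables with a user-defined primary key), it is
    /// used as the table pk directly; otherwise the pk and stream key are derived from the
    /// required order and distribution.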
    #[expect(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Option<Vec<usize>>,
        ttl_watermark_indices: Vec<usize>,
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

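        // A user-defined pk (tables) takes precedence; otherwise derive the pk and stream key.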
        let (table_pk, mut stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_distributed_by, user_order_by, &columns)
        };

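        // Ensure every TTL watermark column is also part of the stream key; these columns feed
        // `clean_watermark_indices` and `cleaned_by_watermark` below.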
        for idx in ttl_watermark_indices.iter().copied() {
            if !stream_key.contains(&idx) {
                stream_key.push(idx);
            }
        }

        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: FragmentId::placeholder(),
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: !ttl_watermark_indices.is_empty(),
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder,
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView
                | TableType::Index
                | TableType::Internal
                | TableType::VectorIndex => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: ttl_watermark_indices,
            refreshable,
            vector_index_info: None,
            cdc_table_type: None,
        })
    }

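    /// Derive the catalog of the staging table for a refreshable table.
    ///
    /// The staging table keeps only the primary-key columns of the base table, remapped to a
    /// compact layout, and is used to stage data while a refresh is in progress.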
    fn derive_staging_table_catalog(
        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            columns,
            pk,
            stream_key,
            table_type: _,
            distribution_key,
            append_only,
            cardinality,
            owner,
            retention_seconds,
            fragment_id,
            dml_fragment_id: _,
            vnode_col_index,
            row_id_index,
            value_indices: _,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns: _,
            dist_key_in_pk,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            clean_watermark_indices,
            refreshable,
            vector_index_info,
            cdc_table_type,
        }: TableCatalog,
    ) -> TableCatalog {
        tracing::info!(
            table_name = %name,
            "Creating staging table for refreshable table"
        );

        assert!(row_id_index.is_none());
        assert!(retention_seconds.is_none());
        assert!(refreshable);

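        // Keep only the pk columns of the base table, and build a mapping from the original
        // column indices to the compacted ones.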
        let mut pk_col_indices = vec![];
        let mut pk_cols = vec![];
        for (i, col) in columns.iter().enumerate() {
            if pk.iter().any(|pk| pk.column_index == i) {
                pk_col_indices.push(i);
                pk_cols.push(col.clone());
            }
        }
        let mapping = ColIndexMapping::with_remaining_columns(&pk_col_indices, columns.len());

        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            value_indices: (0..pk_cols.len()).collect(),
            columns: pk_cols,
            pk: pk
                .iter()
                .map(|pk| ColumnOrder::new(mapping.map(pk.column_index), pk.order_type))
                .collect(),
            stream_key: mapping.try_map_all(stream_key).unwrap(),
            vnode_col_index: vnode_col_index.map(|i| mapping.map(i)),
            dist_key_in_pk: mapping.try_map_all(dist_key_in_pk).unwrap(),
            distribution_key: mapping.try_map_all(distribution_key).unwrap(),
            table_type: TableType::Internal,
            watermark_columns: FixedBitSet::new(),
            append_only,
            cardinality,
            owner,
            retention_seconds: None,
            fragment_id,
            dml_fragment_id: None,
            row_id_index: None,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            created_at_epoch,
            initialized_at_epoch,
            cleaned_by_watermark,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            clean_watermark_indices,
            refreshable: false,
            vector_index_info,
            cdc_table_type,
        }
    }

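    /// Derive the catalog of the refresh progress table for a refreshable table.
    ///
    /// Schema: `vnode | pos_<pk columns...> | is_completed | processed_rows`, with `vnode`
    /// serving as both the primary key and the distribution key.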
    fn derive_refresh_progress_table_catalog(table: TableCatalog) -> TableCatalog {
        tracing::debug!(
            table_name = %table.name,
            "Creating refresh progress table for refreshable table"
        );

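        // Column 0: the vnode this progress entry belongs to.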
        let mut columns = vec![ColumnCatalog {
            column_desc: risingwave_common::catalog::ColumnDesc::named(
                "vnode",
                0.into(),
                DataType::Int16,
            ),
            is_hidden: false,
        }];

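        // One `pos_<name>` column per upstream pk column.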
        let mut col_index = 1;
        for pk_col in &table.pk {
            let upstream_col = &table.columns[pk_col.column_index];
            columns.push(ColumnCatalog {
                column_desc: risingwave_common::catalog::ColumnDesc::named(
                    format!("pos_{}", upstream_col.name()),
                    col_index.into(),
                    upstream_col.data_type().clone(),
                ),
                is_hidden: false,
            });
            col_index += 1;
        }

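        // Progress metadata columns.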
        for (name, data_type) in [
            ("is_completed", DataType::Boolean),
            ("processed_rows", DataType::Int64),
        ] {
            columns.push(ColumnCatalog {
                column_desc: risingwave_common::catalog::ColumnDesc::named(
                    name,
                    col_index.into(),
                    data_type,
                ),
                is_hidden: false,
            });
            col_index += 1;
        }

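        // Assemble the catalog: `vnode` is the only order and distribution column.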
        let mut builder = TableCatalogBuilder::default();

        for column in &columns {
            builder.add_column(&(&column.column_desc).into());
        }

        builder.add_order_column(0, OrderType::ascending());
        builder.set_vnode_col_idx(0);
        builder.set_value_indices((0..columns.len()).collect());
        builder.set_dist_key_in_pk(vec![0]);

        builder.build(vec![0], 1)
    }

    #[must_use]
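    /// Get the catalog of the table to be materialized.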
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    #[must_use]
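    /// Get the staging table catalog, if this materializes a refreshable table.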
    pub fn staging_table(&self) -> Option<&TableCatalog> {
        self.staging_table.as_ref()
    }

    #[must_use]
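    /// Get the refresh progress table catalog, if this materializes a refreshable table.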
    pub fn refresh_progress_table(&self) -> Option<&TableCatalog> {
        self.refresh_progress_table.as_ref()
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key().iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();

        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));

        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        }

        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new_with_staging_and_progress(
            input,
            self.table().clone(),
            self.staging_table.clone(),
            self.refresh_progress_table.clone(),
        )
        .unwrap();
        // The new input must produce the same schema and stream key as the old one.
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        tracing::debug!(
            table_name = %self.table().name(),
            refreshable = %self.table().refreshable,
            has_staging_table = %self.staging_table.is_some(),
            has_progress_table = %self.refresh_progress_table.is_some(),
            staging_table_name = ?self.staging_table.as_ref().map(|t| (&t.id, &t.name)),
            progress_table_name = ?self.refresh_progress_table.as_ref().map(|t| (&t.id, &t.name)),
            "Converting StreamMaterialize to protobuf"
        );

        let staging_table_prost = self
            .staging_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        let refresh_progress_table_prost = self
            .refresh_progress_table
            .clone()
            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // The table id is a placeholder and the table catalog is omitted here; both are
            // filled in on the meta side when the catalog is created.
            table_id: 0.into(),
            table: None,
            staging_table: staging_table_prost,
            refresh_progress_table: refresh_progress_table_prost,

            column_orders: self
                .table()
                .pk()
                .iter()
                .copied()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}