1use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{
21 ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
22 TableId,
23};
24use risingwave_common::hash::VnodeCount;
25use risingwave_common::util::iter_util::ZipEqFast;
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::PbWebhookSourceInfo;
28use risingwave_pb::stream_plan::stream_node::PbNodeBody;
29
30use super::derive::derive_columns;
31use super::stream::prelude::*;
32use super::utils::{Distill, childless_record};
33use super::{
34 ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
35};
36use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
37use crate::catalog::{DatabaseId, SchemaId};
38use crate::error::Result;
39use crate::optimizer::StreamOptimizedLogicalPlanRoot;
40use crate::optimizer::plan_node::derive::derive_pk;
41use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
42use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
43use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
44use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
45use crate::stream_fragmenter::BuildFragmentGraphState;
46
/// `StreamMaterialize` is the root node of a streaming job plan: it materializes
/// its input stream into a table (user table, materialized view, or index),
/// pairing the rewritten stream plan with the catalog of the target table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child plan producing the stream to materialize.
    input: PlanRef,
    /// Catalog of the table to materialize into. Its `id` is a placeholder
    /// (`TableId::placeholder()`) until the real id is assigned.
    table: TableCatalog,
}
55
56impl StreamMaterialize {
57 pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
58 let kind = match table.conflict_behavior() {
59 ConflictBehavior::NoCheck => {
60 reject_upsert_input!(input, "Materialize without conflict handling")
61 }
62
63 ConflictBehavior::Overwrite
65 | ConflictBehavior::IgnoreConflict
66 | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
67 StreamKind::AppendOnly | StreamKind::Retract => input.stream_kind(),
68 StreamKind::Upsert => StreamKind::Retract,
69 },
70 };
71
72 let base = PlanBase::new_stream(
73 input.ctx(),
74 input.schema().clone(),
75 Some(table.stream_key.clone()),
76 input.functional_dependency().clone(),
77 input.distribution().clone(),
78 kind,
79 input.emit_on_window_close(),
80 input.watermark_columns().clone(),
81 input.columns_monotonicity().clone(),
82 );
83
84 Ok(Self { base, input, table })
85 }
86
87 pub fn create(
92 StreamOptimizedLogicalPlanRoot {
93 plan: input,
94 required_dist: user_distributed_by,
95 required_order: user_order_by,
96 out_fields: user_cols,
97 out_names,
98 ..
99 }: StreamOptimizedLogicalPlanRoot,
100 name: String,
101 database_id: DatabaseId,
102 schema_id: SchemaId,
103 definition: String,
104 table_type: TableType,
105 cardinality: Cardinality,
106 retention_seconds: Option<NonZeroU32>,
107 ) -> Result<Self> {
108 let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
109 let input = reorganize_elements_id(input);
111 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
112
113 let create_type = if matches!(table_type, TableType::MaterializedView)
114 && input.ctx().session_ctx().config().background_ddl()
115 && plan_can_use_background_ddl(&input)
116 {
117 CreateType::Background
118 } else {
119 CreateType::Foreground
120 };
121
122 let table = Self::derive_table_catalog(
123 input.clone(),
124 name,
125 database_id,
126 schema_id,
127 user_order_by,
128 columns,
129 definition,
130 ConflictBehavior::NoCheck,
131 vec![],
132 None,
133 None,
134 table_type,
135 None,
136 cardinality,
137 retention_seconds,
138 create_type,
139 None,
140 Engine::Hummock,
141 false,
142 )?;
143
144 Self::new(input, table)
145 }
146
147 #[allow(clippy::too_many_arguments)]
153 pub fn create_for_table(
154 input: PlanRef,
155 name: String,
156 database_id: DatabaseId,
157 schema_id: SchemaId,
158 user_distributed_by: RequiredDist,
159 user_order_by: Order,
160 columns: Vec<ColumnCatalog>,
161 definition: String,
162 conflict_behavior: ConflictBehavior,
163 version_column_indices: Vec<usize>,
164 pk_column_indices: Vec<usize>,
165 row_id_index: Option<usize>,
166 version: TableVersion,
167 retention_seconds: Option<NonZeroU32>,
168 webhook_info: Option<PbWebhookSourceInfo>,
169 engine: Engine,
170 refreshable: bool,
171 ) -> Result<Self> {
172 let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;
173
174 let table = Self::derive_table_catalog(
175 input.clone(),
176 name,
177 database_id,
178 schema_id,
179 user_order_by,
180 columns,
181 definition,
182 conflict_behavior,
183 version_column_indices,
184 Some(pk_column_indices),
185 row_id_index,
186 TableType::Table,
187 Some(version),
188 Cardinality::unknown(), retention_seconds,
190 CreateType::Foreground,
191 webhook_info,
192 engine,
193 refreshable,
194 )?;
195
196 Self::new(input, table)
197 }
198
199 fn rewrite_input(
201 input: PlanRef,
202 user_distributed_by: RequiredDist,
203 table_type: TableType,
204 ) -> Result<PlanRef> {
205 let required_dist = match input.distribution() {
206 Distribution::Single => RequiredDist::single(),
207 _ => match table_type {
208 TableType::Table => {
209 assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
210 user_distributed_by
211 }
212 TableType::MaterializedView => {
213 assert_matches!(user_distributed_by, RequiredDist::Any);
214 let required_dist =
216 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
217
218 let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
223 || matches!(input.as_stream_temporal_join(), Some(_join))
224 || matches!(input.as_stream_delta_join(), Some(_join));
225
226 if is_stream_join {
227 return Ok(required_dist.stream_enforce(input));
228 }
229
230 required_dist
231 }
232 TableType::Index => {
233 assert_matches!(
234 user_distributed_by,
235 RequiredDist::PhysicalDist(Distribution::HashShard(_))
236 );
237 user_distributed_by
238 }
239 TableType::VectorIndex => {
240 unreachable!("VectorIndex should not be created by StreamMaterialize")
241 }
242 TableType::Internal => unreachable!(),
243 },
244 };
245
246 required_dist.streaming_enforce_if_not_satisfies(input)
247 }
248
249 #[expect(clippy::too_many_arguments)]
254 fn derive_table_catalog(
255 rewritten_input: PlanRef,
256 name: String,
257 database_id: DatabaseId,
258 schema_id: SchemaId,
259 user_order_by: Order,
260 columns: Vec<ColumnCatalog>,
261 definition: String,
262 conflict_behavior: ConflictBehavior,
263 version_column_indices: Vec<usize>,
264 pk_column_indices: Option<Vec<usize>>, row_id_index: Option<usize>,
266 table_type: TableType,
267 version: Option<TableVersion>,
268 cardinality: Cardinality,
269 retention_seconds: Option<NonZeroU32>,
270 create_type: CreateType,
271 webhook_info: Option<PbWebhookSourceInfo>,
272 engine: Engine,
273 refreshable: bool,
274 ) -> Result<TableCatalog> {
275 let input = rewritten_input;
276
277 let value_indices = (0..columns.len()).collect_vec();
278 let distribution_key = input.distribution().dist_column_indices().to_vec();
279 let append_only = input.append_only();
280 let watermark_columns = input.watermark_columns().indices().collect();
283
284 let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
285 let table_pk = pk_column_indices
286 .iter()
287 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
288 .collect();
289 (table_pk, pk_column_indices)
291 } else {
292 derive_pk(input, user_order_by, &columns)
293 };
294 let read_prefix_len_hint = table_pk.len();
297 Ok(TableCatalog {
298 id: TableId::placeholder(),
299 schema_id,
300 database_id,
301 associated_source_id: None,
302 name,
303 columns,
304 pk: table_pk,
305 stream_key,
306 distribution_key,
307 table_type,
308 append_only,
309 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
310 fragment_id: OBJECT_ID_PLACEHOLDER,
311 dml_fragment_id: None,
312 vnode_col_index: None,
313 row_id_index,
314 value_indices,
315 definition,
316 conflict_behavior,
317 version_column_indices,
318 read_prefix_len_hint,
319 version,
320 watermark_columns,
321 dist_key_in_pk: vec![],
322 cardinality,
323 created_at_epoch: None,
324 initialized_at_epoch: None,
325 cleaned_by_watermark: false,
326 create_type,
327 stream_job_status: StreamJobStatus::Creating,
328 description: None,
329 initialized_at_cluster_version: None,
330 created_at_cluster_version: None,
331 retention_seconds: retention_seconds.map(|i| i.into()),
332 cdc_table_id: None,
333 vnode_count: VnodeCount::Placeholder, webhook_info,
335 job_id: None,
336 engine: match table_type {
337 TableType::Table => engine,
338 TableType::MaterializedView
339 | TableType::Index
340 | TableType::Internal
341 | TableType::VectorIndex => {
342 assert_eq!(engine, Engine::Hummock);
343 engine
344 }
345 },
346 clean_watermark_index_in_pk: None, refreshable,
348 vector_index_info: None,
349 cdc_table_type: None,
350 })
351 }
352
353 #[must_use]
355 pub fn table(&self) -> &TableCatalog {
356 &self.table
357 }
358
359 pub fn name(&self) -> &str {
360 self.table.name()
361 }
362}
363
364impl Distill for StreamMaterialize {
365 fn distill<'a>(&self) -> XmlNode<'a> {
366 let table = self.table();
367
368 let column_names = (table.columns.iter())
369 .map(|col| col.name_with_hidden().to_string())
370 .map(Pretty::from)
371 .collect();
372
373 let stream_key = (table.stream_key.iter())
374 .map(|&k| table.columns[k].name().to_owned())
375 .map(Pretty::from)
376 .collect();
377
378 let pk_columns = (table.pk.iter())
379 .map(|o| table.columns[o.column_index].name().to_owned())
380 .map(Pretty::from)
381 .collect();
382 let mut vec = Vec::with_capacity(5);
383 vec.push(("columns", Pretty::Array(column_names)));
384 vec.push(("stream_key", Pretty::Array(stream_key)));
385 vec.push(("pk_columns", Pretty::Array(pk_columns)));
386 let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
387
388 vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
389
390 let watermark_columns = &self.base.watermark_columns();
391 if self.base.watermark_columns().n_indices() > 0 {
392 let watermark_column_names = watermark_columns
394 .indices()
395 .map(|i| table.columns()[i].name_with_hidden().to_string())
396 .map(Pretty::from)
397 .collect();
398 vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
399 };
400 childless_record("StreamMaterialize", vec)
401 }
402}
403
404impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
405 fn input(&self) -> PlanRef {
406 self.input.clone()
407 }
408
409 fn clone_with_input(&self, input: PlanRef) -> Self {
410 let new = Self::new(input, self.table().clone()).unwrap();
411 new.base
412 .schema()
413 .fields
414 .iter()
415 .zip_eq_fast(self.base.schema().fields.iter())
416 .for_each(|(a, b)| {
417 assert_eq!(a.data_type, b.data_type);
418 });
419 assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
420 new
421 }
422}
423
424impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }
425
426impl StreamNode for StreamMaterialize {
427 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
428 use risingwave_pb::stream_plan::*;
429
430 PbNodeBody::Materialize(Box::new(MaterializeNode {
431 table_id: 0,
434 table: None,
435
436 column_orders: self
437 .table()
438 .pk()
439 .iter()
440 .copied()
441 .map(ColumnOrder::to_protobuf)
442 .collect(),
443 }))
444 }
445}
446
447impl ExprRewritable<Stream> for StreamMaterialize {}
448
449impl ExprVisitable for StreamMaterialize {}