1use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{
21 ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
22 TableId,
23};
24use risingwave_common::hash::VnodeCount;
25use risingwave_common::util::iter_util::ZipEqFast;
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::PbWebhookSourceInfo;
28use risingwave_pb::stream_plan::stream_node::PbNodeBody;
29
30use super::derive::derive_columns;
31use super::stream::prelude::*;
32use super::utils::{Distill, childless_record};
33use super::{
34 ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
35};
36use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
37use crate::catalog::{DatabaseId, SchemaId};
38use crate::error::Result;
39use crate::optimizer::StreamOptimizedLogicalPlanRoot;
40use crate::optimizer::plan_node::derive::derive_pk;
41use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
42use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
43use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
44use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
45use crate::stream_fragmenter::BuildFragmentGraphState;
46
/// `StreamMaterialize` materializes the result of its input stream plan into a
/// state table, backing tables, materialized views and indexes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child plan providing the rows to be materialized.
    input: PlanRef,
    /// Catalog of the table to be materialized; ids are placeholders until the
    /// catalog is committed (see `derive_table_catalog`).
    table: TableCatalog,
}
55
56impl StreamMaterialize {
57 pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
58 let kind = match table.conflict_behavior() {
59 ConflictBehavior::NoCheck => {
60 reject_upsert_input!(input, "Materialize without conflict handling")
61 }
62
63 ConflictBehavior::Overwrite
65 | ConflictBehavior::IgnoreConflict
66 | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
67 StreamKind::AppendOnly | StreamKind::Retract => input.stream_kind(),
68 StreamKind::Upsert => StreamKind::Retract,
69 },
70 };
71
72 let base = PlanBase::new_stream(
73 input.ctx(),
74 input.schema().clone(),
75 Some(table.stream_key.clone()),
76 input.functional_dependency().clone(),
77 input.distribution().clone(),
78 kind,
79 input.emit_on_window_close(),
80 input.watermark_columns().clone(),
81 input.columns_monotonicity().clone(),
82 );
83
84 Ok(Self { base, input, table })
85 }
86
87 #[allow(clippy::too_many_arguments)]
92 pub fn create(
93 StreamOptimizedLogicalPlanRoot {
94 plan: input,
95 required_dist: user_distributed_by,
96 required_order: user_order_by,
97 out_fields: user_cols,
98 out_names,
99 ..
100 }: StreamOptimizedLogicalPlanRoot,
101 name: String,
102 database_id: DatabaseId,
103 schema_id: SchemaId,
104 definition: String,
105 table_type: TableType,
106 cardinality: Cardinality,
107 retention_seconds: Option<NonZeroU32>,
108 ) -> Result<Self> {
109 let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
110 let input = reorganize_elements_id(input);
112 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
113
114 let create_type = if matches!(table_type, TableType::MaterializedView)
115 && input.ctx().session_ctx().config().background_ddl()
116 && plan_can_use_background_ddl(&input)
117 {
118 CreateType::Background
119 } else {
120 CreateType::Foreground
121 };
122
123 let table = Self::derive_table_catalog(
124 input.clone(),
125 name,
126 database_id,
127 schema_id,
128 user_order_by,
129 columns,
130 definition,
131 ConflictBehavior::NoCheck,
132 None,
133 None,
134 None,
135 table_type,
136 None,
137 cardinality,
138 retention_seconds,
139 create_type,
140 None,
141 Engine::Hummock,
142 false,
143 )?;
144
145 Self::new(input, table)
146 }
147
148 #[allow(clippy::too_many_arguments)]
154 pub fn create_for_table(
155 input: PlanRef,
156 name: String,
157 database_id: DatabaseId,
158 schema_id: SchemaId,
159 user_distributed_by: RequiredDist,
160 user_order_by: Order,
161 columns: Vec<ColumnCatalog>,
162 definition: String,
163 conflict_behavior: ConflictBehavior,
164 version_column_index: Option<usize>,
165 pk_column_indices: Vec<usize>,
166 row_id_index: Option<usize>,
167 version: TableVersion,
168 retention_seconds: Option<NonZeroU32>,
169 webhook_info: Option<PbWebhookSourceInfo>,
170 engine: Engine,
171 refreshable: bool,
172 ) -> Result<Self> {
173 let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;
174
175 let table = Self::derive_table_catalog(
176 input.clone(),
177 name,
178 database_id,
179 schema_id,
180 user_order_by,
181 columns,
182 definition,
183 conflict_behavior,
184 version_column_index,
185 Some(pk_column_indices),
186 row_id_index,
187 TableType::Table,
188 Some(version),
189 Cardinality::unknown(), retention_seconds,
191 CreateType::Foreground,
192 webhook_info,
193 engine,
194 refreshable,
195 )?;
196
197 Self::new(input, table)
198 }
199
200 fn rewrite_input(
202 input: PlanRef,
203 user_distributed_by: RequiredDist,
204 table_type: TableType,
205 ) -> Result<PlanRef> {
206 let required_dist = match input.distribution() {
207 Distribution::Single => RequiredDist::single(),
208 _ => match table_type {
209 TableType::Table => {
210 assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
211 user_distributed_by
212 }
213 TableType::MaterializedView => {
214 assert_matches!(user_distributed_by, RequiredDist::Any);
215 let required_dist =
217 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
218
219 let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
224 || matches!(input.as_stream_temporal_join(), Some(_join))
225 || matches!(input.as_stream_delta_join(), Some(_join));
226
227 if is_stream_join {
228 return Ok(required_dist.stream_enforce(input));
229 }
230
231 required_dist
232 }
233 TableType::Index => {
234 assert_matches!(
235 user_distributed_by,
236 RequiredDist::PhysicalDist(Distribution::HashShard(_))
237 );
238 user_distributed_by
239 }
240 TableType::Internal => unreachable!(),
241 },
242 };
243
244 required_dist.streaming_enforce_if_not_satisfies(input)
245 }
246
247 #[allow(clippy::too_many_arguments)]
252 fn derive_table_catalog(
253 rewritten_input: PlanRef,
254 name: String,
255 database_id: DatabaseId,
256 schema_id: SchemaId,
257 user_order_by: Order,
258 columns: Vec<ColumnCatalog>,
259 definition: String,
260 conflict_behavior: ConflictBehavior,
261 version_column_index: Option<usize>,
262 pk_column_indices: Option<Vec<usize>>, row_id_index: Option<usize>,
264 table_type: TableType,
265 version: Option<TableVersion>,
266 cardinality: Cardinality,
267 retention_seconds: Option<NonZeroU32>,
268 create_type: CreateType,
269 webhook_info: Option<PbWebhookSourceInfo>,
270 engine: Engine,
271 refreshable: bool,
272 ) -> Result<TableCatalog> {
273 let input = rewritten_input;
274
275 let value_indices = (0..columns.len()).collect_vec();
276 let distribution_key = input.distribution().dist_column_indices().to_vec();
277 let append_only = input.append_only();
278 let watermark_columns = input.watermark_columns().indices().collect();
281
282 let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
283 let table_pk = pk_column_indices
284 .iter()
285 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
286 .collect();
287 (table_pk, pk_column_indices)
289 } else {
290 derive_pk(input, user_order_by, &columns)
291 };
292 let read_prefix_len_hint = table_pk.len();
295 Ok(TableCatalog {
296 id: TableId::placeholder(),
297 schema_id,
298 database_id,
299 associated_source_id: None,
300 name,
301 columns,
302 pk: table_pk,
303 stream_key,
304 distribution_key,
305 table_type,
306 append_only,
307 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
308 fragment_id: OBJECT_ID_PLACEHOLDER,
309 dml_fragment_id: None,
310 vnode_col_index: None,
311 row_id_index,
312 value_indices,
313 definition,
314 conflict_behavior,
315 version_column_index,
316 read_prefix_len_hint,
317 version,
318 watermark_columns,
319 dist_key_in_pk: vec![],
320 cardinality,
321 created_at_epoch: None,
322 initialized_at_epoch: None,
323 cleaned_by_watermark: false,
324 create_type,
325 stream_job_status: StreamJobStatus::Creating,
326 description: None,
327 incoming_sinks: vec![],
328 initialized_at_cluster_version: None,
329 created_at_cluster_version: None,
330 retention_seconds: retention_seconds.map(|i| i.into()),
331 cdc_table_id: None,
332 vnode_count: VnodeCount::Placeholder, webhook_info,
334 job_id: None,
335 engine: match table_type {
336 TableType::Table => engine,
337 TableType::MaterializedView | TableType::Index | TableType::Internal => {
338 assert_eq!(engine, Engine::Hummock);
339 engine
340 }
341 },
342 clean_watermark_index_in_pk: None, refreshable,
344 vector_index_info: None,
345 })
346 }
347
    /// The catalog of the table to be materialized.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    /// The name of the table to be materialized.
    pub fn name(&self) -> &str {
        self.table.name()
    }
357}
358
359impl Distill for StreamMaterialize {
360 fn distill<'a>(&self) -> XmlNode<'a> {
361 let table = self.table();
362
363 let column_names = (table.columns.iter())
364 .map(|col| col.name_with_hidden().to_string())
365 .map(Pretty::from)
366 .collect();
367
368 let stream_key = (table.stream_key.iter())
369 .map(|&k| table.columns[k].name().to_owned())
370 .map(Pretty::from)
371 .collect();
372
373 let pk_columns = (table.pk.iter())
374 .map(|o| table.columns[o.column_index].name().to_owned())
375 .map(Pretty::from)
376 .collect();
377 let mut vec = Vec::with_capacity(5);
378 vec.push(("columns", Pretty::Array(column_names)));
379 vec.push(("stream_key", Pretty::Array(stream_key)));
380 vec.push(("pk_columns", Pretty::Array(pk_columns)));
381 let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
382
383 vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
384
385 let watermark_columns = &self.base.watermark_columns();
386 if self.base.watermark_columns().n_indices() > 0 {
387 let watermark_column_names = watermark_columns
389 .indices()
390 .map(|i| table.columns()[i].name_with_hidden().to_string())
391 .map(Pretty::from)
392 .collect();
393 vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
394 };
395 childless_record("StreamMaterialize", vec)
396 }
397}
398
399impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
400 fn input(&self) -> PlanRef {
401 self.input.clone()
402 }
403
404 fn clone_with_input(&self, input: PlanRef) -> Self {
405 let new = Self::new(input, self.table().clone()).unwrap();
406 new.base
407 .schema()
408 .fields
409 .iter()
410 .zip_eq_fast(self.base.schema().fields.iter())
411 .for_each(|(a, b)| {
412 assert_eq!(a.data_type, b.data_type);
413 });
414 assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
415 new
416 }
417}
418
// Generates the generic plan-tree-node plumbing from the `PlanTreeNodeUnary`
// impl above (see the macro definition for details).
impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }
420
421impl StreamNode for StreamMaterialize {
422 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
423 use risingwave_pb::stream_plan::*;
424
425 PbNodeBody::Materialize(Box::new(MaterializeNode {
426 table_id: 0,
429 table: None,
430
431 column_orders: self
432 .table()
433 .pk()
434 .iter()
435 .copied()
436 .map(ColumnOrder::to_protobuf)
437 .collect(),
438 }))
439 }
440}
441
// `StreamMaterialize` holds no expressions, so both traits use the default
// no-op implementations.
impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}