use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, reorganize_elements_id};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

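/// `StreamMaterialize` materializes the result of its input stream into a state table. It is
/// typically the root node of the stream plan for `CREATE MATERIALIZED VIEW`, `CREATE TABLE`,
/// and `CREATE INDEX` jobs, and owns the [`TableCatalog`] describing the table being created.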
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    table: TableCatalog,
}

impl StreamMaterialize {
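    /// Create a `StreamMaterialize` node directly from an input plan and a prepared
    /// [`TableCatalog`]. The node's plan base is derived from the input, with the table's
    /// stream key as the stream key.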
    #[must_use]
    pub fn new(input: PlanRef, table: TableCatalog) -> Self {
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            input.append_only(),
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );
        Self { base, input, table }
    }

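    /// Create a materialize node from an optimized stream plan root, typically for
    /// `CREATE MATERIALIZED VIEW` or `CREATE INDEX`.
    ///
    /// The input is rewritten to satisfy the required distribution for the given `table_type`,
    /// output columns are derived from the plan root, and a [`TableCatalog`] is generated with
    /// `ConflictBehavior::NoCheck`. A materialized view is created as a background DDL job if
    /// the session enables `background_ddl` and the plan supports it; otherwise the job runs in
    /// the foreground.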
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            None,
            None,
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
        )?;

        Ok(Self::new(input, table))
    }

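    /// Create a materialize node for a `CREATE TABLE` statement.
    ///
    /// Unlike [`Self::create`], the column catalogs and primary-key column indices are provided
    /// by the caller rather than derived from the plan, and the conflict behavior, versioning,
    /// webhook info, and table engine are configurable.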
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(),
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
        )?;

        Ok(Self::new(input, table))
    }

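    /// Rewrite the input plan so that its distribution satisfies what the materialized table
    /// requires for the given `table_type`, inserting an `Exchange` where necessary.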
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

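                    // If the input is a stream join, always insert an exchange that re-shards by
                    // the stream key, even if the join output already satisfies the distribution.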
                    let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
                        || matches!(input.as_stream_temporal_join(), Some(_join))
                        || matches!(input.as_stream_delta_join(), Some(_join));

                    if is_stream_join {
                        return Ok(required_dist.enforce(input, &Order::any()));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.enforce_if_not_satisfies(input, &Order::any())
    }

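    /// Derive the [`TableCatalog`] for the table to be materialized.
    ///
    /// The table's primary key is taken from `pk_column_indices` (all ascending) when provided,
    /// as for `CREATE TABLE`; otherwise it is derived from `user_order_by` and the input's
    /// stream key. All columns are value columns, the distribution key follows the rewritten
    /// input's distribution, and placeholder ids are used for the table and fragment.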
    #[allow(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Option<Vec<usize>>,
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        let read_prefix_len_hint = table_pk.len();

        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder,
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView | TableType::Index | TableType::Internal => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None,
        })
    }

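    /// Get the catalog of the table to be materialized.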
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

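    /// Get the name of the table to be materialized.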
    pub fn name(&self) -> &str {
        self.table.name()
    }
}

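// `Distill` produces the fields shown for this node in `EXPLAIN` output.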
impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();
        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));
        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();

        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = &self.base.watermark_columns();
        if self.base.watermark_columns().n_indices() > 0 {
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        };
        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

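    // Rebuilding the node with a new input must not change the schema or the stream key;
    // both are checked by the assertions below.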
    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new(input, self.table().clone());
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
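            // The table id is not known when the plan node is built; 0 is a placeholder here.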
            table_id: 0,
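            // Likewise, the table catalog is left empty here and filled in later when the
            // streaming job is created.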
            table: None,

            column_orders: self
                .table()
                .pk()
                .iter()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}