1use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22 ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
23 TableId,
24};
25use risingwave_common::hash::VnodeCount;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
28use risingwave_pb::catalog::PbWebhookSourceInfo;
29use risingwave_pb::stream_plan::stream_node::PbNodeBody;
30
31use super::derive::derive_columns;
32use super::stream::prelude::*;
33use super::utils::{Distill, childless_record};
34use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, reorganize_elements_id};
35use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
36use crate::catalog::{DatabaseId, SchemaId};
37use crate::error::Result;
38use crate::optimizer::plan_node::derive::derive_pk;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
41use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
42use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
43use crate::stream_fragmenter::BuildFragmentGraphState;
44
/// Stream plan node that materializes its input into a table described by
/// `table`. Used for materialized views, user tables and indexes (see the
/// `TableType` handling in `rewrite_input` / `derive_table_catalog`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child plan whose output rows are materialized.
    input: PlanRef,
    /// Catalog of the table being materialized.
    table: TableCatalog,
}
53
impl StreamMaterialize {
    /// Create a `StreamMaterialize` on top of `input` for an already-derived
    /// `table` catalog. All stream properties of the plan base (schema,
    /// distribution, append-only-ness, watermarks, monotonicity) are
    /// inherited from the input; the stream key comes from the table catalog.
    #[must_use]
    pub fn new(input: PlanRef, table: TableCatalog) -> Self {
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            input.append_only(),
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );
        Self { base, input, table }
    }

    /// Create a `StreamMaterialize` whose table catalog is *derived* from the
    /// input plan — the path used when the primary key is not user-specified
    /// (e.g. materialized views): columns come from `out_names`/`user_cols`
    /// and the pk is derived in `derive_table_catalog`.
    ///
    /// Conflict behavior is fixed to `NoCheck` on this path.
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        user_cols: FixedBitSet,
        out_names: Vec<String>,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        // Enforce the distribution required for this table type first.
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        // NOTE(review): presumably renumbers plan element ids after the
        // rewrite so downstream id assignment is stable — confirm with the
        // helper's definition.
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        // Run as a background DDL job only for materialized views, when the
        // session enables `background_ddl` and the plan shape supports it.
        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            None,
            None,
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
        )?;

        Ok(Self::new(input, table))
    }

    /// Create a `StreamMaterialize` for a user table (`TableType::Table`),
    /// where columns, pk indices, conflict behavior, optional version/row-id
    /// columns, webhook info and storage engine are all taken from the table
    /// definition rather than derived. Always a foreground job with unknown
    /// cardinality.
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
        )?;

        Ok(Self::new(input, table))
    }

    /// Rewrite the input so it satisfies the distribution the
    /// materialization requires:
    /// - singleton inputs stay singleton;
    /// - tables must be `ShardByKey` as requested by the caller (asserted);
    /// - indexes must be an explicit `HashShard` (asserted);
    /// - materialized views either keep a user-specified `HashShard`, or —
    ///   when the user specified nothing — are sharded by their stream key.
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    match user_distributed_by {
                        RequiredDist::PhysicalDist(Distribution::HashShard(_)) => {
                            user_distributed_by
                        }
                        RequiredDist::Any => {
                            // Default: shard the MV by its stream key.
                            let required_dist = RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            );

                            let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
                                || matches!(input.as_stream_temporal_join(), Some(_join))
                                || matches!(input.as_stream_delta_join(), Some(_join));

                            // NOTE(review): for stream joins an exchange is
                            // enforced *unconditionally* (even if the current
                            // distribution already satisfies the requirement)
                            // — presumably to place the materialization in a
                            // separate fragment from the join; confirm with
                            // the fragmenter.
                            if is_stream_join {
                                return Ok(required_dist.enforce(input, &Order::any()));
                            }

                            required_dist
                        }
                        _ => unreachable!("{:?}", user_distributed_by),
                    }
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::Internal => unreachable!(),
            },
        };

        // Only add an exchange if the input doesn't already satisfy the
        // requirement (the join case above returned early).
        required_dist.enforce_if_not_satisfies(input, &Order::any())
    }

    /// Build the [`TableCatalog`] for the materialization.
    ///
    /// If `pk_column_indices` is given (tables), the table pk is exactly
    /// those columns in ascending order and they double as the stream key;
    /// otherwise (materialized views) pk and stream key are derived from the
    /// input's order/stream key via [`derive_pk`]. Ids (`id`, `fragment_id`,
    /// `vnode_count`) are placeholders at this point.
    #[allow(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Option<Vec<usize>>, row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        // All output columns are stored as values.
        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            // User-specified pk: ascending order on each given column.
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        // Hint readers that the full pk acts as the read prefix.
        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            dependent_relations: vec![],
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, webhook_info,
            job_id: None,
            engine: match table_type {
                // Only user tables may pick a non-default engine.
                TableType::Table => engine,
                TableType::MaterializedView | TableType::Index | TableType::Internal => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None, })
    }

    /// Catalog of the table to be materialized.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    /// Name of the materialized table.
    pub fn name(&self) -> &str {
        self.table.name()
    }
}
343
344impl Distill for StreamMaterialize {
345 fn distill<'a>(&self) -> XmlNode<'a> {
346 let table = self.table();
347
348 let column_names = (table.columns.iter())
349 .map(|col| col.name_with_hidden().to_string())
350 .map(Pretty::from)
351 .collect();
352
353 let stream_key = (table.stream_key.iter())
354 .map(|&k| table.columns[k].name().to_owned())
355 .map(Pretty::from)
356 .collect();
357
358 let pk_columns = (table.pk.iter())
359 .map(|o| table.columns[o.column_index].name().to_owned())
360 .map(Pretty::from)
361 .collect();
362 let mut vec = Vec::with_capacity(5);
363 vec.push(("columns", Pretty::Array(column_names)));
364 vec.push(("stream_key", Pretty::Array(stream_key)));
365 vec.push(("pk_columns", Pretty::Array(pk_columns)));
366 let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
367
368 vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
369
370 let watermark_columns = &self.base.watermark_columns();
371 if self.base.watermark_columns().n_indices() > 0 {
372 let watermark_column_names = watermark_columns
374 .indices()
375 .map(|i| table.columns()[i].name_with_hidden().to_string())
376 .map(Pretty::from)
377 .collect();
378 vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
379 };
380 childless_record("StreamMaterialize", vec)
381 }
382}
383
384impl PlanTreeNodeUnary for StreamMaterialize {
385 fn input(&self) -> PlanRef {
386 self.input.clone()
387 }
388
389 fn clone_with_input(&self, input: PlanRef) -> Self {
390 let new = Self::new(input, self.table().clone());
391 new.base
392 .schema()
393 .fields
394 .iter()
395 .zip_eq_fast(self.base.schema().fields.iter())
396 .for_each(|(a, b)| {
397 assert_eq!(a.data_type, b.data_type);
398 });
399 assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
400 new
401 }
402}
403
404impl_plan_tree_node_for_unary! { StreamMaterialize }
405
impl StreamNode for StreamMaterialize {
    /// Serialize this node into its protobuf `MaterializeNode` body for the
    /// stream fragment graph.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // Left as 0 here; the catalog built in `derive_table_catalog`
            // also carries a placeholder id — presumably both are rewritten
            // once the real table id is assigned. TODO(review): confirm
            // where the id is filled in.
            table_id: 0,
            // Sort order of the materialized table's primary key.
            column_orders: self
                .table()
                .pk()
                .iter()
                .map(ColumnOrder::to_protobuf)
                .collect(),
            table: Some(self.table().to_internal_table_prost()),
        }))
    }
}
424
425impl ExprRewritable for StreamMaterialize {}
426
427impl ExprVisitable for StreamMaterialize {}