1use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22 ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
23 TableId,
24};
25use risingwave_common::hash::VnodeCount;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
28use risingwave_pb::catalog::PbWebhookSourceInfo;
29use risingwave_pb::stream_plan::stream_node::PbNodeBody;
30
31use super::derive::derive_columns;
32use super::stream::prelude::*;
33use super::utils::{Distill, childless_record};
34use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, reorganize_elements_id};
35use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
36use crate::catalog::{DatabaseId, SchemaId};
37use crate::error::Result;
38use crate::optimizer::plan_node::derive::derive_pk;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
41use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
42use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
43use crate::stream_fragmenter::BuildFragmentGraphState;
44
/// Stream plan node that materializes its input stream into a table
/// (used for materialized views, tables, and indexes — see `TableType`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of the materialize node.
    input: PlanRef,
    /// The table catalog describing the target table this node writes to.
    table: TableCatalog,
}
53
54impl StreamMaterialize {
55 #[must_use]
56 pub fn new(input: PlanRef, table: TableCatalog) -> Self {
57 let base = PlanBase::new_stream(
58 input.ctx(),
59 input.schema().clone(),
60 Some(table.stream_key.clone()),
61 input.functional_dependency().clone(),
62 input.distribution().clone(),
63 input.append_only(),
64 input.emit_on_window_close(),
65 input.watermark_columns().clone(),
66 input.columns_monotonicity().clone(),
67 );
68 Self { base, input, table }
69 }
70
71 #[allow(clippy::too_many_arguments)]
76 pub fn create(
77 input: PlanRef,
78 name: String,
79 database_id: DatabaseId,
80 schema_id: SchemaId,
81 user_distributed_by: RequiredDist,
82 user_order_by: Order,
83 user_cols: FixedBitSet,
84 out_names: Vec<String>,
85 definition: String,
86 table_type: TableType,
87 cardinality: Cardinality,
88 retention_seconds: Option<NonZeroU32>,
89 ) -> Result<Self> {
90 let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
91 let input = reorganize_elements_id(input);
93 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
94
95 let create_type = if matches!(table_type, TableType::MaterializedView)
96 && input.ctx().session_ctx().config().background_ddl()
97 && plan_can_use_background_ddl(&input)
98 {
99 CreateType::Background
100 } else {
101 CreateType::Foreground
102 };
103
104 let table = Self::derive_table_catalog(
105 input.clone(),
106 name,
107 database_id,
108 schema_id,
109 user_order_by,
110 columns,
111 definition,
112 ConflictBehavior::NoCheck,
113 None,
114 None,
115 None,
116 table_type,
117 None,
118 cardinality,
119 retention_seconds,
120 create_type,
121 None,
122 Engine::Hummock,
123 )?;
124
125 Ok(Self::new(input, table))
126 }
127
128 #[allow(clippy::too_many_arguments)]
134 pub fn create_for_table(
135 input: PlanRef,
136 name: String,
137 database_id: DatabaseId,
138 schema_id: SchemaId,
139 user_distributed_by: RequiredDist,
140 user_order_by: Order,
141 columns: Vec<ColumnCatalog>,
142 definition: String,
143 conflict_behavior: ConflictBehavior,
144 version_column_index: Option<usize>,
145 pk_column_indices: Vec<usize>,
146 row_id_index: Option<usize>,
147 version: TableVersion,
148 retention_seconds: Option<NonZeroU32>,
149 webhook_info: Option<PbWebhookSourceInfo>,
150 engine: Engine,
151 ) -> Result<Self> {
152 let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;
153
154 let table = Self::derive_table_catalog(
155 input.clone(),
156 name,
157 database_id,
158 schema_id,
159 user_order_by,
160 columns,
161 definition,
162 conflict_behavior,
163 version_column_index,
164 Some(pk_column_indices),
165 row_id_index,
166 TableType::Table,
167 Some(version),
168 Cardinality::unknown(), retention_seconds,
170 CreateType::Foreground,
171 webhook_info,
172 engine,
173 )?;
174
175 Ok(Self::new(input, table))
176 }
177
178 fn rewrite_input(
180 input: PlanRef,
181 user_distributed_by: RequiredDist,
182 table_type: TableType,
183 ) -> Result<PlanRef> {
184 let required_dist = match input.distribution() {
185 Distribution::Single => RequiredDist::single(),
186 _ => match table_type {
187 TableType::Table => {
188 assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
189 user_distributed_by
190 }
191 TableType::MaterializedView => {
192 assert_matches!(user_distributed_by, RequiredDist::Any);
193 let required_dist =
195 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
196
197 let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
202 || matches!(input.as_stream_temporal_join(), Some(_join))
203 || matches!(input.as_stream_delta_join(), Some(_join));
204
205 if is_stream_join {
206 return Ok(required_dist.enforce(input, &Order::any()));
207 }
208
209 required_dist
210 }
211 TableType::Index => {
212 assert_matches!(
213 user_distributed_by,
214 RequiredDist::PhysicalDist(Distribution::HashShard(_))
215 );
216 user_distributed_by
217 }
218 TableType::Internal => unreachable!(),
219 },
220 };
221
222 required_dist.enforce_if_not_satisfies(input, &Order::any())
223 }
224
225 #[allow(clippy::too_many_arguments)]
230 fn derive_table_catalog(
231 rewritten_input: PlanRef,
232 name: String,
233 database_id: DatabaseId,
234 schema_id: SchemaId,
235 user_order_by: Order,
236 columns: Vec<ColumnCatalog>,
237 definition: String,
238 conflict_behavior: ConflictBehavior,
239 version_column_index: Option<usize>,
240 pk_column_indices: Option<Vec<usize>>, row_id_index: Option<usize>,
242 table_type: TableType,
243 version: Option<TableVersion>,
244 cardinality: Cardinality,
245 retention_seconds: Option<NonZeroU32>,
246 create_type: CreateType,
247 webhook_info: Option<PbWebhookSourceInfo>,
248 engine: Engine,
249 ) -> Result<TableCatalog> {
250 let input = rewritten_input;
251
252 let value_indices = (0..columns.len()).collect_vec();
253 let distribution_key = input.distribution().dist_column_indices().to_vec();
254 let append_only = input.append_only();
255 let watermark_columns = input.watermark_columns().indices().collect();
258
259 let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
260 let table_pk = pk_column_indices
261 .iter()
262 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
263 .collect();
264 (table_pk, pk_column_indices)
266 } else {
267 derive_pk(input, user_order_by, &columns)
268 };
269 let read_prefix_len_hint = table_pk.len();
272 Ok(TableCatalog {
273 id: TableId::placeholder(),
274 schema_id,
275 database_id,
276 associated_source_id: None,
277 name,
278 dependent_relations: vec![],
279 columns,
280 pk: table_pk,
281 stream_key,
282 distribution_key,
283 table_type,
284 append_only,
285 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
286 fragment_id: OBJECT_ID_PLACEHOLDER,
287 dml_fragment_id: None,
288 vnode_col_index: None,
289 row_id_index,
290 value_indices,
291 definition,
292 conflict_behavior,
293 version_column_index,
294 read_prefix_len_hint,
295 version,
296 watermark_columns,
297 dist_key_in_pk: vec![],
298 cardinality,
299 created_at_epoch: None,
300 initialized_at_epoch: None,
301 cleaned_by_watermark: false,
302 create_type,
303 stream_job_status: StreamJobStatus::Creating,
304 description: None,
305 incoming_sinks: vec![],
306 initialized_at_cluster_version: None,
307 created_at_cluster_version: None,
308 retention_seconds: retention_seconds.map(|i| i.into()),
309 cdc_table_id: None,
310 vnode_count: VnodeCount::Placeholder, webhook_info,
312 job_id: None,
313 engine: match table_type {
314 TableType::Table => engine,
315 TableType::MaterializedView | TableType::Index | TableType::Internal => {
316 assert_eq!(engine, Engine::Hummock);
317 engine
318 }
319 },
320 clean_watermark_index_in_pk: None, })
322 }
323
324 #[must_use]
326 pub fn table(&self) -> &TableCatalog {
327 &self.table
328 }
329
330 pub fn name(&self) -> &str {
331 self.table.name()
332 }
333}
334
335impl Distill for StreamMaterialize {
336 fn distill<'a>(&self) -> XmlNode<'a> {
337 let table = self.table();
338
339 let column_names = (table.columns.iter())
340 .map(|col| col.name_with_hidden().to_string())
341 .map(Pretty::from)
342 .collect();
343
344 let stream_key = (table.stream_key.iter())
345 .map(|&k| table.columns[k].name().to_owned())
346 .map(Pretty::from)
347 .collect();
348
349 let pk_columns = (table.pk.iter())
350 .map(|o| table.columns[o.column_index].name().to_owned())
351 .map(Pretty::from)
352 .collect();
353 let mut vec = Vec::with_capacity(5);
354 vec.push(("columns", Pretty::Array(column_names)));
355 vec.push(("stream_key", Pretty::Array(stream_key)));
356 vec.push(("pk_columns", Pretty::Array(pk_columns)));
357 let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
358
359 vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
360
361 let watermark_columns = &self.base.watermark_columns();
362 if self.base.watermark_columns().n_indices() > 0 {
363 let watermark_column_names = watermark_columns
365 .indices()
366 .map(|i| table.columns()[i].name_with_hidden().to_string())
367 .map(Pretty::from)
368 .collect();
369 vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
370 };
371 childless_record("StreamMaterialize", vec)
372 }
373}
374
375impl PlanTreeNodeUnary for StreamMaterialize {
376 fn input(&self) -> PlanRef {
377 self.input.clone()
378 }
379
380 fn clone_with_input(&self, input: PlanRef) -> Self {
381 let new = Self::new(input, self.table().clone());
382 new.base
383 .schema()
384 .fields
385 .iter()
386 .zip_eq_fast(self.base.schema().fields.iter())
387 .for_each(|(a, b)| {
388 assert_eq!(a.data_type, b.data_type);
389 });
390 assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
391 new
392 }
393}
394
395impl_plan_tree_node_for_unary! { StreamMaterialize }
396
397impl StreamNode for StreamMaterialize {
398 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
399 use risingwave_pb::stream_plan::*;
400
401 PbNodeBody::Materialize(Box::new(MaterializeNode {
402 table_id: 0,
405 column_orders: self
406 .table()
407 .pk()
408 .iter()
409 .map(ColumnOrder::to_protobuf)
410 .collect(),
411 table: Some(self.table().to_internal_table_prost()),
412 }))
413 }
414}
415
// `StreamMaterialize` holds no expressions of its own, so both expression
// traits use their default no-op implementations.
impl ExprRewritable for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}