risingwave_frontend/optimizer/plan_node/stream_materialize.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, reorganize_elements_id};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// Materializes a stream.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of the Materialize plan.
    input: PlanRef,
    table: TableCatalog,
}

impl StreamMaterialize {
    #[must_use]
    pub fn new(input: PlanRef, table: TableCatalog) -> Self {
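        // The plan base simply forwards the input's properties (schema, distribution,
        // append-only-ness, watermark columns, ...); only the stream key is taken from
        // the table catalog.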
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            input.append_only(),
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );
        Self { base, input, table }
    }

    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
    ///
    /// When creating an index, `TableType` should be `Index`; materialize will then distribute
    /// keys using `user_distributed_by`.
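    ///
    /// For example (a hypothetical statement), the stream plan built for
    /// ```sql
    /// CREATE MATERIALIZED VIEW mv AS SELECT v1, count(*) FROM t GROUP BY v1;
    /// ```
    /// is rooted at a `StreamMaterialize` created through this function.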
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        // The hidden column names may refer to some expression IDs.
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

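        // Background DDL applies only to materialized views, and only when the session
        // enables it and the plan is eligible for background DDL.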
        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            None,
            None,
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
        )?;

        Ok(Self::new(input, table))
    }

    /// Create a materialize node, for `TABLE`.
    ///
    /// Different from `create`, the `columns` are passed in directly instead of being derived
    /// from the input. Therefore, the column IDs are preserved from the SQL column binding step
    /// and will be consistent with the source node and DML node.
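    ///
    /// For example (hypothetical), `CREATE TABLE t (id INT PRIMARY KEY, v INT)` reaches this
    /// function with `columns` already bound from the column definitions, `pk_column_indices`
    /// pointing at `id`, and `row_id_index` set to `None` since an explicit primary key exists.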
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), // unknown cardinality for tables
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
        )?;

        Ok(Self::new(input, table))
    }

    /// Rewrite the input to satisfy the required distribution if necessary, according to the type.
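    ///
    /// - `Table`: the user-specified distribution (shard by key) is asserted and used.
    /// - `MaterializedView`: shard by the stream key, so that rows with the same primary key
    ///   are routed to the same node.
    /// - `Index`: the user-specified hash distribution is asserted and used.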
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
                    // Ensure that rows with the same pk will not be shuffled to different nodes.
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

                    // If the input is a stream join, enforce the stream key as the materialized
                    // view distribution key to avoid slow backfilling caused by
                    // data skew of the dimension table join key.
                    // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
                    let is_stream_join = input.as_stream_hash_join().is_some()
                        || input.as_stream_temporal_join().is_some()
                        || input.as_stream_delta_join().is_some();

                    if is_stream_join {
                        return Ok(required_dist.enforce(input, &Order::any()));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
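                // Internal (state) tables are not created through a materialize node,
                // so this arm is unreachable.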
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.enforce_if_not_satisfies(input, &Order::any())
    }

    /// Derive the table catalog with the given arguments.
    ///
    /// - The caller must ensure the validity of the given `columns`.
    /// - The `rewritten_input` should be generated by `rewrite_input`.
    #[allow(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Option<Vec<usize>>, // `Some` when creating a table
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

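        // All columns are stored in the table's value; no value columns are pruned here.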
        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring the watermark group information.
        // We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            // There is no ORDER BY for `CREATE TABLE`, so the stream key is identical to the table pk.
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        // assert: `stream_key` is a subset of `table_pk`

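        // Use the full pk length as the read-prefix hint (assumption: storage uses this
        // hint for prefix-based optimizations such as bloom filters).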
        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView | TableType::Index | TableType::Internal => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None, // TODO: fill this field
        })
    }

    /// Get a reference to the stream materialize's table.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
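        // This record backs the `EXPLAIN` output for the node; it renders roughly as
        //   StreamMaterialize { columns: [...], stream_key: [...], pk_columns: [...], pk_conflict: ... }
        // with `watermark_columns` appended when present.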
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();
        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));
        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();

        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = &self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            // TODO(rc): we ignore the watermark group info here; will fix it later
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        }
        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new(input, self.table().clone());
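        // Sanity-check: the new input must have the same schema (field types) and
        // stream key as the one it replaces.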
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // Do not fill `table` and `table_id` here to avoid duplication. They will be filled
            // by the meta service after global information is generated.
            table_id: 0,
            table: None,

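            // The sort orders of the materialized table's primary-key columns.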
            column_orders: self
                .table()
                .pk()
                .iter()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}