risingwave_frontend/optimizer/plan_node/stream_materialize.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, reorganize_elements_id};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// Materializes a stream.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of the materialize plan node.
    input: PlanRef,
    table: TableCatalog,
}

impl StreamMaterialize {
    #[must_use]
    pub fn new(input: PlanRef, table: TableCatalog) -> Self {
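        // Note: the materialize node itself is largely transparent. The stream key comes
        // from the table catalog, while all other stream properties (distribution,
        // append-only-ness, watermarks, ...) are inherited from the input.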
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            input.append_only(),
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );
        Self { base, input, table }
    }

    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
    ///
    /// When creating an index, `TableType` should be `Index`; materialize will then
    /// distribute keys using `user_distributed_by`.
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        user_cols: FixedBitSet,
        out_names: Vec<String>,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        // The hidden column names might refer to some expression IDs, so reorganize
        // the element IDs first.
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

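        // Use background DDL only for materialized views, and only when the session has
        // enabled it and the plan shape is eligible for it.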
        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            None,
            None,
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
        )?;

        Ok(Self::new(input, table))
    }

    /// Create a materialize node, for `TABLE`.
    ///
    /// Different from `create`, the `columns` are passed in directly instead of being
    /// derived from the input. As a result, the column IDs are preserved from the SQL
    /// column binding step and stay consistent with the source node and DML node.
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), // unknown cardinality for tables
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
        )?;

        Ok(Self::new(input, table))
    }

    /// Rewrite the input to satisfy the required distribution if necessary, according to the
    /// table type.
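    ///
    /// A rough summary of the mapping (a sketch; the `match` below is authoritative):
    ///
    /// | input distribution | table type         | required distribution                             |
    /// |--------------------|--------------------|---------------------------------------------------|
    /// | `Single`           | any                | `Single`                                          |
    /// | other              | `Table`            | `user_distributed_by` (must be shard-by-key)      |
    /// | other              | `MaterializedView` | user-specified hash shard, or shard by stream key |
    /// | other              | `Index`            | `user_distributed_by` (must be a hash shard)      |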
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    match user_distributed_by {
                        RequiredDist::PhysicalDist(Distribution::HashShard(_)) => {
                            user_distributed_by
                        }
                        RequiredDist::Any => {
                            // Ensure that rows with the same pk are not shuffled to
                            // different nodes.
                            let required_dist = RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            );

                            // If the input is a stream join, enforce the stream key as the
                            // materialized view distribution key to avoid slow backfilling
                            // caused by data skew of the dimension table join key.
                            // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
                            let is_stream_join = input.as_stream_hash_join().is_some()
                                || input.as_stream_temporal_join().is_some()
                                || input.as_stream_delta_join().is_some();

                            if is_stream_join {
                                return Ok(required_dist.enforce(input, &Order::any()));
                            }

                            required_dist
                        }
                        _ => unreachable!("{:?}", user_distributed_by),
                    }
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.enforce_if_not_satisfies(input, &Order::any())
    }

    /// Derive the table catalog with the given arguments.
    ///
    /// - The caller must ensure the validity of the given `columns`.
    /// - The `rewritten_input` should be generated by `rewrite_input`.
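    ///
    /// Note that several fields of the resulting catalog (`id`, `fragment_id`,
    /// `vnode_count`, ...) are filled with placeholders here and assigned real values
    /// by the meta service later.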
    #[allow(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Option<Vec<usize>>, // `Some` when creating a table
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns,
        // ignoring the watermark group information. We will record the watermark group
        // information in `TableCatalog` in the future. For now, let's flatten the
        // watermark columns.
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            // There is no ORDER BY for CREATE TABLE, so the stream key is identical to
            // the table pk.
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
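        // For example (hypothetical values), `pk_column_indices = [1, 0]` yields
        // `table_pk = [$1 ASC, $0 ASC]` and `stream_key = [1, 0]`.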
        // assert: `stream_key` is a subset of `table_pk`

        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            dependent_relations: vec![],
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView | TableType::Index | TableType::Internal => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None, // TODO: fill this field
        })
    }

    /// Get a reference to the stream materialize's table.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

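// `Distill` renders the node for `EXPLAIN`. A typical rendering (hypothetical example)
// looks like:
//
//   StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id],
//                       pk_columns: [_row_id], pk_conflict: NoCheck }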
impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();

        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));

        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            // TODO(rc): we ignore the watermark group info here, will fix it later
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        }

        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
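        // Rebuilding the node with a new input must not change the schema or the stream
        // key; sanity-check both against the current plan.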
        let new = Self::new(input, self.table().clone());
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // We don't need the table id for the materialize node in the frontend; it
            // will be generated by the meta catalog service.
            table_id: 0,
            column_orders: self
                .table()
                .pk()
                .iter()
                .map(ColumnOrder::to_protobuf)
                .collect(),
            table: Some(self.table().to_internal_table_prost()),
        }))
    }
}

impl ExprRewritable for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}