risingwave_frontend/optimizer/plan_node/stream_materialize.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{
    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// Materializes a stream.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of Materialize plan
    input: PlanRef,
    table: TableCatalog,
}

impl StreamMaterialize {
    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
        let kind = match table.conflict_behavior() {
            ConflictBehavior::NoCheck => {
                reject_upsert_input!(input, "Materialize without conflict handling")
            }

            // When conflict handling is enabled, upsert stream can be converted to retract stream.
            ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
                StreamKind::AppendOnly | StreamKind::Retract => input.stream_kind(),
                StreamKind::Upsert => StreamKind::Retract,
            },
        };

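        // Derive the plan node's stream properties from the input, except that the stream key
        // comes from the table catalog and the stream kind is the conflict-adjusted kind
        // computed above.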
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Ok(Self { base, input, table })
    }

    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
    ///
    /// When creating an index, `TableType` should be `Index`. Materialize will then distribute
    /// keys using `user_distributed_by`.
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        // The hidden column names might refer to some expression IDs.
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

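        // Only a materialized view can be created in the background, and only when the session
        // has `background_ddl` enabled and the plan supports background DDL; everything else is
        // created in the foreground.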
        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            vec![],
            None,
            None,
            table_type,
            None,
            cardinality,
            retention_seconds,
            create_type,
            None,
            Engine::Hummock,
            false,
        )?;

        Self::new(input, table)
    }

    /// Create a materialize node, for `TABLE`.
    ///
    /// Unlike `create`, the `columns` are passed in directly instead of being derived from the
    /// input, so the column IDs are preserved from the SQL column binding step and stay
    /// consistent with the source node and the DML node.
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

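        // For tables, the primary key comes directly from the DDL (`pk_column_indices`) rather
        // than being derived from an ORDER BY; the cardinality is unknown and creation always
        // happens in the foreground.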
        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), // unknown cardinality for tables
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
            refreshable,
        )?;

        Self::new(input, table)
    }

    /// Rewrite the input to satisfy the required distribution if necessary, according to the type.
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
                    // Ensure that rows with the same pk will not be shuffled to different nodes.
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

                    // If the input is a stream join, enforce the stream key as the materialized
                    // view distribution key to avoid slow backfilling caused by
                    // data skew of the dimension table join key.
                    // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
                    let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
                        || matches!(input.as_stream_temporal_join(), Some(_join))
                        || matches!(input.as_stream_delta_join(), Some(_join));

                    if is_stream_join {
                        return Ok(required_dist.stream_enforce(input));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::VectorIndex => {
                    unreachable!("VectorIndex should not be created by StreamMaterialize")
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.streaming_enforce_if_not_satisfies(input)
    }

    /// Derive the table catalog with the given arguments.
    ///
    /// - The caller must ensure the validity of the given `columns`.
    /// - The `rewritten_input` should be generated by `rewrite_input`.
    #[expect(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
        pk_column_indices: Option<Vec<usize>>, // Is `Some` when creating a table
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

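        // Physical properties of the table follow the rewritten input: all columns are stored
        // as values, and the distribution key is the input's distribution key.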
        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring the watermark group information.
        // We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            // No ORDER BY for CREATE TABLE, so the stream key is identical to the table pk.
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        // assert: `stream_key` is a subset of `table_pk`

        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView
                | TableType::Index
                | TableType::Internal
                | TableType::VectorIndex => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None, // TODO: fill this field
            refreshable,
            vector_index_info: None,
            cdc_table_type: None,
        })
    }

    /// Get a reference to the stream materialize's table.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();
        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));
        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();

        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = &self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            // TODO(rc): we ignore the watermark group info here; it will be fixed later
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        };
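        // Illustrative EXPLAIN rendering of this record (actual field values depend on the plan):
        //   StreamMaterialize { columns: [...], stream_key: [...], pk_columns: [...], pk_conflict: NoCheck, ... }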
        childless_record("StreamMaterialize", vec)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new(input, self.table().clone()).unwrap();
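        // The new input must produce the same schema (by data type) and the same stream key as
        // the old one; otherwise the table catalog derived earlier would no longer match.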
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // Do not fill `table` and `table_id` here to avoid duplication. They will be filled
            // by the meta service after global information is generated.
            table_id: 0,
            table: None,

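            // Primary-key column orders of the materialized table, presumably used by the
            // materialize executor to maintain the key ordering.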
            column_orders: self
                .table()
                .pk()
                .iter()
                .copied()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}