risingwave_frontend/optimizer/plan_node/stream_materialize.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{
    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// Materializes a stream.
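///
/// This node sits at the top of the streaming plan for `CREATE MATERIALIZED VIEW`,
/// `CREATE TABLE` and `CREATE INDEX` (built via `create` and `create_for_table` below).
/// It holds the `TableCatalog` describing the table that the stream is materialized into.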
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of Materialize plan
    input: PlanRef,
    table: TableCatalog,
}

impl StreamMaterialize {
    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
        let kind = match table.conflict_behavior() {
            ConflictBehavior::NoCheck => {
                reject_upsert_input!(input, "Materialize without conflict handling")
            }

            // When conflict handling is enabled, upsert stream can be converted to retract stream.
            ConflictBehavior::Overwrite
            | ConflictBehavior::IgnoreConflict
            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
                StreamKind::AppendOnly | StreamKind::Retract => input.stream_kind(),
                StreamKind::Upsert => StreamKind::Retract,
            },
        };

        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Ok(Self { base, input, table })
    }
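
    // Stream-kind derivation in `new`, at a glance:
    //   - `NoCheck`: upsert inputs are rejected, since there is no conflict handling to
    //     resolve them; append-only / retract inputs keep their kind.
    //   - `Overwrite` / `IgnoreConflict` / `DoUpdateIfNotNull`: upsert inputs are downgraded
    //     to retract; append-only / retract inputs keep their kind.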

    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
    ///
    /// When creating an index, `TableType` should be `Index`. Materialize will then distribute
    /// keys using `user_distributed_by`.
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        // the hidden column name might refer to some expr id
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            None, // version_column_index
            None, // pk_column_indices
            None, // row_id_index
            table_type,
            None, // version
            cardinality,
            retention_seconds,
            create_type,
            None, // webhook_info
            Engine::Hummock,
            false, // refreshable
        )?;

        Self::new(input, table)
    }
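
    // Note: materialized views and indexes always use `ConflictBehavior::NoCheck`, carry no
    // table version, are not refreshable, and live on the `Hummock` engine; only user tables
    // (created via `create_for_table` below) customize these.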

    /// Create a materialize node, for `TABLE`.
    ///
    /// Different from `create`, the `columns` are passed in directly instead of being derived from
    /// the input, so the column IDs are preserved from the SQL column binding step and remain
    /// consistent with the source node and DML node.
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), // unknown cardinality for tables
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
            refreshable,
        )?;

        Self::new(input, table)
    }

    /// Rewrite the input to satisfy the required distribution if necessary, according to the type.
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
                    // ensure that rows with the same pk will not be shuffled to different nodes
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

                    // If the input is a stream join, enforce the stream key as the materialized
                    // view distribution key to avoid slow backfilling caused by
                    // data skew of the dimension table join key.
                    // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
                    let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
                        || matches!(input.as_stream_temporal_join(), Some(_join))
                        || matches!(input.as_stream_delta_join(), Some(_join));

                    if is_stream_join {
                        return Ok(required_dist.stream_enforce(input));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.streaming_enforce_if_not_satisfies(input)
    }
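
    // Required distribution, per table type (summarizing `rewrite_input` above):
    //   - input already `Single`: keep `Single`, whatever the table type;
    //   - `Table`:                the user-specified `ShardByKey` distribution;
    //   - `MaterializedView`:     shard by the stream key, with an extra enforced exchange
    //                             when the input is a stream join (hash/temporal/delta) to
    //                             avoid backfill skew on the join key;
    //   - `Index`:                the user-specified `HashShard` distribution;
    //   - `Internal`:             unreachable here.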

    /// Derive the table catalog with the given arguments.
    ///
    /// - The caller must ensure the validity of the given `columns`.
    /// - The `rewritten_input` should be generated by `rewrite_input`.
    #[allow(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Option<Vec<usize>>, // `Some` when creating a table
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
        refreshable: bool,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring the watermark group information.
        // We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            // No order by for create table, so stream key is identical to table pk.
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        // assert: `stream_key` is a subset of `table_pk`

        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView | TableType::Index | TableType::Internal => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None, // TODO: fill this field
            refreshable,
            vector_index_info: None,
        })
    }
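
    // Several catalog fields are intentionally placeholders at this point: `id`, `fragment_id`
    // and `vnode_count` (as well as the `table` / `table_id` of the protobuf node below) are
    // only known once the meta service assembles the fragment graph, so they are filled in later.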

    /// Get a reference to the stream materialize's table.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();
        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));
        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();

        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            // TODO(rc): we ignore the watermark group info here, will fix it later
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        };
        childless_record("StreamMaterialize", vec)
    }
}
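
// For illustration only: the record built by `distill` above renders in `EXPLAIN` output
// roughly as follows (column names here are hypothetical):
//
//   StreamMaterialize { columns: [id, v, _row_id(hidden)], stream_key: [_row_id],
//                       pk_columns: [_row_id], pk_conflict: NoCheck }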

impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new(input, self.table().clone()).unwrap();
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // Do not fill `table` and `table_id` here to avoid duplication. They will be filled
            // in by the meta service after global information is generated.
            table_id: 0,
            table: None,

            column_orders: self
                .table()
                .pk()
                .iter()
                .copied()
                .map(ColumnOrder::to_protobuf)
                .collect(),
        }))
    }
}

impl ExprRewritable<Stream> for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}