risingwave_frontend/optimizer/plan_node/stream_materialize.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
    TableId,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::PbWebhookSourceInfo;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::derive_columns;
use super::stream::prelude::*;
use super::utils::{Distill, childless_record};
use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode, reorganize_elements_id};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::Result;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// Materializes a stream.
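///
/// `StreamMaterialize` is the root of the streaming plan for `CREATE MATERIALIZED VIEW`,
/// `CREATE TABLE`, and `CREATE INDEX`: it writes its input stream into the storage table
/// described by [`TableCatalog`].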
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    pub base: PlanBase<Stream>,
    /// Child of the Materialize plan.
    input: PlanRef,
    table: TableCatalog,
}

impl StreamMaterialize {
    #[must_use]
    pub fn new(input: PlanRef, table: TableCatalog) -> Self {
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key.clone()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            input.append_only(),
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );
        Self { base, input, table }
    }

    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
    ///
    /// When creating an index, `table_type` should be `TableType::Index`; the materialize
    /// node will then distribute keys using `user_distributed_by`.
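    ///
    /// For illustration, a hypothetical statement like
    ///
    /// ```sql
    /// CREATE MATERIALIZED VIEW mv AS SELECT v1, count(*) FROM t GROUP BY v1;
    /// ```
    ///
    /// reaches this function with `table_type` set to `TableType::MaterializedView`. If the
    /// session config `background_ddl` is enabled and the plan supports it (see
    /// `plan_can_use_background_ddl`), the job is created with `CreateType::Background`;
    /// otherwise it is created with `CreateType::Foreground`.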
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        user_cols: FixedBitSet,
        out_names: Vec<String>,
        definition: String,
        table_type: TableType,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
        // The hidden column names might refer to some expr ids.
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        let create_type = if matches!(table_type, TableType::MaterializedView)
            && input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            ConflictBehavior::NoCheck,
            None, // version_column_index
            None, // pk_column_indices
            None, // row_id_index
            table_type,
            None, // version
            cardinality,
            retention_seconds,
            create_type,
            None, // webhook_info
            Engine::Hummock,
        )?;

        Ok(Self::new(input, table))
    }

    /// Create a materialize node, for `TABLE`.
    ///
    /// Different from `create`, the `columns` are passed in directly instead of being derived
    /// from the input. As a result, the column IDs are preserved from the SQL column binding
    /// step and will be consistent with those in the source node and DML node.
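    ///
    /// For illustration, a hypothetical statement like
    ///
    /// ```sql
    /// CREATE TABLE t (id INT PRIMARY KEY, v INT);
    /// ```
    ///
    /// reaches this function with the column catalogs for `id` and `v` already built during
    /// binding, and with `pk_column_indices` pointing at the `id` column.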
    #[allow(clippy::too_many_arguments)]
    pub fn create_for_table(
        input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Vec<usize>,
        row_id_index: Option<usize>,
        version: TableVersion,
        retention_seconds: Option<NonZeroU32>,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<Self> {
        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_order_by,
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            Some(pk_column_indices),
            row_id_index,
            TableType::Table,
            Some(version),
            Cardinality::unknown(), // unknown cardinality for tables
            retention_seconds,
            CreateType::Foreground,
            webhook_info,
            engine,
        )?;

        Ok(Self::new(input, table))
    }

    /// Rewrite the input to satisfy the required distribution if necessary, according to the
    /// table type.
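    ///
    /// A summary of the rules implemented below: a singleton input always keeps
    /// `RequiredDist::single`; otherwise,
    /// - `Table` uses the user-specified shard key;
    /// - `MaterializedView` shards by the stream key, and if the input is a stream join, an
    ///   exchange is always enforced to avoid data skew on the join key;
    /// - `Index` uses the user-specified hash-shard distribution.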
    fn rewrite_input(
        input: PlanRef,
        user_distributed_by: RequiredDist,
        table_type: TableType,
    ) -> Result<PlanRef> {
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => match table_type {
                TableType::Table => {
                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
                    user_distributed_by
                }
                TableType::MaterializedView => {
                    assert_matches!(user_distributed_by, RequiredDist::Any);
                    // Ensure that rows with the same primary key will not be shuffled to
                    // different nodes.
                    let required_dist =
                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());

                    // If the input is a stream join, enforce the stream key as the materialized
                    // view distribution key to avoid slow backfilling caused by
                    // data skew of the dimension table join key.
                    // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
                    let is_stream_join = input.as_stream_hash_join().is_some()
                        || input.as_stream_temporal_join().is_some()
                        || input.as_stream_delta_join().is_some();

                    if is_stream_join {
                        return Ok(required_dist.enforce(input, &Order::any()));
                    }

                    required_dist
                }
                TableType::Index => {
                    assert_matches!(
                        user_distributed_by,
                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
                    );
                    user_distributed_by
                }
                TableType::Internal => unreachable!(),
            },
        };

        required_dist.enforce_if_not_satisfies(input, &Order::any())
    }

    /// Derive the table catalog with the given arguments.
    ///
    /// - The caller must ensure the validity of the given `columns`.
    /// - The `rewritten_input` should be generated by `rewrite_input`.
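    ///
    /// Several fields are filled with placeholders (`TableId::placeholder()`,
    /// `OBJECT_ID_PLACEHOLDER`, `VnodeCount::Placeholder`); the meta service assigns their
    /// real values when the streaming job is created.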
    #[allow(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<usize>,
        pk_column_indices: Option<Vec<usize>>, // `Some` when creating a table
        row_id_index: Option<usize>,
        table_type: TableType,
        version: Option<TableVersion>,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        webhook_info: Option<PbWebhookSourceInfo>,
        engine: Engine,
    ) -> Result<TableCatalog> {
        let input = rewritten_input;

        let value_indices = (0..columns.len()).collect_vec();
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let append_only = input.append_only();
        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring
        // the watermark group information. We will record the watermark group information in
        // `TableCatalog` in the future. For now, let's flatten the watermark columns.
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
            let table_pk = pk_column_indices
                .iter()
                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
                .collect();
            // There's no `ORDER BY` for `CREATE TABLE`, so the stream key is identical to the
            // table pk.
            (table_pk, pk_column_indices)
        } else {
            derive_pk(input, user_order_by, &columns)
        };
        // Invariant: `stream_key` is a subset of `table_pk`.
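        // For example, for a hypothetical `CREATE TABLE t (a INT PRIMARY KEY, b INT)`, the
        // `pk_column_indices` branch above yields `table_pk = [a ASC]` and `stream_key = [a]`.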

        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            dependent_relations: vec![],
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index,
            value_indices,
            definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint,
            version,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
            webhook_info,
            job_id: None,
            engine: match table_type {
                TableType::Table => engine,
                TableType::MaterializedView | TableType::Index | TableType::Internal => {
                    assert_eq!(engine, Engine::Hummock);
                    engine
                }
            },
            clean_watermark_index_in_pk: None, // TODO: fill this field
        })
    }

    /// Get a reference to the stream materialize's table.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }

    pub fn name(&self) -> &str {
        self.table.name()
    }
}

impl Distill for StreamMaterialize {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let table = self.table();

        let column_names = (table.columns.iter())
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();

        let stream_key = (table.stream_key.iter())
            .map(|&k| table.columns[k].name().to_owned())
            .map(Pretty::from)
            .collect();

        let pk_columns = (table.pk.iter())
            .map(|o| table.columns[o.column_index].name().to_owned())
            .map(Pretty::from)
            .collect();

        let mut vec = Vec::with_capacity(5);
        vec.push(("columns", Pretty::Array(column_names)));
        vec.push(("stream_key", Pretty::Array(stream_key)));
        vec.push(("pk_columns", Pretty::Array(pk_columns)));

        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));

        let watermark_columns = &self.base.watermark_columns();
        if watermark_columns.n_indices() > 0 {
            // TODO(rc): we ignore the watermark group info here, will fix it later
            let watermark_column_names = watermark_columns
                .indices()
                .map(|i| table.columns()[i].name_with_hidden().to_string())
                .map(Pretty::from)
                .collect();
            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
        }

        childless_record("StreamMaterialize", vec)
    }
}
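
// For illustration, the record distilled above renders in `EXPLAIN` output roughly like the
// following (a hypothetical aggregation query; the exact fields depend on the plan):
//
//     StreamMaterialize { columns: [v1, count], stream_key: [v1], pk_columns: [v1],
//     pk_conflict: NoCheck }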

impl PlanTreeNodeUnary for StreamMaterialize {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let new = Self::new(input, self.table().clone());
        // Sanity check: the new input must keep the same column data types and stream key.
        new.base
            .schema()
            .fields
            .iter()
            .zip_eq_fast(self.base.schema().fields.iter())
            .for_each(|(a, b)| {
                assert_eq!(a.data_type, b.data_type);
            });
        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
        new
    }
}

impl_plan_tree_node_for_unary! { StreamMaterialize }

impl StreamNode for StreamMaterialize {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        PbNodeBody::Materialize(Box::new(MaterializeNode {
            // We don't need the table id for the materialize node in the frontend; it will be
            // generated by the meta catalog service.
            table_id: 0,
            column_orders: self
                .table()
                .pk()
                .iter()
                .map(ColumnOrder::to_protobuf)
                .collect(),
            table: Some(self.table().to_internal_table_prost()),
        }))
    }
}
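
// For illustration, the protobuf body produced above has roughly the following shape (a
// sketch; actual values depend on the table catalog):
//
//     MaterializeNode {
//         table_id: 0,           // placeholder, assigned by the meta service
//         column_orders: [...],  // one entry per table pk column
//         table: Some(<internal table proto>),
//     }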

impl ExprRewritable for StreamMaterialize {}

impl ExprVisitable for StreamMaterialize {}