risingwave_frontend/optimizer/plan_node/
stream_materialize.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22    ColumnCatalog, ConflictBehavior, CreateType, Engine, StreamJobStatus, TableId,
23};
24use risingwave_common::hash::VnodeCount;
25use risingwave_common::id::FragmentId;
26use risingwave_common::types::DataType;
27use risingwave_common::util::column_index_mapping::ColIndexMapping;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
30use risingwave_pb::catalog::PbWebhookSourceInfo;
31use risingwave_pb::stream_plan::stream_node::PbNodeBody;
32
33use super::derive::derive_columns;
34use super::stream::prelude::*;
35use super::utils::{Distill, TableCatalogBuilder, childless_record};
36use super::{
37    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
38};
39use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::Result;
42use crate::optimizer::StreamOptimizedLogicalPlanRoot;
43use crate::optimizer::plan_node::derive::derive_pk;
44use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
45use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
46use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
47use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
48use crate::stream_fragmenter::BuildFragmentGraphState;
49
/// Materializes a stream.
///
/// The node pairs its input plan with the [`TableCatalog`] of the table being written, plus two
/// optional auxiliary catalogs that are only populated for refreshable tables.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMaterialize {
    /// Plan properties of this node. Built from the input's schema and properties, with the
    /// stream key taken from `table`.
    pub base: PlanBase<Stream>,
    /// Child of Materialize plan
    input: PlanRef,
    /// Catalog of the table this node materializes into.
    table: TableCatalog,
    /// For refreshable tables, staging table for collecting new data during refresh
    staging_table: Option<TableCatalog>,
    /// For refreshable tables, progress table for tracking refresh progress
    refresh_progress_table: Option<TableCatalog>,
}
62
63impl StreamMaterialize {
64    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
65        Self::new_with_staging_and_progress(input, table, None, None)
66    }
67
68    pub fn new_with_staging_and_progress(
69        input: PlanRef,
70        table: TableCatalog,
71        staging_table: Option<TableCatalog>,
72        refresh_progress_table: Option<TableCatalog>,
73    ) -> Result<Self> {
74        let kind = match table.conflict_behavior() {
75            ConflictBehavior::NoCheck => {
76                reject_upsert_input!(input, "Materialize without conflict handling")
77            }
78
79            // When conflict handling is enabled, upsert stream can be converted to retract stream.
80            ConflictBehavior::Overwrite
81            | ConflictBehavior::IgnoreConflict
82            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
83                StreamKind::AppendOnly => StreamKind::AppendOnly,
84                StreamKind::Retract | StreamKind::Upsert => StreamKind::Retract,
85            },
86        };
87        let base = PlanBase::new_stream(
88            input.ctx(),
89            input.schema().clone(),
90            Some(table.stream_key()),
91            input.functional_dependency().clone(),
92            input.distribution().clone(),
93            kind,
94            input.emit_on_window_close(),
95            input.watermark_columns().clone(),
96            input.columns_monotonicity().clone(),
97        );
98
99        Ok(Self {
100            base,
101            input,
102            table,
103            staging_table,
104            refresh_progress_table,
105        })
106    }
107
108    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
109    ///
110    /// When creating index, `TableType` should be `Index`. Then, materialize will distribute keys
111    /// using `user_distributed_by`.
112    pub fn create(
113        StreamOptimizedLogicalPlanRoot {
114            plan: input,
115            required_dist: user_distributed_by,
116            required_order: user_order_by,
117            out_fields: user_cols,
118            out_names,
119            ..
120        }: StreamOptimizedLogicalPlanRoot,
121        name: String,
122        database_id: DatabaseId,
123        schema_id: SchemaId,
124        definition: String,
125        table_type: TableType,
126        cardinality: Cardinality,
127        retention_seconds: Option<NonZeroU32>,
128    ) -> Result<Self> {
129        let input = Self::rewrite_input(input, user_distributed_by.clone(), table_type)?;
130        // the hidden column name might refer some expr id
131        let input = reorganize_elements_id(input);
132        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
133
134        let create_type = if matches!(table_type, TableType::MaterializedView)
135            && input.ctx().session_ctx().config().background_ddl()
136            && plan_can_use_background_ddl(&input)
137        {
138            CreateType::Background
139        } else {
140            CreateType::Foreground
141        };
142
143        // For upsert stream, use `Overwrite` conflict behavior to convert into retract stream.
144        let conflict_behavior = match input.stream_kind() {
145            StreamKind::Retract | StreamKind::AppendOnly => ConflictBehavior::NoCheck,
146            StreamKind::Upsert => ConflictBehavior::Overwrite,
147        };
148
149        let table = Self::derive_table_catalog(
150            input.clone(),
151            name,
152            database_id,
153            schema_id,
154            user_distributed_by,
155            user_order_by,
156            columns,
157            definition,
158            conflict_behavior,
159            vec![],
160            None,
161            vec![],
162            None,
163            table_type,
164            None,
165            cardinality,
166            retention_seconds,
167            create_type,
168            None,
169            Engine::Hummock,
170            false,
171        )?;
172
173        Self::new(input, table)
174    }
175
176    /// Create a materialize node, for `TABLE`.
177    ///
178    /// Different from `create`, the `columns` are passed in directly, instead of being derived from
179    /// the input. So the column IDs are preserved from the SQL columns binding step and will be
180    /// consistent with the source node and DML node.
181    #[expect(clippy::too_many_arguments)]
182    pub fn create_for_table(
183        input: PlanRef,
184        name: String,
185        database_id: DatabaseId,
186        schema_id: SchemaId,
187        user_distributed_by: RequiredDist,
188        user_order_by: Order,
189        columns: Vec<ColumnCatalog>,
190        definition: String,
191        conflict_behavior: ConflictBehavior,
192        version_column_indices: Vec<usize>,
193        pk_column_indices: Vec<usize>,
194        ttl_watermark_indices: Vec<usize>,
195        row_id_index: Option<usize>,
196        version: TableVersion,
197        retention_seconds: Option<NonZeroU32>,
198        webhook_info: Option<PbWebhookSourceInfo>,
199        engine: Engine,
200        refreshable: bool,
201    ) -> Result<Self> {
202        let input = Self::rewrite_input(input, user_distributed_by.clone(), TableType::Table)?;
203
204        let table = Self::derive_table_catalog(
205            input.clone(),
206            name.clone(),
207            database_id,
208            schema_id,
209            user_distributed_by,
210            user_order_by,
211            columns,
212            definition,
213            conflict_behavior,
214            version_column_indices,
215            Some(pk_column_indices),
216            ttl_watermark_indices,
217            row_id_index,
218            TableType::Table,
219            Some(version),
220            Cardinality::unknown(), // unknown cardinality for tables
221            retention_seconds,
222            CreateType::Foreground,
223            webhook_info,
224            engine,
225            refreshable,
226        )?;
227
228        // For refreshable tables, create staging table and progress table
229        let (staging_table, refresh_progress_table) = if refreshable {
230            let staging = Some(Self::derive_staging_table_catalog(table.clone()));
231            let progress = Some(Self::derive_refresh_progress_table_catalog(table.clone()));
232            (staging, progress)
233        } else {
234            (None, None)
235        };
236
237        tracing::info!(
238            table_name = %name,
239            refreshable = %refreshable,
240            has_staging_table = %staging_table.is_some(),
241            has_progress_table = %refresh_progress_table.is_some(),
242            "Creating StreamMaterialize with staging and progress table info"
243        );
244
245        Self::new_with_staging_and_progress(input, table, staging_table, refresh_progress_table)
246    }
247
248    /// Rewrite the input to satisfy the required distribution if necessary, according to the type.
249    fn rewrite_input(
250        input: PlanRef,
251        user_distributed_by: RequiredDist,
252        table_type: TableType,
253    ) -> Result<PlanRef> {
254        let required_dist = match input.distribution() {
255            Distribution::Single => RequiredDist::single(),
256            _ => match table_type {
257                TableType::Table => {
258                    assert_matches!(
259                        user_distributed_by,
260                        RequiredDist::ShardByKey(_) | RequiredDist::ShardByExactKey(_)
261                    );
262                    user_distributed_by
263                }
264                TableType::MaterializedView => {
265                    assert_matches!(user_distributed_by, RequiredDist::Any);
266                    // ensure the same pk will not shuffle to different node
267                    let required_dist =
268                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
269
270                    // If the input is a stream join, enforce the stream key as the materialized
271                    // view distribution key to avoid slow backfilling caused by
272                    // data skew of the dimension table join key.
273                    // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
274                    let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
275                        || matches!(input.as_stream_temporal_join(), Some(_join))
276                        || matches!(input.as_stream_delta_join(), Some(_join));
277
278                    if is_stream_join {
279                        return Ok(required_dist.stream_enforce(input));
280                    }
281
282                    required_dist
283                }
284                TableType::Index => {
285                    assert_matches!(
286                        user_distributed_by,
287                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
288                    );
289                    user_distributed_by
290                }
291                TableType::VectorIndex => {
292                    unreachable!("VectorIndex should not be created by StreamMaterialize")
293                }
294                TableType::Internal => unreachable!(),
295            },
296        };
297
298        required_dist.streaming_enforce_if_not_satisfies(input)
299    }
300
301    /// Derive the table catalog with the given arguments.
302    ///
303    /// - The caller must ensure the validity of the given `columns`.
304    /// - The `rewritten_input` should be generated by `rewrite_input`.
305    #[expect(clippy::too_many_arguments)]
306    fn derive_table_catalog(
307        rewritten_input: PlanRef,
308        name: String,
309        database_id: DatabaseId,
310        schema_id: SchemaId,
311        user_distributed_by: RequiredDist,
312        user_order_by: Order,
313        columns: Vec<ColumnCatalog>,
314        definition: String,
315        conflict_behavior: ConflictBehavior,
316        version_column_indices: Vec<usize>,
317        pk_column_indices: Option<Vec<usize>>, // Is some when create table
318        ttl_watermark_indices: Vec<usize>,
319        row_id_index: Option<usize>,
320        table_type: TableType,
321        version: Option<TableVersion>,
322        cardinality: Cardinality,
323        retention_seconds: Option<NonZeroU32>,
324        create_type: CreateType,
325        webhook_info: Option<PbWebhookSourceInfo>,
326        engine: Engine,
327        refreshable: bool,
328    ) -> Result<TableCatalog> {
329        let input = rewritten_input;
330
331        let value_indices = (0..columns.len()).collect_vec();
332        let distribution_key = input.distribution().dist_column_indices().to_vec();
333        let append_only = input.append_only();
334        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring the watermark group information.
335        // We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
336        let watermark_columns = input.watermark_columns().indices().collect();
337
338        let (table_pk, mut stream_key) = if let Some(pk_column_indices) = pk_column_indices {
339            let table_pk = pk_column_indices
340                .iter()
341                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
342                .collect();
343            // No order by for create table, so stream key is identical to table pk.
344            (table_pk, pk_column_indices)
345        } else {
346            derive_pk(input, user_distributed_by, user_order_by, &columns)
347        };
348
349        // Add TTL watermark column to stream key.
350        // When a row comes in to a TTL-ed table and we cannot find it in the table, we still cannot tell
351        // whether it is a new row or an update to an expired row. Adding the TTL watermark column to the stream key
352        // can ensure there's no double-insert from the view of the downstream jobs. See RFC for more details.
353        for idx in ttl_watermark_indices.iter().copied() {
354            if !stream_key.contains(&idx) {
355                stream_key.push(idx);
356            }
357        }
358
359        let read_prefix_len_hint = table_pk.len();
360        Ok(TableCatalog {
361            id: TableId::placeholder(),
362            schema_id,
363            database_id,
364            associated_source_id: None,
365            name,
366            columns,
367            pk: table_pk,
368            stream_key,
369            distribution_key,
370            table_type,
371            append_only,
372            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
373            fragment_id: FragmentId::placeholder(),
374            dml_fragment_id: None,
375            vnode_col_index: None,
376            row_id_index,
377            value_indices,
378            definition,
379            conflict_behavior,
380            version_column_indices,
381            read_prefix_len_hint,
382            version,
383            watermark_columns,
384            dist_key_in_pk: vec![],
385            cardinality,
386            created_at_epoch: None,
387            initialized_at_epoch: None,
388            create_type,
389            stream_job_status: StreamJobStatus::Creating,
390            description: None,
391            initialized_at_cluster_version: None,
392            created_at_cluster_version: None,
393            retention_seconds: retention_seconds.map(|i| i.into()),
394            cdc_table_id: None,
395            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
396            webhook_info,
397            job_id: None,
398            engine: match table_type {
399                TableType::Table => engine,
400                TableType::MaterializedView
401                | TableType::Index
402                | TableType::Internal
403                | TableType::VectorIndex => {
404                    assert_eq!(engine, Engine::Hummock);
405                    engine
406                }
407            },
408            clean_watermark_index_in_pk: None, // TODO: fill this field
409            clean_watermark_indices: ttl_watermark_indices,
410            refreshable,
411            vector_index_info: None,
412            cdc_table_type: None,
413        })
414    }
415
    /// The staging table is a pk-only table.
    ///
    /// Takes the catalog of a refreshable table by value and rebuilds it as an internal table
    /// whose columns are exactly the primary-key columns of the original, in pk order. The
    /// `pk`, `stream_key`, `vnode_col_index` and `distribution_key` indices are translated
    /// through a column mapping built from the pk column indices. Watermark columns, row id,
    /// retention, DML fragment id and the `refreshable` flag are reset; most remaining fields
    /// are forwarded unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the table has a row id index, has `retention_seconds` set, or is not
    /// refreshable. Also panics if `try_map_all` does not succeed for the stream key or the
    /// distribution key (presumably when one of them references a non-pk column; confirm
    /// against `ColIndexMapping`).
    fn derive_staging_table_catalog(
        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            columns,
            pk,
            stream_key,
            table_type: _,
            distribution_key,
            append_only,
            cardinality,
            owner,
            retention_seconds,
            fragment_id,
            dml_fragment_id: _,
            vnode_col_index,
            row_id_index,
            value_indices: _,
            definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint: _,
            version,
            watermark_columns: _,
            dist_key_in_pk,
            created_at_epoch,
            initialized_at_epoch,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            clean_watermark_indices,
            refreshable,
            vector_index_info,
            cdc_table_type,
        }: TableCatalog,
    ) -> TableCatalog {
        tracing::info!(
            table_name = %name,
            "Creating staging table for refreshable table"
        );

        // Preconditions on the source catalog; violating any of them is a planner bug.
        assert!(row_id_index.is_none());
        assert!(retention_seconds.is_none());
        assert!(refreshable);

        // Keep pk columns in pk order, so the staging chunk projected by pk indices can be
        // written directly without reordering.
        let pk_col_indices = pk.iter().map(|pk| pk.column_index).collect_vec();
        let pk_cols = pk_col_indices
            .iter()
            .map(|&col_idx| columns[col_idx].clone())
            .collect_vec();
        // Used below to translate column indices of the original table into indices of the
        // pk-only column list.
        let mapping = ColIndexMapping::with_remaining_columns(&pk_col_indices, columns.len());
        TableCatalog {
            id,
            schema_id,
            database_id,
            associated_source_id,
            name,
            // Every remaining column is stored as a value.
            value_indices: (0..pk_cols.len()).collect(),
            columns: pk_cols,
            pk: pk
                .iter()
                .map(|pk| ColumnOrder::new(mapping.map(pk.column_index), pk.order_type))
                .collect(),
            stream_key: mapping.try_map_all(stream_key).unwrap(),
            vnode_col_index: vnode_col_index.map(|i| mapping.map(i)),
            // `dist_key_in_pk` is already based on pk positions, so it should keep unchanged.
            dist_key_in_pk,
            distribution_key: mapping.try_map_all(distribution_key).unwrap(),
            table_type: TableType::Internal,
            watermark_columns: FixedBitSet::new(),
            append_only,
            cardinality,
            owner,
            retention_seconds: None,
            fragment_id,
            dml_fragment_id: None,
            row_id_index: None,
            definition,
            conflict_behavior,
            // NOTE(review): `version_column_indices` (and `clean_watermark_index_in_pk` /
            // `clean_watermark_indices` below) are forwarded without going through `mapping`.
            // If they index into the original column list they may not be valid for the
            // pk-only columns; confirm whether consumers of the staging table read them.
            version_column_indices,
            // Refresh staging tables are consumed by the refresh merge path with
            // `iter_keyed_row_with_vnode` range scans. They are not read by
            // fixed-prefix lookups, so prefix bloom filters would only add write cost.
            read_prefix_len_hint: 0,
            version,
            created_at_epoch,
            initialized_at_epoch,
            create_type,
            stream_job_status,
            description,
            created_at_cluster_version,
            initialized_at_cluster_version,
            cdc_table_id,
            vnode_count,
            webhook_info,
            job_id,
            engine,
            clean_watermark_index_in_pk,
            clean_watermark_indices,
            // The staging table itself is not a refreshable table.
            refreshable: false,
            vector_index_info,
            cdc_table_type,
        }
    }
534
535    /// The refresh progress table is used to track refresh operation progress.
536    /// Simplified Schema: vnode (i32), `current_pos`... (variable PK from upstream),
537    /// `is_completed` (bool), `processed_rows` (i64)
538    fn derive_refresh_progress_table_catalog(table: TableCatalog) -> TableCatalog {
539        tracing::debug!(
540            table_name = %table.name,
541            "Creating refresh progress table for refreshable table"
542        );
543
544        // Define the simplified schema for the refresh progress table
545        // Schema: | vnode | current_pos... | is_completed | processed_rows |
546        let mut columns = vec![ColumnCatalog {
547            column_desc: risingwave_common::catalog::ColumnDesc::named(
548                "vnode",
549                0.into(),
550                DataType::Int16,
551            ),
552            is_hidden: false,
553        }];
554
555        // Add current_pos columns (mirror upstream table's primary key)
556        let mut col_index = 1;
557        for pk_col in &table.pk {
558            let upstream_col = &table.columns[pk_col.column_index];
559            columns.push(ColumnCatalog {
560                column_desc: risingwave_common::catalog::ColumnDesc::named(
561                    format!("pos_{}", upstream_col.name()),
562                    col_index.into(),
563                    upstream_col.data_type().clone(),
564                ),
565                is_hidden: false,
566            });
567            col_index += 1;
568        }
569
570        // Add metadata columns
571        for (name, data_type) in [
572            ("is_completed", DataType::Boolean),
573            ("processed_rows", DataType::Int64),
574        ] {
575            columns.push(ColumnCatalog {
576                column_desc: risingwave_common::catalog::ColumnDesc::named(
577                    name,
578                    col_index.into(),
579                    data_type,
580                ),
581                is_hidden: false,
582            });
583            col_index += 1;
584        }
585
586        let mut builder = TableCatalogBuilder::default();
587
588        // Add all columns to builder
589        for column in &columns {
590            builder.add_column(&(&column.column_desc).into());
591        }
592
593        // Primary key is vnode (column 0)
594        builder.add_order_column(0, OrderType::ascending());
595        builder.set_vnode_col_idx(0);
596        builder.set_value_indices((0..columns.len()).collect());
597        builder.set_dist_key_in_pk(vec![0]);
598
599        builder.build(vec![0], 1)
600    }
601
    /// Get a reference to the stream materialize's table.
    ///
    /// This is the catalog of the table being materialized, as passed to the constructor.
    #[must_use]
    pub fn table(&self) -> &TableCatalog {
        &self.table
    }
607
    /// Get a reference to the stream materialize's staging table.
    ///
    /// Returns `None` when the node was built without a staging table.
    #[must_use]
    pub fn staging_table(&self) -> Option<&TableCatalog> {
        self.staging_table.as_ref()
    }
613
    /// Get a reference to the stream materialize's refresh progress table.
    ///
    /// Returns `None` when the node was built without a refresh progress table.
    #[must_use]
    pub fn refresh_progress_table(&self) -> Option<&TableCatalog> {
        self.refresh_progress_table.as_ref()
    }
619
    /// Get the name of the materialized table, as recorded in its catalog.
    pub fn name(&self) -> &str {
        self.table.name()
    }
623}
624
625impl Distill for StreamMaterialize {
626    fn distill<'a>(&self) -> XmlNode<'a> {
627        let table = self.table();
628
629        let column_names = (table.columns.iter())
630            .map(|col| col.name_with_hidden().to_string())
631            .map(Pretty::from)
632            .collect();
633
634        let stream_key = (table.stream_key().iter())
635            .map(|&k| table.columns[k].name().to_owned())
636            .map(Pretty::from)
637            .collect();
638
639        let pk_columns = (table.pk.iter())
640            .map(|o| table.columns[o.column_index].name().to_owned())
641            .map(Pretty::from)
642            .collect();
643        let mut vec = Vec::with_capacity(5);
644        vec.push(("columns", Pretty::Array(column_names)));
645        vec.push(("stream_key", Pretty::Array(stream_key)));
646        vec.push(("pk_columns", Pretty::Array(pk_columns)));
647        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
648
649        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
650
651        let watermark_columns = &self.base.watermark_columns();
652        if self.base.watermark_columns().n_indices() > 0 {
653            // TODO(rc): we ignore the watermark group info here, will be fixed it later
654            let watermark_column_names = watermark_columns
655                .indices()
656                .map(|i| table.columns()[i].name_with_hidden().to_string())
657                .map(Pretty::from)
658                .collect();
659            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
660        };
661        childless_record("StreamMaterialize", vec)
662    }
663}
664
665impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
666    fn input(&self) -> PlanRef {
667        self.input.clone()
668    }
669
670    fn clone_with_input(&self, input: PlanRef) -> Self {
671        let new = Self::new_with_staging_and_progress(
672            input,
673            self.table().clone(),
674            self.staging_table.clone(),
675            self.refresh_progress_table.clone(),
676        )
677        .unwrap();
678        new.base
679            .schema()
680            .fields
681            .iter()
682            .zip_eq_fast(self.base.schema().fields.iter())
683            .for_each(|(a, b)| {
684                assert_eq!(a.data_type, b.data_type);
685            });
686        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
687        new
688    }
689}
690
691impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }
692
693impl StreamNode for StreamMaterialize {
694    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
695        use risingwave_pb::stream_plan::*;
696
697        tracing::debug!(
698            table_name = %self.table().name(),
699            refreshable = %self.table().refreshable,
700            has_staging_table = %self.staging_table.is_some(),
701            has_progress_table = %self.refresh_progress_table.is_some(),
702            staging_table_name = ?self.staging_table.as_ref().map(|t| (&t.id, &t.name)),
703            progress_table_name = ?self.refresh_progress_table.as_ref().map(|t| (&t.id, &t.name)),
704            "Converting StreamMaterialize to protobuf"
705        );
706
707        let staging_table_prost = self
708            .staging_table
709            .clone()
710            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());
711
712        let refresh_progress_table_prost = self
713            .refresh_progress_table
714            .clone()
715            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());
716
717        PbNodeBody::Materialize(Box::new(MaterializeNode {
718            // Do not fill `table` and `table_id` here to avoid duplication. It will be filled by
719            // meta service after global information is generated.
720            table_id: 0.into(),
721            table: None,
722            // Pass staging table catalog if available for refreshable tables
723            staging_table: staging_table_prost,
724            // Pass refresh progress table catalog if available for refreshable tables
725            refresh_progress_table: refresh_progress_table_prost,
726
727            column_orders: self
728                .table()
729                .pk()
730                .iter()
731                .copied()
732                .map(ColumnOrder::to_protobuf)
733                .collect(),
734
735            // Equavalency: `clean_watermark_indices` is set iff there's a TTL watermark column.
736            // See `StreamMaterialize::derive_table_catalog`.
737            cleaned_by_ttl_watermark: !self.table.clean_watermark_indices.is_empty(),
738        }))
739    }
740}
741
// Empty impl: `StreamMaterialize` relies entirely on the trait's default methods.
impl ExprRewritable<Stream> for StreamMaterialize {}
743
// Empty impl: `StreamMaterialize` relies entirely on the trait's default methods.
impl ExprVisitable for StreamMaterialize {}