risingwave_frontend/optimizer/plan_node/
stream_materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::num::NonZeroU32;
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22    ColumnCatalog, ConflictBehavior, CreateType, Engine, OBJECT_ID_PLACEHOLDER, StreamJobStatus,
23    TableId,
24};
25use risingwave_common::hash::VnodeCount;
26use risingwave_common::types::DataType;
27use risingwave_common::util::column_index_mapping::ColIndexMapping;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
30use risingwave_pb::catalog::PbWebhookSourceInfo;
31use risingwave_pb::stream_plan::stream_node::PbNodeBody;
32
33use super::derive::derive_columns;
34use super::stream::prelude::*;
35use super::utils::{Distill, TableCatalogBuilder, childless_record};
36use super::{
37    ExprRewritable, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, reorganize_elements_id,
38};
39use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::Result;
42use crate::optimizer::StreamOptimizedLogicalPlanRoot;
43use crate::optimizer::plan_node::derive::derive_pk;
44use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
45use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
46use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
47use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
48use crate::stream_fragmenter::BuildFragmentGraphState;
49
50/// Materializes a stream.
51#[derive(Debug, Clone, PartialEq, Eq, Hash)]
52pub struct StreamMaterialize {
53    pub base: PlanBase<Stream>,
54    /// Child of Materialize plan
55    input: PlanRef,
56    table: TableCatalog,
57    /// For refreshable tables, staging table for collecting new data during refresh
58    staging_table: Option<TableCatalog>,
59    /// For refreshable tables, progress table for tracking refresh progress
60    refresh_progress_table: Option<TableCatalog>,
61}
62
63impl StreamMaterialize {
64    pub fn new(input: PlanRef, table: TableCatalog) -> Result<Self> {
65        Self::new_with_staging_and_progress(input, table, None, None)
66    }
67
68    pub fn new_with_staging_and_progress(
69        input: PlanRef,
70        table: TableCatalog,
71        staging_table: Option<TableCatalog>,
72        refresh_progress_table: Option<TableCatalog>,
73    ) -> Result<Self> {
74        let kind = match table.conflict_behavior() {
75            ConflictBehavior::NoCheck => {
76                reject_upsert_input!(input, "Materialize without conflict handling")
77            }
78
79            // When conflict handling is enabled, upsert stream can be converted to retract stream.
80            ConflictBehavior::Overwrite
81            | ConflictBehavior::IgnoreConflict
82            | ConflictBehavior::DoUpdateIfNotNull => match input.stream_kind() {
83                StreamKind::AppendOnly => StreamKind::AppendOnly,
84                StreamKind::Retract | StreamKind::Upsert => StreamKind::Retract,
85            },
86        };
87        let base = PlanBase::new_stream(
88            input.ctx(),
89            input.schema().clone(),
90            Some(table.stream_key.clone()),
91            input.functional_dependency().clone(),
92            input.distribution().clone(),
93            kind,
94            input.emit_on_window_close(),
95            input.watermark_columns().clone(),
96            input.columns_monotonicity().clone(),
97        );
98
99        Ok(Self {
100            base,
101            input,
102            table,
103            staging_table,
104            refresh_progress_table,
105        })
106    }
107
108    /// Create a materialize node, for `MATERIALIZED VIEW` and `INDEX`.
109    ///
110    /// When creating index, `TableType` should be `Index`. Then, materialize will distribute keys
111    /// using `user_distributed_by`.
112    pub fn create(
113        StreamOptimizedLogicalPlanRoot {
114            plan: input,
115            required_dist: user_distributed_by,
116            required_order: user_order_by,
117            out_fields: user_cols,
118            out_names,
119            ..
120        }: StreamOptimizedLogicalPlanRoot,
121        name: String,
122        database_id: DatabaseId,
123        schema_id: SchemaId,
124        definition: String,
125        table_type: TableType,
126        cardinality: Cardinality,
127        retention_seconds: Option<NonZeroU32>,
128    ) -> Result<Self> {
129        let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
130        // the hidden column name might refer some expr id
131        let input = reorganize_elements_id(input);
132        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
133
134        let create_type = if matches!(table_type, TableType::MaterializedView)
135            && input.ctx().session_ctx().config().background_ddl()
136            && plan_can_use_background_ddl(&input)
137        {
138            CreateType::Background
139        } else {
140            CreateType::Foreground
141        };
142
143        // For upsert stream, use `Overwrite` conflict behavior to convert into retract stream.
144        let conflict_behavior = match input.stream_kind() {
145            StreamKind::Retract | StreamKind::AppendOnly => ConflictBehavior::NoCheck,
146            StreamKind::Upsert => ConflictBehavior::Overwrite,
147        };
148
149        let table = Self::derive_table_catalog(
150            input.clone(),
151            name,
152            database_id,
153            schema_id,
154            user_order_by,
155            columns,
156            definition,
157            conflict_behavior,
158            vec![],
159            None,
160            None,
161            table_type,
162            None,
163            cardinality,
164            retention_seconds,
165            create_type,
166            None,
167            Engine::Hummock,
168            false,
169        )?;
170
171        Self::new(input, table)
172    }
173
174    /// Create a materialize node, for `TABLE`.
175    ///
176    /// Different from `create`, the `columns` are passed in directly, instead of being derived from
177    /// the input. So the column IDs are preserved from the SQL columns binding step and will be
178    /// consistent with the source node and DML node.
179    #[allow(clippy::too_many_arguments)]
180    pub fn create_for_table(
181        input: PlanRef,
182        name: String,
183        database_id: DatabaseId,
184        schema_id: SchemaId,
185        user_distributed_by: RequiredDist,
186        user_order_by: Order,
187        columns: Vec<ColumnCatalog>,
188        definition: String,
189        conflict_behavior: ConflictBehavior,
190        version_column_indices: Vec<usize>,
191        pk_column_indices: Vec<usize>,
192        row_id_index: Option<usize>,
193        version: TableVersion,
194        retention_seconds: Option<NonZeroU32>,
195        webhook_info: Option<PbWebhookSourceInfo>,
196        engine: Engine,
197        refreshable: bool,
198    ) -> Result<Self> {
199        let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;
200
201        let table = Self::derive_table_catalog(
202            input.clone(),
203            name.clone(),
204            database_id,
205            schema_id,
206            user_order_by.clone(),
207            columns.clone(),
208            definition.clone(),
209            conflict_behavior,
210            version_column_indices,
211            Some(pk_column_indices.clone()),
212            row_id_index,
213            TableType::Table,
214            Some(version.clone()),
215            Cardinality::unknown(), // unknown cardinality for tables
216            retention_seconds,
217            CreateType::Foreground,
218            webhook_info.clone(),
219            engine,
220            refreshable,
221        )?;
222
223        // For refreshable tables, create staging table and progress table
224        let (staging_table, refresh_progress_table) = if refreshable {
225            let staging = Some(Self::derive_staging_table_catalog(table.clone()));
226            let progress = Some(Self::derive_refresh_progress_table_catalog(table.clone()));
227            (staging, progress)
228        } else {
229            (None, None)
230        };
231
232        tracing::info!(
233            table_name = %name,
234            refreshable = %refreshable,
235            has_staging_table = %staging_table.is_some(),
236            has_progress_table = %refresh_progress_table.is_some(),
237            "Creating StreamMaterialize with staging and progress table info"
238        );
239
240        Self::new_with_staging_and_progress(input, table, staging_table, refresh_progress_table)
241    }
242
243    /// Rewrite the input to satisfy the required distribution if necessary, according to the type.
244    fn rewrite_input(
245        input: PlanRef,
246        user_distributed_by: RequiredDist,
247        table_type: TableType,
248    ) -> Result<PlanRef> {
249        let required_dist = match input.distribution() {
250            Distribution::Single => RequiredDist::single(),
251            _ => match table_type {
252                TableType::Table => {
253                    assert_matches!(user_distributed_by, RequiredDist::ShardByKey(_));
254                    user_distributed_by
255                }
256                TableType::MaterializedView => {
257                    assert_matches!(user_distributed_by, RequiredDist::Any);
258                    // ensure the same pk will not shuffle to different node
259                    let required_dist =
260                        RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
261
262                    // If the input is a stream join, enforce the stream key as the materialized
263                    // view distribution key to avoid slow backfilling caused by
264                    // data skew of the dimension table join key.
265                    // See <https://github.com/risingwavelabs/risingwave/issues/12824> for more information.
266                    let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
267                        || matches!(input.as_stream_temporal_join(), Some(_join))
268                        || matches!(input.as_stream_delta_join(), Some(_join));
269
270                    if is_stream_join {
271                        return Ok(required_dist.stream_enforce(input));
272                    }
273
274                    required_dist
275                }
276                TableType::Index => {
277                    assert_matches!(
278                        user_distributed_by,
279                        RequiredDist::PhysicalDist(Distribution::HashShard(_))
280                    );
281                    user_distributed_by
282                }
283                TableType::VectorIndex => {
284                    unreachable!("VectorIndex should not be created by StreamMaterialize")
285                }
286                TableType::Internal => unreachable!(),
287            },
288        };
289
290        required_dist.streaming_enforce_if_not_satisfies(input)
291    }
292
293    /// Derive the table catalog with the given arguments.
294    ///
295    /// - The caller must ensure the validity of the given `columns`.
296    /// - The `rewritten_input` should be generated by `rewrite_input`.
297    #[expect(clippy::too_many_arguments)]
298    fn derive_table_catalog(
299        rewritten_input: PlanRef,
300        name: String,
301        database_id: DatabaseId,
302        schema_id: SchemaId,
303        user_order_by: Order,
304        columns: Vec<ColumnCatalog>,
305        definition: String,
306        conflict_behavior: ConflictBehavior,
307        version_column_indices: Vec<usize>,
308        pk_column_indices: Option<Vec<usize>>, // Is some when create table
309        row_id_index: Option<usize>,
310        table_type: TableType,
311        version: Option<TableVersion>,
312        cardinality: Cardinality,
313        retention_seconds: Option<NonZeroU32>,
314        create_type: CreateType,
315        webhook_info: Option<PbWebhookSourceInfo>,
316        engine: Engine,
317        refreshable: bool,
318    ) -> Result<TableCatalog> {
319        let input = rewritten_input;
320
321        let value_indices = (0..columns.len()).collect_vec();
322        let distribution_key = input.distribution().dist_column_indices().to_vec();
323        let append_only = input.append_only();
324        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring the watermark group information.
325        // We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
326        let watermark_columns = input.watermark_columns().indices().collect();
327
328        let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices {
329            let table_pk = pk_column_indices
330                .iter()
331                .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
332                .collect();
333            // No order by for create table, so stream key is identical to table pk.
334            (table_pk, pk_column_indices)
335        } else {
336            derive_pk(input, user_order_by, &columns)
337        };
338        // assert: `stream_key` is a subset of `table_pk`
339
340        let read_prefix_len_hint = table_pk.len();
341        Ok(TableCatalog {
342            id: TableId::placeholder(),
343            schema_id,
344            database_id,
345            associated_source_id: None,
346            name,
347            columns,
348            pk: table_pk,
349            stream_key,
350            distribution_key,
351            table_type,
352            append_only,
353            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
354            fragment_id: OBJECT_ID_PLACEHOLDER,
355            dml_fragment_id: None,
356            vnode_col_index: None,
357            row_id_index,
358            value_indices,
359            definition,
360            conflict_behavior,
361            version_column_indices,
362            read_prefix_len_hint,
363            version,
364            watermark_columns,
365            dist_key_in_pk: vec![],
366            cardinality,
367            created_at_epoch: None,
368            initialized_at_epoch: None,
369            cleaned_by_watermark: false,
370            create_type,
371            stream_job_status: StreamJobStatus::Creating,
372            description: None,
373            initialized_at_cluster_version: None,
374            created_at_cluster_version: None,
375            retention_seconds: retention_seconds.map(|i| i.into()),
376            cdc_table_id: None,
377            vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
378            webhook_info,
379            job_id: None,
380            engine: match table_type {
381                TableType::Table => engine,
382                TableType::MaterializedView
383                | TableType::Index
384                | TableType::Internal
385                | TableType::VectorIndex => {
386                    assert_eq!(engine, Engine::Hummock);
387                    engine
388                }
389            },
390            clean_watermark_index_in_pk: None, // TODO: fill this field
391            refreshable,
392            vector_index_info: None,
393            cdc_table_type: None,
394        })
395    }
396
397    /// The staging table is a pk-only table.
398    fn derive_staging_table_catalog(
399        TableCatalog {
400            id,
401            schema_id,
402            database_id,
403            associated_source_id,
404            name,
405            columns,
406            pk,
407            stream_key,
408            table_type: _,
409            distribution_key,
410            append_only,
411            cardinality,
412            owner,
413            retention_seconds,
414            fragment_id,
415            dml_fragment_id: _,
416            vnode_col_index,
417            row_id_index,
418            value_indices: _,
419            definition,
420            conflict_behavior,
421            version_column_indices,
422            read_prefix_len_hint,
423            version,
424            watermark_columns: _,
425            dist_key_in_pk,
426            created_at_epoch,
427            initialized_at_epoch,
428            cleaned_by_watermark,
429            create_type,
430            stream_job_status,
431            description,
432            created_at_cluster_version,
433            initialized_at_cluster_version,
434            cdc_table_id,
435            vnode_count,
436            webhook_info,
437            job_id,
438            engine,
439            clean_watermark_index_in_pk,
440            refreshable,
441            vector_index_info,
442            cdc_table_type,
443        }: TableCatalog,
444    ) -> TableCatalog {
445        tracing::info!(
446            table_name = %name,
447            "Creating staging table for refreshable table"
448        );
449
450        assert!(row_id_index.is_none());
451        assert!(retention_seconds.is_none());
452        assert!(refreshable);
453
454        // only keep pk columns
455        let mut pk_col_indices = vec![];
456        let mut pk_cols = vec![];
457        for (i, col) in columns.iter().enumerate() {
458            if pk.iter().any(|pk| pk.column_index == i) {
459                pk_col_indices.push(i);
460                pk_cols.push(col.clone());
461            }
462        }
463        let mapping = ColIndexMapping::with_remaining_columns(&pk_col_indices, columns.len());
464
465        TableCatalog {
466            id,
467            schema_id,
468            database_id,
469            associated_source_id,
470            name,
471            value_indices: (0..pk_cols.len()).collect(),
472            columns: pk_cols,
473            pk: pk
474                .iter()
475                .map(|pk| ColumnOrder::new(mapping.map(pk.column_index), pk.order_type))
476                .collect(),
477            stream_key: mapping.try_map_all(stream_key).unwrap(),
478            vnode_col_index: vnode_col_index.map(|i| mapping.map(i)),
479            dist_key_in_pk: mapping.try_map_all(dist_key_in_pk).unwrap(),
480            distribution_key: mapping.try_map_all(distribution_key).unwrap(),
481            table_type: TableType::Internal,
482            watermark_columns: FixedBitSet::new(),
483            append_only,
484            cardinality,
485            owner,
486            retention_seconds: None,
487            fragment_id,
488            dml_fragment_id: None,
489            row_id_index: None,
490            definition,
491            conflict_behavior,
492            version_column_indices,
493            read_prefix_len_hint,
494            version,
495            created_at_epoch,
496            initialized_at_epoch,
497            cleaned_by_watermark,
498            create_type,
499            stream_job_status,
500            description,
501            created_at_cluster_version,
502            initialized_at_cluster_version,
503            cdc_table_id,
504            vnode_count,
505            webhook_info,
506            job_id,
507            engine,
508            clean_watermark_index_in_pk,
509            refreshable: false,
510            vector_index_info,
511            cdc_table_type,
512        }
513    }
514
515    /// The refresh progress table is used to track refresh operation progress.
516    /// Simplified Schema: vnode (i32), `current_pos`... (variable PK from upstream),
517    /// `is_completed` (bool), `processed_rows` (i64)
518    fn derive_refresh_progress_table_catalog(table: TableCatalog) -> TableCatalog {
519        tracing::debug!(
520            table_name = %table.name,
521            "Creating refresh progress table for refreshable table"
522        );
523
524        // Define the simplified schema for the refresh progress table
525        // Schema: | vnode | current_pos... | is_completed | processed_rows |
526        let mut columns = vec![ColumnCatalog {
527            column_desc: risingwave_common::catalog::ColumnDesc::named(
528                "vnode",
529                0.into(),
530                DataType::Int16,
531            ),
532            is_hidden: false,
533        }];
534
535        // Add current_pos columns (mirror upstream table's primary key)
536        let mut col_index = 1;
537        for pk_col in &table.pk {
538            let upstream_col = &table.columns[pk_col.column_index];
539            columns.push(ColumnCatalog {
540                column_desc: risingwave_common::catalog::ColumnDesc::named(
541                    format!("pos_{}", upstream_col.name()),
542                    col_index.into(),
543                    upstream_col.data_type().clone(),
544                ),
545                is_hidden: false,
546            });
547            col_index += 1;
548        }
549
550        // Add metadata columns
551        for (name, data_type) in [
552            ("is_completed", DataType::Boolean),
553            ("processed_rows", DataType::Int64),
554        ] {
555            columns.push(ColumnCatalog {
556                column_desc: risingwave_common::catalog::ColumnDesc::named(
557                    name,
558                    col_index.into(),
559                    data_type,
560                ),
561                is_hidden: false,
562            });
563            col_index += 1;
564        }
565
566        let mut builder = TableCatalogBuilder::default();
567
568        // Add all columns to builder
569        for column in &columns {
570            builder.add_column(&(&column.column_desc).into());
571        }
572
573        // Primary key is vnode (column 0)
574        builder.add_order_column(0, OrderType::ascending());
575        builder.set_vnode_col_idx(0);
576        builder.set_value_indices((0..columns.len()).collect());
577        builder.set_dist_key_in_pk(vec![0]);
578
579        builder.build(vec![0], 1)
580    }
581
582    /// Get a reference to the stream materialize's table.
583    #[must_use]
584    pub fn table(&self) -> &TableCatalog {
585        &self.table
586    }
587
588    /// Get a reference to the stream materialize's staging table.
589    #[must_use]
590    pub fn staging_table(&self) -> Option<&TableCatalog> {
591        self.staging_table.as_ref()
592    }
593
594    /// Get a reference to the stream materialize's refresh progress table.
595    #[must_use]
596    pub fn refresh_progress_table(&self) -> Option<&TableCatalog> {
597        self.refresh_progress_table.as_ref()
598    }
599
600    pub fn name(&self) -> &str {
601        self.table.name()
602    }
603}
604
605impl Distill for StreamMaterialize {
606    fn distill<'a>(&self) -> XmlNode<'a> {
607        let table = self.table();
608
609        let column_names = (table.columns.iter())
610            .map(|col| col.name_with_hidden().to_string())
611            .map(Pretty::from)
612            .collect();
613
614        let stream_key = (table.stream_key.iter())
615            .map(|&k| table.columns[k].name().to_owned())
616            .map(Pretty::from)
617            .collect();
618
619        let pk_columns = (table.pk.iter())
620            .map(|o| table.columns[o.column_index].name().to_owned())
621            .map(Pretty::from)
622            .collect();
623        let mut vec = Vec::with_capacity(5);
624        vec.push(("columns", Pretty::Array(column_names)));
625        vec.push(("stream_key", Pretty::Array(stream_key)));
626        vec.push(("pk_columns", Pretty::Array(pk_columns)));
627        let pk_conflict_behavior = self.table.conflict_behavior().debug_to_string();
628
629        vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
630
631        let watermark_columns = &self.base.watermark_columns();
632        if self.base.watermark_columns().n_indices() > 0 {
633            // TODO(rc): we ignore the watermark group info here, will be fixed it later
634            let watermark_column_names = watermark_columns
635                .indices()
636                .map(|i| table.columns()[i].name_with_hidden().to_string())
637                .map(Pretty::from)
638                .collect();
639            vec.push(("watermark_columns", Pretty::Array(watermark_column_names)));
640        };
641        childless_record("StreamMaterialize", vec)
642    }
643}
644
645impl PlanTreeNodeUnary<Stream> for StreamMaterialize {
646    fn input(&self) -> PlanRef {
647        self.input.clone()
648    }
649
650    fn clone_with_input(&self, input: PlanRef) -> Self {
651        let new = Self::new_with_staging_and_progress(
652            input,
653            self.table().clone(),
654            self.staging_table.clone(),
655            self.refresh_progress_table.clone(),
656        )
657        .unwrap();
658        new.base
659            .schema()
660            .fields
661            .iter()
662            .zip_eq_fast(self.base.schema().fields.iter())
663            .for_each(|(a, b)| {
664                assert_eq!(a.data_type, b.data_type);
665            });
666        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
667        new
668    }
669}
670
671impl_plan_tree_node_for_unary! { Stream, StreamMaterialize }
672
673impl StreamNode for StreamMaterialize {
674    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
675        use risingwave_pb::stream_plan::*;
676
677        tracing::debug!(
678            table_name = %self.table().name(),
679            refreshable = %self.table().refreshable,
680            has_staging_table = %self.staging_table.is_some(),
681            has_progress_table = %self.refresh_progress_table.is_some(),
682            staging_table_name = ?self.staging_table.as_ref().map(|t| (&t.id, &t.name)),
683            progress_table_name = ?self.refresh_progress_table.as_ref().map(|t| (&t.id, &t.name)),
684            "Converting StreamMaterialize to protobuf"
685        );
686
687        let staging_table_prost = self
688            .staging_table
689            .clone()
690            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());
691
692        let refresh_progress_table_prost = self
693            .refresh_progress_table
694            .clone()
695            .map(|t| t.with_id(state.gen_table_id_wrapped()).to_prost());
696
697        PbNodeBody::Materialize(Box::new(MaterializeNode {
698            // Do not fill `table` and `table_id` here to avoid duplication. It will be filled by
699            // meta service after global information is generated.
700            table_id: 0,
701            table: None,
702            // Pass staging table catalog if available for refreshable tables
703            staging_table: staging_table_prost,
704            // Pass refresh progress table catalog if available for refreshable tables
705            refresh_progress_table: refresh_progress_table_prost,
706
707            column_orders: self
708                .table()
709                .pk()
710                .iter()
711                .copied()
712                .map(ColumnOrder::to_protobuf)
713                .collect(),
714        }))
715    }
716}
717
718impl ExprRewritable<Stream> for StreamMaterialize {}
719
720impl ExprVisitable for StreamMaterialize {}