risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::io::{Error, ErrorKind};
use std::sync::Arc;

use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, SinkError,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject, StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

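/// The WITH-option key through which users declare the downstream primary key,
/// e.g. `primary_key='id,name'`.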
const DOWNSTREAM_PK_KEY: &str = "primary_key";

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the iceberg sink (<https://iceberg.apache.org/spec/#partitioning>).
/// For this kind of sink, the number of files can be reduced if we shuffle the data by the partition value beforehand. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
/// So if a `PartitionComputeInfo` is provided, we create a `StreamProject` node to compute the partition value and shuffle the data by it before the sink.
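///
/// For example, for an Iceberg table partitioned by `bucket(10, v1)`, the tail of the
/// plan becomes roughly the following (an illustrative sketch, not exact `EXPLAIN` output):
///
/// ```text
/// StreamSink
/// └─ StreamExchange { dist: HashShard(partition) }
///    └─ StreamProject { exprs: [..all input columns.., IcebergTransform('bucket[10]', v1)] }
///       └─ <input>
/// ```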
///
/// ## What is `PartitionComputeInfo`?
/// `PartitionComputeInfo` contains the information about the partition computation. The stream sink uses
/// this information to create the corresponding expression in the `StreamProject` node.
///
/// TODO: Maybe we should move this into the sink?
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The partition field {} has type {}, but the partition spec requires type {}",
                            columns[col_id].column_desc.name,
                            columns[col_id].column_desc.data_type,
                            result_type
                        ),
                    )))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

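    /// Builds the partition-value expression as a single `ROW(...)` over all partition
    /// fields. An illustrative mapping of fields to child expressions (column names here
    /// are hypothetical):
    ///
    /// ```text
    /// partition_fields = [("v1", Identity), ("v1", Bucket(10)), ("v2", Void)]
    /// => Row(
    ///        $v1,                                  -- Identity: plain InputRef
    ///        IcebergTransform('bucket[10]', $v1),  -- other transforms: function call
    ///        NULL,                                 -- Void: typed NULL literal
    ///    )
    /// ```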
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name),
            )))
                .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input
            .plan_base()
            .into_stream()
            .expect("input should be stream plan")
            .clone_with_new_plan_id();

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

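    /// Derives the distribution requirement for an Iceberg sink. If `partition_info` is
    /// present, wraps the input in a `StreamProject` that appends the computed partition
    /// value as an extra trailing column and requires sharding by that column; otherwise
    /// requires sharding by the stream key. Returns the required distribution, the
    /// (possibly wrapped) input, and the index of the extra partition column, if any.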
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we need to add a plan node to compute the partition value and append it as an extra column.
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

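    /// Plans a new sink on top of `input`: derives the sink type and output columns,
    /// resolves the downstream primary key, enforces the required distribution (inserting
    /// an exchange and, for Iceberg, a partition-computing project where needed),
    /// validates the connector, and picks the log store type based on sink decoupling.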
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        mut input: PlanRef,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        user_cols: FixedBitSet,
        out_names: Vec<String>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let from_properties =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please try adding type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please try adding type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or(
                                ErrorCode::SinkError(Box::new(Error::new(
                                    ErrorKind::InvalidInput,
                                    "When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned(),
                                )))
                                .into(),
                            )
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else {
                from_properties
            }
        };
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        // The match guard already guarantees an upsert sink here.
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::SinkError(Box::new(Error::new(
                                ErrorKind::InvalidInput,
                                format!(
                                    "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                    key = DOWNSTREAM_PK_KEY
                                ),
                            )))
                            .into());
                        }
                        // For an upsert JDBC sink, we align the distribution to the downstream
                        // primary key to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        // If the user doesn't specify the downstream primary key, we use the stream key as the pk.
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            // Force rows with the same primary key to be written to the same sink
                            // shard, so that the sink's pk-mismatch compaction stays effective.
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
        };

        let unsupported_sink =
            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

        // Check that the sink connector is specified and supported, and decide whether sink decoupling is enabled.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // The `table` sink cannot be created directly via WITH properties; it is
                        // only valid with a target table (`CREATE SINK ... INTO <table>`).
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when creating sink")).into(),
                );
            }
        };
        // A file sink requires sink decoupling to be enabled.
        if !sink_decouple && sink_desc.is_file_sink() {
            return Err(
                SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
        if !sink_decouple && sink_desc.is_exactly_once {
            return Err(
                SinkError::Config(anyhow!("Exactly once sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
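        // With sink decoupling, the sink consumes changes asynchronously from a persistent
        // KV-backed log store; without it, an in-memory log store keeps the sink in step
        // with the upstream stream job.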
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // A decoupled sink into a table additionally needs a sync log store.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

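    /// Parses the legacy `type` WITH option into a [`SinkType`], if present. Note that
    /// `debezium` is mapped to `Upsert` here, just like `upsert` itself.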
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::SinkError(Box::new(Error::new(
                    ErrorKind::InvalidInput,
                    format!(
                        "`{}` must be {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_DEBEZIUM,
                        SINK_TYPE_UPSERT
                    ),
                )))
                .into());
            }
        }
        Ok(None)
    }

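    /// Returns whether the user set `force_append_only='true'`, erroring out on any value
    /// other than `true` or `false` (case-insensitive).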
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "`{}` must be true or false",
                    SINK_USER_FORCE_APPEND_ONLY_OPTION
                ),
            )))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

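    /// Derives the effective [`SinkType`] from the input's append-only property and the
    /// user-specified format/options:
    /// - `FORMAT PLAIN` (legacy `type='append-only'`) requires an append-only input,
    ///   unless `force_append_only='true'` is set, which yields `ForceAppendOnly`.
    /// - `FORMAT UPSERT`/`FORMAT DEBEZIUM` (legacy `type='upsert'`/`'debezium'`) yield `Upsert`.
    /// - With nothing specified, an append-only input yields `AppendOnly`, otherwise `Upsert`.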
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )))
            .into());
        }

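        // If the input is already append-only, forcing is a no-op: clear the flag so we
        // derive plain `AppendOnly` rather than `ForceAppendOnly`.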
        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "Cannot force the sink to be append-only without \"{}\".",
                    if syntax_legacy {
                        "type='append-only'"
                    } else {
                        "FORMAT PLAIN"
                    }
                ),
            )))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ),
                    )))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

    /// Extract user-defined downstream pk columns from WITH options. Return the indices of the pk
    /// columns.
    ///
    /// The format of `downstream_pk_str` should be 'col1,col2,...' (delimited by `,`) in order to
    /// get parsed.
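    ///
    /// For example, `primary_key='id, name'` resolves to the indices of columns `id` and
    /// `name` in `columns`; whitespace around each name is trimmed and empty entries are
    /// skipped.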
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                // If the user defines the downstream primary key, we find out their indices.
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => {
                // The user doesn't define the downstream primary key and we simply return an empty
                // vector.
                Ok(Vec::new())
            }
        }
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for the sink with a kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}
762}