risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::io::{Error, ErrorKind};
use std::sync::Arc;

use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, SinkError,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject, generic};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the iceberg sink (<https://iceberg.apache.org/spec/#partitioning>).
/// For this kind of sink, the number of files can be reduced if we shuffle the data based on the partition value first. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
/// So if a `PartitionComputeInfo` is provided, we create a `StreamProject` node to compute the partition value and shuffle the data based on it before the sink.
///
/// ## What is `PartitionComputeInfo`?
/// `PartitionComputeInfo` contains the information about the partition computation. The stream sink uses
/// this information to create the corresponding expression in the `StreamProject` node.
///
/// TODO: Maybe we should move this into the sink?
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
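    /// Convert a single partition [`Transform`] on `columns[col_id]` into an executable
    /// expression: `Identity` becomes a plain column reference, `Void` becomes a typed
    /// NULL literal, and every other transform is evaluated through the
    /// `IcebergTransform` function at runtime.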
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The partition field {} has type {}, but the partition spec requires type {}",
                            columns[col_id].column_desc.name,
                            columns[col_id].column_desc.data_type,
                            result_type
                        ),
                    )))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

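    /// Build a single `ROW(...)` expression that evaluates every partition field, so the
    /// whole partition value can be produced as one extra `StreamProject` column.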
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

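/// Find the index of `col_name` in `columns`, reporting a sink-facing error if no such
/// column exists (e.g. a typo in the `primary_key` option).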
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name),
            )))
            .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input
            .plan_base()
            .into_stream()
            .expect("input should be stream plan")
            .clone_with_new_plan_id();

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

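    /// Decide the required distribution for an Iceberg sink. When partition information
    /// is available, append a `StreamProject` that computes the partition value as an
    /// extra trailing column and shard by that column; otherwise shard by the stream key.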
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we need to add a plan node to compute the partition value and attach it as an extra column.
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs.
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

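    /// Assemble the `StreamSink` plan node: derive the sink type, output columns and
    /// primary keys, enforce the required distribution on the input, and validate the
    /// connector configuration (sink decoupling, log store type, etc.).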
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        mut input: PlanRef,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        user_cols: FixedBitSet,
        out_names: Vec<String>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let from_properties =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please try to add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please try to add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or(
                                ErrorCode::SinkError(Box::new(Error::new(
                                    ErrorKind::InvalidInput,
                                    "When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned(),
                                )))
                                .into(),
                            )
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else {
                from_properties
            }
        };
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::SinkError(Box::new(Error::new(
                                ErrorKind::InvalidInput,
                                format!(
                                    "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                    key = DOWNSTREAM_PK_KEY
                                ),
                            )))
                            .into());
                        }
                        // For an upsert JDBC sink, we align the distribution to the
                        // downstream primary key to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        // If the user doesn't specify the downstream primary key, use the stream key as the pk.
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            // Force rows with the same primary key into the same sink shard, so that
                            // the sink's pk-mismatch compaction stays effective. See
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
        };

        let unsupported_sink =
            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

        // Check and ensure that the sink connector is specified and supported.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // The `table` connector is only valid when sinking into a table,
                        // which sets `target_table`; it cannot be created via WITH properties alone.
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when creating sink")).into(),
                );
            }
        };
        // A file sink must have sink_decouple turned on.
        if !sink_decouple && sink_desc.is_file_sink() {
            return Err(
                SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
        if !sink_decouple && sink_desc.is_exactly_once {
            return Err(
                SinkError::Config(anyhow!("Exactly-once sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::SinkError(Box::new(Error::new(
                    ErrorKind::InvalidInput,
                    format!(
                        "`{}` must be {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_DEBEZIUM,
                        SINK_TYPE_UPSERT
                    ),
                )))
                .into());
            }
        }
        Ok(None)
    }

    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "`{}` must be true or false",
                    SINK_USER_FORCE_APPEND_ONLY_OPTION
                ),
            )))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

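    /// Derive the effective [`SinkType`] from the input's append-only property together
    /// with the user-specified format or legacy `type`/`force_append_only` options,
    /// validating that `force_append_only` is only used where it makes sense.
    /// See `test_derive_sink_type_legacy_options` below for a sketch of the rules.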
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )))
            .into());
        }

        // If the input is already append-only, there is nothing to force.
        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "Cannot force the sink to be append-only without \"{}\".",
                    if syntax_legacy {
                        "type='append-only'"
                    } else {
                        "FORMAT PLAIN"
                    }
                ),
            )))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ),
                    )))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

    /// Extract the user-defined downstream pk columns from the WITH options. Return the
    /// indices of the pk columns.
    ///
    /// The format of `downstream_pk_str` should be 'col1,col2,...' (delimited by `,`) in
    /// order to get parsed.
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                // If the user defines the downstream primary key, we find out their indices.
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => {
                // The user doesn't define the downstream primary key, so we simply return
                // an empty vector.
                Ok(Vec::new())
            }
        }
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for the sink with a kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                // Use a distinct column id for each column.
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

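    // A minimal negative check (not in the original tests) for the identity transform:
    // the declared partition type must match the column type exactly. `v1` is an Int32,
    // so requesting Int64 should fail.
    #[test]
    fn test_identity_transform_type_mismatch() {
        let catalog = create_column_catalog();
        assert!(
            IcebergPartitionInfo::transform_to_expression(
                &Transform::Identity,
                0,
                &catalog,
                DataType::Int64,
            )
            .is_err()
        );
    }
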
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }

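    // A sanity check for the `primary_key` option parser, sketched against the helper
    // catalog above; the option strings here are illustrative.
    #[test]
    fn test_parse_downstream_pk() {
        let columns = create_column_catalog();
        // Whitespace around the comma-delimited names is trimmed and empty entries are skipped.
        let pk = StreamSink::parse_downstream_pk(&columns, Some(&"v1, v3,".to_owned())).unwrap();
        assert_eq!(pk, vec![0, 2]);
        // No option provided means no user-defined downstream pk.
        assert!(
            StreamSink::parse_downstream_pk(&columns, None)
                .unwrap()
                .is_empty()
        );
        // Unknown column names are rejected.
        assert!(StreamSink::parse_downstream_pk(&columns, Some(&"v4".to_owned())).is_err());
    }

    // A minimal sketch of how `derive_sink_type` resolves the legacy `type` /
    // `force_append_only` WITH options, assuming `WithOptionsSecResolved::without_secrets`
    // accepts a plain option map as it does in `derive_sink_type` above.
    #[test]
    fn test_derive_sink_type_legacy_options() {
        let options = |pairs: &[(&str, &str)]| {
            WithOptionsSecResolved::without_secrets(
                pairs
                    .iter()
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect(),
            )
        };
        // An append-only input with no explicit type derives an append-only sink.
        assert_eq!(
            StreamSink::derive_sink_type(true, &options(&[]), None).unwrap(),
            SinkType::AppendOnly
        );
        // A non-append-only input with `type = 'append-only'` is rejected...
        assert!(
            StreamSink::derive_sink_type(false, &options(&[("type", "append-only")]), None)
                .is_err()
        );
        // ...unless the user explicitly forces it.
        assert_eq!(
            StreamSink::derive_sink_type(
                false,
                &options(&[("type", "append-only"), ("force_append_only", "true")]),
                None,
            )
            .unwrap(),
            SinkType::ForceAppendOnly
        );
    }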
}