risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::io::{Error, ErrorKind};
use std::sync::Arc;

use anyhow::anyhow;
use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, SinkError,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject, StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

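/// The WITH-option key under which users declare the downstream primary key,
/// e.g. `primary_key = 'id,name'`. Parsed by [`StreamSink::parse_downstream_pk`].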
const DOWNSTREAM_PK_KEY: &str = "primary_key";

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the Iceberg sink (<https://iceberg.apache.org/spec/#partitioning>).
/// For this kind of sink, the number of files can be reduced if we shuffle the data by the partition value beforehand. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
/// So if a `PartitionComputeInfo` is provided, we create a `StreamProject` node to compute the partition value and shuffle the data by it before the sink.
///
/// ## What is `PartitionComputeInfo`?
/// `PartitionComputeInfo` carries the information needed to compute the partition value. The stream sink uses
/// this information to create the corresponding expression in the `StreamProject` node.
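///
/// For illustration, given an Iceberg table partitioned by `bucket(10, v1)`, the
/// plan is conceptually rewritten as below (a sketch, not exact `EXPLAIN` output):
///
/// ```text
/// StreamSink
/// └─ StreamExchange (hash on the computed partition column)
///    └─ StreamProject [v1, v2, ..., iceberg_transform('bucket[10]', v1)]
/// ```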
///
/// TODO: Maybe we should move this into the sink module?
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
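    /// Converts a single Iceberg partition transform on a source column into a
    /// RisingWave expression. Roughly (a sketch of the mapping implemented below):
    ///
    /// ```text
    /// identity   on v1 -> InputRef(v1)                            (types must match)
    /// void       on v1 -> NULL literal of the result type
    /// bucket[10] on v1 -> iceberg_transform('bucket[10]', InputRef(v1))
    /// ```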
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The partition source column {} has type {}, but the partition field expects type {}",
                            columns[col_id].column_desc.name,
                            columns[col_id].column_desc.data_type,
                            result_type
                        ),
                    )))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

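    /// Builds a single `ROW(...)` expression that evaluates every partition field,
    /// pairing each `(field_name, transform)` with its result type from
    /// `partition_type`. The resulting struct value is what the sink shuffles on.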
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

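/// Finds the index of `col_name` in `columns`, or returns a `SinkError` if it is
/// absent. Used both when resolving the user-declared downstream primary key and
/// when resolving Iceberg partition source columns.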
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name),
            )))
            .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input
            .plan_base()
            .into_stream()
            .expect("input should be stream plan")
            .clone_with_new_plan_id();

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

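    /// Decides the required distribution for an Iceberg sink. When `partition_info`
    /// is given, appends the partition-value expression as an extra column via a
    /// `StreamProject` and shards by that column; otherwise shards by the stream key.
    /// Returns the required distribution, the (possibly rewritten) input, and the
    /// index of the extra partition column, if any.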
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we add a plan node to compute the partition value, and append it as an extra column.
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs.
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

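    /// Builds a `StreamSink` from the optimized stream plan root. This derives the
    /// sink type and downstream primary key, enforces the distribution required by
    /// the connector (e.g. the Iceberg partition shuffle or pk sharding for upsert
    /// JDBC sinks), resolves sink decoupling and the log store type, and finally
    /// wraps the input in the sink node.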
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let from_properties =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into a table without primary keys. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(|| {
                                ErrorCode::SinkError(Box::new(Error::new(
                                    ErrorKind::InvalidInput,
                                    "When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned(),
                                )))
                                .into()
                            })
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else {
                from_properties
            }
        };
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::SinkError(Box::new(Error::new(
                                ErrorKind::InvalidInput,
                                format!(
                                    "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                    key = DOWNSTREAM_PK_KEY
                                ),
                            )))
                            .into());
                        }
                        // For upsert JDBC sinks, we align the distribution to the downstream
                        // primary key to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        // If the user doesn't specify the downstream primary key, we use the stream key as the pk.
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            // Force rows with the same downstream primary key into the same sink
                            // shard, so that the sink's pk-mismatch compaction stays effective. See
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
        };

        let unsupported_sink =
            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

        // Check and ensure that the sink connector is specified and supported.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // A `table` connector here was specified via WITH properties; the real
                        // table sink is only created internally for sink-into-table.
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when creating sink")).into(),
                );
            }
        };
        // File sinks require sink decoupling to be enabled.
        if !sink_decouple && sink_desc.is_file_sink() {
            return Err(
                SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
        if !sink_decouple && sink_desc.is_exactly_once {
            return Err(
                SinkError::Config(anyhow!("Exactly once sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Sink-into-table also needs a log store when sink decoupling is enabled.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::SinkError(Box::new(Error::new(
                    ErrorKind::InvalidInput,
                    format!(
                        "`{}` must be {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_DEBEZIUM,
                        SINK_TYPE_UPSERT
                    ),
                )))
                .into());
            }
        }
        Ok(None)
    }

    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "`{}` must be true or false",
                    SINK_USER_FORCE_APPEND_ONLY_OPTION
                ),
            )))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

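    /// Derives the final [`SinkType`] from the input plan's append-only property,
    /// the WITH options (legacy syntax), and the `FORMAT ... ENCODE ...` clause. In
    /// short: an explicit user-defined type wins; `force_append_only` turns an
    /// otherwise non-append-only stream into `ForceAppendOnly`; without any user
    /// hint, the type follows whether the input is append-only.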
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )))
            .into());
        }

        // If the input is already append-only, there is nothing to force.
        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "Cannot force the sink to be append-only without \"{}\".",
                    if syntax_legacy {
                        "type='append-only'"
                    } else {
                        "FORMAT PLAIN"
                    }
                ),
            )))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ),
                    )))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

    /// Extracts the user-defined downstream pk columns from the WITH options and
    /// returns their indices.
    ///
    /// `downstream_pk_str` should have the format 'col1,col2,...' (delimited by `,`)
    /// to be parsed.
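    ///
    /// For example (illustrative): with columns `[id, name, value]` and
    /// `primary_key = 'id, name'`, this returns `Ok(vec![0, 1])`. Whitespace around
    /// each column name is trimmed, and empty entries are skipped.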
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                // If the user defines the downstream primary key, find out the indices.
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => {
                // The user doesn't define the downstream primary key, so simply return an empty
                // vector.
                Ok(Vec::new())
            }
        }
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for a sink with a kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}