risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the Iceberg
/// sink (<https://iceberg.apache.org/spec/#partitioning>). For such sinks, the number of files
/// can be reduced if we shuffle the data by the partition value. More details can be found in
/// <https://github.com/risingwavelabs/rfcs/pull/77>. So if a `PartitionComputeInfo` is
/// provided, we create a `StreamProject` node to compute the partition value and shuffle the
/// data by it before the sink.
///
/// ## What is `PartitionComputeInfo`?
/// `PartitionComputeInfo` carries the information about the partition computation. The stream
/// sink uses this information to create the corresponding expression in the `StreamProject`
/// node.
///
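/// ### Example (sketch)
///
/// Assuming an Iceberg table partitioned by `bucket(10, v1)`, the plan is extended roughly as
/// follows (names are illustrative, not the exact plan output):
///
/// ```text
/// Before: ... -> StreamSink
/// After:  ... -> StreamProject(v1, ..., iceberg_transform('bucket[10]', v1))
///             -> Exchange(hash on the appended partition column)
///             -> StreamSink
/// ```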
///
/// TODO: Maybe we should move this into the sink module?
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition field {} has type {}, but the partition spec requires type {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

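    /// Builds a single `row(...)` expression evaluating to the partition value.
    /// For example (a sketch, assuming a column `v1: int` and the partition spec
    /// `identity(v1), bucket(10, v1)`), the result is roughly
    /// `row(v1, iceberg_transform('bucket[10]', v1))`.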
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!(
                "Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.",
                col_name
            ))
            .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input.plan_base().clone_with_new_plan_id();

        if let SinkType::AppendOnly = sink_desc.sink_type {
            let kind = input.stream_kind();
            assert_matches!(
                kind,
                StreamKind::AppendOnly,
                "{kind} stream cannot be used as the input of an append-only sink",
            );
        }

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we add a plan node to compute the partition value and append it as an extra column.
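        // Resulting layout when partition info is provided (a sketch):
        //   | input columns ... | partition value |
        // Rows are then shuffled by the trailing partition column; otherwise we fall
        // back to sharding by the stream key.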
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let downstream_pk =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into a table without a primary key. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When creating a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_empty()
            {
                pk.iter().map(|k| k.column_index).collect_vec()
            } else {
                downstream_pk
            }
        };
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && !downstream_pk.is_empty()
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "sink with auto schema change should have the same primary key as the upstream table {:?}, but got {:?}",
                    upstream_table_pk_col_names, sink_pk_col_names
                ))
                .into());
            }
        }
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        }
                        // For an upsert JDBC sink, we align the distribution with the
                        // downstream table to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        // If the user doesn't specify the downstream primary key, we use the stream key as the pk.
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            // Force rows with the same primary key into the same sink shard, so that
                            // the sink's pk-mismatch compaction stays effective. See:
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // Check that the sink connector is specified and supported.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // A table sink specified directly via WITH properties (rather than
                        // `CREATE SINK ... INTO ...`) is not supported.
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            // File sinks must have sink decoupling turned on.
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
            if sink_desc.is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
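        // With sink decoupling, a persistent kv log store buffers the change log so the
        // sink can consume it at its own pace; otherwise an in-memory log store is used,
        // which couples the sink's progress to the upstream job.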
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Sink-into-table needs a log store for sink decoupling.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "`{}` must be {}, {}, or {}",
                    SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT
                ))
                .into());
            }
        }
        Ok(None)
    }

    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

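    // A sketch of how `derive_sink_type` below resolves the sink type:
    //
    //   declared type (format)   input stream   force_append_only   result
    //   ----------------------   ------------   -----------------   ---------------------
    //   append-only / PLAIN      append-only    -                   AppendOnly
    //   append-only / PLAIN      retractable    true                ForceAppendOnly
    //   append-only / PLAIN      retractable    false               error (hint to force)
    //   upsert / debezium        -              true                error
    //   unspecified              append-only    -                   AppendOnly
    //   unspecified              retractable    false               Upsert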
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                        Note that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                        if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                    ))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

    /// Extract user-defined downstream pk columns from the WITH options. Returns the indices of
    /// the pk columns.
    ///
    /// The format of `downstream_pk_str` should be 'col1,col2,...' (delimited by `,`) in order
    /// to be parsed.
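    ///
    /// For example (a sketch): with columns `[id, name, value]`, `primary_key='id, name'`
    /// parses to the indices `[0, 1]`; whitespace around each column name is trimmed.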
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                // If the user defines the downstream primary key, we find out their indices.
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => {
                // The user doesn't define the downstream primary key and we simply return an empty
                // vector.
                Ok(Vec::new())
            }
        }
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for the sink's kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}