risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the Iceberg
/// sink (<https://iceberg.apache.org/spec/#partitioning>). For this kind of sink, the number
/// of files can be reduced if we shuffle the data by the partition value. More details can be
/// found in <https://github.com/risingwavelabs/rfcs/pull/77>. So if a `PartitionComputeInfo`
/// is provided, we create a `StreamProject` node to compute the partition value and shuffle
/// the data by it before the sink.
///
/// ## What is `PartitionComputeInfo`?
///
/// `PartitionComputeInfo` contains the information about the partition computation. The
/// stream sink uses this information to create the corresponding expression in the
/// `StreamProject` node.
///
/// TODO: maybe we should move this into the sink?
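///
/// ## Example
///
/// As an illustrative sketch (the column name here is hypothetical), a partition spec entry
/// like `bucket(10, v1)` compiles into a `ROW(...)` expression that the `StreamProject` node
/// evaluates to obtain the partition value:
///
/// ```text
/// ROW(IcebergTransform('bucket[10]', v1))
/// ```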
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition field {} has type {}, but the partition spec expects type {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

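    /// Converts the partition spec into a single `ROW(...)` expression whose children are
    /// the computed partition values, one per partition field, in spec order.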
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
                .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input.plan_base().clone_with_new_plan_id();

        if let SinkType::AppendOnly = sink_desc.sink_type {
            let kind = input.stream_kind();
            assert_matches!(
                kind,
                StreamKind::AppendOnly,
                "a {kind} stream cannot be used as the input of an append-only sink",
            );
        }

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

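    /// Derives the required distribution for an Iceberg sink. If `partition_info` is
    /// provided, wraps `input` in a `StreamProject` that appends the computed partition
    /// value as an extra column and shards by it; otherwise shards by the stream key.
    /// Returns the required distribution, the (possibly projected) input, and the index
    /// of the extra partition column, if any.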
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we add a plan node to compute the partition value and append it as an extra column.
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

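    /// Builds a `StreamSink` from an optimized stream plan root: derives the sink type and
    /// columns, resolves the downstream primary key, derives and enforces the required
    /// distribution (possibly inserting an exchange or a projection for partition values),
    /// and assembles the final [`SinkDesc`].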
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let from_properties =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When sinking into a table with a non-append-only sink, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else {
                from_properties
            }
        };
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && !downstream_pk.is_empty()
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "Sink with auto schema change must have the same primary key as the upstream table {:?}, but got {:?}",
                    upstream_table_pk_col_names, sink_pk_col_names
                ))
                .into());
            }
        }
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        // The match guard already ensures the sink is upsert.
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            ))
                            .into());
                        }
                        // For the upsert JDBC sink, align the distribution with the downstream
                        // primary key to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        // If the user doesn't specify the downstream primary key, we use the stream key as the pk.
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            // Force rows with the same downstream primary key into the same sink
                            // shard so that sink pk-mismatch compaction remains effective.
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // check and ensure that the sink connector is specified and supported
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // The `table` sink is only valid when created implicitly via
                        // sinking into a table; specifying it in WITH properties is unsupported.
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            // File sinks must have sink decoupling enabled.
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
            if sink_desc.is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly-once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "Sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Sinking into a table requires a log store when sink decoupling is enabled.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

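    /// Parses the `type` entry in the WITH options into a [`SinkType`], if present:
    /// `append-only` maps to `AppendOnly`, while `upsert` and `debezium` map to `Upsert`.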
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "`{}` must be {}, {}, or {}",
                    SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT
                ))
                .into());
            }
        }
        Ok(None)
    }

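    /// Returns whether the user set `force_append_only = 'true'`, rejecting values other
    /// than `true` or `false` (case-insensitive).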
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

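    /// Derives the final [`SinkType`] from the input's append-only property and the
    /// user-specified format/type options:
    /// - `append-only` requested on an append-only input => `AppendOnly`;
    /// - `append-only` requested on a non-append-only input => `ForceAppendOnly` if
    ///   `force_append_only = 'true'`, otherwise an error;
    /// - `upsert` or `debezium` requested => `Upsert`;
    /// - nothing requested => follow the input (`AppendOnly` if append-only, else `Upsert`).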
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                         Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                        if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                    ))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

    /// Extracts the user-defined downstream pk columns from the WITH options, returning the
    /// indices of the pk columns.
    ///
    /// `downstream_pk_str` should be in the format 'col1,col2,...', i.e. column names
    /// delimited by `,`.
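    ///
    /// For example, with columns `[a, b, c]`, a value of `"c, a"` parses to `[2, 0]`:
    /// whitespace around each name is trimmed and empty segments are skipped.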
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                // If the user defines the downstream primary key, we find out their indices.
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => {
                // The user doesn't define the downstream primary key and we simply return an empty
                // vector.
                Ok(Vec::new())
            }
        }
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk.clone(),
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for sink with a kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}