risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";

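// Illustrative SQL exercising the sink WITH options handled in this file
// (object names are hypothetical):
//
//   CREATE SINK s1 FROM mv WITH (
//       connector = 'jdbc',
//       type = 'upsert',
//       primary_key = 'id'
//   );
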
/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the Iceberg
/// sink (<https://iceberg.apache.org/spec/#partitioning>). For this kind of sink, the number of
/// files can be reduced if we shuffle the data by the partition value. More details can be found
/// in <https://github.com/risingwavelabs/rfcs/pull/77>. So if a `PartitionComputeInfo` is
/// provided, we create a `StreamProject` node to compute the partition value and shuffle the
/// data on it before the sink.
///
/// ## What is `PartitionComputeInfo`?
///
/// `PartitionComputeInfo` carries the information needed for partition computation. The stream
/// sink uses it to create the corresponding expression in the `StreamProject` node.
///
/// TODO: maybe we should move this into the sink module?
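///
/// A minimal sketch of how the partition info is consumed (the column catalog and field
/// names here are hypothetical):
///
/// ```ignore
/// let info = PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
///     partition_type: StructType::new(vec![("f1", DataType::Int32)]),
///     partition_fields: vec![("v1".into(), Transform::Bucket(10))],
/// });
/// // Produces an expression like `row(iceberg_transform('bucket[10]', v1))`, which is
/// // appended as an extra column by a `StreamProject` node.
/// let partition_expr = info.convert_to_expression(&columns)?;
/// ```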
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition source column {} has type {}, but the partition field expects {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

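    /// Builds a single `row(...)` expression whose fields are the computed partition
    /// values, one per partition field: e.g. partition fields
    /// `[("v1", Identity), ("v2", Bucket(10))]` roughly become
    /// `row(v1, iceberg_transform('bucket[10]', v2))`.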
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

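/// Finds the index of the column named `col_name` in `columns`. The error message
/// mentions sink primary keys because this helper is shared by downstream pk parsing
/// and partition field lookup.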
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
                .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        // The sink executor will transform the chunk into the desired format based on the sink
        // type before writing to the sink or emitting to the downstream. Thus, we need to derive
        // the stream kind based on the sink type.
        // We assert here because checks should already be done in `derive_sink_type`.
        let input_kind = input.stream_kind();
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                assert_eq!(
                    input_kind,
                    StreamKind::AppendOnly,
                    "{input_kind} stream cannot be used as input of append-only sink",
                );
                StreamKind::AppendOnly
            }
            SinkType::ForceAppendOnly => StreamKind::AppendOnly,
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            // FIXME: We may reconstruct the chunk based on user-specified downstream pk, so
            // we should also use `downstream_pk` as the stream key of the output. Though this
            // is unlikely to result in correctness issues:
            // - for sink-into-table, the `Materialize` node in the downstream table will always
            //   enforce the pk consistency
            // - for other sinks, the output of `Sink` node is not used
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we add a plan node to compute the partition value and append it as an extra column.
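        // For example, an input with columns `(a, b)` and a partition spec over `a`
        // roughly becomes `Project(a, b, row(<transform of a>))`, and the data is then
        // shuffled by the appended partition column.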
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        // Get the downstream pk from user input, override it and perform some checks if applicable.
        let downstream_pk = {
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?)
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                // If the user doesn't specify the downstream primary key, we use the stream key as the pk.
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        // Since we've already rejected an empty pk in `parse_downstream_pk`, if we still get an
        // empty pk here, it's likely that the derived stream key is used and is empty, which is
        // possible for operators outputting at most one row (like `SimpleAgg`). This is
        // legitimate. However, currently the sink implementation may confuse an empty pk with not
        // specifying a pk, so we still reject this case for correctness.
        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                 Please specify the primary key in WITH options."
            )
        }

        // The "upsert" property is defined based on a specific stream key: columns other than the
        // stream key might not be valid. We should reject the cases referencing such columns in
        // the primary key.
        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                 the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!("sink with auto schema change should have the same pk as the upstream table {:?}, but got {:?}", upstream_table_pk_col_names, sink_pk_col_names)).into());
            }
        }
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        };
                        // For the upsert JDBC sink, we align the distribution to the downstream
                        // to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            // Force the same primary key to be written to the same sink shard,
                            // so that sink pk mismatch compaction is effective.
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // Check and ensure that the sink connector is specified and supported.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // The `table` sink is only created internally for sink-into-table;
                        // it cannot be created directly via WITH properties.
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            // File sinks require sink_decouple to be enabled.
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            let is_exactly_once = match sink_desc.is_exactly_once {
                Some(v) => v,
                None => {
                    if let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
                        let connector_type = connector.to_lowercase();
                        if connector_type == ICEBERG_SINK {
                            // The Iceberg sink defaults to exactly-once. However, when
                            // sink_decouple is disabled, we force it to false.
                            sink_desc
                                .properties
                                .insert("is_exactly_once".to_owned(), "false".to_owned());
                        }
                    }
                    false
                }
            };

            if is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "Sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Sink-into-table needs a log store when sink decoupling is enabled.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            let sink_type = match sink_type.as_str() {
                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
                SINK_TYPE_UPSERT => {
                    if properties.is_iceberg_connector() {
                        // Iceberg sink must use retract to represent deletes
                        SinkType::Retract
                    } else {
                        SinkType::Upsert
                    }
                }
                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
                _ => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "`{}` must be {}, {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_RETRACT,
                        SINK_TYPE_UPSERT,
                        SINK_TYPE_DEBEZIUM,
                    ))
                    .into());
                }
            };
            return Ok(Some(sink_type));
        }
        Ok(None)
    }

    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

    /// Derive the sink type based on:
    ///
    /// - the derived stream kind of the plan, from the optimizer
    /// - the sink format required by [`SinkFormatDesc`], if any
    /// - the user-specified sink type or `force_append_only` in WITH options, if any
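    ///
    /// For example, `FORMAT UPSERT` maps to [`SinkType::Upsert`], while the legacy
    /// `type = 'upsert'` in WITH options maps to [`SinkType::Retract`] for Iceberg
    /// sinks (see [`Self::sink_type_in_prop`]).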
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only =
            if user_force_append_only && derived_stream_kind.is_append_only() {
                false
            } else {
                user_force_append_only
            };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    if user_force_append_only {
                        return Ok(SinkType::ForceAppendOnly);
                    }
                    if derived_stream_kind != StreamKind::AppendOnly {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                             Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ))
                        .into());
                    }
                }
                SinkType::ForceAppendOnly => unreachable!(),
                SinkType::Upsert => { /* always qualified */ }
                SinkType::Retract => {
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of an upsert stream cannot be retract. \
                             Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok(user_defined_sink_type)
        } else {
            // No specification at all, follow the optimizer's derivation.
            // This is also the case for sink-into-table.
            Ok(match derived_stream_kind {
                // We downgrade `Retract` to `Upsert` unless the type is explicitly specified in
                // options, as it is well supported by most sinks and reduces the amount of data
                // written.
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            })
        }
    }

    /// Extract user-defined downstream pk columns from WITH options. Returns the indices of the
    /// pk columns. An empty list of columns is not allowed.
    ///
    /// `downstream_pk_str` should be in the format 'col1,col2,...' (delimited by `,`) to be
    /// parsed successfully.
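    ///
    /// For example, with columns `[a, b, c]`, the string `"b, a"` parses to `vec![1, 0]`,
    /// while `""` or `" , "` is rejected.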
    fn parse_downstream_pk(
        downstream_pk_str: &str,
        columns: &[ColumnCatalog],
    ) -> Result<Vec<usize>> {
        // If the user defines the downstream primary key, we find out their indices.
        let downstream_pk = downstream_pk_str.split(',').collect_vec();
        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
        for key in downstream_pk {
            let trimmed_key = key.trim();
            if trimmed_key.is_empty() {
                continue;
            }
            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
        }
        if downstream_pk_indices.is_empty() {
            bail_invalid_input_syntax!(
                "Specified primary key should not be empty. \
                To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
            );
        }
        Ok(downstream_pk_indices)
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if let Some(pk) = &self.sink_desc.downstream_pk {
            let sink_pk = IndicesDisplay {
                indices: pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for sink with a kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
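
    #[test]
    fn test_parse_downstream_pk() {
        // A small sanity check for downstream pk parsing against the catalog above
        // (columns v1, v2, v3 at indices 0, 1, 2).
        let catalog = create_column_catalog();
        assert_eq!(
            StreamSink::parse_downstream_pk("v2, v1", &catalog).unwrap(),
            vec![1, 0]
        );
        // Unknown columns are rejected, as are key strings that trim down to nothing.
        assert!(StreamSink::parse_downstream_pk("v4", &catalog).is_err());
        assert!(StreamSink::parse_downstream_pk(" , ", &catalog).is_err());
    }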
}