Skip to main content

risingwave_frontend/optimizer/plan_node/
stream_sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches;
16use std::sync::Arc;
17
18use iceberg::spec::Transform;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22    ColumnCatalog, ConflictBehavior, CreateType, FieldLike, RISINGWAVE_ICEBERG_ROW_ID,
23    ROW_ID_COLUMN_NAME,
24};
25use risingwave_common::types::{DataType, StructType};
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_connector::sink::catalog::desc::SinkDesc;
28use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
29use risingwave_connector::sink::file_sink::fs::FsSink;
30use risingwave_connector::sink::iceberg::{ENABLE_PK_INDEX, ICEBERG_SINK};
31use risingwave_connector::sink::trivial::TABLE_SINK;
32use risingwave_connector::sink::{
33    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
34    SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
35    SINK_USER_IGNORE_DELETE_OPTION, SINK_USER_PRESERVE_ROW_LEVEL_CHANGES,
36};
37use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
38use risingwave_pb::expr::expr_node::Type;
39use risingwave_pb::stream_plan::SinkLogStoreType;
40use risingwave_pb::stream_plan::stream_node::PbNodeBody;
41
42use super::derive::{derive_columns, derive_pk};
43use super::stream::prelude::*;
44use super::utils::{
45    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
46};
47use super::{
48    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
49    StreamSyncLogStore, generic,
50};
51use crate::TableCatalog;
52use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
53use crate::expr::{ExprImpl, FunctionCall, InputRef};
54use crate::optimizer::StreamOptimizedLogicalPlanRoot;
55use crate::optimizer::plan_node::PlanTreeNodeUnary;
56use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
57use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
58use crate::optimizer::property::{Distribution, RequiredDist};
59use crate::stream_fragmenter::BuildFragmentGraphState;
60use crate::utils::WithOptionsSecResolved;
61
/// WITH option naming the downstream primary key columns, separated by `,`.
const DOWNSTREAM_PK_KEY: &str = "primary_key";
/// WITH option checked against `"true"`: for an upsert sink (not into a table) without an
/// explicit primary key, setting it makes the derived stream key the downstream primary key.
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
64
65fn target_table_requires_row_level_conflict_handling(target_table: &TableCatalog) -> bool {
66    !target_table.version_column_indices.is_empty()
67        || matches!(
68            target_table.conflict_behavior(),
69            ConflictBehavior::DoUpdateIfNotNull | ConflictBehavior::IgnoreConflict
70        )
71}
72
73/// ## Why we need `PartitionComputeInfo`?
74///
75/// For some sink, it will write the data into different file based on the partition value. E.g. iceberg sink(<https://iceberg.apache.org/spec/#partitioning>)
76/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
77/// So if the `PartitionComputeInfo` provided, we will create a `StreamProject` node to compute the partition value and shuffle the data based on the partition value before the sink.
78///
79/// ## What is `PartitionComputeInfo`?
80/// The `PartitionComputeInfo` contains the information about partition compute. The stream sink will use
81/// these information to create the corresponding expression in `StreamProject` node.
82///
83/// #TODO
84/// Maybe we should move this in sink?
85pub enum PartitionComputeInfo {
86    Iceberg(IcebergPartitionInfo),
87}
88
89impl PartitionComputeInfo {
90    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
91        match self {
92            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
93        }
94    }
95}
96
97pub struct IcebergPartitionInfo {
98    pub partition_type: StructType,
99    // (partition_field_name, partition_field_transform)
100    pub partition_fields: Vec<(String, Transform)>,
101}
102
103impl IcebergPartitionInfo {
104    #[inline]
105    fn transform_to_expression(
106        transform: &Transform,
107        col_id: usize,
108        columns: &[ColumnCatalog],
109        result_type: DataType,
110    ) -> Result<ExprImpl> {
111        match transform {
112            Transform::Identity => {
113                if columns[col_id].column_desc.data_type != result_type {
114                    return Err(ErrorCode::InvalidInputSyntax(format!(
115                        "The partition field {} has type {}, but the partition field is {}",
116                        columns[col_id].column_desc.name,
117                        columns[col_id].column_desc.data_type,
118                        result_type
119                    ))
120                    .into());
121                }
122                Ok(ExprImpl::InputRef(
123                    InputRef::new(col_id, result_type).into(),
124                ))
125            }
126            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
127            _ => Ok(ExprImpl::FunctionCall(
128                FunctionCall::new_unchecked(
129                    Type::IcebergTransform,
130                    vec![
131                        ExprImpl::literal_varchar(transform.to_string()),
132                        ExprImpl::InputRef(
133                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
134                                .into(),
135                        ),
136                    ],
137                    result_type,
138                )
139                .into(),
140            )),
141        }
142    }
143
144    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
145        let child_exprs = self
146            .partition_fields
147            .into_iter()
148            .zip_eq_debug(self.partition_type.iter())
149            .map(|((field_name, transform), (_, result_type))| {
150                let col_id = find_column_idx_by_name(columns, &field_name)?;
151                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
152            })
153            .collect::<Result<Vec<_>>>()?;
154
155        Ok(ExprImpl::FunctionCall(
156            FunctionCall::new_unchecked(
157                Type::Row,
158                child_exprs,
159                DataType::Struct(self.partition_type),
160            )
161            .into(),
162        ))
163    }
164}
165
166#[inline]
167fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
168    columns
169        .iter()
170        .position(|col| col.column_desc.name == col_name)
171        .ok_or_else(|| {
172            ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
173                .into()
174        })
175}
176
177/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
178#[derive(Debug, Clone, PartialEq, Eq, Hash)]
179pub struct StreamSink {
180    pub base: PlanBase<Stream>,
181    input: PlanRef,
182    sink_desc: SinkDesc,
183    log_store_type: SinkLogStoreType,
184}
185
186impl StreamSink {
187    #[must_use]
188    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
189        // The sink executor will transform the chunk into desired format based on the sink type
190        // before writing to the sink or emitting to the downstream. Thus, we need to derive the
191        // stream kind based on the sink type.
192        // We assert here because checks should already be done in `derive_sink_type`.
193        let input_kind = input.stream_kind();
194        let kind = match sink_desc.sink_type {
195            SinkType::AppendOnly => {
196                if !sink_desc.ignore_delete {
197                    assert_eq!(
198                        input_kind,
199                        StreamKind::AppendOnly,
200                        "{input_kind} stream cannot be used as input of append-only sink",
201                    );
202                }
203                StreamKind::AppendOnly
204            }
205            SinkType::Upsert => StreamKind::Upsert,
206            SinkType::Retract => {
207                assert_ne!(
208                    input_kind,
209                    StreamKind::Upsert,
210                    "upsert stream cannot be used as input of retract sink",
211                );
212                StreamKind::Retract
213            }
214        };
215
216        let base = PlanBase::new_stream(
217            input.ctx(),
218            input.schema().clone(),
219            // FIXME: We may reconstruct the chunk based on user-specified downstream pk, so
220            // we should also use `downstream_pk` as the stream key of the output. Though this
221            // is unlikely to result in correctness issues:
222            // - for sink-into-table, the `Materialize` node in the downstream table will always
223            //   enforce the pk consistency
224            // - for other sinks, the output of `Sink` node is not used
225            input.stream_key().map(|v| v.to_vec()),
226            input.functional_dependency().clone(),
227            input.distribution().clone(),
228            kind,
229            input.emit_on_window_close(),
230            input.watermark_columns().clone(),
231            input.columns_monotonicity().clone(),
232        );
233
234        Self {
235            base,
236            input,
237            sink_desc,
238            log_store_type,
239        }
240    }
241
242    pub fn sink_desc(&self) -> &SinkDesc {
243        &self.sink_desc
244    }
245
246    fn derive_iceberg_sink_distribution(
247        input: PlanRef,
248        partition_info: Option<PartitionComputeInfo>,
249        columns: &[ColumnCatalog],
250    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
251        // For here, we need to add the plan node to compute the partition value, and add it as a extra column.
252        if let Some(partition_info) = partition_info {
253            let input_fields = input.schema().fields();
254
255            let mut exprs: Vec<_> = input_fields
256                .iter()
257                .enumerate()
258                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
259                .collect();
260
261            // Add the partition compute expression to the end of the exprs
262            exprs.push(partition_info.convert_to_expression(columns)?);
263            let partition_col_idx = exprs.len() - 1;
264            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
265            Ok((
266                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
267                project.into(),
268                Some(partition_col_idx),
269            ))
270        } else {
271            Ok((
272                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
273                input,
274                None,
275            ))
276        }
277    }
278
279    #[expect(clippy::too_many_arguments)]
280    pub fn create(
281        StreamOptimizedLogicalPlanRoot {
282            plan: mut input,
283            required_dist: user_distributed_by,
284            required_order: user_order_by,
285            out_fields: user_cols,
286            out_names,
287            ..
288        }: StreamOptimizedLogicalPlanRoot,
289        name: String,
290        db_name: String,
291        sink_from_table_name: String,
292        target_table: Option<Arc<TableCatalog>>,
293        target_table_mapping: Option<Vec<Option<usize>>>,
294        definition: String,
295        properties: WithOptionsSecResolved,
296        format_desc: Option<SinkFormatDesc>,
297        partition_info: Option<PartitionComputeInfo>,
298        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
299    ) -> Result<Self> {
300        let (sink_type, ignore_delete) =
301            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;
302
303        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
304        let (pk, _) = derive_pk(
305            input.clone(),
306            user_distributed_by.clone(),
307            user_order_by,
308            &columns,
309        );
310        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();
311
312        // Get downstream pk from user input, override and perform some checks if applicable.
313        let downstream_pk = {
314            let downstream_pk = properties
315                .get(DOWNSTREAM_PK_KEY)
316                .map(|v| Self::parse_downstream_pk(v, &columns))
317                .transpose()?;
318
319            if let Some(t) = &target_table {
320                let user_defined_primary_key_table = t.row_id_index.is_none();
321                let sink_is_append_only = sink_type.is_append_only();
322
323                if !user_defined_primary_key_table && !sink_is_append_only {
324                    return Err(RwError::from(ErrorCode::BindError(
325                        "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
326                    )));
327                }
328
329                if t.append_only && !sink_is_append_only {
330                    return Err(RwError::from(ErrorCode::BindError(
331                        "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
332                    )));
333                }
334
335                if sink_is_append_only {
336                    None
337                } else {
338                    let target_table_mapping = target_table_mapping.unwrap();
339                    Some(t.pk()
340                        .iter()
341                        .map(|c| {
342                            target_table_mapping[c.column_index].ok_or_else(
343                                || ErrorCode::InvalidInputSyntax("When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()).into())
344                        })
345                        .try_collect::<_, _, RwError>()?)
346                }
347            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
348                && sink_type == SinkType::Upsert
349                && downstream_pk.is_none()
350            {
351                Some(derived_pk.clone())
352            } else if properties.is_iceberg_connector()
353                && sink_type == SinkType::Upsert
354                && downstream_pk.is_none()
355            {
356                // If user doesn't specify the downstream primary key, we use the stream key as the pk.
357                Some(derived_pk.clone())
358            } else {
359                downstream_pk
360            }
361        };
362
363        // Since we've already rejected empty pk in `parse_downstream_pk`, if we still get an empty pk here,
364        // it's likely that the derived stream key is used and it's empty, which is possible in cases of
365        // operators outputting at most one row (like `SimpleAgg`). This is legitimate. However, currently
366        // the sink implementation may confuse empty pk with not specifying pk, so we still reject this case
367        // for correctness.
368        if let Some(pk) = &downstream_pk
369            && pk.is_empty()
370        {
371            bail_invalid_input_syntax!(
372                "Empty primary key is not supported. \
373                 Please specify the primary key in WITH options."
374            )
375        }
376
377        // The "upsert" property is defined based on a specific stream key: columns other than the stream key
378        // might not be valid. We should reject the cases referencing such columns in primary key.
379        if let StreamKind::Upsert = input.stream_kind()
380            && let Some(downstream_pk) = &downstream_pk
381            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
382        {
383            bail_bind_error!(
384                "When sinking from an upsert stream, \
385                 the downstream primary key must be the same as or a subset of the one derived from the stream."
386            )
387        }
388
389        if let Some(upstream_table) = &auto_refresh_schema_from_table
390            && let Some(downstream_pk) = &downstream_pk
391        {
392            let upstream_table_pk_col_names = upstream_table
393                .pk
394                .iter()
395                .map(|order| {
396                    upstream_table.columns[order.column_index]
397                        .column_desc
398                        .name()
399                })
400                .collect_vec();
401            let sink_pk_col_names = downstream_pk
402                .iter()
403                .map(|&column_index| columns[column_index].name())
404                .collect_vec();
405            if upstream_table_pk_col_names != sink_pk_col_names {
406                let is_iceberg_row_id_alias = properties.is_iceberg_connector()
407                    && upstream_table_pk_col_names.len() == 1
408                    && upstream_table_pk_col_names[0] == ROW_ID_COLUMN_NAME
409                    && sink_pk_col_names.len() == 1
410                    && sink_pk_col_names[0] == RISINGWAVE_ICEBERG_ROW_ID;
411                if !is_iceberg_row_id_alias {
412                    return Err(ErrorCode::InvalidInputSyntax(format!(
413                        "sink with auto schema change should have same pk as upstream table {:?}, but got {:?}",
414                        upstream_table_pk_col_names, sink_pk_col_names
415                    ))
416                    .into());
417                }
418            }
419        }
420        let mut extra_partition_col_idx = None;
421
422        let required_dist = match input.distribution() {
423            Distribution::Single => RequiredDist::single(),
424            _ => {
425                match properties.get("connector") {
426                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
427                        let Some(downstream_pk) = &downstream_pk else {
428                            return Err(ErrorCode::InvalidInputSyntax(format!(
429                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
430                                key = DOWNSTREAM_PK_KEY
431                            )).into());
432                        };
433                        // for upsert jdbc sink we align distribution to downstream to avoid
434                        // lock contentions
435                        RequiredDist::hash_shard(downstream_pk)
436                    }
437                    Some(s) if s == ICEBERG_SINK => {
438                        let (required_dist, new_input, partition_col_idx) =
439                            Self::derive_iceberg_sink_distribution(
440                                input,
441                                partition_info,
442                                &columns,
443                            )?;
444                        input = new_input;
445                        extra_partition_col_idx = partition_col_idx;
446                        required_dist
447                    }
448                    _ => {
449                        assert_matches!(user_distributed_by, RequiredDist::Any);
450                        if let Some(downstream_pk) = &downstream_pk {
451                            // force the same primary key be written into the same sink shard to make sure the sink pk mismatch compaction effective
452                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
453                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
454                        } else {
455                            RequiredDist::shard_by_key(
456                                input.schema().len(),
457                                input.expect_stream_key(),
458                            )
459                        }
460                    }
461                }
462            }
463        };
464        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
465        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
466            && input.as_stream_exchange().is_none()
467        {
468            StreamExchange::new_no_shuffle(input).into()
469        } else {
470            input
471        };
472
473        let distribution_key = input.distribution().dist_column_indices().to_vec();
474        let create_type = if input.ctx().session_ctx().config().background_ddl()
475            && plan_can_use_background_ddl(&input)
476        {
477            CreateType::Background
478        } else {
479            CreateType::Foreground
480        };
481        let (mut properties, secret_refs) = properties.into_parts();
482        if let Some(target_table) = &target_table
483            && target_table_requires_row_level_conflict_handling(target_table)
484        {
485            properties.insert(
486                SINK_USER_PRESERVE_ROW_LEVEL_CHANGES.to_owned(),
487                "true".to_owned(),
488            );
489        }
490        let is_exactly_once = properties
491            .get("is_exactly_once")
492            .map(|v| v.to_lowercase() == "true");
493
494        let mut sink_desc = SinkDesc {
495            id: SinkId::placeholder(),
496            name,
497            db_name,
498            sink_from_name: sink_from_table_name,
499            definition,
500            columns,
501            plan_pk: pk,
502            downstream_pk,
503            distribution_key,
504            properties,
505            secret_refs,
506            sink_type,
507            ignore_delete,
508            format_desc,
509            target_table: target_table.as_ref().map(|catalog| catalog.id()),
510            extra_partition_col_idx,
511            create_type,
512            is_exactly_once,
513            auto_refresh_schema_from_table: auto_refresh_schema_from_table
514                .as_ref()
515                .map(|table| table.id),
516        };
517
518        let unsupported_sink = |sink: &str| -> Result<_> {
519            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
520        };
521
522        // check and ensure that the sink connector is specified and supported
523        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
524            Some(connector) => {
525                let connector_type = connector.to_lowercase();
526                match_sink_name_str!(
527                    connector_type.as_str(),
528                    SinkType,
529                    {
530                        // the table sink is created by with properties
531                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
532                            unsupported_sink(TABLE_SINK)
533                        } else {
534                            SinkType::set_default_commit_checkpoint_interval(
535                                &mut sink_desc,
536                                &input.ctx().session_ctx().config().sink_decouple(),
537                            )?;
538                            let support_schema_change = SinkType::support_schema_change();
539                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
540                                return Err(ErrorCode::InvalidInputSyntax(format!(
541                                    "{} sink does not support schema change",
542                                    connector_type
543                                ))
544                                .into());
545                            }
546                            SinkType::is_sink_decouple(
547                                &input.ctx().session_ctx().config().sink_decouple(),
548                            )
549                            .map_err(Into::into)
550                        }
551                    },
552                    |other: &str| unsupported_sink(other)
553                )?
554            }
555            None => {
556                return Err(ErrorCode::InvalidInputSyntax(
557                    "connector not specified when create sink".to_owned(),
558                )
559                .into());
560            }
561        };
562        let hint_string =
563            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
564        if !sink_decouple {
565            // For file sink, it must have sink_decouple turned on.
566            if sink_desc.is_file_sink() {
567                return Err(ErrorCode::NotSupported(
568                    "File sink can only be created with sink_decouple enabled.".to_owned(),
569                    hint_string(true),
570                )
571                .into());
572            }
573
574            if sink_desc.is_exactly_once.is_none()
575                && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
576            {
577                let connector_type = connector.to_lowercase();
578                if connector_type == ICEBERG_SINK {
579                    // iceberg sink defaults to exactly once
580                    // However, when sink_decouple is disabled, we enforce it to false.
581                    sink_desc
582                        .properties
583                        .insert("is_exactly_once".to_owned(), "false".to_owned());
584                }
585            }
586        }
587        let log_store_type = if sink_decouple {
588            SinkLogStoreType::KvLogStore
589        } else {
590            SinkLogStoreType::InMemoryLogStore
591        };
592
593        // sink into table should have logstore for sink_decouple
594        let input = if sink_decouple && target_table.is_some() {
595            StreamSyncLogStore::new(input).into()
596        } else {
597            input
598        };
599
600        Ok(Self::new(input, sink_desc, log_store_type))
601    }
602
603    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
604        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
605            let sink_type = match sink_type.as_str() {
606                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
607                SINK_TYPE_UPSERT => {
608                    if properties.is_iceberg_connector() {
609                        // Iceberg sink must use retract to represent deletes
610                        SinkType::Retract
611                    } else {
612                        SinkType::Upsert
613                    }
614                }
615                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
616                _ => {
617                    return Err(ErrorCode::InvalidInputSyntax(format!(
618                        "`{}` must be {}, {}, {}, or {}",
619                        SINK_TYPE_OPTION,
620                        SINK_TYPE_APPEND_ONLY,
621                        SINK_TYPE_RETRACT,
622                        SINK_TYPE_UPSERT,
623                        SINK_TYPE_DEBEZIUM,
624                    ))
625                    .into());
626                }
627            };
628            return Ok(Some(sink_type));
629        }
630        Ok(None)
631    }
632
633    /// `ignore_delete` option, with backward-compatible alias `force_append_only`.
634    fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
635        let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
636        let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);
637
638        if has_ignore_delete && has_force_append_only {
639            return Err(ErrorCode::InvalidInputSyntax(format!(
640                "`{}` is an alias of `{}`, only one of them can be specified.",
641                SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
642            ))
643            .into());
644        }
645
646        let key = if has_ignore_delete {
647            SINK_USER_IGNORE_DELETE_OPTION
648        } else if has_force_append_only {
649            SINK_USER_FORCE_APPEND_ONLY_OPTION
650        } else {
651            return Ok(false);
652        };
653
654        if properties.value_eq_ignore_case(key, "true") {
655            Ok(true)
656        } else if properties.value_eq_ignore_case(key, "false") {
657            Ok(false)
658        } else {
659            Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
660        }
661    }
662
663    /// Derive the sink type based on...
664    ///
665    /// - the derived stream kind of the plan, from the optimizer
666    /// - sink format required by [`SinkFormatDesc`], if any
667    /// - user-specified sink type in WITH options, if any
668    /// - user-specified `ignore_delete` (`force_append_only`) in WITH options, if any
669    ///
670    /// Returns the `sink_type` and `ignore_delete`.
671    fn derive_sink_type(
672        derived_stream_kind: StreamKind,
673        properties: &WithOptionsSecResolved,
674        format_desc: Option<&SinkFormatDesc>,
675    ) -> Result<(SinkType, bool)> {
676        let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
677            Some(f) => (
678                Some(match f.format {
679                    SinkFormat::AppendOnly => SinkType::AppendOnly,
680                    SinkFormat::Upsert => SinkType::Upsert,
681                    SinkFormat::Debezium => SinkType::Retract,
682                }),
683                Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
684                    f.options.clone(),
685                ))?,
686                false,
687            ),
688            None => (
689                Self::sink_type_in_prop(properties)?,
690                Self::is_user_ignore_delete(properties)?,
691                true,
692            ),
693        };
694
695        if let Some(user_defined_sink_type) = user_defined_sink_type {
696            match user_defined_sink_type {
697                SinkType::AppendOnly => {
698                    if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
699                        return Err(ErrorCode::InvalidInputSyntax(format!(
700                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
701                             Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
702                            derived_stream_kind,
703                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
704                    ))
705                        .into());
706                    }
707                }
708                SinkType::Upsert => { /* always qualified */ }
709                SinkType::Retract => {
710                    if user_ignore_delete {
711                        bail_invalid_input_syntax!(
712                            "Retract sink type does not support `ignore_delete`. \
713                             Please use `type = 'append-only'` or `type = 'upsert'` instead.",
714                        );
715                    }
716                    if derived_stream_kind == StreamKind::Upsert {
717                        bail_invalid_input_syntax!(
718                            "The sink of upsert stream cannot be retract. \
719                             Please create a materialized view or sink-into-table with this query before sinking it.",
720                        );
721                    }
722                }
723            }
724            Ok((user_defined_sink_type, user_ignore_delete))
725        } else {
726            // No specification at all, follow the optimizer's derivation.
727            // This is also the case for sink-into-table.
728            let sink_type = match derived_stream_kind {
729                // We downgrade `Retract` to `Upsert` unless explicitly specified the type in options,
730                // as it is well supported by most sinks and reduces the amount of data written.
731                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
732                StreamKind::AppendOnly => SinkType::AppendOnly,
733            };
734            Ok((sink_type, user_ignore_delete))
735        }
736    }
737
738    /// Extract user-defined downstream pk columns from with options. Return the indices of the pk
739    /// columns. An empty list of columns is not allowed.
740    ///
741    /// The format of `downstream_pk_str` should be 'col1,col2,...' (delimited by `,`) in order to
742    /// get parsed.
743    fn parse_downstream_pk(
744        downstream_pk_str: &str,
745        columns: &[ColumnCatalog],
746    ) -> Result<Vec<usize>> {
747        // If the user defines the downstream primary key, we find out their indices.
748        let downstream_pk = downstream_pk_str.split(',').collect_vec();
749        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
750        for key in downstream_pk {
751            let trimmed_key = key.trim();
752            if trimmed_key.is_empty() {
753                continue;
754            }
755            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
756        }
757        if downstream_pk_indices.is_empty() {
758            bail_invalid_input_syntax!(
759                "Specified primary key should not be empty. \
760                To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
761            );
762        }
763        Ok(downstream_pk_indices)
764    }
765
766    /// The table schema is: | epoch | seq id | row op | sink columns |
767    /// Pk is: | epoch | seq id |
768    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
769        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
770    }
771
772    /// Convert this `StreamSink` into a `PlanRef`.
773    ///
774    /// For Iceberg pk index sinks, this rewrites the plan into
775    /// `Upstream → Writer → Exchange(Single) → DvMerger` instead of a single `SinkNode`.
776    /// For all other sinks, returns `self` as-is.
777    pub fn into_stream_plan(self) -> Result<PlanRef> {
778        use super::{StreamIcebergWithPkIndexDvMerger, StreamIcebergWithPkIndexWriter};
779
780        if !is_iceberg_with_pk_index_sink(&self.sink_desc)? {
781            return Ok(self.into());
782        }
783
784        let writer: PlanRef = StreamIcebergWithPkIndexWriter::from_stream_sink(&self)?.into();
785        let dv_merger: PlanRef =
786            StreamIcebergWithPkIndexDvMerger::new(writer, self.sink_desc).into();
787        Ok(dv_merger)
788    }
789}
790
791pub fn is_iceberg_with_pk_index_sink(sink_desc: &SinkDesc) -> Result<bool> {
792    if !sink_desc
793        .properties
794        .get(CONNECTOR_TYPE_KEY)
795        .is_some_and(|connector| connector.eq_ignore_ascii_case(ICEBERG_SINK))
796    {
797        return Ok(false);
798    }
799
800    let res = sink_desc
801        .properties
802        .get(ENABLE_PK_INDEX)
803        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
804    Ok(res)
805}
806
807impl PlanTreeNodeUnary<Stream> for StreamSink {
808    fn input(&self) -> PlanRef {
809        self.input.clone()
810    }
811
812    fn clone_with_input(&self, input: PlanRef) -> Self {
813        Self::new(input, self.sink_desc.clone(), self.log_store_type)
814        // TODO(nanderstabel): Add assertions (assert_eq!)
815    }
816}
817
818impl_plan_tree_node_for_unary! { Stream, StreamSink }
819
820impl Distill for StreamSink {
821    fn distill<'a>(&self) -> XmlNode<'a> {
822        let sink_type = if self.sink_desc.sink_type.is_append_only() {
823            "append-only"
824        } else {
825            "upsert"
826        };
827        let column_names = self
828            .sink_desc
829            .columns
830            .iter()
831            .map(|col| col.name_with_hidden().to_string())
832            .map(Pretty::from)
833            .collect();
834        let column_names = Pretty::Array(column_names);
835        let mut vec = Vec::with_capacity(3);
836        vec.push(("type", Pretty::from(sink_type)));
837        vec.push(("columns", column_names));
838        if let Some(pk) = &self.sink_desc.downstream_pk {
839            let sink_pk = IndicesDisplay {
840                indices: pk,
841                schema: self.base.schema(),
842            };
843            vec.push(("downstream_pk", sink_pk.distill()));
844        }
845        childless_record("StreamSink", vec)
846    }
847}
848
849impl StreamNode for StreamSink {
850    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
851        use risingwave_pb::stream_plan::*;
852
853        // We need to create a table for sink with a kv log store.
854        let table = self
855            .infer_kv_log_store_table_catalog()
856            .with_id(state.gen_table_id_wrapped());
857
858        PbNodeBody::Sink(Box::new(SinkNode {
859            sink_desc: Some(self.sink_desc.to_proto()),
860            table: Some(table.to_internal_table_prost()),
861            log_store_type: self.log_store_type as i32,
862            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
863        }))
864    }
865}
866
// Empty impl: `StreamSink` relies entirely on the trait's default methods.
impl ExprRewritable<Stream> for StreamSink {}

// Empty impl: `StreamSink` relies entirely on the trait's default methods.
impl ExprVisitable for StreamSink {}
870
#[cfg(test)]
mod test {
    use fixedbitset::FixedBitSet;
    use risingwave_common::catalog::{
        ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field,
    };
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::catalog::table_catalog::TableType;
    use crate::expr::{Expr, ExprImpl};
    use crate::optimizer::plan_node::utils::TableCatalogBuilder;

    /// Build a three-column schema for partition-expression tests:
    /// `v1: Int32`, `v2: Timestamptz` and `v3: Timestamp`, with column ids 1, 2 and 3.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                // Column ids are distinct, as they would be in a real schema.
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    /// Build a `TableType::Table` catalog with a single visible column `v1: Int32`.
    /// The conflict behavior and version column indices are set from the arguments.
    fn test_target_table(
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
    ) -> TableCatalog {
        let mut builder = TableCatalogBuilder::default();
        let col_idx = builder.add_column(&Field::with_name(DataType::Int32, "v1"));
        let mut table = builder.build(vec![], 0);
        table.table_type = TableType::Table;
        table.columns = vec![ColumnCatalog {
            column_desc: ColumnDesc::named("v1", ColumnId::new(col_idx as i32), DataType::Int32),
            is_hidden: false,
        }];
        table.conflict_behavior = conflict_behavior;
        table.version_column_indices = version_column_indices;
        table.watermark_columns = FixedBitSet::with_capacity(table.columns.len());
        table
    }

    #[test]
    fn test_target_table_requires_row_level_conflict_handling() {
        assert!(target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::DoUpdateIfNotNull, vec![])
        ));
        assert!(target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::IgnoreConflict, vec![])
        ));
        assert!(target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::Overwrite, vec![0])
        ));
        assert!(!target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::Overwrite, vec![])
        ));
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        // Each partition field maps to one input of the `Row` call, in order.
        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}