risingwave_frontend/optimizer/plan_node/stream_sink.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SINK_USER_IGNORE_DELETE_OPTION,
};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g., the Iceberg sink
/// (<https://iceberg.apache.org/spec/#partitioning>). For such sinks, the number of files can be
/// reduced if we shuffle the data by partition value first. More details can be found in
/// <https://github.com/risingwavelabs/rfcs/pull/77>. So if a `PartitionComputeInfo` is provided,
/// we create a `StreamProject` node to compute the partition value and shuffle the data by it
/// before the sink.
///
/// ## What is `PartitionComputeInfo`?
///
/// `PartitionComputeInfo` contains the information needed to compute the partition value. The
/// stream sink uses this information to create the corresponding expression in the `StreamProject`
/// node.
///
/// ## TODO
///
/// Maybe we should move this into the sink?
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    // (partition_field_name, partition_field_transform)
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition field {} has type {}, but the partition spec expects type {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

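    /// Builds a single `ROW(...)` expression over `columns` that evaluates to the partition value
    /// of type [`Self::partition_type`], with one child expression per partition field.
    ///
    /// A rough sketch of the resulting expression (column and field names below are made up for
    /// illustration): an identity-transformed field becomes a plain column reference, a
    /// void-transformed field becomes a typed NULL literal, and any other transform becomes an
    /// `IcebergTransform` call, e.g.
    ///
    /// ```text
    /// partition_fields = [(v1, Identity), (v1, Bucket(10)), (v2, Void)]
    /// result           = ROW(v1, iceberg_transform('bucket[10]', v1), NULL)
    /// ```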
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
                .into()
        })
}

/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        // The sink executor will transform the chunk into the desired format based on the sink type
        // before writing to the sink or emitting to the downstream. Thus, we need to derive the
        // stream kind based on the sink type.
        // We assert here because checks should already be done in `derive_sink_type`.
        let input_kind = input.stream_kind();
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                if !sink_desc.ignore_delete {
                    assert_eq!(
                        input_kind,
                        StreamKind::AppendOnly,
                        "{input_kind} stream cannot be used as input of append-only sink",
                    );
                }
                StreamKind::AppendOnly
            }
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            // FIXME: We may reconstruct the chunk based on user-specified downstream pk, so
            // we should also use `downstream_pk` as the stream key of the output. Though this
            // is unlikely to result in correctness issues:
            // - for sink-into-table, the `Materialize` node in the downstream table will always
            //   enforce the pk consistency
            // - for other sinks, the output of `Sink` node is not used
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

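    /// Decides the required distribution for an Iceberg sink and, if partition information is
    /// available, wraps `input` in a `StreamProject` that appends the computed partition value as
    /// an extra trailing column, returning its index so that data can be shuffled by partition
    /// value before the sink.
    ///
    /// A rough sketch of the two cases (schemas here are made up for illustration):
    ///
    /// ```text
    /// with partition info:    [c0, c1]  ->  Project[c0, c1, partition],  shard by [2]
    /// without partition info: [c0, c1]  ->  unchanged,                   shard by stream key
    /// ```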
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        // Here we add a plan node to compute the partition value and append it as an extra column.
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // Add the partition compute expression to the end of the exprs
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let (sink_type, ignore_delete) =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        // Get downstream pk from user input, override and perform some checks if applicable.
        let downstream_pk = {
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please try to add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please try to add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using a non-append-only sink into a table, the primary key of the target table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?)
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                // If the user doesn't specify the downstream primary key, we use the stream key as the pk.
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        // Since we've already rejected empty pk in `parse_downstream_pk`, if we still get an empty pk here,
        // it's likely that the derived stream key is used and it's empty, which is possible in cases of
        // operators outputting at most one row (like `SimpleAgg`). This is legitimate. However, currently
        // the sink implementation may confuse empty pk with not specifying pk, so we still reject this case
        // for correctness.
        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                 Please specify the primary key in WITH options."
            )
        }

        // The "upsert" property is defined based on a specific stream key: columns other than the stream key
        // might not be valid. We should reject the cases referencing such columns in the primary key.
        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                 the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!("sink with auto schema change should have the same pk as the upstream table {:?}, but got {:?}", upstream_table_pk_col_names, sink_pk_col_names)).into());
            }
        }
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        };
                        // For an upsert JDBC sink, we align the distribution to the downstream
                        // to avoid lock contention.
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            // Force rows with the same primary key to be written to the same sink
                            // shard, so that sink pk mismatch compaction is effective.
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            ignore_delete,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // Check and ensure that the sink connector is specified and supported.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // The "table" sink is only created internally for sink-into-table; it
                        // cannot be specified directly via WITH properties.
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            // A file sink requires sink decoupling to be enabled.
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            if sink_desc.is_exactly_once.is_none()
                && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
            {
                let connector_type = connector.to_lowercase();
                if connector_type == ICEBERG_SINK {
                    // The Iceberg sink defaults to exactly-once delivery. However, when sink
                    // decoupling is disabled, we force it to false.
                    sink_desc
                        .properties
                        .insert("is_exactly_once".to_owned(), "false".to_owned());
                }
            }
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Sink-into-table needs a log store to support sink decoupling.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

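    /// Parses the user-specified sink type from the `type` entry in WITH options, if any.
    ///
    /// A rough sketch of the mapping implemented below (the accepted values come from the
    /// `SINK_TYPE_*` constants):
    ///
    /// ```text
    /// type = 'append-only'            -> SinkType::AppendOnly
    /// type = 'upsert'                 -> SinkType::Upsert (Retract for Iceberg sinks)
    /// type = 'retract' | 'debezium'   -> SinkType::Retract
    /// anything else                   -> error
    /// ```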
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            let sink_type = match sink_type.as_str() {
                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
                SINK_TYPE_UPSERT => {
                    if properties.is_iceberg_connector() {
                        // Iceberg sink must use retract to represent deletes
                        SinkType::Retract
                    } else {
                        SinkType::Upsert
                    }
                }
                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
                _ => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "`{}` must be {}, {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_RETRACT,
                        SINK_TYPE_UPSERT,
                        SINK_TYPE_DEBEZIUM,
                    ))
                    .into());
                }
            };
            return Ok(Some(sink_type));
        }
        Ok(None)
    }

    /// `ignore_delete` option, with backward-compatible alias `force_append_only`.
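    ///
    /// As an illustrative sketch of what is accepted (values are matched case-insensitively):
    ///
    /// ```text
    /// ignore_delete = 'true'       -> Ok(true)
    /// force_append_only = 'false'  -> Ok(false)
    /// (neither specified)          -> Ok(false)
    /// (both specified)             -> error, the two options are aliases
    /// (any other value)            -> error
    /// ```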
    fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
        let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
        let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);

        if has_ignore_delete && has_force_append_only {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` is an alias of `{}`, only one of them can be specified.",
                SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
            ))
            .into());
        }

        let key = if has_ignore_delete {
            SINK_USER_IGNORE_DELETE_OPTION
        } else if has_force_append_only {
            SINK_USER_FORCE_APPEND_ONLY_OPTION
        } else {
            return Ok(false);
        };

        if properties.value_eq_ignore_case(key, "true") {
            Ok(true)
        } else if properties.value_eq_ignore_case(key, "false") {
            Ok(false)
        } else {
            Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
        }
    }

    /// Derive the sink type based on...
    ///
    /// - the derived stream kind of the plan, from the optimizer
    /// - sink format required by [`SinkFormatDesc`], if any
    /// - user-specified sink type in WITH options, if any
    /// - user-specified `ignore_delete` (`force_append_only`) in WITH options, if any
    ///
    /// Returns the `sink_type` and `ignore_delete`.
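    ///
    /// As an illustrative sketch of the fallback behavior when nothing is specified (neither a
    /// format nor a `type` option), the optimizer's derived stream kind alone decides the result:
    ///
    /// ```text
    /// derived AppendOnly          -> (SinkType::AppendOnly, false)
    /// derived Upsert or Retract   -> (SinkType::Upsert, false)   // Retract is downgraded
    /// ```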
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<(SinkType, bool)> {
        let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_ignore_delete(properties)?,
                true,
            ),
        };

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                             Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ))
                        .into());
                    }
                }
                SinkType::Upsert => { /* always qualified */ }
                SinkType::Retract => {
                    if user_ignore_delete {
                        bail_invalid_input_syntax!(
                            "Retract sink type does not support `ignore_delete`. \
                             Please use `type = 'append-only'` or `type = 'upsert'` instead.",
                        );
                    }
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of upsert stream cannot be retract. \
                             Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok((user_defined_sink_type, user_ignore_delete))
        } else {
            // No specification at all, follow the optimizer's derivation.
            // This is also the case for sink-into-table.
            let sink_type = match derived_stream_kind {
                // We downgrade `Retract` to `Upsert` unless the type is explicitly specified in options,
                // as it is well supported by most sinks and reduces the amount of data written.
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            };
            Ok((sink_type, user_ignore_delete))
        }
    }

    /// Extract user-defined downstream pk columns from WITH options and return their indices.
    /// An empty list of columns is not allowed.
    ///
    /// `downstream_pk_str` should be of the form `'col1,col2,...'`, i.e. column names delimited
    /// by `,`.
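    ///
    /// As an illustrative sketch (column names here are hypothetical), with sink columns
    /// `[id, name, ts]`:
    ///
    /// ```text
    /// "id, ts"   -> Ok(vec![0, 2])     // whitespace around names is trimmed
    /// "id,oops"  -> Err(...)           // unknown column
    /// ""         -> Err(...)           // empty pk is rejected
    /// ```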
    fn parse_downstream_pk(
        downstream_pk_str: &str,
        columns: &[ColumnCatalog],
    ) -> Result<Vec<usize>> {
        // If the user defines the downstream primary key, find the indices of its columns.
        let downstream_pk = downstream_pk_str.split(',').collect_vec();
        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
        for key in downstream_pk {
            let trimmed_key = key.trim();
            if trimmed_key.is_empty() {
                continue;
            }
            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
        }
        if downstream_pk_indices.is_empty() {
            bail_invalid_input_syntax!(
                "Specified primary key should not be empty. \
                To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
            );
        }
        Ok(downstream_pk_indices)
    }

    /// The table schema is: | epoch | seq id | row op | sink columns |
    /// Pk is: | epoch | seq id |
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
        // TODO(nanderstabel): Add assertions (assert_eq!)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if let Some(pk) = &self.sink_desc.downstream_pk {
            let sink_pk = IndicesDisplay {
                indices: pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // We need to create a table for sink with a kv log store.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(2), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
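
    /// A small sanity-check sketch for the column-name parsing helpers above. It only relies on
    /// the `v1`/`v2`/`v3` catalog from `create_column_catalog`; the key strings below are
    /// hypothetical user input.
    #[test]
    fn test_parse_downstream_pk() {
        let columns = create_column_catalog();

        // Column lookup by name is position-based on the catalog.
        assert_eq!(find_column_idx_by_name(&columns, "v2").unwrap(), 1);
        assert!(find_column_idx_by_name(&columns, "does_not_exist").is_err());

        // `primary_key = 'v1, v3'`: names are split on ',' and trimmed.
        assert_eq!(
            StreamSink::parse_downstream_pk("v1, v3", &columns).unwrap(),
            vec![0, 2]
        );
        // An unknown column or an effectively empty key list is rejected.
        assert!(StreamSink::parse_downstream_pk("v1, v4", &columns).is_err());
        assert!(StreamSink::parse_downstream_pk(" , ", &columns).is_err());
    }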
}