risingwave_frontend/optimizer/plan_node/stream_sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::sync::Arc;
17
18use iceberg::spec::Transform;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22    ColumnCatalog, CreateType, FieldLike, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
23};
24use risingwave_common::types::{DataType, StructType};
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_connector::sink::catalog::desc::SinkDesc;
27use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
28use risingwave_connector::sink::file_sink::fs::FsSink;
29use risingwave_connector::sink::iceberg::ICEBERG_SINK;
30use risingwave_connector::sink::trivial::TABLE_SINK;
31use risingwave_connector::sink::{
32    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
33    SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
34    SINK_USER_IGNORE_DELETE_OPTION,
35};
36use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
37use risingwave_pb::expr::expr_node::Type;
38use risingwave_pb::stream_plan::SinkLogStoreType;
39use risingwave_pb::stream_plan::stream_node::PbNodeBody;
40
41use super::derive::{derive_columns, derive_pk};
42use super::stream::prelude::*;
43use super::utils::{
44    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
45};
46use super::{
47    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
48    StreamSyncLogStore, generic,
49};
50use crate::TableCatalog;
51use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
52use crate::expr::{ExprImpl, FunctionCall, InputRef};
53use crate::optimizer::StreamOptimizedLogicalPlanRoot;
54use crate::optimizer::plan_node::PlanTreeNodeUnary;
55use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
56use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
57use crate::optimizer::property::{Distribution, RequiredDist};
58use crate::stream_fragmenter::BuildFragmentGraphState;
59use crate::utils::WithOptionsSecResolved;
60
/// WITH-option key for the user-specified downstream primary key, e.g. `primary_key = 'id,name'`.
const DOWNSTREAM_PK_KEY: &str = "primary_key";
/// WITH-option key asking the connector to create the downstream table if it does not exist.
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
63
/// ## Why we need `PartitionComputeInfo`?
///
/// For some sinks, the data is written into different files based on the partition value. E.g. iceberg sink(<https://iceberg.apache.org/spec/#partitioning>)
/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
/// So if the `PartitionComputeInfo` is provided, we will create a `StreamProject` node to compute the partition value and shuffle the data based on the partition value before the sink.
///
/// ## What is `PartitionComputeInfo`?
/// The `PartitionComputeInfo` contains the information about partition compute. The stream sink will use
/// this information to create the corresponding expression in the `StreamProject` node.
///
/// TODO: Maybe we should move this into the sink?
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}
79
impl PartitionComputeInfo {
    /// Convert the partition info into a partition-value expression over the sink's
    /// output `columns`. Fails if a referenced column is missing or, for identity
    /// transforms, has a mismatched type.
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}
87
/// Partition information extracted from an Iceberg table's partition spec.
pub struct IcebergPartitionInfo {
    /// The struct type of the computed partition value, one field per partition field.
    pub partition_type: StructType,
    /// `(partition_field_name, partition_field_transform)` pairs, in spec order.
    pub partition_fields: Vec<(String, Transform)>,
}
93
94impl IcebergPartitionInfo {
95    #[inline]
96    fn transform_to_expression(
97        transform: &Transform,
98        col_id: usize,
99        columns: &[ColumnCatalog],
100        result_type: DataType,
101    ) -> Result<ExprImpl> {
102        match transform {
103            Transform::Identity => {
104                if columns[col_id].column_desc.data_type != result_type {
105                    return Err(ErrorCode::InvalidInputSyntax(format!(
106                        "The partition field {} has type {}, but the partition field is {}",
107                        columns[col_id].column_desc.name,
108                        columns[col_id].column_desc.data_type,
109                        result_type
110                    ))
111                    .into());
112                }
113                Ok(ExprImpl::InputRef(
114                    InputRef::new(col_id, result_type).into(),
115                ))
116            }
117            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
118            _ => Ok(ExprImpl::FunctionCall(
119                FunctionCall::new_unchecked(
120                    Type::IcebergTransform,
121                    vec![
122                        ExprImpl::literal_varchar(transform.to_string()),
123                        ExprImpl::InputRef(
124                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
125                                .into(),
126                        ),
127                    ],
128                    result_type,
129                )
130                .into(),
131            )),
132        }
133    }
134
135    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
136        let child_exprs = self
137            .partition_fields
138            .into_iter()
139            .zip_eq_debug(self.partition_type.iter())
140            .map(|((field_name, transform), (_, result_type))| {
141                let col_id = find_column_idx_by_name(columns, &field_name)?;
142                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
143            })
144            .collect::<Result<Vec<_>>>()?;
145
146        Ok(ExprImpl::FunctionCall(
147            FunctionCall::new_unchecked(
148                Type::Row,
149                child_exprs,
150                DataType::Struct(self.partition_type),
151            )
152            .into(),
153        ))
154    }
155}
156
157#[inline]
158fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
159    columns
160        .iter()
161        .position(|col| col.column_desc.name == col_name)
162        .ok_or_else(|| {
163            ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
164                .into()
165        })
166}
167
/// [`StreamSink`] represents a table/connector sink at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    /// The upstream plan providing the rows to be written out.
    input: PlanRef,
    /// Catalog-level description of the sink (columns, pk, properties, sink type, ...).
    sink_desc: SinkDesc,
    /// Which log store implementation backs this sink (in-memory vs. kv, i.e. decoupled).
    log_store_type: SinkLogStoreType,
}
176
177impl StreamSink {
    /// Create a [`StreamSink`] on top of `input` with a fully-derived `sink_desc`.
    ///
    /// Asserts that the input's stream kind is compatible with the sink type; these
    /// combinations should already have been rejected in `derive_sink_type`, so a
    /// failure here indicates a planner bug.
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        // The sink executor will transform the chunk into desired format based on the sink type
        // before writing to the sink or emitting to the downstream. Thus, we need to derive the
        // stream kind based on the sink type.
        // We assert here because checks should already be done in `derive_sink_type`.
        let input_kind = input.stream_kind();
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                // With `ignore_delete`, any input kind is acceptable because the executor
                // drops DELETEs / converts UPDATEs; otherwise the input must already be
                // append-only.
                if !sink_desc.ignore_delete {
                    assert_eq!(
                        input_kind,
                        StreamKind::AppendOnly,
                        "{input_kind} stream cannot be used as input of append-only sink",
                    );
                }
                StreamKind::AppendOnly
            }
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            // FIXME: We may reconstruct the chunk based on user-specified downstream pk, so
            // we should also use `downstream_pk` as the stream key of the output. Though this
            // is unlikely to result in correctness issues:
            // - for sink-into-table, the `Materialize` node in the downstream table will always
            //   enforce the pk consistency
            // - for other sinks, the output of `Sink` node is not used
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }
232
    /// Returns the descriptor carrying all catalog-level information of this sink.
    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }
236
237    fn derive_iceberg_sink_distribution(
238        input: PlanRef,
239        partition_info: Option<PartitionComputeInfo>,
240        columns: &[ColumnCatalog],
241    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
242        // For here, we need to add the plan node to compute the partition value, and add it as a extra column.
243        if let Some(partition_info) = partition_info {
244            let input_fields = input.schema().fields();
245
246            let mut exprs: Vec<_> = input_fields
247                .iter()
248                .enumerate()
249                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
250                .collect();
251
252            // Add the partition compute expression to the end of the exprs
253            exprs.push(partition_info.convert_to_expression(columns)?);
254            let partition_col_idx = exprs.len() - 1;
255            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
256            Ok((
257                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
258                project.into(),
259                Some(partition_col_idx),
260            ))
261        } else {
262            Ok((
263                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
264                input,
265                None,
266            ))
267        }
268    }
269
    /// Build a [`StreamSink`] from an optimized stream plan plus all user-provided
    /// sink metadata (name, optional target table, WITH options, format descriptor,
    /// partition info, ...).
    ///
    /// Steps, in order:
    /// 1. derive the sink type and `ignore_delete` flag,
    /// 2. derive output columns and plan/downstream primary keys (with validation),
    /// 3. derive and enforce the required distribution (possibly inserting a
    ///    projection/exchange),
    /// 4. assemble the [`SinkDesc`] and resolve sink decoupling / log store type.
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let (sink_type, ignore_delete) =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        // Stream key derived by the optimizer, expressed as output column indices.
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        // Get downstream pk from user input, override and perform some checks if applicable.
        let downstream_pk = {
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                // Sink-into-table: the target table's own pk governs, with checks below.
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    // Map the target table's pk columns back into sink output columns;
                    // each pk column must appear in the sink result.
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?)
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                // The connector will create the table, so default its pk to the stream key.
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                // If user doesn't specify the downstream primary key, we use the stream key as the pk.
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        // Since we've already rejected empty pk in `parse_downstream_pk`, if we still get an empty pk here,
        // it's likely that the derived stream key is used and it's empty, which is possible in cases of
        // operators outputting at most one row (like `SimpleAgg`). This is legitimate. However, currently
        // the sink implementation may confuse empty pk with not specifying pk, so we still reject this case
        // for correctness.
        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                 Please specify the primary key in WITH options."
            )
        }

        // The "upsert" property is defined based on a specific stream key: columns other than the stream key
        // might not be valid. We should reject the cases referencing such columns in primary key.
        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                 the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        // Auto schema change requires the sink pk to match the upstream table pk (with a
        // special allowance for the iceberg row-id alias of the hidden row-id column).
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                let is_iceberg_row_id_alias = properties.is_iceberg_connector()
                    && upstream_table_pk_col_names.len() == 1
                    && upstream_table_pk_col_names[0] == ROW_ID_COLUMN_NAME
                    && sink_pk_col_names.len() == 1
                    && sink_pk_col_names[0] == RISINGWAVE_ICEBERG_ROW_ID;
                if !is_iceberg_row_id_alias {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "sink with auto schema change should have same pk as upstream table {:?}, but got {:?}",
                        upstream_table_pk_col_names, sink_pk_col_names
                    ))
                    .into());
                }
            }
        }
        // Set if a partition column was appended for the Iceberg sink below.
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        };
                        // for upsert jdbc sink we align distribution to downstream to avoid
                        // lock contentions
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        // Possibly replaces `input` with a projection appending the
                        // partition column; see `derive_iceberg_sink_distribution`.
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            // force the same primary key be written into the same sink shard to make sure the sink pk mismatch compaction effective
                            // https://github.com/risingwavelabs/risingwave/blob/6d88344c286f250ea8a7e7ef6b9d74dea838269e/src/stream/src/executor/sink.rs#L169-L198
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        // Optionally isolate the sink into its own fragment via a no-shuffle exchange.
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        // `None` means the user didn't set the option; connectors may apply their own default.
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            ignore_delete,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // check and ensure that the sink connector is specified and supported
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                // `match_sink_name_str!` binds `SinkType` to the concrete sink
                // implementation type for the matched connector name.
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        // the table sink is created by with properties
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when create sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            // For file sink, it must have sink_decouple turned on.
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            if sink_desc.is_exactly_once.is_none()
                && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
            {
                let connector_type = connector.to_lowercase();
                if connector_type == ICEBERG_SINK {
                    // iceberg sink defaults to exactly once
                    // However, when sink_decouple is disabled, we enforce it to false.
                    sink_desc
                        .properties
                        .insert("is_exactly_once".to_owned(), "false".to_owned());
                }
            }
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // sink into table should have logstore for sink_decouple
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }
585
586    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
587        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
588            let sink_type = match sink_type.as_str() {
589                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
590                SINK_TYPE_UPSERT => {
591                    if properties.is_iceberg_connector() {
592                        // Iceberg sink must use retract to represent deletes
593                        SinkType::Retract
594                    } else {
595                        SinkType::Upsert
596                    }
597                }
598                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
599                _ => {
600                    return Err(ErrorCode::InvalidInputSyntax(format!(
601                        "`{}` must be {}, {}, {}, or {}",
602                        SINK_TYPE_OPTION,
603                        SINK_TYPE_APPEND_ONLY,
604                        SINK_TYPE_RETRACT,
605                        SINK_TYPE_UPSERT,
606                        SINK_TYPE_DEBEZIUM,
607                    ))
608                    .into());
609                }
610            };
611            return Ok(Some(sink_type));
612        }
613        Ok(None)
614    }
615
616    /// `ignore_delete` option, with backward-compatible alias `force_append_only`.
617    fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
618        let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
619        let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);
620
621        if has_ignore_delete && has_force_append_only {
622            return Err(ErrorCode::InvalidInputSyntax(format!(
623                "`{}` is an alias of `{}`, only one of them can be specified.",
624                SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
625            ))
626            .into());
627        }
628
629        let key = if has_ignore_delete {
630            SINK_USER_IGNORE_DELETE_OPTION
631        } else if has_force_append_only {
632            SINK_USER_FORCE_APPEND_ONLY_OPTION
633        } else {
634            return Ok(false);
635        };
636
637        if properties.value_eq_ignore_case(key, "true") {
638            Ok(true)
639        } else if properties.value_eq_ignore_case(key, "false") {
640            Ok(false)
641        } else {
642            Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
643        }
644    }
645
    /// Derive the sink type based on...
    ///
    /// - the derived stream kind of the plan, from the optimizer
    /// - sink format required by [`SinkFormatDesc`], if any
    /// - user-specified sink type in WITH options, if any
    /// - user-specified `ignore_delete` (`force_append_only`) in WITH options, if any
    ///
    /// Returns the `sink_type` and `ignore_delete`.
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<(SinkType, bool)> {
        // When a FORMAT ... ENCODE ... clause is present, it takes precedence over the
        // legacy `type` WITH option; `syntax_legacy` only affects error-message wording.
        let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                // `ignore_delete` may also be given inside the FORMAT options.
                Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_ignore_delete(properties)?,
                true,
            ),
        };

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            // Validate the user's choice against what the plan actually produces.
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                             Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                    ))
                        .into());
                    }
                }
                SinkType::Upsert => { /* always qualified */ }
                SinkType::Retract => {
                    if user_ignore_delete {
                        bail_invalid_input_syntax!(
                            "Retract sink type does not support `ignore_delete`. \
                             Please use `type = 'append-only'` or `type = 'upsert'` instead.",
                        );
                    }
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of upsert stream cannot be retract. \
                             Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok((user_defined_sink_type, user_ignore_delete))
        } else {
            // No specification at all, follow the optimizer's derivation.
            // This is also the case for sink-into-table.
            let sink_type = match derived_stream_kind {
                // We downgrade `Retract` to `Upsert` unless explicitly specified the type in options,
                // as it is well supported by most sinks and reduces the amount of data written.
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            };
            Ok((sink_type, user_ignore_delete))
        }
    }
720
721    /// Extract user-defined downstream pk columns from with options. Return the indices of the pk
722    /// columns. An empty list of columns is not allowed.
723    ///
724    /// The format of `downstream_pk_str` should be 'col1,col2,...' (delimited by `,`) in order to
725    /// get parsed.
726    fn parse_downstream_pk(
727        downstream_pk_str: &str,
728        columns: &[ColumnCatalog],
729    ) -> Result<Vec<usize>> {
730        // If the user defines the downstream primary key, we find out their indices.
731        let downstream_pk = downstream_pk_str.split(',').collect_vec();
732        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
733        for key in downstream_pk {
734            let trimmed_key = key.trim();
735            if trimmed_key.is_empty() {
736                continue;
737            }
738            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
739        }
740        if downstream_pk_indices.is_empty() {
741            bail_invalid_input_syntax!(
742                "Specified primary key should not be empty. \
743                To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
744            );
745        }
746        Ok(downstream_pk_indices)
747    }
748
749    /// The table schema is: | epoch | seq id | row op | sink columns |
750    /// Pk is: | epoch | seq id |
751    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
752        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
753    }
754}
755
756impl PlanTreeNodeUnary<Stream> for StreamSink {
757    fn input(&self) -> PlanRef {
758        self.input.clone()
759    }
760
761    fn clone_with_input(&self, input: PlanRef) -> Self {
762        Self::new(input, self.sink_desc.clone(), self.log_store_type)
763        // TODO(nanderstabel): Add assertions (assert_eq!)
764    }
765}
766
// Derives the remaining plan-tree-node boilerplate from the
// `PlanTreeNodeUnary` impl above (see the macro definition for details).
impl_plan_tree_node_for_unary! { Stream, StreamSink }
768
769impl Distill for StreamSink {
770    fn distill<'a>(&self) -> XmlNode<'a> {
771        let sink_type = if self.sink_desc.sink_type.is_append_only() {
772            "append-only"
773        } else {
774            "upsert"
775        };
776        let column_names = self
777            .sink_desc
778            .columns
779            .iter()
780            .map(|col| col.name_with_hidden().to_string())
781            .map(Pretty::from)
782            .collect();
783        let column_names = Pretty::Array(column_names);
784        let mut vec = Vec::with_capacity(3);
785        vec.push(("type", Pretty::from(sink_type)));
786        vec.push(("columns", column_names));
787        if let Some(pk) = &self.sink_desc.downstream_pk {
788            let sink_pk = IndicesDisplay {
789                indices: pk,
790                schema: self.base.schema(),
791            };
792            vec.push(("downstream_pk", sink_pk.distill()));
793        }
794        childless_record("StreamSink", vec)
795    }
796}
797
798impl StreamNode for StreamSink {
799    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
800        use risingwave_pb::stream_plan::*;
801
802        // We need to create a table for sink with a kv log store.
803        let table = self
804            .infer_kv_log_store_table_catalog()
805            .with_id(state.gen_table_id_wrapped());
806
807        PbNodeBody::Sink(Box::new(SinkNode {
808            sink_desc: Some(self.sink_desc.to_proto()),
809            table: Some(table.to_internal_table_prost()),
810            log_store_type: self.log_store_type as i32,
811            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
812        }))
813    }
814}
815
// Empty impl: this node holds no rewritable expressions, so the trait's
// default methods suffice.
impl ExprRewritable<Stream> for StreamSink {}
817
// Empty impl: no expressions to visit; default methods suffice.
impl ExprVisitable for StreamSink {}
819
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    /// A three-column source schema (`v1: int32`, `v2: timestamptz`,
    /// `v3: timestamp`) for the partition-expression test below.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                // Fixed: this previously reused `ColumnId::new(2)` (copy-paste);
                // column ids should be unique within a catalog.
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    /// Checks that every Iceberg partition transform is converted to the
    /// expected expression shape:
    /// - `Identity` -> a plain input reference,
    /// - `Void`     -> a literal (null of the partition field type),
    /// - all others -> an `IcebergTransform` call whose first argument is the
    ///   transform name as a varchar literal.
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        // The conversion should produce a single ROW(...) with one entry per
        // partition field, typed as the partition struct.
        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    // All remaining transforms go through the generic
                    // `IcebergTransform(name, input)` call.
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}