risingwave_connector/sink/catalog/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
24    TableId, UserId,
25};
26pub use risingwave_common::id::SinkId;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::sort_util::ColumnOrder;
29use risingwave_pb::catalog::{
30    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
31};
32use risingwave_pb::secret::PbSecretRef;
33use serde::Serialize;
34
35use super::{
36    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
37    SINK_TYPE_UPSERT, SinkError,
38};
39
/// The kinds of changes a sink connector may receive from upstream,
/// i.e. the write semantics of the sink.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The data written into the sink connector can be INSERT or DELETE.
    /// When updating a row, an INSERT with new value will be written.
    Upsert,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    /// When updating a row, an UPDATE pair (U- then U+) will be written.
    ///
    /// Currently only used by DEBEZIUM format.
    Retract,
}
54
55impl SinkType {
56    /// Whether the sink type is `AppendOnly`.
57    pub fn is_append_only(self) -> bool {
58        self == Self::AppendOnly
59    }
60
61    /// Convert to the string specified in `type = '...'` within the WITH options.
62    pub fn type_str(self) -> &'static str {
63        match self {
64            SinkType::AppendOnly => "append-only",
65            SinkType::Upsert => "upsert",
66            SinkType::Retract => "retract",
67        }
68    }
69
70    pub fn to_proto(self) -> PbSinkType {
71        match self {
72            SinkType::AppendOnly => PbSinkType::AppendOnly,
73            SinkType::Upsert => PbSinkType::Upsert,
74            SinkType::Retract => PbSinkType::Retract,
75        }
76    }
77
78    pub fn from_proto(pb: PbSinkType) -> Self {
79        match pb {
80            PbSinkType::AppendOnly => SinkType::AppendOnly,
81            // Backward compatibility: normalize force-append-only to append-only. The associated
82            // behavior is now represented by another field `ignore_delete`.
83            #[expect(deprecated)]
84            PbSinkType::ForceAppendOnly => SinkType::AppendOnly,
85            PbSinkType::Upsert => SinkType::Upsert,
86            PbSinkType::Retract => SinkType::Retract,
87            PbSinkType::Unspecified => unreachable!(),
88        }
89    }
90}
91
/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// The change-log format of the sink.
    pub format: SinkFormat,
    /// The payload encoding of the sink.
    pub encode: SinkEncode,
    /// Extra options associated with the format/encode.
    pub options: BTreeMap<String, String>,
    /// Secret references for option values, keyed by option name.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    /// Optional separate encoding for the message key, if different from `encode`.
    pub key_encode: Option<SinkEncode>,
    /// Connection referenced by the format options, if any.
    pub connection_id: Option<ConnectionId>,
}
104
/// The change-log format of a sink.
///
/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    /// Only INSERT changes are emitted.
    AppendOnly,
    /// Changes are keyed upserts/deletes.
    Upsert,
    /// Changes follow the Debezium change-event format.
    Debezium,
}
112
113impl Display for SinkFormat {
114    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115        write!(f, "{:?}", self)
116    }
117}
118
/// The payload encoding of a sink.
///
/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
    Bytes,
}
130
131impl Display for SinkEncode {
132    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133        write!(f, "{:?}", self)
134    }
135}
136
137impl SinkFormatDesc {
138    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
139        use crate::sink::Sink as _;
140        use crate::sink::kafka::KafkaSink;
141        use crate::sink::kinesis::KinesisSink;
142        use crate::sink::pulsar::PulsarSink;
143
144        let format = match r#type {
145            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
146            SINK_TYPE_UPSERT => SinkFormat::Upsert,
147            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
148            _ => {
149                return Err(SinkError::Config(anyhow!(
150                    "sink type unsupported: {}",
151                    r#type
152                )));
153            }
154        };
155        let encode = match connector {
156            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
157                SinkEncode::Json
158            }
159            _ => return Ok(None),
160        };
161        Ok(Some(Self {
162            format,
163            encode,
164            options: Default::default(),
165            secret_refs: Default::default(),
166            key_encode: None,
167            connection_id: None,
168        }))
169    }
170
171    pub fn to_proto(&self) -> PbSinkFormatDesc {
172        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
173
174        let format = match self.format {
175            SinkFormat::AppendOnly => F::Plain,
176            SinkFormat::Upsert => F::Upsert,
177            SinkFormat::Debezium => F::Debezium,
178        };
179        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
180            SinkEncode::Json => E::Json,
181            SinkEncode::Protobuf => E::Protobuf,
182            SinkEncode::Avro => E::Avro,
183            SinkEncode::Template => E::Template,
184            SinkEncode::Parquet => E::Parquet,
185            SinkEncode::Text => E::Text,
186            SinkEncode::Bytes => E::Bytes,
187        };
188
189        let encode = mapping_encode(&self.encode);
190        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
191        let options = self
192            .options
193            .iter()
194            .map(|(k, v)| (k.clone(), v.clone()))
195            .collect();
196
197        PbSinkFormatDesc {
198            format: format.into(),
199            encode: encode.into(),
200            options,
201            key_encode,
202            secret_refs: self.secret_refs.clone(),
203            connection_id: self.connection_id,
204        }
205    }
206
207    // This function is for compatibility purposes. It sets the `SinkFormatDesc`
208    // when there is no configuration provided for the snowflake sink only.
209    pub fn plain_json_for_snowflake_only() -> Self {
210        Self {
211            format: SinkFormat::AppendOnly,
212            encode: SinkEncode::Json,
213            options: Default::default(),
214            secret_refs: Default::default(),
215            key_encode: None,
216            connection_id: None,
217        }
218    }
219}
220
221impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
222    type Error = SinkError;
223
224    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
225        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
226
227        let format = match value.format() {
228            F::Plain => SinkFormat::AppendOnly,
229            F::Upsert => SinkFormat::Upsert,
230            F::Debezium => SinkFormat::Debezium,
231            f @ (F::Unspecified
232            | F::Native
233            | F::DebeziumMongo
234            | F::Maxwell
235            | F::Canal
236            | F::None) => {
237                return Err(SinkError::Config(anyhow!(
238                    "sink format unsupported: {}",
239                    f.as_str_name()
240                )));
241            }
242        };
243        let encode = match value.encode() {
244            E::Json => SinkEncode::Json,
245            E::Protobuf => SinkEncode::Protobuf,
246            E::Template => SinkEncode::Template,
247            E::Avro => SinkEncode::Avro,
248            E::Parquet => SinkEncode::Parquet,
249            E::Bytes => SinkEncode::Bytes,
250            e @ (E::Unspecified | E::Native | E::Csv | E::None | E::Text) => {
251                return Err(SinkError::Config(anyhow!(
252                    "sink encode unsupported: {}",
253                    e.as_str_name()
254                )));
255            }
256        };
257        let key_encode = match &value.key_encode() {
258            E::Bytes => Some(SinkEncode::Bytes),
259            E::Text => Some(SinkEncode::Text),
260            E::Unspecified => None,
261            encode @ (E::Avro
262            | E::Csv
263            | E::Json
264            | E::Protobuf
265            | E::Template
266            | E::Native
267            | E::Parquet
268            | E::None) => {
269                return Err(SinkError::Config(anyhow!(
270                    "unsupported {} as sink key encode",
271                    encode.as_str_name()
272                )));
273            }
274        };
275
276        Ok(Self {
277            format,
278            encode,
279            options: value.options,
280            key_encode,
281            secret_refs: value.secret_refs,
282            connection_id: value.connection_id,
283        })
284    }
285}
286
/// The catalog of the sink. There are two kinds of schema here. The full schema is all columns
/// stored in the `columns` field, which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
/// external system. The distribution key and all other keys are indexed in the full schema.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema of the sink.
    pub schema_id: SchemaId,

    /// Database of the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that this is NOT sorted by columnId in the vector.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The append-only behavior of the physical sink connector. Frontend will determine
    /// `sink_type` based on both its own derivation on the append-only attribute and other
    /// user-specified options in `properties`.
    pub sink_type: SinkType,
    /// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
    pub ignore_delete: bool,

    /// The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// Sink may use a privatelink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database
    pub db_name: String,

    /// Name for the table info for Debezium sink
    pub sink_from_name: String,
    /// If set, the sink's schema automatically follows that of this table.
    pub auto_refresh_schema_from_table: Option<TableId>,

    /// Set when the sink writes into a RisingWave table instead of an external system.
    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Indicate the stream job status, whether it is created or creating.
    /// If it is creating, we should hide it.
    pub stream_job_status: StreamJobStatus,

    /// The secret reference for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for the sink whose target is a table. Columns of the target table when the sink is
    /// created. At this point all the default columns of the target table are all handled by the
    /// project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}
367
368impl SinkCatalog {
369    pub fn to_proto(&self) -> PbSink {
370        PbSink {
371            id: self.id,
372            schema_id: self.schema_id,
373            database_id: self.database_id,
374            name: self.name.clone(),
375            definition: self.definition.clone(),
376            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
377            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
378            downstream_pk: (self.downstream_pk.as_ref())
379                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
380            distribution_key: self
381                .distribution_key
382                .iter()
383                .map(|k| *k as i32)
384                .collect_vec(),
385            owner: self.owner,
386            properties: self.properties.clone(),
387            sink_type: self.sink_type.to_proto() as i32,
388            raw_ignore_delete: self.ignore_delete,
389            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
390            connection_id: self.connection_id,
391            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
392            created_at_epoch: self.created_at_epoch.map(|e| e.0),
393            db_name: self.db_name.clone(),
394            sink_from_name: self.sink_from_name.clone(),
395            stream_job_status: self.stream_job_status.to_proto().into(),
396            target_table: self.target_table,
397            created_at_cluster_version: self.created_at_cluster_version.clone(),
398            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
399            create_type: self.create_type.to_proto() as i32,
400            secret_refs: self.secret_refs.clone(),
401            original_target_columns: self
402                .original_target_columns
403                .iter()
404                .map(|c| c.to_protobuf())
405                .collect_vec(),
406            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
407        }
408    }
409
410    /// Returns the SQL statement that can be used to create this sink.
411    pub fn create_sql(&self) -> String {
412        self.definition.clone()
413    }
414
415    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
416        self.columns.iter().filter(|c| !c.is_hidden)
417    }
418
419    pub fn full_columns(&self) -> &[ColumnCatalog] {
420        &self.columns
421    }
422
423    pub fn full_schema(&self) -> Schema {
424        let fields = self
425            .full_columns()
426            .iter()
427            .map(|column| Field::from(column.column_desc.clone()))
428            .collect_vec();
429        Schema { fields }
430    }
431
432    pub fn visible_schema(&self) -> Schema {
433        let fields = self
434            .visible_columns()
435            .map(|column| Field::from(column.column_desc.clone()))
436            .collect_vec();
437        Schema { fields }
438    }
439
440    pub fn unique_identity(&self) -> String {
441        // We need to align with meta here, so we've utilized the proto method.
442        self.to_proto().unique_identity()
443    }
444
445    pub fn is_created(&self) -> bool {
446        self.stream_job_status == StreamJobStatus::Created
447    }
448}
449
impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        let ignore_delete = pb.ignore_delete();
        // Fall back to defaults for fields that may be absent in protos written
        // by older versions.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        // Prefer the explicit format descriptor. For legacy sinks that predate
        // it, best-effort reconstruct one from the `connector` and `type`
        // properties; any failure degrades to `None` rather than erroring.
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            // An empty list in the proto encodes the absence of a user-defined
            // downstream primary key (see `SinkCatalog::to_proto`).
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner,
            sink_type: SinkType::from_proto(sink_type),
            ignore_delete,
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
524
525impl From<&PbSink> for SinkCatalog {
526    fn from(pb: &PbSink) -> Self {
527        pb.clone().into()
528    }
529}