risingwave_connector/sink/catalog/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
    TableId, UserId,
};
pub use risingwave_common::id::SinkId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde::Serialize;

use super::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SinkError,
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The input of the sink operator can be INSERT, UPDATE, or DELETE, but it must drop any
    /// UPDATE or DELETE and write only INSERT into the sink connector.
    ForceAppendOnly,
    /// The data written into the sink connector can be INSERT or DELETE.
    /// When updating a row, an INSERT with the new value will be written.
    Upsert,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    /// When updating a row, an UPDATE pair (U- then U+) will be written.
    ///
    /// Currently only used by the DEBEZIUM format.
    Retract,
}

impl SinkType {
    /// Whether the sink type is `AppendOnly` or `ForceAppendOnly`.
    pub fn is_append_only(self) -> bool {
        self == Self::AppendOnly || self == Self::ForceAppendOnly
    }

    /// Convert to the string specified in `type = '...'` within the WITH options.
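    ///
    /// Illustrative mapping (a sketch, not compiled as a doctest):
    ///
    /// ```ignore
    /// assert_eq!(SinkType::ForceAppendOnly.type_str(), "append-only");
    /// assert_eq!(SinkType::Upsert.type_str(), "upsert");
    /// ```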
    pub fn type_str(self) -> &'static str {
        match self {
            SinkType::AppendOnly | SinkType::ForceAppendOnly => "append-only",
            SinkType::Upsert => "upsert",
            SinkType::Retract => "retract",
        }
    }

    pub fn to_proto(self) -> PbSinkType {
        match self {
            SinkType::AppendOnly => PbSinkType::AppendOnly,
            SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
            SinkType::Upsert => PbSinkType::Upsert,
            SinkType::Retract => PbSinkType::Retract,
        }
    }

    pub fn from_proto(pb: PbSinkType) -> Self {
        match pb {
            PbSinkType::AppendOnly => SinkType::AppendOnly,
            PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
            PbSinkType::Upsert => SinkType::Upsert,
            PbSinkType::Retract => SinkType::Retract,
            PbSinkType::Unspecified => unreachable!(),
        }
    }
}

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    pub format: SinkFormat,
    pub encode: SinkEncode,
    pub options: BTreeMap<String, String>,
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    pub key_encode: Option<SinkEncode>,
    pub connection_id: Option<ConnectionId>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}

impl Display for SinkFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
    Bytes,
}

impl Display for SinkEncode {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl SinkFormatDesc {
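    /// Derive a [`SinkFormatDesc`] from the legacy `type = '...'` WITH option, for connectors
    /// that historically defaulted to JSON encoding (Kafka, Kinesis, Pulsar); returns
    /// `Ok(None)` for other connectors. Illustrative usage (a sketch, not compiled as a
    /// doctest; assumes `KafkaSink::SINK_NAME` is `"kafka"`):
    ///
    /// ```ignore
    /// let desc = SinkFormatDesc::from_legacy_type("kafka", "upsert")?.unwrap();
    /// assert_eq!(desc.format, SinkFormat::Upsert);
    /// assert_eq!(desc.encode, SinkEncode::Json);
    /// ```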
    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
        use crate::sink::Sink as _;
        use crate::sink::kafka::KafkaSink;
        use crate::sink::kinesis::KinesisSink;
        use crate::sink::pulsar::PulsarSink;

        let format = match r#type {
            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
            SINK_TYPE_UPSERT => SinkFormat::Upsert,
            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
            _ => {
                return Err(SinkError::Config(anyhow!(
                    "sink type unsupported: {}",
                    r#type
                )));
            }
        };
        let encode = match connector {
            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
                SinkEncode::Json
            }
            _ => return Ok(None),
        };
        Ok(Some(Self {
            format,
            encode,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }))
    }

    pub fn to_proto(&self) -> PbSinkFormatDesc {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match self.format {
            SinkFormat::AppendOnly => F::Plain,
            SinkFormat::Upsert => F::Upsert,
            SinkFormat::Debezium => F::Debezium,
        };
        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
            SinkEncode::Json => E::Json,
            SinkEncode::Protobuf => E::Protobuf,
            SinkEncode::Avro => E::Avro,
            SinkEncode::Template => E::Template,
            SinkEncode::Parquet => E::Parquet,
            SinkEncode::Text => E::Text,
            SinkEncode::Bytes => E::Bytes,
        };

        let encode = mapping_encode(&self.encode);
        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
        let options = self
            .options
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        PbSinkFormatDesc {
            format: format.into(),
            encode: encode.into(),
            options,
            key_encode,
            secret_refs: self.secret_refs.clone(),
            connection_id: self.connection_id,
        }
    }

    /// This function exists for compatibility purposes. It provides a default
    /// `SinkFormatDesc` when no format configuration is given, for the Snowflake sink only.
    pub fn plain_json_for_snowflake_only() -> Self {
        Self {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }
    }
}

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}

/// The catalog of the sink. There are two kinds of schemas here. The full schema consists of all
/// columns stored in `columns`, which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which are the columns written out to
/// the external system. The distribution key and all other keys are indexed against the full
/// schema.
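///
/// For example (illustrative): if the executor output schema is `[v1, v2, _row_id (hidden)]`,
/// the full schema has all three columns while the visible schema is `[v1, v2]`; see
/// [`SinkCatalog::full_schema`] and [`SinkCatalog::visible_schema`].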
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// ID of the sink.
    pub id: SinkId,

    /// ID of the schema the sink belongs to.
    pub schema_id: SchemaId,

    /// ID of the database the sink belongs to.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that this is NOT sorted by `ColumnId` in the vector.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for an upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The append-only behavior of the physical sink connector. The frontend determines
    /// `sink_type` based on both its own derivation of the append-only attribute and other
    /// user-specified options in `properties`.
    pub sink_type: SinkType,

    /// The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// The sink may use a privatelink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database.
    pub db_name: String,

    /// Name of the relation the sink is created from, used for the Debezium sink.
    pub sink_from_name: String,
    pub auto_refresh_schema_from_table: Option<TableId>,

    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Indicates the stream job status, i.e., whether it has been created or is still being
    /// created. If it is still being created, the sink should be hidden.
    pub stream_job_status: StreamJobStatus,

    /// The secret references for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for sinks whose target is a table. Columns of the target table when the sink was
    /// created. At that point, all the default columns of the target table are handled by the
    /// project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: (self.downstream_pk.as_ref())
                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id,
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table,
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
        }
    }

    /// Returns the SQL statement that can be used to create this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn unique_identity(&self) -> String {
        // We need to align with meta here, so we use the proto method.
        self.to_proto().unique_identity()
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}

impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
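            // An empty `downstream_pk` in the proto means no user-defined primary key.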
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}

impl From<&PbSink> for SinkCatalog {
    fn from(pb: &PbSink) -> Self {
        pb.clone().into()
    }
}
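
// A minimal sanity-check sketch for the conversions above; the assertions only mirror the
// `to_proto`/`from_proto`/`TryFrom` definitions in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sink_type_proto_roundtrip() {
        // Every variant should survive a round trip through the proto representation.
        for ty in [
            SinkType::AppendOnly,
            SinkType::ForceAppendOnly,
            SinkType::Upsert,
            SinkType::Retract,
        ] {
            assert_eq!(SinkType::from_proto(ty.to_proto()), ty);
        }
    }

    #[test]
    fn force_append_only_is_append_only() {
        // `ForceAppendOnly` counts as append-only and shares its `type_str`.
        assert!(SinkType::ForceAppendOnly.is_append_only());
        assert!(!SinkType::Upsert.is_append_only());
        assert_eq!(SinkType::ForceAppendOnly.type_str(), "append-only");
    }

    #[test]
    fn sink_format_desc_proto_roundtrip() {
        // The compatibility default should also survive a proto round trip.
        let desc = SinkFormatDesc::plain_json_for_snowflake_only();
        let restored = SinkFormatDesc::try_from(desc.to_proto()).unwrap();
        assert_eq!(desc, restored);
    }
}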