risingwave_connector/sink/catalog/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
    TableId, UserId,
};
pub use risingwave_common::id::SinkId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde::Serialize;

use super::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SinkError,
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The input of the sink operator can be INSERT, UPDATE, or DELETE, but it must drop any
    /// UPDATE or DELETE and write only INSERT into the sink connector.
    ForceAppendOnly,
    /// The data written into the sink connector can be INSERT or DELETE.
    /// When updating a row, an INSERT with the new value will be written.
    Upsert,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    /// When updating a row, an UPDATE pair (`U-` then `U+`) will be written.
    ///
    /// Currently only used by the DEBEZIUM format.
    Retract,
}

impl SinkType {
    /// Whether the sink type is `AppendOnly` or `ForceAppendOnly`.
    pub fn is_append_only(self) -> bool {
        self == Self::AppendOnly || self == Self::ForceAppendOnly
    }

    /// Convert to the string specified in `type = '...'` within the WITH options.
    pub fn type_str(self) -> &'static str {
        match self {
            SinkType::AppendOnly | SinkType::ForceAppendOnly => "append-only",
            SinkType::Upsert => "upsert",
            SinkType::Retract => "retract",
        }
    }

    pub fn to_proto(self) -> PbSinkType {
        match self {
            SinkType::AppendOnly => PbSinkType::AppendOnly,
            SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
            SinkType::Upsert => PbSinkType::Upsert,
            SinkType::Retract => PbSinkType::Retract,
        }
    }

    pub fn from_proto(pb: PbSinkType) -> Self {
        match pb {
            PbSinkType::AppendOnly => SinkType::AppendOnly,
            PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
            PbSinkType::Upsert => SinkType::Upsert,
            PbSinkType::Retract => SinkType::Retract,
            PbSinkType::Unspecified => unreachable!(),
        }
    }
}
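
// A minimal illustration of the `SinkType` helpers above. The assertions simply
// mirror the match arms in `type_str` and `is_append_only`; this test module and
// its name are illustrative additions, not part of the original file.
#[cfg(test)]
mod sink_type_example {
    use super::SinkType;

    #[test]
    fn append_only_variants_share_a_type_str() {
        // Both append-only variants report themselves as append-only...
        assert!(SinkType::AppendOnly.is_append_only());
        assert!(SinkType::ForceAppendOnly.is_append_only());
        // ...and map to the same `type = '...'` string in the WITH options.
        assert_eq!(SinkType::ForceAppendOnly.type_str(), "append-only");
        assert!(!SinkType::Upsert.is_append_only());
    }
}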

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    pub format: SinkFormat,
    pub encode: SinkEncode,
    pub options: BTreeMap<String, String>,
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    pub key_encode: Option<SinkEncode>,
    pub connection_id: Option<ConnectionId>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}

impl Display for SinkFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
    Bytes,
}

impl Display for SinkEncode {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}
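
// A small sketch of the `Display` impls above: both delegate to `Debug`, so the
// bare variant name is what ends up in logs and error messages. This test module
// is an illustrative addition, not part of the original file.
#[cfg(test)]
mod display_example {
    use super::{SinkEncode, SinkFormat};

    #[test]
    fn display_matches_variant_name() {
        assert_eq!(SinkFormat::Debezium.to_string(), "Debezium");
        assert_eq!(SinkEncode::Json.to_string(), "Json");
    }
}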

impl SinkFormatDesc {
    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
        use crate::sink::Sink as _;
        use crate::sink::kafka::KafkaSink;
        use crate::sink::kinesis::KinesisSink;
        use crate::sink::pulsar::PulsarSink;

        let format = match r#type {
            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
            SINK_TYPE_UPSERT => SinkFormat::Upsert,
            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
            _ => {
                return Err(SinkError::Config(anyhow!(
                    "sink type unsupported: {}",
                    r#type
                )));
            }
        };
        let encode = match connector {
            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
                SinkEncode::Json
            }
            _ => return Ok(None),
        };
        Ok(Some(Self {
            format,
            encode,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }))
    }

    pub fn to_proto(&self) -> PbSinkFormatDesc {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match self.format {
            SinkFormat::AppendOnly => F::Plain,
            SinkFormat::Upsert => F::Upsert,
            SinkFormat::Debezium => F::Debezium,
        };
        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
            SinkEncode::Json => E::Json,
            SinkEncode::Protobuf => E::Protobuf,
            SinkEncode::Avro => E::Avro,
            SinkEncode::Template => E::Template,
            SinkEncode::Parquet => E::Parquet,
            SinkEncode::Text => E::Text,
            SinkEncode::Bytes => E::Bytes,
        };

        let encode = mapping_encode(&self.encode);
        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
        let options = self
            .options
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        PbSinkFormatDesc {
            format: format.into(),
            encode: encode.into(),
            options,
            key_encode,
            secret_refs: self.secret_refs.clone(),
            connection_id: self.connection_id,
        }
    }

    /// This function exists for compatibility purposes. It provides a default
    /// plain-JSON append-only `SinkFormatDesc`, used only for the Snowflake sink
    /// when no format configuration is given.
    pub fn plain_json_for_snowflake_only() -> Self {
        Self {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }
    }
}
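
// A hedged sketch of the legacy `type = '...'` fallback implemented in
// `from_legacy_type` above: the message-queue connectors default to JSON
// encoding, other connectors yield `Ok(None)`, and an unknown type string is a
// configuration error. The test module and the "some-other-connector" string
// are illustrative additions.
#[cfg(test)]
mod legacy_type_example {
    use super::*;
    use crate::sink::Sink as _;
    use crate::sink::kafka::KafkaSink;

    #[test]
    fn legacy_kafka_upsert_defaults_to_json() {
        let desc = SinkFormatDesc::from_legacy_type(KafkaSink::SINK_NAME, SINK_TYPE_UPSERT)
            .unwrap()
            .expect("kafka should have a legacy mapping");
        assert_eq!(desc.format, SinkFormat::Upsert);
        assert_eq!(desc.encode, SinkEncode::Json);

        // Connectors without a legacy mapping fall through to `None`.
        assert!(
            SinkFormatDesc::from_legacy_type("some-other-connector", SINK_TYPE_UPSERT)
                .unwrap()
                .is_none()
        );

        // An unrecognized type string is rejected outright.
        assert!(SinkFormatDesc::from_legacy_type(KafkaSink::SINK_NAME, "nonsense").is_err());
    }
}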

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            E::Bytes => SinkEncode::Bytes,
            e @ (E::Unspecified | E::Native | E::Csv | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
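
// A round-trip sketch for the conversions above: `to_proto` followed by
// `try_from` should reproduce the original descriptor, since every supported
// format/encode pair maps one-to-one. The option key/value below is a
// hypothetical placeholder, not a real connector option.
#[cfg(test)]
mod format_desc_roundtrip_example {
    use super::*;

    #[test]
    fn proto_roundtrip_preserves_format_desc() {
        let desc = SinkFormatDesc {
            format: SinkFormat::Upsert,
            encode: SinkEncode::Avro,
            options: BTreeMap::from([("some.option".to_owned(), "some-value".to_owned())]),
            secret_refs: Default::default(),
            key_encode: Some(SinkEncode::Text),
            connection_id: None,
        };
        let restored = SinkFormatDesc::try_from(desc.to_proto()).unwrap();
        assert_eq!(desc, restored);
    }
}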

/// The catalog of a sink. There are two kinds of schemas here. The full schema consists of all
/// columns stored in `columns` and is the output schema of the sink executor/fragment. The
/// visible schema contains only the columns whose `is_hidden` is false; these are the columns
/// written to the external system. The distribution key and all other keys are indices into the
/// full schema.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema of the sink.
    pub schema_id: SchemaId,

    /// Database of the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that the vector is NOT sorted by column id.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for an upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The append-only behavior of the physical sink connector. The frontend determines
    /// `sink_type` based on both its own derivation of the append-only attribute and other
    /// user-specified options in `properties`.
    pub sink_type: SinkType,

    /// The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// The sink may use a PrivateLink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database.
    pub db_name: String,

    /// Name of the table info for the Debezium sink.
    pub sink_from_name: String,
    pub auto_refresh_schema_from_table: Option<TableId>,

    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Indicates the stream job status: created or still creating.
    /// If it is still creating, the sink should be hidden.
    pub stream_job_status: StreamJobStatus,

    /// The secret references for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for sinks whose target is a table. These are the columns of the target table when the
    /// sink was created. At that point, all default columns of the target table are handled by
    /// the project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: (self.downstream_pk.as_ref())
                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id,
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table,
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
        }
    }

    /// Returns the SQL statement that can be used to create this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn unique_identity(&self) -> String {
        // We need to align with the meta service here, so we use the proto method.
        self.to_proto().unique_identity()
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}
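
// A sketch of the full-vs-visible schema distinction documented on
// `SinkCatalog`: hidden columns stay in the full schema (the executor's output)
// but are filtered out of the visible schema (what is written to the external
// system). Constructing a whole `SinkCatalog` would require many unrelated
// fields, so this mirrors the `visible_columns` predicate on a plain column
// list; the column names and the `ColumnDesc::named` constructor usage are
// illustrative assumptions.
#[cfg(test)]
mod visible_columns_example {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::DataType;

    #[test]
    fn hidden_columns_are_not_visible() {
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(0), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                // A hidden column, e.g. an internal row id.
                column_desc: ColumnDesc::named("_row_id", ColumnId::new(1), DataType::Serial),
                is_hidden: true,
            },
        ];
        // Same predicate as `SinkCatalog::visible_columns`.
        let visible: Vec<_> = columns.iter().filter(|c| !c.is_hidden).collect();
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].column_desc.name, "v1");
    }
}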

impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}

impl From<&PbSink> for SinkCatalog {
    fn from(pb: &PbSink) -> Self {
        pb.clone().into()
    }
}