// risingwave_connector/sink/catalog/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, OBJECT_ID_PLACEHOLDER, Schema,
24    SchemaId, TableId, UserId,
25};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_pb::catalog::{
29    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
30};
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::Serialize;
33
34use super::{
35    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
36    SINK_TYPE_UPSERT, SinkError,
37};
38
/// The unique identifier of a sink in the catalog.
///
/// A thin newtype over `u32` so sink ids are not confused with other kinds of
/// catalog object ids. Derives `Ord` (in addition to `PartialOrd`/`Eq`) since
/// the ordering over the inner `u32` is total; this lets `SinkId` be used in
/// sorted collections such as `BTreeMap`/`BTreeSet`.
#[derive(Clone, Copy, Debug, Default, Hash, Ord, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
    /// The raw numeric id.
    pub sink_id: u32,
}
43
44impl SinkId {
45    pub const fn new(sink_id: u32) -> Self {
46        SinkId { sink_id }
47    }
48
49    /// Sometimes the id field is filled later, we use this value for better debugging.
50    pub const fn placeholder() -> Self {
51        SinkId {
52            sink_id: OBJECT_ID_PLACEHOLDER,
53        }
54    }
55
56    pub fn sink_id(&self) -> u32 {
57        self.sink_id
58    }
59}
60
61impl std::fmt::Display for SinkId {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.sink_id)
64    }
65}
66
67impl From<u32> for SinkId {
68    fn from(id: u32) -> Self {
69        Self::new(id)
70    }
71}
72impl From<SinkId> for u32 {
73    fn from(id: SinkId) -> Self {
74        id.sink_id
75    }
76}
77
/// The change-log semantics of a sink, i.e. which row operations the sink
/// operator accepts from upstream and which it forwards to the connector.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The input of the sink operator can be INSERT, UPDATE, or DELETE, but it must drop any
    /// UPDATE or DELETE and write only INSERT into the sink connector.
    ForceAppendOnly,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    Upsert,
}
89
90impl SinkType {
91    pub fn is_append_only(&self) -> bool {
92        self == &Self::AppendOnly || self == &Self::ForceAppendOnly
93    }
94
95    pub fn is_upsert(&self) -> bool {
96        self == &Self::Upsert
97    }
98
99    pub fn to_proto(self) -> PbSinkType {
100        match self {
101            SinkType::AppendOnly => PbSinkType::AppendOnly,
102            SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
103            SinkType::Upsert => PbSinkType::Upsert,
104        }
105    }
106
107    pub fn from_proto(pb: PbSinkType) -> Self {
108        match pb {
109            PbSinkType::AppendOnly => SinkType::AppendOnly,
110            PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
111            PbSinkType::Upsert => SinkType::Upsert,
112            PbSinkType::Unspecified => unreachable!(),
113        }
114    }
115}
116
/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    // The change-log format of the sink payload.
    pub format: SinkFormat,
    // The encoding of the sink payload values.
    pub encode: SinkEncode,
    // Extra format/encode options as key-value pairs.
    pub options: BTreeMap<String, String>,
    // Secret references for option values, keyed by option name.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    // Optional separate encoding for the message key; `None` when no key
    // encode was configured (see `TryFrom<PbSinkFormatDesc>`).
    pub key_encode: Option<SinkEncode>,
    // Id of an associated connection object, if any — NOTE(review): presumably
    // a schema-registry/connection catalog id; confirm against callers.
    pub connection_id: Option<u32>,
}
129
/// The change-log format of a sink's payload.
///
/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    // Only INSERT rows are emitted.
    AppendOnly,
    // A change log of INSERT/UPDATE/DELETE rows.
    Upsert,
    // Debezium-style change events.
    Debezium,
}
137
138impl Display for SinkFormat {
139    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140        write!(f, "{:?}", self)
141    }
142}
143
/// The payload encoding of a sink's values (and, for `Text`/`Bytes`, keys).
///
/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    // Only valid as a key encode (see `TryFrom<PbSinkFormatDesc>`).
    Text,
    // Only valid as a key encode (see `TryFrom<PbSinkFormatDesc>`).
    Bytes,
}
155
156impl Display for SinkEncode {
157    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
158        write!(f, "{:?}", self)
159    }
160}
161
162impl SinkFormatDesc {
163    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
164        use crate::sink::Sink as _;
165        use crate::sink::kafka::KafkaSink;
166        use crate::sink::kinesis::KinesisSink;
167        use crate::sink::pulsar::PulsarSink;
168
169        let format = match r#type {
170            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
171            SINK_TYPE_UPSERT => SinkFormat::Upsert,
172            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
173            _ => {
174                return Err(SinkError::Config(anyhow!(
175                    "sink type unsupported: {}",
176                    r#type
177                )));
178            }
179        };
180        let encode = match connector {
181            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
182                SinkEncode::Json
183            }
184            _ => return Ok(None),
185        };
186        Ok(Some(Self {
187            format,
188            encode,
189            options: Default::default(),
190            secret_refs: Default::default(),
191            key_encode: None,
192            connection_id: None,
193        }))
194    }
195
196    pub fn to_proto(&self) -> PbSinkFormatDesc {
197        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
198
199        let format = match self.format {
200            SinkFormat::AppendOnly => F::Plain,
201            SinkFormat::Upsert => F::Upsert,
202            SinkFormat::Debezium => F::Debezium,
203        };
204        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
205            SinkEncode::Json => E::Json,
206            SinkEncode::Protobuf => E::Protobuf,
207            SinkEncode::Avro => E::Avro,
208            SinkEncode::Template => E::Template,
209            SinkEncode::Parquet => E::Parquet,
210            SinkEncode::Text => E::Text,
211            SinkEncode::Bytes => E::Bytes,
212        };
213
214        let encode = mapping_encode(&self.encode);
215        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
216        let options = self
217            .options
218            .iter()
219            .map(|(k, v)| (k.clone(), v.clone()))
220            .collect();
221
222        PbSinkFormatDesc {
223            format: format.into(),
224            encode: encode.into(),
225            options,
226            key_encode,
227            secret_refs: self.secret_refs.clone(),
228            connection_id: self.connection_id,
229        }
230    }
231
232    // This function is for compatibility purposes. It sets the `SinkFormatDesc`
233    // when there is no configuration provided for the snowflake sink only.
234    pub fn plain_json_for_snowflake_only() -> Self {
235        Self {
236            format: SinkFormat::AppendOnly,
237            encode: SinkEncode::Json,
238            options: Default::default(),
239            secret_refs: Default::default(),
240            key_encode: None,
241            connection_id: None,
242        }
243    }
244}
245
impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    /// Converts from the protobuf representation, validating that the
    /// format/encode combination is one that sinks actually support.
    ///
    /// Errors on source-only or unspecified formats/encodes, and on key
    /// encodes other than `Text`/`Bytes`.
    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            // Unsupported variants are listed exhaustively (no `_`) so adding
            // a new proto variant forces a decision here.
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            // Note that `Text`/`Bytes` are rejected here: they are only valid
            // as *key* encodes (below), not as value encodes.
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        // `Unspecified` means no key encode was configured at all.
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
310
/// the catalog of the sink. There are two kind of schema here. The full schema is all columns
/// stored in the `column` which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
/// external system. The distribution key and all other keys are indexed in the full schema.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema of the sink.
    pub schema_id: SchemaId,

    /// Database of the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that this is NOT sorted by columnId in the vector.
    /// Private: access via `full_columns()` / `visible_columns()`.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for upsert sink.
    pub downstream_pk: Vec<usize>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
    /// based on both its own derivation on the append-only attribute and other user-specified
    /// options in `properties`.
    pub sink_type: SinkType,

    /// The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// Sink may use a privatelink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    /// Epoch at which the sink was created, if known.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the sink was initialized, if known.
    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database
    pub db_name: String,

    /// Name for the table info for Debezium sink
    pub sink_from_name: String,

    /// For sink-into-table: the id of the target table.
    pub target_table: Option<TableId>,

    /// Cluster version when the sink was created, if recorded.
    pub created_at_cluster_version: Option<String>,
    /// Cluster version when the sink was initialized, if recorded.
    pub initialized_at_cluster_version: Option<String>,
    /// Whether the sink job is created in the foreground or background.
    pub create_type: CreateType,

    /// The secret reference for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}
384
impl SinkCatalog {
    /// Converts the catalog into its protobuf representation.
    ///
    /// NOTE(review): `stream_job_status` is hard-coded to `Creating` here, and
    /// `dependent_relations` is always emitted empty — presumably both are
    /// maintained by meta rather than round-tripped; confirm before relying on
    /// them from the produced `PbSink`.
    pub fn to_proto(&self) -> PbSink {
        #[allow(deprecated)] // for `dependent_relations`
        PbSink {
            id: self.id.into(),
            schema_id: self.schema_id.schema_id,
            database_id: self.database_id.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: self
                .downstream_pk
                .iter()
                .map(|idx| *idx as i32)
                .collect_vec(),
            dependent_relations: vec![],
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id.map(|id| id.into()),
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: PbStreamJobStatus::Creating.into(),
            target_table: self.target_table.map(|table_id| table_id.table_id()),
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
        }
    }

    /// Returns the SQL statement that can be used to create this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    /// Iterates over the non-hidden columns, i.e. the visible schema that is
    /// actually written to the external system.
    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    /// All columns of the sink executor/fragment output, including hidden ones.
    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    /// Builds the schema over all columns (the executor/fragment output schema).
    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Builds the schema over the visible (non-hidden) columns only.
    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Returns a copy of the user-defined downstream primary key indices.
    pub fn downstream_pk_indices(&self) -> Vec<usize> {
        self.downstream_pk.clone()
    }

    /// A stable identity string for this sink.
    pub fn unique_identity(&self) -> String {
        // We need to align with meta here, so we've utilized the proto method.
        self.to_proto().unique_identity()
    }
}
469
impl From<PbSink> for SinkCatalog {
    /// Reconstructs the catalog from its protobuf representation.
    fn from(pb: PbSink) -> Self {
        // Panics on a missing/invalid sink type; a `PbSink` coming from the
        // catalog is expected to always carry a valid value.
        let sink_type = pb.get_sink_type().unwrap();
        // Older catalogs may predate `create_type`; default to foreground.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        // Prefer the explicit format descriptor; otherwise fall back to
        // deriving one from the legacy `connector`/`type` properties.
        // NOTE(review): `.ok()` silently discards a malformed `format_desc`
        // (and legacy-derivation errors) instead of surfacing the error —
        // confirm this lenience is intended for backward compatibility.
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id.into(),
            name: pb.name,
            schema_id: pb.schema_id.into(),
            database_id: pb.database_id.into(),
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            downstream_pk: pb.downstream_pk.into_iter().map(|k| k as _).collect_vec(),
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id.map(ConnectionId),
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            target_table: pb.target_table.map(TableId::new),
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
529
530impl From<&PbSink> for SinkCatalog {
531    fn from(pb: &PbSink) -> Self {
532        pb.clone().into()
533    }
534}