risingwave_connector/sink/catalog/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, OBJECT_ID_PLACEHOLDER, Schema,
24    SchemaId, StreamJobStatus, TableId, UserId,
25};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_pb::catalog::{
29    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
30};
31use risingwave_pb::secret::PbSecretRef;
32use serde::Serialize;
33
34use super::{
35    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
36    SINK_TYPE_UPSERT, SinkError,
37};
38
/// The catalog id of a sink — a thin newtype around a raw `u32`.
///
/// Derives `Ord` alongside `PartialOrd`: the id is a plain integer with a
/// total order, so deriving only `PartialOrd` was needlessly restrictive
/// (it prevented use in sorted collections and `Ord`-based helpers such as
/// `max`/`min`).
#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, Ord, PartialEq, Eq)]
pub struct SinkId {
    pub sink_id: u32,
}
43
44impl SinkId {
45    pub const fn new(sink_id: u32) -> Self {
46        SinkId { sink_id }
47    }
48
49    /// Sometimes the id field is filled later, we use this value for better debugging.
50    pub const fn placeholder() -> Self {
51        SinkId {
52            sink_id: OBJECT_ID_PLACEHOLDER,
53        }
54    }
55
56    pub fn sink_id(&self) -> u32 {
57        self.sink_id
58    }
59}
60
61impl std::fmt::Display for SinkId {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.sink_id)
64    }
65}
66
67impl From<u32> for SinkId {
68    fn from(id: u32) -> Self {
69        Self::new(id)
70    }
71}
72impl From<SinkId> for u32 {
73    fn from(id: SinkId) -> Self {
74        id.sink_id
75    }
76}
77
/// How the sink handles the stream's change log (INSERT / UPDATE / DELETE)
/// when writing to the downstream connector.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The input of the sink operator can be INSERT, UPDATE, or DELETE, but it must drop any
    /// UPDATE or DELETE and write only INSERT into the sink connector.
    ForceAppendOnly,
    /// The data written into the sink connector can be INSERT or DELETE.
    /// When updating a row, an INSERT with new value will be written.
    Upsert,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    /// When updating a row, an UPDATE pair (U- then U+) will be written.
    ///
    /// Currently only used by DEBEZIUM format.
    Retract,
}
95
96impl SinkType {
97    /// Whether the sink type is `AppendOnly` or `ForceAppendOnly`.
98    pub fn is_append_only(self) -> bool {
99        self == Self::AppendOnly || self == Self::ForceAppendOnly
100    }
101
102    /// Convert to the string specified in `type = '...'` within the WITH options.
103    pub fn type_str(self) -> &'static str {
104        match self {
105            SinkType::AppendOnly | SinkType::ForceAppendOnly => "append-only",
106            SinkType::Upsert => "upsert",
107            SinkType::Retract => "retract",
108        }
109    }
110
111    pub fn to_proto(self) -> PbSinkType {
112        match self {
113            SinkType::AppendOnly => PbSinkType::AppendOnly,
114            SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
115            SinkType::Upsert => PbSinkType::Upsert,
116            SinkType::Retract => PbSinkType::Retract,
117        }
118    }
119
120    pub fn from_proto(pb: PbSinkType) -> Self {
121        match pb {
122            PbSinkType::AppendOnly => SinkType::AppendOnly,
123            PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
124            PbSinkType::Upsert => SinkType::Upsert,
125            PbSinkType::Retract => SinkType::Retract,
126            PbSinkType::Unspecified => unreachable!(),
127        }
128    }
129}
130
/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// The change-log format of the payload (append-only / upsert / debezium).
    pub format: SinkFormat,
    /// The encoding of the payload value.
    pub encode: SinkEncode,
    /// Extra options of the format/encode, presumably from the
    /// `FORMAT ... ENCODE ... (...)` clause — see `from_legacy_type` for the
    /// legacy fallback where this stays empty.
    pub options: BTreeMap<String, String>,
    /// Secret references used by the format options, keyed by option name.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    /// Optional encoding of the message key, when distinct from the value encoding.
    pub key_encode: Option<SinkEncode>,
    /// Optional id of a catalog connection used by this format, if any.
    pub connection_id: Option<u32>,
}
143
/// The change-log format of a sink's payload.
///
/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    /// Maps to `FormatType::Plain` in protobuf (see `SinkFormatDesc::to_proto`).
    AppendOnly,
    Upsert,
    Debezium,
}
151
152impl Display for SinkFormat {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "{:?}", self)
155    }
156}
157
/// The encoding of a sink's key or value payload.
///
/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    // `Text` and `Bytes` are only accepted as *key* encodings when decoding
    // from protobuf (see `TryFrom<PbSinkFormatDesc>` below).
    Text,
    Bytes,
}
169
170impl Display for SinkEncode {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        write!(f, "{:?}", self)
173    }
174}
175
176impl SinkFormatDesc {
177    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
178        use crate::sink::Sink as _;
179        use crate::sink::kafka::KafkaSink;
180        use crate::sink::kinesis::KinesisSink;
181        use crate::sink::pulsar::PulsarSink;
182
183        let format = match r#type {
184            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
185            SINK_TYPE_UPSERT => SinkFormat::Upsert,
186            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
187            _ => {
188                return Err(SinkError::Config(anyhow!(
189                    "sink type unsupported: {}",
190                    r#type
191                )));
192            }
193        };
194        let encode = match connector {
195            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
196                SinkEncode::Json
197            }
198            _ => return Ok(None),
199        };
200        Ok(Some(Self {
201            format,
202            encode,
203            options: Default::default(),
204            secret_refs: Default::default(),
205            key_encode: None,
206            connection_id: None,
207        }))
208    }
209
210    pub fn to_proto(&self) -> PbSinkFormatDesc {
211        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
212
213        let format = match self.format {
214            SinkFormat::AppendOnly => F::Plain,
215            SinkFormat::Upsert => F::Upsert,
216            SinkFormat::Debezium => F::Debezium,
217        };
218        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
219            SinkEncode::Json => E::Json,
220            SinkEncode::Protobuf => E::Protobuf,
221            SinkEncode::Avro => E::Avro,
222            SinkEncode::Template => E::Template,
223            SinkEncode::Parquet => E::Parquet,
224            SinkEncode::Text => E::Text,
225            SinkEncode::Bytes => E::Bytes,
226        };
227
228        let encode = mapping_encode(&self.encode);
229        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
230        let options = self
231            .options
232            .iter()
233            .map(|(k, v)| (k.clone(), v.clone()))
234            .collect();
235
236        PbSinkFormatDesc {
237            format: format.into(),
238            encode: encode.into(),
239            options,
240            key_encode,
241            secret_refs: self.secret_refs.clone(),
242            connection_id: self.connection_id,
243        }
244    }
245
246    // This function is for compatibility purposes. It sets the `SinkFormatDesc`
247    // when there is no configuration provided for the snowflake sink only.
248    pub fn plain_json_for_snowflake_only() -> Self {
249        Self {
250            format: SinkFormat::AppendOnly,
251            encode: SinkEncode::Json,
252            options: Default::default(),
253            secret_refs: Default::default(),
254            key_encode: None,
255            connection_id: None,
256        }
257    }
258}
259
impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    /// Decodes a protobuf format descriptor, rejecting format/encode variants
    /// that sinks do not support.
    ///
    /// The rejected variants are spelled out exhaustively (no `_` arm) on
    /// purpose: adding a new proto variant then forces a compile-time decision
    /// here instead of silently falling into an error branch.
    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        // `Plain` on the wire corresponds to the append-only sink format.
        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        // Value encodings: note `Text`/`Bytes` are rejected here but are valid
        // as *key* encodings below.
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        // Key encodings: `Unspecified` means "no key encode was set" rather
        // than an error, since the field is optional.
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
324
/// the catalog of the sink. There are two kind of schema here. The full schema is all columns
/// stored in the `column` which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
/// external system. The distribution key and all other keys are indexed in the full schema.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema of the sink.
    pub schema_id: SchemaId,

    /// Database of the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that this is NOT sorted by columnId in the vector.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    // The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
    // based on both its own derivation on the append-only attribute and other user-specified
    // options in `properties`.
    pub sink_type: SinkType,

    // The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// Sink may use a privatelink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    /// Epoch at which the sink finished creation, if recorded.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the sink's stream job was initialized, if recorded.
    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database
    pub db_name: String,

    /// Name for the table info for Debezium sink
    pub sink_from_name: String,
    /// Table from which this sink auto-refreshes its schema, if any.
    pub auto_refresh_schema_from_table: Option<TableId>,

    /// The target table id, for sinks whose downstream is a RisingWave table
    /// (sink-into-table); `None` for sinks to external systems.
    pub target_table: Option<TableId>,

    /// Cluster version at creation / initialization time, for diagnostics.
    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    /// Whether the sink was created in the foreground or background.
    pub create_type: CreateType,

    /// Indicate the stream job status, whether it is created or creating.
    /// If it is creating, we should hide it.
    pub stream_job_status: StreamJobStatus,

    /// The secret reference for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}
403
404impl SinkCatalog {
405    pub fn to_proto(&self) -> PbSink {
406        PbSink {
407            id: self.id.into(),
408            schema_id: self.schema_id,
409            database_id: self.database_id,
410            name: self.name.clone(),
411            definition: self.definition.clone(),
412            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
413            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
414            downstream_pk: (self.downstream_pk.as_ref())
415                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
416            distribution_key: self
417                .distribution_key
418                .iter()
419                .map(|k| *k as i32)
420                .collect_vec(),
421            owner: self.owner.into(),
422            properties: self.properties.clone(),
423            sink_type: self.sink_type.to_proto() as i32,
424            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
425            connection_id: self.connection_id.map(|id| id.into()),
426            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
427            created_at_epoch: self.created_at_epoch.map(|e| e.0),
428            db_name: self.db_name.clone(),
429            sink_from_name: self.sink_from_name.clone(),
430            stream_job_status: self.stream_job_status.to_proto().into(),
431            target_table: self.target_table,
432            created_at_cluster_version: self.created_at_cluster_version.clone(),
433            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
434            create_type: self.create_type.to_proto() as i32,
435            secret_refs: self.secret_refs.clone(),
436            original_target_columns: self
437                .original_target_columns
438                .iter()
439                .map(|c| c.to_protobuf())
440                .collect_vec(),
441            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
442        }
443    }
444
445    /// Returns the SQL statement that can be used to create this sink.
446    pub fn create_sql(&self) -> String {
447        self.definition.clone()
448    }
449
450    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
451        self.columns.iter().filter(|c| !c.is_hidden)
452    }
453
454    pub fn full_columns(&self) -> &[ColumnCatalog] {
455        &self.columns
456    }
457
458    pub fn full_schema(&self) -> Schema {
459        let fields = self
460            .full_columns()
461            .iter()
462            .map(|column| Field::from(column.column_desc.clone()))
463            .collect_vec();
464        Schema { fields }
465    }
466
467    pub fn visible_schema(&self) -> Schema {
468        let fields = self
469            .visible_columns()
470            .map(|column| Field::from(column.column_desc.clone()))
471            .collect_vec();
472        Schema { fields }
473    }
474
475    pub fn unique_identity(&self) -> String {
476        // We need to align with meta here, so we've utilized the proto method.
477        self.to_proto().unique_identity()
478    }
479
480    pub fn is_created(&self) -> bool {
481        self.stream_job_status == StreamJobStatus::Created
482    }
483}
484
impl From<PbSink> for SinkCatalog {
    /// Reconstructs a `SinkCatalog` from its protobuf representation.
    fn from(pb: PbSink) -> Self {
        // NOTE(review): panics on an invalid/unknown enum value in the proto —
        // presumably the meta service guarantees a valid sink type; confirm.
        let sink_type = pb.get_sink_type().unwrap();
        // Older catalogs may lack these fields; fall back to sane defaults.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        // Compatibility: a format_desc that fails to decode, or a legacy sink
        // without one, silently degrades to `None` (via `.ok()`) rather than
        // failing the conversion.
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                // Legacy path: derive the format from the `connector` and
                // `type` WITH options when both are present.
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id.into(),
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            // An empty list on the wire means "no user-defined downstream pk"
            // (mirrors the encoding in `SinkCatalog::to_proto`).
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id.map(ConnectionId),
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
557
558impl From<&PbSink> for SinkCatalog {
559    fn from(pb: &PbSink) -> Self {
560        pb.clone().into()
561    }
562}