risingwave_connector/sink/catalog/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, OBJECT_ID_PLACEHOLDER, Schema,
    SchemaId, StreamJobStatus, TableId, UserId,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde_derive::Serialize;

use super::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SinkError,
};

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
    pub sink_id: u32,
}

impl SinkId {
    pub const fn new(sink_id: u32) -> Self {
        SinkId { sink_id }
    }

    /// Sometimes the id field is filled in later; we use this placeholder value for better
    /// debugging.
    pub const fn placeholder() -> Self {
        SinkId {
            sink_id: OBJECT_ID_PLACEHOLDER,
        }
    }

    pub fn sink_id(&self) -> u32 {
        self.sink_id
    }
}

impl std::fmt::Display for SinkId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.sink_id)
    }
}

impl From<u32> for SinkId {
    fn from(id: u32) -> Self {
        Self::new(id)
    }
}

impl From<SinkId> for u32 {
    fn from(id: SinkId) -> Self {
        id.sink_id
    }
}
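
// An illustrative sketch (not part of the original file): exercising the `SinkId`
// conversions and `Display` impl defined above.
#[cfg(test)]
mod sink_id_example {
    use super::*;

    #[test]
    fn sink_id_round_trips_through_u32() {
        // `From<u32>` and `From<SinkId> for u32` are inverses of each other.
        let id = SinkId::from(42);
        assert_eq!(u32::from(id), 42);
        // `Display` renders the raw numeric id.
        assert_eq!(id.to_string(), "42");
        // The placeholder id is distinguishable from ordinary ids.
        assert_eq!(SinkId::placeholder().sink_id(), OBJECT_ID_PLACEHOLDER);
    }
}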

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The input of the sink operator can be INSERT, UPDATE, or DELETE, but it must drop any
    /// UPDATE or DELETE and write only INSERT into the sink connector.
    ForceAppendOnly,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    Upsert,
}

impl SinkType {
    pub fn is_append_only(&self) -> bool {
        self == &Self::AppendOnly || self == &Self::ForceAppendOnly
    }

    pub fn is_upsert(&self) -> bool {
        self == &Self::Upsert
    }

    pub fn to_proto(self) -> PbSinkType {
        match self {
            SinkType::AppendOnly => PbSinkType::AppendOnly,
            SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
            SinkType::Upsert => PbSinkType::Upsert,
        }
    }

    pub fn from_proto(pb: PbSinkType) -> Self {
        match pb {
            PbSinkType::AppendOnly => SinkType::AppendOnly,
            PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
            PbSinkType::Upsert => SinkType::Upsert,
            PbSinkType::Unspecified => unreachable!(),
        }
    }
}
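
// An illustrative sketch (not part of the original file): both append-only variants
// report `is_append_only`, and the proto conversion round-trips for every variant.
#[cfg(test)]
mod sink_type_example {
    use super::*;

    #[test]
    fn append_only_predicates_and_proto_round_trip() {
        assert!(SinkType::AppendOnly.is_append_only());
        assert!(SinkType::ForceAppendOnly.is_append_only());
        assert!(SinkType::Upsert.is_upsert());

        for ty in [
            SinkType::AppendOnly,
            SinkType::ForceAppendOnly,
            SinkType::Upsert,
        ] {
            assert_eq!(SinkType::from_proto(ty.to_proto()), ty);
        }
    }
}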

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    pub format: SinkFormat,
    pub encode: SinkEncode,
    pub options: BTreeMap<String, String>,
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    pub key_encode: Option<SinkEncode>,
    pub connection_id: Option<u32>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}

impl Display for SinkFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
    Bytes,
}

impl Display for SinkEncode {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}
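
// A small sketch (not part of the original file): the `Display` impls above delegate
// to `Debug`, so formats and encodes render as their bare variant names.
#[cfg(test)]
mod format_display_example {
    use super::*;

    #[test]
    fn display_matches_variant_name() {
        assert_eq!(SinkFormat::Upsert.to_string(), "Upsert");
        assert_eq!(SinkEncode::Json.to_string(), "Json");
    }
}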

impl SinkFormatDesc {
    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
        use crate::sink::Sink as _;
        use crate::sink::kafka::KafkaSink;
        use crate::sink::kinesis::KinesisSink;
        use crate::sink::pulsar::PulsarSink;

        let format = match r#type {
            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
            SINK_TYPE_UPSERT => SinkFormat::Upsert,
            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
            _ => {
                return Err(SinkError::Config(anyhow!(
                    "sink type unsupported: {}",
                    r#type
                )));
            }
        };
        let encode = match connector {
            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
                SinkEncode::Json
            }
            _ => return Ok(None),
        };
        Ok(Some(Self {
            format,
            encode,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }))
    }

    pub fn to_proto(&self) -> PbSinkFormatDesc {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match self.format {
            SinkFormat::AppendOnly => F::Plain,
            SinkFormat::Upsert => F::Upsert,
            SinkFormat::Debezium => F::Debezium,
        };
        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
            SinkEncode::Json => E::Json,
            SinkEncode::Protobuf => E::Protobuf,
            SinkEncode::Avro => E::Avro,
            SinkEncode::Template => E::Template,
            SinkEncode::Parquet => E::Parquet,
            SinkEncode::Text => E::Text,
            SinkEncode::Bytes => E::Bytes,
        };

        let encode = mapping_encode(&self.encode);
        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
        let options = self
            .options
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        PbSinkFormatDesc {
            format: format.into(),
            encode: encode.into(),
            options,
            key_encode,
            secret_refs: self.secret_refs.clone(),
            connection_id: self.connection_id,
        }
    }

    /// For backward compatibility only: builds the `SinkFormatDesc` used when the Snowflake
    /// sink is created without an explicit format configuration.
    pub fn plain_json_for_snowflake_only() -> Self {
        Self {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }
    }
}
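
// An illustrative sketch (not part of the original file): `from_legacy_type` maps the
// known message-queue connectors to JSON encode, returns `Ok(None)` for connectors
// without a legacy mapping, and rejects unknown type strings.
#[cfg(test)]
mod legacy_type_example {
    use super::*;
    use crate::sink::Sink as _;
    use crate::sink::kafka::KafkaSink;

    #[test]
    fn legacy_type_mapping() {
        let desc = SinkFormatDesc::from_legacy_type(KafkaSink::SINK_NAME, SINK_TYPE_APPEND_ONLY)
            .unwrap()
            .expect("kafka maps to a format desc");
        assert_eq!(desc.format, SinkFormat::AppendOnly);
        assert_eq!(desc.encode, SinkEncode::Json);

        // Connectors without a legacy mapping yield `None` rather than an error.
        assert!(
            SinkFormatDesc::from_legacy_type("some_other_connector", SINK_TYPE_UPSERT)
                .unwrap()
                .is_none()
        );

        // An unrecognized type string is a configuration error.
        assert!(SinkFormatDesc::from_legacy_type(KafkaSink::SINK_NAME, "not-a-type").is_err());
    }
}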

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
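
// An illustrative sketch (not part of the original file): `to_proto` and
// `TryFrom<PbSinkFormatDesc>` round-trip a fully populated descriptor. The
// "topic"/"t1" option entry is an arbitrary placeholder key-value pair.
#[cfg(test)]
mod format_desc_round_trip_example {
    use super::*;

    #[test]
    fn proto_round_trip() {
        let desc = SinkFormatDesc {
            format: SinkFormat::Upsert,
            encode: SinkEncode::Avro,
            options: BTreeMap::from([("topic".to_owned(), "t1".to_owned())]),
            secret_refs: Default::default(),
            key_encode: Some(SinkEncode::Text),
            connection_id: None,
        };
        let restored = SinkFormatDesc::try_from(desc.to_proto()).unwrap();
        assert_eq!(restored, desc);
    }
}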

/// The catalog of a sink. There are two kinds of schema here. The full schema consists of all
/// columns stored in `columns`, which is the output schema of the sink executor/fragment. The
/// visible schema contains the columns whose `is_hidden` is false, i.e. the columns written out
/// to the external system. The distribution key and all other keys are indexed against the full
/// schema.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Id of the schema the sink belongs to.
    pub schema_id: SchemaId,

    /// Id of the database the sink belongs to.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that the vector is NOT sorted by `ColumnId`.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for the upsert sink.
    pub downstream_pk: Vec<usize>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The append-only behavior of the physical sink connector. The frontend determines
    /// `sink_type` based on both its own derivation of the append-only attribute and other
    /// user-specified options in `properties`.
    pub sink_type: SinkType,

    /// The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// The sink may use a privatelink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database.
    pub db_name: String,

    /// Name of the table info for the Debezium sink.
    pub sink_from_name: String,
    pub auto_refresh_schema_from_table: Option<TableId>,

    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Indicates the stream job status, i.e. whether it is created or still being created.
    /// If it is still being created, the sink should be hidden.
    pub stream_job_status: StreamJobStatus,

    /// The secret references for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for sinks whose target is a table. Columns of the target table at the time the sink
    /// was created. At that point, all default columns of the target table are handled by the
    /// project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id.into(),
            schema_id: self.schema_id.schema_id,
            database_id: self.database_id.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: self
                .downstream_pk
                .iter()
                .map(|idx| *idx as i32)
                .collect_vec(),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id.map(|id| id.into()),
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table.map(|table_id| table_id.table_id()),
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self
                .auto_refresh_schema_from_table
                .map(|table_id| table_id.table_id),
        }
    }

    /// Returns the SQL statement that can be used to create this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn downstream_pk_indices(&self) -> Vec<usize> {
        self.downstream_pk.clone()
    }

    pub fn unique_identity(&self) -> String {
        // We need to align with the meta node here, so we use the proto method.
        self.to_proto().unique_identity()
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}

impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id.into(),
            name: pb.name,
            schema_id: pb.schema_id.into(),
            database_id: pb.database_id.into(),
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            downstream_pk: pb.downstream_pk.into_iter().map(|k| k as _).collect_vec(),
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id.map(ConnectionId),
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table.map(TableId::new),
            target_table: pb.target_table.map(TableId::new),
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}

impl From<&PbSink> for SinkCatalog {
    fn from(pb: &PbSink) -> Self {
        pb.clone().into()
    }
}
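
// An illustrative sketch (not part of the original file): building a `SinkCatalog` from
// a minimal `PbSink`. `sink_type` is set explicitly because an unspecified sink type is
// rejected; the other enum fields rely on the `unwrap_or` fallbacks above, assuming the
// generated `get_*` accessors report unset enum fields as errors.
#[cfg(test)]
mod sink_catalog_example {
    use super::*;

    #[test]
    fn catalog_from_minimal_proto() {
        let pb = PbSink {
            sink_type: PbSinkType::AppendOnly as i32,
            name: "demo_sink".to_owned(),
            ..Default::default()
        };
        let catalog = SinkCatalog::from(&pb);
        assert_eq!(catalog.name, "demo_sink");
        assert_eq!(catalog.sink_type, SinkType::AppendOnly);
        // With no columns in the proto, the full and visible schemas are both empty.
        assert!(catalog.full_schema().fields.is_empty());
        assert!(catalog.visible_schema().fields.is_empty());
        // The fallback stream job status is `Created`, so the sink counts as created.
        assert!(catalog.is_created());
    }
}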