risingwave_connector/sink/catalog/mod.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
    TableId, UserId,
};
pub use risingwave_common::id::SinkId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde::Serialize;

use super::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SinkError,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
    /// allowed.
    AppendOnly,
    /// The data written into the sink connector can be INSERT or DELETE.
    /// When updating a row, an INSERT with the new value will be written.
    Upsert,
    /// The data written into the sink connector can be INSERT, UPDATE, or DELETE.
    /// When updating a row, an UPDATE pair (U- then U+) will be written.
    ///
    /// Currently only used by the DEBEZIUM format.
    Retract,
}

impl SinkType {
    /// Whether the sink type is `AppendOnly`.
    pub fn is_append_only(self) -> bool {
        self == Self::AppendOnly
    }

    /// Convert to the string specified in `type = '...'` within the WITH options.
    pub fn type_str(self) -> &'static str {
        match self {
            SinkType::AppendOnly => "append-only",
            SinkType::Upsert => "upsert",
            SinkType::Retract => "retract",
        }
    }

    pub fn to_proto(self) -> PbSinkType {
        match self {
            SinkType::AppendOnly => PbSinkType::AppendOnly,
            SinkType::Upsert => PbSinkType::Upsert,
            SinkType::Retract => PbSinkType::Retract,
        }
    }

    pub fn from_proto(pb: PbSinkType) -> Self {
        match pb {
            PbSinkType::AppendOnly => SinkType::AppendOnly,
            // Backward compatibility: normalize force-append-only to append-only. The associated
            // behavior is now represented by another field `ignore_delete`.
            PbSinkType::ForceAppendOnly => SinkType::AppendOnly,
            PbSinkType::Upsert => SinkType::Upsert,
            PbSinkType::Retract => SinkType::Retract,
            PbSinkType::Unspecified => unreachable!(),
        }
    }
}
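
// A minimal usage sketch (not part of the original file; it exercises only the
// methods defined above):
//
//     let ty = SinkType::Upsert;
//     assert_eq!(ty.type_str(), "upsert");
//     assert!(!ty.is_append_only());
//     // `to_proto`/`from_proto` round-trip for all current variants.
//     assert_eq!(SinkType::from_proto(ty.to_proto()), ty);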

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    pub format: SinkFormat,
    pub encode: SinkEncode,
    pub options: BTreeMap<String, String>,
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    pub key_encode: Option<SinkEncode>,
    pub connection_id: Option<ConnectionId>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}

impl Display for SinkFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
    Bytes,
}

impl Display for SinkEncode {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}
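
// Both `Display` impls above delegate to `Debug`, so the variant name is printed
// verbatim. An illustrative sketch:
//
//     assert_eq!(SinkFormat::AppendOnly.to_string(), "AppendOnly");
//     assert_eq!(SinkEncode::Json.to_string(), "Json");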

impl SinkFormatDesc {
    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
        use crate::sink::Sink as _;
        use crate::sink::kafka::KafkaSink;
        use crate::sink::kinesis::KinesisSink;
        use crate::sink::pulsar::PulsarSink;

        let format = match r#type {
            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
            SINK_TYPE_UPSERT => SinkFormat::Upsert,
            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
            _ => {
                return Err(SinkError::Config(anyhow!(
                    "sink type unsupported: {}",
                    r#type
                )));
            }
        };
        let encode = match connector {
            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
                SinkEncode::Json
            }
            _ => return Ok(None),
        };
        Ok(Some(Self {
            format,
            encode,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }))
    }

    pub fn to_proto(&self) -> PbSinkFormatDesc {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match self.format {
            SinkFormat::AppendOnly => F::Plain,
            SinkFormat::Upsert => F::Upsert,
            SinkFormat::Debezium => F::Debezium,
        };
        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
            SinkEncode::Json => E::Json,
            SinkEncode::Protobuf => E::Protobuf,
            SinkEncode::Avro => E::Avro,
            SinkEncode::Template => E::Template,
            SinkEncode::Parquet => E::Parquet,
            SinkEncode::Text => E::Text,
            SinkEncode::Bytes => E::Bytes,
        };

        let encode = mapping_encode(&self.encode);
        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
        let options = self
            .options
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        PbSinkFormatDesc {
            format: format.into(),
            encode: encode.into(),
            options,
            key_encode,
            secret_refs: self.secret_refs.clone(),
            connection_id: self.connection_id,
        }
    }

    /// This method exists for compatibility: it provides a default `SinkFormatDesc`
    /// when no format configuration is given. It is used by the Snowflake sink only.
    pub fn plain_json_for_snowflake_only() -> Self {
        Self {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }
    }
}
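
// A hedged sketch of the legacy-options path above, assuming the usual connector
// name `"kafka"` for `KafkaSink::SINK_NAME`:
//
//     let desc = SinkFormatDesc::from_legacy_type("kafka", "upsert")?
//         .expect("message-queue connectors map to JSON encode");
//     assert_eq!(desc.format, SinkFormat::Upsert);
//     assert_eq!(desc.encode, SinkEncode::Json);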

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            E::Bytes => SinkEncode::Bytes,
            e @ (E::Unspecified | E::Native | E::Csv | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
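
// For the supported variants, `to_proto` and `TryFrom<PbSinkFormatDesc>` are
// intended to be inverses. A round-trip sketch:
//
//     let desc = SinkFormatDesc::plain_json_for_snowflake_only();
//     let restored = SinkFormatDesc::try_from(desc.to_proto())?;
//     assert_eq!(restored, desc);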

/// The catalog of a sink. There are two kinds of schema here. The full schema consists of all
/// columns stored in `columns`, which is the output schema of the sink executor/fragment. The
/// visible schema contains the columns whose `is_hidden` is false, which are the columns written
/// out to the external system. The distribution key and all other keys are indexed into the full
/// schema.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema of the sink.
    pub schema_id: SchemaId,

    /// Database of the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink.
    pub definition: String,

    /// All columns of the sink. Note that the vector is NOT sorted by column id.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for an upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The append-only behavior of the physical sink connector. The frontend determines
    /// `sink_type` based on both its own derivation of the append-only attribute and other
    /// user-specified options in `properties`.
    pub sink_type: SinkType,
    /// Whether to drop DELETEs and convert UPDATEs to INSERTs in the sink executor.
    pub ignore_delete: bool,

    /// The format and encoding of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// The sink may use a privatelink connection to connect to the downstream system.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database.
    pub db_name: String,

    /// Name of the upstream table, used as the table info for the Debezium sink.
    pub sink_from_name: String,
    pub auto_refresh_schema_from_table: Option<TableId>,

    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Indicates the stream job status, i.e. whether the sink is created or still being created.
    /// If it is still being created, it should be hidden.
    pub stream_job_status: StreamJobStatus,

    /// The secret references for the sink, mapping from property name to secret id.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for sinks whose target is a table. Columns of the target table at the time the sink
    /// was created. At that point, all default columns of the target table are handled by the
    /// project operator in the sink plan.
    pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: (self.downstream_pk.as_ref())
                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            raw_ignore_delete: self.ignore_delete,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id,
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table,
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
        }
    }

    /// Returns the SQL statement that can be used to create this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn unique_identity(&self) -> String {
        // We need to align with meta here, so we use the proto method.
        self.to_proto().unique_identity()
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}
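
// Relationship between the two schemas documented on `SinkCatalog` (a sketch,
// assuming `catalog` is a populated `SinkCatalog`): the visible schema is the
// full schema with hidden columns filtered out.
//
//     assert!(catalog.visible_schema().len() <= catalog.full_schema().len());
//     assert_eq!(catalog.visible_columns().count(), catalog.visible_schema().len());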

impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        let ignore_delete = pb.ignore_delete();
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            ignore_delete,
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}

impl From<&PbSink> for SinkCatalog {
    fn from(pb: &PbSink) -> Self {
        pb.clone().into()
    }
}