risingwave_connector/sink/catalog/
desc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{
19    ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
20};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::secret::PbSecretRef;
23use risingwave_pb::stream_plan::PbSinkDesc;
24
25use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
26use crate::sink::CONNECTOR_TYPE_KEY;
27use crate::sink::file_sink::azblob::AZBLOB_SINK;
28use crate::sink::file_sink::fs::FS_SINK;
29use crate::sink::file_sink::s3::S3_SINK;
30use crate::sink::file_sink::webhdfs::WEBHDFS_SINK;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct SinkDesc {
34    /// Id of the sink. For debug now.
35    pub id: SinkId,
36
37    /// Name of the sink. For debug now.
38    pub name: String,
39
40    /// Full SQL definition of the sink. For debug now.
41    pub definition: String,
42
43    /// All columns of the sink. Note that this is NOT sorted by columnId in the vector.
44    pub columns: Vec<ColumnCatalog>,
45
46    /// Primary keys of the sink. Derived by the frontend.
47    pub plan_pk: Vec<ColumnOrder>,
48
49    /// User-defined primary key indices for upsert sink.
50    pub downstream_pk: Vec<usize>,
51
52    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
53    /// distribution keys will be `columns[1]` and `columns[2]`.
54    pub distribution_key: Vec<usize>,
55
56    /// The properties of the sink.
57    pub properties: BTreeMap<String, String>,
58
59    /// Secret ref
60    pub secret_refs: BTreeMap<String, PbSecretRef>,
61
62    // The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
63    // based on both its own derivation on the append-only attribute and other user-specified
64    // options in `properties`.
65    pub sink_type: SinkType,
66
67    // The format and encode of the sink.
68    pub format_desc: Option<SinkFormatDesc>,
69
70    /// Name of the database
71    pub db_name: String,
72
73    /// Name of the "table" field for Debezium. If the sink is from table or mv,
74    /// it is the name of table/mv. Otherwise, it is the name of the sink.
75    pub sink_from_name: String,
76
77    /// Id of the target table for sink into table.
78    pub target_table: Option<TableId>,
79
80    /// See the same name field in `SinkWriterParam`.
81    pub extra_partition_col_idx: Option<usize>,
82
83    /// Whether the sink job should run in foreground or background.
84    pub create_type: CreateType,
85
86    pub is_exactly_once: bool,
87}
88
89impl SinkDesc {
90    pub fn into_catalog(
91        self,
92        schema_id: SchemaId,
93        database_id: DatabaseId,
94        owner: UserId,
95        connection_id: Option<ConnectionId>,
96    ) -> SinkCatalog {
97        SinkCatalog {
98            id: self.id,
99            schema_id,
100            database_id,
101            name: self.name,
102            definition: self.definition,
103            columns: self.columns,
104            plan_pk: self.plan_pk,
105            downstream_pk: self.downstream_pk,
106            distribution_key: self.distribution_key,
107            owner,
108            properties: self.properties,
109            secret_refs: self.secret_refs,
110            sink_type: self.sink_type,
111            format_desc: self.format_desc,
112            connection_id,
113            created_at_epoch: None,
114            initialized_at_epoch: None,
115            db_name: self.db_name,
116            sink_from_name: self.sink_from_name,
117            target_table: self.target_table,
118            created_at_cluster_version: None,
119            initialized_at_cluster_version: None,
120            create_type: self.create_type,
121            original_target_columns: vec![],
122        }
123    }
124
125    pub fn to_proto(&self) -> PbSinkDesc {
126        PbSinkDesc {
127            id: self.id.sink_id,
128            name: self.name.clone(),
129            definition: self.definition.clone(),
130            column_catalogs: self
131                .columns
132                .iter()
133                .map(|column| column.to_protobuf())
134                .collect_vec(),
135            plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
136            downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(),
137            distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
138            properties: self.properties.clone().into_iter().collect(),
139            sink_type: self.sink_type.to_proto() as i32,
140            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
141            db_name: self.db_name.clone(),
142            sink_from_name: self.sink_from_name.clone(),
143            target_table: self.target_table.map(|table_id| table_id.table_id()),
144            extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
145            secret_refs: self.secret_refs.clone(),
146        }
147    }
148
149    pub fn is_file_sink(&self) -> bool {
150        self.properties
151            .get(CONNECTOR_TYPE_KEY)
152            .map(|s| {
153                s.eq_ignore_ascii_case(FS_SINK)
154                    || s.eq_ignore_ascii_case(AZBLOB_SINK)
155                    || s.eq_ignore_ascii_case(S3_SINK)
156                    || s.eq_ignore_ascii_case(WEBHDFS_SINK)
157            })
158            .unwrap_or(false)
159    }
160}