risingwave_connector/sink/catalog/
desc.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{
19    ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, StreamJobStatus, TableId, UserId,
20};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::secret::PbSecretRef;
23use risingwave_pb::stream_plan::PbSinkDesc;
24
25use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
26use crate::sink::CONNECTOR_TYPE_KEY;
27use crate::sink::file_sink::azblob::AZBLOB_SINK;
28use crate::sink::file_sink::fs::FS_SINK;
29use crate::sink::file_sink::s3::S3_SINK;
30use crate::sink::file_sink::webhdfs::WEBHDFS_SINK;
31
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
    /// Id of the sink. For debug now.
    pub id: SinkId,

    /// Name of the sink. For debug now.
    pub name: String,

    /// Full SQL definition of the sink. For debug now.
    pub definition: String,

    /// All columns of the sink. Note that this is NOT sorted by columnId in the vector.
    pub columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink. Derived by the frontend.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
    /// distribution keys will be `columns[1]` and `columns[2]`.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// References from property names to secrets, kept separate from the plain-text
    /// `properties` map and forwarded as-is into the catalog and the protobuf plan.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// The append-only behavior of the physical sink connector. Frontend will determine
    /// `sink_type` based on both its own derivation on the append-only attribute and other
    /// user-specified options in `properties`.
    pub sink_type: SinkType,

    /// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
    pub ignore_delete: bool,

    /// The format and encode of the sink.
    pub format_desc: Option<SinkFormatDesc>,

    /// Name of the database
    pub db_name: String,

    /// Name of the "table" field for Debezium. If the sink is from table or mv,
    /// it is the name of table/mv. Otherwise, it is the name of the sink.
    pub sink_from_name: String,

    /// Id of the target table for sink into table.
    pub target_table: Option<TableId>,

    /// See the same name field in `SinkWriterParam`.
    pub extra_partition_col_idx: Option<usize>,

    /// Whether the sink job should run in foreground or background.
    pub create_type: CreateType,

    /// Whether exactly-once delivery is enabled for this sink; `None` when unspecified.
    /// NOTE(review): semantics inferred from the name — confirm against the connector side.
    pub is_exactly_once: Option<bool>,

    /// Id of the upstream table this sink automatically refreshes its schema from, if any.
    /// NOTE(review): semantics inferred from the field name and the matching
    /// `SinkCatalog` field — confirm against the schema-change handling code.
    pub auto_refresh_schema_from_table: Option<TableId>,
}
93
94impl SinkDesc {
95    pub fn into_catalog(
96        self,
97        schema_id: SchemaId,
98        database_id: DatabaseId,
99        owner: UserId,
100        connection_id: Option<ConnectionId>,
101    ) -> SinkCatalog {
102        SinkCatalog {
103            id: self.id,
104            schema_id,
105            database_id,
106            name: self.name,
107            definition: self.definition,
108            columns: self.columns,
109            plan_pk: self.plan_pk,
110            downstream_pk: self.downstream_pk,
111            distribution_key: self.distribution_key,
112            owner,
113            properties: self.properties,
114            secret_refs: self.secret_refs,
115            sink_type: self.sink_type,
116            ignore_delete: self.ignore_delete,
117            format_desc: self.format_desc,
118            connection_id,
119            created_at_epoch: None,
120            initialized_at_epoch: None,
121            db_name: self.db_name,
122            sink_from_name: self.sink_from_name,
123            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
124            target_table: self.target_table,
125            created_at_cluster_version: None,
126            initialized_at_cluster_version: None,
127            create_type: self.create_type,
128            original_target_columns: vec![],
129            stream_job_status: StreamJobStatus::Creating,
130        }
131    }
132
133    pub fn to_proto(&self) -> PbSinkDesc {
134        PbSinkDesc {
135            id: self.id,
136            name: self.name.clone(),
137            definition: self.definition.clone(),
138            column_catalogs: self
139                .columns
140                .iter()
141                .map(|column| column.to_protobuf())
142                .collect_vec(),
143            plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
144            downstream_pk: (self.downstream_pk.as_ref())
145                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
146            distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
147            properties: self.properties.clone().into_iter().collect(),
148            sink_type: self.sink_type.to_proto() as i32,
149            raw_ignore_delete: self.ignore_delete,
150            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
151            db_name: self.db_name.clone(),
152            sink_from_name: self.sink_from_name.clone(),
153            target_table: self.target_table.map(|table_id| table_id.as_raw_id()),
154            extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
155            secret_refs: self.secret_refs.clone(),
156        }
157    }
158
159    pub fn is_file_sink(&self) -> bool {
160        self.properties
161            .get(CONNECTOR_TYPE_KEY)
162            .map(|s| {
163                s.eq_ignore_ascii_case(FS_SINK)
164                    || s.eq_ignore_ascii_case(AZBLOB_SINK)
165                    || s.eq_ignore_ascii_case(S3_SINK)
166                    || s.eq_ignore_ascii_case(WEBHDFS_SINK)
167            })
168            .unwrap_or(false)
169    }
170}