risingwave_connector/sink/catalog/
desc.rs1use std::collections::BTreeMap;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{
19 ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
20};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::secret::PbSecretRef;
23use risingwave_pb::stream_plan::PbSinkDesc;
24
25use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
26use crate::sink::CONNECTOR_TYPE_KEY;
27use crate::sink::file_sink::azblob::AZBLOB_SINK;
28use crate::sink::file_sink::fs::FS_SINK;
29use crate::sink::file_sink::s3::S3_SINK;
30use crate::sink::file_sink::webhdfs::WEBHDFS_SINK;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct SinkDesc {
34 pub id: SinkId,
36
37 pub name: String,
39
40 pub definition: String,
42
43 pub columns: Vec<ColumnCatalog>,
45
46 pub plan_pk: Vec<ColumnOrder>,
48
49 pub downstream_pk: Vec<usize>,
51
52 pub distribution_key: Vec<usize>,
55
56 pub properties: BTreeMap<String, String>,
58
59 pub secret_refs: BTreeMap<String, PbSecretRef>,
61
62 pub sink_type: SinkType,
66
67 pub format_desc: Option<SinkFormatDesc>,
69
70 pub db_name: String,
72
73 pub sink_from_name: String,
76
77 pub target_table: Option<TableId>,
79
80 pub extra_partition_col_idx: Option<usize>,
82
83 pub create_type: CreateType,
85
86 pub is_exactly_once: bool,
87}
88
89impl SinkDesc {
90 pub fn into_catalog(
91 self,
92 schema_id: SchemaId,
93 database_id: DatabaseId,
94 owner: UserId,
95 connection_id: Option<ConnectionId>,
96 ) -> SinkCatalog {
97 SinkCatalog {
98 id: self.id,
99 schema_id,
100 database_id,
101 name: self.name,
102 definition: self.definition,
103 columns: self.columns,
104 plan_pk: self.plan_pk,
105 downstream_pk: self.downstream_pk,
106 distribution_key: self.distribution_key,
107 owner,
108 properties: self.properties,
109 secret_refs: self.secret_refs,
110 sink_type: self.sink_type,
111 format_desc: self.format_desc,
112 connection_id,
113 created_at_epoch: None,
114 initialized_at_epoch: None,
115 db_name: self.db_name,
116 sink_from_name: self.sink_from_name,
117 target_table: self.target_table,
118 created_at_cluster_version: None,
119 initialized_at_cluster_version: None,
120 create_type: self.create_type,
121 original_target_columns: vec![],
122 }
123 }
124
125 pub fn to_proto(&self) -> PbSinkDesc {
126 PbSinkDesc {
127 id: self.id.sink_id,
128 name: self.name.clone(),
129 definition: self.definition.clone(),
130 column_catalogs: self
131 .columns
132 .iter()
133 .map(|column| column.to_protobuf())
134 .collect_vec(),
135 plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
136 downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(),
137 distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
138 properties: self.properties.clone().into_iter().collect(),
139 sink_type: self.sink_type.to_proto() as i32,
140 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
141 db_name: self.db_name.clone(),
142 sink_from_name: self.sink_from_name.clone(),
143 target_table: self.target_table.map(|table_id| table_id.table_id()),
144 extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
145 secret_refs: self.secret_refs.clone(),
146 }
147 }
148
149 pub fn is_file_sink(&self) -> bool {
150 self.properties
151 .get(CONNECTOR_TYPE_KEY)
152 .map(|s| {
153 s.eq_ignore_ascii_case(FS_SINK)
154 || s.eq_ignore_ascii_case(AZBLOB_SINK)
155 || s.eq_ignore_ascii_case(S3_SINK)
156 || s.eq_ignore_ascii_case(WEBHDFS_SINK)
157 })
158 .unwrap_or(false)
159 }
160}