risingwave_connector/sink/catalog/
desc.rs1use std::collections::BTreeMap;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{
19 ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, StreamJobStatus, TableId, UserId,
20};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::secret::PbSecretRef;
23use risingwave_pb::stream_plan::PbSinkDesc;
24
25use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
26use crate::sink::CONNECTOR_TYPE_KEY;
27use crate::sink::file_sink::azblob::AZBLOB_SINK;
28use crate::sink::file_sink::fs::FS_SINK;
29use crate::sink::file_sink::s3::S3_SINK;
30use crate::sink::file_sink::webhdfs::WEBHDFS_SINK;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct SinkDesc {
34 pub id: SinkId,
36
37 pub name: String,
39
40 pub definition: String,
42
43 pub columns: Vec<ColumnCatalog>,
45
46 pub plan_pk: Vec<ColumnOrder>,
48
49 pub downstream_pk: Option<Vec<usize>>,
51
52 pub distribution_key: Vec<usize>,
55
56 pub properties: BTreeMap<String, String>,
58
59 pub secret_refs: BTreeMap<String, PbSecretRef>,
61
62 pub sink_type: SinkType,
66
67 pub ignore_delete: bool,
69
70 pub format_desc: Option<SinkFormatDesc>,
72
73 pub db_name: String,
75
76 pub sink_from_name: String,
79
80 pub target_table: Option<TableId>,
82
83 pub extra_partition_col_idx: Option<usize>,
85
86 pub create_type: CreateType,
88
89 pub is_exactly_once: Option<bool>,
90
91 pub auto_refresh_schema_from_table: Option<TableId>,
92}
93
94impl SinkDesc {
95 pub fn into_catalog(
96 self,
97 schema_id: SchemaId,
98 database_id: DatabaseId,
99 owner: UserId,
100 connection_id: Option<ConnectionId>,
101 ) -> SinkCatalog {
102 SinkCatalog {
103 id: self.id,
104 schema_id,
105 database_id,
106 name: self.name,
107 definition: self.definition,
108 columns: self.columns,
109 plan_pk: self.plan_pk,
110 downstream_pk: self.downstream_pk,
111 distribution_key: self.distribution_key,
112 owner,
113 properties: self.properties,
114 secret_refs: self.secret_refs,
115 sink_type: self.sink_type,
116 ignore_delete: self.ignore_delete,
117 format_desc: self.format_desc,
118 connection_id,
119 created_at_epoch: None,
120 initialized_at_epoch: None,
121 db_name: self.db_name,
122 sink_from_name: self.sink_from_name,
123 auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
124 target_table: self.target_table,
125 created_at_cluster_version: None,
126 initialized_at_cluster_version: None,
127 create_type: self.create_type,
128 original_target_columns: vec![],
129 stream_job_status: StreamJobStatus::Creating,
130 }
131 }
132
133 pub fn to_proto(&self) -> PbSinkDesc {
134 PbSinkDesc {
135 id: self.id,
136 name: self.name.clone(),
137 definition: self.definition.clone(),
138 column_catalogs: self
139 .columns
140 .iter()
141 .map(|column| column.to_protobuf())
142 .collect_vec(),
143 plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
144 downstream_pk: (self.downstream_pk.as_ref())
145 .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
146 distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
147 properties: self.properties.clone().into_iter().collect(),
148 sink_type: self.sink_type.to_proto() as i32,
149 raw_ignore_delete: self.ignore_delete,
150 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
151 db_name: self.db_name.clone(),
152 sink_from_name: self.sink_from_name.clone(),
153 target_table: self.target_table.map(|table_id| table_id.as_raw_id()),
154 extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
155 secret_refs: self.secret_refs.clone(),
156 }
157 }
158
159 pub fn is_file_sink(&self) -> bool {
160 self.properties
161 .get(CONNECTOR_TYPE_KEY)
162 .map(|s| {
163 s.eq_ignore_ascii_case(FS_SINK)
164 || s.eq_ignore_ascii_case(AZBLOB_SINK)
165 || s.eq_ignore_ascii_case(S3_SINK)
166 || s.eq_ignore_ascii_case(WEBHDFS_SINK)
167 })
168 .unwrap_or(false)
169 }
170}