risingwave_connector/sink/catalog/
desc.rs1use std::collections::BTreeMap;
16
17use itertools::Itertools;
18use risingwave_common::catalog::{
19 ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, StreamJobStatus, TableId, UserId,
20};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::secret::PbSecretRef;
23use risingwave_pb::stream_plan::PbSinkDesc;
24
25use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
26use crate::sink::CONNECTOR_TYPE_KEY;
27use crate::sink::file_sink::azblob::AZBLOB_SINK;
28use crate::sink::file_sink::fs::FS_SINK;
29use crate::sink::file_sink::s3::S3_SINK;
30use crate::sink::file_sink::webhdfs::WEBHDFS_SINK;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct SinkDesc {
34 pub id: SinkId,
36
37 pub name: String,
39
40 pub definition: String,
42
43 pub columns: Vec<ColumnCatalog>,
45
46 pub plan_pk: Vec<ColumnOrder>,
48
49 pub downstream_pk: Vec<usize>,
51
52 pub distribution_key: Vec<usize>,
55
56 pub properties: BTreeMap<String, String>,
58
59 pub secret_refs: BTreeMap<String, PbSecretRef>,
61
62 pub sink_type: SinkType,
66
67 pub format_desc: Option<SinkFormatDesc>,
69
70 pub db_name: String,
72
73 pub sink_from_name: String,
76
77 pub target_table: Option<TableId>,
79
80 pub extra_partition_col_idx: Option<usize>,
82
83 pub create_type: CreateType,
85
86 pub is_exactly_once: bool,
87
88 pub auto_refresh_schema_from_table: Option<TableId>,
89}
90
91impl SinkDesc {
92 pub fn into_catalog(
93 self,
94 schema_id: SchemaId,
95 database_id: DatabaseId,
96 owner: UserId,
97 connection_id: Option<ConnectionId>,
98 ) -> SinkCatalog {
99 SinkCatalog {
100 id: self.id,
101 schema_id,
102 database_id,
103 name: self.name,
104 definition: self.definition,
105 columns: self.columns,
106 plan_pk: self.plan_pk,
107 downstream_pk: self.downstream_pk,
108 distribution_key: self.distribution_key,
109 owner,
110 properties: self.properties,
111 secret_refs: self.secret_refs,
112 sink_type: self.sink_type,
113 format_desc: self.format_desc,
114 connection_id,
115 created_at_epoch: None,
116 initialized_at_epoch: None,
117 db_name: self.db_name,
118 sink_from_name: self.sink_from_name,
119 auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
120 target_table: self.target_table,
121 created_at_cluster_version: None,
122 initialized_at_cluster_version: None,
123 create_type: self.create_type,
124 original_target_columns: vec![],
125 stream_job_status: StreamJobStatus::Creating,
126 }
127 }
128
129 pub fn to_proto(&self) -> PbSinkDesc {
130 PbSinkDesc {
131 id: self.id.sink_id,
132 name: self.name.clone(),
133 definition: self.definition.clone(),
134 column_catalogs: self
135 .columns
136 .iter()
137 .map(|column| column.to_protobuf())
138 .collect_vec(),
139 plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
140 downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(),
141 distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
142 properties: self.properties.clone().into_iter().collect(),
143 sink_type: self.sink_type.to_proto() as i32,
144 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
145 db_name: self.db_name.clone(),
146 sink_from_name: self.sink_from_name.clone(),
147 target_table: self.target_table.map(|table_id| table_id.table_id()),
148 extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
149 secret_refs: self.secret_refs.clone(),
150 }
151 }
152
153 pub fn is_file_sink(&self) -> bool {
154 self.properties
155 .get(CONNECTOR_TYPE_KEY)
156 .map(|s| {
157 s.eq_ignore_ascii_case(FS_SINK)
158 || s.eq_ignore_ascii_case(AZBLOB_SINK)
159 || s.eq_ignore_ascii_case(S3_SINK)
160 || s.eq_ignore_ascii_case(WEBHDFS_SINK)
161 })
162 .unwrap_or(false)
163 }
164}