risingwave_stream/from_proto/
sink.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, Schema};
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::types::DataType;
22use risingwave_connector::match_sink_name_str;
23use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType};
24use risingwave_connector::sink::file_sink::fs::FsSink;
25use risingwave_connector::sink::{
26 CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
27 build_sink,
28};
29use risingwave_pb::catalog::Table;
30use risingwave_pb::plan_common::PbColumnCatalog;
31use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
32use url::Url;
33
34use super::*;
35use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
36use crate::common::log_store_impl::kv_log_store::{
37 KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
38};
39use crate::executor::{SinkExecutor, StreamExecutorError};
40
/// Builder that turns a `SinkNode` plan node into a sink executor through its
/// `ExecutorBuilder` implementation. It carries no state of its own.
pub struct SinkExecutorBuilder;
42
43fn resolve_pk_info(
44 input_schema: &Schema,
45 log_store_table: &Table,
46) -> StreamResult<&'static KvLogStorePkInfo> {
47 let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();
48
49 #[expect(deprecated)]
50 let info = match predefined_column_len {
51 len if len
52 == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
53 .predefined_column_len() =>
54 {
55 Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
56 }
57 len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
58 other_len => Err(anyhow!(
59 "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
60 other_len,
61 log_store_table,
62 input_schema
63 )),
64 }?;
65 validate_payload_schema(
66 &log_store_table.columns[predefined_column_len..],
67 input_schema,
68 )?;
69 Ok(info)
70}
71
72fn validate_payload_schema(
73 log_store_payload_schema: &[PbColumnCatalog],
74 input_schema: &Schema,
75) -> StreamResult<()> {
76 if log_store_payload_schema
77 .iter()
78 .zip_eq(input_schema.fields.iter())
79 .all(|(log_store_col, input_field)| {
80 let log_store_col_type = DataType::from(
81 log_store_col
82 .column_desc
83 .as_ref()
84 .unwrap()
85 .column_type
86 .as_ref()
87 .unwrap(),
88 );
89 log_store_col_type.equals_datatype(&input_field.data_type)
90 })
91 {
92 Ok(())
93 } else {
94 Err(anyhow!(
95 "mismatch schema: log store: {:?}, input: {:?}",
96 log_store_payload_schema,
97 input_schema
98 )
99 .into())
100 }
101}
102
103impl ExecutorBuilder for SinkExecutorBuilder {
104 type Node = SinkNode;
105
106 async fn new_boxed_executor(
107 params: ExecutorParams,
108 node: &Self::Node,
109 state_store: impl StateStore,
110 ) -> StreamResult<Executor> {
111 let [input_executor]: [_; 1] = params.input.try_into().unwrap();
112 let input_data_types = input_executor.info().schema.data_types();
113 let chunk_size = params.env.config().developer.chunk_size;
114
115 let sink_desc = node.sink_desc.as_ref().unwrap();
116 let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
117 let sink_id: SinkId = sink_desc.get_id().into();
118 let sink_name = sink_desc.get_name().to_owned();
119 let db_name = sink_desc.get_db_name().into();
120 let sink_from_name = sink_desc.get_sink_from_name().into();
121 let properties = sink_desc.get_properties().clone();
122 let secret_refs = sink_desc.get_secret_refs().clone();
123 let downstream_pk = sink_desc
124 .downstream_pk
125 .iter()
126 .map(|i| *i as usize)
127 .collect_vec();
128 let columns = sink_desc
129 .column_catalogs
130 .clone()
131 .into_iter()
132 .map(ColumnCatalog::from)
133 .collect_vec();
134
135 let mut properties_with_secret =
136 LocalSecretManager::global().fill_secrets(properties, secret_refs)?;
137
138 if params.env.config().developer.switch_jdbc_pg_to_native
139 && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
140 && connector_type == "jdbc"
141 && let Some(url) = properties_with_secret.get("jdbc.url")
142 && url.starts_with("jdbc:postgresql:")
143 {
144 tracing::info!("switching to native postgres connector");
145 let jdbc_url = parse_jdbc_url(url)
146 .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?;
147 properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
148 properties_with_secret.insert("host".to_owned(), jdbc_url.host);
149 properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
150 properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
151 if let Some(username) = jdbc_url.username {
152 properties_with_secret.insert("user".to_owned(), username);
153 }
154 if let Some(password) = jdbc_url.password {
155 properties_with_secret.insert("password".to_owned(), password);
156 }
157 if let Some(table_name) = properties_with_secret.get("table.name") {
158 properties_with_secret.insert("table".to_owned(), table_name.clone());
159 }
160 if let Some(schema_name) = properties_with_secret.get("schema.name") {
161 properties_with_secret.insert("schema".to_owned(), schema_name.clone());
162 }
163 }
165
166 let connector = {
167 let sink_type = properties_with_secret
168 .get(CONNECTOR_TYPE_KEY)
169 .ok_or_else(|| {
170 StreamExecutorError::from((
171 SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
172 sink_id.sink_id,
173 ))
174 })?;
175
176 let sink_type_str = sink_type.to_string().to_lowercase();
177 match_sink_name_str!(
178 sink_type_str.as_str(),
179 SinkType,
180 Ok(SinkType::SINK_NAME),
181 |other: &str| {
182 Err(StreamExecutorError::from((
183 SinkError::Config(anyhow!("unsupported sink connector {}", other)),
184 sink_id.sink_id,
185 )))
186 }
187 )?
188 };
189 let format_desc = match &sink_desc.format_desc {
190 Some(f) => Some(
192 f.clone()
193 .try_into()
194 .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
195 ),
196 None => match properties_with_secret.get(SINK_TYPE_OPTION) {
197 Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
199 .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
200 None => None,
202 },
203 };
204
205 let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)
206 .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?;
207
208 let sink_param = SinkParam {
209 sink_id,
210 sink_name: sink_name.clone(),
211 properties: properties_with_secret,
212 columns: columns
213 .iter()
214 .filter(|col| !col.is_hidden)
215 .map(|col| col.column_desc.clone())
216 .collect(),
217 downstream_pk,
218 sink_type,
219 format_desc: format_desc_with_secret,
220 db_name,
221 sink_from_name,
222 };
223
224 let sink_write_param = SinkWriterParam {
225 executor_id: params.executor_id,
226 vnode_bitmap: params.vnode_bitmap.clone(),
227 meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
228 extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
229
230 actor_id: params.actor_context.id,
231 sink_id,
232 sink_name,
233 connector: connector.to_owned(),
234 streaming_config: params.env.config().as_ref().clone(),
235 };
236
237 let log_store_identity = format!(
238 "sink[{}]-[{}]-executor[{}]",
239 connector, sink_id.sink_id, params.executor_id
240 );
241
242 let sink = build_sink(sink_param.clone())
243 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
244
245 let exec = match node.log_store_type() {
246 SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
249 let factory = BoundedInMemLogStoreFactory::new(1);
250 SinkExecutor::new(
251 params.actor_context,
252 params.info.clone(),
253 input_executor,
254 sink_write_param,
255 sink,
256 sink_param,
257 columns,
258 factory,
259 chunk_size,
260 input_data_types,
261 node.rate_limit.map(|x| x as _),
262 )
263 .await?
264 .boxed()
265 }
266 SinkLogStoreType::KvLogStore => {
267 let metrics = KvLogStoreMetrics::new(
268 ¶ms.executor_stats,
269 params.actor_context.id,
270 &sink_param,
271 connector,
272 );
273
274 let table = node.table.as_ref().unwrap().clone();
275 let input_schema = input_executor.schema();
276 let pk_info = resolve_pk_info(input_schema, &table)?;
277
278 let factory = KvLogStoreFactory::new(
280 state_store,
281 table,
282 params.vnode_bitmap.clone().map(Arc::new),
283 65536,
284 metrics,
285 log_store_identity,
286 pk_info,
287 );
288
289 SinkExecutor::new(
290 params.actor_context,
291 params.info.clone(),
292 input_executor,
293 sink_write_param,
294 sink,
295 sink_param,
296 columns,
297 factory,
298 chunk_size,
299 input_data_types,
300 node.rate_limit.map(|x| x as _),
301 )
302 .await?
303 .boxed()
304 }
305 };
306
307 Ok((params.info, exec).into())
308 }
309}
310
/// Components extracted from a `jdbc:postgresql://...` URL by `parse_jdbc_url`.
///
/// `Debug` is intentionally not derived: the struct may hold a password.
struct JdbcUrl {
    /// Host part of the URL.
    host: String,
    /// Explicit port given in the URL.
    port: u16,
    /// URL path with its leading `/` removed.
    db_name: String,
    /// Value of the `user` query parameter, if present.
    username: Option<String>,
    /// Value of the `password` query parameter, if present.
    password: Option<String>,
}
318
319fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
320 if !url.starts_with("jdbc:postgresql") {
321 bail!(
322 "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
323 )
324 }
325
326 let url = url.replace("jdbc:", "");
328
329 let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
331
332 let scheme = url.scheme();
333 assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
334 let host = url
335 .host_str()
336 .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
337 let port = url
338 .port()
339 .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
340 let Some(db_name) = url.path().strip_prefix('/') else {
341 bail!("missing db_name in jdbc url");
342 };
343 let mut username = None;
344 let mut password = None;
345 for (key, value) in url.query_pairs() {
346 if key == "user" {
347 username = Some(value.to_string());
348 }
349 if key == "password" {
350 password = Some(value.to_string());
351 }
352 }
353
354 Ok(JdbcUrl {
355 host: host.to_owned(),
356 port,
357 db_name: db_name.to_owned(),
358 username,
359 password,
360 })
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully specified URL yields every component, credentials included.
    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }

    /// Credentials are optional and stay `None` when the query omits them.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let jdbc_url = parse_jdbc_url("jdbc:postgresql://localhost:5432/test").unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);
    }

    /// URLs for other JDBC drivers are rejected.
    // `JdbcUrl` has no `Debug` impl, so `is_err` is used instead of `unwrap_err`.
    #[test]
    fn test_parse_jdbc_url_rejects_other_drivers() {
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
    }

    /// The port must be given explicitly.
    #[test]
    fn test_parse_jdbc_url_requires_port() {
        assert!(parse_jdbc_url("jdbc:postgresql://localhost/test").is_err());
    }
}