1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::FutureExt;
20use risingwave_common::bail;
21use risingwave_common::catalog::{ColumnCatalog, Schema};
22use risingwave_common::secret::LocalSecretManager;
23use risingwave_common::types::DataType;
24use risingwave_connector::match_sink_name_str;
25use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
26use risingwave_connector::sink::file_sink::fs::FsSink;
27use risingwave_connector::sink::iceberg::ICEBERG_SINK;
28use risingwave_connector::sink::{
29 CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
30 build_sink,
31};
32use risingwave_hummock_sdk::HummockReadEpoch;
33use risingwave_pb::catalog::Table;
34use risingwave_pb::plan_common::PbColumnCatalog;
35use risingwave_pb::stream_plan::{PbSinkDesc, SinkLogStoreType, SinkNode};
36use risingwave_storage::store::TryWaitEpochOptions;
37use url::Url;
38
39use super::*;
40use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
41use crate::common::log_store_impl::kv_log_store::{
42 KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
43};
44use crate::executor::{SinkExecutor, StreamExecutorError};
45
46pub(super) fn build_sink_param(
55 sink_desc: &PbSinkDesc,
56 properties_with_secret: BTreeMap<String, String>,
57 connector: &str,
58) -> Result<(SinkParam, Vec<ColumnCatalog>), StreamExecutorError> {
59 let sink_id: SinkId = sink_desc.get_id();
60 let sink_name = sink_desc.get_name().to_owned();
61 let db_name = sink_desc.get_db_name().into();
62 let sink_from_name = sink_desc.get_sink_from_name().into();
63 let downstream_pk = if sink_desc.downstream_pk.is_empty() {
64 None
65 } else {
66 Some(
67 (sink_desc.downstream_pk.iter())
68 .map(|idx| *idx as usize)
69 .collect_vec(),
70 )
71 };
72 let columns = sink_desc
73 .column_catalogs
74 .clone()
75 .into_iter()
76 .map(ColumnCatalog::from)
77 .collect_vec();
78
79 let format_desc = match &sink_desc.format_desc {
80 Some(f) => Some(
82 f.clone()
83 .try_into()
84 .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
85 ),
86 None => match properties_with_secret.get(SINK_TYPE_OPTION) {
87 Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
89 .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
90 None => None,
92 },
93 };
94 let format_desc = SinkParam::fill_secret_for_format_desc(format_desc)
95 .map_err(|e| StreamExecutorError::from((e, sink_id)))?;
96
97 let sink_type = if let Some(format_desc) = &format_desc
99 && format_desc.format == SinkFormat::Debezium
100 {
101 SinkType::Retract
102 } else {
103 let sink_type_from_proto = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
104 if connector.eq_ignore_ascii_case(ICEBERG_SINK)
106 && matches!(sink_type_from_proto, SinkType::Upsert)
107 {
108 SinkType::Retract
109 } else {
110 sink_type_from_proto
111 }
112 };
113 let ignore_delete = sink_desc.ignore_delete();
114
115 let sink_param = SinkParam {
116 sink_id,
117 sink_name,
118 properties: properties_with_secret,
119 columns: columns
120 .iter()
121 .filter(|col| !col.is_hidden)
122 .map(|col| col.column_desc.clone())
123 .collect(),
124 downstream_pk,
125 sink_type,
126 ignore_delete,
127 format_desc,
128 db_name,
129 sink_from_name,
130 };
131
132 Ok((sink_param, columns))
133}
134
/// Stateless builder that turns a `SinkNode` plan node into a `SinkExecutor`
/// (see its `ExecutorBuilder` implementation in this module).
pub struct SinkExecutorBuilder;
136
137fn resolve_pk_info(
138 input_schema: &Schema,
139 log_store_table: &Table,
140) -> StreamResult<&'static KvLogStorePkInfo> {
141 let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();
142
143 #[expect(deprecated)]
144 let info = match predefined_column_len {
145 len if len
146 == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
147 .predefined_column_len() =>
148 {
149 Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
150 }
151 len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
152 other_len => Err(anyhow!(
153 "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
154 other_len,
155 log_store_table,
156 input_schema
157 )),
158 }?;
159 validate_payload_schema(
160 &log_store_table.columns[predefined_column_len..],
161 input_schema,
162 )?;
163 Ok(info)
164}
165
166fn validate_payload_schema(
167 log_store_payload_schema: &[PbColumnCatalog],
168 input_schema: &Schema,
169) -> StreamResult<()> {
170 if log_store_payload_schema
171 .iter()
172 .zip_eq(input_schema.fields.iter())
173 .all(|(log_store_col, input_field)| {
174 let log_store_col_type = DataType::from(
175 log_store_col
176 .column_desc
177 .as_ref()
178 .unwrap()
179 .column_type
180 .as_ref()
181 .unwrap(),
182 );
183 log_store_col_type.equals_datatype(&input_field.data_type)
184 })
185 {
186 Ok(())
187 } else {
188 Err(anyhow!(
189 "mismatch schema: log store: {:?}, input: {:?}",
190 log_store_payload_schema,
191 input_schema
192 )
193 .into())
194 }
195}
196
// Associates the `Sink(SinkNode)` stream node body with `SinkExecutorBuilder`.
// NOTE(review): presumably this registers the builder so that sink plan nodes
// are dispatched here; the macro is defined outside this file — confirm.
impl_stream_node_body!(Sink(SinkNode) => SinkExecutorBuilder);
198
199impl ExecutorBuilder for SinkExecutorBuilder {
200 type Node = SinkNode;
201
202 async fn new_boxed_executor(
203 params: ExecutorParams,
204 node: &Self::Node,
205 state_store: impl StateStore,
206 ) -> StreamResult<Executor> {
207 let [input_executor]: [_; 1] = params.input.try_into().unwrap();
208 let input_data_types = input_executor.info().schema.data_types();
209 let chunk_size = params.config.developer.chunk_size;
210
211 let sink_desc = node.sink_desc.as_ref().unwrap();
212 let sink_id: SinkId = sink_desc.get_id();
213 let sink_name = sink_desc.get_name().to_owned();
214 let properties = sink_desc.get_properties().clone();
215 let secret_refs = sink_desc.get_secret_refs().clone();
216
217 let mut properties_with_secret =
218 LocalSecretManager::global().fill_secrets(properties, secret_refs)?;
219
220 if params.config.developer.switch_jdbc_pg_to_native
221 && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
222 && connector_type == "jdbc"
223 && let Some(url) = properties_with_secret.get("jdbc.url")
224 && url.starts_with("jdbc:postgresql:")
225 {
226 tracing::info!("switching to native postgres connector");
227 let jdbc_url = parse_jdbc_url(url)
228 .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id)))?;
229 properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
230 properties_with_secret.insert("host".to_owned(), jdbc_url.host);
231 properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
232 properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
233 if let Some(username) = jdbc_url.username {
234 properties_with_secret.insert("user".to_owned(), username);
235 }
236 if let Some(password) = jdbc_url.password {
237 properties_with_secret.insert("password".to_owned(), password);
238 }
239 if let Some(table_name) = properties_with_secret.get("table.name") {
240 properties_with_secret.insert("table".to_owned(), table_name.clone());
241 }
242 if let Some(schema_name) = properties_with_secret.get("schema.name") {
243 properties_with_secret.insert("schema".to_owned(), schema_name.clone());
244 }
245 }
247
248 let connector = {
249 let sink_type = properties_with_secret
250 .get(CONNECTOR_TYPE_KEY)
251 .ok_or_else(|| {
252 StreamExecutorError::from((
253 SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
254 sink_id,
255 ))
256 })?;
257
258 let sink_type_str = sink_type.to_lowercase();
259 match_sink_name_str!(
260 sink_type_str.as_str(),
261 SinkType,
262 Ok(SinkType::SINK_NAME),
263 |other: &str| {
264 Err(StreamExecutorError::from((
265 SinkError::Config(anyhow!("unsupported sink connector {}", other)),
266 sink_id,
267 )))
268 }
269 )?
270 };
271
272 let (sink_param, columns) = build_sink_param(sink_desc, properties_with_secret, connector)?;
273
274 let sink_write_param = SinkWriterParam {
275 executor_id: params.executor_id,
276 vnode_bitmap: params.vnode_bitmap.clone(),
277 meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
278 extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
279
280 actor_id: params.actor_context.id,
281 sink_id,
282 sink_name,
283 connector: connector.to_owned(),
284 streaming_config: params.config.as_ref().clone(),
285 time_zone: params.actor_context.time_zone,
286 };
287
288 let log_store_identity = format!(
289 "sink[{}]-[{}]-executor[{}]",
290 connector, sink_id, params.executor_id
291 );
292
293 let sink = build_sink(sink_param.clone())
294 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
295
296 let exec = match node.log_store_type() {
297 SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
300 let factory = BoundedInMemLogStoreFactory::new(1, {
301 let state_store = state_store;
302 let table_id = node.table.as_ref().map(|table| table.id);
303 move |epoch| {
304 async move {
305 if let Some(table_id) = table_id {
306 state_store
307 .try_wait_epoch(
308 HummockReadEpoch::Committed(epoch.prev),
309 TryWaitEpochOptions { table_id },
310 )
311 .await?;
312 }
313 Ok(())
314 }
315 .boxed()
316 }
317 });
318 SinkExecutor::new(
319 params.actor_context,
320 params.info.clone(),
321 input_executor,
322 sink_write_param,
323 sink,
324 sink_param,
325 columns,
326 factory,
327 chunk_size,
328 input_data_types,
329 node.rate_limit.map(|x| x as _),
330 )?
331 .boxed()
332 }
333 SinkLogStoreType::KvLogStore => {
334 let metrics = KvLogStoreMetrics::new(
335 ¶ms.executor_stats,
336 params.actor_context.id,
337 &sink_param,
338 connector,
339 );
340
341 let table = node.table.as_ref().unwrap().clone();
342 let input_schema = input_executor.schema();
343 let pk_info = resolve_pk_info(input_schema, &table)?;
344
345 let factory = KvLogStoreFactory::new(
347 state_store,
348 table,
349 params.vnode_bitmap.clone().map(Arc::new),
350 65536,
351 params.config.developer.chunk_size,
352 metrics,
353 log_store_identity,
354 params.env.kv_log_store_historical_read_semaphore(),
355 pk_info,
356 );
357
358 SinkExecutor::new(
359 params.actor_context,
360 params.info.clone(),
361 input_executor,
362 sink_write_param,
363 sink,
364 sink_param,
365 columns,
366 factory,
367 chunk_size,
368 input_data_types,
369 node.rate_limit.map(|x| x as _),
370 )?
371 .boxed()
372 }
373 };
374
375 Ok((params.info, exec).into())
376 }
377}
378
/// Connection settings extracted from a PostgreSQL JDBC URL by `parse_jdbc_url`.
struct JdbcUrl {
    /// Host name from the URL authority.
    host: String,
    /// Explicit port from the URL authority.
    port: u16,
    /// Database name taken from the URL path.
    db_name: String,
    /// Value of the `user` query parameter, if present.
    username: Option<String>,
    /// Value of the `password` query parameter, if present.
    password: Option<String>,
}
386
387fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
388 if !url.starts_with("jdbc:postgresql") {
389 bail!(
390 "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
391 )
392 }
393
394 let url = url.replace("jdbc:", "");
396
397 let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
399
400 let scheme = url.scheme();
401 assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
402 let host = url
403 .host_str()
404 .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
405 let port = url
406 .port()
407 .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
408 let Some(db_name) = url.path().strip_prefix('/') else {
409 bail!("missing db_name in jdbc url");
410 };
411 let mut username = None;
412 let mut password = None;
413 for (key, value) in url.query_pairs() {
414 if key == "user" {
415 username = Some(value.to_string());
416 }
417 if key == "password" {
418 password = Some(value.to_string());
419 }
420 }
421
422 Ok(JdbcUrl {
423 host: host.to_owned(),
424 port,
425 db_name: db_name.to_owned(),
426 username,
427 password,
428 })
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully specified URL yields all of its components.
    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }

    /// Credentials are optional query parameters.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let jdbc_url = parse_jdbc_url("jdbc:postgresql://localhost:5432/test").unwrap();
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);
    }

    /// Non-PostgreSQL URLs and URLs missing a port or database are rejected.
    #[test]
    fn test_parse_jdbc_url_rejects_invalid_urls() {
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
        assert!(parse_jdbc_url("jdbc:postgresql://localhost/test").is_err());
        assert!(parse_jdbc_url("jdbc:postgresql://localhost:5432").is_err());
    }
}