risingwave_stream/from_proto/sink.rs

use std::sync::Arc;

use anyhow::anyhow;
use futures::FutureExt;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, Schema};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
    build_sink,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::catalog::Table;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
use risingwave_storage::store::TryWaitEpochOptions;
use url::Url;

use super::*;
use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
use crate::common::log_store_impl::kv_log_store::{
    KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
};
use crate::executor::{SinkExecutor, StreamExecutorError};
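
/// Builds a [`SinkExecutor`] from a [`SinkNode`] of the streaming plan.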
pub struct SinkExecutorBuilder;
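
/// Infers which kv log store layout (V1 or V2) the given log store table was
/// created with. The log store table places a fixed set of predefined
/// bookkeeping columns in front of the payload columns, and the two layouts
/// differ in how many such columns they have, so the difference between the
/// table's column count and the input schema's field count identifies the
/// layout.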
fn resolve_pk_info(
    input_schema: &Schema,
    log_store_table: &Table,
) -> StreamResult<&'static KvLogStorePkInfo> {
    let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();

    #[expect(deprecated)]
    let info = match predefined_column_len {
        len if len
            == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
                .predefined_column_len() =>
        {
            Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
        }
        len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
        other_len => Err(anyhow!(
            "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
            other_len,
            log_store_table,
            input_schema
        )),
    }?;
    validate_payload_schema(
        &log_store_table.columns[predefined_column_len..],
        input_schema,
    )?;
    Ok(info)
}
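
/// Checks that the payload part of the log store schema matches the input
/// schema column by column. Both sides have the same length by construction
/// (payload columns = table columns minus predefined columns), so `zip_eq`
/// cannot panic here.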
fn validate_payload_schema(
    log_store_payload_schema: &[PbColumnCatalog],
    input_schema: &Schema,
) -> StreamResult<()> {
    if log_store_payload_schema
        .iter()
        .zip_eq(input_schema.fields.iter())
        .all(|(log_store_col, input_field)| {
            let log_store_col_type = DataType::from(
                log_store_col
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_type
                    .as_ref()
                    .unwrap(),
            );
            log_store_col_type.equals_datatype(&input_field.data_type)
        })
    {
        Ok(())
    } else {
        Err(anyhow!(
            "mismatch schema: log store: {:?}, input: {:?}",
            log_store_payload_schema,
            input_schema
        )
        .into())
    }
}

impl ExecutorBuilder for SinkExecutorBuilder {
    type Node = SinkNode;

    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        let [input_executor]: [_; 1] = params.input.try_into().unwrap();
        let input_data_types = input_executor.info().schema.data_types();
        let chunk_size = params.config.developer.chunk_size;

        let sink_desc = node.sink_desc.as_ref().unwrap();
        let sink_id: SinkId = sink_desc.get_id();
        let sink_name = sink_desc.get_name().to_owned();
        let db_name = sink_desc.get_db_name().into();
        let sink_from_name = sink_desc.get_sink_from_name().into();
        let properties = sink_desc.get_properties().clone();
        let secret_refs = sink_desc.get_secret_refs().clone();
        let downstream_pk = if sink_desc.downstream_pk.is_empty() {
            None
        } else {
            Some(
                (sink_desc.downstream_pk.iter())
                    .map(|idx| *idx as usize)
                    .collect_vec(),
            )
        };
        let columns = sink_desc
            .column_catalogs
            .clone()
            .into_iter()
            .map(ColumnCatalog::from)
            .collect_vec();

        let mut properties_with_secret =
            LocalSecretManager::global().fill_secrets(properties, secret_refs)?;
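
        // With the developer option `switch_jdbc_pg_to_native` enabled, a JDBC
        // sink pointing at PostgreSQL is rewritten to the native `postgres`
        // connector: the JDBC URL is parsed and its parts are re-inserted as
        // the native connector's properties.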
        if params.config.developer.switch_jdbc_pg_to_native
            && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
            && connector_type == "jdbc"
            && let Some(url) = properties_with_secret.get("jdbc.url")
            && url.starts_with("jdbc:postgresql:")
        {
            tracing::info!("switching to native postgres connector");
            let jdbc_url = parse_jdbc_url(url)
                .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id)))?;
            properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
            properties_with_secret.insert("host".to_owned(), jdbc_url.host);
            properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
            properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
            if let Some(username) = jdbc_url.username {
                properties_with_secret.insert("user".to_owned(), username);
            }
            if let Some(password) = jdbc_url.password {
                properties_with_secret.insert("password".to_owned(), password);
            }
            if let Some(table_name) = properties_with_secret.get("table.name") {
                properties_with_secret.insert("table".to_owned(), table_name.clone());
            }
            if let Some(schema_name) = properties_with_secret.get("schema.name") {
                properties_with_secret.insert("schema".to_owned(), schema_name.clone());
            }
        }
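
        // Resolve the `connector` property (case-insensitively) to a known
        // sink name; unknown connectors are rejected with a config error.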
        let connector = {
            let sink_type = properties_with_secret
                .get(CONNECTOR_TYPE_KEY)
                .ok_or_else(|| {
                    StreamExecutorError::from((
                        SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
                        sink_id,
                    ))
                })?;

            let sink_type_str = sink_type.to_lowercase();
            match_sink_name_str!(
                sink_type_str.as_str(),
                SinkType,
                Ok(SinkType::SINK_NAME),
                |other: &str| {
                    Err(StreamExecutorError::from((
                        SinkError::Config(anyhow!("unsupported sink connector {}", other)),
                        sink_id,
                    )))
                }
            )?
        };
        let format_desc = match &sink_desc.format_desc {
            Some(f) => Some(
                f.clone()
                    .try_into()
                    .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
            ),
            None => match properties_with_secret.get(SINK_TYPE_OPTION) {
                Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
                    .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
                None => None,
            },
        };

        let format_desc = SinkParam::fill_secret_for_format_desc(format_desc)
            .map_err(|e| StreamExecutorError::from((e, sink_id)))?;
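
        // Debezium-format sinks always run with retract semantics, since the
        // format carries both the before and after image of each change; an
        // upsert Iceberg sink is likewise forced to retract here.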
        let sink_type = if let Some(format_desc) = &format_desc
            && format_desc.format == SinkFormat::Debezium
        {
            SinkType::Retract
        } else {
            let sink_type_from_proto = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
            if connector.eq_ignore_ascii_case(ICEBERG_SINK)
                && matches!(sink_type_from_proto, SinkType::Upsert)
            {
                SinkType::Retract
            } else {
                sink_type_from_proto
            }
        };

        let sink_param = SinkParam {
            sink_id,
            sink_name: sink_name.clone(),
            properties: properties_with_secret,
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk,
            sink_type,
            format_desc,
            db_name,
            sink_from_name,
        };

        let sink_write_param = SinkWriterParam {
            executor_id: params.executor_id,
            vnode_bitmap: params.vnode_bitmap.clone(),
            meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),

            actor_id: params.actor_context.id,
            sink_id,
            sink_name,
            connector: connector.to_owned(),
            streaming_config: params.config.as_ref().clone(),
        };

        let log_store_identity = format!(
            "sink[{}]-[{}]-executor[{}]",
            connector, sink_id, params.executor_id
        );

        let sink = build_sink(sink_param.clone())
            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
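
        // Choose the log store that buffers changes between the upstream
        // executor and the sink writer: the bounded in-memory store (the
        // default), or the kv log store, which persists the buffered log in
        // the state store.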
        let exec = match node.log_store_type() {
            SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
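                // Bounded in-memory log store holding one epoch at a time: the
                // hook below makes each epoch first wait until its previous
                // epoch is committed in the state store for the sink's table,
                // if one is attached.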
                let factory = BoundedInMemLogStoreFactory::new(1, {
                    let state_store = state_store.clone();
                    let table_id = node.table.as_ref().map(|table| table.id);
                    move |epoch| {
                        async move {
                            if let Some(table_id) = table_id {
                                state_store
                                    .try_wait_epoch(
                                        HummockReadEpoch::Committed(epoch.prev),
                                        TryWaitEpochOptions { table_id },
                                    )
                                    .await?;
                            }
                            Ok(())
                        }
                        .boxed()
                    }
                });
                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )
                .await?
                .boxed()
            }
            SinkLogStoreType::KvLogStore => {
                let metrics = KvLogStoreMetrics::new(
                    &params.executor_stats,
                    params.actor_context.id,
                    &sink_param,
                    connector,
                );

                let table = node.table.as_ref().unwrap().clone();
                let input_schema = input_executor.schema();
                let pk_info = resolve_pk_info(input_schema, &table)?;
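
                // The kv log store persists the buffered log to the state
                // store (using the layout resolved above), so a slow sink
                // does not directly backpressure the upstream executor.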
                let factory = KvLogStoreFactory::new(
                    state_store,
                    table,
                    params.vnode_bitmap.clone().map(Arc::new),
                    65536,
                    params.config.developer.chunk_size,
                    metrics,
                    log_store_identity,
                    pk_info,
                );

                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )
                .await?
                .boxed()
            }
        };

        Ok((params.info, exec).into())
    }
}
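
/// Connection parameters extracted from a `jdbc:postgresql://...` URL.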
struct JdbcUrl {
    host: String,
    port: u16,
    db_name: String,
    username: Option<String>,
    password: Option<String>,
}
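
/// Parses a PostgreSQL JDBC URL such as
/// `jdbc:postgresql://host:port/db?user=...&password=...` by stripping the
/// `jdbc:` prefix and running the remainder through the `url` crate parser.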
fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
    if !url.starts_with("jdbc:postgresql") {
        bail!(
            "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
        )
    }

    let url = url.replace("jdbc:", "");

    let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;

    let scheme = url.scheme();
    assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
    let host = url
        .host_str()
        .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
    let port = url
        .port()
        .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
    let Some(db_name) = url.path().strip_prefix('/') else {
        bail!("missing db_name in jdbc url");
    };
    let mut username = None;
    let mut password = None;
    for (key, value) in url.query_pairs() {
        if key == "user" {
            username = Some(value.to_string());
        }
        if key == "password" {
            password = Some(value.to_string());
        }
    }

    Ok(JdbcUrl {
        host: host.to_owned(),
        port,
        db_name: db_name.to_owned(),
        username,
        password,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }
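
    // Added sketch, not part of the original suite: `parse_jdbc_url` bails on
    // anything that does not start with `jdbc:postgresql`, so a MySQL JDBC
    // URL (illustrative value) must yield an error.
    #[test]
    fn test_parse_jdbc_url_rejects_non_postgres() {
        let url = "jdbc:mysql://localhost:3306/test";
        assert!(parse_jdbc_url(url).is_err());
    }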
}