risingwave_stream/from_proto/
sink.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::FutureExt;
19use risingwave_common::bail;
20use risingwave_common::catalog::{ColumnCatalog, Schema};
21use risingwave_common::secret::LocalSecretManager;
22use risingwave_common::types::DataType;
23use risingwave_connector::match_sink_name_str;
24use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType};
25use risingwave_connector::sink::file_sink::fs::FsSink;
26use risingwave_connector::sink::{
27 CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
28 build_sink,
29};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_pb::catalog::Table;
32use risingwave_pb::plan_common::PbColumnCatalog;
33use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
34use risingwave_storage::store::TryWaitEpochOptions;
35use url::Url;
36
37use super::*;
38use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
39use crate::common::log_store_impl::kv_log_store::{
40 KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
41};
42use crate::executor::{SinkExecutor, StreamExecutorError};
43
/// Builder that turns a [`SinkNode`] plan node into a [`SinkExecutor`]
/// (see its `ExecutorBuilder` implementation in this file).
pub struct SinkExecutorBuilder;
45
46fn resolve_pk_info(
47 input_schema: &Schema,
48 log_store_table: &Table,
49) -> StreamResult<&'static KvLogStorePkInfo> {
50 let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();
51
52 #[expect(deprecated)]
53 let info = match predefined_column_len {
54 len if len
55 == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
56 .predefined_column_len() =>
57 {
58 Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
59 }
60 len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
61 other_len => Err(anyhow!(
62 "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
63 other_len,
64 log_store_table,
65 input_schema
66 )),
67 }?;
68 validate_payload_schema(
69 &log_store_table.columns[predefined_column_len..],
70 input_schema,
71 )?;
72 Ok(info)
73}
74
75fn validate_payload_schema(
76 log_store_payload_schema: &[PbColumnCatalog],
77 input_schema: &Schema,
78) -> StreamResult<()> {
79 if log_store_payload_schema
80 .iter()
81 .zip_eq(input_schema.fields.iter())
82 .all(|(log_store_col, input_field)| {
83 let log_store_col_type = DataType::from(
84 log_store_col
85 .column_desc
86 .as_ref()
87 .unwrap()
88 .column_type
89 .as_ref()
90 .unwrap(),
91 );
92 log_store_col_type.equals_datatype(&input_field.data_type)
93 })
94 {
95 Ok(())
96 } else {
97 Err(anyhow!(
98 "mismatch schema: log store: {:?}, input: {:?}",
99 log_store_payload_schema,
100 input_schema
101 )
102 .into())
103 }
104}
105
106impl ExecutorBuilder for SinkExecutorBuilder {
107 type Node = SinkNode;
108
109 async fn new_boxed_executor(
110 params: ExecutorParams,
111 node: &Self::Node,
112 state_store: impl StateStore,
113 ) -> StreamResult<Executor> {
114 let [input_executor]: [_; 1] = params.input.try_into().unwrap();
115 let input_data_types = input_executor.info().schema.data_types();
116 let chunk_size = params.env.config().developer.chunk_size;
117
118 let sink_desc = node.sink_desc.as_ref().unwrap();
119 let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
120 let sink_id: SinkId = sink_desc.get_id().into();
121 let sink_name = sink_desc.get_name().to_owned();
122 let db_name = sink_desc.get_db_name().into();
123 let sink_from_name = sink_desc.get_sink_from_name().into();
124 let properties = sink_desc.get_properties().clone();
125 let secret_refs = sink_desc.get_secret_refs().clone();
126 let downstream_pk = sink_desc
127 .downstream_pk
128 .iter()
129 .map(|i| *i as usize)
130 .collect_vec();
131 let columns = sink_desc
132 .column_catalogs
133 .clone()
134 .into_iter()
135 .map(ColumnCatalog::from)
136 .collect_vec();
137
138 let mut properties_with_secret =
139 LocalSecretManager::global().fill_secrets(properties, secret_refs)?;
140
141 if params.env.config().developer.switch_jdbc_pg_to_native
142 && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
143 && connector_type == "jdbc"
144 && let Some(url) = properties_with_secret.get("jdbc.url")
145 && url.starts_with("jdbc:postgresql:")
146 {
147 tracing::info!("switching to native postgres connector");
148 let jdbc_url = parse_jdbc_url(url)
149 .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?;
150 properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
151 properties_with_secret.insert("host".to_owned(), jdbc_url.host);
152 properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
153 properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
154 if let Some(username) = jdbc_url.username {
155 properties_with_secret.insert("user".to_owned(), username);
156 }
157 if let Some(password) = jdbc_url.password {
158 properties_with_secret.insert("password".to_owned(), password);
159 }
160 if let Some(table_name) = properties_with_secret.get("table.name") {
161 properties_with_secret.insert("table".to_owned(), table_name.clone());
162 }
163 if let Some(schema_name) = properties_with_secret.get("schema.name") {
164 properties_with_secret.insert("schema".to_owned(), schema_name.clone());
165 }
166 }
168
169 let connector = {
170 let sink_type = properties_with_secret
171 .get(CONNECTOR_TYPE_KEY)
172 .ok_or_else(|| {
173 StreamExecutorError::from((
174 SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
175 sink_id.sink_id,
176 ))
177 })?;
178
179 let sink_type_str = sink_type.to_lowercase();
180 match_sink_name_str!(
181 sink_type_str.as_str(),
182 SinkType,
183 Ok(SinkType::SINK_NAME),
184 |other: &str| {
185 Err(StreamExecutorError::from((
186 SinkError::Config(anyhow!("unsupported sink connector {}", other)),
187 sink_id.sink_id,
188 )))
189 }
190 )?
191 };
192 let format_desc = match &sink_desc.format_desc {
193 Some(f) => Some(
195 f.clone()
196 .try_into()
197 .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
198 ),
199 None => match properties_with_secret.get(SINK_TYPE_OPTION) {
200 Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
202 .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
203 None => None,
205 },
206 };
207
208 let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)
209 .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?;
210
211 let sink_param = SinkParam {
212 sink_id,
213 sink_name: sink_name.clone(),
214 properties: properties_with_secret,
215 columns: columns
216 .iter()
217 .filter(|col| !col.is_hidden)
218 .map(|col| col.column_desc.clone())
219 .collect(),
220 downstream_pk,
221 sink_type,
222 format_desc: format_desc_with_secret,
223 db_name,
224 sink_from_name,
225 };
226
227 let sink_write_param = SinkWriterParam {
228 executor_id: params.executor_id,
229 vnode_bitmap: params.vnode_bitmap.clone(),
230 meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
231 extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
232
233 actor_id: params.actor_context.id,
234 sink_id,
235 sink_name,
236 connector: connector.to_owned(),
237 streaming_config: params.env.config().as_ref().clone(),
238 };
239
240 let log_store_identity = format!(
241 "sink[{}]-[{}]-executor[{}]",
242 connector, sink_id.sink_id, params.executor_id
243 );
244
245 let sink = build_sink(sink_param.clone())
246 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
247
248 let exec = match node.log_store_type() {
249 SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
252 let factory = BoundedInMemLogStoreFactory::new(1, {
253 let state_store = state_store.clone();
254 let table_id = node.table.as_ref().map(|table| table.id);
255 move |epoch| {
256 async move {
257 if let Some(table_id) = table_id {
258 state_store
259 .try_wait_epoch(
260 HummockReadEpoch::Committed(epoch.prev),
261 TryWaitEpochOptions {
262 table_id: table_id.into(),
263 },
264 )
265 .await?;
266 }
267 Ok(())
268 }
269 .boxed()
270 }
271 });
272 SinkExecutor::new(
273 params.actor_context,
274 params.info.clone(),
275 input_executor,
276 sink_write_param,
277 sink,
278 sink_param,
279 columns,
280 factory,
281 chunk_size,
282 input_data_types,
283 node.rate_limit.map(|x| x as _),
284 )
285 .await?
286 .boxed()
287 }
288 SinkLogStoreType::KvLogStore => {
289 let metrics = KvLogStoreMetrics::new(
290 ¶ms.executor_stats,
291 params.actor_context.id,
292 &sink_param,
293 connector,
294 );
295
296 let table = node.table.as_ref().unwrap().clone();
297 let input_schema = input_executor.schema();
298 let pk_info = resolve_pk_info(input_schema, &table)?;
299
300 let factory = KvLogStoreFactory::new(
302 state_store,
303 table,
304 params.vnode_bitmap.clone().map(Arc::new),
305 65536,
306 params.env.config().developer.chunk_size,
307 metrics,
308 log_store_identity,
309 pk_info,
310 );
311
312 SinkExecutor::new(
313 params.actor_context,
314 params.info.clone(),
315 input_executor,
316 sink_write_param,
317 sink,
318 sink_param,
319 columns,
320 factory,
321 chunk_size,
322 input_data_types,
323 node.rate_limit.map(|x| x as _),
324 )
325 .await?
326 .boxed()
327 }
328 };
329
330 Ok((params.info, exec).into())
331 }
332}
333
/// Connection parameters extracted from a `jdbc:postgresql://...` URL by `parse_jdbc_url`,
/// used to configure the native postgres sink.
struct JdbcUrl {
    /// Host name from the URL authority.
    host: String,
    /// TCP port of the PostgreSQL server.
    port: u16,
    /// Database name: the URL path with its leading `/` removed.
    db_name: String,
    /// Value of the `user` query parameter, if present.
    username: Option<String>,
    /// Value of the `password` query parameter, if present.
    password: Option<String>,
}
341
342fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
343 if !url.starts_with("jdbc:postgresql") {
344 bail!(
345 "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
346 )
347 }
348
349 let url = url.replace("jdbc:", "");
351
352 let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
354
355 let scheme = url.scheme();
356 assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
357 let host = url
358 .host_str()
359 .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
360 let port = url
361 .port()
362 .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
363 let Some(db_name) = url.path().strip_prefix('/') else {
364 bail!("missing db_name in jdbc url");
365 };
366 let mut username = None;
367 let mut password = None;
368 for (key, value) in url.query_pairs() {
369 if key == "user" {
370 username = Some(value.to_string());
371 }
372 if key == "password" {
373 password = Some(value.to_string());
374 }
375 }
376
377 Ok(JdbcUrl {
378 host: host.to_owned(),
379 port,
380 db_name: db_name.to_owned(),
381 username,
382 password,
383 })
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully specified URL yields host, port, database and both credentials.
    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }

    /// Credentials are optional query parameters.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let jdbc_url = parse_jdbc_url("jdbc:postgresql://db.example.com:6543/mydb").unwrap();
        assert_eq!(jdbc_url.host, "db.example.com");
        assert_eq!(jdbc_url.port, 6543);
        assert_eq!(jdbc_url.db_name, "mydb");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);
    }

    /// Query parameter values are percent-decoded.
    #[test]
    fn test_parse_jdbc_url_decodes_credentials() {
        let jdbc_url = parse_jdbc_url(
            "jdbc:postgresql://localhost:5432/test?user=my%20user&password=p%40ss%26word",
        )
        .unwrap();
        assert_eq!(jdbc_url.username.as_deref(), Some("my user"));
        assert_eq!(jdbc_url.password.as_deref(), Some("p@ss&word"));
    }

    /// Non-postgres JDBC URLs and URLs without the `jdbc:` prefix are rejected.
    #[test]
    fn test_parse_jdbc_url_rejects_non_postgres() {
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
        assert!(parse_jdbc_url("postgresql://localhost:5432/test").is_err());
    }
}