risingwave_stream/from_proto/
sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::FutureExt;
19use risingwave_common::bail;
20use risingwave_common::catalog::{ColumnCatalog, Schema};
21use risingwave_common::secret::LocalSecretManager;
22use risingwave_common::types::DataType;
23use risingwave_connector::match_sink_name_str;
24use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType};
25use risingwave_connector::sink::file_sink::fs::FsSink;
26use risingwave_connector::sink::{
27    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
28    build_sink,
29};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_pb::catalog::Table;
32use risingwave_pb::plan_common::PbColumnCatalog;
33use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
34use risingwave_storage::store::TryWaitEpochOptions;
35use url::Url;
36
37use super::*;
38use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
39use crate::common::log_store_impl::kv_log_store::{
40    KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
41};
42use crate::executor::{SinkExecutor, StreamExecutorError};
43
/// Builder that turns a [`SinkNode`] plan node into a [`SinkExecutor`] via its
/// [`ExecutorBuilder`] implementation.
pub struct SinkExecutorBuilder;
45
46fn resolve_pk_info(
47    input_schema: &Schema,
48    log_store_table: &Table,
49) -> StreamResult<&'static KvLogStorePkInfo> {
50    let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();
51
52    #[expect(deprecated)]
53    let info = match predefined_column_len {
54        len if len
55            == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
56                .predefined_column_len() =>
57        {
58            Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
59        }
60        len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
61        other_len => Err(anyhow!(
62            "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
63            other_len,
64            log_store_table,
65            input_schema
66        )),
67    }?;
68    validate_payload_schema(
69        &log_store_table.columns[predefined_column_len..],
70        input_schema,
71    )?;
72    Ok(info)
73}
74
75fn validate_payload_schema(
76    log_store_payload_schema: &[PbColumnCatalog],
77    input_schema: &Schema,
78) -> StreamResult<()> {
79    if log_store_payload_schema
80        .iter()
81        .zip_eq(input_schema.fields.iter())
82        .all(|(log_store_col, input_field)| {
83            let log_store_col_type = DataType::from(
84                log_store_col
85                    .column_desc
86                    .as_ref()
87                    .unwrap()
88                    .column_type
89                    .as_ref()
90                    .unwrap(),
91            );
92            log_store_col_type.equals_datatype(&input_field.data_type)
93        })
94    {
95        Ok(())
96    } else {
97        Err(anyhow!(
98            "mismatch schema: log store: {:?}, input: {:?}",
99            log_store_payload_schema,
100            input_schema
101        )
102        .into())
103    }
104}
105
106impl ExecutorBuilder for SinkExecutorBuilder {
107    type Node = SinkNode;
108
109    async fn new_boxed_executor(
110        params: ExecutorParams,
111        node: &Self::Node,
112        state_store: impl StateStore,
113    ) -> StreamResult<Executor> {
114        let [input_executor]: [_; 1] = params.input.try_into().unwrap();
115        let input_data_types = input_executor.info().schema.data_types();
116        let chunk_size = params.env.config().developer.chunk_size;
117
118        let sink_desc = node.sink_desc.as_ref().unwrap();
119        let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
120        let sink_id: SinkId = sink_desc.get_id().into();
121        let sink_name = sink_desc.get_name().to_owned();
122        let db_name = sink_desc.get_db_name().into();
123        let sink_from_name = sink_desc.get_sink_from_name().into();
124        let properties = sink_desc.get_properties().clone();
125        let secret_refs = sink_desc.get_secret_refs().clone();
126        let downstream_pk = sink_desc
127            .downstream_pk
128            .iter()
129            .map(|i| *i as usize)
130            .collect_vec();
131        let columns = sink_desc
132            .column_catalogs
133            .clone()
134            .into_iter()
135            .map(ColumnCatalog::from)
136            .collect_vec();
137
138        let mut properties_with_secret =
139            LocalSecretManager::global().fill_secrets(properties, secret_refs)?;
140
141        if params.env.config().developer.switch_jdbc_pg_to_native
142            && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
143            && connector_type == "jdbc"
144            && let Some(url) = properties_with_secret.get("jdbc.url")
145            && url.starts_with("jdbc:postgresql:")
146        {
147            tracing::info!("switching to native postgres connector");
148            let jdbc_url = parse_jdbc_url(url)
149                .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?;
150            properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
151            properties_with_secret.insert("host".to_owned(), jdbc_url.host);
152            properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
153            properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
154            if let Some(username) = jdbc_url.username {
155                properties_with_secret.insert("user".to_owned(), username);
156            }
157            if let Some(password) = jdbc_url.password {
158                properties_with_secret.insert("password".to_owned(), password);
159            }
160            if let Some(table_name) = properties_with_secret.get("table.name") {
161                properties_with_secret.insert("table".to_owned(), table_name.clone());
162            }
163            if let Some(schema_name) = properties_with_secret.get("schema.name") {
164                properties_with_secret.insert("schema".to_owned(), schema_name.clone());
165            }
166            // TODO(kwannoel): Do we need to handle jdbc.query.timeout?
167        }
168
169        let connector = {
170            let sink_type = properties_with_secret
171                .get(CONNECTOR_TYPE_KEY)
172                .ok_or_else(|| {
173                    StreamExecutorError::from((
174                        SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
175                        sink_id.sink_id,
176                    ))
177                })?;
178
179            let sink_type_str = sink_type.to_lowercase();
180            match_sink_name_str!(
181                sink_type_str.as_str(),
182                SinkType,
183                Ok(SinkType::SINK_NAME),
184                |other: &str| {
185                    Err(StreamExecutorError::from((
186                        SinkError::Config(anyhow!("unsupported sink connector {}", other)),
187                        sink_id.sink_id,
188                    )))
189                }
190            )?
191        };
192        let format_desc = match &sink_desc.format_desc {
193            // Case A: new syntax `format ... encode ...`
194            Some(f) => Some(
195                f.clone()
196                    .try_into()
197                    .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
198            ),
199            None => match properties_with_secret.get(SINK_TYPE_OPTION) {
200                // Case B: old syntax `type = '...'`
201                Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
202                    .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
203                // Case C: no format + encode required
204                None => None,
205            },
206        };
207
208        let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)
209            .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?;
210
211        let sink_param = SinkParam {
212            sink_id,
213            sink_name: sink_name.clone(),
214            properties: properties_with_secret,
215            columns: columns
216                .iter()
217                .filter(|col| !col.is_hidden)
218                .map(|col| col.column_desc.clone())
219                .collect(),
220            downstream_pk,
221            sink_type,
222            format_desc: format_desc_with_secret,
223            db_name,
224            sink_from_name,
225        };
226
227        let sink_write_param = SinkWriterParam {
228            executor_id: params.executor_id,
229            vnode_bitmap: params.vnode_bitmap.clone(),
230            meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
231            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
232
233            actor_id: params.actor_context.id,
234            sink_id,
235            sink_name,
236            connector: connector.to_owned(),
237            streaming_config: params.env.config().as_ref().clone(),
238        };
239
240        let log_store_identity = format!(
241            "sink[{}]-[{}]-executor[{}]",
242            connector, sink_id.sink_id, params.executor_id
243        );
244
245        let sink = build_sink(sink_param.clone())
246            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
247
248        let exec = match node.log_store_type() {
249            // Default value is the normal in memory log store to be backward compatible with the
250            // previously unset value
251            SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
252                let factory = BoundedInMemLogStoreFactory::new(1, {
253                    let state_store = state_store.clone();
254                    let table_id = node.table.as_ref().map(|table| table.id);
255                    move |epoch| {
256                        async move {
257                            if let Some(table_id) = table_id {
258                                state_store
259                                    .try_wait_epoch(
260                                        HummockReadEpoch::Committed(epoch.prev),
261                                        TryWaitEpochOptions {
262                                            table_id: table_id.into(),
263                                        },
264                                    )
265                                    .await?;
266                            }
267                            Ok(())
268                        }
269                        .boxed()
270                    }
271                });
272                SinkExecutor::new(
273                    params.actor_context,
274                    params.info.clone(),
275                    input_executor,
276                    sink_write_param,
277                    sink,
278                    sink_param,
279                    columns,
280                    factory,
281                    chunk_size,
282                    input_data_types,
283                    node.rate_limit.map(|x| x as _),
284                )
285                .await?
286                .boxed()
287            }
288            SinkLogStoreType::KvLogStore => {
289                let metrics = KvLogStoreMetrics::new(
290                    &params.executor_stats,
291                    params.actor_context.id,
292                    &sink_param,
293                    connector,
294                );
295
296                let table = node.table.as_ref().unwrap().clone();
297                let input_schema = input_executor.schema();
298                let pk_info = resolve_pk_info(input_schema, &table)?;
299
300                // TODO: support setting max row count in config
301                let factory = KvLogStoreFactory::new(
302                    state_store,
303                    table,
304                    params.vnode_bitmap.clone().map(Arc::new),
305                    65536,
306                    params.env.config().developer.chunk_size,
307                    metrics,
308                    log_store_identity,
309                    pk_info,
310                );
311
312                SinkExecutor::new(
313                    params.actor_context,
314                    params.info.clone(),
315                    input_executor,
316                    sink_write_param,
317                    sink,
318                    sink_param,
319                    columns,
320                    factory,
321                    chunk_size,
322                    input_data_types,
323                    node.rate_limit.map(|x| x as _),
324                )
325                .await?
326                .boxed()
327            }
328        };
329
330        Ok((params.info, exec).into())
331    }
332}
333
/// Connection parameters extracted from a `jdbc:postgresql://...` URL.
///
/// This type intentionally does not derive `Debug`: `password` holds a credential. If
/// `Debug` is ever added, redact `password` so it cannot leak into logs.
struct JdbcUrl {
    /// Server host name or address.
    host: String,
    /// Server port.
    port: u16,
    /// Name of the database to connect to (the URL path without its leading `/`).
    db_name: String,
    /// Value of the `user` query parameter, if present.
    username: Option<String>,
    /// Value of the `password` query parameter, if present.
    password: Option<String>,
}
341
342fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
343    if !url.starts_with("jdbc:postgresql") {
344        bail!(
345            "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
346        )
347    }
348
349    // trim the "jdbc:" prefix to make it a valid url
350    let url = url.replace("jdbc:", "");
351
352    // parse the url
353    let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
354
355    let scheme = url.scheme();
356    assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
357    let host = url
358        .host_str()
359        .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
360    let port = url
361        .port()
362        .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
363    let Some(db_name) = url.path().strip_prefix('/') else {
364        bail!("missing db_name in jdbc url");
365    };
366    let mut username = None;
367    let mut password = None;
368    for (key, value) in url.query_pairs() {
369        if key == "user" {
370            username = Some(value.to_string());
371        }
372        if key == "password" {
373            password = Some(value.to_string());
374        }
375    }
376
377    Ok(JdbcUrl {
378        host: host.to_owned(),
379        port,
380        db_name: db_name.to_owned(),
381        username,
382        password,
383    })
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// Host, port, database and both credentials are extracted from a complete URL.
    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }

    /// Credentials are optional; absent query parameters yield `None`.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let jdbc_url = parse_jdbc_url("jdbc:postgresql://db.example.com:6543/analytics").unwrap();
        assert_eq!(jdbc_url.host, "db.example.com");
        assert_eq!(jdbc_url.port, 6543);
        assert_eq!(jdbc_url.db_name, "analytics");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);
    }

    /// Query parameter values are percent-decoded.
    #[test]
    fn test_parse_jdbc_url_decodes_credentials() {
        let url = "jdbc:postgresql://localhost:5432/test?user=alice&password=p%40ss";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.username, Some("alice".to_owned()));
        assert_eq!(jdbc_url.password, Some("p@ss".to_owned()));
    }

    /// Non-PostgreSQL JDBC URLs are rejected.
    #[test]
    fn test_parse_jdbc_url_rejects_non_postgres() {
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
    }
}