risingwave_stream/from_proto/
sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, Schema};
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::types::DataType;
22use risingwave_connector::match_sink_name_str;
23use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType};
24use risingwave_connector::sink::file_sink::fs::FsSink;
25use risingwave_connector::sink::{
26    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
27    build_sink,
28};
29use risingwave_pb::catalog::Table;
30use risingwave_pb::plan_common::PbColumnCatalog;
31use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
32use url::Url;
33
34use super::*;
35use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
36use crate::common::log_store_impl::kv_log_store::{
37    KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
38};
39use crate::executor::{SinkExecutor, StreamExecutorError};
40
/// Stateless builder for sink executors; the construction logic lives in its
/// `ExecutorBuilder` implementation in this file.
pub struct SinkExecutorBuilder;
42
43fn resolve_pk_info(
44    input_schema: &Schema,
45    log_store_table: &Table,
46) -> StreamResult<&'static KvLogStorePkInfo> {
47    let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();
48
49    #[expect(deprecated)]
50    let info = match predefined_column_len {
51        len if len
52            == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
53                .predefined_column_len() =>
54        {
55            Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
56        }
57        len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
58        other_len => Err(anyhow!(
59            "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
60            other_len,
61            log_store_table,
62            input_schema
63        )),
64    }?;
65    validate_payload_schema(
66        &log_store_table.columns[predefined_column_len..],
67        input_schema,
68    )?;
69    Ok(info)
70}
71
72fn validate_payload_schema(
73    log_store_payload_schema: &[PbColumnCatalog],
74    input_schema: &Schema,
75) -> StreamResult<()> {
76    if log_store_payload_schema
77        .iter()
78        .zip_eq(input_schema.fields.iter())
79        .all(|(log_store_col, input_field)| {
80            let log_store_col_type = DataType::from(
81                log_store_col
82                    .column_desc
83                    .as_ref()
84                    .unwrap()
85                    .column_type
86                    .as_ref()
87                    .unwrap(),
88            );
89            log_store_col_type.equals_datatype(&input_field.data_type)
90        })
91    {
92        Ok(())
93    } else {
94        Err(anyhow!(
95            "mismatch schema: log store: {:?}, input: {:?}",
96            log_store_payload_schema,
97            input_schema
98        )
99        .into())
100    }
101}
102
/// Builds a sink executor from a `SinkNode` plan node.
impl ExecutorBuilder for SinkExecutorBuilder {
    type Node = SinkNode;

    /// Builds the executor for the sink described by `node`.
    ///
    /// Steps, in order:
    /// 1. Read the sink description (id, name, properties, columns, downstream pk) from `node`.
    /// 2. Combine the properties with the sink's secret refs.
    /// 3. Optionally rewrite a `jdbc` PostgreSQL sink to use the `postgres` connector.
    /// 4. Resolve the connector name and the format description.
    /// 5. Build the sink and wrap it in a `SinkExecutor` backed by either an in-memory or a KV
    ///    log store, depending on `node.log_store_type()`.
    ///
    /// # Errors
    ///
    /// Returns an error if filling secrets fails, the JDBC URL cannot be parsed, the connector
    /// property is missing or names an unsupported connector, the format description cannot be
    /// converted, the sink cannot be built, the log store table does not match the input schema,
    /// or `SinkExecutor::new` fails.
    ///
    /// # Panics
    ///
    /// Panics if `params.input` does not contain exactly one executor, if `node.sink_desc` is
    /// unset, if the sink type in the description cannot be read, or if `node.table` is unset
    /// while the KV log store is selected.
    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A sink has exactly one upstream input.
        let [input_executor]: [_; 1] = params.input.try_into().unwrap();
        let input_data_types = input_executor.info().schema.data_types();
        let chunk_size = params.env.config().developer.chunk_size;

        // Pull the individual pieces out of the sink description.
        let sink_desc = node.sink_desc.as_ref().unwrap();
        let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
        let sink_id: SinkId = sink_desc.get_id().into();
        let sink_name = sink_desc.get_name().to_owned();
        let db_name = sink_desc.get_db_name().into();
        let sink_from_name = sink_desc.get_sink_from_name().into();
        let properties = sink_desc.get_properties().clone();
        let secret_refs = sink_desc.get_secret_refs().clone();
        let downstream_pk = sink_desc
            .downstream_pk
            .iter()
            .map(|i| *i as usize)
            .collect_vec();
        let columns = sink_desc
            .column_catalogs
            .clone()
            .into_iter()
            .map(ColumnCatalog::from)
            .collect_vec();

        // `fill_secrets` receives both the plain properties and the secret refs; presumably it
        // returns the properties with secret values resolved (confirm in `LocalSecretManager`).
        // The binding is mutable because the JDBC switch below may rewrite entries.
        let mut properties_with_secret =
            LocalSecretManager::global().fill_secrets(properties, secret_refs)?;

        // When the developer flag is on and this is a `jdbc` sink whose `jdbc.url` starts with
        // `jdbc:postgresql:`, rewrite the properties to target the `postgres` connector instead.
        if params.env.config().developer.switch_jdbc_pg_to_native
            && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
            && connector_type == "jdbc"
            && let Some(url) = properties_with_secret.get("jdbc.url")
            && url.starts_with("jdbc:postgresql:")
        {
            tracing::info!("switching to native postgres connector");
            let jdbc_url = parse_jdbc_url(url)
                .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?;
            // Connection settings come from the parsed URL.
            properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
            properties_with_secret.insert("host".to_owned(), jdbc_url.host);
            properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
            properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
            // Credentials are only set when the URL carried them as query parameters.
            if let Some(username) = jdbc_url.username {
                properties_with_secret.insert("user".to_owned(), username);
            }
            if let Some(password) = jdbc_url.password {
                properties_with_secret.insert("password".to_owned(), password);
            }
            // Copy `table.name` / `schema.name` to the `table` / `schema` keys; the original
            // keys are left in place.
            if let Some(table_name) = properties_with_secret.get("table.name") {
                properties_with_secret.insert("table".to_owned(), table_name.clone());
            }
            if let Some(schema_name) = properties_with_secret.get("schema.name") {
                properties_with_secret.insert("schema".to_owned(), schema_name.clone());
            }
            // TODO(kwannoel): Do we need to handle jdbc.query.timeout?
        }

        // Resolve the connector name from the (possibly rewritten) properties.
        let connector = {
            // NOTE: this `sink_type` is the connector name string and shadows the outer
            // `SinkType` value only inside this block.
            let sink_type = properties_with_secret
                .get(CONNECTOR_TYPE_KEY)
                .ok_or_else(|| {
                    StreamExecutorError::from((
                        SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
                        sink_id.sink_id,
                    ))
                })?;

            // The lookup is done on the lowercased name. On a match the macro yields the sink's
            // `SINK_NAME`; any other name goes to the closure, which reports a config error.
            let sink_type_str = sink_type.to_string().to_lowercase();
            match_sink_name_str!(
                sink_type_str.as_str(),
                SinkType,
                Ok(SinkType::SINK_NAME),
                |other: &str| {
                    Err(StreamExecutorError::from((
                        SinkError::Config(anyhow!("unsupported sink connector {}", other)),
                        sink_id.sink_id,
                    )))
                }
            )?
        };
        let format_desc = match &sink_desc.format_desc {
            // Case A: new syntax `format ... encode ...`
            Some(f) => Some(
                f.clone()
                    .try_into()
                    .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
            ),
            None => match properties_with_secret.get(SINK_TYPE_OPTION) {
                // Case B: old syntax `type = '...'`
                Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
                    .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?,
                // Case C: no format + encode required
                None => None,
            },
        };

        let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc)
            .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?;

        let sink_param = SinkParam {
            sink_id,
            sink_name: sink_name.clone(),
            properties: properties_with_secret,
            // Hidden columns are not part of the sink's column list.
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk,
            sink_type,
            format_desc: format_desc_with_secret,
            db_name,
            sink_from_name,
        };

        let sink_write_param = SinkWriterParam {
            executor_id: params.executor_id,
            vnode_bitmap: params.vnode_bitmap.clone(),
            meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),

            actor_id: params.actor_context.id,
            sink_id,
            sink_name,
            connector: connector.to_owned(),
            streaming_config: params.env.config().as_ref().clone(),
        };

        // Only consumed by the KV log store branch below.
        let log_store_identity = format!(
            "sink[{}]-[{}]-executor[{}]",
            connector, sink_id.sink_id, params.executor_id
        );

        // `sink_param` is cloned because it is also passed to `SinkExecutor::new` below.
        let sink = build_sink(sink_param.clone())
            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;

        // Both branches make the same `SinkExecutor::new` call; they differ only in the log
        // store factory that is passed in.
        let exec = match node.log_store_type() {
            // Default value is the normal in memory log store to be backward compatible with the
            // previously unset value
            SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
                let factory = BoundedInMemLogStoreFactory::new(1);
                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )
                .await?
                .boxed()
            }
            SinkLogStoreType::KvLogStore => {
                let metrics = KvLogStoreMetrics::new(
                    &params.executor_stats,
                    params.actor_context.id,
                    &sink_param,
                    connector,
                );

                // The log store table must agree with the input schema; this also selects the
                // predefined-column layout used by the table.
                let table = node.table.as_ref().unwrap().clone();
                let input_schema = input_executor.schema();
                let pk_info = resolve_pk_info(input_schema, &table)?;

                // TODO: support setting max row count in config
                let factory = KvLogStoreFactory::new(
                    state_store,
                    table,
                    params.vnode_bitmap.clone().map(Arc::new),
                    65536,
                    metrics,
                    log_store_identity,
                    pk_info,
                );

                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )
                .await?
                .boxed()
            }
        };

        Ok((params.info, exec).into())
    }
}
310
/// Components extracted from a `jdbc:postgresql://...` URL by `parse_jdbc_url`.
struct JdbcUrl {
    /// Host part of the URL.
    host: String,
    /// Port part of the URL; the parser requires it to be given explicitly.
    port: u16,
    /// URL path with its leading `/` removed.
    db_name: String,
    /// Value of the `user` query parameter, if present.
    username: Option<String>,
    /// Value of the `password` query parameter, if present.
    password: Option<String>,
}
318
319fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
320    if !url.starts_with("jdbc:postgresql") {
321        bail!(
322            "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
323        )
324    }
325
326    // trim the "jdbc:" prefix to make it a valid url
327    let url = url.replace("jdbc:", "");
328
329    // parse the url
330    let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
331
332    let scheme = url.scheme();
333    assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
334    let host = url
335        .host_str()
336        .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
337    let port = url
338        .port()
339        .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
340    let Some(db_name) = url.path().strip_prefix('/') else {
341        bail!("missing db_name in jdbc url");
342    };
343    let mut username = None;
344    let mut password = None;
345    for (key, value) in url.query_pairs() {
346        if key == "user" {
347            username = Some(value.to_string());
348        }
349        if key == "password" {
350            password = Some(value.to_string());
351        }
352    }
353
354    Ok(JdbcUrl {
355        host: host.to_owned(),
356        port,
357        db_name: db_name.to_owned(),
358        username,
359        password,
360    })
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// A full URL yields host, port, database and both credentials.
    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }

    /// Credentials are optional: without query parameters both are `None`.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let jdbc_url = parse_jdbc_url("jdbc:postgresql://localhost:5432/test").unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);
    }

    /// Percent-encoded query values are decoded.
    #[test]
    fn test_parse_jdbc_url_decodes_query_values() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=p%40ss%26word";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("p@ss&word".to_owned()));
    }

    /// Only PostgreSQL JDBC URLs are accepted.
    #[test]
    fn test_parse_jdbc_url_rejects_non_postgres() {
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
        assert!(parse_jdbc_url("postgresql://localhost:5432/test").is_err());
    }

    /// The port must be given explicitly.
    #[test]
    fn test_parse_jdbc_url_requires_port() {
        assert!(parse_jdbc_url("jdbc:postgresql://localhost/test").is_err());
    }
}