risingwave_stream/from_proto/sink.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::anyhow;
use futures::FutureExt;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, Schema};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
    build_sink,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::catalog::Table;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
use risingwave_storage::store::TryWaitEpochOptions;
use url::Url;

use super::*;
use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
use crate::common::log_store_impl::kv_log_store::{
    KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
};
use crate::executor::{SinkExecutor, StreamExecutorError};

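/// Builds a [`SinkExecutor`] from a [`SinkNode`] of the stream plan.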
pub struct SinkExecutorBuilder;

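/// Determines which KV log store PK layout (V1 or V2) the given log store table
/// uses. The log store table consists of a fixed set of predefined columns
/// followed by payload columns mirroring the input schema, so the layout version
/// can be recovered from `log_store_table.columns.len() - input_schema.fields.len()`.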
fn resolve_pk_info(
    input_schema: &Schema,
    log_store_table: &Table,
) -> StreamResult<&'static KvLogStorePkInfo> {
    let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();

    #[expect(deprecated)]
    let info = match predefined_column_len {
        len if len
            == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
                .predefined_column_len() =>
        {
            Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
        }
        len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
        other_len => Err(anyhow!(
            "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
            other_len,
            log_store_table,
            input_schema
        )),
    }?;
    validate_payload_schema(
        &log_store_table.columns[predefined_column_len..],
        input_schema,
    )?;
    Ok(info)
}

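/// Checks, position by position, that each log store payload column has the same
/// data type as the corresponding field of the input schema.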
fn validate_payload_schema(
    log_store_payload_schema: &[PbColumnCatalog],
    input_schema: &Schema,
) -> StreamResult<()> {
    if log_store_payload_schema
        .iter()
        .zip_eq(input_schema.fields.iter())
        .all(|(log_store_col, input_field)| {
            let log_store_col_type = DataType::from(
                log_store_col
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_type
                    .as_ref()
                    .unwrap(),
            );
            log_store_col_type.equals_datatype(&input_field.data_type)
        })
    {
        Ok(())
    } else {
        Err(anyhow!(
            "schema mismatch: log store: {:?}, input: {:?}",
            log_store_payload_schema,
            input_schema
        )
        .into())
    }
}

impl ExecutorBuilder for SinkExecutorBuilder {
    type Node = SinkNode;

    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        let [input_executor]: [_; 1] = params.input.try_into().unwrap();
        let input_data_types = input_executor.info().schema.data_types();
        let chunk_size = params.config.developer.chunk_size;

        let sink_desc = node.sink_desc.as_ref().unwrap();
        let sink_id: SinkId = sink_desc.get_id();
        let sink_name = sink_desc.get_name().to_owned();
        let db_name = sink_desc.get_db_name().into();
        let sink_from_name = sink_desc.get_sink_from_name().into();
        let properties = sink_desc.get_properties().clone();
        let secret_refs = sink_desc.get_secret_refs().clone();
        let downstream_pk = if sink_desc.downstream_pk.is_empty() {
            None
        } else {
            Some(
                (sink_desc.downstream_pk.iter())
                    .map(|idx| *idx as usize)
                    .collect_vec(),
            )
        };
        let columns = sink_desc
            .column_catalogs
            .clone()
            .into_iter()
            .map(ColumnCatalog::from)
            .collect_vec();

        let mut properties_with_secret =
            LocalSecretManager::global().fill_secrets(properties, secret_refs)?;

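        // With `switch_jdbc_pg_to_native` enabled, a JDBC sink pointing at Postgres is
        // transparently rewritten to the native `postgres` connector below. For example
        // (hypothetical values), properties like
        //     connector = 'jdbc', jdbc.url = 'jdbc:postgresql://localhost:5432/db?user=u&password=p'
        // are rewritten to
        //     connector = 'postgres', host = 'localhost', port = '5432', database = 'db',
        //     user = 'u', password = 'p'
        // with `table.name` / `schema.name` copied over to `table` / `schema`.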
        if params.config.developer.switch_jdbc_pg_to_native
            && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
            && connector_type == "jdbc"
            && let Some(url) = properties_with_secret.get("jdbc.url")
            && url.starts_with("jdbc:postgresql:")
        {
            tracing::info!("switching to native postgres connector");
            let jdbc_url = parse_jdbc_url(url)
                .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id)))?;
            properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
            properties_with_secret.insert("host".to_owned(), jdbc_url.host);
            properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
            properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
            if let Some(username) = jdbc_url.username {
                properties_with_secret.insert("user".to_owned(), username);
            }
            if let Some(password) = jdbc_url.password {
                properties_with_secret.insert("password".to_owned(), password);
            }
            // Clone the looked-up values first so the map is not borrowed while
            // it is mutated by `insert`.
            if let Some(table_name) = properties_with_secret.get("table.name").cloned() {
                properties_with_secret.insert("table".to_owned(), table_name);
            }
            if let Some(schema_name) = properties_with_secret.get("schema.name").cloned() {
                properties_with_secret.insert("schema".to_owned(), schema_name);
            }
            // TODO(kwannoel): Do we need to handle jdbc.query.timeout?
        }

        let connector = {
            let sink_type = properties_with_secret
                .get(CONNECTOR_TYPE_KEY)
                .ok_or_else(|| {
                    StreamExecutorError::from((
                        SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
                        sink_id,
                    ))
                })?;

            let sink_type_str = sink_type.to_lowercase();
            match_sink_name_str!(
                sink_type_str.as_str(),
                SinkType,
                Ok(SinkType::SINK_NAME),
                |other: &str| {
                    Err(StreamExecutorError::from((
                        SinkError::Config(anyhow!("unsupported sink connector {}", other)),
                        sink_id,
                    )))
                }
            )?
        };
        let format_desc = match &sink_desc.format_desc {
            // Case A: new syntax `format ... encode ...`
            Some(f) => Some(
                f.clone()
                    .try_into()
                    .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
            ),
            None => match properties_with_secret.get(SINK_TYPE_OPTION) {
                // Case B: old syntax `type = '...'`
                Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
                    .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
                // Case C: no format + encode required
                None => None,
            },
        };

        let format_desc = SinkParam::fill_secret_for_format_desc(format_desc)
            .map_err(|e| StreamExecutorError::from((e, sink_id)))?;

        // Backward compatibility: DEBEZIUM format should be treated as `Retract` type instead of `Upsert`.
        let sink_type = if let Some(format_desc) = &format_desc
            && format_desc.format == SinkFormat::Debezium
        {
            SinkType::Retract
        } else {
            let sink_type_from_proto = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
            // For backward compatibility: Iceberg sink with Upsert type should be treated as Retract type.
            if connector.eq_ignore_ascii_case(ICEBERG_SINK)
                && matches!(sink_type_from_proto, SinkType::Upsert)
            {
                SinkType::Retract
            } else {
                sink_type_from_proto
            }
        };
        let ignore_delete = sink_desc.ignore_delete();

        let sink_param = SinkParam {
            sink_id,
            sink_name: sink_name.clone(),
            properties: properties_with_secret,
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk,
            sink_type,
            ignore_delete,
            format_desc,
            db_name,
            sink_from_name,
        };

        let sink_write_param = SinkWriterParam {
            executor_id: params.executor_id,
            vnode_bitmap: params.vnode_bitmap.clone(),
            meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),

            actor_id: params.actor_context.id,
            sink_id,
            sink_name,
            connector: connector.to_owned(),
            streaming_config: params.config.as_ref().clone(),
        };

        let log_store_identity = format!(
            "sink[{}]-[{}]-executor[{}]",
            connector, sink_id, params.executor_id
        );

        let sink = build_sink(sink_param.clone())
            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;

        let exec = match node.log_store_type() {
            // The in-memory log store is the default, to stay backward compatible with
            // plans where this field was previously unset.
            SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
                let factory = BoundedInMemLogStoreFactory::new(1, {
                    let state_store = state_store.clone();
                    let table_id = node.table.as_ref().map(|table| table.id);
                    move |epoch| {
                        async move {
                            // If the sink has an associated table, wait until the previous
                            // epoch has been committed in the state store before proceeding.
                            if let Some(table_id) = table_id {
                                state_store
                                    .try_wait_epoch(
                                        HummockReadEpoch::Committed(epoch.prev),
                                        TryWaitEpochOptions { table_id },
                                    )
                                    .await?;
                            }
                            Ok(())
                        }
                        .boxed()
                    }
                });
                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )
                .await?
                .boxed()
            }
            SinkLogStoreType::KvLogStore => {
                let metrics = KvLogStoreMetrics::new(
                    &params.executor_stats,
                    params.actor_context.id,
                    &sink_param,
                    connector,
                );

                let table = node.table.as_ref().unwrap().clone();
                let input_schema = input_executor.schema();
                let pk_info = resolve_pk_info(input_schema, &table)?;

                // TODO: support setting max row count in config
                let factory = KvLogStoreFactory::new(
                    state_store,
                    table,
                    params.vnode_bitmap.clone().map(Arc::new),
                    65536,
                    params.config.developer.chunk_size,
                    metrics,
                    log_store_identity,
                    pk_info,
                );

                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )
                .await?
                .boxed()
            }
        };

        Ok((params.info, exec).into())
    }
}

struct JdbcUrl {
    host: String,
    port: u16,
    db_name: String,
    username: Option<String>,
    password: Option<String>,
}

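/// Parses a JDBC-style Postgres URL of the form
/// `jdbc:postgresql://host:port/db_name?user=...&password=...` into its parts.
/// The host, port, and database name are required; the `user` and `password`
/// query parameters are optional.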
fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
    if !url.starts_with("jdbc:postgresql") {
        bail!(
            "invalid jdbc url: switching to the native postgres connector requires a url of the form jdbc:postgresql://..."
        )
    }

    // Trim the "jdbc:" prefix (verified above) so the remainder is a valid url.
    // Use `strip_prefix` rather than `replace` so a literal "jdbc:" appearing
    // later in the url (e.g. in a password) is left untouched.
    let url = url.strip_prefix("jdbc:").expect("checked above");

    // parse the url
    let url = Url::parse(url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;

    let scheme = url.scheme();
    assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
    let host = url
        .host_str()
        .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
    let port = url
        .port()
        .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
    let Some(db_name) = url.path().strip_prefix('/') else {
        bail!("missing db_name in jdbc url");
    };
    let mut username = None;
    let mut password = None;
    for (key, value) in url.query_pairs() {
        if key == "user" {
            username = Some(value.to_string());
        }
        if key == "password" {
            password = Some(value.to_string());
        }
    }

    Ok(JdbcUrl {
        host: host.to_owned(),
        port,
        db_name: db_name.to_owned(),
        username,
        password,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }
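
    // An additional sketch of the parser's edge cases (not from upstream): URLs
    // without credentials parse with `username`/`password` unset, and non-Postgres
    // JDBC URLs are rejected up front.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let url = "jdbc:postgresql://db.example.com:5432/prod";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "db.example.com");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "prod");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);

        // The scheme check rejects anything that is not `jdbc:postgresql`.
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
    }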
}