Skip to main content

risingwave_stream/from_proto/
sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::FutureExt;
20use risingwave_common::bail;
21use risingwave_common::catalog::{ColumnCatalog, Schema};
22use risingwave_common::secret::LocalSecretManager;
23use risingwave_common::types::DataType;
24use risingwave_connector::match_sink_name_str;
25use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
26use risingwave_connector::sink::file_sink::fs::FsSink;
27use risingwave_connector::sink::iceberg::ICEBERG_SINK;
28use risingwave_connector::sink::{
29    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SinkError, SinkMetaClient, SinkParam, SinkWriterParam,
30    build_sink,
31};
32use risingwave_hummock_sdk::HummockReadEpoch;
33use risingwave_pb::catalog::Table;
34use risingwave_pb::plan_common::PbColumnCatalog;
35use risingwave_pb::stream_plan::{PbSinkDesc, SinkLogStoreType, SinkNode};
36use risingwave_storage::store::TryWaitEpochOptions;
37use url::Url;
38
39use super::*;
40use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
41use crate::common::log_store_impl::kv_log_store::{
42    KV_LOG_STORE_V2_INFO, KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo,
43};
44use crate::executor::{SinkExecutor, StreamExecutorError};
45
46/// Build [`SinkParam`] from a [`PbSinkDesc`], along with the full [`ColumnCatalog`] list for
47/// downstream use. `connector` is the resolved connector name (e.g. `ICEBERG_SINK`), used for
48/// legacy `type = '...'` format/encode fallback and connector-specific backward-compatibility
49/// conversions.
50///
51/// The caller is responsible for preparing `properties_with_secret` (i.e. the result of
52/// [`LocalSecretManager::fill_secrets`] plus any connector-specific rewrites such as the
53/// JDBC-to-native-postgres switch).
54pub(super) fn build_sink_param(
55    sink_desc: &PbSinkDesc,
56    properties_with_secret: BTreeMap<String, String>,
57    connector: &str,
58) -> Result<(SinkParam, Vec<ColumnCatalog>), StreamExecutorError> {
59    let sink_id: SinkId = sink_desc.get_id();
60    let sink_name = sink_desc.get_name().to_owned();
61    let db_name = sink_desc.get_db_name().into();
62    let sink_from_name = sink_desc.get_sink_from_name().into();
63    let downstream_pk = if sink_desc.downstream_pk.is_empty() {
64        None
65    } else {
66        Some(
67            (sink_desc.downstream_pk.iter())
68                .map(|idx| *idx as usize)
69                .collect_vec(),
70        )
71    };
72    let columns = sink_desc
73        .column_catalogs
74        .clone()
75        .into_iter()
76        .map(ColumnCatalog::from)
77        .collect_vec();
78
79    let format_desc = match &sink_desc.format_desc {
80        // Case A: new syntax `format ... encode ...`
81        Some(f) => Some(
82            f.clone()
83                .try_into()
84                .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
85        ),
86        None => match properties_with_secret.get(SINK_TYPE_OPTION) {
87            // Case B: old syntax `type = '...'`
88            Some(t) => SinkFormatDesc::from_legacy_type(connector, t)
89                .map_err(|e| StreamExecutorError::from((e, sink_id)))?,
90            // Case C: no format + encode required
91            None => None,
92        },
93    };
94    let format_desc = SinkParam::fill_secret_for_format_desc(format_desc)
95        .map_err(|e| StreamExecutorError::from((e, sink_id)))?;
96
97    // Backward compatibility: DEBEZIUM format should be treated as `Retract` type instead of `Upsert`.
98    let sink_type = if let Some(format_desc) = &format_desc
99        && format_desc.format == SinkFormat::Debezium
100    {
101        SinkType::Retract
102    } else {
103        let sink_type_from_proto = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
104        // For backward compatibility: Iceberg sink with Upsert type should be treated as Retract type.
105        if connector.eq_ignore_ascii_case(ICEBERG_SINK)
106            && matches!(sink_type_from_proto, SinkType::Upsert)
107        {
108            SinkType::Retract
109        } else {
110            sink_type_from_proto
111        }
112    };
113    let ignore_delete = sink_desc.ignore_delete();
114
115    let sink_param = SinkParam {
116        sink_id,
117        sink_name,
118        properties: properties_with_secret,
119        columns: columns
120            .iter()
121            .filter(|col| !col.is_hidden)
122            .map(|col| col.column_desc.clone())
123            .collect(),
124        downstream_pk,
125        sink_type,
126        ignore_delete,
127        format_desc,
128        db_name,
129        sink_from_name,
130    };
131
132    Ok((sink_param, columns))
133}
134
/// Builder that constructs a [`SinkExecutor`] from a [`SinkNode`] (see its `ExecutorBuilder`
/// implementation below).
pub struct SinkExecutorBuilder;
136
137fn resolve_pk_info(
138    input_schema: &Schema,
139    log_store_table: &Table,
140) -> StreamResult<&'static KvLogStorePkInfo> {
141    let predefined_column_len = log_store_table.columns.len() - input_schema.fields.len();
142
143    #[expect(deprecated)]
144    let info = match predefined_column_len {
145        len if len
146            == crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO
147                .predefined_column_len() =>
148        {
149            Ok(&crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V1_INFO)
150        }
151        len if len == KV_LOG_STORE_V2_INFO.predefined_column_len() => Ok(&KV_LOG_STORE_V2_INFO),
152        other_len => Err(anyhow!(
153            "invalid log store predefined len {}. log store table: {:?}, input_schema: {:?}",
154            other_len,
155            log_store_table,
156            input_schema
157        )),
158    }?;
159    validate_payload_schema(
160        &log_store_table.columns[predefined_column_len..],
161        input_schema,
162    )?;
163    Ok(info)
164}
165
166fn validate_payload_schema(
167    log_store_payload_schema: &[PbColumnCatalog],
168    input_schema: &Schema,
169) -> StreamResult<()> {
170    if log_store_payload_schema
171        .iter()
172        .zip_eq(input_schema.fields.iter())
173        .all(|(log_store_col, input_field)| {
174            let log_store_col_type = DataType::from(
175                log_store_col
176                    .column_desc
177                    .as_ref()
178                    .unwrap()
179                    .column_type
180                    .as_ref()
181                    .unwrap(),
182            );
183            log_store_col_type.equals_datatype(&input_field.data_type)
184        })
185    {
186        Ok(())
187    } else {
188        Err(anyhow!(
189            "mismatch schema: log store: {:?}, input: {:?}",
190            log_store_payload_schema,
191            input_schema
192        )
193        .into())
194    }
195}
196
// Associates the `Sink(SinkNode)` stream node body with `SinkExecutorBuilder`. See the
// `impl_stream_node_body!` macro definition for exactly what is generated.
impl_stream_node_body!(Sink(SinkNode) => SinkExecutorBuilder);
198
impl ExecutorBuilder for SinkExecutorBuilder {
    type Node = SinkNode;

    /// Builds a [`SinkExecutor`] for a `Sink` stream node.
    ///
    /// Steps:
    /// 1. Fill secret references into the sink properties via [`LocalSecretManager`].
    /// 2. If `developer.switch_jdbc_pg_to_native` is enabled, rewrite a `jdbc` sink whose
    ///    `jdbc.url` starts with `jdbc:postgresql:` into the native `postgres` connector.
    /// 3. Resolve the connector name from `CONNECTOR_TYPE_KEY` after lowercasing it. Then build
    ///    the [`SinkParam`], the [`SinkWriterParam`] and the sink itself.
    /// 4. Wrap the sink with the log store selected by `node.log_store_type()`. That is either
    ///    a bounded in-memory log store (also used when the type is unspecified) or a kv log
    ///    store backed by `node.table`.
    ///
    /// # Errors
    /// Returns an error if any of the following fails:
    /// - filling secrets;
    /// - parsing the JDBC URL during the postgres switch;
    /// - resolving the connector, which errors when it is missing or unsupported;
    /// - building the sink parameters or the sink;
    /// - resolving the kv log store layout;
    /// - constructing the executor.
    ///
    /// # Panics
    /// Panics if:
    /// - the node does not have exactly one input,
    /// - `sink_desc` is absent, or
    /// - the kv log store is selected and `node.table` is absent.
    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A sink node is expected to have exactly one upstream input.
        let [input_executor]: [_; 1] = params.input.try_into().unwrap();
        let input_data_types = input_executor.info().schema.data_types();
        let chunk_size = params.config.developer.chunk_size;

        let sink_desc = node.sink_desc.as_ref().unwrap();
        let sink_id: SinkId = sink_desc.get_id();
        let sink_name = sink_desc.get_name().to_owned();
        let properties = sink_desc.get_properties().clone();
        let secret_refs = sink_desc.get_secret_refs().clone();

        // Fill `secret_refs` into the properties so later steps see the resolved values.
        let mut properties_with_secret =
            LocalSecretManager::global().fill_secrets(properties, secret_refs)?;

        // Optionally switch a JDBC sink pointing at PostgreSQL to the native `postgres`
        // connector. The JDBC URL and the `table.name` / `schema.name` options are translated
        // into the native connector's property names; the original `jdbc.*` keys are left in
        // place.
        // NOTE(review): the `"jdbc"` comparison is case-sensitive, whereas the connector lookup
        // below lowercases the name first.
        if params.config.developer.switch_jdbc_pg_to_native
            && let Some(connector_type) = properties_with_secret.get(CONNECTOR_TYPE_KEY)
            && connector_type == "jdbc"
            && let Some(url) = properties_with_secret.get("jdbc.url")
            && url.starts_with("jdbc:postgresql:")
        {
            tracing::info!("switching to native postgres connector");
            let jdbc_url = parse_jdbc_url(url)
                .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id)))?;
            properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned());
            properties_with_secret.insert("host".to_owned(), jdbc_url.host);
            properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string());
            properties_with_secret.insert("database".to_owned(), jdbc_url.db_name);
            if let Some(username) = jdbc_url.username {
                properties_with_secret.insert("user".to_owned(), username);
            }
            if let Some(password) = jdbc_url.password {
                properties_with_secret.insert("password".to_owned(), password);
            }
            if let Some(table_name) = properties_with_secret.get("table.name") {
                properties_with_secret.insert("table".to_owned(), table_name.clone());
            }
            if let Some(schema_name) = properties_with_secret.get("schema.name") {
                properties_with_secret.insert("schema".to_owned(), schema_name.clone());
            }
            // TODO(kwannoel): Do we need to handle jdbc.query.timeout?
        }

        // Map the lowercased connector name to the `SINK_NAME` of the matching sink type.
        // A missing or unknown connector is reported as a configuration error.
        let connector = {
            let sink_type = properties_with_secret
                .get(CONNECTOR_TYPE_KEY)
                .ok_or_else(|| {
                    StreamExecutorError::from((
                        SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)),
                        sink_id,
                    ))
                })?;

            let sink_type_str = sink_type.to_lowercase();
            match_sink_name_str!(
                sink_type_str.as_str(),
                SinkType,
                Ok(SinkType::SINK_NAME),
                |other: &str| {
                    Err(StreamExecutorError::from((
                        SinkError::Config(anyhow!("unsupported sink connector {}", other)),
                        sink_id,
                    )))
                }
            )?
        };

        let (sink_param, columns) = build_sink_param(sink_desc, properties_with_secret, connector)?;

        let sink_write_param = SinkWriterParam {
            executor_id: params.executor_id,
            vnode_bitmap: params.vnode_bitmap.clone(),
            meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),

            actor_id: params.actor_context.id,
            sink_id,
            sink_name,
            connector: connector.to_owned(),
            streaming_config: params.config.as_ref().clone(),
            time_zone: params.actor_context.time_zone,
        };

        // Identity string handed to the kv log store factory; only the `KvLogStore` branch
        // below consumes it.
        let log_store_identity = format!(
            "sink[{}]-[{}]-executor[{}]",
            connector, sink_id, params.executor_id
        );

        let sink = build_sink(sink_param.clone())
            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;

        let exec = match node.log_store_type() {
            // Default value is the normal in memory log store to be backward compatible with the
            // previously unset value
            SinkLogStoreType::InMemoryLogStore | SinkLogStoreType::Unspecified => {
                // The callback waits, via `try_wait_epoch`, until `epoch.prev` is committed for
                // the node's table, if the node has one.
                // NOTE(review): presumably the in-memory log store invokes it once per epoch,
                // and `1` is its buffer capacity — confirm against `BoundedInMemLogStoreFactory`.
                let factory = BoundedInMemLogStoreFactory::new(1, {
                    let state_store = state_store;
                    let table_id = node.table.as_ref().map(|table| table.id);
                    move |epoch| {
                        async move {
                            if let Some(table_id) = table_id {
                                state_store
                                    .try_wait_epoch(
                                        HummockReadEpoch::Committed(epoch.prev),
                                        TryWaitEpochOptions { table_id },
                                    )
                                    .await?;
                            }
                            Ok(())
                        }
                        .boxed()
                    }
                });
                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )?
                .boxed()
            }
            SinkLogStoreType::KvLogStore => {
                let metrics = KvLogStoreMetrics::new(
                    &params.executor_stats,
                    params.actor_context.id,
                    &sink_param,
                    connector,
                );

                // The kv log store requires `node.table`. Its layout (V1/V2) is inferred from
                // how many predefined columns it has beyond the input schema.
                let table = node.table.as_ref().unwrap().clone();
                let input_schema = input_executor.schema();
                let pk_info = resolve_pk_info(input_schema, &table)?;

                // TODO: support setting max row count in config
                // NOTE(review): `params.config.developer.chunk_size` below is the same value as
                // the local `chunk_size`.
                let factory = KvLogStoreFactory::new(
                    state_store,
                    table,
                    params.vnode_bitmap.clone().map(Arc::new),
                    65536,
                    params.config.developer.chunk_size,
                    metrics,
                    log_store_identity,
                    params.env.kv_log_store_historical_read_semaphore(),
                    pk_info,
                );

                SinkExecutor::new(
                    params.actor_context,
                    params.info.clone(),
                    input_executor,
                    sink_write_param,
                    sink,
                    sink_param,
                    columns,
                    factory,
                    chunk_size,
                    input_data_types,
                    node.rate_limit.map(|x| x as _),
                )?
                .boxed()
            }
        };

        Ok((params.info, exec).into())
    }
}
378
/// Connection parameters extracted from a PostgreSQL JDBC URL by [`parse_jdbc_url`].
struct JdbcUrl {
    /// Host name from the URL.
    host: String,
    /// Port to connect to.
    port: u16,
    /// Database name, taken from the URL path without its leading `/`.
    db_name: String,
    /// Value of the `user` query parameter, if present.
    username: Option<String>,
    /// Value of the `password` query parameter, if present. This is a credential, so avoid
    /// logging it.
    password: Option<String>,
}
386
387fn parse_jdbc_url(url: &str) -> anyhow::Result<JdbcUrl> {
388    if !url.starts_with("jdbc:postgresql") {
389        bail!(
390            "invalid jdbc url, to switch to postgres rust connector, we need to use the url jdbc:postgresql://..."
391        )
392    }
393
394    // trim the "jdbc:" prefix to make it a valid url
395    let url = url.replace("jdbc:", "");
396
397    // parse the url
398    let url = Url::parse(&url).map_err(|e| anyhow!(e).context("failed to parse jdbc url"))?;
399
400    let scheme = url.scheme();
401    assert_eq!("postgresql", scheme, "jdbc scheme should be postgresql");
402    let host = url
403        .host_str()
404        .ok_or_else(|| anyhow!("missing host in jdbc url"))?;
405    let port = url
406        .port()
407        .ok_or_else(|| anyhow!("missing port in jdbc url"))?;
408    let Some(db_name) = url.path().strip_prefix('/') else {
409        bail!("missing db_name in jdbc url");
410    };
411    let mut username = None;
412    let mut password = None;
413    for (key, value) in url.query_pairs() {
414        if key == "user" {
415            username = Some(value.to_string());
416        }
417        if key == "password" {
418            password = Some(value.to_string());
419        }
420    }
421
422    Ok(JdbcUrl {
423        host: host.to_owned(),
424        port,
425        db_name: db_name.to_owned(),
426        username,
427        password,
428    })
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// A full URL with credentials in the query string is split into its parts.
    #[test]
    fn test_parse_jdbc_url() {
        let url = "jdbc:postgresql://localhost:5432/test?user=postgres&password=postgres";
        let jdbc_url = parse_jdbc_url(url).unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, Some("postgres".to_owned()));
        assert_eq!(jdbc_url.password, Some("postgres".to_owned()));
    }

    /// Credentials are optional: without a query string both stay `None`.
    #[test]
    fn test_parse_jdbc_url_without_credentials() {
        let jdbc_url = parse_jdbc_url("jdbc:postgresql://localhost:5432/test").unwrap();
        assert_eq!(jdbc_url.host, "localhost");
        assert_eq!(jdbc_url.port, 5432);
        assert_eq!(jdbc_url.db_name, "test");
        assert_eq!(jdbc_url.username, None);
        assert_eq!(jdbc_url.password, None);
    }

    /// Anything not starting with `jdbc:postgresql` is rejected.
    #[test]
    fn test_parse_jdbc_url_rejects_non_postgres() {
        assert!(parse_jdbc_url("jdbc:mysql://localhost:3306/test").is_err());
        assert!(parse_jdbc_url("postgresql://localhost:5432/test").is_err());
    }
}