risingwave_connector/parser/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;

use anyhow::Context;
use bytes::Bytes;
use reqwest::Url;
use risingwave_common::bail;
use risingwave_common::types::{Datum, DatumCow, DatumRef};
use risingwave_pb::data::DataType as PbDataType;
use risingwave_pb::plan_common::additional_column;
use risingwave_pb::plan_common::additional_column::ColumnType;

use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::error::ConnectorResult;
use crate::parser::{AccessError, AccessResult};
use crate::source::SourceMeta;
use crate::source::cdc::DebeziumCdcMeta;
/// Log an error for a column at `error` level, with rate limiting.
///
/// Note: the expansion references a `LOG_SUPPERSSER` rate limiter (spelling kept
/// to match existing call sites) and calls `.as_report()` on the error, so the
/// call site must have a `LogSuppresser` named `LOG_SUPPERSSER` and
/// `thiserror_ext::AsReport` in scope; the macro does not import them itself.
macro_rules! log_error {
    ($name:expr, $err:expr, $message:expr) => {
        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
            tracing::error!(
                column = $name,
                error = %$err.as_report(),
                suppressed_count,
                $message,
            );
        }
    };
}
pub(crate) use log_error;
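// Call-site sketch (illustrative, not defined in this module): callers provide
// their own rate limiter and bring `AsReport` into scope, e.g.
//
//     use std::sync::LazyLock;
//     use risingwave_common::log::LogSuppresser;
//     use thiserror_ext::AsReport;
//
//     static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
//     log_error!("my_column", err, "failed to parse additional column");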
/// Get the Kafka topic name from source properties, accepting either
/// `kafka.topic` or `topic` as the key.
pub(super) fn get_kafka_topic(props: &BTreeMap<String, String>) -> ConnectorResult<&String> {
    const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
    const KAFKA_TOPIC_KEY2: &str = "topic";

    if let Some(topic) = props.get(KAFKA_TOPIC_KEY1) {
        return Ok(topic);
    }
    if let Some(topic) = props.get(KAFKA_TOPIC_KEY2) {
        return Ok(topic);
    }

    bail!(
        "Must specify '{}' or '{}'",
        KAFKA_TOPIC_KEY1,
        KAFKA_TOPIC_KEY2
    )
}

/// Download bytes from an HTTP(S) URL.
pub(super) async fn download_from_http(location: &Url) -> ConnectorResult<Bytes> {
    let res = reqwest::get(location.clone())
        .await
        .with_context(|| format!("failed to make request to {location}"))?
        .error_for_status()
        .with_context(|| format!("http request failed for {location}"))?;

    let bytes = res
        .bytes()
        .await
        .with_context(|| format!("failed to read HTTP body of {location}"))?;

    Ok(bytes)
}
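// Usage sketch (illustrative URL): this helper backs the HTTP(S) branch of
// `bytes_from_url` below, but can be called directly with any parsed `Url`.
//
//     let url = Url::parse("http://localhost:8081/schema.avsc")?;
//     let body: Bytes = download_from_http(&url).await?;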

/// For parsers that don't support message keys yet: parse the payload if
/// present, and bail on key-only (empty-payload) messages.
#[macro_export]
macro_rules! only_parse_payload {
    ($self:ident, $payload:ident, $writer:ident) => {
        if let Some(payload) = $payload {
            $self.parse_inner(payload, $writer).await
        } else {
            risingwave_common::bail!("empty payload with non-empty key")
        }
    };
}
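// Call-site sketch (illustrative; `parse_inner` is the parser's own method): a
// key-less parser forwards the payload and rejects key-only messages.
//
//     async fn parse_one(
//         &mut self,
//         _key: Option<Vec<u8>>,
//         payload: Option<Vec<u8>>,
//         writer: SourceStreamChunkRowWriter<'_>,
//     ) -> ConnectorResult<()> {
//         only_parse_payload!(self, payload, writer)
//     }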

/// Load raw bytes from:
/// * a local file, for on-premise deployments or testing.
/// * http/https, for common usage.
/// * an S3 object, with location format `s3://bucket_name/file_name`.
pub(super) async fn bytes_from_url(
    url: &Url,
    config: Option<&AwsAuthProps>,
) -> ConnectorResult<Vec<u8>> {
    match (url.scheme(), config) {
        // TODO(Tao): support local file only when it's compiled in debug mode.
        ("file", _) => {
            let path = url
                .to_file_path()
                .ok()
                .with_context(|| format!("illegal path: {url}"))?;
            Ok(std::fs::read(&path)
                .with_context(|| format!("failed to read file from `{}`", path.display()))?)
        }
        ("https" | "http", _) => Ok(download_from_http(url).await?.into()),
        ("s3", Some(config)) => load_file_descriptor_from_s3(url, config).await,
        (scheme, _) => bail!("path scheme `{scheme}` is not supported"),
    }
}
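// Usage sketch (illustrative): schema files are typically fetched once when the
// parser is built; the S3 branch additionally requires `AwsAuthProps` credentials.
//
//     let url = Url::parse("s3://my-bucket/my-schema.proto")?;
//     let schema_bytes = bytes_from_url(&url, Some(&aws_auth_props)).await?;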

/// Extract an event timestamp from connector-specific message metadata, if any.
pub fn extract_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
    match meta {
        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
        SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
        SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
        _ => None,
    }
}

/// Extract an additional-column value (timestamp, database name, or table name)
/// from Debezium CDC metadata. Other column types are rejected.
pub fn extract_cdc_meta_column<'a>(
    cdc_meta: &'a DebeziumCdcMeta,
    column_type: &additional_column::ColumnType,
    column_name: &str,
) -> AccessResult<DatumRef<'a>> {
    match column_type {
        ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
        ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
        ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name()),
        _ => Err(AccessError::UnsupportedAdditionalColumn {
            name: column_name.to_owned(),
        }),
    }
}

/// Extract message headers from metadata. Only Kafka carries headers; the
/// expected output is of type `array[struct<varchar, bytea>]`.
pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
    match meta {
        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(),
        _ => None,
    }
}

/// Extract a single Kafka header value by key. The expected output is of type
/// `bytea` or `varchar`.
pub fn extract_header_inner_from_meta<'a>(
    meta: &'a SourceMeta,
    inner_field: &str,
    data_type: Option<&PbDataType>,
) -> Option<DatumCow<'a>> {
    match meta {
        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type),
        _ => None,
    }
}

/// Extract the NATS subject from metadata.
pub fn extract_subject_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
    match meta {
        SourceMeta::Nats(nats_meta) => Some(nats_meta.extract_subject()),
        _ => None,
    }
}
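
// Hedged test sketches for the locally-testable helpers above: `get_kafka_topic`
// key resolution and the `file` branch of `bytes_from_url`. Assumes `tokio` test
// macros are available as a dev-dependency, as elsewhere in this crate; the
// temp-file name is illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_kafka_topic_key_resolution() {
        // Either `kafka.topic` or `topic` resolves the topic; an empty map errors.
        let props = BTreeMap::from([("topic".to_owned(), "t1".to_owned())]);
        assert_eq!(get_kafka_topic(&props).unwrap().as_str(), "t1");

        let props = BTreeMap::from([("kafka.topic".to_owned(), "t2".to_owned())]);
        assert_eq!(get_kafka_topic(&props).unwrap().as_str(), "t2");

        assert!(get_kafka_topic(&BTreeMap::new()).is_err());
    }

    #[tokio::test]
    async fn test_bytes_from_url_local_file() {
        // Round-trip a temporary file through the `file` scheme branch.
        let path = std::env::temp_dir().join("bytes_from_url_utils_test");
        std::fs::write(&path, b"hello").unwrap();
        let url = Url::from_file_path(&path).unwrap();
        assert_eq!(bytes_from_url(&url, None).await.unwrap(), b"hello");
        std::fs::remove_file(&path).unwrap();
    }
}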