risingwave_connector/parser/utils.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use anyhow::Context;
use bytes::Bytes;
use reqwest::Url;
use risingwave_common::bail;
use risingwave_common::types::{Datum, DatumCow, DatumRef, ScalarRefImpl};
use risingwave_pb::data::DataType as PbDataType;
use risingwave_pb::plan_common::additional_column;
use risingwave_pb::plan_common::additional_column::ColumnType;

use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::error::ConnectorResult;
use crate::parser::{AccessError, AccessResult};
use crate::source::cdc::DebeziumCdcMeta;
use crate::source::SourceMeta;

/// Logs a column access error, rate-limited through the caller's log
/// suppressor.
///
/// Note that this macro is intentionally unhygienic: it expects a
/// `LOG_SUPPRESSOR` to be in scope at the call site.
macro_rules! log_error {
    ($name:expr, $err:expr, $message:expr) => {
        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
            tracing::error!(
                column = $name,
                error = %$err.as_report(),
                suppressed_count,
                $message,
            );
        }
    };
}
pub(crate) use log_error;

/// Gets the Kafka topic name from the source properties.
///
/// Both `kafka.topic` and `topic` are accepted, with `kafka.topic` taking
/// precedence.
pub(super) fn get_kafka_topic(props: &BTreeMap<String, String>) -> ConnectorResult<&String> {
    const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
    const KAFKA_TOPIC_KEY2: &str = "topic";

    if let Some(topic) = props.get(KAFKA_TOPIC_KEY1) {
        return Ok(topic);
    }
    if let Some(topic) = props.get(KAFKA_TOPIC_KEY2) {
        return Ok(topic);
    }

    // Neither key is present in the properties.
    bail!(
        "Must specify '{}' or '{}'",
        KAFKA_TOPIC_KEY1,
        KAFKA_TOPIC_KEY2
    )
}

/// Downloads bytes from an HTTP(S) URL.
pub(super) async fn download_from_http(location: &Url) -> ConnectorResult<Bytes> {
    let res = reqwest::get(location.clone())
        .await
        .with_context(|| format!("failed to make request to {location}"))?
        .error_for_status()
        .with_context(|| format!("http request failed for {location}"))?;

    let bytes = res
        .bytes()
        .await
        .with_context(|| format!("failed to read HTTP body of {location}"))?;

    Ok(bytes)
}

/// For parsers that don't currently support keys: parses the payload if it is
/// present, and bails out otherwise.
#[macro_export]
macro_rules! only_parse_payload {
    ($self:ident, $payload:ident, $writer:ident) => {
        if let Some(payload) = $payload {
            $self.parse_inner(payload, $writer).await
        } else {
            risingwave_common::bail!("empty payload with non-empty key")
        }
    };
}

/// Load raw bytes from:
/// * local file, for on-premise or testing.
/// * http/https, for common usage.
/// * s3, in the format `s3://bucket_name/file_name`.
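///
/// An illustrative sketch only (not a compiled doctest, since this function is
/// crate-private); the path below is a hypothetical local schema file:
/// ```ignore
/// let url = Url::parse("file:///tmp/schema.avsc")?;
/// let bytes = bytes_from_url(&url, None).await?;
/// ```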
pub(super) async fn bytes_from_url(
    url: &Url,
    config: Option<&AwsAuthProps>,
) -> ConnectorResult<Vec<u8>> {
    match (url.scheme(), config) {
        // TODO(Tao): support local file only when it's compiled in debug mode.
        ("file", _) => {
            let path = url
                .to_file_path()
                .ok()
                .with_context(|| format!("illegal path: {url}"))?;
            Ok(std::fs::read(&path)
                .with_context(|| format!("failed to read file from `{}`", path.display()))?)
        }
        ("https" | "http", _) => Ok(download_from_http(url).await?.into()),
        ("s3", Some(config)) => load_file_descriptor_from_s3(url, config).await,
        (scheme, _) => bail!("path scheme `{scheme}` is not supported"),
    }
}

pub fn extract_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
    match meta {
        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
        SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
        SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
        _ => None,
    }
}

pub fn extract_cdc_meta_column<'a>(
    cdc_meta: &'a DebeziumCdcMeta,
    column_type: &additional_column::ColumnType,
    column_name: &str,
) -> AccessResult<DatumRef<'a>> {
    match column_type {
        ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
        ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
        // `table_name` in additional columns should be the object name only, not the routing key
        // (which can be `schema.table` or `db.table` depending on the connector).
        ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name_only()),
        _ => Err(AccessError::UnsupportedAdditionalColumn {
            name: column_name.to_owned(),
        }),
    }
}

pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
    match meta {
        // Expect output of type `array[struct<varchar, bytea>]`.
        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(),
        _ => None,
    }
}

pub fn extract_pulsar_message_id_data_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
    match meta {
        SourceMeta::Pulsar(pulsar_meta) => pulsar_meta
            .ack_message_id
            .as_ref()
            .map(|message_id| Some(ScalarRefImpl::Bytea(message_id))),
        _ => None,
    }
}

pub fn extract_header_inner_from_meta<'a>(
    meta: &'a SourceMeta,
    inner_field: &str,
    data_type: Option<&PbDataType>,
) -> Option<DatumCow<'a>> {
    match meta {
        // Expect output of type `bytea` or `varchar`.
        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type),
        _ => None,
    }
}

pub fn extract_subject_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
    match meta {
        SourceMeta::Nats(nats_meta) => Some(nats_meta.extract_subject()),
        _ => None,
    }
}
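
// A minimal test sketch for the pure helpers above, not part of the original
// file. The second test assumes a writable temp dir and that tests run under
// tokio; the file name used is arbitrary.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_kafka_topic_key_precedence() {
        // The bare `topic` key works on its own...
        let mut props = BTreeMap::new();
        props.insert("topic".to_owned(), "fallback".to_owned());
        assert_eq!(get_kafka_topic(&props).unwrap(), "fallback");

        // ...but `kafka.topic` takes precedence when both are present.
        props.insert("kafka.topic".to_owned(), "primary".to_owned());
        assert_eq!(get_kafka_topic(&props).unwrap(), "primary");

        // With neither key, an error is returned.
        assert!(get_kafka_topic(&BTreeMap::new()).is_err());
    }

    #[tokio::test]
    async fn test_bytes_from_url_local_file() {
        // Round-trip some bytes through a temp file via the `file://` branch.
        let path = std::env::temp_dir().join("bytes_from_url_test.bin");
        std::fs::write(&path, b"hello").unwrap();
        let url = Url::from_file_path(&path).unwrap();
        assert_eq!(bytes_from_url(&url, None).await.unwrap(), b"hello");
        std::fs::remove_file(&path).ok();
    }
}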