risingwave_connector/parser/
utils.rs1use std::collections::BTreeMap;
15
16use anyhow::Context;
17use bytes::Bytes;
18use reqwest::Url;
19use risingwave_common::bail;
20use risingwave_common::types::{Datum, DatumCow, DatumRef};
21use risingwave_pb::data::DataType as PbDataType;
22
23use crate::aws_utils::load_file_descriptor_from_s3;
24use crate::connector_common::AwsAuthProps;
25use crate::error::ConnectorResult;
26use crate::source::SourceMeta;
27
/// Emits a `tracing::error!` event for a failure on one column, gated by
/// `LOG_SUPPERSSER`.
///
/// Arguments, in order:
/// 1. the value recorded under the `column` field,
/// 2. the error, rendered through `.as_report()` into the `error` field,
/// 3. the message of the event.
///
/// The event is only emitted when `LOG_SUPPERSSER.check()` returns `Ok`; the
/// value it yields is attached as the `suppressed_count` field. On `Err` the
/// macro expands to a no-op.
///
/// Neither `LOG_SUPPERSSER` nor `as_report` is defined by this macro, so both
/// have to be resolvable wherever the macro is expanded.
macro_rules! log_error {
    ($column:expr, $error:expr, $msg:expr) => {
        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
            tracing::error!(
                column = $column,
                error = %$error.as_report(),
                suppressed_count,
                $msg,
            );
        }
    };
}
40pub(crate) use log_error;
41use risingwave_pb::plan_common::additional_column;
42use risingwave_pb::plan_common::additional_column::ColumnType;
43
44use crate::parser::{AccessError, AccessResult};
45use crate::source::cdc::DebeziumCdcMeta;
46
47pub(super) fn get_kafka_topic(props: &BTreeMap<String, String>) -> ConnectorResult<&String> {
49 const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
50 const KAFKA_TOPIC_KEY2: &str = "topic";
51
52 if let Some(topic) = props.get(KAFKA_TOPIC_KEY1) {
53 return Ok(topic);
54 }
55 if let Some(topic) = props.get(KAFKA_TOPIC_KEY2) {
56 return Ok(topic);
57 }
58
59 bail!(
61 "Must specify '{}' or '{}'",
62 KAFKA_TOPIC_KEY1,
63 KAFKA_TOPIC_KEY2
64 )
65}
66
67pub(super) async fn download_from_http(location: &Url) -> ConnectorResult<Bytes> {
69 let res = reqwest::get(location.clone())
70 .await
71 .with_context(|| format!("failed to make request to {location}"))?
72 .error_for_status()
73 .with_context(|| format!("http request failed for {location}"))?;
74
75 let bytes = res
76 .bytes()
77 .await
78 .with_context(|| format!("failed to read HTTP body of {location}"))?;
79
80 Ok(bytes)
81}
82
/// Expands to an expression that parses only the payload of a message.
///
/// - `$self`: the receiver whose `parse_inner` is awaited.
/// - `$payload`: an `Option`; its inner value is forwarded to `parse_inner`.
/// - `$writer`: forwarded to `parse_inner` unchanged.
///
/// When `$payload` is `None` the expansion bails with
/// `"empty payload with non-empty key"`. Because it uses `.await` and
/// `bail!`, it must be expanded inside an `async` context that returns a
/// compatible `Result`.
#[macro_export]
macro_rules! only_parse_payload {
    ($self:ident, $payload:ident, $writer:ident) => {
        match $payload {
            Some(payload) => $self.parse_inner(payload, $writer).await,
            None => risingwave_common::bail!("empty payload with non-empty key"),
        }
    };
}
94
95pub(super) async fn bytes_from_url(
100 url: &Url,
101 config: Option<&AwsAuthProps>,
102) -> ConnectorResult<Vec<u8>> {
103 match (url.scheme(), config) {
104 ("file", _) => {
106 let path = url
107 .to_file_path()
108 .ok()
109 .with_context(|| format!("illegal path: {url}"))?;
110 Ok(std::fs::read(&path)
111 .with_context(|| format!("failed to read file from `{}`", path.display()))?)
112 }
113 ("https" | "http", _) => Ok(download_from_http(url).await?.into()),
114 ("s3", Some(config)) => load_file_descriptor_from_s3(url, config).await,
115 (scheme, _) => bail!("path scheme `{scheme}` is not supported"),
116 }
117}
118
119pub fn extract_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
120 match meta {
121 SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
122 SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
123 SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
124 _ => None,
125 }
126}
127
128pub fn extract_cdc_meta_column<'a>(
129 cdc_meta: &'a DebeziumCdcMeta,
130 column_type: &additional_column::ColumnType,
131 column_name: &str,
132) -> AccessResult<DatumRef<'a>> {
133 match column_type {
134 ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
135 ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
136 ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name()),
137 _ => Err(AccessError::UnsupportedAdditionalColumn {
138 name: column_name.to_owned(),
139 }),
140 }
141}
142
143pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
144 match meta {
145 SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(), _ => None,
147 }
148}
149
150pub fn extract_header_inner_from_meta<'a>(
151 meta: &'a SourceMeta,
152 inner_field: &str,
153 data_type: Option<&PbDataType>,
154) -> Option<DatumCow<'a>> {
155 match meta {
156 SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type), _ => None,
158 }
159}
160
161pub fn extract_subject_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
162 match meta {
163 SourceMeta::Nats(nats_meta) => Some(nats_meta.extract_subject()),
164 _ => None,
165 }
166}