// risingwave_connector/parser/utils.rs
use std::collections::BTreeMap;
16
17use anyhow::Context;
18use bytes::Bytes;
19use reqwest::Url;
20use risingwave_common::bail;
21use risingwave_common::types::{Datum, DatumCow, DatumRef, ScalarRefImpl};
22use risingwave_pb::data::DataType as PbDataType;
23
24use crate::aws_utils::load_file_descriptor_from_s3;
25use crate::connector_common::AwsAuthProps;
26use crate::error::ConnectorResult;
27use crate::source::SourceMeta;
28
/// Logs `$err` as a `tracing` error for column `$name`, rate-limited through
/// the module-level `LOG_SUPPRESSOR` so a flood of per-record parse failures
/// does not overwhelm the log output.
///
/// NOTE(review): expansion relies on `LOG_SUPPRESSOR` and the `as_report()`
/// extension trait being in scope at every call site — confirm both are
/// imported wherever this macro is used.
macro_rules! log_error {
    ($name:expr, $err:expr, $message:expr) => {
        // Only emit when the suppressor allows it; `suppressed_count` is the
        // value it reports (presumably events skipped since the last emit).
        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
            tracing::error!(
                column = $name,
                error = %$err.as_report(),
                suppressed_count,
                $message,
            );
        }
    };
}
pub(crate) use log_error;
42use risingwave_pb::plan_common::additional_column;
43use risingwave_pb::plan_common::additional_column::ColumnType;
44
45use crate::parser::{AccessError, AccessResult};
46use crate::source::cdc::DebeziumCdcMeta;
47
48pub(super) fn get_kafka_topic(props: &BTreeMap<String, String>) -> ConnectorResult<&String> {
50 const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
51 const KAFKA_TOPIC_KEY2: &str = "topic";
52
53 if let Some(topic) = props.get(KAFKA_TOPIC_KEY1) {
54 return Ok(topic);
55 }
56 if let Some(topic) = props.get(KAFKA_TOPIC_KEY2) {
57 return Ok(topic);
58 }
59
60 bail!(
62 "Must specify '{}' or '{}'",
63 KAFKA_TOPIC_KEY1,
64 KAFKA_TOPIC_KEY2
65 )
66}
67
68pub(super) async fn download_from_http(location: &Url) -> ConnectorResult<Bytes> {
70 let res = reqwest::get(location.clone())
71 .await
72 .with_context(|| format!("failed to make request to {location}"))?
73 .error_for_status()
74 .with_context(|| format!("http request failed for {location}"))?;
75
76 let bytes = res
77 .bytes()
78 .await
79 .with_context(|| format!("failed to read HTTP body of {location}"))?;
80
81 Ok(bytes)
82}
83
/// Helper for parsers that only understand the message payload (not the key):
/// forwards a present payload to `$self.parse_inner(payload, $writer)`, and
/// bails when the payload is `None` (i.e. a key arrived without a payload).
#[macro_export]
macro_rules! only_parse_payload {
    ($self:ident, $payload:ident, $writer:ident) => {
        if let Some(payload) = $payload {
            $self.parse_inner(payload, $writer).await
        } else {
            // Payload-only parsers cannot do anything useful with a bare key.
            risingwave_common::bail!("empty payload with non-empty key")
        }
    };
}
95
96pub(super) async fn bytes_from_url(
101 url: &Url,
102 config: Option<&AwsAuthProps>,
103) -> ConnectorResult<Vec<u8>> {
104 match (url.scheme(), config) {
105 ("file", _) => {
107 let path = url
108 .to_file_path()
109 .ok()
110 .with_context(|| format!("illegal path: {url}"))?;
111 Ok(std::fs::read(&path)
112 .with_context(|| format!("failed to read file from `{}`", path.display()))?)
113 }
114 ("https" | "http", _) => Ok(download_from_http(url).await?.into()),
115 ("s3", Some(config)) => load_file_descriptor_from_s3(url, config).await,
116 (scheme, _) => bail!("path scheme `{scheme}` is not supported"),
117 }
118}
119
120pub fn extract_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
121 match meta {
122 SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
123 SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
124 SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
125 _ => None,
126 }
127}
128
129pub fn extract_cdc_meta_column<'a>(
130 cdc_meta: &'a DebeziumCdcMeta,
131 column_type: &additional_column::ColumnType,
132 column_name: &str,
133) -> AccessResult<DatumRef<'a>> {
134 match column_type {
135 ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
136 ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
137 ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name_only()),
140 _ => Err(AccessError::UnsupportedAdditionalColumn {
141 name: column_name.to_owned(),
142 }),
143 }
144}
145
146pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
147 match meta {
148 SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(), _ => None,
150 }
151}
152
153pub fn extract_pulsar_message_id_data_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
154 match meta {
155 SourceMeta::Pulsar(pulsar_meta) => pulsar_meta
156 .ack_message_id
157 .as_ref()
158 .map(|message_id| Some(ScalarRefImpl::Bytea(message_id))),
159 _ => None,
160 }
161}
162
163pub fn extract_header_inner_from_meta<'a>(
164 meta: &'a SourceMeta,
165 inner_field: &str,
166 data_type: Option<&PbDataType>,
167) -> Option<DatumCow<'a>> {
168 match meta {
169 SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type), _ => None,
171 }
172}
173
174pub fn extract_subject_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
175 match meta {
176 SourceMeta::Nats(nats_meta) => Some(nats_meta.extract_subject()),
177 _ => None,
178 }
179}