risingwave_connector/source/pulsar/
mod.rs1pub mod enumerator;
16pub mod source;
17pub mod split;
18pub mod topic;
19
20use std::collections::HashMap;
21use std::time::Duration;
22
23pub use enumerator::*;
24use pulsar::OperationRetryOptions;
25use serde::{Deserialize, de};
26use serde_with::serde_as;
27pub use split::*;
28use with_options::WithOptions;
29
30use self::source::reader::PulsarSplitReader;
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::error::ConnectorError;
34use crate::source::SourceProperties;
35use crate::{deserialize_optional_bool_from_string, deserialize_optional_duration_from_string};
36
/// Connector name for the Pulsar source; reused as
/// `SourceProperties::SOURCE_NAME` for `PulsarProperties`.
pub const PULSAR_CONNECTOR: &str = "pulsar";
38
39impl SourceProperties for PulsarProperties {
40 type Split = PulsarSplit;
41 type SplitEnumerator = PulsarSplitEnumerator;
42 type SplitReader = PulsarSplitReader;
43
44 const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
45}
46
47impl crate::source::UnknownFields for PulsarProperties {
48 fn unknown_fields(&self) -> HashMap<String, String> {
49 self.unknown_fields.clone()
50 }
51}
52
53impl EnforceSecret for PulsarProperties {
54 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
55 for prop in prop_iter {
56 PulsarCommon::enforce_one(prop)?;
57 }
58 Ok(())
59 }
60}
61
// Nothing is overridden here, so `PulsarConsumerOptions` uses whatever default
// method implementations the `EnforceSecret` trait provides.
impl EnforceSecret for PulsarConsumerOptions {}
63
64#[derive(Clone, Debug, Deserialize, WithOptions)]
65#[serde_as]
66pub struct PulsarConsumerOptions {
67 #[serde(
68 rename = "pulsar.read_compacted",
69 default,
70 deserialize_with = "deserialize_optional_bool_from_string"
71 )]
72 pub read_compacted: Option<bool>,
73}
74
75#[derive(Clone, Debug, Default, Deserialize, WithOptions)]
76#[serde_as]
77pub struct PulsarSourceOperationRetry {
78 #[serde(
79 rename = "pulsar.operation.retry.max.retries",
80 deserialize_with = "deserialize_optional_u32_from_string",
81 default
82 )]
83 #[with_option(allow_alter_on_fly)]
84 pub max_retries: Option<u32>,
85
86 #[serde(
90 rename = "pulsar.operation.retry.delay",
91 deserialize_with = "deserialize_optional_duration_from_string",
92 default
93 )]
94 #[with_option(allow_alter_on_fly)]
95 pub retry_delay: Option<Duration>,
96}
97
98impl PulsarSourceOperationRetry {
99 pub(crate) fn to_pulsar_options(&self) -> Option<OperationRetryOptions> {
100 if self.max_retries.is_none() && self.retry_delay.is_none() {
101 return None;
102 }
103
104 let default_options = OperationRetryOptions::default();
105 Some(OperationRetryOptions {
106 max_retries: self.max_retries,
107 retry_delay: self.retry_delay.unwrap_or(default_options.retry_delay),
108 ..default_options
109 })
110 }
111}
112
113fn deserialize_optional_u32_from_string<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
114where
115 D: de::Deserializer<'de>,
116{
117 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
118 if let Some(s) = s {
119 let parsed = s.parse().map_err(|_| {
120 de::Error::invalid_value(
121 de::Unexpected::Str(&s),
122 &"integer greater than or equal to 0",
123 )
124 })?;
125 Ok(Some(parsed))
126 } else {
127 Ok(None)
128 }
129}
130
/// Helpers for tests; compiled only for test builds or with the `test` feature.
#[cfg(any(test, feature = "test"))]
pub mod test_utils {
    use crate::error::ConnectorError;

    /// Builds a `ConnectorError` from a Pulsar connection error that wraps an
    /// I/O error of kind `ConnectionReset` carrying `message`.
    // NOTE(review): the name says "retryable"; whether the connector's retry
    // logic classifies this error as such is decided elsewhere — confirm there.
    pub fn make_retryable_pulsar_connector_error(message: impl Into<String>) -> ConnectorError {
        let io_error = std::io::Error::new(std::io::ErrorKind::ConnectionReset, message.into());
        let connection_error = pulsar::error::ConnectionError::Io(io_error);
        let pulsar_error = pulsar::Error::Connection(connection_error);
        pulsar_error.into()
    }
}
143
144#[derive(Clone, Debug, Deserialize, WithOptions)]
145#[serde_as]
146pub struct PulsarProperties {
147 #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
148 pub scan_startup_mode: Option<String>,
149
150 #[serde(
151 rename = "scan.startup.timestamp.millis",
152 alias = "pulsar.time.offset",
153 alias = "scan.startup.timestamp_millis"
154 )]
155 pub time_offset: Option<String>,
156
157 #[serde(flatten)]
158 pub common: PulsarCommon,
159
160 #[serde(flatten)]
161 pub oauth: Option<PulsarOauthCommon>,
162
163 #[serde(flatten)]
164 pub aws_auth_props: AwsAuthProps,
165
166 #[serde(rename = "iceberg.enabled")]
167 #[serde_as(as = "DisplayFromStr")]
168 pub iceberg_loader_enabled: Option<bool>,
169
170 #[serde(rename = "iceberg.bucket", default)]
171 pub iceberg_bucket: Option<String>,
172
173 #[serde(rename = "subscription.name.prefix")]
181 pub subscription_name_prefix: Option<String>,
182
183 #[serde(
184 rename = "subscription.unacked.resend.delay",
185 deserialize_with = "deserialize_optional_duration_from_string",
186 default
187 )]
188 pub subscription_unacked_resend_delay: Option<Duration>,
189
190 #[serde(flatten)]
191 pub operation_retry: PulsarSourceOperationRetry,
192
193 #[serde(flatten)]
194 pub consumer_options: PulsarConsumerOptions,
195
196 #[serde(flatten)]
197 pub unknown_fields: HashMap<String, String>,
198}
199
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Deserializes a `PulsarProperties` from a fixed `topic` / `service.url`
    /// pair merged with the entries of `extra`.
    ///
    /// Entries in `extra` override the fixed ones on key collision. Panics if
    /// `extra` is not a JSON object or if deserialization fails.
    fn parse_pulsar_properties(extra: serde_json::Value) -> PulsarProperties {
        let mut options = serde_json::Map::new();
        options.insert(
            "topic".to_owned(),
            json!("persistent://public/default/test-topic"),
        );
        options.insert("service.url".to_owned(), json!("pulsar://localhost:6650"));
        options.extend(extra.as_object().unwrap().clone());

        serde_json::from_value(serde_json::Value::Object(options)).unwrap()
    }

    /// With no retry option supplied, no override is produced at all.
    #[test]
    fn test_operation_retry_override_uses_upstream_defaults_when_absent() {
        let retry = parse_pulsar_properties(json!({})).operation_retry;
        assert!(retry.to_pulsar_options().is_none());
    }

    /// Setting only the retry count keeps the client's default delay.
    #[test]
    fn test_operation_retry_override_sets_max_retries_only() {
        let overrides = json!({"pulsar.operation.retry.max.retries": "7"});
        let retry = parse_pulsar_properties(overrides).operation_retry;

        let resolved = retry.to_pulsar_options().unwrap();
        let upstream = OperationRetryOptions::default();
        assert_eq!(resolved.max_retries, Some(7));
        assert_eq!(resolved.retry_delay, upstream.retry_delay);
    }

    /// Setting both options overrides both values.
    #[test]
    fn test_operation_retry_override_sets_max_retries_and_delay() {
        let overrides = json!({
            "pulsar.operation.retry.max.retries": "9",
            "pulsar.operation.retry.delay": "3s",
        });
        let retry = parse_pulsar_properties(overrides).operation_retry;

        let resolved = retry.to_pulsar_options().unwrap();
        assert_eq!(resolved.max_retries, Some(9));
        assert_eq!(resolved.retry_delay, Duration::from_secs(3));
    }
}