risingwave_connector/source/pulsar/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod source;
17pub mod split;
18pub mod topic;
19
20use std::collections::HashMap;
21use std::time::Duration;
22
23pub use enumerator::*;
24use pulsar::OperationRetryOptions;
25use serde::{Deserialize, de};
26use serde_with::serde_as;
27pub use split::*;
28use with_options::WithOptions;
29
30use self::source::reader::PulsarSplitReader;
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::error::ConnectorError;
34use crate::source::SourceProperties;
35use crate::{deserialize_optional_bool_from_string, deserialize_optional_duration_from_string};
36
37pub const PULSAR_CONNECTOR: &str = "pulsar";
38
39impl SourceProperties for PulsarProperties {
40    type Split = PulsarSplit;
41    type SplitEnumerator = PulsarSplitEnumerator;
42    type SplitReader = PulsarSplitReader;
43
44    const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
45}
46
47impl crate::source::UnknownFields for PulsarProperties {
48    fn unknown_fields(&self) -> HashMap<String, String> {
49        self.unknown_fields.clone()
50    }
51}
52
53impl EnforceSecret for PulsarProperties {
54    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
55        for prop in prop_iter {
56            PulsarCommon::enforce_one(prop)?;
57        }
58        Ok(())
59    }
60}
61
62impl EnforceSecret for PulsarConsumerOptions {}
63
64#[derive(Clone, Debug, Deserialize, WithOptions)]
65#[serde_as]
66pub struct PulsarConsumerOptions {
67    #[serde(
68        rename = "pulsar.read_compacted",
69        default,
70        deserialize_with = "deserialize_optional_bool_from_string"
71    )]
72    pub read_compacted: Option<bool>,
73}
74
75#[derive(Clone, Debug, Default, Deserialize, WithOptions)]
76#[serde_as]
77pub struct PulsarSourceOperationRetry {
78    #[serde(
79        rename = "pulsar.operation.retry.max.retries",
80        deserialize_with = "deserialize_optional_u32_from_string",
81        default
82    )]
83    #[with_option(allow_alter_on_fly)]
84    pub max_retries: Option<u32>,
85
86    /// This controls Pulsar client operation retry delay for the source.
87    /// It is distinct from `subscription.unacked.resend.delay`, which only affects
88    /// unacked message redelivery, and from sink-side `properties.retry.*` settings.
89    #[serde(
90        rename = "pulsar.operation.retry.delay",
91        deserialize_with = "deserialize_optional_duration_from_string",
92        default
93    )]
94    #[with_option(allow_alter_on_fly)]
95    pub retry_delay: Option<Duration>,
96}
97
98impl PulsarSourceOperationRetry {
99    pub(crate) fn to_pulsar_options(&self) -> Option<OperationRetryOptions> {
100        if self.max_retries.is_none() && self.retry_delay.is_none() {
101            return None;
102        }
103
104        let default_options = OperationRetryOptions::default();
105        Some(OperationRetryOptions {
106            max_retries: self.max_retries,
107            retry_delay: self.retry_delay.unwrap_or(default_options.retry_delay),
108            ..default_options
109        })
110    }
111}
112
113fn deserialize_optional_u32_from_string<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
114where
115    D: de::Deserializer<'de>,
116{
117    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
118    if let Some(s) = s {
119        let parsed = s.parse().map_err(|_| {
120            de::Error::invalid_value(
121                de::Unexpected::Str(&s),
122                &"integer greater than or equal to 0",
123            )
124        })?;
125        Ok(Some(parsed))
126    } else {
127        Ok(None)
128    }
129}
130
#[cfg(any(test, feature = "test"))]
pub mod test_utils {
    use std::io;

    use crate::error::ConnectorError;

    /// Builds a `ConnectorError` from a Pulsar connection error that wraps an
    /// I/O error of kind `ConnectionReset` carrying `message`.
    ///
    /// Going by its name, the result is meant to be treated as retryable by the
    /// code under test; that classification is not decided here.
    pub fn make_retryable_pulsar_connector_error(message: impl Into<String>) -> ConnectorError {
        let io_error = io::Error::new(io::ErrorKind::ConnectionReset, message.into());
        let connection_error = pulsar::error::ConnectionError::Io(io_error);
        let pulsar_error = pulsar::Error::Connection(connection_error);
        pulsar_error.into()
    }
}
143
144#[derive(Clone, Debug, Deserialize, WithOptions)]
145#[serde_as]
146pub struct PulsarProperties {
147    #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
148    pub scan_startup_mode: Option<String>,
149
150    #[serde(
151        rename = "scan.startup.timestamp.millis",
152        alias = "pulsar.time.offset",
153        alias = "scan.startup.timestamp_millis"
154    )]
155    pub time_offset: Option<String>,
156
157    #[serde(flatten)]
158    pub common: PulsarCommon,
159
160    #[serde(flatten)]
161    pub oauth: Option<PulsarOauthCommon>,
162
163    #[serde(flatten)]
164    pub aws_auth_props: AwsAuthProps,
165
166    #[serde(rename = "iceberg.enabled")]
167    #[serde_as(as = "DisplayFromStr")]
168    pub iceberg_loader_enabled: Option<bool>,
169
170    #[serde(rename = "iceberg.bucket", default)]
171    pub iceberg_bucket: Option<String>,
172
173    /// Specify a custom consumer group id prefix for the source.
174    /// Defaults to `rw-consumer`.
175    ///
176    /// Notes:
177    /// - Each job (materialized view) will have multiple subscriptions and
178    ///   contains a generated suffix in the subscription name.
179    ///   The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`.
180    #[serde(rename = "subscription.name.prefix")]
181    pub subscription_name_prefix: Option<String>,
182
183    #[serde(
184        rename = "subscription.unacked.resend.delay",
185        deserialize_with = "deserialize_optional_duration_from_string",
186        default
187    )]
188    pub subscription_unacked_resend_delay: Option<Duration>,
189
190    #[serde(flatten)]
191    pub operation_retry: PulsarSourceOperationRetry,
192
193    #[serde(flatten)]
194    pub consumer_options: PulsarConsumerOptions,
195
196    #[serde(flatten)]
197    pub unknown_fields: HashMap<String, String>,
198}
199
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Minimal option set the tests in this module deserialize successfully from.
    fn base_options() -> serde_json::Value {
        json!({
            "topic": "persistent://public/default/test-topic",
            "service.url": "pulsar://localhost:6650",
        })
    }

    /// Merges `extra` into the base options. Panics if `extra` is not a JSON object.
    fn merge_options(extra: serde_json::Value) -> serde_json::Value {
        let mut value = base_options();

        value
            .as_object_mut()
            .unwrap()
            .extend(extra.as_object().unwrap().clone());

        value
    }

    /// Deserializes `PulsarProperties` from the base options plus `extra`,
    /// panicking if deserialization fails.
    fn parse_pulsar_properties(extra: serde_json::Value) -> PulsarProperties {
        serde_json::from_value(merge_options(extra)).unwrap()
    }

    #[test]
    fn test_operation_retry_override_uses_upstream_defaults_when_absent() {
        let props = parse_pulsar_properties(json!({}));
        assert!(props.operation_retry.to_pulsar_options().is_none());
    }

    #[test]
    fn test_operation_retry_override_sets_max_retries_only() {
        let props = parse_pulsar_properties(json!({"pulsar.operation.retry.max.retries": "7"}));

        let options = props.operation_retry.to_pulsar_options().unwrap();
        assert_eq!(options.max_retries, Some(7));
        assert_eq!(
            options.retry_delay,
            OperationRetryOptions::default().retry_delay
        );
    }

    #[test]
    fn test_operation_retry_override_sets_delay_only() {
        // Only the delay is configured: an override must still be produced.
        let props = parse_pulsar_properties(json!({"pulsar.operation.retry.delay": "3s"}));

        let options = props.operation_retry.to_pulsar_options().unwrap();
        assert_eq!(options.retry_delay, Duration::from_secs(3));
    }

    #[test]
    fn test_operation_retry_override_sets_max_retries_and_delay() {
        let props = parse_pulsar_properties(json!({
            "pulsar.operation.retry.max.retries": "9",
            "pulsar.operation.retry.delay": "3s",
        }));

        let options = props.operation_retry.to_pulsar_options().unwrap();
        assert_eq!(options.max_retries, Some(9));
        assert_eq!(options.retry_delay, Duration::from_secs(3));
    }

    #[test]
    fn test_operation_retry_rejects_invalid_max_retries() {
        // Negative and non-numeric retry counts cannot be parsed as `u32`.
        for bad in ["-1", "many"] {
            let value = merge_options(json!({"pulsar.operation.retry.max.retries": bad}));
            assert!(serde_json::from_value::<PulsarProperties>(value).is_err());
        }
    }
}
249}