risingwave_connector/source/pulsar/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod source;
17pub mod split;
18pub mod topic;
19
20use std::collections::HashMap;
21use std::time::Duration;
22
23pub use enumerator::*;
24use serde::Deserialize;
25use serde_with::serde_as;
26pub use split::*;
27use with_options::WithOptions;
28
29use self::source::reader::PulsarSplitReader;
30use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
31use crate::enforce_secret::EnforceSecret;
32use crate::error::ConnectorError;
33use crate::source::SourceProperties;
34use crate::{deserialize_optional_bool_from_string, deserialize_optional_duration_from_string};
35
36pub const PULSAR_CONNECTOR: &str = "pulsar";
37
38impl SourceProperties for PulsarProperties {
39    type Split = PulsarSplit;
40    type SplitEnumerator = PulsarSplitEnumerator;
41    type SplitReader = PulsarSplitReader;
42
43    const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
44}
45
46impl crate::source::UnknownFields for PulsarProperties {
47    fn unknown_fields(&self) -> HashMap<String, String> {
48        self.unknown_fields.clone()
49    }
50}
51
52impl EnforceSecret for PulsarProperties {
53    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
54        for prop in prop_iter {
55            PulsarCommon::enforce_one(prop)?;
56        }
57        Ok(())
58    }
59}
60
61impl EnforceSecret for PulsarConsumerOptions {}
62
63#[derive(Clone, Debug, Deserialize, WithOptions)]
64#[serde_as]
65pub struct PulsarConsumerOptions {
66    #[serde(
67        rename = "pulsar.read_compacted",
68        default,
69        deserialize_with = "deserialize_optional_bool_from_string"
70    )]
71    pub read_compacted: Option<bool>,
72}
73
74#[derive(Clone, Debug, Deserialize, WithOptions)]
75#[serde_as]
76pub struct PulsarProperties {
77    #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
78    pub scan_startup_mode: Option<String>,
79
80    #[serde(
81        rename = "scan.startup.timestamp.millis",
82        alias = "pulsar.time.offset",
83        alias = "scan.startup.timestamp_millis"
84    )]
85    pub time_offset: Option<String>,
86
87    #[serde(flatten)]
88    pub common: PulsarCommon,
89
90    #[serde(flatten)]
91    pub oauth: Option<PulsarOauthCommon>,
92
93    #[serde(flatten)]
94    pub aws_auth_props: AwsAuthProps,
95
96    #[serde(rename = "iceberg.enabled")]
97    #[serde_as(as = "DisplayFromStr")]
98    pub iceberg_loader_enabled: Option<bool>,
99
100    #[serde(rename = "iceberg.bucket", default)]
101    pub iceberg_bucket: Option<String>,
102
103    /// Specify a custom consumer group id prefix for the source.
104    /// Defaults to `rw-consumer`.
105    ///
106    /// Notes:
107    /// - Each job (materialized view) will have multiple subscriptions and
108    ///   contains a generated suffix in the subscription name.
109    ///   The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`.
110    #[serde(rename = "subscription.name.prefix")]
111    pub subscription_name_prefix: Option<String>,
112
113    #[serde(
114        rename = "subscription.unacked.resend.delay",
115        deserialize_with = "deserialize_optional_duration_from_string",
116        default
117    )]
118    pub subscription_unacked_resend_delay: Option<Duration>,
119
120    #[serde(flatten)]
121    pub consumer_options: PulsarConsumerOptions,
122
123    #[serde(flatten)]
124    pub unknown_fields: HashMap<String, String>,
125}