risingwave_connector/source/pulsar/
mod.rs1pub mod enumerator;
16pub mod source;
17pub mod split;
18pub mod topic;
19
20use std::collections::HashMap;
21use std::time::Duration;
22
23pub use enumerator::*;
24use serde::Deserialize;
25use serde_with::serde_as;
26pub use split::*;
27use with_options::WithOptions;
28
29use self::source::reader::PulsarSplitReader;
30use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
31use crate::deserialize_optional_duration_from_string;
32use crate::enforce_secret::EnforceSecret;
33use crate::error::ConnectorError;
34use crate::source::SourceProperties;
35
/// Connector name for the Pulsar source; exposed as `SourceProperties::SOURCE_NAME`
/// for [`PulsarProperties`].
pub const PULSAR_CONNECTOR: &str = "pulsar";
37
38impl SourceProperties for PulsarProperties {
39 type Split = PulsarSplit;
40 type SplitEnumerator = PulsarSplitEnumerator;
41 type SplitReader = PulsarSplitReader;
42
43 const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
44}
45
46impl crate::source::UnknownFields for PulsarProperties {
47 fn unknown_fields(&self) -> HashMap<String, String> {
48 self.unknown_fields.clone()
49 }
50}
51
52impl EnforceSecret for PulsarProperties {
53 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
54 for prop in prop_iter {
55 PulsarCommon::enforce_one(prop)?;
56 }
57 Ok(())
58 }
59}
60
61#[derive(Clone, Debug, Deserialize, WithOptions)]
62#[serde_as]
63pub struct PulsarProperties {
64 #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
65 pub scan_startup_mode: Option<String>,
66
67 #[serde(
68 rename = "scan.startup.timestamp.millis",
69 alias = "pulsar.time.offset",
70 alias = "scan.startup.timestamp_millis"
71 )]
72 pub time_offset: Option<String>,
73
74 #[serde(flatten)]
75 pub common: PulsarCommon,
76
77 #[serde(flatten)]
78 pub oauth: Option<PulsarOauthCommon>,
79
80 #[serde(flatten)]
81 pub aws_auth_props: AwsAuthProps,
82
83 #[serde(rename = "iceberg.enabled")]
84 #[serde_as(as = "DisplayFromStr")]
85 pub iceberg_loader_enabled: Option<bool>,
86
87 #[serde(rename = "iceberg.bucket", default)]
88 pub iceberg_bucket: Option<String>,
89
90 #[serde(rename = "subscription.name.prefix")]
98 pub subscription_name_prefix: Option<String>,
99
100 #[serde(
101 rename = "subscription.unacked.resend.delay",
102 deserialize_with = "deserialize_optional_duration_from_string",
103 default
104 )]
105 pub subscription_unacked_resend_delay: Option<Duration>,
106
107 #[serde(flatten)]
108 pub unknown_fields: HashMap<String, String>,
109}