risingwave_connector/source/pulsar/
mod.rs1pub mod enumerator;
16pub mod source;
17pub mod split;
18pub mod topic;
19
20use std::collections::HashMap;
21
22pub use enumerator::*;
23use serde::Deserialize;
24use serde_with::serde_as;
25pub use split::*;
26use with_options::WithOptions;
27
28use self::source::reader::PulsarSplitReader;
29use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
30use crate::enforce_secret::EnforceSecret;
31use crate::error::ConnectorError;
32use crate::source::SourceProperties;
33
34pub const PULSAR_CONNECTOR: &str = "pulsar";
35
36impl SourceProperties for PulsarProperties {
37 type Split = PulsarSplit;
38 type SplitEnumerator = PulsarSplitEnumerator;
39 type SplitReader = PulsarSplitReader;
40
41 const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
42}
43
44impl crate::source::UnknownFields for PulsarProperties {
45 fn unknown_fields(&self) -> HashMap<String, String> {
46 self.unknown_fields.clone()
47 }
48}
49
50impl EnforceSecret for PulsarProperties {
51 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
52 for prop in prop_iter {
53 PulsarCommon::enforce_one(prop)?;
54 }
55 Ok(())
56 }
57}
58
59#[derive(Clone, Debug, Deserialize, WithOptions)]
60#[serde_as]
61pub struct PulsarProperties {
62 #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
63 pub scan_startup_mode: Option<String>,
64
65 #[serde(
66 rename = "scan.startup.timestamp.millis",
67 alias = "pulsar.time.offset",
68 alias = "scan.startup.timestamp_millis"
69 )]
70 pub time_offset: Option<String>,
71
72 #[serde(flatten)]
73 pub common: PulsarCommon,
74
75 #[serde(flatten)]
76 pub oauth: Option<PulsarOauthCommon>,
77
78 #[serde(flatten)]
79 pub aws_auth_props: AwsAuthProps,
80
81 #[serde(rename = "iceberg.enabled")]
82 #[serde_as(as = "DisplayFromStr")]
83 pub iceberg_loader_enabled: Option<bool>,
84
85 #[serde(rename = "iceberg.bucket", default)]
86 pub iceberg_bucket: Option<String>,
87
88 #[serde(rename = "subscription.name.prefix")]
96 pub subscription_name_prefix: Option<String>,
97
98 #[serde(flatten)]
99 pub unknown_fields: HashMap<String, String>,
100}