// risingwave_connector/source/pulsar/mod.rs
pub mod enumerator;
pub mod source;
pub mod split;
pub mod topic;
use std::collections::HashMap;
pub use enumerator::*;
use serde::Deserialize;
use serde_with::serde_as;
pub use split::*;
use with_options::WithOptions;
use self::source::reader::PulsarSplitReader;
use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
use crate::source::SourceProperties;
pub const PULSAR_CONNECTOR: &str = "pulsar";
/// Binds [`PulsarProperties`] to the split, enumerator and reader types used
/// for the Pulsar source.
impl SourceProperties for PulsarProperties {
    /// Connector name for this source; the same value as [`PULSAR_CONNECTOR`].
    const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;

    /// Split type associated with this source.
    type Split = PulsarSplit;
    /// Enumerator type associated with this source.
    type SplitEnumerator = PulsarSplitEnumerator;
    /// Reader type associated with this source.
    type SplitReader = PulsarSplitReader;
}
impl crate::source::UnknownFields for PulsarProperties {
    /// Returns an owned copy of the map stored in the `unknown_fields` field.
    fn unknown_fields(&self) -> HashMap<String, String> {
        // Borrow just the one field we need, then clone it for the caller.
        let Self {
            unknown_fields: leftovers,
            ..
        } = self;
        HashMap::clone(leftovers)
    }
}
/// Options accepted by the Pulsar source connector, deserialized with serde
/// from string-keyed connector options.
// `#[serde_as]` must be listed before `#[derive(Deserialize)]`: it rewrites the
// `#[serde_as(as = ...)]` field attributes into `#[serde(with = ...)]`, and the
// derive only honours them if that rewrite has already happened.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PulsarProperties {
    // Read from `scan.startup.mode`; `pulsar.scan.startup.mode` is accepted as an alias.
    #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    // Read from `scan.startup.timestamp.millis`; `pulsar.time.offset` and
    // `scan.startup.timestamp_millis` are accepted as aliases.
    // NOTE(review): presumably a millisecond timestamp kept as a string — confirm
    // against the code that consumes it.
    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "pulsar.time.offset",
        alias = "scan.startup.timestamp_millis"
    )]
    pub time_offset: Option<String>,

    // Shared Pulsar options, flattened into the same key space as this struct.
    #[serde(flatten)]
    pub common: PulsarCommon,

    // Optional OAuth options, flattened into the same key space.
    #[serde(flatten)]
    pub oauth: Option<PulsarOauthCommon>,

    // AWS authentication options, flattened into the same key space.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Read from `iceberg.enabled`. The value is parsed from its string form via
    // `FromStr`; the adapter is wrapped in `Option` to match the field type and
    // `default` keeps a missing key deserializing to `None`.
    #[serde(rename = "iceberg.enabled", default)]
    #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
    pub iceberg_loader_enabled: Option<bool>,

    // Read from `iceberg.bucket`; `None` when the key is absent.
    #[serde(rename = "iceberg.bucket", default)]
    pub iceberg_bucket: Option<String>,

    // Read from `subscription.name.prefix`.
    #[serde(rename = "subscription.name.prefix")]
    pub subscription_name_prefix: Option<String>,

    // Flattened map that captures the remaining options not matched by the
    // fields above.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}