risingwave_connector/source/mqtt/
mod.rspub mod enumerator;
pub mod source;
pub mod split;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use with_options::WithOptions;
use crate::connector_common::{MqttCommon, MqttQualityOfService};
use crate::source::mqtt::enumerator::MqttSplitEnumerator;
use crate::source::mqtt::source::{MqttSplit, MqttSplitReader};
use crate::source::SourceProperties;
pub const MQTT_CONNECTOR: &str = "mqtt";
#[derive(Debug, Clone, Error)]
pub struct MqttError(String);
impl Display for MqttError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct MqttProperties {
#[serde(flatten)]
pub common: MqttCommon,
pub topic: String,
#[serde_as(as = "Option<DisplayFromStr>")]
pub qos: Option<MqttQualityOfService>,
#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
impl SourceProperties for MqttProperties {
type Split = MqttSplit;
type SplitEnumerator = MqttSplitEnumerator;
type SplitReader = MqttSplitReader;
const SOURCE_NAME: &'static str = MQTT_CONNECTOR;
}
impl crate::source::UnknownFields for MqttProperties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}