// risingwave_connector/source/pulsar/split.rs
use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};
use crate::error::ConnectorResult;
use crate::source::pulsar::topic::Topic;
use crate::source::pulsar::PulsarEnumeratorOffset;
use crate::source::{SplitId, SplitMetaData};
/// A Pulsar source split: a [`Topic`] paired with a [`PulsarEnumeratorOffset`].
///
/// The type is serde-serializable so it can be converted to and from JSON by
/// its `SplitMetaData` implementation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash)]
pub struct PulsarSplit {
/// Topic this split refers to; its string form is used as the split id.
pub(crate) topic: Topic,
/// Offset recorded for this split. `update_offset` overwrites it with either
/// `Earliest` or a `MessageId`.
pub(crate) start_offset: PulsarEnumeratorOffset,
}
impl SplitMetaData for PulsarSplit {
    /// Returns the split id, which is the string form of the topic.
    fn id(&self) -> SplitId {
        self.topic.to_string().into()
    }

    /// Rebuilds a split from the JSON value taken out of `value`.
    ///
    /// # Errors
    ///
    /// Returns the deserialization error, converted into the connector error
    /// type, when the JSON does not describe a `PulsarSplit`.
    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(Into::into)
    }

    /// Serializes this split to JSON and wraps it as a `JsonbVal`.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json` fails to serialize the split.
    fn encode_to_json(&self) -> JsonbVal {
        // `&Self` implements `Serialize` whenever `Self` does, so the split is
        // serialized by reference instead of being deep-cloned first.
        serde_json::to_value(self).unwrap().into()
    }

    /// Replaces the stored start offset based on `last_seen_offset`.
    ///
    /// An empty string resets the offset to `Earliest`; any other string is
    /// stored as `MessageId(last_seen_offset)`. This method always returns
    /// `Ok(())`.
    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
        self.start_offset = if last_seen_offset.is_empty() {
            PulsarEnumeratorOffset::Earliest
        } else {
            PulsarEnumeratorOffset::MessageId(last_seen_offset)
        };
        Ok(())
    }
}