risingwave_connector/source/pulsar/
split.rs1use risingwave_common::types::JsonbVal;
16use serde::{Deserialize, Serialize};
17
18use crate::error::ConnectorResult;
19use crate::source::pulsar::PulsarEnumeratorOffset;
20use crate::source::pulsar::topic::Topic;
21use crate::source::{SplitId, SplitMetaData};
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash)]
24pub struct PulsarSplit {
25 pub(crate) topic: Topic,
26 pub(crate) start_offset: PulsarEnumeratorOffset,
27}
28
29impl SplitMetaData for PulsarSplit {
30 fn id(&self) -> SplitId {
31 self.topic.to_string().into()
33 }
34
35 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
36 serde_json::from_value(value.take()).map_err(Into::into)
37 }
38
39 fn encode_to_json(&self) -> JsonbVal {
40 serde_json::to_value(self.clone()).unwrap().into()
41 }
42
43 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
44 let start_offset = if last_seen_offset.is_empty() {
45 PulsarEnumeratorOffset::Earliest
46 } else {
47 PulsarEnumeratorOffset::MessageId(last_seen_offset)
48 };
49
50 self.start_offset = start_offset;
51 Ok(())
52 }
53}