risingwave_connector/source/mqtt/
split.rs1use risingwave_common::types::JsonbVal;
16use serde::{Deserialize, Serialize};
17
18use crate::error::ConnectorResult;
19use crate::source::{SplitId, SplitMetaData};
20
21#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
23pub struct MqttSplit {
24 pub(crate) topic: String,
25}
26
27impl SplitMetaData for MqttSplit {
28 fn id(&self) -> SplitId {
29 self.topic.clone().into()
31 }
32
33 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
34 serde_json::from_value(value.take()).map_err(Into::into)
35 }
36
37 fn encode_to_json(&self) -> JsonbVal {
38 serde_json::to_value(self.clone()).unwrap().into()
39 }
40
41 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
42 Ok(())
43 }
44}
45
46impl MqttSplit {
47 pub fn new(topic: String) -> Self {
48 Self { topic }
49 }
50}