risingwave_connector/source/google_pubsub/
split.rs1use risingwave_common::types::JsonbVal;
16use serde::{Deserialize, Serialize};
17
18use crate::error::ConnectorResult;
19use crate::source::{SplitId, SplitMetaData};
20
21#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
22pub struct PubsubSplit {
23 pub(crate) index: u32,
26 pub(crate) subscription: String,
27
28 #[serde(rename = "start_offset")]
29 #[serde(skip_serializing)]
30 pub(crate) __deprecated_start_offset: Option<String>,
31
32 #[serde(rename = "stop_offset")]
33 #[serde(skip_serializing)]
34 pub(crate) __deprecated_stop_offset: Option<String>,
35}
36
37impl SplitMetaData for PubsubSplit {
38 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
39 serde_json::from_value(value.take()).map_err(Into::into)
40 }
41
42 fn encode_to_json(&self) -> JsonbVal {
43 serde_json::to_value(self.clone()).unwrap().into()
44 }
45
46 fn id(&self) -> SplitId {
47 format!("{}-{}", self.subscription, self.index).into()
48 }
49
50 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
53 self.__deprecated_start_offset = None;
55 Ok(())
56 }
57}