risingwave_connector/source/nats/
split.rs1use risingwave_common::types::JsonbVal;
16use serde::{Deserialize, Serialize};
17
18use crate::error::ConnectorResult;
19use crate::source::{SplitId, SplitMetaData};
20
21#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
22pub enum NatsOffset {
23 Earliest,
24 Latest,
25 SequenceNumber(String),
26 Timestamp(i64),
27 None,
28}
29
30#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
32pub struct NatsSplit {
33 pub(crate) subject: String,
34 pub(crate) split_id: SplitId,
37 pub(crate) start_sequence: NatsOffset,
38}
39
40impl SplitMetaData for NatsSplit {
41 fn id(&self) -> SplitId {
42 format!("{}", self.split_id).into()
44 }
45
46 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
47 serde_json::from_value(value.take()).map_err(Into::into)
48 }
49
50 fn encode_to_json(&self) -> JsonbVal {
51 serde_json::to_value(self.clone()).unwrap().into()
52 }
53
54 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
55 Ok(())
57 }
58}
59
60impl NatsSplit {
61 pub fn new(subject: String, split_id: SplitId, start_sequence: NatsOffset) -> Self {
62 Self {
63 subject,
64 split_id,
65 start_sequence,
66 }
67 }
68}