risingwave_connector/source/kinesis/
split.rs1use risingwave_common::types::JsonbVal;
16use serde::{Deserialize, Serialize};
17
18use crate::error::ConnectorResult;
19use crate::source::{SplitId, SplitMetaData};
20
21#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
23pub enum KinesisOffset {
24 Earliest,
26 Latest,
28 #[serde(alias = "SequenceNumber")] AfterSequenceNumber(String),
31 Timestamp(i64),
33
34 None,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
38pub struct KinesisSplit {
39 pub(crate) shard_id: SplitId,
40
41 #[serde(alias = "start_position")] pub(crate) next_offset: KinesisOffset,
43 #[serde(alias = "end_position")] pub(crate) end_offset: KinesisOffset,
45}
46
47impl SplitMetaData for KinesisSplit {
48 fn id(&self) -> SplitId {
49 self.shard_id.clone()
50 }
51
52 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
53 serde_json::from_value(value.take()).map_err(Into::into)
54 }
55
56 fn encode_to_json(&self) -> JsonbVal {
57 serde_json::to_value(self.clone()).unwrap().into()
58 }
59
60 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
61 self.next_offset = KinesisOffset::AfterSequenceNumber(last_seen_offset);
62 Ok(())
63 }
64}
65
66impl KinesisSplit {
67 pub fn new(
68 shard_id: SplitId,
69 next_offset: KinesisOffset,
70 end_offset: KinesisOffset,
71 ) -> KinesisSplit {
72 KinesisSplit {
73 shard_id,
74 next_offset,
75 end_offset,
76 }
77 }
78}