risingwave_connector/source/kinesis/source/
message.rs1use aws_sdk_kinesis::types::Record;
16use aws_smithy_types::DateTime;
17use aws_smithy_types_convert::date_time::DateTimeExt;
18use risingwave_common::types::{DatumRef, ScalarRefImpl};
19
20use crate::source::{SourceMessage, SourceMeta, SplitId};
21
22#[derive(Clone, Debug)]
23pub struct KinesisMeta {
24 timestamp: Option<DateTime>,
26}
27
28impl KinesisMeta {
29 pub fn extract_timestamp(&self) -> DatumRef<'_> {
30 Some(ScalarRefImpl::Timestamptz(
31 self.timestamp?.to_chrono_utc().ok()?.into(),
32 ))
33 }
34}
35
36pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
37 SourceMessage {
38 key: Some(value.partition_key.clone().into_bytes()),
39 payload: Some(value.data.clone().into_inner()),
40 offset: value.sequence_number.clone(),
41 split_id,
42 meta: SourceMeta::Kinesis(KinesisMeta {
43 timestamp: value.approximate_arrival_timestamp,
44 }),
45 }
46}