risingwave_connector/source/kinesis/source/
message.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use aws_sdk_kinesis::types::Record;
16use aws_smithy_types::DateTime;
17use aws_smithy_types_convert::date_time::DateTimeExt;
18use risingwave_common::types::{DatumRef, ScalarRefImpl};
19
20use crate::source::{SourceMessage, SourceMeta, SplitId};
21
22#[derive(Clone, Debug)]
23pub struct KinesisMeta {
24    // from `approximate_arrival_timestamp` of type `Option<aws_smithy_types::DateTime>`
25    timestamp: Option<DateTime>,
26}
27
28impl KinesisMeta {
29    pub fn extract_timestamp(&self) -> DatumRef<'_> {
30        Some(ScalarRefImpl::Timestamptz(
31            self.timestamp?.to_chrono_utc().ok()?.into(),
32        ))
33    }
34}
35
36pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
37    SourceMessage {
38        key: Some(value.partition_key.clone().into_bytes()),
39        payload: Some(value.data.clone().into_inner()),
40        offset: value.sequence_number.clone(),
41        split_id,
42        meta: SourceMeta::Kinesis(KinesisMeta {
43            timestamp: value.approximate_arrival_timestamp,
44        }),
45    }
46}