risingwave_connector/sink/kinesis.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

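//! Sink implementation for Amazon Kinesis Data Streams.
//!
//! Each stream chunk is encoded by a [`SinkFormatterImpl`] and buffered in a
//! [`KinesisSinkPayloadWriter`], whose `finish()` future delivers the records
//! via the `PutRecords` API, batching under the AWS limits and retrying
//! throttled or partially failed requests.
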
use std::collections::BTreeMap;

use anyhow::{Context, anyhow};
use aws_sdk_kinesis::Client as KinesisClient;
use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
use futures::{FutureExt, TryFuture};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::SinkParam;
use super::catalog::SinkFormatDesc;
use crate::connector_common::KinesisCommon;
use crate::dispatch_sink_formatter_str_key_impl;
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};

pub const KINESIS_SINK: &str = "kinesis";

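/// The Kinesis sink, built from a [`SinkParam`]. A downstream primary key is
/// required because it serves as the Kinesis partition key.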
#[derive(Clone, Debug)]
pub struct KinesisSink {
    pub config: KinesisSinkConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KinesisSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KinesisSinkConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KinesisSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KinesisSinkConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

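/// Upper bound on the number of pending delivery futures (one per written
/// chunk) before the log sinker awaits completed deliveries.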
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;

impl Sink for KinesisSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;

    const SINK_NAME: &'static str = KINESIS_SINK;

    async fn validate(&self) -> Result<()> {
        // Kinesis requires a partition key. There is no built-in support for round-robin
        // distribution as in Kafka/Pulsar.
        // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey
        if self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "kinesis sink requires partition key (please define in `primary_key` field)",
            )));
        }
        // Construct the formatter eagerly so that any constructor error is reported
        // during validation rather than at write time.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.stream_name,
        )
        .await?;

        // Check reachability.
        let client = self.config.common.build_client().await?;
        client
            .list_shards()
            .stream_name(&self.config.common.stream_name)
            .send()
            .await
            .context("failed to list shards")
            .map_err(SinkError::Kinesis)?;
        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(KinesisSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            &self.format_desc,
            self.db_name.clone(),
            self.sink_from_name.clone(),
        )
        .await?
        .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
    }
}

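/// Configuration of the Kinesis sink, deserialized from the `WITH` options of
/// the `CREATE SINK` statement. All concrete options are defined by
/// [`KinesisCommon`].
///
/// A minimal sketch of usage (the exact option names come from
/// [`KinesisCommon`]; `stream` and `aws.region` are shown here for
/// illustration):
///
/// ```sql
/// CREATE SINK my_kinesis_sink FROM my_mv
/// WITH (
///     connector = 'kinesis',
///     stream = 'my-stream',
///     aws.region = 'us-east-1'
/// ) FORMAT PLAIN ENCODE JSON;
/// ```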
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    #[serde(flatten)]
    pub common: KinesisCommon,
}

impl EnforceSecret for KinesisSinkConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KinesisCommon::enforce_one(prop)?;
        Ok(())
    }
}

impl KinesisSinkConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

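/// Sink writer that formats stream chunks and sends the encoded records to
/// Kinesis through a per-chunk [`KinesisSinkPayloadWriter`].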
pub struct KinesisSinkWriter {
    pub config: KinesisSinkConfig,
    formatter: SinkFormatterImpl,
    client: KinesisClient,
}

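/// Buffers encoded records together with their approximate sizes in bytes
/// until [`KinesisSinkPayloadWriter::finish`] sends them in batches.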
struct KinesisSinkPayloadWriter {
    client: KinesisClient,
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    stream_name: String,
}

impl KinesisSinkWriter {
    pub async fn new(
        config: KinesisSinkConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        format_desc: &SinkFormatDesc,
        db_name: String,
        sink_from_name: String,
    ) -> Result<Self> {
        let formatter = SinkFormatterImpl::new(
            format_desc,
            schema,
            pk_indices,
            db_name,
            sink_from_name,
            &config.common.stream_name,
        )
        .await?;
        let client = config
            .common
            .build_client()
            .await
            .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
        Ok(Self {
            config: config.clone(),
            formatter,
            client,
        })
    }

    fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
        KinesisSinkPayloadWriter {
            client: self.client.clone(),
            entries: vec![],
            stream_name: self.config.common.stream_name.clone(),
        }
    }
}

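// This module scopes the opaque `impl Trait` type alias for the delivery
// future; only the alias itself is re-exported below.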
mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;

    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
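        /// Consumes the buffered entries and returns a future that sends them
        /// to Kinesis with `put_records`, splitting them into batches under
        /// the per-request record-count and payload-size limits, backing off
        /// on throttling, and retrying from the first failed entry on partial
        /// failure.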
        #[define_opaque(KinesisSinkPayloadWriterDeliveryFuture)]
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            // For reference on the behavior of `put_records`, see
            // https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

            async move {
                // From the doc of `put_records`:
                // Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MiB,
                // up to a limit of 5 MiB for the entire request, including partition keys. Each shard can support writes up to
                // 1,000 records per second, up to a maximum data write total of 1 MiB per second.

                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                // Allow at most 3 retries while making no progress, to avoid retrying endlessly.
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                let mut start_idx = 0;

                let mut throttle_delay = None;

                while start_idx < total_count {
                    // 1. Prepare the records to be sent.

                    // The maximum possible number of records that can be sent in this iteration.
                    // The actual number can be smaller when the total payload size would exceed
                    // `MAX_TOTAL_RECORD_PAYLOAD_SIZE`.
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encounter a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
                    if records.is_empty() {
                        // Include at least one record even if its size exceeds
                        // `MAX_TOTAL_RECORD_PAYLOAD_SIZE`.
                        records.push(self.entries[start_idx].0.clone());
                    }

                    // 2. Send the records and handle the result.
                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!(
                                    "request record count {} does not match the response record count {}",
                                    record_count,
                                    output.records.len()
                                )));
                            }
                            // From the doc of `put_records`:
                            // A single record failure does not stop the processing of subsequent records. As a result,
                            // PutRecords doesn't guarantee the ordering of records. If you need to read records in the same
                            // order they are written to the stream, use PutRecord instead of PutRecords, and write to the same shard.

                            // Therefore, to ensure at-least-once delivery and eventual consistency, we locate the
                            // first failed entry and retry it and all subsequent entries, even though some of them
                            // may have already been successfully processed.
                            if let Some((first_failed_idx, result_entry)) =
                                Self::first_failed_entry(output)
                            {
                                // `first_failed_idx` is also the number of successfully sent entries.
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    start_idx += partially_sent_count;
                                    // Reset the retry count when progress is made.
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code
                                    && err_code == "ProvisionedThroughputExceededException"
                                {
                                    // From the doc of `put_records`:
                                    // The ErrorCode parameter reflects the type of error and can be one of the following values:
                                    // ProvisionedThroughputExceededException or InternalFailure. ErrorMessage provides more detailed
                                    // information about the ProvisionedThroughputExceededException exception including the account ID,
                                    // stream name, and shard ID of the record that was throttled.
                                    let throttle_delay = throttle_delay
                                        .get_or_insert_with(|| {
                                            ExponentialBackoff::from_millis(100)
                                                .factor(2)
                                                .max_delay(Duration::from_secs(2))
                                                .map(jitter)
                                        })
                                        .next()
                                        .expect("should not be none");
                                    warn!(err_string = ?result_entry.error_message, ?throttle_delay, "throttle");
                                    sleep(throttle_delay).await;
                                } else {
                                    // No progress due to some internal error.
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                start_idx += record_count;
                                // Reset the retry count when progress is made.
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                // Reset the throttle delay when records are fully sent.
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}
pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;

impl KinesisSinkPayloadWriter {
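    /// Returns the position and content of the first entry in the response
    /// that failed, i.e. was not assigned a `shard_id`. Returns `None` when
    /// all records succeeded.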
    fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
        // From the doc of `put_records`:
        // A successfully processed record includes ShardId and SequenceNumber values. The ShardId parameter
        // identifies the shard in the stream where the record is stored. The SequenceNumber parameter is an
        // identifier assigned to the put record, unique to all records in the stream.
        //
        // An unsuccessfully processed record includes ErrorCode and ErrorMessage values. ErrorCode reflects
        // the type of error and can be one of the following values: ProvisionedThroughputExceededException or
        // InternalFailure. ErrorMessage provides more detailed information about the ProvisionedThroughputExceededException
        // exception including the account ID, stream name, and shard ID of the record that was throttled.
        output
            .records
            .into_iter()
            .find_position(|entry| entry.shard_id.is_none())
    }

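    /// Builds a `PutRecordsRequestEntry` from the key and payload and buffers
    /// it together with its approximate size (key length + payload length),
    /// which is later used for batching in `finish`.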
    fn put_record(&mut self, key: String, payload: Vec<u8>) {
        let size = key.len() + payload.len();
        self.entries.push((
            PutRecordsRequestEntry::builder()
                .partition_key(key)
                .data(Blob::new(payload))
                .build()
                .expect("should not fail because we have set `data` and `partition_key`"),
            size,
        ))
    }
}

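/// The key is required, as it becomes the Kinesis partition key; a missing
/// value (e.g. for a delete event) is written as an empty payload.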
impl FormattedSink for KinesisSinkPayloadWriter {
    type K = String;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.put_record(
            k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
            v.unwrap_or_default(),
        );
        Ok(())
    }
}

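/// Delivery is asynchronous: each chunk is formatted into a fresh payload
/// writer, and its `finish()` future is handed to the delivery future
/// manager, so the log is truncated only after delivery completes.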
impl AsyncTruncateSinkWriter for KinesisSinkWriter {
    type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = self.new_payload_writer();
        dispatch_sink_formatter_str_key_impl!(
            &self.formatter,
            formatter,
            payload_writer.write_chunk(chunk, formatter).await
        )?;

        add_future
            .add_future_may_await(payload_writer.finish())
            .await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

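    /// Guards the `expect` in `put_record`: building an entry with both
    /// `data` and `partition_key` set must not fail.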
    #[test]
    fn test_kinesis_entry_builder_safe_unwrap() {
        PutRecordsRequestEntry::builder()
            .data(Blob::new(b"data"))
            .partition_key("partition-key")
            .build()
            .unwrap();
    }
}