risingwave_connector/sink/kinesis.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use anyhow::{Context, anyhow};
use aws_sdk_kinesis::Client as KinesisClient;
use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
use futures::{FutureExt, TryFuture};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::SinkParam;
use super::catalog::SinkFormatDesc;
use crate::connector_common::KinesisCommon;
use crate::dispatch_sink_formatter_str_key_impl;
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};

pub const KINESIS_SINK: &str = "kinesis";

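/// Sink that writes formatted rows to an AWS Kinesis data stream, using the
/// downstream primary key as the partition key.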
#[derive(Clone, Debug)]
pub struct KinesisSink {
    pub config: KinesisSinkConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KinesisSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KinesisSinkConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KinesisSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KinesisSinkConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

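// Upper bound on the number of chunks whose delivery futures may be pending at
// once; passed to `into_log_sinker` below to apply backpressure.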
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;

impl Sink for KinesisSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;

    const SINK_NAME: &'static str = KINESIS_SINK;

    async fn validate(&self) -> Result<()> {
        // Kinesis requires a partition key. There is no built-in support for round-robin partitioning as in Kafka/Pulsar.
        // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey
        if self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "kinesis sink requires partition key (please define in `primary_key` field)",
            )));
        }
        // Construct the formatter here to surface constructor errors during validation, while they can still be reported.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.stream_name,
        )
        .await?;

        // Check reachability.
        let client = self.config.common.build_client().await?;
        client
            .list_shards()
            .stream_name(&self.config.common.stream_name)
            .send()
            .await
            .context("failed to list shards")
            .map_err(SinkError::Kinesis)?;
        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(KinesisSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            &self.format_desc,
            self.db_name.clone(),
            self.sink_from_name.clone(),
        )
        .await?
        .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
    }
}

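/// Sink configuration: currently just the shared Kinesis connection options
/// (stream name, AWS region, credentials, etc.) flattened from `KinesisCommon`.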
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    #[serde(flatten)]
    pub common: KinesisCommon,
}

impl EnforceSecret for KinesisSinkConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KinesisCommon::enforce_one(prop)?;
        Ok(())
    }
}

impl KinesisSinkConfig {
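    /// Builds the config from the flattened `WITH` options of a `CREATE SINK`
    /// statement by round-tripping them through `serde_json`. A minimal sketch
    /// (property names assumed from `KinesisCommon`; values are illustrative):
    ///
    /// ```ignore
    /// let props = BTreeMap::from([
    ///     ("stream".to_owned(), "my-stream".to_owned()),
    ///     ("aws.region".to_owned(), "us-east-1".to_owned()),
    /// ]);
    /// let config = KinesisSinkConfig::from_btreemap(props)?;
    /// assert_eq!(config.common.stream_name, "my-stream");
    /// ```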
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

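/// Sink writer that formats each chunk and hands the encoded records to a
/// per-chunk `KinesisSinkPayloadWriter` for asynchronous delivery.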
pub struct KinesisSinkWriter {
    pub config: KinesisSinkConfig,
    formatter: SinkFormatterImpl,
    client: KinesisClient,
}

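/// Buffers the encoded records of one chunk as `(entry, payload size)` pairs;
/// `finish` turns the buffer into a delivery future that sends them via
/// `PutRecords`.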
struct KinesisSinkPayloadWriter {
    client: KinesisClient,
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    stream_name: String,
}

impl KinesisSinkWriter {
    pub async fn new(
        config: KinesisSinkConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        format_desc: &SinkFormatDesc,
        db_name: String,
        sink_from_name: String,
    ) -> Result<Self> {
        let formatter = SinkFormatterImpl::new(
            format_desc,
            schema,
            pk_indices,
            db_name,
            sink_from_name,
            &config.common.stream_name,
        )
        .await?;
        let client = config
            .common
            .build_client()
            .await
            .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
        Ok(Self {
            config: config.clone(),
            formatter,
            client,
        })
    }

    fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
        KinesisSinkPayloadWriter {
            client: self.client.clone(),
            entries: vec![],
            stream_name: self.config.common.stream_name.clone(),
        }
    }
}

mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;

    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
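        /// Consumes the buffered entries and returns a future that delivers
        /// them with `PutRecords`, batching within the API limits and retrying
        /// the not-yet-acknowledged tail until everything is sent or the
        /// retry budget is exhausted.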
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            // For reference on the behavior of `put_records`, see:
            // https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

            async move {
                // From the doc of `put_records`:
                // Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MiB,
                // up to a limit of 5 MiB for the entire request, including partition keys. Each shard can support writes up to
                // 1,000 records per second, up to a maximum data write total of 1 MiB per second.

                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                // Allow at most 3 retries when no progress is being made, to avoid retrying endlessly.
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;
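                // Illustrative arithmetic: 1,200 entries of ~10 KiB each are
                // bounded by the record-count limit and go out as batches of
                // 500, 500, and 200, whereas 1,200 entries of ~20 KiB each hit
                // the 5 MiB payload limit first, at roughly 256 records per
                // request.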

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                let mut start_idx = 0;

                let mut throttle_delay = None;

                while start_idx < total_count {
                    // 1. Prepare the records to be sent.

                    // The maximum possible number of records that can be sent in this iteration.
                    // The actual number may be smaller when the total payload size would exceed `MAX_TOTAL_RECORD_PAYLOAD_SIZE`.
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encounter a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
                    if records.is_empty() {
                        // Include at least one record, even if its size exceeds `MAX_TOTAL_RECORD_PAYLOAD_SIZE`.
                        records.push(self.entries[start_idx].0.clone());
                    }

                    // 2. Send the records and handle the result.
                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!(
                                    "request record count {} does not match the response record count {}",
                                    record_count,
                                    output.records.len()
                                )));
                            }
                            // From the doc of `put_records`:
                            // A single record failure does not stop the processing of subsequent records. As a result,
                            // PutRecords doesn’t guarantee the ordering of records. If you need to read records in the same
                            // order they are written to the stream, use PutRecord instead of PutRecords, and write to the same shard.

                            // Therefore, to ensure at-least-once delivery and eventual consistency, we find the first failed
                            // entry and retry it together with all subsequent entries, even though some of them may already
                            // have been processed successfully.
                            if let Some((first_failed_idx, result_entry)) =
                                Self::first_failed_entry(output)
                            {
                                // `first_failed_idx` is also the number of successfully processed entries.
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    start_idx += partially_sent_count;
                                    // Reset the retry count when making progress.
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code
                                    && err_code == "ProvisionedThroughputExceededException"
                                {
                                    // From the doc of `put_records`:
                                    // The ErrorCode parameter reflects the type of error and can be one of the following values:
                                    // ProvisionedThroughputExceededException or InternalFailure. ErrorMessage provides more detailed
                                    // information about the ProvisionedThroughputExceededException exception including the account ID,
                                    // stream name, and shard ID of the record that was throttled.
                                    let throttle_delay = throttle_delay
                                        .get_or_insert_with(|| {
                                            ExponentialBackoff::from_millis(100)
                                                .factor(2)
                                                .max_delay(Duration::from_secs(2))
                                                .map(jitter)
                                        })
                                        .next()
                                        .expect("should not be none");
                                    warn!(err_string = ?result_entry.error_message, ?throttle_delay, "throttle");
                                    sleep(throttle_delay).await;
                                } else {
                                    // No progress due to some internal error.
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                start_idx += record_count;
                                // Reset the retry count when making progress.
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                // Reset the throttle delay when records are fully sent.
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}

pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;

impl KinesisSinkPayloadWriter {
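    /// Finds the first record in the response that was not successfully
    /// processed (no `shard_id` assigned), returning its index and the result
    /// entry. The index also equals the number of leading successful records.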
    fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
        // From the doc of `put_records`:
        // A successfully processed record includes ShardId and SequenceNumber values. The ShardId parameter
        // identifies the shard in the stream where the record is stored. The SequenceNumber parameter is an
        // identifier assigned to the put record, unique to all records in the stream.
        //
        // An unsuccessfully processed record includes ErrorCode and ErrorMessage values. ErrorCode reflects
        // the type of error and can be one of the following values: ProvisionedThroughputExceededException or
        // InternalFailure. ErrorMessage provides more detailed information about the ProvisionedThroughputExceededException
        // exception including the account ID, stream name, and shard ID of the record that was throttled.
        output
            .records
            .into_iter()
            .find_position(|entry| entry.shard_id.is_none())
    }

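    /// Buffers one record, tracking its payload size (partition key plus data)
    /// for the batch-size accounting in `finish`.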
    fn put_record(&mut self, key: String, payload: Vec<u8>) {
        let size = key.len() + payload.len();
        self.entries.push((
            PutRecordsRequestEntry::builder()
                .partition_key(key)
                .data(Blob::new(payload))
                .build()
                .expect("should not fail because we have set `data` and `partition_key`"),
            size,
        ))
    }
}

impl FormattedSink for KinesisSinkPayloadWriter {
    type K = String;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.put_record(
            k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
            v.unwrap_or_default(),
        );
        Ok(())
    }
}

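/// Formats each chunk into a fresh payload writer, then registers the delivery
/// future with the future manager; the chunk is truncated from the log store
/// only once that future completes.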
impl AsyncTruncateSinkWriter for KinesisSinkWriter {
    type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = self.new_payload_writer();
        dispatch_sink_formatter_str_key_impl!(
            &self.formatter,
            formatter,
            payload_writer.write_chunk(chunk, formatter).await
        )?;

        add_future
            .add_future_may_await(payload_writer.finish())
            .await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

    #[test]
    fn test_kinesis_entry_builder_safe_unwrap() {
        PutRecordsRequestEntry::builder()
            .data(Blob::new(b"data"))
            .partition_key("partition-key")
            .build()
            .unwrap();
    }
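
    /// A companion check, assuming the builder validates required fields (as
    /// the `expect` message in `put_record` suggests): omitting the required
    /// `partition_key` should make `build()` return an error.
    #[test]
    fn test_kinesis_entry_builder_missing_key_errors() {
        assert!(
            PutRecordsRequestEntry::builder()
                .data(Blob::new(b"data"))
                .build()
                .is_err()
        );
    }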
}