risingwave_connector/sink/kinesis.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use anyhow::{Context, anyhow};
use aws_sdk_kinesis::Client as KinesisClient;
use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
use futures::{FutureExt, TryFuture};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::SinkParam;
use super::catalog::SinkFormatDesc;
use crate::connector_common::KinesisCommon;
use crate::dispatch_sink_formatter_str_key_impl;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};

pub const KINESIS_SINK: &str = "kinesis";

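/// Kinesis sink: encodes change records with the configured `FORMAT ... ENCODE ...` and
/// writes them to an AWS Kinesis data stream, using the primary key as the partition key.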
#[derive(Clone, Debug)]
pub struct KinesisSink {
    pub config: KinesisSinkConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl TryFrom<SinkParam> for KinesisSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KinesisSinkConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

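/// Maximum number of pending (not yet delivered) chunks tracked by the async-truncate log sinker.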
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;

impl Sink for KinesisSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;

    const SINK_NAME: &'static str = KINESIS_SINK;

    async fn validate(&self) -> Result<()> {
        // Kinesis requires a partition key. There is no built-in support for round-robin as in Kafka/Pulsar.
        // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey
        if self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "kinesis sink requires a partition key (please define it in the `primary_key` field)",
            )));
        }
        // Construct the formatter up front so that any formatter error is reported during
        // validation rather than at runtime.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.stream_name,
        )
        .await?;

        // Check reachability of the stream.
        let client = self.config.common.build_client().await?;
        client
            .list_shards()
            .stream_name(&self.config.common.stream_name)
            .send()
            .await
            .context("failed to list shards")
            .map_err(SinkError::Kinesis)?;
        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(KinesisSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            &self.format_desc,
            self.db_name.clone(),
            self.sink_from_name.clone(),
        )
        .await?
        .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
    }
}

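/// Configuration of the Kinesis sink, consisting solely of the shared [`KinesisCommon`]
/// options (stream name and AWS client settings) flattened into the sink properties.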
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    #[serde(flatten)]
    pub common: KinesisCommon,
}

impl KinesisSinkConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

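/// Sink writer holding the configured formatter and the shared Kinesis client; it creates
/// one [`KinesisSinkPayloadWriter`] per chunk to batch and deliver the records.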
pub struct KinesisSinkWriter {
    pub config: KinesisSinkConfig,
    formatter: SinkFormatterImpl,
    client: KinesisClient,
}

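/// Per-chunk payload writer: buffers the encoded records together with their approximate
/// payload sizes, and flushes them to Kinesis in the delivery future built by `finish`.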
struct KinesisSinkPayloadWriter {
    client: KinesisClient,
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    stream_name: String,
}

impl KinesisSinkWriter {
    pub async fn new(
        config: KinesisSinkConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        format_desc: &SinkFormatDesc,
        db_name: String,
        sink_from_name: String,
    ) -> Result<Self> {
        let formatter = SinkFormatterImpl::new(
            format_desc,
            schema,
            pk_indices,
            db_name,
            sink_from_name,
            &config.common.stream_name,
        )
        .await?;
        let client = config
            .common
            .build_client()
            .await
            .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
        Ok(Self {
            config: config.clone(),
            formatter,
            client,
        })
    }

    fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
        KinesisSinkPayloadWriter {
            client: self.client.clone(),
            entries: vec![],
            stream_name: self.config.common.stream_name.clone(),
        }
    }
}

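// `KinesisSinkPayloadWriterDeliveryFuture` is an opaque `impl TryFuture` type alias; this
// module scopes the alias together with its defining use in `finish` below.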
mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;
    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            // For a reference on the behavior of `put_records`, see
            // https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

            async move {
                // From the doc of `put_records`:
                // Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MiB,
                // up to a limit of 5 MiB for the entire request, including partition keys. Each shard can support writes up to
                // 1,000 records per second, up to a maximum data write total of 1 MiB per second.

                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                // Allow at most 3 retries when no progress is being made, to avoid retrying endlessly.
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                let mut start_idx = 0;

                let mut throttle_delay = None;

                while start_idx < total_count {
                    // 1. Prepare the records to be sent.

                    // The maximum possible number of records that can be sent in this iteration.
                    // The actual number can be smaller when the total payload size would exceed `MAX_TOTAL_RECORD_PAYLOAD_SIZE`.
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encountered a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
                    if records.is_empty() {
                        // Include at least one record even if its size exceeds `MAX_TOTAL_RECORD_PAYLOAD_SIZE`.
                        records.push(self.entries[start_idx].0.clone());
                    }

                    // 2. Send the records and handle the result.
                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!(
                                    "request record count {} does not match the response record count {}",
                                    record_count,
                                    output.records.len()
                                )));
                            }
                            // From the doc of `put_records`:
                            // A single record failure does not stop the processing of subsequent records. As a result,
                            // PutRecords doesn’t guarantee the ordering of records. If you need to read records in the same
                            // order they are written to the stream, use PutRecord instead of PutRecords, and write to the same shard.

                            // Therefore, to ensure at-least-once delivery and eventual consistency, we find the first failed
                            // entry and retry it along with all subsequent entries, even though some of those entries may
                            // already have been processed successfully.
                            if let Some((first_failed_idx, result_entry)) =
                                Self::first_failed_entry(output)
                            {
                                // `first_failed_idx` is also the number of successfully sent entries.
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    start_idx += partially_sent_count;
                                    // Reset the retry count when making progress.
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code
                                    && err_code == "ProvisionedThroughputExceededException"
                                {
                                    // From the doc of `put_records`:
                                    // The ErrorCode parameter reflects the type of error and can be one of the following values:
                                    // ProvisionedThroughputExceededException or InternalFailure. ErrorMessage provides more detailed
                                    // information about the ProvisionedThroughputExceededException exception including the account ID,
                                    // stream name, and shard ID of the record that was throttled.
                                    let throttle_delay = throttle_delay
                                        .get_or_insert_with(|| {
                                            ExponentialBackoff::from_millis(100)
                                                .factor(2)
                                                .max_delay(Duration::from_secs(2))
                                                .map(jitter)
                                        })
                                        .next()
                                        .expect("should not be none");
                                    warn!(
                                        err_string = ?result_entry.error_message,
                                        ?throttle_delay,
                                        "throttle"
                                    );
                                    sleep(throttle_delay).await;
                                } else {
                                    // No progress was made, due to some internal error.
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                start_idx += record_count;
                                // Reset the retry count when making progress.
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                // Reset the throttle delay once all records have been fully sent.
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}
pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;

impl KinesisSinkPayloadWriter {
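    /// Returns the index and result entry of the first record in the `PutRecords` response
    /// that failed (i.e. was assigned no shard id), or `None` if every record succeeded.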
    fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
        // From the doc of `put_records`:
        // A successfully processed record includes ShardId and SequenceNumber values. The ShardId parameter
        // identifies the shard in the stream where the record is stored. The SequenceNumber parameter is an
        // identifier assigned to the put record, unique to all records in the stream.
        //
        // An unsuccessfully processed record includes ErrorCode and ErrorMessage values. ErrorCode reflects
        // the type of error and can be one of the following values: ProvisionedThroughputExceededException or
        // InternalFailure. ErrorMessage provides more detailed information about the ProvisionedThroughputExceededException
        // exception including the account ID, stream name, and shard ID of the record that was throttled.
        output
            .records
            .into_iter()
            .find_position(|entry| entry.shard_id.is_none())
    }

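    /// Queues a single record, remembering its approximate size (partition key + payload)
    /// so that `finish` can respect the per-request size limits when batching.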
    fn put_record(&mut self, key: String, payload: Vec<u8>) {
        let size = key.len() + payload.len();
        self.entries.push((
            PutRecordsRequestEntry::builder()
                .partition_key(key)
                .data(Blob::new(payload))
                .build()
                .expect("should not fail because we have set `data` and `partition_key`"),
            size,
        ))
    }
}

impl FormattedSink for KinesisSinkPayloadWriter {
    type K = String;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.put_record(
            k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
            v.unwrap_or_default(),
        );
        Ok(())
    }
}

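// Each chunk gets a fresh payload writer: the encoded records are buffered during
// `write_chunk`, while the actual `PutRecords` calls are deferred to the delivery future
// returned by `finish` and tracked by the delivery future manager.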
impl AsyncTruncateSinkWriter for KinesisSinkWriter {
    type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = self.new_payload_writer();
        dispatch_sink_formatter_str_key_impl!(
            &self.formatter,
            formatter,
            payload_writer.write_chunk(chunk, formatter).await
        )?;

        add_future
            .add_future_may_await(payload_writer.finish())
            .await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

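    /// Guards the `expect` in `KinesisSinkPayloadWriter::put_record`: building an entry with
    /// both `data` and `partition_key` set must not fail.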
    #[test]
    fn test_kinesis_entry_builder_safe_unwrap() {
        PutRecordsRequestEntry::builder()
            .data(Blob::new(b"data"))
            .partition_key("partition-key")
            .build()
            .unwrap();
    }
437}