risingwave_connector/sink/
kinesis.rs1use std::collections::BTreeMap;
16
17use anyhow::{Context, anyhow};
18use aws_sdk_kinesis::Client as KinesisClient;
19use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
20use aws_sdk_kinesis::primitives::Blob;
21use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
22use futures::{FutureExt, TryFuture};
23use itertools::Itertools;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::catalog::Schema;
26use serde_derive::Deserialize;
27use serde_with::serde_as;
28use with_options::WithOptions;
29
30use super::SinkParam;
31use super::catalog::SinkFormatDesc;
32use crate::connector_common::KinesisCommon;
33use crate::dispatch_sink_formatter_str_key_impl;
34use crate::sink::formatter::SinkFormatterImpl;
35use crate::sink::log_store::DeliveryFutureManagerAddFuture;
36use crate::sink::writer::{
37 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};
40
/// Connector name of the Kinesis sink; used as [`KinesisSink`]'s `Sink::SINK_NAME`.
pub const KINESIS_SINK: &str = "kinesis";
42
/// Sink that writes formatted rows to an AWS Kinesis data stream.
///
/// Built from a [`SinkParam`] via `TryFrom`; the actual writing is done by
/// [`KinesisSinkWriter`] instances created in `new_log_sinker`.
#[derive(Clone, Debug)]
pub struct KinesisSink {
    /// Parsed sink options (stream name and client settings live in `common`).
    pub config: KinesisSinkConfig,
    /// Schema of the rows being written.
    schema: Schema,
    /// Downstream primary-key column indices (`param.downstream_pk`); `validate`
    /// rejects the sink when this is empty.
    pk_indices: Vec<usize>,
    /// The mandatory `FORMAT ... ENCODE ...` description.
    format_desc: SinkFormatDesc,
    /// Database name, forwarded to the formatter.
    db_name: String,
    /// Forwarded to the formatter; presumably the name of the upstream relation — verify.
    sink_from_name: String,
}
52
53impl TryFrom<SinkParam> for KinesisSink {
54 type Error = SinkError;
55
56 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
57 let schema = param.schema();
58 let config = KinesisSinkConfig::from_btreemap(param.properties)?;
59 Ok(Self {
60 config,
61 schema,
62 pk_indices: param.downstream_pk,
63 format_desc: param
64 .format_desc
65 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
66 db_name: param.db_name,
67 sink_from_name: param.sink_from_name,
68 })
69 }
70}
71
/// Limit passed to `into_log_sinker` for pending chunks; `write_chunk` registers one
/// delivery future per chunk.
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;
73
74impl Sink for KinesisSink {
75 type Coordinator = DummySinkCommitCoordinator;
76 type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;
77
78 const SINK_NAME: &'static str = KINESIS_SINK;
79
80 async fn validate(&self) -> Result<()> {
81 if self.pk_indices.is_empty() {
84 return Err(SinkError::Config(anyhow!(
85 "kinesis sink requires partition key (please define in `primary_key` field)",
86 )));
87 }
88 SinkFormatterImpl::new(
90 &self.format_desc,
91 self.schema.clone(),
92 self.pk_indices.clone(),
93 self.db_name.clone(),
94 self.sink_from_name.clone(),
95 &self.config.common.stream_name,
96 )
97 .await?;
98
99 let client = self.config.common.build_client().await?;
101 client
102 .list_shards()
103 .stream_name(&self.config.common.stream_name)
104 .send()
105 .await
106 .context("failed to list shards")
107 .map_err(SinkError::Kinesis)?;
108 Ok(())
109 }
110
111 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
112 Ok(KinesisSinkWriter::new(
113 self.config.clone(),
114 self.schema.clone(),
115 self.pk_indices.clone(),
116 &self.format_desc,
117 self.db_name.clone(),
118 self.sink_from_name.clone(),
119 )
120 .await?
121 .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
122 }
123}
124
/// Options of the Kinesis sink, deserialized from the sink's property map by
/// [`KinesisSinkConfig::from_btreemap`].
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    /// Shared Kinesis settings (stream name, client construction). Flattened, so its
    /// fields are read as top-level properties.
    #[serde(flatten)]
    pub common: KinesisCommon,
}
131
132impl KinesisSinkConfig {
133 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
134 let config =
135 serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
136 .map_err(|e| SinkError::Config(anyhow!(e)))?;
137 Ok(config)
138 }
139}
140
/// Writer that formats each chunk into key/payload records and hands them to a
/// [`KinesisSinkPayloadWriter`] for delivery.
pub struct KinesisSinkWriter {
    /// Sink options; `common.stream_name` is the target stream.
    pub config: KinesisSinkConfig,
    /// Produces `(String key, Vec<u8> payload)` records for each row.
    formatter: SinkFormatterImpl,
    /// Kinesis client, cloned into every payload writer.
    client: KinesisClient,
}
146
/// Buffers the records of one chunk and later sends them via `finish`.
struct KinesisSinkPayloadWriter {
    /// Client used for the `PutRecords` calls.
    client: KinesisClient,
    /// Buffered request entries, each paired with its size in bytes
    /// (partition key length + payload length).
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    /// Name of the target stream.
    stream_name: String,
}
152
153impl KinesisSinkWriter {
154 pub async fn new(
155 config: KinesisSinkConfig,
156 schema: Schema,
157 pk_indices: Vec<usize>,
158 format_desc: &SinkFormatDesc,
159 db_name: String,
160 sink_from_name: String,
161 ) -> Result<Self> {
162 let formatter = SinkFormatterImpl::new(
163 format_desc,
164 schema,
165 pk_indices,
166 db_name,
167 sink_from_name,
168 &config.common.stream_name,
169 )
170 .await?;
171 let client = config
172 .common
173 .build_client()
174 .await
175 .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
176 Ok(Self {
177 config: config.clone(),
178 formatter,
179 client,
180 })
181 }
182
183 fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
184 KinesisSinkPayloadWriter {
185 client: self.client.clone(),
186 entries: vec![],
187 stream_name: self.config.common.stream_name.clone(),
188 }
189 }
190}
191
/// Defines the opaque (`impl Trait`) delivery-future type alias together with
/// [`KinesisSinkPayloadWriter::finish`], which produces values of that type.
mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;
    /// Future delivering one chunk's buffered records. Resolves to `Ok(())` once every
    /// entry has been sent, or to the error that ended the retries.
    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
        /// Consumes the writer and returns a future that sends all buffered entries, in
        /// order, with batched `PutRecords` calls.
        ///
        /// Retry policy, as implemented below:
        /// - A response whose first failed entry (no shard id) is at index > 0 counts as
        ///   progress. The accepted prefix is skipped and sending resumes at the failed
        ///   entry, so later entries of that batch are resent even if they succeeded.
        /// - `ProvisionedThroughputExceededException` on the first entry of a batch sleeps
        ///   with a jittered backoff and retries without using the retry budget.
        /// - Any other first-entry failure, or a request-level error, uses one of
        ///   `MAX_NO_PROGRESS_RETRY_COUNT` attempts, with no sleep in between. The budget is
        ///   refilled whenever progress is made.
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            async move {
                // Per-request batch limits and the no-progress retry budget.
                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                // Index of the first entry not yet confirmed as sent.
                let mut start_idx = 0;

                // Throttling backoff iterator. Created lazily on the first throttle, and
                // reset to `None` only after a fully successful batch.
                let mut throttle_delay = None;

                while start_idx < total_count {
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    // Take entries from `start_idx` until the record-count or total-size cap is hit.
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encounter a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
                    // If the first entry alone exceeds the total-size cap, send it on its own
                    // anyway, so every request carries at least one record.
                    if records.is_empty() {
                        records.push(self.entries[start_idx].0.clone());
                    }

                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            // Result entries are matched to request entries by position, so a
                            // count mismatch is treated as a hard error.
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!("request record count {} not match the response record count {}", record_count, output.records.len())));
                            }
                            if let Some((first_failed_idx, result_entry)) = Self::first_failed_entry(output) {
                                // Entries before the first failed one all carry a shard id.
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    // Progress was made: skip the accepted prefix and refill the budget.
                                    start_idx += partially_sent_count;
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code && err_code == "ProvisionedThroughputExceededException" {
                                    // Throttled: wait, then retry the same batch without using the budget.
                                    // NOTE(review): tokio_retry's `ExponentialBackoff::from_millis(b)` yields
                                    // b^n ms (times `factor`), so this appears to give ~200ms and then the
                                    // 2s cap (before jitter), not a doubling sequence. Confirm the intended delays.
                                    let throttle_delay = throttle_delay.get_or_insert_with(|| ExponentialBackoff::from_millis(100).factor(2).max_delay(Duration::from_secs(2)).map(jitter)).next().expect("should not be none");
                                    warn!(err_string = ?result_entry.error_message, ?throttle_delay, "throttle");
                                    sleep(throttle_delay).await;
                                } else {
                                    // Nothing in this batch was accepted: use one retry from the budget.
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                // The whole batch was accepted.
                                start_idx += record_count;
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            // Request-level failure: retry the same batch immediately until the
                            // budget is exhausted.
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}
357pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;
358
359impl KinesisSinkPayloadWriter {
360 fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
361 output
371 .records
372 .into_iter()
373 .find_position(|entry| entry.shard_id.is_none())
374 }
375
376 fn put_record(&mut self, key: String, payload: Vec<u8>) {
377 let size = key.len() + payload.len();
378 self.entries.push((
379 PutRecordsRequestEntry::builder()
380 .partition_key(key)
381 .data(Blob::new(payload))
382 .build()
383 .expect("should not fail because we have set `data` and `partition_key`"),
384 size,
385 ))
386 }
387}
388
389impl FormattedSink for KinesisSinkPayloadWriter {
390 type K = String;
391 type V = Vec<u8>;
392
393 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
394 self.put_record(
395 k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
396 v.unwrap_or_default(),
397 );
398 Ok(())
399 }
400}
401
402impl AsyncTruncateSinkWriter for KinesisSinkWriter {
403 type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;
404
405 async fn write_chunk<'a>(
406 &'a mut self,
407 chunk: StreamChunk,
408 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
409 ) -> Result<()> {
410 let mut payload_writer = self.new_payload_writer();
411 dispatch_sink_formatter_str_key_impl!(
412 &self.formatter,
413 formatter,
414 payload_writer.write_chunk(chunk, formatter).await
415 )?;
416
417 add_future
418 .add_future_may_await(payload_writer.finish())
419 .await?;
420 Ok(())
421 }
422}
423
#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

    /// `put_record` calls `expect` on `build()` after setting `data` and
    /// `partition_key`. This test fails if that combination ever stops being sufficient.
    #[test]
    fn test_kinesis_entry_builder_save_unwrap() {
        let built = PutRecordsRequestEntry::builder()
            .partition_key("partition-key")
            .data(Blob::new(b"data"))
            .build();
        built.unwrap();
    }
}