// risingwave_connector/sink/kinesis.rs
use std::collections::BTreeMap;
16
17use anyhow::{Context, anyhow};
18use aws_sdk_kinesis::Client as KinesisClient;
19use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
20use aws_sdk_kinesis::primitives::Blob;
21use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
22use futures::{FutureExt, TryFuture};
23use itertools::Itertools;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::catalog::Schema;
26use serde_derive::Deserialize;
27use serde_with::serde_as;
28use with_options::WithOptions;
29
30use super::SinkParam;
31use super::catalog::SinkFormatDesc;
32use crate::connector_common::KinesisCommon;
33use crate::dispatch_sink_formatter_str_key_impl;
34use crate::enforce_secret::EnforceSecret;
35use crate::sink::formatter::SinkFormatterImpl;
36use crate::sink::log_store::DeliveryFutureManagerAddFuture;
37use crate::sink::writer::{
38 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
39};
40use crate::sink::{Result, Sink, SinkError, SinkWriterParam};
/// Connector name for this sink, as written in `WITH (connector = 'kinesis')`.
pub const KINESIS_SINK: &str = "kinesis";
42
/// Sink that delivers a stream's changes to an AWS Kinesis data stream.
#[derive(Clone, Debug)]
pub struct KinesisSink {
    pub config: KinesisSinkConfig,
    // Schema of the chunks this sink receives; handed to the formatter.
    schema: Schema,
    // Downstream primary-key column indices; `validate` rejects an empty set
    // because Kinesis needs a partition key.
    pk_indices: Vec<usize>,
    // The mandatory `FORMAT ... ENCODE ...` description (see `TryFrom<SinkParam>`).
    format_desc: SinkFormatDesc,
    // Passed through to `SinkFormatterImpl::new` together with `sink_from_name`.
    db_name: String,
    sink_from_name: String,
}
52
53impl EnforceSecret for KinesisSink {
54 fn enforce_secret<'a>(
55 prop_iter: impl Iterator<Item = &'a str>,
56 ) -> crate::error::ConnectorResult<()> {
57 for prop in prop_iter {
58 KinesisSinkConfig::enforce_one(prop)?;
59 }
60 Ok(())
61 }
62}
63
64impl TryFrom<SinkParam> for KinesisSink {
65 type Error = SinkError;
66
67 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
68 let schema = param.schema();
69 let config = KinesisSinkConfig::from_btreemap(param.properties)?;
70 Ok(Self {
71 config,
72 schema,
73 pk_indices: param.downstream_pk,
74 format_desc: param
75 .format_desc
76 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
77 db_name: param.db_name,
78 sink_from_name: param.sink_from_name,
79 })
80 }
81}
82
// Maximum number of pending chunk-delivery futures handed to `into_log_sinker`;
// presumably the log sinker blocks once this many deliveries are in flight — confirm
// against `AsyncTruncateSinkWriterExt::into_log_sinker`.
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;
84
85impl Sink for KinesisSink {
86 type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;
87
88 const SINK_NAME: &'static str = KINESIS_SINK;
89
90 async fn validate(&self) -> Result<()> {
91 if self.pk_indices.is_empty() {
94 return Err(SinkError::Config(anyhow!(
95 "kinesis sink requires partition key (please define in `primary_key` field)",
96 )));
97 }
98 SinkFormatterImpl::new(
100 &self.format_desc,
101 self.schema.clone(),
102 self.pk_indices.clone(),
103 self.db_name.clone(),
104 self.sink_from_name.clone(),
105 &self.config.common.stream_name,
106 )
107 .await?;
108
109 let client = self.config.common.build_client().await?;
111 client
112 .list_shards()
113 .stream_name(&self.config.common.stream_name)
114 .send()
115 .await
116 .context("failed to list shards")
117 .map_err(SinkError::Kinesis)?;
118 Ok(())
119 }
120
121 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
122 Ok(KinesisSinkWriter::new(
123 self.config.clone(),
124 self.schema.clone(),
125 self.pk_indices.clone(),
126 &self.format_desc,
127 self.db_name.clone(),
128 self.sink_from_name.clone(),
129 )
130 .await?
131 .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
132 }
133}
134
/// User-facing configuration of the Kinesis sink. All options are the shared
/// Kinesis connector properties, flattened into this struct during deserialization.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    #[serde(flatten)]
    pub common: KinesisCommon,
}
141
142impl EnforceSecret for KinesisSinkConfig {
143 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
144 KinesisCommon::enforce_one(prop)?;
145 Ok(())
146 }
147}
148
149impl KinesisSinkConfig {
150 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
151 let config =
152 serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
153 .map_err(|e| SinkError::Config(anyhow!(e)))?;
154 Ok(config)
155 }
156}
157
/// Stateful writer backing the log sinker; owns the formatter and the
/// Kinesis client shared with every per-chunk payload writer.
pub struct KinesisSinkWriter {
    pub config: KinesisSinkConfig,
    // Turns stream chunks into (key, payload) records.
    formatter: SinkFormatterImpl,
    client: KinesisClient,
}
163
/// Buffers the encoded records of a single chunk; `finish` later flushes them
/// to Kinesis asynchronously.
struct KinesisSinkPayloadWriter {
    client: KinesisClient,
    // Each request entry is paired with its payload size in bytes
    // (partition key + data), used when batching against Kinesis size limits.
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    stream_name: String,
}
169
170impl KinesisSinkWriter {
171 pub async fn new(
172 config: KinesisSinkConfig,
173 schema: Schema,
174 pk_indices: Vec<usize>,
175 format_desc: &SinkFormatDesc,
176 db_name: String,
177 sink_from_name: String,
178 ) -> Result<Self> {
179 let formatter = SinkFormatterImpl::new(
180 format_desc,
181 schema,
182 pk_indices,
183 db_name,
184 sink_from_name,
185 &config.common.stream_name,
186 )
187 .await?;
188 let client = config
189 .common
190 .build_client()
191 .await
192 .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
193 Ok(Self {
194 config: config.clone(),
195 formatter,
196 client,
197 })
198 }
199
200 fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
201 KinesisSinkPayloadWriter {
202 client: self.client.clone(),
203 entries: vec![],
204 stream_name: self.config.common.stream_name.clone(),
205 }
206 }
207}
208
// The delivery future's concrete type cannot be named, so it is exported as an
// opaque `impl TryFuture` type alias defined in this private module.
mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;
    /// Future that resolves once every record buffered by one payload writer
    /// has been delivered (or retries are exhausted).
    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
        /// Consume the buffered entries and return a future that sends them to
        /// Kinesis in size-limited `PutRecords` batches, retrying partial
        /// failures and backing off on throttling.
        #[define_opaque(KinesisSinkPayloadWriterDeliveryFuture)]
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            async move {
                // Batch limits; presumably mirroring the Kinesis PutRecords API
                // limits (500 records/request, 1 MiB/record, 5 MiB/request) —
                // confirm against the AWS documentation.
                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                // Give up after this many consecutive attempts that make no progress.
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                // Index of the first entry not yet acknowledged by the service.
                let mut start_idx = 0;

                // Lazily-created exponential backoff; reset to `None` after a
                // fully successful batch.
                let mut throttle_delay = None;

                while start_idx < total_count {
                    // Assemble the next batch: capped by record count, and by
                    // total payload size below.
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            // Oversized records are only logged here; the service
                            // will reject them and the retry logic surfaces that.
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encounter a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
                    if records.is_empty() {
                        // The first remaining entry alone exceeds the batch size
                        // budget; send it by itself rather than stall forever.
                        records.push(self.entries[start_idx].0.clone());
                    }

                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            // The response must echo one result entry per request entry.
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!("request record count {} not match the response record count {}", record_count, output.records.len())));
                            }
                            if let Some((first_failed_idx, result_entry)) = Self::first_failed_entry(output) {
                                // Entries before `first_failed_idx` were accepted.
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    // Partial progress: advance the cursor and
                                    // reset the no-progress budget.
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    start_idx += partially_sent_count;
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code && err_code == "ProvisionedThroughputExceededException" {
                                    // Throttled with zero progress: wait with jittered
                                    // exponential backoff (100 ms base, 2 s cap), then retry.
                                    let throttle_delay = throttle_delay.get_or_insert_with(|| ExponentialBackoff::from_millis(100).factor(2).max_delay(Duration::from_secs(2)).map(jitter)).next().expect("should not be none");
                                    warn!(err_string = ?result_entry.error_message, ?throttle_delay, "throttle");
                                    sleep(throttle_delay).await;
                                } else {
                                    // Non-throttle failure with zero progress: spend
                                    // one retry from the no-progress budget.
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                // Whole batch accepted: advance and reset retry state.
                                start_idx += record_count;
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            // Transport/API-level failure of the whole request:
                            // also charged against the no-progress budget.
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}
375pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;
376
377impl KinesisSinkPayloadWriter {
378 fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
379 output
389 .records
390 .into_iter()
391 .find_position(|entry| entry.shard_id.is_none())
392 }
393
394 fn put_record(&mut self, key: String, payload: Vec<u8>) {
395 let size = key.len() + payload.len();
396 self.entries.push((
397 PutRecordsRequestEntry::builder()
398 .partition_key(key)
399 .data(Blob::new(payload))
400 .build()
401 .expect("should not fail because we have set `data` and `partition_key`"),
402 size,
403 ))
404 }
405}
406
407impl FormattedSink for KinesisSinkPayloadWriter {
408 type K = String;
409 type V = Vec<u8>;
410
411 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
412 self.put_record(
413 k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
414 v.unwrap_or_default(),
415 );
416 Ok(())
417 }
418}
419
impl AsyncTruncateSinkWriter for KinesisSinkWriter {
    type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

    /// Format the whole chunk into a fresh payload writer, then hand the actual
    /// delivery off as a future so formatting of later chunks can proceed.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = self.new_payload_writer();
        // Dispatch over the concrete (string-keyed) formatter variant.
        dispatch_sink_formatter_str_key_impl!(
            &self.formatter,
            formatter,
            payload_writer.write_chunk(chunk, formatter).await
        )?;

        // `finish()` performs the PutRecords calls; `add_future_may_await` may
        // suspend here if too many deliveries are already pending.
        add_future
            .add_future_may_await(payload_writer.finish())
            .await?;
        Ok(())
    }
}
441
#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

    /// Guards the `expect` in `put_record`: building an entry with both
    /// `partition_key` and `data` set must succeed.
    #[test]
    fn test_kinesis_entry_builder_save_unwrap() {
        let built = PutRecordsRequestEntry::builder()
            .partition_key("partition-key")
            .data(Blob::new(b"data"))
            .build();
        built.unwrap();
    }
}