risingwave_connector/sink/kinesis.rs

use std::collections::BTreeMap;

use anyhow::{Context, anyhow};
use aws_sdk_kinesis::Client as KinesisClient;
use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
use futures::{FutureExt, TryFuture};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::SinkParam;
use super::catalog::SinkFormatDesc;
use crate::connector_common::KinesisCommon;
use crate::dispatch_sink_formatter_str_key_impl;
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};

pub const KINESIS_SINK: &str = "kinesis";

#[derive(Clone, Debug)]
pub struct KinesisSink {
    pub config: KinesisSinkConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KinesisSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KinesisSinkConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KinesisSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KinesisSinkConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

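/// Maximum number of in-flight `put_records` delivery futures (one per written chunk)
/// that the log sinker keeps pending before it awaits completion.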
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;

impl Sink for KinesisSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;

    const SINK_NAME: &'static str = KINESIS_SINK;

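    /// Validate the sink before creation: a partition key (`primary_key`) must be
    /// configured, the formatter must be constructible from the schema, and the target
    /// stream must be reachable (checked via `list_shards`).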
    async fn validate(&self) -> Result<()> {
        if self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "kinesis sink requires partition key (please define in `primary_key` field)",
            )));
        }
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.stream_name,
        )
        .await?;

        let client = self.config.common.build_client().await?;
        client
            .list_shards()
            .stream_name(&self.config.common.stream_name)
            .send()
            .await
            .context("failed to list shards")
            .map_err(SinkError::Kinesis)?;
        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(KinesisSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            &self.format_desc,
            self.db_name.clone(),
            self.sink_from_name.clone(),
        )
        .await?
        .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    #[serde(flatten)]
    pub common: KinesisCommon,
}

impl EnforceSecret for KinesisSinkConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KinesisCommon::enforce_one(prop)?;
        Ok(())
    }
}

impl KinesisSinkConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

pub struct KinesisSinkWriter {
    pub config: KinesisSinkConfig,
    formatter: SinkFormatterImpl,
    client: KinesisClient,
}

struct KinesisSinkPayloadWriter {
    client: KinesisClient,
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    stream_name: String,
}

impl KinesisSinkWriter {
    pub async fn new(
        config: KinesisSinkConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        format_desc: &SinkFormatDesc,
        db_name: String,
        sink_from_name: String,
    ) -> Result<Self> {
        let formatter = SinkFormatterImpl::new(
            format_desc,
            schema,
            pk_indices,
            db_name,
            sink_from_name,
            &config.common.stream_name,
        )
        .await?;
        let client = config
            .common
            .build_client()
            .await
            .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
        Ok(Self {
            config: config.clone(),
            formatter,
            client,
        })
    }

    fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
        KinesisSinkPayloadWriter {
            client: self.client.clone(),
            entries: vec![],
            stream_name: self.config.common.stream_name.clone(),
        }
    }
}

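/// The delivery future is an opaque `impl TryFuture`, so its type alias is defined (and
/// constrained via `#[define_opaque]`) inside this module and re-exported below.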
mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;
    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
        #[define_opaque(KinesisSinkPayloadWriterDeliveryFuture)]
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            async move {
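                // These limits mirror the Kinesis `PutRecords` API quotas: at most 500
                // records per request, 1 MiB per record, and 5 MiB per request in total.
                // `MAX_NO_PROGRESS_RETRY_COUNT` bounds consecutive attempts that make no
                // progress before the future fails.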
                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                let mut start_idx = 0;

                let mut throttle_delay = None;

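                // Send the buffered entries in batches. On partial success, resume from the
                // first failed record; on `ProvisionedThroughputExceededException`, back off
                // exponentially with jitter; give up after `MAX_NO_PROGRESS_RETRY_COUNT`
                // consecutive attempts without progress.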
                while start_idx < total_count {
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encounter a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
                    if records.is_empty() {
                        records.push(self.entries[start_idx].0.clone());
                    }

                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!(
                                    "request record count {} not match the response record count {}",
                                    record_count,
                                    output.records.len()
                                )));
                            }
                            if let Some((first_failed_idx, result_entry)) =
                                Self::first_failed_entry(output)
                            {
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    start_idx += partially_sent_count;
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code
                                    && err_code == "ProvisionedThroughputExceededException"
                                {
                                    let throttle_delay = throttle_delay
                                        .get_or_insert_with(|| {
                                            ExponentialBackoff::from_millis(100)
                                                .factor(2)
                                                .max_delay(Duration::from_secs(2))
                                                .map(jitter)
                                        })
                                        .next()
                                        .expect("should not be none");
                                    warn!(
                                        err_string = ?result_entry.error_message,
                                        ?throttle_delay,
                                        "throttle"
                                    );
                                    sleep(throttle_delay).await;
                                } else {
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                start_idx += record_count;
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}
pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;

impl KinesisSinkPayloadWriter {
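    /// Find the first record in the response that failed (no `shard_id`), together with its
    /// index. `PutRecords` returns result entries in request order, so the index is also the
    /// number of leading records that were delivered successfully.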
    fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
        output
            .records
            .into_iter()
            .find_position(|entry| entry.shard_id.is_none())
    }

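    /// Buffer a single record. The tracked size counts both the partition key and the
    /// payload, matching how Kinesis accounts a record against its size limits.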
    fn put_record(&mut self, key: String, payload: Vec<u8>) {
        let size = key.len() + payload.len();
        self.entries.push((
            PutRecordsRequestEntry::builder()
                .partition_key(key)
                .data(Blob::new(payload))
                .build()
                .expect("should not fail because we have set `data` and `partition_key`"),
            size,
        ))
    }
}

impl FormattedSink for KinesisSinkPayloadWriter {
    type K = String;
    type V = Vec<u8>;

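    /// A key is required because it becomes the Kinesis partition key; a missing value
    /// (typically a tombstone for a deleted row) is written as an empty payload.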
    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.put_record(
            k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
            v.unwrap_or_default(),
        );
        Ok(())
    }
}

impl AsyncTruncateSinkWriter for KinesisSinkWriter {
    type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

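    /// Format the whole chunk into buffered `PutRecordsRequestEntry`s, then hand the send
    /// (with its retry loop) off as a single delivery future; up to
    /// `KINESIS_SINK_MAX_PENDING_CHUNK_NUM` such futures may be pending at once.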
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = self.new_payload_writer();
        dispatch_sink_formatter_str_key_impl!(
            &self.formatter,
            formatter,
            payload_writer.write_chunk(chunk, formatter).await
        )?;

        add_future
            .add_future_may_await(payload_writer.finish())
            .await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

    #[test]
    fn test_kinesis_entry_builder_save_unwrap() {
        PutRecordsRequestEntry::builder()
            .data(Blob::new(b"data"))
            .partition_key("partition-key")
            .build()
            .unwrap();
    }
}