// risingwave_connector/sink/kinesis.rs

use std::collections::BTreeMap;

use anyhow::{Context, anyhow};
use aws_sdk_kinesis::Client as KinesisClient;
use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
use futures::{FutureExt, TryFuture};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::SinkParam;
use super::catalog::SinkFormatDesc;
use crate::connector_common::KinesisCommon;
use crate::dispatch_sink_formatter_str_key_impl;
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};

pub const KINESIS_SINK: &str = "kinesis";

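/// Kinesis sink: each formatted row becomes one `PutRecords` entry, with the encoded key
/// used as the partition key and the encoded value as the record data.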
#[derive(Clone, Debug)]
pub struct KinesisSink {
    pub config: KinesisSinkConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KinesisSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KinesisSinkConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KinesisSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KinesisSinkConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

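/// Maximum number of per-chunk delivery futures that may be pending per writer before
/// `add_future_may_await` starts awaiting them, i.e. how many chunks can be in flight at once.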
const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;

impl Sink for KinesisSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;

    const SINK_NAME: &'static str = KINESIS_SINK;

    async fn validate(&self) -> Result<()> {
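        // Kinesis requires a partition key for every record; it is derived from the sink's
        // primary key, so an empty `primary_key` cannot be supported.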
        if self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "kinesis sink requires partition key (please define in `primary_key` field)",
            )));
        }
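        // Build the formatter eagerly so that invalid FORMAT/ENCODE options are rejected at
        // validation time rather than when the first chunk is written.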
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.stream_name,
        )
        .await?;

        let client = self.config.common.build_client().await?;
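        // Probe the stream with `ListShards` to verify that it exists and is reachable with
        // the configured credentials.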
        client
            .list_shards()
            .stream_name(&self.config.common.stream_name)
            .send()
            .await
            .context("failed to list shards")
            .map_err(SinkError::Kinesis)?;
        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(KinesisSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            &self.format_desc,
            self.db_name.clone(),
            self.sink_from_name.clone(),
        )
        .await?
        .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
    }
}

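/// Sink configuration parsed from the `WITH` options. All options are the shared
/// [`KinesisCommon`] connection settings, flattened into this struct by serde.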
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisSinkConfig {
    #[serde(flatten)]
    pub common: KinesisCommon,
}

impl EnforceSecret for KinesisSinkConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KinesisCommon::enforce_one(prop)?;
        Ok(())
    }
}

impl KinesisSinkConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

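/// Chunk-level writer: holds the shared Kinesis client and the formatter, and spawns a
/// fresh [`KinesisSinkPayloadWriter`] for every chunk it writes.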
pub struct KinesisSinkWriter {
    pub config: KinesisSinkConfig,
    formatter: SinkFormatterImpl,
    client: KinesisClient,
}

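/// Per-chunk buffer of `PutRecordsRequestEntry` values paired with their payload size in
/// bytes; `finish` turns the buffer into a delivery future that sends the entries in
/// `PutRecords` batches.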
struct KinesisSinkPayloadWriter {
    client: KinesisClient,
    entries: Vec<(PutRecordsRequestEntry, usize)>,
    stream_name: String,
}

impl KinesisSinkWriter {
    pub async fn new(
        config: KinesisSinkConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        format_desc: &SinkFormatDesc,
        db_name: String,
        sink_from_name: String,
    ) -> Result<Self> {
        let formatter = SinkFormatterImpl::new(
            format_desc,
            schema,
            pk_indices,
            db_name,
            sink_from_name,
            &config.common.stream_name,
        )
        .await?;
        let client = config
            .common
            .build_client()
            .await
            .map_err(|err| SinkError::Kinesis(anyhow!(err)))?;
        Ok(Self {
            config: config.clone(),
            formatter,
            client,
        })
    }

    fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
        KinesisSinkPayloadWriter {
            client: self.client.clone(),
            entries: vec![],
            stream_name: self.config.common.stream_name.clone(),
        }
    }
}

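// The delivery future lives in a nested module so that the anonymous `async move` block
// can be given a nameable opaque type (`KinesisSinkPayloadWriterDeliveryFuture`), which is
// re-exported below and used as the writer's `DeliveryFuture` associated type.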
mod opaque_type {
    use std::cmp::min;
    use std::time::Duration;

    use thiserror_ext::AsReport;
    use tokio::time::sleep;
    use tokio_retry::strategy::{ExponentialBackoff, jitter};
    use tracing::warn;

    use super::*;

    pub type KinesisSinkPayloadWriterDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

    impl KinesisSinkPayloadWriter {
        pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
            async move {
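                // Limits documented for the Kinesis `PutRecords` API: at most 500 records per
                // request, each record (data plus partition key) at most 1 MiB, and at most
                // 5 MiB of payload per request.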
                const MAX_RECORD_COUNT: usize = 500;
                const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
                const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
                const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

                let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                let total_count = self.entries.len();
                let mut start_idx = 0;

                let mut throttle_delay = None;

                while start_idx < total_count {
                    // Collect a batch that respects both the record-count and the total-size limits.
                    let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
                    let mut records = Vec::with_capacity(max_record_count);
                    let mut total_payload_size = 0;
                    for i in start_idx..(start_idx + max_record_count) {
                        let (record, size) = &self.entries[i];
                        if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
                            warn!(
                                size,
                                partition = record.partition_key,
                                "encounter a large single record"
                            );
                        }
                        if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
                            total_payload_size += *size;
                            records.push(record.clone());
                        } else {
                            break;
                        }
                    }
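                    // Ensure forward progress: if even the first remaining entry exceeds the
                    // per-request budget, send it on its own so the loop cannot stall (the
                    // request may then fail with a service-side error).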
                    if records.is_empty() {
                        records.push(self.entries[start_idx].0.clone());
                    }

                    let record_count = records.len();
                    match self
                        .client
                        .put_records()
                        .stream_name(&self.stream_name)
                        .set_records(Some(records))
                        .send()
                        .await
                    {
                        Ok(output) => {
                            if record_count != output.records.len() {
                                return Err(SinkError::Kinesis(anyhow!(
                                    "request record count {} does not match the response record count {}",
                                    record_count,
                                    output.records.len()
                                )));
                            }
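                            // `PutRecords` reports success or failure per record: the response
                            // preserves request order, and a failed entry carries an error code
                            // and message but no shard id. Everything before the first failed
                            // entry is treated as delivered and the batch is retried from there,
                            // which may resend some records (at-least-once delivery).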
                            if let Some((first_failed_idx, result_entry)) =
                                Self::first_failed_entry(output)
                            {
                                let partially_sent_count = first_failed_idx;
                                if partially_sent_count > 0 {
                                    warn!(
                                        partially_sent_count,
                                        record_count,
                                        "records are partially sent. code: [{}], message: [{}]",
                                        result_entry.error_code.unwrap_or_default(),
                                        result_entry.error_message.unwrap_or_default()
                                    );
                                    start_idx += partially_sent_count;
                                    remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                } else if let Some(err_code) = &result_entry.error_code
                                    && err_code == "ProvisionedThroughputExceededException"
                                {
                                    // Throttled by Kinesis: back off exponentially (with jitter)
                                    // before retrying the same batch.
                                    let throttle_delay = throttle_delay
                                        .get_or_insert_with(|| {
                                            ExponentialBackoff::from_millis(100)
                                                .factor(2)
                                                .max_delay(Duration::from_secs(2))
                                                .map(jitter)
                                        })
                                        .next()
                                        .expect("should not be none");
                                    warn!(err_string = ?result_entry.error_message, ?throttle_delay, "throttle");
                                    sleep(throttle_delay).await;
                                } else {
                                    // The very first record of the batch failed for some other
                                    // reason: count this as a no-progress round.
                                    assert_eq!(first_failed_idx, 0);
                                    remaining_no_progress_retry_count -= 1;
                                    if remaining_no_progress_retry_count == 0 {
                                        return Err(SinkError::Kinesis(anyhow!(
                                            "failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
                                            start_idx,
                                            total_count,
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )));
                                    } else {
                                        warn!(
                                            remaining_no_progress_retry_count,
                                            sent = start_idx,
                                            total_count,
                                            "failed to send records. code: [{}], message: [{}]",
                                            result_entry.error_code.unwrap_or_default(),
                                            result_entry.error_message.unwrap_or_default()
                                        )
                                    }
                                }
                            } else {
                                // The whole batch was accepted: advance and reset the retry and
                                // backoff state.
                                start_idx += record_count;
                                remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
                                throttle_delay = None;
                            }
                        }
                        Err(e) => {
                            remaining_no_progress_retry_count -= 1;
                            if remaining_no_progress_retry_count == 0 {
                                return Err(SinkError::Kinesis(anyhow!(e).context(format!(
                                    "failed to send records. sent {} out of {}",
                                    start_idx, total_count,
                                ))));
                            } else {
                                warn!(
                                    remaining_no_progress_retry_count,
                                    sent = start_idx,
                                    total_count,
                                    "failed to send records. err: [{:?}]",
                                    e.as_report(),
                                )
                            }
                        }
                    }
                }
                Ok(())
            }
            .boxed()
        }
    }
}

pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;

impl KinesisSinkPayloadWriter {
    /// Locate the first record in the `PutRecords` response that was not accepted.
    /// A successful entry carries a shard id and sequence number, while a failed one carries
    /// an error code and message instead, so `shard_id.is_none()` marks a failure.
    fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
        output
            .records
            .into_iter()
            .find_position(|entry| entry.shard_id.is_none())
    }

    fn put_record(&mut self, key: String, payload: Vec<u8>) {
        // Partition key and data both count toward the per-record payload size limit.
        let size = key.len() + payload.len();
        self.entries.push((
            PutRecordsRequestEntry::builder()
                .partition_key(key)
                .data(Blob::new(payload))
                .build()
                .expect("should not fail because we have set `data` and `partition_key`"),
            size,
        ))
    }
}

impl FormattedSink for KinesisSinkPayloadWriter {
    type K = String;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.put_record(
            k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
            v.unwrap_or_default(),
        );
        Ok(())
    }
}

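// One payload writer is created per chunk; its delivery future is handed to the delivery
// future manager, which bounds how many chunks can be awaiting delivery at once
// (see `KINESIS_SINK_MAX_PENDING_CHUNK_NUM`).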
impl AsyncTruncateSinkWriter for KinesisSinkWriter {
    type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = self.new_payload_writer();
        dispatch_sink_formatter_str_key_impl!(
            &self.formatter,
            formatter,
            payload_writer.write_chunk(chunk, formatter).await
        )?;

        add_future
            .add_future_may_await(payload_writer.finish())
            .await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use aws_sdk_kinesis::types::PutRecordsRequestEntry;
    use aws_smithy_types::Blob;

    #[test]
    fn test_kinesis_entry_builder_save_unwrap() {
        // Guards the `expect` in `put_record`: building an entry with both `data` and
        // `partition_key` set must succeed.
        PutRecordsRequestEntry::builder()
            .data(Blob::new(b"data"))
            .partition_key("partition-key")
            .build()
            .unwrap();
    }
}