risingwave_connector/sink/
dynamodb.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use aws_sdk_dynamodb as dynamodb;
19use aws_sdk_dynamodb::client::Client;
20use aws_smithy_types::Blob;
21use dynamodb::types::{AttributeValue, TableStatus, WriteRequest};
22use futures::prelude::TryFuture;
23use futures::prelude::future::TryFutureExt;
24use risingwave_common::array::{Op, RowRef, StreamChunk};
25use risingwave_common::catalog::Schema;
26use risingwave_common::row::Row as _;
27use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde::Deserialize;
30use serde_with::{DisplayFromStr, serde_as};
31use with_options::WithOptions;
32use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
33
34use super::log_store::DeliveryFutureManagerAddFuture;
35use super::writer::{
36 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
37};
38use super::{Result, Sink, SinkError, SinkParam, SinkWriterParam};
39use crate::connector_common::AwsAuthProps;
40use crate::enforce_secret::EnforceSecret;
41use crate::error::ConnectorResult;
42
/// Sink name of the DynamoDB sink (exposed as `Sink::SINK_NAME`).
pub const DYNAMO_DB_SINK: &str = "dynamodb";

/// Options of the DynamoDB sink, deserialized from the sink's string properties.
///
/// Numeric options are parsed from their string form (`DisplayFromStr`).
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DynamoDbConfig {
    // Name of the target table; also accepted under the key `dynamodb.table`.
    #[serde(rename = "table", alias = "dynamodb.table")]
    pub table: String,

    // Deprecated: still parsed (default 1024) but not read by any code in this file section.
    #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
    #[serde_as(as = "DisplayFromStr")]
    #[deprecated]
    pub max_batch_rows: usize,

    // AWS authentication options, flattened so their keys sit at the top level.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Maximum number of write requests sent in one `BatchWriteItem` call (default 25).
    #[serde(
        rename = "dynamodb.max_batch_item_nums",
        default = "default_max_batch_item_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub max_batch_item_nums: usize,

    // Passed to `into_log_sinker` when the log sinker is built (default 256);
    // presumably the limit on in-flight delivery futures — TODO confirm.
    #[serde(
        rename = "dynamodb.max_future_send_nums",
        default = "default_max_future_send_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub max_future_send_nums: usize,
}
74impl EnforceSecret for DynamoDbConfig {
75 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
76 AwsAuthProps::enforce_one(prop)
77 }
78}
79
/// Default for `dynamodb.max_batch_item_nums`; equals the 25-request cap of a
/// single DynamoDB `BatchWriteItem` call.
const DEFAULT_MAX_BATCH_ITEM_NUMS: usize = 25;

/// Default for `dynamodb.max_future_send_nums`.
const DEFAULT_MAX_FUTURE_SEND_NUMS: usize = 256;

/// Default for the deprecated `dynamodb.max_batch_rows`.
const DEFAULT_MAX_BATCH_ROWS: usize = 1024;

/// Serde default hook for `DynamoDbConfig::max_batch_item_nums`.
fn default_max_batch_item_nums() -> usize {
    DEFAULT_MAX_BATCH_ITEM_NUMS
}

/// Serde default hook for `DynamoDbConfig::max_future_send_nums`.
fn default_max_future_send_nums() -> usize {
    DEFAULT_MAX_FUTURE_SEND_NUMS
}

/// Serde default hook for the deprecated `DynamoDbConfig::max_batch_rows`.
fn default_max_batch_rows() -> usize {
    DEFAULT_MAX_BATCH_ROWS
}
92impl DynamoDbConfig {
93 pub async fn build_client(&self) -> ConnectorResult<Client> {
94 let config = &self.aws_auth_props;
95 let aws_config = config.build_config().await?;
96
97 Ok(Client::new(&aws_config))
98 }
99
100 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
101 serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
102 .map_err(|e| SinkError::Config(anyhow!(e)))
103 }
104}
105
/// DynamoDB sink: parsed options plus the sink schema and downstream primary key.
#[derive(Clone, Debug)]
pub struct DynamoDbSink {
    pub config: DynamoDbConfig,
    // Full sink schema; field names become DynamoDB attribute names when rows are written.
    schema: Schema,
    // Column indices of the downstream primary key (may be empty).
    pk_indices: Vec<usize>,
}
113impl EnforceSecret for DynamoDbSink {
114 fn enforce_secret<'a>(
115 prop_iter: impl Iterator<Item = &'a str>,
116 ) -> crate::error::ConnectorResult<()> {
117 for prop in prop_iter {
118 DynamoDbConfig::enforce_one(prop)?;
119 }
120 Ok(())
121 }
122}
123
124impl Sink for DynamoDbSink {
125 type LogSinker = AsyncTruncateLogSinkerOf<DynamoDbSinkWriter>;
126
127 const SINK_NAME: &'static str = DYNAMO_DB_SINK;
128
129 async fn validate(&self) -> Result<()> {
130 risingwave_common::license::Feature::DynamoDbSink
131 .check_available()
132 .map_err(|e| anyhow::anyhow!(e))?;
133 let client = (self.config.build_client().await)
134 .context("validate DynamoDB sink error")
135 .map_err(SinkError::DynamoDb)?;
136
137 let table_name = &self.config.table;
138 let output = client
139 .describe_table()
140 .table_name(table_name)
141 .send()
142 .await
143 .map_err(|e| anyhow!(e))?;
144 let Some(table) = output.table else {
145 return Err(SinkError::DynamoDb(anyhow!(
146 "table {} not found",
147 table_name
148 )));
149 };
150 if !matches!(table.table_status(), Some(TableStatus::Active)) {
151 return Err(SinkError::DynamoDb(anyhow!(
152 "table {} is not active",
153 table_name
154 )));
155 }
156 let pk_set: HashSet<String> = self
157 .schema
158 .fields()
159 .iter()
160 .enumerate()
161 .filter(|(k, _)| self.pk_indices.contains(k))
162 .map(|(_, v)| v.name.clone())
163 .collect();
164 let key_schema = table.key_schema();
165
166 for key_element in key_schema.iter().map(|x| x.attribute_name()) {
167 if !pk_set.contains(key_element) {
168 return Err(SinkError::DynamoDb(anyhow!(
169 "table {} key field {} not found in schema or not primary key",
170 table_name,
171 key_element
172 )));
173 }
174 }
175
176 Ok(())
177 }
178
179 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
180 Ok(
181 DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
182 .await?
183 .into_log_sinker(self.config.max_future_send_nums),
184 )
185 }
186}
187
188impl TryFrom<SinkParam> for DynamoDbSink {
189 type Error = SinkError;
190
191 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
192 let schema = param.schema();
193 let pk_indices = param.downstream_pk_or_empty();
194 let config = DynamoDbConfig::from_btreemap(param.properties)?;
195
196 Ok(Self {
197 config,
198 schema,
199 pk_indices,
200 })
201 }
202}
203
204#[derive(Debug)]
205struct DynamoDbRequest {
206 inner: WriteRequest,
207 key_items: Vec<String>,
208}
209
210impl DynamoDbRequest {
211 fn extract_pk_values(&self) -> Option<Vec<AttributeValue>> {
212 let key = match (&self.inner.put_request(), &self.inner.delete_request()) {
213 (Some(put_req), None) => &put_req.item,
214 (None, Some(del_req)) => &del_req.key,
215 _ => return None,
216 };
217 let vs = key
218 .iter()
219 .filter(|(k, _)| self.key_items.contains(k))
220 .map(|(_, v)| v.clone())
221 .collect();
222 Some(vs)
223 }
224}
225
226pub struct DynamoDbSinkWriter {
227 payload_writer: DynamoDbPayloadWriter,
228 formatter: DynamoDbFormatter,
229}
230
231impl DynamoDbSinkWriter {
232 pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
233 let client = config.build_client().await?;
234 let table_name = &config.table;
235 let output = client
236 .describe_table()
237 .table_name(table_name)
238 .send()
239 .await
240 .map_err(|e| anyhow!(e))?;
241 let Some(table) = output.table else {
242 return Err(SinkError::DynamoDb(anyhow!(
243 "table {} not found",
244 table_name
245 )));
246 };
247 let dynamodb_keys = table
248 .key_schema
249 .unwrap_or_default()
250 .into_iter()
251 .map(|k| k.attribute_name)
252 .collect();
253
254 let payload_writer = DynamoDbPayloadWriter {
255 client,
256 table: config.table.clone(),
257 dynamodb_keys,
258 max_batch_item_nums: config.max_batch_item_nums,
259 };
260
261 Ok(Self {
262 payload_writer,
263 formatter: DynamoDbFormatter { schema },
264 })
265 }
266
267 fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
268 let mut request_items = Vec::new();
269 for (op, row) in chunk.rows() {
270 let items = self.formatter.format_row(row)?;
271 match op {
272 Op::Insert | Op::UpdateInsert => {
273 self.payload_writer
274 .write_one_insert(items, &mut request_items);
275 }
276 Op::Delete => {
277 self.payload_writer
278 .write_one_delete(items, &mut request_items);
279 }
280 Op::UpdateDelete => {}
281 }
282 }
283 Ok(self.payload_writer.write_chunk(request_items))
284 }
285}
286
287pub type DynamoDbSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
288
289impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
290 type DeliveryFuture = DynamoDbSinkDeliveryFuture;
291
292 #[define_opaque(DynamoDbSinkDeliveryFuture)]
293 async fn write_chunk<'a>(
294 &'a mut self,
295 chunk: StreamChunk,
296 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
297 ) -> Result<()> {
298 let futures = self.write_chunk_inner(chunk)?;
299 add_future
300 .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
301 .await?;
302 Ok(())
303 }
304}
305
306struct DynamoDbFormatter {
307 schema: Schema,
308}
309
310impl DynamoDbFormatter {
311 fn format_row(&self, row: RowRef<'_>) -> Result<HashMap<String, AttributeValue>> {
312 row.iter()
313 .zip_eq_debug((self.schema.clone()).into_fields())
314 .map(|(scalar, field)| {
315 map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
316 })
317 .collect()
318 }
319}
320
321fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
322 let Some(scalar_ref) = scalar_ref else {
323 return Ok(AttributeValue::Null(true));
324 };
325 let attr = match data_type {
326 DataType::Int16
327 | DataType::Int32
328 | DataType::Int64
329 | DataType::Int256
330 | DataType::Float32
331 | DataType::Float64
332 | DataType::Decimal
333 | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
334 DataType::Varchar
336 | DataType::Interval
337 | DataType::Date
338 | DataType::Time
339 | DataType::Timestamp
340 | DataType::Timestamptz
341 | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
342 DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
343 DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
344 DataType::List(lt) => {
345 let list_attr = scalar_ref
346 .into_list()
347 .iter()
348 .map(|x| map_data(x, lt.elem()))
349 .collect::<Result<Vec<_>>>()?;
350 AttributeValue::L(list_attr)
351 }
352 DataType::Struct(st) => {
353 let mut map = HashMap::with_capacity(st.len());
354 for (sub_datum_ref, (name, data_type)) in scalar_ref
355 .into_struct()
356 .iter_fields_ref()
357 .zip_eq_debug(st.iter())
358 {
359 let attr = map_data(sub_datum_ref, data_type)?;
360 map.insert(name.to_owned(), attr);
361 }
362 AttributeValue::M(map)
363 }
364 DataType::Map(_m) => {
365 return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
366 }
367 DataType::Vector(_) => {
368 return Err(SinkError::DynamoDb(anyhow!("vector is not supported yet")));
369 }
370 };
371 Ok(attr)
372}
373
374mod write_chunk_future {
375 use core::result;
376 use std::collections::HashMap;
377
378 use anyhow::anyhow;
379 use aws_sdk_dynamodb as dynamodb;
380 use aws_sdk_dynamodb::client::Client;
381 use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
382 use dynamodb::error::SdkError;
383 use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput};
384 use dynamodb::types::{
385 AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
386 ReturnItemCollectionMetrics, WriteRequest,
387 };
388 use futures::future::{Map, TryJoinAll};
389 use futures::prelude::Future;
390 use futures::prelude::future::{FutureExt, try_join_all};
391 use itertools::Itertools;
392 use maplit::hashmap;
393
394 use super::{DynamoDbRequest, Result, SinkError};
395
396 pub type WriteChunkFuture = TryJoinAll<
397 Map<
398 impl Future<
399 Output = result::Result<
400 BatchWriteItemOutput,
401 SdkError<BatchWriteItemError, HttpResponse>,
402 >,
403 >,
404 impl FnOnce(
405 result::Result<BatchWriteItemOutput, SdkError<BatchWriteItemError, HttpResponse>>,
406 ) -> Result<()>,
407 >,
408 >;
409 pub struct DynamoDbPayloadWriter {
410 pub client: Client,
411 pub table: String,
412 pub dynamodb_keys: Vec<String>,
413 pub max_batch_item_nums: usize,
414 }
415
416 impl DynamoDbPayloadWriter {
417 pub fn write_one_insert(
418 &mut self,
419 item: HashMap<String, AttributeValue>,
420 request_items: &mut Vec<DynamoDbRequest>,
421 ) {
422 let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
423 let req = WriteRequest::builder().put_request(put_req).build();
424 self.write_one_req(req, request_items);
425 }
426
427 pub fn write_one_delete(
428 &mut self,
429 key: HashMap<String, AttributeValue>,
430 request_items: &mut Vec<DynamoDbRequest>,
431 ) {
432 let key = key
433 .into_iter()
434 .filter(|(k, _)| self.dynamodb_keys.contains(k))
435 .collect();
436 let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
437 let req = WriteRequest::builder().delete_request(del_req).build();
438 self.write_one_req(req, request_items);
439 }
440
441 pub fn write_one_req(
442 &mut self,
443 req: WriteRequest,
444 request_items: &mut Vec<DynamoDbRequest>,
445 ) {
446 let r_req = DynamoDbRequest {
447 inner: req,
448 key_items: self.dynamodb_keys.clone(),
449 };
450 if let Some(v) = r_req.extract_pk_values() {
451 request_items.retain(|item| {
452 !item
453 .extract_pk_values()
454 .unwrap_or_default()
455 .iter()
456 .all(|x| v.contains(x))
457 });
458 }
459 request_items.push(r_req);
460 }
461
462 #[define_opaque(WriteChunkFuture)]
463 pub fn write_chunk(&mut self, request_items: Vec<DynamoDbRequest>) -> WriteChunkFuture {
464 let table = self.table.clone();
465 let chunks = request_items
466 .into_iter()
467 .map(|r| r.inner)
468 .chunks(self.max_batch_item_nums);
469 let futures = chunks.into_iter().map(|chunk| {
470 let req_items = chunk.collect();
471 let reqs = hashmap! {
472 table.clone() => req_items,
473 };
474 self.client
475 .batch_write_item()
476 .set_request_items(Some(reqs))
477 .return_consumed_capacity(ReturnConsumedCapacity::None)
478 .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
479 .send()
480 .map(|result| {
481 result
482 .map_err(|e| {
483 SinkError::DynamoDb(
484 anyhow!(e).context("failed to delete item from DynamoDB sink"),
485 )
486 })
487 .map(|_| ())
488 })
489 });
490 try_join_all(futures)
491 }
492 }
493}