risingwave_connector/sink/
dynamodb.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use aws_sdk_dynamodb as dynamodb;
19use aws_sdk_dynamodb::client::Client;
20use aws_smithy_types::Blob;
21use dynamodb::types::{AttributeValue, TableStatus, WriteRequest};
22use futures::prelude::TryFuture;
23use futures::prelude::future::TryFutureExt;
24use risingwave_common::array::{Op, RowRef, StreamChunk};
25use risingwave_common::catalog::Schema;
26use risingwave_common::row::Row as _;
27use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_derive::Deserialize;
30use serde_with::{DisplayFromStr, serde_as};
31use with_options::WithOptions;
32use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
33
34use super::log_store::DeliveryFutureManagerAddFuture;
35use super::writer::{
36 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
37};
38use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
39use crate::connector_common::AwsAuthProps;
40use crate::error::ConnectorResult;
41
/// Connector name of the DynamoDB sink; exposed as `Sink::SINK_NAME` for [`DynamoDbSink`].
pub const DYNAMO_DB_SINK: &'static str = "dynamodb";
43
44#[serde_as]
45#[derive(Deserialize, Debug, Clone, WithOptions)]
46pub struct DynamoDbConfig {
47 #[serde(rename = "table", alias = "dynamodb.table")]
48 pub table: String,
49
50 #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
51 #[serde_as(as = "DisplayFromStr")]
52 #[deprecated]
53 pub max_batch_rows: usize,
54
55 #[serde(flatten)]
56 pub aws_auth_props: AwsAuthProps,
57
58 #[serde(
59 rename = "dynamodb.max_batch_item_nums",
60 default = "default_max_batch_item_nums"
61 )]
62 #[serde_as(as = "DisplayFromStr")]
63 pub max_batch_item_nums: usize,
64
65 #[serde(
66 rename = "dynamodb.max_future_send_nums",
67 default = "default_max_future_send_nums"
68 )]
69 #[serde_as(as = "DisplayFromStr")]
70 pub max_future_send_nums: usize,
71}
72
/// Serde default for `dynamodb.max_batch_item_nums`.
fn default_max_batch_item_nums() -> usize {
    const DEFAULT_MAX_BATCH_ITEM_NUMS: usize = 25;
    DEFAULT_MAX_BATCH_ITEM_NUMS
}
76
/// Serde default for `dynamodb.max_future_send_nums`.
fn default_max_future_send_nums() -> usize {
    const DEFAULT_MAX_FUTURE_SEND_NUMS: usize = 256;
    DEFAULT_MAX_FUTURE_SEND_NUMS
}
80
/// Serde default for the deprecated `dynamodb.max_batch_rows` option.
fn default_max_batch_rows() -> usize {
    const DEFAULT_MAX_BATCH_ROWS: usize = 1024;
    DEFAULT_MAX_BATCH_ROWS
}
84
85impl DynamoDbConfig {
86 pub async fn build_client(&self) -> ConnectorResult<Client> {
87 let config = &self.aws_auth_props;
88 let aws_config = config.build_config().await?;
89
90 Ok(Client::new(&aws_config))
91 }
92
93 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
94 serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
95 .map_err(|e| SinkError::Config(anyhow!(e)))
96 }
97}
98
99#[derive(Clone, Debug)]
100pub struct DynamoDbSink {
101 pub config: DynamoDbConfig,
102 schema: Schema,
103 pk_indices: Vec<usize>,
104}
105
106impl Sink for DynamoDbSink {
107 type Coordinator = DummySinkCommitCoordinator;
108 type LogSinker = AsyncTruncateLogSinkerOf<DynamoDbSinkWriter>;
109
110 const SINK_NAME: &'static str = DYNAMO_DB_SINK;
111
112 async fn validate(&self) -> Result<()> {
113 risingwave_common::license::Feature::DynamoDbSink
114 .check_available()
115 .map_err(|e| anyhow::anyhow!(e))?;
116 let client = (self.config.build_client().await)
117 .context("validate DynamoDB sink error")
118 .map_err(SinkError::DynamoDb)?;
119
120 let table_name = &self.config.table;
121 let output = client
122 .describe_table()
123 .table_name(table_name)
124 .send()
125 .await
126 .map_err(|e| anyhow!(e))?;
127 let Some(table) = output.table else {
128 return Err(SinkError::DynamoDb(anyhow!(
129 "table {} not found",
130 table_name
131 )));
132 };
133 if !matches!(table.table_status(), Some(TableStatus::Active)) {
134 return Err(SinkError::DynamoDb(anyhow!(
135 "table {} is not active",
136 table_name
137 )));
138 }
139 let pk_set: HashSet<String> = self
140 .schema
141 .fields()
142 .iter()
143 .enumerate()
144 .filter(|(k, _)| self.pk_indices.contains(k))
145 .map(|(_, v)| v.name.clone())
146 .collect();
147 let key_schema = table.key_schema();
148
149 for key_element in key_schema.iter().map(|x| x.attribute_name()) {
150 if !pk_set.contains(key_element) {
151 return Err(SinkError::DynamoDb(anyhow!(
152 "table {} key field {} not found in schema or not primary key",
153 table_name,
154 key_element
155 )));
156 }
157 }
158
159 Ok(())
160 }
161
162 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
163 Ok(
164 DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
165 .await?
166 .into_log_sinker(self.config.max_future_send_nums),
167 )
168 }
169}
170
171impl TryFrom<SinkParam> for DynamoDbSink {
172 type Error = SinkError;
173
174 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
175 let schema = param.schema();
176 let config = DynamoDbConfig::from_btreemap(param.properties)?;
177
178 Ok(Self {
179 config,
180 schema,
181 pk_indices: param.downstream_pk,
182 })
183 }
184}
185
186#[derive(Debug)]
187struct DynamoDbRequest {
188 inner: WriteRequest,
189 key_items: Vec<String>,
190}
191
192impl DynamoDbRequest {
193 fn extract_pk_values(&self) -> Option<Vec<AttributeValue>> {
194 let key = match (&self.inner.put_request(), &self.inner.delete_request()) {
195 (Some(put_req), None) => &put_req.item,
196 (None, Some(del_req)) => &del_req.key,
197 _ => return None,
198 };
199 let vs = key
200 .iter()
201 .filter(|(k, _)| self.key_items.contains(k))
202 .map(|(_, v)| v.clone())
203 .collect();
204 Some(vs)
205 }
206}
207
208pub struct DynamoDbSinkWriter {
209 payload_writer: DynamoDbPayloadWriter,
210 formatter: DynamoDbFormatter,
211}
212
213impl DynamoDbSinkWriter {
214 pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
215 let client = config.build_client().await?;
216 let table_name = &config.table;
217 let output = client
218 .describe_table()
219 .table_name(table_name)
220 .send()
221 .await
222 .map_err(|e| anyhow!(e))?;
223 let Some(table) = output.table else {
224 return Err(SinkError::DynamoDb(anyhow!(
225 "table {} not found",
226 table_name
227 )));
228 };
229 let dynamodb_keys = table
230 .key_schema
231 .unwrap_or_default()
232 .into_iter()
233 .map(|k| k.attribute_name)
234 .collect();
235
236 let payload_writer = DynamoDbPayloadWriter {
237 client,
238 table: config.table.clone(),
239 dynamodb_keys,
240 max_batch_item_nums: config.max_batch_item_nums,
241 };
242
243 Ok(Self {
244 payload_writer,
245 formatter: DynamoDbFormatter { schema },
246 })
247 }
248
249 fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
250 let mut request_items = Vec::new();
251 for (op, row) in chunk.rows() {
252 let items = self.formatter.format_row(row)?;
253 match op {
254 Op::Insert | Op::UpdateInsert => {
255 self.payload_writer
256 .write_one_insert(items, &mut request_items);
257 }
258 Op::Delete => {
259 self.payload_writer
260 .write_one_delete(items, &mut request_items);
261 }
262 Op::UpdateDelete => {}
263 }
264 }
265 Ok(self.payload_writer.write_chunk(request_items))
266 }
267}
268
269pub type DynamoDbSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
270
271impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
272 type DeliveryFuture = DynamoDbSinkDeliveryFuture;
273
274 async fn write_chunk<'a>(
275 &'a mut self,
276 chunk: StreamChunk,
277 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
278 ) -> Result<()> {
279 let futures = self.write_chunk_inner(chunk)?;
280 add_future
281 .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
282 .await?;
283 Ok(())
284 }
285}
286
287struct DynamoDbFormatter {
288 schema: Schema,
289}
290
291impl DynamoDbFormatter {
292 fn format_row(&self, row: RowRef<'_>) -> Result<HashMap<String, AttributeValue>> {
293 row.iter()
294 .zip_eq_debug((self.schema.clone()).into_fields())
295 .map(|(scalar, field)| {
296 map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
297 })
298 .collect()
299 }
300}
301
302fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
303 let Some(scalar_ref) = scalar_ref else {
304 return Ok(AttributeValue::Null(true));
305 };
306 let attr = match data_type {
307 DataType::Int16
308 | DataType::Int32
309 | DataType::Int64
310 | DataType::Int256
311 | DataType::Float32
312 | DataType::Float64
313 | DataType::Decimal
314 | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
315 DataType::Varchar
317 | DataType::Interval
318 | DataType::Date
319 | DataType::Time
320 | DataType::Timestamp
321 | DataType::Timestamptz
322 | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
323 DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
324 DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
325 DataType::List(datatype) => {
326 let list_attr = scalar_ref
327 .into_list()
328 .iter()
329 .map(|x| map_data(x, datatype))
330 .collect::<Result<Vec<_>>>()?;
331 AttributeValue::L(list_attr)
332 }
333 DataType::Struct(st) => {
334 let mut map = HashMap::with_capacity(st.len());
335 for (sub_datum_ref, (name, data_type)) in scalar_ref
336 .into_struct()
337 .iter_fields_ref()
338 .zip_eq_debug(st.iter())
339 {
340 let attr = map_data(sub_datum_ref, data_type)?;
341 map.insert(name.to_owned(), attr);
342 }
343 AttributeValue::M(map)
344 }
345 DataType::Map(_m) => {
346 return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
347 }
348 };
349 Ok(attr)
350}
351
352mod write_chunk_future {
353 use core::result;
354 use std::collections::HashMap;
355
356 use anyhow::anyhow;
357 use aws_sdk_dynamodb as dynamodb;
358 use aws_sdk_dynamodb::client::Client;
359 use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
360 use dynamodb::error::SdkError;
361 use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput};
362 use dynamodb::types::{
363 AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
364 ReturnItemCollectionMetrics, WriteRequest,
365 };
366 use futures::future::{Map, TryJoinAll};
367 use futures::prelude::Future;
368 use futures::prelude::future::{FutureExt, try_join_all};
369 use itertools::Itertools;
370 use maplit::hashmap;
371
372 use super::{DynamoDbRequest, Result, SinkError};
373
374 pub type WriteChunkFuture = TryJoinAll<
375 Map<
376 impl Future<
377 Output = result::Result<
378 BatchWriteItemOutput,
379 SdkError<BatchWriteItemError, HttpResponse>,
380 >,
381 >,
382 impl FnOnce(
383 result::Result<BatchWriteItemOutput, SdkError<BatchWriteItemError, HttpResponse>>,
384 ) -> Result<()>,
385 >,
386 >;
387 pub struct DynamoDbPayloadWriter {
388 pub client: Client,
389 pub table: String,
390 pub dynamodb_keys: Vec<String>,
391 pub max_batch_item_nums: usize,
392 }
393
394 impl DynamoDbPayloadWriter {
395 pub fn write_one_insert(
396 &mut self,
397 item: HashMap<String, AttributeValue>,
398 request_items: &mut Vec<DynamoDbRequest>,
399 ) {
400 let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
401 let req = WriteRequest::builder().put_request(put_req).build();
402 self.write_one_req(req, request_items);
403 }
404
405 pub fn write_one_delete(
406 &mut self,
407 key: HashMap<String, AttributeValue>,
408 request_items: &mut Vec<DynamoDbRequest>,
409 ) {
410 let key = key
411 .into_iter()
412 .filter(|(k, _)| self.dynamodb_keys.contains(k))
413 .collect();
414 let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
415 let req = WriteRequest::builder().delete_request(del_req).build();
416 self.write_one_req(req, request_items);
417 }
418
419 pub fn write_one_req(
420 &mut self,
421 req: WriteRequest,
422 request_items: &mut Vec<DynamoDbRequest>,
423 ) {
424 let r_req = DynamoDbRequest {
425 inner: req,
426 key_items: self.dynamodb_keys.clone(),
427 };
428 if let Some(v) = r_req.extract_pk_values() {
429 request_items.retain(|item| {
430 !item
431 .extract_pk_values()
432 .unwrap_or_default()
433 .iter()
434 .all(|x| v.contains(x))
435 });
436 }
437 request_items.push(r_req);
438 }
439
440 pub fn write_chunk(&mut self, request_items: Vec<DynamoDbRequest>) -> WriteChunkFuture {
441 let table = self.table.clone();
442 let chunks = request_items
443 .into_iter()
444 .map(|r| r.inner)
445 .chunks(self.max_batch_item_nums);
446 let futures = chunks.into_iter().map(|chunk| {
447 let req_items = chunk.collect();
448 let reqs = hashmap! {
449 table.clone() => req_items,
450 };
451 self.client
452 .batch_write_item()
453 .set_request_items(Some(reqs))
454 .return_consumed_capacity(ReturnConsumedCapacity::None)
455 .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
456 .send()
457 .map(|result| {
458 result
459 .map_err(|e| {
460 SinkError::DynamoDb(
461 anyhow!(e).context("failed to delete item from DynamoDB sink"),
462 )
463 })
464 .map(|_| ())
465 })
466 });
467 try_join_all(futures)
468 }
469 }
470}