risingwave_connector/sink/
dynamodb.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use anyhow::{Context, anyhow};
18use aws_sdk_dynamodb as dynamodb;
19use aws_sdk_dynamodb::client::Client;
20use aws_smithy_types::Blob;
21use dynamodb::types::{AttributeValue, TableStatus, WriteRequest};
22use futures::prelude::TryFuture;
23use futures::prelude::future::TryFutureExt;
24use risingwave_common::array::{Op, RowRef, StreamChunk};
25use risingwave_common::catalog::Schema;
26use risingwave_common::row::Row as _;
27use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_derive::Deserialize;
30use serde_with::{DisplayFromStr, serde_as};
31use with_options::WithOptions;
32use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
33
34use super::log_store::DeliveryFutureManagerAddFuture;
35use super::writer::{
36    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
37};
38use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
39use crate::connector_common::AwsAuthProps;
40use crate::error::ConnectorResult;
41
/// Connector name under which the DynamoDB sink is registered (`Sink::SINK_NAME`).
pub const DYNAMO_DB_SINK: &str = "dynamodb";
43
/// User-facing configuration of the DynamoDB sink, deserialized from the
/// sink's `WITH` properties (see `from_btreemap`).
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DynamoDbConfig {
    /// Target DynamoDB table name (`table`, with legacy alias `dynamodb.table`).
    #[serde(rename = "table", alias = "dynamodb.table")]
    pub table: String,

    /// Deprecated; not read anywhere in this file. Retained so existing
    /// `WITH` clauses that still set `dynamodb.max_batch_rows` keep parsing.
    #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
    #[serde_as(as = "DisplayFromStr")]
    #[deprecated]
    pub max_batch_rows: usize,

    /// AWS credential / region / endpoint settings, flattened from the same
    /// property map.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    /// Maximum number of write requests packed into one `BatchWriteItem` call.
    #[serde(
        rename = "dynamodb.max_batch_item_nums",
        default = "default_max_batch_item_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub max_batch_item_nums: usize,

    /// Maximum number of in-flight delivery futures; passed to
    /// `into_log_sinker` in `new_log_sinker`.
    #[serde(
        rename = "dynamodb.max_future_send_nums",
        default = "default_max_future_send_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub max_future_send_nums: usize,
}
72
/// Default for `dynamodb.max_batch_item_nums`. DynamoDB's `BatchWriteItem`
/// accepts at most 25 write requests per call, so this is the API maximum.
fn default_max_batch_item_nums() -> usize {
    25
}

/// Default for `dynamodb.max_future_send_nums`, the cap on concurrently
/// in-flight batch-write futures (see `new_log_sinker`).
fn default_max_future_send_nums() -> usize {
    256
}

/// Default for the deprecated, unused `dynamodb.max_batch_rows` option.
fn default_max_batch_rows() -> usize {
    1024
}
84
85impl DynamoDbConfig {
86    pub async fn build_client(&self) -> ConnectorResult<Client> {
87        let config = &self.aws_auth_props;
88        let aws_config = config.build_config().await?;
89
90        Ok(Client::new(&aws_config))
91    }
92
93    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
94        serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
95            .map_err(|e| SinkError::Config(anyhow!(e)))
96    }
97}
98
/// DynamoDB sink: writes a changelog stream into a single DynamoDB table.
#[derive(Clone, Debug)]
pub struct DynamoDbSink {
    pub config: DynamoDbConfig,
    /// Schema of the sink's input stream.
    schema: Schema,
    /// Indices of the downstream primary-key columns within `schema`.
    pk_indices: Vec<usize>,
}
105
106impl Sink for DynamoDbSink {
107    type Coordinator = DummySinkCommitCoordinator;
108    type LogSinker = AsyncTruncateLogSinkerOf<DynamoDbSinkWriter>;
109
110    const SINK_NAME: &'static str = DYNAMO_DB_SINK;
111
112    async fn validate(&self) -> Result<()> {
113        risingwave_common::license::Feature::DynamoDbSink
114            .check_available()
115            .map_err(|e| anyhow::anyhow!(e))?;
116        let client = (self.config.build_client().await)
117            .context("validate DynamoDB sink error")
118            .map_err(SinkError::DynamoDb)?;
119
120        let table_name = &self.config.table;
121        let output = client
122            .describe_table()
123            .table_name(table_name)
124            .send()
125            .await
126            .map_err(|e| anyhow!(e))?;
127        let Some(table) = output.table else {
128            return Err(SinkError::DynamoDb(anyhow!(
129                "table {} not found",
130                table_name
131            )));
132        };
133        if !matches!(table.table_status(), Some(TableStatus::Active)) {
134            return Err(SinkError::DynamoDb(anyhow!(
135                "table {} is not active",
136                table_name
137            )));
138        }
139        let pk_set: HashSet<String> = self
140            .schema
141            .fields()
142            .iter()
143            .enumerate()
144            .filter(|(k, _)| self.pk_indices.contains(k))
145            .map(|(_, v)| v.name.clone())
146            .collect();
147        let key_schema = table.key_schema();
148
149        for key_element in key_schema.iter().map(|x| x.attribute_name()) {
150            if !pk_set.contains(key_element) {
151                return Err(SinkError::DynamoDb(anyhow!(
152                    "table {} key field {} not found in schema or not primary key",
153                    table_name,
154                    key_element
155                )));
156            }
157        }
158
159        Ok(())
160    }
161
162    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
163        Ok(
164            DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
165                .await?
166                .into_log_sinker(self.config.max_future_send_nums),
167        )
168    }
169}
170
impl TryFrom<SinkParam> for DynamoDbSink {
    type Error = SinkError;

    /// Build the sink from generic sink parameters: parse the property map
    /// into a `DynamoDbConfig` and capture the schema and downstream pk indices.
    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        // Take the schema first: `param.properties` is moved out below,
        // after which `param.schema()` could no longer be called.
        let schema = param.schema();
        let config = DynamoDbConfig::from_btreemap(param.properties)?;

        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
        })
    }
}
185
/// A single buffered DynamoDB write (put or delete) together with the names
/// of the table's key attributes, so its primary-key values can be extracted
/// for de-duplication within a batch.
#[derive(Debug)]
struct DynamoDbRequest {
    /// The underlying put or delete request.
    inner: WriteRequest,
    /// Names of the table's key attributes (partition and optional sort key).
    key_items: Vec<String>,
}
191
192impl DynamoDbRequest {
193    fn extract_pk_values(&self) -> Option<Vec<AttributeValue>> {
194        let key = match (&self.inner.put_request(), &self.inner.delete_request()) {
195            (Some(put_req), None) => &put_req.item,
196            (None, Some(del_req)) => &del_req.key,
197            _ => return None,
198        };
199        let vs = key
200            .iter()
201            .filter(|(k, _)| self.key_items.contains(k))
202            .map(|(_, v)| v.clone())
203            .collect();
204        Some(vs)
205    }
206}
207
/// Stream-chunk writer for the DynamoDB sink.
pub struct DynamoDbSinkWriter {
    /// Batches requests and issues `BatchWriteItem` calls.
    payload_writer: DynamoDbPayloadWriter,
    /// Converts RisingWave rows into DynamoDB attribute maps.
    formatter: DynamoDbFormatter,
}
212
213impl DynamoDbSinkWriter {
214    pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
215        let client = config.build_client().await?;
216        let table_name = &config.table;
217        let output = client
218            .describe_table()
219            .table_name(table_name)
220            .send()
221            .await
222            .map_err(|e| anyhow!(e))?;
223        let Some(table) = output.table else {
224            return Err(SinkError::DynamoDb(anyhow!(
225                "table {} not found",
226                table_name
227            )));
228        };
229        let dynamodb_keys = table
230            .key_schema
231            .unwrap_or_default()
232            .into_iter()
233            .map(|k| k.attribute_name)
234            .collect();
235
236        let payload_writer = DynamoDbPayloadWriter {
237            client,
238            table: config.table.clone(),
239            dynamodb_keys,
240            max_batch_item_nums: config.max_batch_item_nums,
241        };
242
243        Ok(Self {
244            payload_writer,
245            formatter: DynamoDbFormatter { schema },
246        })
247    }
248
249    fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
250        let mut request_items = Vec::new();
251        for (op, row) in chunk.rows() {
252            let items = self.formatter.format_row(row)?;
253            match op {
254                Op::Insert | Op::UpdateInsert => {
255                    self.payload_writer
256                        .write_one_insert(items, &mut request_items);
257                }
258                Op::Delete => {
259                    self.payload_writer
260                        .write_one_delete(items, &mut request_items);
261                }
262                Op::UpdateDelete => {}
263            }
264        }
265        Ok(self.payload_writer.write_chunk(request_items))
266    }
267}
268
/// Opaque future tracking one in-flight chunk delivery; its concrete type is
/// defined by the `write_chunk` implementation below (type-alias-impl-trait).
pub type DynamoDbSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
270
impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
    type DeliveryFuture = DynamoDbSinkDeliveryFuture;

    /// Format the chunk into batched DynamoDB requests and register the send
    /// future with the delivery manager, which may await here when too many
    /// futures are already in flight.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let futures = self.write_chunk_inner(chunk)?;
        // `map_ok` collapses the per-batch unit results; this expression is the
        // defining use of the opaque `DynamoDbSinkDeliveryFuture` type.
        add_future
            .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
            .await?;
        Ok(())
    }
}
286
/// Converts RisingWave rows into DynamoDB attribute maps using the sink schema.
struct DynamoDbFormatter {
    schema: Schema,
}
290
291impl DynamoDbFormatter {
292    fn format_row(&self, row: RowRef<'_>) -> Result<HashMap<String, AttributeValue>> {
293        row.iter()
294            .zip_eq_debug((self.schema.clone()).into_fields())
295            .map(|(scalar, field)| {
296                map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
297            })
298            .collect()
299    }
300}
301
302fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
303    let Some(scalar_ref) = scalar_ref else {
304        return Ok(AttributeValue::Null(true));
305    };
306    let attr = match data_type {
307        DataType::Int16
308        | DataType::Int32
309        | DataType::Int64
310        | DataType::Int256
311        | DataType::Float32
312        | DataType::Float64
313        | DataType::Decimal
314        | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
315        // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699)
316        DataType::Varchar
317        | DataType::Interval
318        | DataType::Date
319        | DataType::Time
320        | DataType::Timestamp
321        | DataType::Timestamptz
322        | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
323        DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
324        DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
325        DataType::List(datatype) => {
326            let list_attr = scalar_ref
327                .into_list()
328                .iter()
329                .map(|x| map_data(x, datatype))
330                .collect::<Result<Vec<_>>>()?;
331            AttributeValue::L(list_attr)
332        }
333        DataType::Struct(st) => {
334            let mut map = HashMap::with_capacity(st.len());
335            for (sub_datum_ref, (name, data_type)) in scalar_ref
336                .into_struct()
337                .iter_fields_ref()
338                .zip_eq_debug(st.iter())
339            {
340                let attr = map_data(sub_datum_ref, data_type)?;
341                map.insert(name.to_owned(), attr);
342            }
343            AttributeValue::M(map)
344        }
345        DataType::Map(_m) => {
346            return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
347        }
348    };
349    Ok(attr)
350}
351
352mod write_chunk_future {
353    use core::result;
354    use std::collections::HashMap;
355
356    use anyhow::anyhow;
357    use aws_sdk_dynamodb as dynamodb;
358    use aws_sdk_dynamodb::client::Client;
359    use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
360    use dynamodb::error::SdkError;
361    use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput};
362    use dynamodb::types::{
363        AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
364        ReturnItemCollectionMetrics, WriteRequest,
365    };
366    use futures::future::{Map, TryJoinAll};
367    use futures::prelude::Future;
368    use futures::prelude::future::{FutureExt, try_join_all};
369    use itertools::Itertools;
370    use maplit::hashmap;
371
372    use super::{DynamoDbRequest, Result, SinkError};
373
374    pub type WriteChunkFuture = TryJoinAll<
375        Map<
376            impl Future<
377                Output = result::Result<
378                    BatchWriteItemOutput,
379                    SdkError<BatchWriteItemError, HttpResponse>,
380                >,
381            >,
382            impl FnOnce(
383                result::Result<BatchWriteItemOutput, SdkError<BatchWriteItemError, HttpResponse>>,
384            ) -> Result<()>,
385        >,
386    >;
387    pub struct DynamoDbPayloadWriter {
388        pub client: Client,
389        pub table: String,
390        pub dynamodb_keys: Vec<String>,
391        pub max_batch_item_nums: usize,
392    }
393
394    impl DynamoDbPayloadWriter {
395        pub fn write_one_insert(
396            &mut self,
397            item: HashMap<String, AttributeValue>,
398            request_items: &mut Vec<DynamoDbRequest>,
399        ) {
400            let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
401            let req = WriteRequest::builder().put_request(put_req).build();
402            self.write_one_req(req, request_items);
403        }
404
405        pub fn write_one_delete(
406            &mut self,
407            key: HashMap<String, AttributeValue>,
408            request_items: &mut Vec<DynamoDbRequest>,
409        ) {
410            let key = key
411                .into_iter()
412                .filter(|(k, _)| self.dynamodb_keys.contains(k))
413                .collect();
414            let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
415            let req = WriteRequest::builder().delete_request(del_req).build();
416            self.write_one_req(req, request_items);
417        }
418
419        pub fn write_one_req(
420            &mut self,
421            req: WriteRequest,
422            request_items: &mut Vec<DynamoDbRequest>,
423        ) {
424            let r_req = DynamoDbRequest {
425                inner: req,
426                key_items: self.dynamodb_keys.clone(),
427            };
428            if let Some(v) = r_req.extract_pk_values() {
429                request_items.retain(|item| {
430                    !item
431                        .extract_pk_values()
432                        .unwrap_or_default()
433                        .iter()
434                        .all(|x| v.contains(x))
435                });
436            }
437            request_items.push(r_req);
438        }
439
440        pub fn write_chunk(&mut self, request_items: Vec<DynamoDbRequest>) -> WriteChunkFuture {
441            let table = self.table.clone();
442            let chunks = request_items
443                .into_iter()
444                .map(|r| r.inner)
445                .chunks(self.max_batch_item_nums);
446            let futures = chunks.into_iter().map(|chunk| {
447                let req_items = chunk.collect();
448                let reqs = hashmap! {
449                    table.clone() => req_items,
450                };
451                self.client
452                    .batch_write_item()
453                    .set_request_items(Some(reqs))
454                    .return_consumed_capacity(ReturnConsumedCapacity::None)
455                    .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
456                    .send()
457                    .map(|result| {
458                        result
459                            .map_err(|e| {
460                                SinkError::DynamoDb(
461                                    anyhow!(e).context("failed to delete item from DynamoDB sink"),
462                                )
463                            })
464                            .map(|_| ())
465                    })
466            });
467            try_join_all(futures)
468        }
469    }
470}