risingwave_connector/sink/dynamodb.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::{Context, anyhow};
use aws_sdk_dynamodb as dynamodb;
use aws_sdk_dynamodb::client::Client;
use aws_smithy_types::Blob;
use dynamodb::types::{AttributeValue, TableStatus, WriteRequest};
use futures::prelude::TryFuture;
use futures::prelude::future::TryFutureExt;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row as _;
use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use with_options::WithOptions;
use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};

use super::log_store::DeliveryFutureManagerAddFuture;
use super::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
};
use super::{Result, Sink, SinkError, SinkParam, SinkWriterParam};
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;

pub const DYNAMO_DB_SINK: &str = "dynamodb";

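/// Configuration for the DynamoDB sink, deserialized from the sink's `WITH` options:
/// `table` (alias `dynamodb.table`) names the target table,
/// `dynamodb.max_batch_item_nums` caps the number of write requests sent in a single
/// `BatchWriteItem` call (default 25), and `dynamodb.max_future_send_nums` bounds the
/// number of outstanding delivery futures (default 256). `dynamodb.max_batch_rows` is
/// deprecated and retained only for backward compatibility. AWS region and credentials
/// come from the flattened [`AwsAuthProps`] options.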
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DynamoDbConfig {
    #[serde(rename = "table", alias = "dynamodb.table")]
    pub table: String,

    #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
    #[serde_as(as = "DisplayFromStr")]
    #[deprecated]
    pub max_batch_rows: usize,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(
        rename = "dynamodb.max_batch_item_nums",
        default = "default_max_batch_item_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub max_batch_item_nums: usize,

    #[serde(
        rename = "dynamodb.max_future_send_nums",
        default = "default_max_future_send_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub max_future_send_nums: usize,
}

impl EnforceSecret for DynamoDbConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)
    }
}

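// 25 put/delete requests is the per-call limit of DynamoDB's `BatchWriteItem` API.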
fn default_max_batch_item_nums() -> usize {
    25
}

fn default_max_future_send_nums() -> usize {
    256
}

fn default_max_batch_rows() -> usize {
    1024
}

impl DynamoDbConfig {
    pub async fn build_client(&self) -> ConnectorResult<Client> {
        let config = &self.aws_auth_props;
        let aws_config = config.build_config().await?;

        Ok(Client::new(&aws_config))
    }

    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))
    }
}

#[derive(Clone, Debug)]
pub struct DynamoDbSink {
    pub config: DynamoDbConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for DynamoDbSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DynamoDbConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl Sink for DynamoDbSink {
    type LogSinker = AsyncTruncateLogSinkerOf<DynamoDbSinkWriter>;

    const SINK_NAME: &'static str = DYNAMO_DB_SINK;

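    /// Validates the sink: the DynamoDB sink feature must be licensed, the target
    /// table must exist and be `ACTIVE`, and every attribute in the table's key
    /// schema must be one of the sink's primary-key columns.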
    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::DynamoDbSink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        let client = (self.config.build_client().await)
            .context("validate DynamoDB sink error")
            .map_err(SinkError::DynamoDb)?;

        let table_name = &self.config.table;
        let output = client
            .describe_table()
            .table_name(table_name)
            .send()
            .await
            .map_err(|e| anyhow!(e))?;
        let Some(table) = output.table else {
            return Err(SinkError::DynamoDb(anyhow!(
                "table {} not found",
                table_name
            )));
        };
        if !matches!(table.table_status(), Some(TableStatus::Active)) {
            return Err(SinkError::DynamoDb(anyhow!(
                "table {} is not active",
                table_name
            )));
        }
        let pk_set: HashSet<String> = self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(k, _)| self.pk_indices.contains(k))
            .map(|(_, v)| v.name.clone())
            .collect();
        let key_schema = table.key_schema();

        for key_element in key_schema.iter().map(|x| x.attribute_name()) {
            if !pk_set.contains(key_element) {
                return Err(SinkError::DynamoDb(anyhow!(
                    "table {} key field {} not found in schema or not a primary key",
                    table_name,
                    key_element
                )));
            }
        }

        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(
            DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
                .await?
                .into_log_sinker(self.config.max_future_send_nums),
        )
    }
}

impl TryFrom<SinkParam> for DynamoDbSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        let config = DynamoDbConfig::from_btreemap(param.properties)?;

        Ok(Self {
            config,
            schema,
            pk_indices,
        })
    }
}

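/// A staged `WriteRequest` (put or delete) together with the table's key attribute
/// names, so the request's primary-key values can be extracted and compared when
/// de-duplicating a batch.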
#[derive(Debug)]
struct DynamoDbRequest {
    inner: WriteRequest,
    key_items: Vec<String>,
}

impl DynamoDbRequest {
    fn extract_pk_values(&self) -> Option<Vec<AttributeValue>> {
        let key = match (&self.inner.put_request(), &self.inner.delete_request()) {
            (Some(put_req), None) => &put_req.item,
            (None, Some(del_req)) => &del_req.key,
            _ => return None,
        };
        let vs = key
            .iter()
            .filter(|(k, _)| self.key_items.contains(k))
            .map(|(_, v)| v.clone())
            .collect();
        Some(vs)
    }
}

pub struct DynamoDbSinkWriter {
    payload_writer: DynamoDbPayloadWriter,
    formatter: DynamoDbFormatter,
}

impl DynamoDbSinkWriter {
    pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
        let client = config.build_client().await?;
        let table_name = &config.table;
        let output = client
            .describe_table()
            .table_name(table_name)
            .send()
            .await
            .map_err(|e| anyhow!(e))?;
        let Some(table) = output.table else {
            return Err(SinkError::DynamoDb(anyhow!(
                "table {} not found",
                table_name
            )));
        };
        let dynamodb_keys = table
            .key_schema
            .unwrap_or_default()
            .into_iter()
            .map(|k| k.attribute_name)
            .collect();

        let payload_writer = DynamoDbPayloadWriter {
            client,
            table: config.table.clone(),
            dynamodb_keys,
            max_batch_item_nums: config.max_batch_item_nums,
        };

        Ok(Self {
            payload_writer,
            formatter: DynamoDbFormatter { schema },
        })
    }

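    /// Formats each row of the chunk and stages it as a put (`Insert` /
    /// `UpdateInsert`) or a delete (`Delete`). `UpdateDelete` is skipped: the paired
    /// `UpdateInsert` becomes a put, which overwrites the existing item with the
    /// same key.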
    fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
        let mut request_items = Vec::new();
        for (op, row) in chunk.rows() {
            let items = self.formatter.format_row(row)?;
            match op {
                Op::Insert | Op::UpdateInsert => {
                    self.payload_writer
                        .write_one_insert(items, &mut request_items);
                }
                Op::Delete => {
                    self.payload_writer
                        .write_one_delete(items, &mut request_items);
                }
                Op::UpdateDelete => {}
            }
        }
        Ok(self.payload_writer.write_chunk(request_items))
    }
}

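/// Opaque alias for the future returned by `DynamoDbPayloadWriter::write_chunk`; the
/// log sinker is created with `max_future_send_nums`, which should bound how many of
/// these delivery futures are kept in flight at once.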
pub type DynamoDbSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
    type DeliveryFuture = DynamoDbSinkDeliveryFuture;

    #[define_opaque(DynamoDbSinkDeliveryFuture)]
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let futures = self.write_chunk_inner(chunk)?;
        add_future
            .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
            .await?;
        Ok(())
    }
}

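/// Turns a RisingWave row into a DynamoDB item, i.e. a map from column name to
/// [`AttributeValue`], converting each field with [`map_data`].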
struct DynamoDbFormatter {
    schema: Schema,
}

impl DynamoDbFormatter {
    fn format_row(&self, row: RowRef<'_>) -> Result<HashMap<String, AttributeValue>> {
        row.iter()
            .zip_eq_debug((self.schema.clone()).into_fields())
            .map(|(scalar, field)| {
                map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
            })
            .collect()
    }
}

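/// Maps a RisingWave scalar to a DynamoDB [`AttributeValue`]: NULL values become
/// `NULL`, numeric types become `N`, string-like, temporal and `jsonb` types become
/// `S`, booleans become `BOOL`, `bytea` becomes `B`, lists become `L`, and structs
/// become `M`. `map` and `vector` values are rejected as unsupported.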
fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
    let Some(scalar_ref) = scalar_ref else {
        return Ok(AttributeValue::Null(true));
    };
    let attr = match data_type {
        DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int256
        | DataType::Float32
        | DataType::Float64
        | DataType::Decimal
        | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
        // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699)
        DataType::Varchar
        | DataType::Interval
        | DataType::Date
        | DataType::Time
        | DataType::Timestamp
        | DataType::Timestamptz
        | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
        DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
        DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
        DataType::List(lt) => {
            let list_attr = scalar_ref
                .into_list()
                .iter()
                .map(|x| map_data(x, lt.elem()))
                .collect::<Result<Vec<_>>>()?;
            AttributeValue::L(list_attr)
        }
        DataType::Struct(st) => {
            let mut map = HashMap::with_capacity(st.len());
            for (sub_datum_ref, (name, data_type)) in scalar_ref
                .into_struct()
                .iter_fields_ref()
                .zip_eq_debug(st.iter())
            {
                let attr = map_data(sub_datum_ref, data_type)?;
                map.insert(name.to_owned(), attr);
            }
            AttributeValue::M(map)
        }
        DataType::Map(_m) => {
            return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
        }
        DataType::Vector(_) => {
            return Err(SinkError::DynamoDb(anyhow!("vector is not supported yet")));
        }
    };
    Ok(attr)
}

mod write_chunk_future {
    use core::result;
    use std::collections::HashMap;

    use anyhow::anyhow;
    use aws_sdk_dynamodb as dynamodb;
    use aws_sdk_dynamodb::client::Client;
    use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
    use dynamodb::error::SdkError;
    use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput};
    use dynamodb::types::{
        AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
        ReturnItemCollectionMetrics, WriteRequest,
    };
    use futures::future::{Map, TryJoinAll};
    use futures::prelude::Future;
    use futures::prelude::future::{FutureExt, try_join_all};
    use itertools::Itertools;
    use maplit::hashmap;

    use super::{DynamoDbRequest, Result, SinkError};

    pub type WriteChunkFuture = TryJoinAll<
        Map<
            impl Future<
                Output = result::Result<
                    BatchWriteItemOutput,
                    SdkError<BatchWriteItemError, HttpResponse>,
                >,
            >,
            impl FnOnce(
                result::Result<BatchWriteItemOutput, SdkError<BatchWriteItemError, HttpResponse>>,
            ) -> Result<()>,
        >,
    >;
    pub struct DynamoDbPayloadWriter {
        pub client: Client,
        pub table: String,
        pub dynamodb_keys: Vec<String>,
        pub max_batch_item_nums: usize,
    }

    impl DynamoDbPayloadWriter {
        pub fn write_one_insert(
            &mut self,
            item: HashMap<String, AttributeValue>,
            request_items: &mut Vec<DynamoDbRequest>,
        ) {
            let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
            let req = WriteRequest::builder().put_request(put_req).build();
            self.write_one_req(req, request_items);
        }

        pub fn write_one_delete(
            &mut self,
            key: HashMap<String, AttributeValue>,
            request_items: &mut Vec<DynamoDbRequest>,
        ) {
            let key = key
                .into_iter()
                .filter(|(k, _)| self.dynamodb_keys.contains(k))
                .collect();
            let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
            let req = WriteRequest::builder().delete_request(del_req).build();
            self.write_one_req(req, request_items);
        }

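        /// Stages one write request. Any previously staged request with the same
        /// primary-key values is dropped first, since a single `BatchWriteItem` call
        /// must not contain more than one operation on the same item; the later
        /// request wins.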
        pub fn write_one_req(
            &mut self,
            req: WriteRequest,
            request_items: &mut Vec<DynamoDbRequest>,
        ) {
            let r_req = DynamoDbRequest {
                inner: req,
                key_items: self.dynamodb_keys.clone(),
            };
            if let Some(v) = r_req.extract_pk_values() {
                request_items.retain(|item| {
                    !item
                        .extract_pk_values()
                        .unwrap_or_default()
                        .iter()
                        .all(|x| v.contains(x))
                });
            }
            request_items.push(r_req);
        }

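        /// Splits the staged requests into chunks of at most `max_batch_item_nums`
        /// and issues one `BatchWriteItem` call per chunk, joining all calls into a
        /// single delivery future with `try_join_all`.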
        #[define_opaque(WriteChunkFuture)]
        pub fn write_chunk(&mut self, request_items: Vec<DynamoDbRequest>) -> WriteChunkFuture {
            let table = self.table.clone();
            let chunks = request_items
                .into_iter()
                .map(|r| r.inner)
                .chunks(self.max_batch_item_nums);
            let futures = chunks.into_iter().map(|chunk| {
                let req_items = chunk.collect();
                let reqs = hashmap! {
                    table.clone() => req_items,
                };
                self.client
                    .batch_write_item()
                    .set_request_items(Some(reqs))
                    .return_consumed_capacity(ReturnConsumedCapacity::None)
                    .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
                    .send()
                    .map(|result| {
                        result
                            .map_err(|e| {
                                SinkError::DynamoDb(
                                    anyhow!(e).context("failed to write items to DynamoDB sink"),
                                )
                            })
                            .map(|_| ())
                    })
            });
            try_join_all(futures)
        }
    }
}