Skip to main content

risingwave_connector/sink/
dynamodb.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use anyhow::{Context, anyhow};
18use aws_sdk_dynamodb as dynamodb;
19use aws_sdk_dynamodb::client::Client;
20use aws_smithy_types::Blob;
21use dynamodb::types::{AttributeValue, KeySchemaElement, TableStatus, WriteRequest};
22use futures::TryFutureExt;
23use risingwave_common::array::{Op, RowRef, StreamChunk};
24use risingwave_common::catalog::Schema;
25use risingwave_common::row::Row as _;
26use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
27use risingwave_common::util::iter_util::ZipEqDebug;
28use serde::Deserialize;
29use serde_with::{DisplayFromStr, serde_as};
30use with_options::WithOptions;
31use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
32
33use super::writer::{
34    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
35};
36use super::{Result, Sink, SinkError, SinkParam, SinkWriterParam};
37use crate::connector_common::AwsAuthProps;
38use crate::enforce_secret::EnforceSecret;
39use crate::error::ConnectorResult;
40use crate::sink::log_store::DeliveryFutureManagerAddFuture;
41
42pub const DYNAMO_DB_SINK: &str = "dynamodb";
43
44#[serde_as]
45#[derive(Deserialize, Debug, Clone, WithOptions)]
46pub struct DynamoDbConfig {
47    #[serde(rename = "table", alias = "dynamodb.table")]
48    pub table: String,
49
50    #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
51    #[serde_as(as = "DisplayFromStr")]
52    #[deprecated]
53    pub max_batch_rows: usize,
54
55    #[serde(flatten)]
56    pub aws_auth_props: AwsAuthProps,
57
58    #[serde(
59        rename = "dynamodb.max_batch_item_nums",
60        default = "default_max_batch_item_nums"
61    )]
62    #[serde_as(as = "DisplayFromStr")]
63    pub max_batch_item_nums: usize,
64
65    #[serde(
66        rename = "dynamodb.max_future_send_nums",
67        default = "default_max_future_send_nums"
68    )]
69    #[serde_as(as = "DisplayFromStr")]
70    pub max_future_send_nums: usize,
71
72    #[serde(
73        rename = "dynamodb.batch_write_retry_times",
74        default = "default_batch_write_retry_times"
75    )]
76    #[serde_as(as = "DisplayFromStr")]
77    pub batch_write_retry_times: usize,
78
79    #[serde(
80        rename = "dynamodb.batch_write_retry_backoff_ms",
81        default = "default_batch_write_retry_backoff_ms"
82    )]
83    #[serde_as(as = "DisplayFromStr")]
84    pub batch_write_retry_backoff_ms: u64,
85}
86
87impl EnforceSecret for DynamoDbConfig {
88    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
89        AwsAuthProps::enforce_one(prop)
90    }
91}
92
93fn default_max_batch_item_nums() -> usize {
94    25
95}
96
97fn default_max_future_send_nums() -> usize {
98    256
99}
100
101fn default_batch_write_retry_times() -> usize {
102    3
103}
104
105fn default_batch_write_retry_backoff_ms() -> u64 {
106    100
107}
108
109fn default_max_batch_rows() -> usize {
110    1024
111}
112
113impl DynamoDbConfig {
114    pub async fn build_client(&self) -> ConnectorResult<Client> {
115        let config = &self.aws_auth_props;
116        let aws_config = config.build_config().await?;
117
118        Ok(Client::new(&aws_config))
119    }
120
121    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
122        serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
123            .map_err(|e| SinkError::Config(anyhow!(e)))
124    }
125}
126
127#[derive(Clone, Debug)]
128pub struct DynamoDbSink {
129    pub config: DynamoDbConfig,
130    schema: Schema,
131    pk_indices: Vec<usize>,
132}
133
134impl EnforceSecret for DynamoDbSink {
135    fn enforce_secret<'a>(
136        prop_iter: impl Iterator<Item = &'a str>,
137    ) -> crate::error::ConnectorResult<()> {
138        for prop in prop_iter {
139            DynamoDbConfig::enforce_one(prop)?;
140        }
141        Ok(())
142    }
143}
144
145impl Sink for DynamoDbSink {
146    type LogSinker = AsyncTruncateLogSinkerOf<DynamoDbSinkWriter>;
147
148    const SINK_NAME: &'static str = DYNAMO_DB_SINK;
149
150    async fn validate(&self) -> Result<()> {
151        risingwave_common::license::Feature::DynamoDbSink
152            .check_available()
153            .map_err(|e| anyhow::anyhow!(e))?;
154        let client = (self.config.build_client().await)
155            .context("validate DynamoDB sink error")
156            .map_err(SinkError::DynamoDb)?;
157
158        let table_name = &self.config.table;
159        let output = client
160            .describe_table()
161            .table_name(table_name)
162            .send()
163            .await
164            .map_err(|e| anyhow!(e))?;
165        let Some(table) = output.table else {
166            return Err(SinkError::DynamoDb(anyhow!(
167                "table {} not found",
168                table_name
169            )));
170        };
171        if !matches!(table.table_status(), Some(TableStatus::Active)) {
172            return Err(SinkError::DynamoDb(anyhow!(
173                "table {} is not active",
174                table_name
175            )));
176        }
177        let rw_pk_names = rw_pk_names(&self.schema, &self.pk_indices)?;
178        let dynamodb_keys = dynamodb_key_schema_names(table_name, table.key_schema())?;
179        validate_pk_matches_dynamodb_key_schema(table_name, &rw_pk_names, &dynamodb_keys)?;
180
181        Ok(())
182    }
183
184    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
185        Ok(
186            DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
187                .await?
188                .into_log_sinker(self.config.max_future_send_nums),
189        )
190    }
191}
192
193impl TryFrom<SinkParam> for DynamoDbSink {
194    type Error = SinkError;
195
196    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
197        let schema = param.schema();
198        let pk_indices = param.downstream_pk_or_empty();
199        let config = DynamoDbConfig::from_btreemap(param.properties)?;
200
201        Ok(Self {
202            config,
203            schema,
204            pk_indices,
205        })
206    }
207}
208
209#[derive(Debug)]
210struct DynamoDbRequest {
211    inner: WriteRequest,
212    key_items: Vec<String>,
213}
214
215impl DynamoDbRequest {
216    fn extract_key(&self) -> Option<&HashMap<String, AttributeValue>> {
217        match (&self.inner.put_request(), &self.inner.delete_request()) {
218            (Some(put_req), None) => Some(&put_req.item),
219            (None, Some(del_req)) => Some(&del_req.key),
220            _ => None,
221        }
222    }
223
224    fn has_same_pk(&self, other: &Self) -> bool {
225        if self.key_items.is_empty() {
226            return false;
227        }
228
229        let Some(key) = self.extract_key() else {
230            return false;
231        };
232        let Some(other_key) = other.extract_key() else {
233            return false;
234        };
235
236        self.key_items.iter().all(|key_item| {
237            matches!(
238                (key.get(key_item), other_key.get(key_item)),
239                (Some(value), Some(other_value)) if value == other_value
240            )
241        })
242    }
243}
244
245pub struct DynamoDbSinkWriter {
246    payload_writer: DynamoDbPayloadWriter,
247    formatter: DynamoDbFormatter,
248    max_future_send_nums: usize,
249}
250
251impl DynamoDbSinkWriter {
252    pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
253        let client = config.build_client().await?;
254        let table_name = &config.table;
255        let output = client
256            .describe_table()
257            .table_name(table_name)
258            .send()
259            .await
260            .map_err(|e| anyhow!(e))?;
261        let Some(table) = output.table else {
262            return Err(SinkError::DynamoDb(anyhow!(
263                "table {} not found",
264                table_name
265            )));
266        };
267        let dynamodb_keys = dynamodb_key_schema_names(table_name, table.key_schema())?;
268
269        let payload_writer = DynamoDbPayloadWriter {
270            client,
271            table: config.table.clone(),
272            dynamodb_keys,
273            max_batch_item_nums: config.max_batch_item_nums,
274            batch_write_retry_times: config.batch_write_retry_times,
275            batch_write_retry_backoff_ms: config.batch_write_retry_backoff_ms,
276        };
277
278        Ok(Self {
279            payload_writer,
280            formatter: DynamoDbFormatter { schema },
281            max_future_send_nums: config.max_future_send_nums,
282        })
283    }
284
285    fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
286        let mut request_items = Vec::new();
287        for (op, row) in chunk.rows() {
288            let items = self.formatter.format_row(row)?;
289            match op {
290                Op::Insert | Op::UpdateInsert => {
291                    self.payload_writer
292                        .write_one_insert(items, &mut request_items);
293                }
294                Op::Delete => {
295                    self.payload_writer
296                        .write_one_delete(items, &mut request_items);
297                }
298                Op::UpdateDelete => {}
299            }
300        }
301        Ok(self
302            .payload_writer
303            .write_chunk(request_items, self.max_future_send_nums))
304    }
305}
306
307impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
308    type DeliveryFuture = WriteChunkFuture;
309
310    async fn write_chunk<'a>(
311        &'a mut self,
312        chunk: StreamChunk,
313        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
314    ) -> Result<()> {
315        self.write_chunk_inner(chunk)?.map_ok(|_| ()).await?;
316        Ok(())
317    }
318}
319
320struct DynamoDbFormatter {
321    schema: Schema,
322}
323
324impl DynamoDbFormatter {
325    fn format_row(&self, row: RowRef<'_>) -> Result<HashMap<String, AttributeValue>> {
326        row.iter()
327            .zip_eq_debug((self.schema.clone()).into_fields())
328            .map(|(scalar, field)| {
329                map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
330            })
331            .collect()
332    }
333}
334
335fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
336    let Some(scalar_ref) = scalar_ref else {
337        return Ok(AttributeValue::Null(true));
338    };
339    let attr = match data_type {
340        DataType::Int16
341        | DataType::Int32
342        | DataType::Int64
343        | DataType::Int256
344        | DataType::Float32
345        | DataType::Float64
346        | DataType::Decimal
347        | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
348        // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699)
349        DataType::Varchar
350        | DataType::Interval
351        | DataType::Date
352        | DataType::Time
353        | DataType::Timestamp
354        | DataType::Timestamptz
355        | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
356        DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
357        DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
358        DataType::List(lt) => {
359            let list_attr = scalar_ref
360                .into_list()
361                .iter()
362                .map(|x| map_data(x, lt.elem()))
363                .collect::<Result<Vec<_>>>()?;
364            AttributeValue::L(list_attr)
365        }
366        DataType::Struct(st) => {
367            let mut map = HashMap::with_capacity(st.len());
368            for (sub_datum_ref, (name, data_type)) in scalar_ref
369                .into_struct()
370                .iter_fields_ref()
371                .zip_eq_debug(st.iter())
372            {
373                let attr = map_data(sub_datum_ref, data_type)?;
374                map.insert(name.to_owned(), attr);
375            }
376            AttributeValue::M(map)
377        }
378        DataType::Map(_m) => {
379            return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
380        }
381        DataType::Vector(_) => {
382            return Err(SinkError::DynamoDb(anyhow!("vector is not supported yet")));
383        }
384    };
385    Ok(attr)
386}
387
388fn rw_pk_names(schema: &Schema, pk_indices: &[usize]) -> Result<Vec<String>> {
389    pk_indices
390        .iter()
391        .map(|pk_idx| {
392            schema
393                .fields()
394                .get(*pk_idx)
395                .map(|field| field.name.clone())
396                .ok_or_else(|| {
397                    SinkError::DynamoDb(anyhow!(
398                        "RisingWave primary key column index {} is out of range",
399                        pk_idx
400                    ))
401                })
402        })
403        .collect()
404}
405
406fn dynamodb_key_schema_names(
407    table_name: &str,
408    key_schema: &[KeySchemaElement],
409) -> Result<Vec<String>> {
410    if key_schema.is_empty() {
411        return Err(SinkError::DynamoDb(anyhow!(
412            "table {} key schema is empty",
413            table_name
414        )));
415    }
416
417    Ok(key_schema
418        .iter()
419        .map(|key_element| key_element.attribute_name().to_owned())
420        .collect())
421}
422
423fn validate_pk_matches_dynamodb_key_schema(
424    table_name: &str,
425    rw_pk_names: &[String],
426    dynamodb_keys: &[String],
427) -> Result<()> {
428    let rw_pk_set = rw_pk_names.iter().collect::<BTreeSet<_>>();
429    let dynamodb_key_set = dynamodb_keys.iter().collect::<BTreeSet<_>>();
430    if rw_pk_names.len() != dynamodb_keys.len() || rw_pk_set != dynamodb_key_set {
431        return Err(SinkError::DynamoDb(anyhow!(
432            "DynamoDB table {} primary key {:?} must match RisingWave primary key {:?}",
433            table_name,
434            dynamodb_keys,
435            rw_pk_names
436        )));
437    }
438
439    Ok(())
440}
441
442mod write_chunk_future {
443    use std::collections::HashMap;
444    use std::time::Duration;
445
446    use anyhow::anyhow;
447    use aws_sdk_dynamodb as dynamodb;
448    use aws_sdk_dynamodb::client::Client;
449    use dynamodb::types::{
450        AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
451        ReturnItemCollectionMetrics, WriteRequest,
452    };
453    use futures::{FutureExt, StreamExt, TryFuture, TryStreamExt, stream};
454    use itertools::Itertools;
455    use maplit::hashmap;
456    use tokio::time::sleep;
457    use tokio_retry::strategy::{ExponentialBackoff, jitter};
458
459    use super::{DynamoDbRequest, SinkError};
460
461    const MAX_BATCH_WRITE_RETRY_DELAY_MS: u64 = 2000;
462    const MAX_BATCH_WRITE_CONCURRENCY: usize = 256;
463
464    pub struct DynamoDbPayloadWriter {
465        pub client: Client,
466        pub table: String,
467        pub dynamodb_keys: Vec<String>,
468        pub max_batch_item_nums: usize,
469        pub batch_write_retry_times: usize,
470        pub batch_write_retry_backoff_ms: u64,
471    }
472
473    pub type WriteChunkFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;
474
475    impl DynamoDbPayloadWriter {
476        pub fn write_one_insert(
477            &mut self,
478            item: HashMap<String, AttributeValue>,
479            request_items: &mut Vec<DynamoDbRequest>,
480        ) {
481            let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
482            let req = WriteRequest::builder().put_request(put_req).build();
483            self.write_one_req(req, request_items);
484        }
485
486        pub fn write_one_delete(
487            &mut self,
488            key: HashMap<String, AttributeValue>,
489            request_items: &mut Vec<DynamoDbRequest>,
490        ) {
491            let key = key
492                .into_iter()
493                .filter(|(k, _)| self.dynamodb_keys.contains(k))
494                .collect();
495            let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
496            let req = WriteRequest::builder().delete_request(del_req).build();
497            self.write_one_req(req, request_items);
498        }
499
500        pub fn write_one_req(
501            &mut self,
502            req: WriteRequest,
503            request_items: &mut Vec<DynamoDbRequest>,
504        ) {
505            let r_req = DynamoDbRequest {
506                inner: req,
507                key_items: self.dynamodb_keys.clone(),
508            };
509            request_items.retain(|item| !item.has_same_pk(&r_req));
510            request_items.push(r_req);
511        }
512
513        #[define_opaque(WriteChunkFuture)]
514        pub fn write_chunk(
515            &mut self,
516            request_items: Vec<DynamoDbRequest>,
517            max_future_send_nums: usize,
518        ) -> WriteChunkFuture {
519            let client = self.client.clone();
520            let table = self.table.clone();
521            let max_batch_item_nums = self.max_batch_item_nums;
522            let batch_write_retry_times = self.batch_write_retry_times;
523            let batch_write_retry_backoff_ms = self.batch_write_retry_backoff_ms;
524            async move {
525                let chunks = request_items
526                    .into_iter()
527                    .map(|r| r.inner)
528                    .chunks(max_batch_item_nums)
529                    .into_iter()
530                    .map(|chunk| chunk.collect::<Vec<_>>())
531                    .collect_vec();
532                let max_future_send_nums =
533                    max_future_send_nums.clamp(1, MAX_BATCH_WRITE_CONCURRENCY);
534                stream::iter(chunks.into_iter().map(|req_items| {
535                    let client = client.clone();
536                    let table = table.clone();
537                    async move {
538                        let mut req_items = req_items;
539                        let mut retry_count = 0;
540                        let mut retry_backoff = ExponentialBackoff::from_millis(
541                            batch_write_retry_backoff_ms,
542                        )
543                        .factor(2)
544                        .max_delay(Duration::from_millis(MAX_BATCH_WRITE_RETRY_DELAY_MS))
545                        .map(jitter)
546                        .take(batch_write_retry_times);
547
548                        loop {
549                            let return_consumed_capacity = if retry_count == 0 {
550                                ReturnConsumedCapacity::None
551                            } else {
552                                ReturnConsumedCapacity::Total
553                            };
554                            let reqs = hashmap! {
555                                table.clone() => req_items.clone(),
556                            };
557                            let result = client
558                                .batch_write_item()
559                                .set_request_items(Some(reqs))
560                                .return_consumed_capacity(return_consumed_capacity)
561                                .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
562                                .send()
563                                .await;
564
565                            match result {
566                                Ok(output) => {
567                                    let unprocessed_items =
568                                        output.unprocessed_items().cloned().unwrap_or_default();
569                                    if unprocessed_items.is_empty() {
570                                        if retry_count > 0 {
571                                            tracing::warn!(
572                                                retry_count,
573                                                consumed_capacity = ?output.consumed_capacity(),
574                                                "DynamoDB batch write retry succeeded"
575                                            );
576                                        }
577                                        return Ok(());
578                                    }
579
580                                    req_items = unprocessed_items.into_values().flatten().collect();
581                                    if retry_count >= batch_write_retry_times {
582                                        return Err(SinkError::DynamoDb(anyhow!(
583                                            "failed to write {} unprocessed items to DynamoDB sink after {} retries",
584                                            req_items.len(),
585                                            batch_write_retry_times,
586                                        )));
587                                    }
588                                }
589                                Err(e) => {
590                                    return Err(SinkError::DynamoDb(
591                                        anyhow!(e).context("failed to write items to DynamoDB sink"),
592                                    ));
593                                }
594                            }
595
596                            retry_count += 1;
597                            let Some(delay) = retry_backoff.next() else {
598                                return Err(SinkError::DynamoDb(anyhow!(
599                                    "failed to write {} unprocessed items to DynamoDB sink after {} retries",
600                                    req_items.len(),
601                                    batch_write_retry_times,
602                                )));
603                            };
604                            tracing::warn!(
605                                retry_count,
606                                delay_ms = delay.as_millis(),
607                                unprocessed_items_count = req_items.len(),
608                                "retrying DynamoDB batch write"
609                            );
610                            sleep(delay).await;
611                        }
612                    }
613                }))
614                .buffer_unordered(max_future_send_nums)
615                .try_collect::<Vec<_>>()
616                .await?;
617                Ok(())
618            }
619            .boxed()
620        }
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use aws_sdk_dynamodb::types::{DeleteRequest, KeyType, PutRequest};
627
628    use super::*;
629
630    fn dynamodb_put_request(
631        items: impl IntoIterator<Item = (&'static str, &'static str)>,
632    ) -> DynamoDbRequest {
633        let item = dynamodb_items(items);
634        let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
635        DynamoDbRequest {
636            inner: WriteRequest::builder().put_request(put_req).build(),
637            key_items: vec!["pk".to_owned(), "sk".to_owned()],
638        }
639    }
640
641    fn dynamodb_delete_request(
642        items: impl IntoIterator<Item = (&'static str, &'static str)>,
643    ) -> DynamoDbRequest {
644        let key = dynamodb_items(items);
645        let delete_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
646        DynamoDbRequest {
647            inner: WriteRequest::builder().delete_request(delete_req).build(),
648            key_items: vec!["pk".to_owned(), "sk".to_owned()],
649        }
650    }
651
652    fn dynamodb_items(
653        items: impl IntoIterator<Item = (&'static str, &'static str)>,
654    ) -> HashMap<String, AttributeValue> {
655        items
656            .into_iter()
657            .map(|(k, v)| (k.to_owned(), AttributeValue::S(v.to_owned())))
658            .collect()
659    }
660
661    #[test]
662    fn dynamodb_request_compares_pk_by_key_attribute() {
663        let req = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
664        let swapped_values = dynamodb_put_request([("pk", "b"), ("sk", "a")]);
665        let same_pk = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
666
667        assert!(!req.has_same_pk(&swapped_values));
668        assert!(req.has_same_pk(&same_pk));
669    }
670
671    #[test]
672    fn dynamodb_request_empty_key_items_never_match() {
673        let mut req = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
674        let same_pk = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
675        req.key_items.clear();
676
677        assert!(!req.has_same_pk(&same_pk));
678    }
679
680    #[test]
681    fn dynamodb_request_compares_put_and_delete_by_composite_pk() {
682        let put = dynamodb_put_request([("pk", "a"), ("sk", "b"), ("value", "1")]);
683        let same_pk_delete = dynamodb_delete_request([("pk", "a"), ("sk", "b")]);
684        let different_hash_key_delete = dynamodb_delete_request([("pk", "x"), ("sk", "b")]);
685        let different_range_key_delete = dynamodb_delete_request([("pk", "a"), ("sk", "x")]);
686
687        assert!(put.has_same_pk(&same_pk_delete));
688        assert!(same_pk_delete.has_same_pk(&put));
689        assert!(!put.has_same_pk(&different_hash_key_delete));
690        assert!(!put.has_same_pk(&different_range_key_delete));
691    }
692
693    #[test]
694    fn dynamodb_key_schema_empty_errors() {
695        let err = dynamodb_key_schema_names("test_table", &[]).unwrap_err();
696
697        assert!(
698            err.to_string()
699                .contains("table test_table key schema is empty")
700        );
701    }
702
703    #[test]
704    fn dynamodb_key_schema_must_match_rw_pk() {
705        let dynamodb_keys = ["pk".to_owned(), "sk".to_owned()];
706        let same_rw_pk = ["sk".to_owned(), "pk".to_owned()];
707        let extra_rw_pk = ["pk".to_owned(), "sk".to_owned(), "extra".to_owned()];
708        let different_rw_pk = ["pk".to_owned(), "other".to_owned()];
709
710        validate_pk_matches_dynamodb_key_schema("test_table", &same_rw_pk, &dynamodb_keys).unwrap();
711
712        assert!(
713            validate_pk_matches_dynamodb_key_schema("test_table", &extra_rw_pk, &dynamodb_keys)
714                .unwrap_err()
715                .to_string()
716                .contains("must match RisingWave primary key")
717        );
718        assert!(
719            validate_pk_matches_dynamodb_key_schema("test_table", &different_rw_pk, &dynamodb_keys)
720                .unwrap_err()
721                .to_string()
722                .contains("must match RisingWave primary key")
723        );
724    }
725
726    #[test]
727    fn dynamodb_key_schema_names_uses_explicit_schema() {
728        let key_schema = vec![
729            KeySchemaElement::builder()
730                .attribute_name("pk")
731                .key_type(KeyType::Hash)
732                .build()
733                .unwrap(),
734            KeySchemaElement::builder()
735                .attribute_name("sk")
736                .key_type(KeyType::Range)
737                .build()
738                .unwrap(),
739        ];
740
741        assert_eq!(
742            dynamodb_key_schema_names("test_table", &key_schema).unwrap(),
743            vec!["pk".to_owned(), "sk".to_owned()]
744        );
745    }
746}