1use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use anyhow::{Context, anyhow};
18use aws_sdk_dynamodb as dynamodb;
19use aws_sdk_dynamodb::client::Client;
20use aws_smithy_types::Blob;
21use dynamodb::types::{AttributeValue, KeySchemaElement, TableStatus, WriteRequest};
22use futures::TryFutureExt;
23use risingwave_common::array::{Op, RowRef, StreamChunk};
24use risingwave_common::catalog::Schema;
25use risingwave_common::row::Row as _;
26use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
27use risingwave_common::util::iter_util::ZipEqDebug;
28use serde::Deserialize;
29use serde_with::{DisplayFromStr, serde_as};
30use with_options::WithOptions;
31use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
32
33use super::writer::{
34 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
35};
36use super::{Result, Sink, SinkError, SinkParam, SinkWriterParam};
37use crate::connector_common::AwsAuthProps;
38use crate::enforce_secret::EnforceSecret;
39use crate::error::ConnectorResult;
40use crate::sink::log_store::DeliveryFutureManagerAddFuture;
41
42pub const DYNAMO_DB_SINK: &str = "dynamodb";
43
44#[serde_as]
45#[derive(Deserialize, Debug, Clone, WithOptions)]
46pub struct DynamoDbConfig {
47 #[serde(rename = "table", alias = "dynamodb.table")]
48 pub table: String,
49
50 #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
51 #[serde_as(as = "DisplayFromStr")]
52 #[deprecated]
53 pub max_batch_rows: usize,
54
55 #[serde(flatten)]
56 pub aws_auth_props: AwsAuthProps,
57
58 #[serde(
59 rename = "dynamodb.max_batch_item_nums",
60 default = "default_max_batch_item_nums"
61 )]
62 #[serde_as(as = "DisplayFromStr")]
63 pub max_batch_item_nums: usize,
64
65 #[serde(
66 rename = "dynamodb.max_future_send_nums",
67 default = "default_max_future_send_nums"
68 )]
69 #[serde_as(as = "DisplayFromStr")]
70 pub max_future_send_nums: usize,
71
72 #[serde(
73 rename = "dynamodb.batch_write_retry_times",
74 default = "default_batch_write_retry_times"
75 )]
76 #[serde_as(as = "DisplayFromStr")]
77 pub batch_write_retry_times: usize,
78
79 #[serde(
80 rename = "dynamodb.batch_write_retry_backoff_ms",
81 default = "default_batch_write_retry_backoff_ms"
82 )]
83 #[serde_as(as = "DisplayFromStr")]
84 pub batch_write_retry_backoff_ms: u64,
85}
86
87impl EnforceSecret for DynamoDbConfig {
88 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
89 AwsAuthProps::enforce_one(prop)
90 }
91}
92
93fn default_max_batch_item_nums() -> usize {
94 25
95}
96
97fn default_max_future_send_nums() -> usize {
98 256
99}
100
101fn default_batch_write_retry_times() -> usize {
102 3
103}
104
105fn default_batch_write_retry_backoff_ms() -> u64 {
106 100
107}
108
109fn default_max_batch_rows() -> usize {
110 1024
111}
112
113impl DynamoDbConfig {
114 pub async fn build_client(&self) -> ConnectorResult<Client> {
115 let config = &self.aws_auth_props;
116 let aws_config = config.build_config().await?;
117
118 Ok(Client::new(&aws_config))
119 }
120
121 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
122 serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
123 .map_err(|e| SinkError::Config(anyhow!(e)))
124 }
125}
126
127#[derive(Clone, Debug)]
128pub struct DynamoDbSink {
129 pub config: DynamoDbConfig,
130 schema: Schema,
131 pk_indices: Vec<usize>,
132}
133
134impl EnforceSecret for DynamoDbSink {
135 fn enforce_secret<'a>(
136 prop_iter: impl Iterator<Item = &'a str>,
137 ) -> crate::error::ConnectorResult<()> {
138 for prop in prop_iter {
139 DynamoDbConfig::enforce_one(prop)?;
140 }
141 Ok(())
142 }
143}
144
145impl Sink for DynamoDbSink {
146 type LogSinker = AsyncTruncateLogSinkerOf<DynamoDbSinkWriter>;
147
148 const SINK_NAME: &'static str = DYNAMO_DB_SINK;
149
150 async fn validate(&self) -> Result<()> {
151 risingwave_common::license::Feature::DynamoDbSink
152 .check_available()
153 .map_err(|e| anyhow::anyhow!(e))?;
154 let client = (self.config.build_client().await)
155 .context("validate DynamoDB sink error")
156 .map_err(SinkError::DynamoDb)?;
157
158 let table_name = &self.config.table;
159 let output = client
160 .describe_table()
161 .table_name(table_name)
162 .send()
163 .await
164 .map_err(|e| anyhow!(e))?;
165 let Some(table) = output.table else {
166 return Err(SinkError::DynamoDb(anyhow!(
167 "table {} not found",
168 table_name
169 )));
170 };
171 if !matches!(table.table_status(), Some(TableStatus::Active)) {
172 return Err(SinkError::DynamoDb(anyhow!(
173 "table {} is not active",
174 table_name
175 )));
176 }
177 let rw_pk_names = rw_pk_names(&self.schema, &self.pk_indices)?;
178 let dynamodb_keys = dynamodb_key_schema_names(table_name, table.key_schema())?;
179 validate_pk_matches_dynamodb_key_schema(table_name, &rw_pk_names, &dynamodb_keys)?;
180
181 Ok(())
182 }
183
184 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
185 Ok(
186 DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
187 .await?
188 .into_log_sinker(self.config.max_future_send_nums),
189 )
190 }
191}
192
193impl TryFrom<SinkParam> for DynamoDbSink {
194 type Error = SinkError;
195
196 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
197 let schema = param.schema();
198 let pk_indices = param.downstream_pk_or_empty();
199 let config = DynamoDbConfig::from_btreemap(param.properties)?;
200
201 Ok(Self {
202 config,
203 schema,
204 pk_indices,
205 })
206 }
207}
208
209#[derive(Debug)]
210struct DynamoDbRequest {
211 inner: WriteRequest,
212 key_items: Vec<String>,
213}
214
215impl DynamoDbRequest {
216 fn extract_key(&self) -> Option<&HashMap<String, AttributeValue>> {
217 match (&self.inner.put_request(), &self.inner.delete_request()) {
218 (Some(put_req), None) => Some(&put_req.item),
219 (None, Some(del_req)) => Some(&del_req.key),
220 _ => None,
221 }
222 }
223
224 fn has_same_pk(&self, other: &Self) -> bool {
225 if self.key_items.is_empty() {
226 return false;
227 }
228
229 let Some(key) = self.extract_key() else {
230 return false;
231 };
232 let Some(other_key) = other.extract_key() else {
233 return false;
234 };
235
236 self.key_items.iter().all(|key_item| {
237 matches!(
238 (key.get(key_item), other_key.get(key_item)),
239 (Some(value), Some(other_value)) if value == other_value
240 )
241 })
242 }
243}
244
245pub struct DynamoDbSinkWriter {
246 payload_writer: DynamoDbPayloadWriter,
247 formatter: DynamoDbFormatter,
248 max_future_send_nums: usize,
249}
250
251impl DynamoDbSinkWriter {
252 pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
253 let client = config.build_client().await?;
254 let table_name = &config.table;
255 let output = client
256 .describe_table()
257 .table_name(table_name)
258 .send()
259 .await
260 .map_err(|e| anyhow!(e))?;
261 let Some(table) = output.table else {
262 return Err(SinkError::DynamoDb(anyhow!(
263 "table {} not found",
264 table_name
265 )));
266 };
267 let dynamodb_keys = dynamodb_key_schema_names(table_name, table.key_schema())?;
268
269 let payload_writer = DynamoDbPayloadWriter {
270 client,
271 table: config.table.clone(),
272 dynamodb_keys,
273 max_batch_item_nums: config.max_batch_item_nums,
274 batch_write_retry_times: config.batch_write_retry_times,
275 batch_write_retry_backoff_ms: config.batch_write_retry_backoff_ms,
276 };
277
278 Ok(Self {
279 payload_writer,
280 formatter: DynamoDbFormatter { schema },
281 max_future_send_nums: config.max_future_send_nums,
282 })
283 }
284
285 fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
286 let mut request_items = Vec::new();
287 for (op, row) in chunk.rows() {
288 let items = self.formatter.format_row(row)?;
289 match op {
290 Op::Insert | Op::UpdateInsert => {
291 self.payload_writer
292 .write_one_insert(items, &mut request_items);
293 }
294 Op::Delete => {
295 self.payload_writer
296 .write_one_delete(items, &mut request_items);
297 }
298 Op::UpdateDelete => {}
299 }
300 }
301 Ok(self
302 .payload_writer
303 .write_chunk(request_items, self.max_future_send_nums))
304 }
305}
306
307impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
308 type DeliveryFuture = WriteChunkFuture;
309
310 async fn write_chunk<'a>(
311 &'a mut self,
312 chunk: StreamChunk,
313 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
314 ) -> Result<()> {
315 self.write_chunk_inner(chunk)?.map_ok(|_| ()).await?;
316 Ok(())
317 }
318}
319
320struct DynamoDbFormatter {
321 schema: Schema,
322}
323
324impl DynamoDbFormatter {
325 fn format_row(&self, row: RowRef<'_>) -> Result<HashMap<String, AttributeValue>> {
326 row.iter()
327 .zip_eq_debug((self.schema.clone()).into_fields())
328 .map(|(scalar, field)| {
329 map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
330 })
331 .collect()
332 }
333}
334
335fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
336 let Some(scalar_ref) = scalar_ref else {
337 return Ok(AttributeValue::Null(true));
338 };
339 let attr = match data_type {
340 DataType::Int16
341 | DataType::Int32
342 | DataType::Int64
343 | DataType::Int256
344 | DataType::Float32
345 | DataType::Float64
346 | DataType::Decimal
347 | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
348 DataType::Varchar
350 | DataType::Interval
351 | DataType::Date
352 | DataType::Time
353 | DataType::Timestamp
354 | DataType::Timestamptz
355 | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
356 DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
357 DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
358 DataType::List(lt) => {
359 let list_attr = scalar_ref
360 .into_list()
361 .iter()
362 .map(|x| map_data(x, lt.elem()))
363 .collect::<Result<Vec<_>>>()?;
364 AttributeValue::L(list_attr)
365 }
366 DataType::Struct(st) => {
367 let mut map = HashMap::with_capacity(st.len());
368 for (sub_datum_ref, (name, data_type)) in scalar_ref
369 .into_struct()
370 .iter_fields_ref()
371 .zip_eq_debug(st.iter())
372 {
373 let attr = map_data(sub_datum_ref, data_type)?;
374 map.insert(name.to_owned(), attr);
375 }
376 AttributeValue::M(map)
377 }
378 DataType::Map(_m) => {
379 return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
380 }
381 DataType::Vector(_) => {
382 return Err(SinkError::DynamoDb(anyhow!("vector is not supported yet")));
383 }
384 };
385 Ok(attr)
386}
387
388fn rw_pk_names(schema: &Schema, pk_indices: &[usize]) -> Result<Vec<String>> {
389 pk_indices
390 .iter()
391 .map(|pk_idx| {
392 schema
393 .fields()
394 .get(*pk_idx)
395 .map(|field| field.name.clone())
396 .ok_or_else(|| {
397 SinkError::DynamoDb(anyhow!(
398 "RisingWave primary key column index {} is out of range",
399 pk_idx
400 ))
401 })
402 })
403 .collect()
404}
405
406fn dynamodb_key_schema_names(
407 table_name: &str,
408 key_schema: &[KeySchemaElement],
409) -> Result<Vec<String>> {
410 if key_schema.is_empty() {
411 return Err(SinkError::DynamoDb(anyhow!(
412 "table {} key schema is empty",
413 table_name
414 )));
415 }
416
417 Ok(key_schema
418 .iter()
419 .map(|key_element| key_element.attribute_name().to_owned())
420 .collect())
421}
422
423fn validate_pk_matches_dynamodb_key_schema(
424 table_name: &str,
425 rw_pk_names: &[String],
426 dynamodb_keys: &[String],
427) -> Result<()> {
428 let rw_pk_set = rw_pk_names.iter().collect::<BTreeSet<_>>();
429 let dynamodb_key_set = dynamodb_keys.iter().collect::<BTreeSet<_>>();
430 if rw_pk_names.len() != dynamodb_keys.len() || rw_pk_set != dynamodb_key_set {
431 return Err(SinkError::DynamoDb(anyhow!(
432 "DynamoDB table {} primary key {:?} must match RisingWave primary key {:?}",
433 table_name,
434 dynamodb_keys,
435 rw_pk_names
436 )));
437 }
438
439 Ok(())
440}
441
442mod write_chunk_future {
443 use std::collections::HashMap;
444 use std::time::Duration;
445
446 use anyhow::anyhow;
447 use aws_sdk_dynamodb as dynamodb;
448 use aws_sdk_dynamodb::client::Client;
449 use dynamodb::types::{
450 AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
451 ReturnItemCollectionMetrics, WriteRequest,
452 };
453 use futures::{FutureExt, StreamExt, TryFuture, TryStreamExt, stream};
454 use itertools::Itertools;
455 use maplit::hashmap;
456 use tokio::time::sleep;
457 use tokio_retry::strategy::{ExponentialBackoff, jitter};
458
459 use super::{DynamoDbRequest, SinkError};
460
461 const MAX_BATCH_WRITE_RETRY_DELAY_MS: u64 = 2000;
462 const MAX_BATCH_WRITE_CONCURRENCY: usize = 256;
463
464 pub struct DynamoDbPayloadWriter {
465 pub client: Client,
466 pub table: String,
467 pub dynamodb_keys: Vec<String>,
468 pub max_batch_item_nums: usize,
469 pub batch_write_retry_times: usize,
470 pub batch_write_retry_backoff_ms: u64,
471 }
472
473 pub type WriteChunkFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;
474
475 impl DynamoDbPayloadWriter {
476 pub fn write_one_insert(
477 &mut self,
478 item: HashMap<String, AttributeValue>,
479 request_items: &mut Vec<DynamoDbRequest>,
480 ) {
481 let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
482 let req = WriteRequest::builder().put_request(put_req).build();
483 self.write_one_req(req, request_items);
484 }
485
486 pub fn write_one_delete(
487 &mut self,
488 key: HashMap<String, AttributeValue>,
489 request_items: &mut Vec<DynamoDbRequest>,
490 ) {
491 let key = key
492 .into_iter()
493 .filter(|(k, _)| self.dynamodb_keys.contains(k))
494 .collect();
495 let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
496 let req = WriteRequest::builder().delete_request(del_req).build();
497 self.write_one_req(req, request_items);
498 }
499
500 pub fn write_one_req(
501 &mut self,
502 req: WriteRequest,
503 request_items: &mut Vec<DynamoDbRequest>,
504 ) {
505 let r_req = DynamoDbRequest {
506 inner: req,
507 key_items: self.dynamodb_keys.clone(),
508 };
509 request_items.retain(|item| !item.has_same_pk(&r_req));
510 request_items.push(r_req);
511 }
512
513 #[define_opaque(WriteChunkFuture)]
514 pub fn write_chunk(
515 &mut self,
516 request_items: Vec<DynamoDbRequest>,
517 max_future_send_nums: usize,
518 ) -> WriteChunkFuture {
519 let client = self.client.clone();
520 let table = self.table.clone();
521 let max_batch_item_nums = self.max_batch_item_nums;
522 let batch_write_retry_times = self.batch_write_retry_times;
523 let batch_write_retry_backoff_ms = self.batch_write_retry_backoff_ms;
524 async move {
525 let chunks = request_items
526 .into_iter()
527 .map(|r| r.inner)
528 .chunks(max_batch_item_nums)
529 .into_iter()
530 .map(|chunk| chunk.collect::<Vec<_>>())
531 .collect_vec();
532 let max_future_send_nums =
533 max_future_send_nums.clamp(1, MAX_BATCH_WRITE_CONCURRENCY);
534 stream::iter(chunks.into_iter().map(|req_items| {
535 let client = client.clone();
536 let table = table.clone();
537 async move {
538 let mut req_items = req_items;
539 let mut retry_count = 0;
540 let mut retry_backoff = ExponentialBackoff::from_millis(
541 batch_write_retry_backoff_ms,
542 )
543 .factor(2)
544 .max_delay(Duration::from_millis(MAX_BATCH_WRITE_RETRY_DELAY_MS))
545 .map(jitter)
546 .take(batch_write_retry_times);
547
548 loop {
549 let return_consumed_capacity = if retry_count == 0 {
550 ReturnConsumedCapacity::None
551 } else {
552 ReturnConsumedCapacity::Total
553 };
554 let reqs = hashmap! {
555 table.clone() => req_items.clone(),
556 };
557 let result = client
558 .batch_write_item()
559 .set_request_items(Some(reqs))
560 .return_consumed_capacity(return_consumed_capacity)
561 .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
562 .send()
563 .await;
564
565 match result {
566 Ok(output) => {
567 let unprocessed_items =
568 output.unprocessed_items().cloned().unwrap_or_default();
569 if unprocessed_items.is_empty() {
570 if retry_count > 0 {
571 tracing::warn!(
572 retry_count,
573 consumed_capacity = ?output.consumed_capacity(),
574 "DynamoDB batch write retry succeeded"
575 );
576 }
577 return Ok(());
578 }
579
580 req_items = unprocessed_items.into_values().flatten().collect();
581 if retry_count >= batch_write_retry_times {
582 return Err(SinkError::DynamoDb(anyhow!(
583 "failed to write {} unprocessed items to DynamoDB sink after {} retries",
584 req_items.len(),
585 batch_write_retry_times,
586 )));
587 }
588 }
589 Err(e) => {
590 return Err(SinkError::DynamoDb(
591 anyhow!(e).context("failed to write items to DynamoDB sink"),
592 ));
593 }
594 }
595
596 retry_count += 1;
597 let Some(delay) = retry_backoff.next() else {
598 return Err(SinkError::DynamoDb(anyhow!(
599 "failed to write {} unprocessed items to DynamoDB sink after {} retries",
600 req_items.len(),
601 batch_write_retry_times,
602 )));
603 };
604 tracing::warn!(
605 retry_count,
606 delay_ms = delay.as_millis(),
607 unprocessed_items_count = req_items.len(),
608 "retrying DynamoDB batch write"
609 );
610 sleep(delay).await;
611 }
612 }
613 }))
614 .buffer_unordered(max_future_send_nums)
615 .try_collect::<Vec<_>>()
616 .await?;
617 Ok(())
618 }
619 .boxed()
620 }
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use aws_sdk_dynamodb::types::{DeleteRequest, KeyType, PutRequest};
627
628 use super::*;
629
630 fn dynamodb_put_request(
631 items: impl IntoIterator<Item = (&'static str, &'static str)>,
632 ) -> DynamoDbRequest {
633 let item = dynamodb_items(items);
634 let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
635 DynamoDbRequest {
636 inner: WriteRequest::builder().put_request(put_req).build(),
637 key_items: vec!["pk".to_owned(), "sk".to_owned()],
638 }
639 }
640
641 fn dynamodb_delete_request(
642 items: impl IntoIterator<Item = (&'static str, &'static str)>,
643 ) -> DynamoDbRequest {
644 let key = dynamodb_items(items);
645 let delete_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
646 DynamoDbRequest {
647 inner: WriteRequest::builder().delete_request(delete_req).build(),
648 key_items: vec!["pk".to_owned(), "sk".to_owned()],
649 }
650 }
651
652 fn dynamodb_items(
653 items: impl IntoIterator<Item = (&'static str, &'static str)>,
654 ) -> HashMap<String, AttributeValue> {
655 items
656 .into_iter()
657 .map(|(k, v)| (k.to_owned(), AttributeValue::S(v.to_owned())))
658 .collect()
659 }
660
661 #[test]
662 fn dynamodb_request_compares_pk_by_key_attribute() {
663 let req = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
664 let swapped_values = dynamodb_put_request([("pk", "b"), ("sk", "a")]);
665 let same_pk = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
666
667 assert!(!req.has_same_pk(&swapped_values));
668 assert!(req.has_same_pk(&same_pk));
669 }
670
671 #[test]
672 fn dynamodb_request_empty_key_items_never_match() {
673 let mut req = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
674 let same_pk = dynamodb_put_request([("pk", "a"), ("sk", "b")]);
675 req.key_items.clear();
676
677 assert!(!req.has_same_pk(&same_pk));
678 }
679
680 #[test]
681 fn dynamodb_request_compares_put_and_delete_by_composite_pk() {
682 let put = dynamodb_put_request([("pk", "a"), ("sk", "b"), ("value", "1")]);
683 let same_pk_delete = dynamodb_delete_request([("pk", "a"), ("sk", "b")]);
684 let different_hash_key_delete = dynamodb_delete_request([("pk", "x"), ("sk", "b")]);
685 let different_range_key_delete = dynamodb_delete_request([("pk", "a"), ("sk", "x")]);
686
687 assert!(put.has_same_pk(&same_pk_delete));
688 assert!(same_pk_delete.has_same_pk(&put));
689 assert!(!put.has_same_pk(&different_hash_key_delete));
690 assert!(!put.has_same_pk(&different_range_key_delete));
691 }
692
693 #[test]
694 fn dynamodb_key_schema_empty_errors() {
695 let err = dynamodb_key_schema_names("test_table", &[]).unwrap_err();
696
697 assert!(
698 err.to_string()
699 .contains("table test_table key schema is empty")
700 );
701 }
702
703 #[test]
704 fn dynamodb_key_schema_must_match_rw_pk() {
705 let dynamodb_keys = ["pk".to_owned(), "sk".to_owned()];
706 let same_rw_pk = ["sk".to_owned(), "pk".to_owned()];
707 let extra_rw_pk = ["pk".to_owned(), "sk".to_owned(), "extra".to_owned()];
708 let different_rw_pk = ["pk".to_owned(), "other".to_owned()];
709
710 validate_pk_matches_dynamodb_key_schema("test_table", &same_rw_pk, &dynamodb_keys).unwrap();
711
712 assert!(
713 validate_pk_matches_dynamodb_key_schema("test_table", &extra_rw_pk, &dynamodb_keys)
714 .unwrap_err()
715 .to_string()
716 .contains("must match RisingWave primary key")
717 );
718 assert!(
719 validate_pk_matches_dynamodb_key_schema("test_table", &different_rw_pk, &dynamodb_keys)
720 .unwrap_err()
721 .to_string()
722 .contains("must match RisingWave primary key")
723 );
724 }
725
726 #[test]
727 fn dynamodb_key_schema_names_uses_explicit_schema() {
728 let key_schema = vec![
729 KeySchemaElement::builder()
730 .attribute_name("pk")
731 .key_type(KeyType::Hash)
732 .build()
733 .unwrap(),
734 KeySchemaElement::builder()
735 .attribute_name("sk")
736 .key_type(KeyType::Range)
737 .build()
738 .unwrap(),
739 ];
740
741 assert_eq!(
742 dynamodb_key_schema_names("test_table", &key_schema).unwrap(),
743 vec!["pk".to_owned(), "sk".to_owned()]
744 );
745 }
746}