use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use sea_orm::DatabaseConnection;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

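/// Builds a double-quoted, optionally schema-qualified table name, e.g. `"my_schema"."my_table"`.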
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

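/// User-facing options of the Redshift sink, deserialized from the `WITH` clause of
/// `CREATE SINK`. Besides the JDBC connection settings, they control whether data is
/// staged through S3 (`with_s3`) and how often the intermediate CDC table is merged into
/// the target table (`write.target.interval.seconds`).
///
/// Illustrative sketch of the `WITH` options (values are placeholders; the flattened S3
/// options are omitted):
///
/// ```sql
/// CREATE SINK my_redshift_sink FROM mv WITH (
///     connector = 'redshift',
///     jdbc.url = 'jdbc:redshift://example-host:5439/dev',
///     user = 'awsuser',
///     password = '...',
///     table.name = 'target_table',
///     intermediate.table.name = 'target_table_cdc',
///     primary_key = 'id',
///     with_s3 = 'true'
/// );
/// ```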
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600
}

fn default_batch_insert_rows() -> u32 {
    4096
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
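    /// Builds a [`JdbcJniClient`] from the configured JDBC URL, appending `user` and
    /// `password` as URL query parameters when they are provided.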
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        if let Some(username) = &self.username {
            jdbc_url = format!("{}?user={}", jdbc_url, username);
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}&password={}", jdbc_url, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

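/// The Redshift sink. Depending on `with_s3`, rows are either staged to S3 and loaded with
/// `COPY`, or written directly over JDBC. For non-append-only sinks, changes first land in
/// an intermediate CDC table and are periodically merged into the target table by the
/// commit coordinator.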
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema().clone();
        let pk_indices = param.downstream_pk.clone();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type Coordinator = RedshiftSinkCommitter;
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(coordinator)
    }
}

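/// Writer half of the sink: either stages rows as files on S3 (the commit coordinator later
/// loads them via a `COPY` manifest) or forwards augmented chunks to the remote JDBC sink
/// writer.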
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

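/// JDBC-based writer. It rewrites the sink parameters so the remote JDBC sink writes in
/// append-only mode: append-only sinks target the destination table directly, while
/// non-append-only sinks target the CDC table with the extra row-id and op bookkeeping
/// columns (`__ROW_ID` / `__OP`).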
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            column_descs.push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            column_descs.push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

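/// Commit coordinator for the Redshift sink. On commit it uploads a manifest of the staged
/// S3 files and issues a `COPY` to load them; for non-append-only sinks it also owns a
/// background task that periodically merges the CDC table into the target table.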
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();

            let task_client = config.build_client()?;

            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

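    /// Background loop that periodically runs the merge statements produced by
    /// [`build_create_merge_into_task_sql`] until a shutdown signal is received.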
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds));
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinkCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
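        // Upload a COPY manifest listing the staged S3 files, then load them into the
        // target table (append-only) or the intermediate CDC table (non-append-only).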
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0, &s3_operator, &None).await?;
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }

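        // Handle schema change: add the new columns to the target table and, for
        // non-append-only sinks, to the CDC table as well, then restart the periodic
        // merge task with the updated column list.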
        if let Some(add_columns) = add_columns {
            if let Some(shutdown_sender) = &self.shutdown_sender {
                shutdown_sender
                    .send(())
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
            }
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &add_columns
                    .iter()
                    .map(|f| (f.name.clone(), f.data_type.to_string()))
                    .collect::<Vec<_>>(),
            );
            let check_column_exists = |e: anyhow::Error| {
                let err_str = e.to_report_string();
                if regex::Regex::new(".+ of relation .+ already exists")
                    .unwrap()
                    .find(&err_str)
                    .is_none()
                {
                    return Err(e);
                }
                warn!("redshift sink columns already exist, skipped");
                Ok(())
            };
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            if !self.is_append_only {
                let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let sql = build_alter_add_column_sql(
                    self.config.schema.as_deref(),
                    cdc_table_name,
                    &add_columns
                        .iter()
                        .map(|f| (f.name.clone(), f.data_type.to_string()))
                        .collect::<Vec<_>>(),
                );
                self.client
                    .execute_sql_sync(vec![sql.clone()])
                    .await
                    .or_else(check_column_exists)?;
                self.all_column_names
                    .extend(add_columns.iter().map(|f| f.name.clone()));

                if let Some(shutdown_sender) = self.shutdown_sender.take() {
                    let _ = shutdown_sender.send(());
                }
                if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                    let _ = periodic_task_handle.await;
                }

                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let client = self.client.clone();
                let schema_name = self.config.schema.clone();
                let cdc_table_name = self.config.cdc_table.clone().unwrap();
                let target_table_name = self.config.table.clone();
                let pk_column_names = self.pk_column_names.clone();
                let all_column_names = self.all_column_names.clone();
                let schedule_seconds = self.schedule_seconds;
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        client,
                        schema_name.as_deref(),
                        &cdc_table_name,
                        &target_table_name,
                        pk_column_names,
                        all_column_names,
                        schedule_seconds,
                        shutdown_receiver,
                    )
                    .await;
                });
                self.shutdown_sender = Some(shutdown_sender);
                self.periodic_task_handle = Some(periodic_task_handle);
            }
        }
        Ok(())
    }
}

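/// Builds a `CREATE TABLE IF NOT EXISTS` statement for the target (or intermediate CDC)
/// table, mapping RisingWave types to Redshift types via [`convert_redshift_data_type`]
/// and optionally appending the row-id and op bookkeeping columns.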
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

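/// Maps a RisingWave [`DataType`] to the corresponding Redshift column type; unsupported
/// types yield a configuration error.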
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Auto table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

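/// Builds the statement batch run by the periodic merge task:
/// 1. snapshot the current max row id of the CDC table into a temp table,
/// 2. apply deletes (op 2/4) from the deduplicated CDC rows to the target table,
/// 3. `MERGE` the remaining upserts (op 1/3) into the target table,
/// 4. purge the consumed CDC rows and drop the temp table.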
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

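/// Builds the `COPY ... FROM '<manifest>' ... MANIFEST` statement that loads the staged
/// JSON files, authenticating either with an IAM role (`assume_role`) or with an access
/// key / secret key pair.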
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        &format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        &format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}