use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

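/// Returns the table name, qualified with the schema name when one is given, with both
/// identifiers double-quoted for use in generated Redshift SQL.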
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

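/// Builds an `ALTER TABLE ... ADD COLUMN` statement for the given table from a list of
/// `(column name, column type)` pairs, delegating to the shared JDBC helper.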
fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

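/// Configuration of the Redshift sink, deserialized from the sink properties.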
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600
}

fn default_batch_insert_rows() -> u32 {
    4096
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
    /// Builds a JDBC client for Redshift, appending the optional `user` and `password`
    /// options to the configured JDBC URL.
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        let mut separator = '?';
        if let Some(username) = &self.username {
            jdbc_url = format!("{}{}user={}", jdbc_url, separator, username);
            separator = '&';
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}{}password={}", jdbc_url, separator, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

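/// The Redshift sink. Rows are either staged on S3 and loaded with `COPY` (`with_s3 = true`,
/// the default) or written through the remote JDBC sink writer.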
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

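/// Writer half of the Redshift sink: either the S3 staging writer or the JDBC writer,
/// selected by the `with_s3` option.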
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

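/// JDBC-based writer. Append-only sinks write directly to the target table; non-append-only
/// sinks append rows augmented with the `__ROW_ID` and `__OP` columns to the intermediate
/// (CDC) table.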
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            (*column_descs).push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            (*column_descs).push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

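/// Single-phase commit coordinator. For S3-staged data it writes a manifest and issues a
/// `COPY` command; for non-append-only sinks it also runs a background task that periodically
/// merges the intermediate (CDC) table into the target table.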
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();

            let task_client = config.build_client()?;

            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

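    /// Background task: every `schedule_seconds` it runs the merge statements that apply the
    /// accumulated CDC rows to the target table, until a shutdown signal is received.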
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds));
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        Ok(())
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0, &s3_operator, &None).await?;
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }

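        // A schema change adds columns: ALTER the target table (and the CDC table for
        // non-append-only sinks), then restart the periodic merge task so it uses the
        // updated column list.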
        if let Some(add_columns) = add_columns {
            if let Some(shutdown_sender) = &self.shutdown_sender {
                shutdown_sender
                    .send(())
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
            }
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &add_columns
                    .iter()
                    .map(|f| (f.name.clone(), f.data_type.to_string()))
                    .collect::<Vec<_>>(),
            );
            let check_column_exists = |e: anyhow::Error| {
                let err_str = e.to_report_string();
                if regex::Regex::new(".+ of relation .+ already exists")
                    .unwrap()
                    .find(&err_str)
                    .is_none()
                {
                    return Err(e);
                }
                warn!("redshift sink column already exists, skipped");
                Ok(())
            };
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            if !self.is_append_only {
                let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let sql = build_alter_add_column_sql(
                    self.config.schema.as_deref(),
                    cdc_table_name,
                    &add_columns
                        .iter()
                        .map(|f| (f.name.clone(), f.data_type.to_string()))
                        .collect::<Vec<_>>(),
                );
                self.client
                    .execute_sql_sync(vec![sql.clone()])
                    .await
                    .or_else(check_column_exists)?;
                self.all_column_names
                    .extend(add_columns.iter().map(|f| f.name.clone()));

                if let Some(shutdown_sender) = self.shutdown_sender.take() {
                    let _ = shutdown_sender.send(());
                }
                if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                    let _ = periodic_task_handle.await;
                }

                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let client = self.client.clone();
                let schema_name = self.config.schema.clone();
                let cdc_table_name = self.config.cdc_table.clone().unwrap();
                let target_table_name = self.config.table.clone();
                let pk_column_names = self.pk_column_names.clone();
                let all_column_names = self.all_column_names.clone();
                let schedule_seconds = self.schedule_seconds;
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        client,
                        schema_name.as_deref(),
                        &cdc_table_name,
                        &target_table_name,
                        pk_column_names,
                        all_column_names,
                        schedule_seconds,
                        shutdown_receiver,
                    )
                    .await;
                });
                self.shutdown_sender = Some(shutdown_sender);
                self.periodic_task_handle = Some(periodic_task_handle);
            }
        }
        Ok(())
    }
}

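/// Builds a `CREATE TABLE IF NOT EXISTS` statement for the given schema. When
/// `need_op_and_row_id` is set (for the intermediate CDC table), the extra `__ROW_ID` and
/// `__OP` bookkeeping columns are appended.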
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

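/// Maps a RisingWave `DataType` to the Redshift column type used when auto-creating tables.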
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Unsupported data type for auto table creation: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

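/// Builds the batch of statements run by the periodic merge task: snapshot the current max
/// `__ROW_ID`, apply deduplicated deletes and upserts from the CDC table to the target table,
/// then prune the consumed CDC rows and drop the temporary table.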
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

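/// Builds the `COPY ... MANIFEST` statement that loads the staged files listed in the S3
/// manifest into the given table, authenticating via an IAM role or an access/secret key pair.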
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        &format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        &format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}