use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

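/// Builds a double-quoted table reference for Redshift, e.g. `"my_schema"."my_table"`,
/// or just `"my_table"` when no schema name is configured.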
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

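/// Builds an `ALTER TABLE ... ADD COLUMN` statement for the given `(column name, column type)`
/// pairs by delegating to the shared JDBC helper.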
fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

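/// Configuration of the Redshift sink, deserialized from the sink properties.
/// `intermediate.table.name` names the CDC staging table required for non-append-only
/// sinks, and the flattened S3 options ([`S3Common`]) are required when data is staged
/// through S3 (`with_s3 = true`).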
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600
}

fn default_batch_insert_rows() -> u32 {
    4096
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
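    /// Builds a JDBC client for the configured endpoint. The optional user and password
    /// are appended as `?user=...` and `&password=...`, so this assumes `jdbc.url` carries
    /// no query string of its own and that a password is only supplied together with a user.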
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        if let Some(username) = &self.username {
            jdbc_url = format!("{}?user={}", jdbc_url, username);
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}&password={}", jdbc_url, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

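/// Sink implementation for Amazon Redshift. Data is either staged to S3 and loaded with
/// `COPY`, or written through the JDBC remote sink, depending on `with_s3`. For
/// non-append-only sinks, changes are first written to an intermediate CDC table and
/// periodically merged into the target table by the commit coordinator.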
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

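/// Writer half of the Redshift sink: either uploads batches as files on S3
/// ([`SnowflakeRedshiftSinkS3Writer`]) or forwards them to the JDBC remote sink
/// ([`RedShiftSinkJdbcWriter`]), chosen by `with_s3`.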
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

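/// JDBC-based writer. For non-append-only sinks it appends the `__ROW_ID` and `__OP`
/// bookkeeping columns to every chunk and writes the result into the intermediate CDC
/// table as append-only rows; the actual merge into the target table happens later in
/// the coordinator.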
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            (*column_descs).push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            (*column_descs).push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

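/// Single-phase commit coordinator for the Redshift sink. It loads staged S3 files with
/// `COPY`, applies `ADD COLUMN` schema changes, and, for non-append-only sinks, owns a
/// background task that periodically merges the intermediate CDC table into the target
/// table.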
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();

            let task_client = config.build_client()?;

            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

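    /// Background loop that executes the merge statements produced by
    /// [`build_create_merge_into_task_sql`] every `schedule_seconds`, until a shutdown
    /// signal is received. Missed ticks are skipped rather than replayed in a burst.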
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds));
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        Ok(())
    }

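    // Collect the S3 object paths reported by the writers, write them into a single COPY
    // manifest file on S3, and issue one `COPY ... MANIFEST` statement against the target
    // table (append-only) or the intermediate CDC table (non-append-only).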
    async fn commit_data(&mut self, _epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0.into(), &s3_operator, &None).await?;
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }
        Ok(())
    }

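    // Apply an AddColumns schema change via `ALTER TABLE ... ADD COLUMN` on the target
    // table (and on the CDC table for non-append-only sinks), treating "already exists"
    // errors as success, then restart the periodic merge task with the updated column list.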
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
        let schema_change_op = schema_change
            .op
            .ok_or_else(|| SinkError::Coordinator(anyhow!("Invalid schema change operation")))?;
        let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
            return Err(SinkError::Coordinator(anyhow!(
                "Only AddColumns schema change is supported for Redshift sink"
            )));
        };
        if let Some(shutdown_sender) = &self.shutdown_sender {
            shutdown_sender
                .send(())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        }
        let sql = build_alter_add_column_sql(
            self.config.schema.as_deref(),
            &self.config.table,
            &add_columns
                .fields
                .iter()
                .map(|f| {
                    (
                        f.name.clone(),
                        DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                    )
                })
                .collect_vec(),
        );
        let check_column_exists = |e: anyhow::Error| {
            let err_str = e.to_report_string();
            if regex::Regex::new(".+ of relation .+ already exists")
                .unwrap()
                .find(&err_str)
                .is_none()
            {
                return Err(e);
            }
            warn!("redshift sink column already exists, skipped");
            Ok(())
        };
        self.client
            .execute_sql_sync(vec![sql.clone()])
            .await
            .or_else(check_column_exists)?;
        if !self.is_append_only {
            let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                cdc_table_name,
                &add_columns
                    .fields
                    .iter()
                    .map(|f| {
                        (
                            f.name.clone(),
                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                        )
                    })
                    .collect::<Vec<_>>(),
            );
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            self.all_column_names
                .extend(add_columns.fields.iter().map(|f| f.name.clone()));

            if let Some(shutdown_sender) = self.shutdown_sender.take() {
                let _ = shutdown_sender.send(());
            }
            if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                let _ = periodic_task_handle.await;
            }

            let (shutdown_sender, shutdown_receiver) = unbounded_channel();
            let client = self.client.clone();
            let schema_name = self.config.schema.clone();
            let cdc_table_name = self.config.cdc_table.clone().unwrap();
            let target_table_name = self.config.table.clone();
            let pk_column_names = self.pk_column_names.clone();
            let all_column_names = self.all_column_names.clone();
            let schedule_seconds = self.schedule_seconds;
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            self.shutdown_sender = Some(shutdown_sender);
            self.periodic_task_handle = Some(periodic_task_handle);
        }

        Ok(())
    }
}

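/// Builds a `CREATE TABLE IF NOT EXISTS` statement from the sink schema, optionally
/// appending the `__ROW_ID`/`__OP` bookkeeping columns used by the intermediate CDC table.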
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

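/// Maps a RisingWave [`DataType`] to the Redshift column type used when auto-creating
/// tables; unsupported types yield a configuration error.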
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Auto table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

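/// Builds the batch of statements that folds the intermediate CDC table into the target
/// table: snapshot the current maximum `__ROW_ID`, delete rows whose latest change is a
/// delete (`__OP` 2 or 4), `MERGE` the remaining upserts (`__OP` 1 or 3), and finally trim
/// the consumed rows from the CDC table.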
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

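/// Builds the Redshift `COPY ... MANIFEST` statement that loads the staged JSON files
/// listed in the manifest at `manifest_path`, authenticating with either an IAM role or
/// an access key / secret key pair.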
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        &format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        &format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}