use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

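/// Builds the quoted, optionally schema-qualified table name, e.g. `"my_schema"."my_table"`,
/// or just `"my_table"` when no schema is configured.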
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

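/// Builds an `ALTER TABLE ... ADD COLUMN` statement for the given `(name, type)` pairs by
/// delegating to the shared JDBC helper with the fully qualified table name.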
fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

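/// User-facing configuration for the Redshift sink, deserialized from the sink's `WITH`
/// options: JDBC connection settings, target/intermediate table names, and optional S3 staging.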
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600
}

fn default_batch_insert_rows() -> u32 {
    4096
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
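    /// Builds a [`JdbcJniClient`] from `jdbc.url`, appending `user` and `password` as URL
    /// query parameters when present. Note that the password is appended with `&`, which
    /// assumes `user` is also configured.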
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        if let Some(username) = &self.username {
            jdbc_url = format!("{}?user={}", jdbc_url, username);
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}&password={}", jdbc_url, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

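/// Sink implementation for Amazon Redshift, registered under the `redshift` connector name.
/// Validates or creates the target tables and wires up the writer and commit coordinator.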
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

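    /// If `create_table_if_not_exists` is set, creates the target table and, for
    /// non-append-only sinks, the intermediate (CDC) table before the sink starts.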
    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

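    /// Builds the single-phase commit coordinator. Non-append-only sinks require primary key
    /// columns so the merge task can deduplicate and match rows in the target table.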
    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

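/// Writer selected by the `with_s3` option: stage files on S3 for a later `COPY` (the default),
/// or write directly through the remote JDBC sink.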
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

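    // On checkpoint barriers, the S3 writer hands the staged file path to the commit
    // coordinator as serialized metadata; the JDBC writer writes eagerly and has nothing to
    // pass along, so it reports empty metadata.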
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

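/// Writer that forwards chunks to Redshift through the coordinated remote JDBC sink. For
/// non-append-only sinks, rows are augmented with the `__ROW_ID` and `__OP` columns and written
/// to the intermediate table instead of the target table.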
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
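    /// Rewrites the sink parameters so the generic remote JDBC sink writes in append-only mode
    /// to the right table: the target table for append-only sinks, or the intermediate table
    /// (with `__ROW_ID`/`__OP` columns appended to the schema) for non-append-only sinks.
    /// Redshift-specific options are stripped before constructing the `CoordinatedRemoteSinkWriter`.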
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            column_descs.push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            column_descs.push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

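/// Single-phase commit coordinator for the Redshift sink. For the S3 path it issues `COPY`
/// commands for the staged files; for non-append-only sinks it also owns a background task
/// that periodically merges the intermediate (CDC) table into the target table.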
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
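    /// Creates the committer and, for non-append-only sinks, spawns the periodic task that
    /// merges the intermediate table into the target table every `schedule_seconds`.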
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();
            let task_client = config.build_client()?;
            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

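    /// Background loop that periodically merges the intermediate (CDC) table into the target
    /// table. Errors from individual runs are logged and the loop keeps going; the task exits
    /// only when a shutdown signal is received.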
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds));
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        Ok(())
    }

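    /// Commits one checkpoint: collects the staged S3 file paths reported by the writers,
    /// writes a manifest, and issues a `COPY` into the target (or intermediate) table. If a
    /// schema change is attached, `ADD COLUMN` statements are applied and, for
    /// non-append-only sinks, the periodic merge task is restarted with the new column list.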
    async fn commit(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<()> {
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0.into(), &s3_operator, &None).await?;
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }

        if let Some(schema_change) = schema_change {
            use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
            let schema_change_op = schema_change.op.ok_or_else(|| {
                SinkError::Coordinator(anyhow!("Invalid schema change operation"))
            })?;
            let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
                return Err(SinkError::Coordinator(anyhow!(
                    "Only AddColumns schema change is supported for Redshift sink"
                )));
            };
            if let Some(shutdown_sender) = &self.shutdown_sender {
                shutdown_sender
                    .send(())
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
            }
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &add_columns
                    .fields
                    .iter()
                    .map(|f| {
                        (
                            f.name.clone(),
                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                        )
                    })
                    .collect_vec(),
            );
            let check_column_exists = |e: anyhow::Error| {
                let err_str = e.to_report_string();
                if regex::Regex::new(".+ of relation .+ already exists")
                    .unwrap()
                    .find(&err_str)
                    .is_none()
                {
                    return Err(e);
                }
                warn!("Redshift sink column already exists, skipping ALTER TABLE");
                Ok(())
            };
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            if !self.is_append_only {
                let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let sql = build_alter_add_column_sql(
                    self.config.schema.as_deref(),
                    cdc_table_name,
                    &add_columns
                        .fields
                        .iter()
                        .map(|f| {
                            (
                                f.name.clone(),
                                DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                            )
                        })
                        .collect::<Vec<_>>(),
                );
                self.client
                    .execute_sql_sync(vec![sql.clone()])
                    .await
                    .or_else(check_column_exists)?;
                self.all_column_names
                    .extend(add_columns.fields.iter().map(|f| f.name.clone()));

                if let Some(shutdown_sender) = self.shutdown_sender.take() {
                    let _ = shutdown_sender.send(());
                }
                if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                    let _ = periodic_task_handle.await;
                }

                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let client = self.client.clone();
                let schema_name = self.config.schema.clone();
                let cdc_table_name = self.config.cdc_table.clone().unwrap();
                let target_table_name = self.config.table.clone();
                let pk_column_names = self.pk_column_names.clone();
                let all_column_names = self.all_column_names.clone();
                let schedule_seconds = self.schedule_seconds;
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        client,
                        schema_name.as_deref(),
                        &cdc_table_name,
                        &target_table_name,
                        pk_column_names,
                        all_column_names,
                        schedule_seconds,
                        shutdown_receiver,
                    )
                    .await;
                });
                self.shutdown_sender = Some(shutdown_sender);
                self.periodic_task_handle = Some(periodic_task_handle);
            }
        }
        Ok(())
    }
}

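/// Builds a `CREATE TABLE IF NOT EXISTS` statement from the RisingWave schema, optionally
/// appending the `__ROW_ID`/`__OP` bookkeeping columns used by the intermediate (CDC) table.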
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

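/// Maps a RisingWave [`DataType`] to the Redshift column type used when auto-creating tables
/// (e.g. `Varchar` and `Jsonb` become `VARCHAR(MAX)`); unsupported types return a config error.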
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Auto table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

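/// Builds the batch of statements run by the periodic merge task:
/// 1. snapshot the current max `__ROW_ID` of the intermediate table into a temp table,
/// 2. apply deduplicated deletes (`__OP` 2/4) to the target table,
/// 3. `MERGE` deduplicated upserts (`__OP` 1/3) into the target table,
/// 4. purge the consumed rows from the intermediate table and drop the temp table.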
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

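/// Builds the `COPY ... FROM ... MANIFEST` statement that loads the staged JSON files from S3,
/// authenticating with either an IAM role or an access/secret key pair.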
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        &format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        &format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}