risingwave_connector/sink/snowflake_redshift/redshift.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use sea_orm::DatabaseConnection;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

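/// Builds the quoted, optionally schema-qualified table name used in generated SQL,
/// e.g. `build_full_table_name(Some("public"), "t")` yields `"public"."t"`.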
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    // Redshift does not support `ADD COLUMN IF NOT EXISTS` yet.
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

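/// Redshift sink options, deserialized from the `WITH` clause of `CREATE SINK`:
/// `jdbc.url`, `user`, `password`, `schema`, `table.name`, and, for upsert sinks,
/// `intermediate.table.name`. `create_table_if_not_exists` auto-creates the target
/// (and intermediate) tables, `write.target.interval.seconds` (default 3600) controls
/// how often the intermediate table is merged into the target, and `with_s3`
/// (default `true`) selects S3 staging plus `COPY` instead of direct JDBC writes,
/// in which case the flattened `S3Common` options must also be provided.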
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600 // Default to 1 hour
}

fn default_batch_insert_rows() -> u32 {
    4096 // Default batch size
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
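    /// Builds a `JdbcJniClient` from `jdbc.url`, appending `user` and `password`
    /// as URL query parameters when they are provided.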
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        // Use `?` for the first query parameter and `&` for any subsequent one,
        // so the URL stays well-formed even when only a password is given.
        let mut separator = if jdbc_url.contains('?') { '&' } else { '?' };
        if let Some(username) = &self.username {
            jdbc_url = format!("{}{}user={}", jdbc_url, separator, username);
            separator = '&';
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}{}password={}", jdbc_url, separator, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

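/// Redshift sink entry point. `validate` optionally auto-creates the target (and,
/// for upsert sinks, the intermediate) table; writing goes through the coordinated
/// `RedShiftSinkWriter`, and commits are handled by `RedshiftSinkCommitter`.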
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}
impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema().clone();
        let pk_indices = param.downstream_pk.clone();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type Coordinator = RedshiftSinkCommitter;
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(coordinator)
    }
}

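/// Data-path selector for the sink writer: with `with_s3 = true` (the default), rows
/// are staged as files on S3 and later loaded via `COPY`; otherwise they are written
/// directly over JDBC through the remote (JNI) sink writer.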
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

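/// JDBC write path. For upsert sinks, rows are redirected to the intermediate table
/// and augmented with the `__ROW_ID` / `__OP` bookkeeping columns, so the underlying
/// remote JDBC sink always runs in append-only mode; the committer's periodic merge
/// task later folds these rows into the target table.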
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            column_descs.push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            column_descs.push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        // TODO: abort should clean up all the data written in this epoch.
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

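/// Commit coordinator for the Redshift sink. On commit it loads the staged S3 files
/// into Redshift via `COPY ... MANIFEST` and applies `ALTER TABLE ... ADD COLUMN` on
/// schema changes; for upsert sinks it also owns a background task that periodically
/// merges the intermediate (CDC) table into the target table.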
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            // Create shutdown channel
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();

            // Clone client for the periodic task
            let task_client = config.build_client()?;

            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            // Start the periodic merge task, which runs every `schedule_seconds`
            // seconds (default: 1 hour).
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

    /// Runs the merge-into query periodically, once every `schedule_seconds` seconds,
    /// until a shutdown signal is received.
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds)); // defaults to 3600 seconds (1 hour)
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                // Check for shutdown signal
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                // Execute periodic query
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        // Send shutdown signal to the periodic task
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

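// Commit protocol: `commit` loads any staged S3 files referenced by the writers'
// metadata via a generated manifest plus `COPY`, and, when columns were added,
// applies `ALTER TABLE ... ADD COLUMN` to both tables and restarts the periodic
// merge task with the updated column list.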
#[async_trait]
impl SinkCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0, &s3_operator, &None).await?;
            // Write a COPY manifest that lists all staged files for this commit.
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            // Run the COPY statement to load the files listed in the manifest.
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }

        if let Some(add_columns) = add_columns {
            if let Some(shutdown_sender) = &self.shutdown_sender {
                // Send shutdown signal to the periodic task before altering the table
                shutdown_sender
                    .send(())
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
            }
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &add_columns
                    .iter()
                    .map(|f| (f.name.clone(), f.data_type.to_string()))
                    .collect::<Vec<_>>(),
            );
            let check_column_exists = |e: anyhow::Error| {
                let err_str = e.to_report_string();
                if regex::Regex::new(".+ of relation .+ already exists")
                    .unwrap()
                    .find(&err_str)
                    .is_none()
                {
                    return Err(e);
                }
                warn!("redshift sink column already exists, skipping");
                Ok(())
            };
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            if !self.is_append_only {
                let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let sql = build_alter_add_column_sql(
                    self.config.schema.as_deref(),
                    cdc_table_name,
                    &add_columns
                        .iter()
                        .map(|f| (f.name.clone(), f.data_type.to_string()))
                        .collect::<Vec<_>>(),
                );
                self.client
                    .execute_sql_sync(vec![sql.clone()])
                    .await
                    .or_else(check_column_exists)?;
                self.all_column_names
                    .extend(add_columns.iter().map(|f| f.name.clone()));

                // Restart the periodic merge task so that it picks up the new column list.
                if let Some(shutdown_sender) = self.shutdown_sender.take() {
                    let _ = shutdown_sender.send(());
                }
                if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                    let _ = periodic_task_handle.await;
                }

                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let client = self.client.clone();
                let schema_name = self.config.schema.clone();
                let cdc_table_name = self.config.cdc_table.clone().unwrap();
                let target_table_name = self.config.table.clone();
                let pk_column_names = self.pk_column_names.clone();
                let all_column_names = self.all_column_names.clone();
                let schedule_seconds = self.schedule_seconds;
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        client,
                        schema_name.as_deref(),
                        &cdc_table_name,
                        &target_table_name,
                        pk_column_names,
                        all_column_names,
                        schedule_seconds,
                        shutdown_receiver,
                    )
                    .await;
                });
                self.shutdown_sender = Some(shutdown_sender);
                self.periodic_task_handle = Some(periodic_task_handle);
            }
        }
        Ok(())
    }
}

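/// Generates the `CREATE TABLE IF NOT EXISTS` DDL for the target (or intermediate)
/// table from the sink schema. For example (illustrative), a schema with an `Int32`
/// column `id` and a `Varchar` column `v` yields
/// `CREATE TABLE IF NOT EXISTS "public"."t" (id INTEGER, v VARCHAR(MAX))`; with
/// `need_op_and_row_id` set, the `__ROW_ID` / `__OP` bookkeeping columns are appended.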
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

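/// Maps RisingWave data types to Redshift column types. `Varchar` and `Jsonb` are
/// both stored as `VARCHAR(MAX)`; types without a mapping make table auto-creation
/// fail with a configuration error.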
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "auto table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

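/// Builds the statement batch that folds the intermediate (CDC) table into the target
/// table. The five statements: (1) snapshot the current maximum `__ROW_ID` into a temp
/// table, (2) delete target rows whose latest op in the snapshot is 2 or 4 (treated as
/// deletions), (3) `MERGE` rows whose latest op is 1 or 3 (upserts) into the target,
/// (4) purge the consumed rows from the intermediate table, and (5) drop the temp table.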
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

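/// Builds the `COPY ... FROM '<manifest>' ... MANIFEST` statement that loads the staged
/// JSON files listed in the manifest into the given table, authenticating either with an
/// IAM role (`assume_role`) or with an access-key / secret-key pair.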
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}