risingwave_connector/sink/snowflake_redshift/redshift.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

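/// Builds a quoted, optionally schema-qualified table name, e.g.
/// `build_full_table_name(Some("public"), "t")` yields `"public"."t"`.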
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

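/// Builds an `ALTER TABLE ... ADD COLUMN` statement for the given `(name, type)` pairs via
/// the shared JDBC helper, without `IF NOT EXISTS` (see the note below).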
fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    // Redshift does not support ADD COLUMN IF NOT EXISTS yet.
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

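/// User-facing options of the Redshift sink, taken from the `WITH` clause of `CREATE SINK`.
/// Data is either staged to S3 and loaded with `COPY` (`with_s3 = true`, the default) or
/// written row-by-row through the JDBC path. For non-append-only sinks, changes are first
/// appended to `intermediate.table.name` and periodically merged into `table.name` every
/// `write.target.interval.seconds` seconds.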
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600 // Default to 1 hour
}

fn default_batch_insert_rows() -> u32 {
    4096 // Default batch size
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
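    /// Builds a JDBC JNI client from `jdbc.url`, appending the optional `user` and
    /// `password` as URL query parameters.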
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        // Append credentials as query parameters, using `?` for the first parameter and
        // `&` afterwards (also when the URL already carries parameters).
        let mut separator = if jdbc_url.contains('?') { '&' } else { '?' };
        if let Some(username) = &self.username {
            jdbc_url = format!("{}{}user={}", jdbc_url, separator, username);
            separator = '&';
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}{}password={}", jdbc_url, separator, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

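/// The Redshift sink. Validation optionally creates the target (and intermediate) table,
/// writes go through [`RedShiftSinkWriter`], and commits are coordinated by
/// [`RedshiftSinkCommitter`].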
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}
impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

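/// Writer side of the sink: either stages files on S3 (`with_s3 = true`) for a later `COPY`
/// issued by the coordinator, or writes rows directly over JDBC.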
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

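/// JDBC writer that delegates to the remote JDBC sink. For non-append-only sinks it
/// augments each chunk with the `__ROW_ID` and `__OP` columns and appends the rows to the
/// intermediate table.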
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            (*column_descs).push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            (*column_descs).push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        // TODO: abort should clean up all the data written in this epoch.
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

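/// Single-phase commit coordinator. On commit it writes a manifest of the staged S3 files
/// and issues a `COPY` to load them; for non-append-only sinks a background task
/// periodically merges the intermediate table into the target table.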
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            // Create shutdown channel
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();

            // Clone client for the periodic task
            let task_client = config.build_client()?;

            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            // Start the periodic merge task; it runs every `schedule_seconds` seconds
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

    /// Periodically merges the intermediate (CDC) table into the target table, running
    /// every `schedule_seconds` seconds until a shutdown signal is received.
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds));
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                // Check for shutdown signal
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                // Execute periodic query
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        // Send shutdown signal to the periodic task
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        Ok(())
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
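        // Collect the S3 object paths reported by the writers into manifest entries
        // ({"url": ..., "mandatory": true}) for the Redshift COPY below.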
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0, &s3_operator, &None).await?;
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            // Run the COPY command to load the staged manifest into Redshift.
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }

        if let Some(add_columns) = add_columns {
            if let Some(shutdown_sender) = &self.shutdown_sender {
                // Send shutdown signal to the periodic task before altering the table
                shutdown_sender
                    .send(())
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
            }
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &add_columns
                    .iter()
                    .map(|f| (f.name.clone(), f.data_type.to_string()))
                    .collect::<Vec<_>>(),
            );
            let check_column_exists = |e: anyhow::Error| {
                let err_str = e.to_report_string();
                if regex::Regex::new(".+ of relation .+ already exists")
                    .unwrap()
                    .find(&err_str)
                    .is_none()
                {
                    return Err(e);
                }
                warn!("redshift sink column already exists, skipping");
                Ok(())
            };
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            if !self.is_append_only {
                let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let sql = build_alter_add_column_sql(
                    self.config.schema.as_deref(),
                    cdc_table_name,
                    &add_columns
                        .iter()
                        .map(|f| (f.name.clone(), f.data_type.to_string()))
                        .collect::<Vec<_>>(),
                );
                self.client
                    .execute_sql_sync(vec![sql.clone()])
                    .await
                    .or_else(check_column_exists)?;
                self.all_column_names
                    .extend(add_columns.iter().map(|f| f.name.clone()));

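                // Stop the running periodic merge task and restart it below so that the
                // merge SQL picks up the newly added columns.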
                if let Some(shutdown_sender) = self.shutdown_sender.take() {
                    let _ = shutdown_sender.send(());
                }
                if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                    let _ = periodic_task_handle.await;
                }

                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let client = self.client.clone();
                let schema_name = self.config.schema.clone();
                let cdc_table_name = self.config.cdc_table.clone().unwrap();
                let target_table_name = self.config.table.clone();
                let pk_column_names = self.pk_column_names.clone();
                let all_column_names = self.all_column_names.clone();
                let schedule_seconds = self.schedule_seconds;
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        client,
                        schema_name.as_deref(),
                        &cdc_table_name,
                        &target_table_name,
                        pk_column_names,
                        all_column_names,
                        schedule_seconds,
                        shutdown_receiver,
                    )
                    .await;
                });
                self.shutdown_sender = Some(shutdown_sender);
                self.periodic_task_handle = Some(periodic_task_handle);
            }
        }
        Ok(())
    }
}

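/// Builds a `CREATE TABLE IF NOT EXISTS` statement from the RisingWave schema, optionally
/// appending the bookkeeping `__ROW_ID`/`__OP` columns used by the intermediate table.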
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

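/// Maps a RisingWave [`DataType`] to the Redshift column type used for auto-created
/// tables; unsupported types yield a configuration error.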
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Auto table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

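/// Builds the SQL batch that flushes the intermediate (CDC) table into the target table:
/// it snapshots the current maximum `__ROW_ID` into a temp table, keeps only the latest
/// change per primary key, applies deletes via `DELETE ... USING` and upserts via `MERGE`
/// based on the `__OP` code, purges the consumed rows from the intermediate table, and
/// finally drops the temp table.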
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

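/// Builds the Redshift `COPY` statement that loads the JSON files listed in the S3
/// manifest, authenticating with either an IAM role (`assume_role`) or an access key /
/// secret key pair.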
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        &format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        &format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}