risingwave_connector/sink/snowflake_redshift/
redshift.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::num::NonZero;
16use std::fmt::Write;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use bytes::BytesMut;
21use itertools::Itertools;
22use phf::{Set, phf_set};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
25use risingwave_common::types::DataType;
26use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
27use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
28use risingwave_pb::stream_plan::PbSinkSchemaChange;
29use serde::Deserialize;
30use serde_json::json;
31use serde_with::{DisplayFromStr, serde_as};
32use thiserror_ext::AsReport;
33use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
34use tokio::time::{MissedTickBehavior, interval};
35use tonic::async_trait;
36use tracing::warn;
37use with_options::WithOptions;
38
39use crate::connector_common::IcebergSinkCompactionUpdate;
40use crate::enforce_secret::EnforceSecret;
41use crate::sink::coordinate::CoordinatedLogSinker;
42use crate::sink::file_sink::opendal_sink::FileSink;
43use crate::sink::file_sink::s3::{S3Common, S3Sink};
44use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
45use crate::sink::remote::CoordinatedRemoteSinkWriter;
46use crate::sink::snowflake_redshift::{
47    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
48};
49use crate::sink::writer::SinkWriter;
50use crate::sink::{
51    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
52    SinkWriterMetrics,
53};
54
/// Name of the Redshift sink connector; exposed as `RedshiftSink::SINK_NAME`.
pub const REDSHIFT_SINK: &str = "redshift";
56
/// Builds the double-quoted table identifier used in generated SQL, qualified with
/// `schema_name` when one is given (`"schema"."table"`, otherwise `"table"`).
///
/// Any `"` inside either name is doubled, which is how a double quote is written inside a
/// delimited SQL identifier. Without this, a name containing a quote would close the identifier
/// early and let the remainder be interpreted as SQL.
fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    /// Wraps `ident` in double quotes, doubling any embedded double quote.
    fn quote_ident(ident: &str) -> String {
        format!(r#""{}""#, ident.replace('"', r#""""#))
    }

    match schema_name {
        Some(schema_name) => format!("{}.{}", quote_ident(schema_name), quote_ident(table_name)),
        None => quote_ident(table_name),
    }
}
64
65fn build_alter_add_column_sql(
66    schema_name: Option<&str>,
67    table_name: &str,
68    columns: &Vec<(String, String)>,
69) -> String {
70    let full_table_name = build_full_table_name(schema_name, table_name);
71    // redshift does not support add column IF NOT EXISTS yet.
72    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
73}
74
/// User-facing options of the Redshift sink, deserialized from the sink's `WITH` properties
/// (see `RedshiftSink::try_from`).
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    // Base JDBC URL; `build_client` appends the credentials below to it.
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    // Optional user name, appended to the JDBC URL as the `user` parameter.
    #[serde(rename = "user")]
    pub username: Option<String>,

    // Optional password, appended to the JDBC URL as the `password` parameter.
    #[serde(rename = "password")]
    pub password: Option<String>,

    // Optional schema used to qualify table names in generated SQL.
    #[serde(rename = "schema")]
    pub schema: Option<String>,

    // Name of the target table.
    #[serde(rename = "table.name")]
    pub table: String,

    // Name of the intermediate (CDC) table; required when the sink is not append-only.
    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    // When true, `validate` issues `CREATE TABLE IF NOT EXISTS` for the table(s).
    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    // Period, in seconds, of the background merge task for non-append-only sinks
    // (defaults to `default_schedule()`).
    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    // Defaults to `default_batch_insert_rows()`. Not read in this part of the file;
    // presumably consumed by the JDBC writer — TODO confirm.
    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    // Selects the S3-based writer (true, the default) or the JDBC writer (false).
    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    // S3 settings, flattened into the same property map; required when `with_s3` is true.
    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}
119
/// Serde default for `write.target.interval.seconds`: one hour, expressed in seconds.
fn default_schedule() -> u64 {
    const SECONDS_PER_HOUR: u64 = 60 * 60;
    SECONDS_PER_HOUR
}
123
/// Serde default for `batch.insert.rows`.
fn default_batch_insert_rows() -> u32 {
    const DEFAULT_BATCH_ROWS: u32 = 4096;
    DEFAULT_BATCH_ROWS
}
127
/// Serde default for `with_s3`: the S3-based write path is enabled unless turned off.
fn default_with_s3() -> bool {
    const WITH_S3_BY_DEFAULT: bool = true;
    WITH_S3_BY_DEFAULT
}
131
132impl RedShiftConfig {
133    pub fn build_client(&self) -> Result<JdbcJniClient> {
134        let mut jdbc_url = self.jdbc_url.clone();
135        if let Some(username) = &self.username {
136            jdbc_url = format!("{}?user={}", jdbc_url, username);
137        }
138        if let Some(password) = &self.password {
139            jdbc_url = format!("{}&password={}", jdbc_url, password);
140        }
141        JdbcJniClient::new(jdbc_url)
142    }
143}
144
/// Redshift sink, built from a `SinkParam` via `TryFrom`.
#[derive(Debug)]
pub struct RedshiftSink {
    // Options parsed from the sink properties.
    config: RedShiftConfig,
    // The original sink parameters; cloned for writers and used for schema lookup.
    param: SinkParam,
    // Whether the sink type is append-only (from `param.sink_type`).
    is_append_only: bool,
    // Schema obtained from `param.schema()` at construction time.
    schema: Schema,
    // Indices (into `schema.fields`) of the downstream primary-key columns; may be empty.
    pk_indices: Vec<usize>,
}
impl EnforceSecret for RedshiftSink {
    // Property keys subject to secret enforcement: the credentials and the JDBC URL
    // (which may itself embed credentials).
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}
160
161impl TryFrom<SinkParam> for RedshiftSink {
162    type Error = SinkError;
163
164    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
165        let config = serde_json::from_value::<RedShiftConfig>(
166            serde_json::to_value(param.properties.clone()).unwrap(),
167        )
168        .map_err(|e| SinkError::Config(anyhow!(e)))?;
169        let is_append_only = param.sink_type.is_append_only();
170        let schema = param.schema();
171        let pk_indices = param.downstream_pk_or_empty();
172        Ok(Self {
173            config,
174            param,
175            is_append_only,
176            schema,
177            pk_indices,
178        })
179    }
180}
181
182impl Sink for RedshiftSink {
183    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;
184
185    const SINK_NAME: &'static str = REDSHIFT_SINK;
186
187    async fn validate(&self) -> Result<()> {
188        if self.config.create_table_if_not_exists {
189            let client = self.config.build_client()?;
190            let schema = self.param.schema();
191            let build_table_sql = build_create_table_sql(
192                self.config.schema.as_deref(),
193                &self.config.table,
194                &schema,
195                false,
196            )?;
197            client.execute_sql_sync(vec![build_table_sql]).await?;
198            if !self.is_append_only {
199                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
200                    SinkError::Config(anyhow!(
201                        "intermediate.table.name is required for append-only sink"
202                    ))
203                })?;
204                let build_cdc_table_sql = build_create_table_sql(
205                    self.config.schema.as_deref(),
206                    cdc_table,
207                    &schema,
208                    true,
209                )?;
210                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
211            }
212        }
213        Ok(())
214    }
215
216    fn support_schema_change() -> bool {
217        true
218    }
219
220    async fn new_log_sinker(
221        &self,
222        writer_param: crate::sink::SinkWriterParam,
223    ) -> Result<Self::LogSinker> {
224        let writer = RedShiftSinkWriter::new(
225            self.config.clone(),
226            self.is_append_only,
227            writer_param.clone(),
228            self.param.clone(),
229        )
230        .await?;
231        CoordinatedLogSinker::new(
232            &writer_param,
233            self.param.clone(),
234            writer,
235            NonZero::new(1).unwrap(),
236        )
237        .await
238    }
239
240    fn is_coordinated_sink(&self) -> bool {
241        true
242    }
243
244    async fn new_coordinator(
245        &self,
246        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
247    ) -> Result<SinkCommitCoordinator> {
248        let pk_column_names: Vec<_> = self
249            .schema
250            .fields
251            .iter()
252            .enumerate()
253            .filter(|(index, _)| self.pk_indices.contains(index))
254            .map(|(_, field)| field.name.clone())
255            .collect();
256        if pk_column_names.is_empty() && !self.is_append_only {
257            return Err(SinkError::Config(anyhow!(
258                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
259            )));
260        }
261        let all_column_names = self
262            .schema
263            .fields
264            .iter()
265            .map(|field| field.name.clone())
266            .collect();
267        let coordinator = RedshiftSinkCommitter::new(
268            self.config.clone(),
269            self.is_append_only,
270            &pk_column_names,
271            &all_column_names,
272        )?;
273        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
274    }
275}
276
/// Writer used by the Redshift sink; the variant is chosen from the `with_s3` option.
pub enum RedShiftSinkWriter {
    // Writes through S3; `barrier` may return a path that is forwarded as commit metadata.
    S3(SnowflakeRedshiftSinkS3Writer),
    // Writes rows through the JDBC remote sink writer.
    Jdbc(RedShiftSinkJdbcWriter),
}
281
282impl RedShiftSinkWriter {
283    pub async fn new(
284        config: RedShiftConfig,
285        is_append_only: bool,
286        writer_param: super::SinkWriterParam,
287        param: SinkParam,
288    ) -> Result<Self> {
289        let schema = param.schema();
290        if config.with_s3 {
291            let executor_id = writer_param.executor_id;
292            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
293                config.s3_inner.ok_or_else(|| {
294                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
295                })?,
296                schema,
297                is_append_only,
298                executor_id,
299                Some(config.table),
300            )?;
301            Ok(Self::S3(s3_writer))
302        } else {
303            let jdbc_writer =
304                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
305            Ok(Self::Jdbc(jdbc_writer))
306        }
307    }
308}
309
310#[async_trait]
311impl SinkWriter for RedShiftSinkWriter {
312    type CommitMetadata = Option<SinkMetadata>;
313
314    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
315        match self {
316            Self::S3(writer) => writer.begin_epoch(epoch),
317            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
318        }
319    }
320
321    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
322        match self {
323            Self::S3(writer) => writer.write_batch(chunk).await,
324            Self::Jdbc(writer) => writer.write_batch(chunk).await,
325        }
326    }
327
328    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
329        let metadata = match self {
330            Self::S3(writer) => {
331                if let Some(path) = writer.barrier(is_checkpoint).await? {
332                    path.into_bytes()
333                } else {
334                    vec![]
335                }
336            }
337            Self::Jdbc(writer) => {
338                writer.barrier(is_checkpoint).await?;
339                vec![]
340            }
341        };
342        Ok(Some(SinkMetadata {
343            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
344                metadata,
345            })),
346        }))
347    }
348
349    async fn abort(&mut self) -> Result<()> {
350        if let Self::Jdbc(writer) = self {
351            writer.abort().await
352        } else {
353            Ok(())
354        }
355    }
356}
357
/// JDBC-based writer: chunks are passed through `AugmentedChunk` and then handed to the
/// remote JDBC sink writer.
pub struct RedShiftSinkJdbcWriter {
    // Transforms each incoming chunk before it is written; reset at every epoch start.
    augmented_row: AugmentedChunk,
    // Underlying remote sink writer that performs the actual writes.
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}
362
363impl RedShiftSinkJdbcWriter {
364    pub async fn new(
365        config: RedShiftConfig,
366        is_append_only: bool,
367        writer_param: super::SinkWriterParam,
368        mut param: SinkParam,
369    ) -> Result<Self> {
370        let metrics = SinkWriterMetrics::new(&writer_param);
371        let column_descs = &mut param.columns;
372        param.properties.remove("create_table_if_not_exists");
373        param.properties.remove("write.target.interval.seconds");
374        let full_table_name = if is_append_only {
375            config.table
376        } else {
377            let max_column_id = column_descs
378                .iter()
379                .map(|column| column.column_id.get_id())
380                .max()
381                .unwrap_or(0);
382            (*column_descs).push(ColumnDesc::named(
383                __ROW_ID,
384                ColumnId::new(max_column_id + 1),
385                DataType::Varchar,
386            ));
387            (*column_descs).push(ColumnDesc::named(
388                __OP,
389                ColumnId::new(max_column_id + 2),
390                DataType::Int32,
391            ));
392            config.cdc_table.ok_or_else(|| {
393                SinkError::Config(anyhow!(
394                    "intermediate.table.name is required for non-append-only sink"
395                ))
396            })?
397        };
398        param.properties.remove("intermediate.table.name");
399        param.properties.remove("table.name");
400        param.properties.remove("with_s3");
401        if let Some(schema_name) = param.properties.remove("schema") {
402            param
403                .properties
404                .insert("schema.name".to_owned(), schema_name);
405        }
406        param
407            .properties
408            .insert("table.name".to_owned(), full_table_name.clone());
409        param
410            .properties
411            .insert("type".to_owned(), "append-only".to_owned());
412
413        let jdbc_sink_writer =
414            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
415        Ok(Self {
416            augmented_row: AugmentedChunk::new(0, is_append_only),
417            jdbc_sink_writer,
418        })
419    }
420
421    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
422        self.augmented_row.reset_epoch(epoch);
423        self.jdbc_sink_writer.begin_epoch(epoch).await?;
424        Ok(())
425    }
426
427    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
428        let chunk = self.augmented_row.augmented_chunk(chunk)?;
429        self.jdbc_sink_writer.write_batch(chunk).await?;
430        Ok(())
431    }
432
433    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
434        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
435        Ok(())
436    }
437
438    async fn abort(&mut self) -> Result<()> {
439        // TODO: abort should clean up all the data written in this epoch.
440        self.jdbc_sink_writer.abort().await?;
441        Ok(())
442    }
443}
444
/// Commit coordinator of the Redshift sink. For non-append-only sinks it also owns a
/// background task that periodically merges the intermediate table into the target table.
pub struct RedshiftSinkCommitter {
    // Sink options (table names, schema, S3 settings).
    config: RedShiftConfig,
    // Client used by `commit` for the COPY and ALTER statements.
    client: JdbcJniClient,
    // Primary-key column names used to build the merge SQL.
    pk_column_names: Vec<String>,
    // All column names used to build the merge SQL; extended when columns are added.
    all_column_names: Vec<String>,
    // Period of the background merge task, in seconds.
    schedule_seconds: u64,
    // Whether the sink is append-only (no intermediate table, no merge task).
    is_append_only: bool,
    // Handle of the background merge task; `None` for append-only sinks.
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    // Sender used to ask the background merge task to stop; `None` for append-only sinks.
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}
455
456impl RedshiftSinkCommitter {
457    pub fn new(
458        config: RedShiftConfig,
459        is_append_only: bool,
460        pk_column_names: &Vec<String>,
461        all_column_names: &Vec<String>,
462    ) -> Result<Self> {
463        let client = config.build_client()?;
464        let schedule_seconds = config.schedule_seconds;
465        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
466            let schema_name = config.schema.clone();
467            let target_table_name = config.table.clone();
468            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
469                SinkError::Config(anyhow!(
470                    "intermediate.table.name is required for non-append-only sink"
471                ))
472            })?;
473            // Create shutdown channel
474            let (shutdown_sender, shutdown_receiver) = unbounded_channel();
475
476            // Clone client for the periodic task
477            let task_client = config.build_client()?;
478
479            let pk_column_names = pk_column_names.clone();
480            let all_column_names = all_column_names.clone();
481            // Start periodic task that runs every hour
482            let periodic_task_handle = tokio::spawn(async move {
483                Self::run_periodic_query_task(
484                    task_client,
485                    schema_name.as_deref(),
486                    &cdc_table_name,
487                    &target_table_name,
488                    pk_column_names,
489                    all_column_names,
490                    schedule_seconds,
491                    shutdown_receiver,
492                )
493                .await;
494            });
495            (Some(periodic_task_handle), Some(shutdown_sender))
496        } else {
497            (None, None)
498        };
499
500        Ok(Self {
501            client,
502            config,
503            pk_column_names: pk_column_names.clone(),
504            all_column_names: all_column_names.clone(),
505            is_append_only,
506            schedule_seconds,
507            periodic_task_handle,
508            shutdown_sender,
509        })
510    }
511
512    /// Runs a periodic query task every hour
513    async fn run_periodic_query_task(
514        client: JdbcJniClient,
515        schema_name: Option<&str>,
516        cdc_table_name: &str,
517        target_table_name: &str,
518        pk_column_names: Vec<String>,
519        all_column_names: Vec<String>,
520        schedule_seconds: u64,
521        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
522    ) {
523        let mut interval_timer = interval(Duration::from_secs(schedule_seconds)); // 1 hour = 3600 seconds
524        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
525        let sql = build_create_merge_into_task_sql(
526            schema_name,
527            cdc_table_name,
528            target_table_name,
529            &pk_column_names,
530            &all_column_names,
531        );
532        loop {
533            tokio::select! {
534                // Check for shutdown signal
535                _ = shutdown_receiver.recv() => {
536                    tracing::info!("Periodic query task received shutdown signal, stopping");
537                    break;
538                }
539                // Execute periodic query
540                _ = interval_timer.tick() => {
541
542                    match client.execute_sql_sync(sql.clone()).await {
543                        Ok(_) => {
544                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
545                        }
546                        Err(e) => {
547                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
548                        }
549                    }
550                }
551            }
552        }
553    }
554}
555
556impl Drop for RedshiftSinkCommitter {
557    fn drop(&mut self) {
558        // Send shutdown signal to the periodic task
559        if let Some(shutdown_sender) = &self.shutdown_sender
560            && let Err(e) = shutdown_sender.send(())
561        {
562            tracing::warn!(
563                "Failed to send shutdown signal to periodic task: {}",
564                e.as_report()
565            );
566        }
567        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
568    }
569}
570
571#[async_trait]
572impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
573    async fn init(&mut self) -> Result<()> {
574        Ok(())
575    }
576
577    async fn commit(
578        &mut self,
579        _epoch: u64,
580        metadata: Vec<SinkMetadata>,
581        schema_change: Option<PbSinkSchemaChange>,
582    ) -> Result<()> {
583        let paths = metadata
584            .into_iter()
585            .filter(|m| {
586                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
587                    &m.metadata
588                {
589                    !metadata.is_empty()
590                } else {
591                    false
592                }
593            })
594            .map(|metadata| {
595                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
596                    metadata,
597                })) = metadata.metadata
598                {
599                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
600                } else {
601                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
602                }?;
603                Ok(json!({
604                    "url": path,
605                    "mandatory": true
606                }))
607            })
608            .collect::<Result<Vec<_>>>()?;
609        if !paths.is_empty() {
610            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
611                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
612            })?;
613            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
614            let (mut writer, path) =
615                build_opendal_writer_path(s3_inner, 0.into(), &s3_operator, &None).await?;
616            let manifest_json = json!({
617                "entries": paths
618            });
619            let mut chunk_buf = BytesMut::new();
620            writeln!(chunk_buf, "{}", manifest_json).unwrap();
621            writer.write(chunk_buf.freeze()).await?;
622            writer
623                .close()
624                .await
625                .map_err(|e| SinkError::File(e.to_report_string()))?;
626            let table = if self.is_append_only {
627                &self.config.table
628            } else {
629                self.config.cdc_table.as_ref().ok_or_else(|| {
630                    SinkError::Config(anyhow!(
631                        "intermediate.table.name is required for non-append-only sink"
632                    ))
633                })?
634            };
635            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
636                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
637            })?;
638            let copy_into_sql = build_copy_into_sql(
639                self.config.schema.as_deref(),
640                table,
641                &path,
642                &s3_inner.access,
643                &s3_inner.secret,
644                &s3_inner.assume_role,
645            )?;
646            // run copy into
647            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
648        }
649
650        if let Some(schema_change) = schema_change {
651            use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
652            let schema_change_op = schema_change.op.ok_or_else(|| {
653                SinkError::Coordinator(anyhow!("Invalid schema change operation"))
654            })?;
655            let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
656                return Err(SinkError::Coordinator(anyhow!(
657                    "Only AddColumns schema change is supported for Redshift sink"
658                )));
659            };
660            if let Some(shutdown_sender) = &self.shutdown_sender {
661                // Send shutdown signal to the periodic task before altering the table
662                shutdown_sender
663                    .send(())
664                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
665            }
666            let sql = build_alter_add_column_sql(
667                self.config.schema.as_deref(),
668                &self.config.table,
669                &add_columns
670                    .fields
671                    .iter()
672                    .map(|f| {
673                        (
674                            f.name.clone(),
675                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
676                        )
677                    })
678                    .collect_vec(),
679            );
680            let check_column_exists = |e: anyhow::Error| {
681                let err_str = e.to_report_string();
682                if regex::Regex::new(".+ of relation .+ already exists")
683                    .unwrap()
684                    .find(&err_str)
685                    .is_none()
686                {
687                    return Err(e);
688                }
689                warn!("redshift sink columns already exists. skipped");
690                Ok(())
691            };
692            self.client
693                .execute_sql_sync(vec![sql.clone()])
694                .await
695                .or_else(check_column_exists)?;
696            if !self.is_append_only {
697                let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
698                    SinkError::Config(anyhow!(
699                        "intermediate.table.name is required for non-append-only sink"
700                    ))
701                })?;
702                let sql = build_alter_add_column_sql(
703                    self.config.schema.as_deref(),
704                    cdc_table_name,
705                    &add_columns
706                        .fields
707                        .iter()
708                        .map(|f| {
709                            (
710                                f.name.clone(),
711                                DataType::from(f.data_type.as_ref().unwrap()).to_string(),
712                            )
713                        })
714                        .collect::<Vec<_>>(),
715                );
716                self.client
717                    .execute_sql_sync(vec![sql.clone()])
718                    .await
719                    .or_else(check_column_exists)?;
720                self.all_column_names
721                    .extend(add_columns.fields.iter().map(|f| f.name.clone()));
722
723                if let Some(shutdown_sender) = self.shutdown_sender.take() {
724                    let _ = shutdown_sender.send(());
725                }
726                if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
727                    let _ = periodic_task_handle.await;
728                }
729
730                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
731                let client = self.client.clone();
732                let schema_name = self.config.schema.clone();
733                let cdc_table_name = self.config.cdc_table.clone().unwrap();
734                let target_table_name = self.config.table.clone();
735                let pk_column_names = self.pk_column_names.clone();
736                let all_column_names = self.all_column_names.clone();
737                let schedule_seconds = self.schedule_seconds;
738                let periodic_task_handle = tokio::spawn(async move {
739                    Self::run_periodic_query_task(
740                        client,
741                        schema_name.as_deref(),
742                        &cdc_table_name,
743                        &target_table_name,
744                        pk_column_names,
745                        all_column_names,
746                        schedule_seconds,
747                        shutdown_receiver,
748                    )
749                    .await;
750                });
751                self.shutdown_sender = Some(shutdown_sender);
752                self.periodic_task_handle = Some(periodic_task_handle);
753            }
754        }
755        Ok(())
756    }
757}
758
759pub fn build_create_table_sql(
760    schema_name: Option<&str>,
761    table_name: &str,
762    schema: &Schema,
763    need_op_and_row_id: bool,
764) -> Result<String> {
765    let mut columns: Vec<String> = schema
766        .fields
767        .iter()
768        .map(|field| {
769            let data_type = convert_redshift_data_type(&field.data_type)?;
770            Ok(format!("{} {}", field.name, data_type))
771        })
772        .collect::<Result<Vec<String>>>()?;
773    if need_op_and_row_id {
774        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
775        columns.push(format!("{} INT", __OP));
776    }
777    let columns_str = columns.join(", ");
778    let full_table_name = build_full_table_name(schema_name, table_name);
779    Ok(format!(
780        "CREATE TABLE IF NOT EXISTS {} ({})",
781        full_table_name, columns_str
782    ))
783}
784
785fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
786    let data_type = match data_type {
787        DataType::Int16 => "SMALLINT".to_owned(),
788        DataType::Int32 => "INTEGER".to_owned(),
789        DataType::Int64 => "BIGINT".to_owned(),
790        DataType::Float32 => "REAL".to_owned(),
791        DataType::Float64 => "FLOAT".to_owned(),
792        DataType::Boolean => "BOOLEAN".to_owned(),
793        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
794        DataType::Date => "DATE".to_owned(),
795        DataType::Timestamp => "TIMESTAMP".to_owned(),
796        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
797        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
798        DataType::Decimal => "DECIMAL".to_owned(),
799        DataType::Time => "TIME".to_owned(),
800        _ => {
801            return Err(SinkError::Config(anyhow!(
802                "Dont support auto create table for datatype: {}",
803                data_type
804            )));
805        }
806    };
807    Ok(data_type)
808}
809
810fn build_create_merge_into_task_sql(
811    schema_name: Option<&str>,
812    cdc_table_name: &str,
813    target_table_name: &str,
814    pk_column_names: &Vec<String>,
815    all_column_names: &Vec<String>,
816) -> Vec<String> {
817    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
818    let target_table_name = build_full_table_name(schema_name, target_table_name);
819    let pk_names_str = pk_column_names.join(", ");
820    let pk_names_eq_str = pk_column_names
821        .iter()
822        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
823        .collect::<Vec<String>>()
824        .join(" AND ");
825    let all_column_names_set_str = all_column_names
826        .iter()
827        .map(|name| format!("{name} = source.{name}", name = name))
828        .collect::<Vec<String>>()
829        .join(", ");
830    let all_column_names_str = all_column_names.join(", ");
831    let all_column_names_insert_str = all_column_names
832        .iter()
833        .map(|name| format!("source.{name}", name = name))
834        .collect::<Vec<String>>()
835        .join(", ");
836
837    vec![
838        format!(
839            r#"
840            CREATE TEMP TABLE max_id_table AS
841            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
842            FROM {cdc_table_name};
843            "#,
844            redshift_sink_row_id = __ROW_ID,
845            cdc_table_name = cdc_table_name,
846        ),
847        format!(
848            r#"
849            DELETE FROM {target_table_name}
850            USING (
851                SELECT *
852                FROM (
853                    SELECT *, ROW_NUMBER() OVER (
854                        PARTITION BY {pk_names_str}
855                        ORDER BY {redshift_sink_row_id} DESC
856                    ) AS dedupe_id
857                    FROM {cdc_table_name}, max_id_table
858                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
859                ) AS subquery
860                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
861            ) AS source
862            WHERE {pk_names_eq_str};
863            "#,
864            target_table_name = target_table_name,
865            pk_names_str = pk_names_str,
866            redshift_sink_row_id = __ROW_ID,
867            cdc_table_name = cdc_table_name,
868            redshift_sink_op = __OP,
869            pk_names_eq_str = pk_names_eq_str,
870        ),
871        format!(
872            r#"
873            MERGE INTO {target_table_name}
874            USING (
875                SELECT *
876                FROM (
877                    SELECT *, ROW_NUMBER() OVER (
878                        PARTITION BY {pk_names_str}
879                        ORDER BY {redshift_sink_row_id} DESC
880                    ) AS dedupe_id
881                    FROM {cdc_table_name}, max_id_table
882                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
883                ) AS subquery
884                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
885            ) AS source
886            ON {pk_names_eq_str}
887            WHEN MATCHED THEN
888                UPDATE SET {all_column_names_set_str}
889            WHEN NOT MATCHED THEN
890                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
891            "#,
892            target_table_name = target_table_name,
893            pk_names_str = pk_names_str,
894            redshift_sink_row_id = __ROW_ID,
895            cdc_table_name = cdc_table_name,
896            redshift_sink_op = __OP,
897            pk_names_eq_str = pk_names_eq_str,
898            all_column_names_set_str = all_column_names_set_str,
899            all_column_names_str = all_column_names_str,
900            all_column_names_insert_str = all_column_names_insert_str,
901        ),
902        format!(
903            r#"
904            DELETE FROM {cdc_table_name}
905            USING max_id_table
906            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
907            "#,
908            cdc_table_name = cdc_table_name,
909            redshift_sink_row_id = __ROW_ID,
910        ),
911        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
912    ]
913}
914
915fn build_copy_into_sql(
916    schema_name: Option<&str>,
917    table_name: &str,
918    manifest_path: &str,
919    access_key: &Option<String>,
920    secret_key: &Option<String>,
921    assume_role: &Option<String>,
922) -> Result<String> {
923    let table_name = build_full_table_name(schema_name, table_name);
924    let credentials = if let Some(assume_role) = assume_role {
925        &format!("aws_iam_role={}", assume_role)
926    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
927        &format!(
928            "aws_access_key_id={};aws_secret_access_key={}",
929            access_key, secret_key
930        )
931    } else {
932        return Err(SinkError::Config(anyhow!(
933            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
934        )));
935    };
936    Ok(format!(
937        r#"
938        COPY {table_name}
939        FROM '{manifest_path}'
940        CREDENTIALS '{credentials}'
941        FORMAT AS JSON 'auto'
942        DATEFORMAT 'auto'
943        TIMEFORMAT 'auto'
944        MANIFEST;
945        "#,
946        table_name = table_name,
947        manifest_path = manifest_path,
948        credentials = credentials
949    ))
950}