risingwave_connector/sink/snowflake_redshift/redshift.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, AugmentedChunk, SnowflakeRedshiftSinkS3Writer, build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterMetrics,
};

pub const REDSHIFT_SINK: &str = "redshift";

fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    // Redshift does not support `ADD COLUMN IF NOT EXISTS` yet.
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

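/// Illustrative sketch of the sink `WITH` properties that deserialize into
/// [`RedShiftConfig`]. The property names come from the `serde(rename)`
/// attributes below; the values here are hypothetical:
///
/// ```text
/// jdbc.url                      = 'jdbc:redshift://<host>:5439/<db>'
/// user                          = '<user>'
/// password                      = '<password>'
/// schema                        = 'public'
/// table.name                    = 'target_table'
/// intermediate.table.name       = 'target_table_cdc'   -- required unless append-only
/// create_table_if_not_exists    = 'true'
/// write.target.interval.seconds = '3600'
/// batch.insert.rows             = '4096'
/// with_s3                       = 'true'               -- plus the flattened S3 options
/// ```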
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub schedule_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_schedule() -> u64 {
    3600 // Default to 1 hour
}

fn default_batch_insert_rows() -> u32 {
    4096 // Default batch size
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        // Append `user`/`password` as query parameters, starting with `?` only if the
        // URL does not already carry query parameters.
        if let Some(username) = &self.username {
            let sep = if jdbc_url.contains('?') { '&' } else { '?' };
            jdbc_url = format!("{}{}user={}", jdbc_url, sep, username);
        }
        if let Some(password) = &self.password {
            let sep = if jdbc_url.contains('?') { '&' } else { '?' };
            jdbc_url = format!("{}{}password={}", jdbc_url, sep, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let build_cdc_table_sql = build_create_table_sql(
                    self.config.schema.as_deref(),
                    cdc_table,
                    &schema,
                    true,
                )?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

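/// The Redshift sink writer has two write paths:
/// - `S3`: stage rows as files on S3; the commit coordinator later loads them into
///   Redshift with `COPY ... MANIFEST` (see `RedshiftSinkCommitter::commit_data`).
/// - `Jdbc`: write rows directly over JDBC through the coordinated remote sink.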
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(RedShiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let executor_id = writer_param.executor_id;
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                executor_id,
                Some(config.table),
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer =
                RedShiftSinkJdbcWriter::new(config, is_append_only, writer_param, param).await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

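// On checkpoint barriers the S3 writer returns the path of the uploaded object as
// serialized sink metadata; the commit coordinator collects these paths into a COPY
// manifest in `commit_data`. The JDBC writer has no such metadata and returns an
// empty payload.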
#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

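/// JDBC write path: rows are appended through the coordinated remote JDBC sink.
/// For non-append-only sinks the rows are augmented with the `__ROW_ID` and `__OP`
/// columns and written to the intermediate (CDC) table instead of the target table.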
pub struct RedShiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl RedShiftSinkJdbcWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        mut param: SinkParam,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;
        param.properties.remove("create_table_if_not_exists");
        param.properties.remove("write.target.interval.seconds");
        let full_table_name = if is_append_only {
            config.table
        } else {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);
            column_descs.push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            column_descs.push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
            config.cdc_table.ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        param.properties.remove("intermediate.table.name");
        param.properties.remove("table.name");
        param.properties.remove("with_s3");
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        // TODO: abort should clean up all the data written in this epoch.
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}

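/// Single-phase commit coordinator for the Redshift sink. On commit it loads the
/// staged S3 files into Redshift via `COPY ... MANIFEST`; for non-append-only sinks
/// it also owns a background task that periodically merges the intermediate (CDC)
/// table into the target table.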
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    schedule_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let schedule_seconds = config.schedule_seconds;
        let (periodic_task_handle, shutdown_sender) = if !is_append_only {
            let schema_name = config.schema.clone();
            let target_table_name = config.table.clone();
            let cdc_table_name = config.cdc_table.clone().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            // Create shutdown channel
            let (shutdown_sender, shutdown_receiver) = unbounded_channel();

            // Build a separate client for the periodic task
            let task_client = config.build_client()?;

            let pk_column_names = pk_column_names.clone();
            let all_column_names = all_column_names.clone();
            // Start the periodic merge task, which runs every `schedule_seconds` seconds
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    task_client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            (Some(periodic_task_handle), Some(shutdown_sender))
        } else {
            (None, None)
        };

        Ok(Self {
            client,
            config,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            schedule_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

    /// Periodically merges the intermediate (CDC) table into the target table,
    /// once every `schedule_seconds` seconds, until a shutdown signal arrives.
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        schema_name: Option<&str>,
        cdc_table_name: &str,
        target_table_name: &str,
        pk_column_names: Vec<String>,
        all_column_names: Vec<String>,
        schedule_seconds: u64,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut interval_timer = interval(Duration::from_secs(schedule_seconds));
        interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let sql = build_create_merge_into_task_sql(
            schema_name,
            cdc_table_name,
            target_table_name,
            &pk_column_names,
            &all_column_names,
        );
        loop {
            tokio::select! {
                // Check for shutdown signal
                _ = shutdown_receiver.recv() => {
                    tracing::info!("Periodic query task received shutdown signal, stopping");
                    break;
                }
                // Execute the periodic merge
                _ = interval_timer.tick() => {
                    match client.execute_sql_sync(sql.clone()).await {
                        Ok(_) => {
                            tracing::info!("Periodic query executed successfully for table: {}", target_table_name);
                        }
                        Err(e) => {
                            tracing::warn!("Failed to execute periodic query for table {}: {}", target_table_name, e.as_report());
                        }
                    }
                }
            }
        }
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        // Send shutdown signal to the periodic task
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        Ok(())
    }

    async fn commit_data(&mut self, _epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(json!({
                    "url": path,
                    "mandatory": true
                }))
            })
            .collect::<Result<Vec<_>>>()?;
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
            let (mut writer, path) =
                build_opendal_writer_path(s3_inner, 0.into(), &s3_operator, &None).await?;
            let manifest_json = json!({
                "entries": paths
            });
            let mut chunk_buf = BytesMut::new();
            writeln!(chunk_buf, "{}", manifest_json).unwrap();
            writer.write(chunk_buf.freeze()).await?;
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            let table = if self.is_append_only {
                &self.config.table
            } else {
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?
            };
            let copy_into_sql = build_copy_into_sql(
                self.config.schema.as_deref(),
                table,
                &path,
                &s3_inner.access,
                &s3_inner.secret,
                &s3_inner.assume_role,
            )?;
            // Load the manifest-listed files into Redshift via COPY.
            self.client.execute_sql_sync(vec![copy_into_sql]).await?;
        }
        Ok(())
    }

    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
        let schema_change_op = schema_change
            .op
            .ok_or_else(|| SinkError::Coordinator(anyhow!("Invalid schema change operation")))?;
        let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
            return Err(SinkError::Coordinator(anyhow!(
                "Only AddColumns schema change is supported for Redshift sink"
            )));
        };
        if let Some(shutdown_sender) = &self.shutdown_sender {
            // Send shutdown signal to the periodic task before altering the table
            shutdown_sender
                .send(())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        }
        let sql = build_alter_add_column_sql(
            self.config.schema.as_deref(),
            &self.config.table,
            &add_columns
                .fields
                .iter()
                .map(|f| {
                    (
                        f.name.clone(),
                        DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                    )
                })
                .collect_vec(),
        );
        let check_column_exists = |e: anyhow::Error| {
            let err_str = e.to_report_string();
            if regex::Regex::new(".+ of relation .+ already exists")
                .unwrap()
                .find(&err_str)
                .is_none()
            {
                return Err(e);
            }
            warn!("redshift sink column already exists, skipped");
            Ok(())
        };
        self.client
            .execute_sql_sync(vec![sql.clone()])
            .await
            .or_else(check_column_exists)?;
        if !self.is_append_only {
            let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                cdc_table_name,
                &add_columns
                    .fields
                    .iter()
                    .map(|f| {
                        (
                            f.name.clone(),
                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                        )
                    })
                    .collect::<Vec<_>>(),
            );
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            self.all_column_names
                .extend(add_columns.fields.iter().map(|f| f.name.clone()));

            // Restart the periodic merge task so it picks up the new column list.
            if let Some(shutdown_sender) = self.shutdown_sender.take() {
                let _ = shutdown_sender.send(());
            }
            if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
                let _ = periodic_task_handle.await;
            }

            let (shutdown_sender, shutdown_receiver) = unbounded_channel();
            let client = self.client.clone();
            let schema_name = self.config.schema.clone();
            let cdc_table_name = self.config.cdc_table.clone().unwrap();
            let target_table_name = self.config.table.clone();
            let pk_column_names = self.pk_column_names.clone();
            let all_column_names = self.all_column_names.clone();
            let schedule_seconds = self.schedule_seconds;
            let periodic_task_handle = tokio::spawn(async move {
                Self::run_periodic_query_task(
                    client,
                    schema_name.as_deref(),
                    &cdc_table_name,
                    &target_table_name,
                    pk_column_names,
                    all_column_names,
                    schedule_seconds,
                    shutdown_receiver,
                )
                .await;
            });
            self.shutdown_sender = Some(shutdown_sender);
            self.periodic_task_handle = Some(periodic_task_handle);
        }

        Ok(())
    }
}

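/// Builds a `CREATE TABLE IF NOT EXISTS` statement for the target (or intermediate)
/// table. Illustrative output for a hypothetical schema with an `id` (Int32) and a
/// `name` (Varchar) column:
///
/// ```text
/// CREATE TABLE IF NOT EXISTS "public"."target_table" (id INTEGER, name VARCHAR(MAX))
/// ```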
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Automatic table creation does not support data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

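/// Builds the statement sequence executed by the periodic merge task, in order:
/// 1. snapshot the current maximum `__ROW_ID` of the intermediate table into a
///    temp table (`max_id_table`);
/// 2. delete target rows whose latest change (deduplicated by primary key) within
///    that snapshot is a delete op (`__OP` in 2, 4);
/// 3. `MERGE` the latest insert/update rows (`__OP` in 1, 3) into the target table;
/// 4. purge the consumed rows from the intermediate table;
/// 5. drop the temp table.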
fn build_create_merge_into_task_sql(
    schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

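/// Builds the `COPY` statement that loads the manifest-listed S3 files into the
/// given table. Illustrative output (the path and IAM role are hypothetical):
///
/// ```text
/// COPY "public"."target_table"
/// FROM 's3://bucket/path/manifest'
/// CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/example'
/// FORMAT AS JSON 'auto'
/// DATEFORMAT 'auto'
/// TIMEFORMAT 'auto'
/// MANIFEST;
/// ```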
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_path: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_path}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_path = manifest_path,
        credentials = credentials
    ))
}