risingwave_connector/sink/snowflake_redshift/
redshift.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::num::NonZero;
16use std::fmt::Write;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use bytes::BytesMut;
21use itertools::Itertools;
22use phf::{Set, phf_set};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
27use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
28use risingwave_pb::stream_plan::PbSinkSchemaChange;
29use serde::Deserialize;
30use serde_json::json;
31use serde_with::{DisplayFromStr, serde_as};
32use thiserror_ext::AsReport;
33use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
34use tokio::time::{MissedTickBehavior, interval};
35use tonic::async_trait;
36use tracing::warn;
37use with_options::WithOptions;
38
39use crate::connector_common::IcebergSinkCompactionUpdate;
40use crate::enforce_secret::EnforceSecret;
41use crate::sink::catalog::SinkId;
42use crate::sink::coordinate::CoordinatedLogSinker;
43use crate::sink::file_sink::opendal_sink::FileSink;
44use crate::sink::file_sink::s3::{S3Common, S3Sink};
45use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
46use crate::sink::snowflake_redshift::{
47    __OP, __ROW_ID, SnowflakeRedshiftSinkJdbcWriter, SnowflakeRedshiftSinkS3Writer,
48    build_opendal_writer_path,
49};
50use crate::sink::writer::SinkWriter;
51use crate::sink::{
52    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
53};
54
/// Connector name under which this sink is registered.
pub const REDSHIFT_SINK: &str = "redshift";
56
/// Quote a table reference for Redshift, optionally qualified by a schema:
/// `"schema"."table"` when a schema is given, `"table"` otherwise.
pub fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    match schema_name {
        Some(schema) => format!(r#""{}"."{}""#, schema, table_name),
        None => format!(r#""{}""#, table_name),
    }
}
64
65fn build_alter_add_column_sql(
66    schema_name: Option<&str>,
67    table_name: &str,
68    columns: &Vec<(String, String)>,
69) -> String {
70    let full_table_name = build_full_table_name(schema_name, table_name);
71    // redshift does not support add column IF NOT EXISTS yet.
72    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
73}
74
/// User-facing configuration of the Redshift sink, deserialized from the
/// sink's `WITH (...)` properties.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    /// JDBC URL of the Redshift cluster.
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    /// Optional user name, appended to the JDBC URL as a connection property.
    #[serde(rename = "user")]
    pub username: Option<String>,

    /// Optional password, appended to the JDBC URL as a connection property.
    #[serde(rename = "password")]
    pub password: Option<String>,

    /// Schema of the target table; the table name is unqualified when absent.
    #[serde(rename = "schema")]
    pub schema: Option<String>,

    /// Schema of the intermediate (CDC) table; falls back to `schema`.
    #[serde(rename = "intermediate.schema.name")]
    pub intermediate_schema: Option<String>,

    /// Target table name.
    #[serde(rename = "table.name")]
    pub table: String,

    /// Intermediate (CDC) table name; required for non-append-only sinks.
    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    /// Auto-create the target (and, if needed, intermediate) table during
    /// sink validation.
    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    /// Interval (seconds) between periodic merges into the target table
    /// (default 3600).
    #[serde(default = "default_target_interval_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub writer_target_interval_seconds: u64,

    /// Interval (seconds) between periodic COPYs of staged S3 data into
    /// Redshift (default 1800).
    #[serde(default = "default_intermediate_interval_schedule")]
    #[serde(rename = "write.intermediate.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub write_intermediate_interval_seconds: u64,

    /// Number of rows per insert batch (default 4096).
    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    /// Stage data on S3 and COPY into Redshift (default) instead of writing
    /// rows directly over JDBC.
    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    /// S3 connection settings; required when `with_s3` is true.
    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}
127
/// Default for `write.target.interval.seconds`: one hour.
fn default_target_interval_schedule() -> u64 {
    60 * 60
}
131
/// Default for `write.intermediate.interval.seconds`: half an hour.
fn default_intermediate_interval_schedule() -> u64 {
    30 * 60
}
135
/// Default for `batch.insert.rows`.
fn default_batch_insert_rows() -> u32 {
    4096 // Default batch size
}
139
/// Default for `with_s3`: stage data on S3 rather than writing over JDBC.
fn default_with_s3() -> bool {
    true
}
143
144impl RedShiftConfig {
145    pub fn build_client(&self) -> Result<JdbcJniClient> {
146        let mut jdbc_url = self.jdbc_url.clone();
147        if let Some(username) = &self.username {
148            jdbc_url = format!("{}?user={}", jdbc_url, username);
149        }
150        if let Some(password) = &self.password {
151            jdbc_url = format!("{}&password={}", jdbc_url, password);
152        }
153        JdbcJniClient::new(jdbc_url)
154    }
155}
156
/// Redshift sink entry point, constructed from [`SinkParam`] (see the
/// `TryFrom` impl below).
#[derive(Debug)]
pub struct RedshiftSink {
    // Parsed connector properties.
    config: RedShiftConfig,
    // Original sink parameters (schema, sink id, sink type, ...).
    param: SinkParam,
    // True when the sink only receives inserts (no updates/deletes).
    is_append_only: bool,
    // Sink schema derived from `param`.
    schema: Schema,
    // Downstream primary-key column indices; may be empty.
    pk_indices: Vec<usize>,
}
impl EnforceSecret for RedshiftSink {
    /// Properties that must be provided via secret references instead of
    /// plain text in the sink definition.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}
172
173impl TryFrom<SinkParam> for RedshiftSink {
174    type Error = SinkError;
175
176    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177        let config = serde_json::from_value::<RedShiftConfig>(
178            serde_json::to_value(param.properties.clone()).unwrap(),
179        )
180        .map_err(|e| SinkError::Config(anyhow!(e)))?;
181        let is_append_only = param.sink_type.is_append_only();
182        let schema = param.schema();
183        let pk_indices = param.downstream_pk_or_empty();
184        Ok(Self {
185            config,
186            param,
187            is_append_only,
188            schema,
189            pk_indices,
190        })
191    }
192}
193
194impl Sink for RedshiftSink {
195    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;
196
197    const SINK_NAME: &'static str = REDSHIFT_SINK;
198
199    async fn validate(&self) -> Result<()> {
200        if self.config.create_table_if_not_exists {
201            let client = self.config.build_client()?;
202            let schema = self.param.schema();
203            let build_table_sql = build_create_table_sql(
204                self.config.schema.as_deref(),
205                &self.config.table,
206                &schema,
207                false,
208            )?;
209            client.execute_sql_sync(vec![build_table_sql]).await?;
210            if !self.is_append_only {
211                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
212                    SinkError::Config(anyhow!(
213                        "intermediate.table.name is required for append-only sink"
214                    ))
215                })?;
216                let cdc_schema_for_create = self
217                    .config
218                    .intermediate_schema
219                    .as_deref()
220                    .or(self.config.schema.as_deref());
221                let build_cdc_table_sql =
222                    build_create_table_sql(cdc_schema_for_create, cdc_table, &schema, true)?;
223                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
224            }
225        }
226        Ok(())
227    }
228
229    fn support_schema_change() -> bool {
230        true
231    }
232
233    async fn new_log_sinker(
234        &self,
235        writer_param: crate::sink::SinkWriterParam,
236    ) -> Result<Self::LogSinker> {
237        let writer = RedShiftSinkWriter::new(
238            self.config.clone(),
239            self.is_append_only,
240            writer_param.clone(),
241            self.param.clone(),
242        )
243        .await?;
244        CoordinatedLogSinker::new(
245            &writer_param,
246            self.param.clone(),
247            writer,
248            NonZero::new(1).unwrap(),
249        )
250        .await
251    }
252
253    fn is_coordinated_sink(&self) -> bool {
254        true
255    }
256
257    async fn new_coordinator(
258        &self,
259        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
260    ) -> Result<SinkCommitCoordinator> {
261        let pk_column_names: Vec<_> = self
262            .schema
263            .fields
264            .iter()
265            .enumerate()
266            .filter(|(index, _)| self.pk_indices.contains(index))
267            .map(|(_, field)| field.name.clone())
268            .collect();
269        if pk_column_names.is_empty() && !self.is_append_only {
270            return Err(SinkError::Config(anyhow!(
271                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
272            )));
273        }
274        let all_column_names = self
275            .schema
276            .fields
277            .iter()
278            .map(|field| field.name.clone())
279            .collect();
280        let coordinator = RedshiftSinkCommitter::new(
281            self.config.clone(),
282            self.is_append_only,
283            &pk_column_names,
284            &all_column_names,
285            self.param.sink_id,
286        )?;
287        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
288    }
289}
290
/// Writer half of the Redshift sink, selected by the `with_s3` flag.
pub enum RedShiftSinkWriter {
    // Stage rows as files on S3; the coordinator later COPYs them into Redshift.
    S3(SnowflakeRedshiftSinkS3Writer),
    // Write rows directly to Redshift over JDBC.
    Jdbc(SnowflakeRedshiftSinkJdbcWriter),
}
295
296impl RedShiftSinkWriter {
297    pub async fn new(
298        config: RedShiftConfig,
299        is_append_only: bool,
300        writer_param: super::SinkWriterParam,
301        param: SinkParam,
302    ) -> Result<Self> {
303        let schema = param.schema();
304        if config.with_s3 {
305            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
306                config.s3_inner.ok_or_else(|| {
307                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
308                })?,
309                schema,
310                is_append_only,
311                config.table,
312            )?;
313            Ok(Self::S3(s3_writer))
314        } else {
315            let jdbc_writer = SnowflakeRedshiftSinkJdbcWriter::new(
316                is_append_only,
317                writer_param,
318                param,
319                build_full_table_name(config.schema.as_deref(), &config.table),
320            )
321            .await?;
322            Ok(Self::Jdbc(jdbc_writer))
323        }
324    }
325}
326
327#[async_trait]
328impl SinkWriter for RedShiftSinkWriter {
329    type CommitMetadata = Option<SinkMetadata>;
330
331    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
332        match self {
333            Self::S3(writer) => writer.begin_epoch(epoch),
334            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
335        }
336    }
337
338    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
339        match self {
340            Self::S3(writer) => writer.write_batch(chunk).await,
341            Self::Jdbc(writer) => writer.write_batch(chunk).await,
342        }
343    }
344
345    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
346        let metadata = match self {
347            Self::S3(writer) => {
348                if let Some(path) = writer.barrier(is_checkpoint).await? {
349                    path.into_bytes()
350                } else {
351                    vec![]
352                }
353            }
354            Self::Jdbc(writer) => {
355                writer.barrier(is_checkpoint).await?;
356                vec![]
357            }
358        };
359        Ok(Some(SinkMetadata {
360            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
361                metadata,
362            })),
363        }))
364    }
365
366    async fn abort(&mut self) -> Result<()> {
367        if let Self::Jdbc(writer) = self {
368            writer.abort().await
369        } else {
370            Ok(())
371        }
372    }
373}
374
/// Single-phase commit coordinator for the Redshift sink.
///
/// Collects the S3 file paths reported by the writers into COPY manifests and
/// owns a background task that periodically COPYs staged data into Redshift
/// and (for non-append-only sinks) merges the intermediate table into the
/// target table.
pub struct RedshiftSinkCommitter {
    // Parsed connector config (cloned into the background task).
    config: RedShiftConfig,
    // JDBC client used for commit-time statements.
    client: JdbcJniClient,
    sink_id: SinkId,
    // Primary-key column names, used to build the MERGE condition.
    pk_column_names: Vec<String>,
    // All column names; extended when schema changes add columns.
    all_column_names: Vec<String>,
    // MERGE interval (seconds) for the background task.
    writer_target_interval_seconds: u64,
    // COPY interval (seconds) for the background task.
    write_intermediate_interval_seconds: u64,
    is_append_only: bool,
    // Join handle of the spawned background task, if one is running.
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    // Channel used to tell the background task to stop.
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}
387
impl RedshiftSinkCommitter {
    /// Build the committer and, depending on the mode, spawn the background
    /// periodic task:
    /// - append-only + S3: periodic COPY of staged files only;
    /// - non-append-only (any transport): periodic MERGE, plus COPY when S3
    ///   staging is enabled;
    /// - append-only over JDBC: no background task needed.
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
        sink_id: SinkId,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let writer_target_interval_seconds = config.writer_target_interval_seconds;
        let write_intermediate_interval_seconds = config.write_intermediate_interval_seconds;

        let (periodic_task_handle, shutdown_sender) = match (is_append_only, config.with_s3) {
            (true, true) | (false, _) => {
                // The task gets its own JDBC client; `client` above stays with
                // the committer itself.
                let task_client = config.build_client()?;
                let config = config.clone();
                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let target_schema_name = config.schema.as_deref();
                // CDC table schema falls back to the target schema when
                // `intermediate.schema.name` is not set.
                let effective_cdc_schema =
                    config.intermediate_schema.as_deref().or(target_schema_name);
                let merge_into_sql = if !is_append_only {
                    Some(build_create_merge_into_task_sql(
                        effective_cdc_schema,
                        target_schema_name,
                        config.cdc_table.as_ref().ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "intermediate.table.name is required for non-append-only sink"
                            ))
                        })?,
                        &config.table,
                        pk_column_names,
                        all_column_names,
                    ))
                } else {
                    None
                };
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        task_client,
                        merge_into_sql,
                        config.with_s3,
                        writer_target_interval_seconds,
                        write_intermediate_interval_seconds,
                        sink_id,
                        config,
                        is_append_only,
                        shutdown_receiver,
                    )
                    .await;
                });
                (Some(periodic_task_handle), Some(shutdown_sender))
            }
            _ => (None, None),
        };

        Ok(Self {
            client,
            config,
            sink_id,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            writer_target_interval_seconds,
            write_intermediate_interval_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

    /// List every manifest under `<path>/<table>/manifest/`, COPY each into
    /// Redshift, then delete the processed manifests from S3.
    async fn flush_manifest_to_redshift(
        client: &JdbcJniClient,
        config: &RedShiftConfig,
        s3_inner: &S3Common,
        is_append_only: bool,
    ) -> Result<()> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
        let mut manifest_path = s3_inner.path.clone().unwrap_or("".to_owned());
        if !manifest_path.ends_with('/') {
            manifest_path.push('/');
        }
        manifest_path.push_str(&format!("{}/", config.table));
        manifest_path.push_str("manifest/");
        let manifests = s3_operator
            .list(&manifest_path)
            .await?
            .into_iter()
            .map(|e| e.path().to_owned())
            .collect::<Vec<_>>();
        for manifest in &manifests {
            Self::copy_into_from_s3_to_redshift(client, config, s3_inner, is_append_only, manifest)
                .await?;
        }
        // Deleted only after every COPY succeeded; a failure above leaves the
        // manifests in place to be retried on the next flush.
        s3_operator.delete_iter(manifests).await?;
        Ok(())
    }

    /// Write a Redshift COPY manifest (JSON listing `paths` as mandatory
    /// entries) to S3 and return its object path.
    async fn write_manifest_to_s3(
        s3_inner: &S3Common,
        paths: Vec<String>,
        table: &str,
    ) -> Result<String> {
        let manifest_entries: Vec<_> = paths
            .into_iter()
            .map(|path| json!({ "url": path, "mandatory": true }))
            .collect();
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
        let (mut writer, manifest_path) =
            build_opendal_writer_path(s3_inner, &s3_operator, Some("manifest"), table).await?;
        let manifest_json = json!({ "entries": manifest_entries });
        let mut chunk_buf = BytesMut::new();
        // Writing to BytesMut cannot fail, hence the unwrap.
        writeln!(chunk_buf, "{}", manifest_json).unwrap();
        writer.write(chunk_buf.freeze()).await?;
        writer.close().await.map_err(|e| {
            SinkError::Redshift(anyhow!(
                "Failed to close manifest writer: {}",
                e.to_report_string()
            ))
        })?;
        Ok(manifest_path)
    }

    /// Issue a COPY statement loading `manifest` into the target table
    /// (append-only) or the intermediate CDC table (non-append-only).
    pub async fn copy_into_from_s3_to_redshift(
        client: &JdbcJniClient,
        config: &RedShiftConfig,
        s3_inner: &S3Common,
        is_append_only: bool,
        manifest: &str,
    ) -> Result<()> {
        let all_path = format!("s3://{}/{}", s3_inner.bucket_name, manifest);

        let (table, schema_name) = if is_append_only {
            (&config.table, config.schema.as_deref())
        } else {
            (
                config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?,
                // CDC table schema falls back to the target schema.
                config
                    .intermediate_schema
                    .as_deref()
                    .or(config.schema.as_deref()),
            )
        };
        let copy_into_sql = build_copy_into_sql(
            schema_name,
            table,
            &all_path,
            &s3_inner.access,
            &s3_inner.secret,
            &s3_inner.assume_role,
        )?;
        client.execute_sql_sync(vec![copy_into_sql]).await?;
        Ok(())
    }

    /// Background loop: on `copy_timer` flush staged S3 manifests into
    /// Redshift (when `need_copy_into`), on `merge_timer` run the MERGE from
    /// the intermediate table into the target (when a statement was built).
    /// Errors are logged and the loop keeps running; it exits on a shutdown
    /// message (or when the sender side is dropped).
    ///
    /// NOTE(review): tokio's `interval` completes its first tick immediately,
    /// so both jobs also run once right after the task starts — confirm this
    /// is intended.
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        merge_into_sql: Option<Vec<String>>,
        need_copy_into: bool,
        writer_target_interval_seconds: u64,
        write_intermediate_interval_seconds: u64,
        sink_id: SinkId,
        config: RedShiftConfig,
        is_append_only: bool,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut copy_timer = interval(Duration::from_secs(write_intermediate_interval_seconds));
        // Skip (rather than burst) ticks missed while a previous run was slow.
        copy_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let mut merge_timer = interval(Duration::from_secs(writer_target_interval_seconds));
        merge_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => break,
                _ = merge_timer.tick(), if merge_into_sql.is_some() => {
                    if let Some(sql) = &merge_into_sql && let Err(e) = client.execute_sql_sync(sql.clone()).await {
                        tracing::warn!("Failed to execute periodic query for table {}: {}", config.table, e.as_report());
                    }
                },
                _ = copy_timer.tick(), if need_copy_into => {
                    if let Err(e) = async {
                        let s3_inner = config.s3_inner.as_ref().ok_or_else(|| {
                            SinkError::Config(anyhow!("S3 configuration is required for redshift s3 sink"))
                        })?;
                        Self::flush_manifest_to_redshift(&client, &config,s3_inner, is_append_only).await?;
                        Ok::<(),SinkError>(())
                    }.await {
                        tracing::error!("Failed to execute copy into task for sink id {}: {}", sink_id, e.as_report());
                    }
                }
            }
        }
        tracing::info!("Periodic query task stopped for sink id {}", sink_id);
    }
}
585
586impl Drop for RedshiftSinkCommitter {
587    fn drop(&mut self) {
588        // Send shutdown signal to the periodic task
589        if let Some(shutdown_sender) = &self.shutdown_sender
590            && let Err(e) = shutdown_sender.send(())
591        {
592            tracing::warn!(
593                "Failed to send shutdown signal to periodic task: {}",
594                e.as_report()
595            );
596        }
597        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
598    }
599}
600
601#[async_trait]
602impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
603    async fn init(&mut self) -> Result<()> {
604        if self.config.with_s3 {
605            Self::flush_manifest_to_redshift(
606                &self.client,
607                &self.config,
608                self.config.s3_inner.as_ref().ok_or_else(|| {
609                    SinkError::Config(anyhow!("S3 configuration is required for redshift s3 sink"))
610                })?,
611                self.is_append_only,
612            )
613            .await?;
614        }
615        Ok(())
616    }
617
618    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
619        if let Some(handle) = &self.periodic_task_handle {
620            let is_finished = handle.is_finished();
621            if is_finished {
622                let handle = self.periodic_task_handle.take().unwrap();
623                handle.await.map_err(|e| {
624                    SinkError::Redshift(anyhow!(
625                        "Periodic task for sink id {} panicked: {}",
626                        self.sink_id,
627                        e.to_report_string()
628                    ))
629                })?;
630            }
631        };
632        let paths = metadata
633            .into_iter()
634            .filter(|m| {
635                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
636                    &m.metadata
637                {
638                    !metadata.is_empty()
639                } else {
640                    false
641                }
642            })
643            .map(|metadata| {
644                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
645                    metadata,
646                })) = metadata.metadata
647                {
648                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
649                } else {
650                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
651                }?;
652                Ok(path)
653            })
654            .collect::<Result<Vec<_>>>()?;
655
656        // Write manifest file to S3 instead of inserting to database
657        if !paths.is_empty() {
658            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
659                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
660            })?;
661            {
662                Self::write_manifest_to_s3(s3_inner, paths, &self.config.table).await?;
663            }
664            tracing::info!(
665                "Manifest file written to S3 for sink id {} at epoch {}",
666                self.sink_id,
667                epoch
668            );
669        }
670        Ok(())
671    }
672
673    async fn commit_schema_change(
674        &mut self,
675        _epoch: u64,
676        schema_change: PbSinkSchemaChange,
677    ) -> Result<()> {
678        use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
679        let schema_change_op = schema_change
680            .op
681            .ok_or_else(|| SinkError::Coordinator(anyhow!("Invalid schema change operation")))?;
682        let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
683            return Err(SinkError::Coordinator(anyhow!(
684                "Only AddColumns schema change is supported for Redshift sink"
685            )));
686        };
687        if let Some(shutdown_sender) = &self.shutdown_sender {
688            // Send shutdown signal to the periodic task before altering the table
689            shutdown_sender
690                .send(())
691                .map_err(|e| SinkError::Config(anyhow!(e)))?;
692        }
693        let sql = build_alter_add_column_sql(
694            self.config.schema.as_deref(),
695            &self.config.table,
696            &add_columns
697                .fields
698                .iter()
699                .map(|f| {
700                    (
701                        f.name.clone(),
702                        DataType::from(f.data_type.as_ref().unwrap()).to_string(),
703                    )
704                })
705                .collect_vec(),
706        );
707        let check_column_exists = |e: anyhow::Error| {
708            let err_str = e.to_report_string();
709            if regex::Regex::new(".+ of relation .+ already exists")
710                .unwrap()
711                .find(&err_str)
712                .is_none()
713            {
714                return Err(e);
715            }
716            warn!("redshift sink columns already exists. skipped");
717            Ok(())
718        };
719        self.client
720            .execute_sql_sync(vec![sql.clone()])
721            .await
722            .or_else(check_column_exists)?;
723        let merge_into_sql = if !self.is_append_only {
724            let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
725                SinkError::Config(anyhow!(
726                    "intermediate.table.name is required for non-append-only sink"
727                ))
728            })?;
729            let sql = build_alter_add_column_sql(
730                self.config.schema.as_deref(),
731                cdc_table_name,
732                &add_columns
733                    .fields
734                    .iter()
735                    .map(|f| {
736                        (
737                            f.name.clone(),
738                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
739                        )
740                    })
741                    .collect::<Vec<_>>(),
742            );
743            self.client
744                .execute_sql_sync(vec![sql.clone()])
745                .await
746                .or_else(check_column_exists)?;
747            self.all_column_names
748                .extend(add_columns.fields.iter().map(|f| f.name.clone()));
749            let target_schema_name = self.config.schema.as_deref();
750            let effective_cdc_schema = self
751                .config
752                .intermediate_schema
753                .as_deref()
754                .or(target_schema_name);
755            let merge_into_sql = build_create_merge_into_task_sql(
756                effective_cdc_schema,
757                target_schema_name,
758                self.config.cdc_table.as_ref().ok_or_else(|| {
759                    SinkError::Config(anyhow!(
760                        "intermediate.table.name is required for non-append-only sink"
761                    ))
762                })?,
763                &self.config.table,
764                &self.pk_column_names,
765                &self.all_column_names,
766            );
767            Some(merge_into_sql)
768        } else {
769            None
770        };
771
772        if let Some(shutdown_sender) = self.shutdown_sender.take() {
773            let _ = shutdown_sender.send(());
774        }
775        if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
776            let _ = periodic_task_handle.await;
777        }
778
779        let (shutdown_sender, shutdown_receiver) = unbounded_channel();
780        let client = self.client.clone();
781
782        let writer_target_interval_seconds = self.writer_target_interval_seconds;
783        let write_intermediate_interval_seconds = self.write_intermediate_interval_seconds;
784        let config = self.config.clone();
785        let sink_id = self.sink_id;
786        let is_append_only = self.is_append_only;
787        let periodic_task_handle = tokio::spawn(async move {
788            Self::run_periodic_query_task(
789                client,
790                merge_into_sql,
791                config.with_s3,
792                writer_target_interval_seconds,
793                write_intermediate_interval_seconds,
794                sink_id,
795                config,
796                is_append_only,
797                shutdown_receiver,
798            )
799            .await;
800        });
801        self.shutdown_sender = Some(shutdown_sender);
802        self.periodic_task_handle = Some(periodic_task_handle);
803
804        Ok(())
805    }
806}
807
808pub fn build_create_table_sql(
809    schema_name: Option<&str>,
810    table_name: &str,
811    schema: &Schema,
812    need_op_and_row_id: bool,
813) -> Result<String> {
814    let mut columns: Vec<String> = schema
815        .fields
816        .iter()
817        .map(|field| {
818            let data_type = convert_redshift_data_type(&field.data_type)?;
819            Ok(format!("{} {}", field.name, data_type))
820        })
821        .collect::<Result<Vec<String>>>()?;
822    if need_op_and_row_id {
823        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
824        columns.push(format!("{} INT", __OP));
825    }
826    let columns_str = columns.join(", ");
827    let full_table_name = build_full_table_name(schema_name, table_name);
828    Ok(format!(
829        "CREATE TABLE IF NOT EXISTS {} ({})",
830        full_table_name, columns_str
831    ))
832}
833
834fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
835    let data_type = match data_type {
836        DataType::Int16 => "SMALLINT".to_owned(),
837        DataType::Int32 => "INTEGER".to_owned(),
838        DataType::Int64 => "BIGINT".to_owned(),
839        DataType::Float32 => "REAL".to_owned(),
840        DataType::Float64 => "FLOAT".to_owned(),
841        DataType::Boolean => "BOOLEAN".to_owned(),
842        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
843        DataType::Date => "DATE".to_owned(),
844        DataType::Timestamp => "TIMESTAMP".to_owned(),
845        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
846        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
847        DataType::Decimal => "DECIMAL".to_owned(),
848        DataType::Time => "TIME".to_owned(),
849        _ => {
850            return Err(SinkError::Config(anyhow!(
851                "Dont support auto create table for datatype: {}",
852                data_type
853            )));
854        }
855    };
856    Ok(data_type)
857}
858
859fn build_create_merge_into_task_sql(
860    cdc_schema_name: Option<&str>,
861    target_schema_name: Option<&str>,
862    cdc_table_name: &str,
863    target_table_name: &str,
864    pk_column_names: &Vec<String>,
865    all_column_names: &Vec<String>,
866) -> Vec<String> {
867    let cdc_table_name = build_full_table_name(cdc_schema_name, cdc_table_name);
868    let target_table_name = build_full_table_name(target_schema_name, target_table_name);
869    let pk_names_str = pk_column_names.join(", ");
870    let pk_names_eq_str = pk_column_names
871        .iter()
872        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
873        .collect::<Vec<String>>()
874        .join(" AND ");
875    let all_column_names_set_str = all_column_names
876        .iter()
877        .map(|name| format!("{name} = source.{name}", name = name))
878        .collect::<Vec<String>>()
879        .join(", ");
880    let all_column_names_str = all_column_names.join(", ");
881    let all_column_names_insert_str = all_column_names
882        .iter()
883        .map(|name| format!("source.{name}", name = name))
884        .collect::<Vec<String>>()
885        .join(", ");
886
887    vec![
888        format!(
889            r#"
890            CREATE TEMP TABLE max_id_table AS
891            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
892            FROM {cdc_table_name};
893            "#,
894            redshift_sink_row_id = __ROW_ID,
895            cdc_table_name = cdc_table_name,
896        ),
897        format!(
898            r#"
899            DELETE FROM {target_table_name}
900            USING (
901                SELECT *
902                FROM (
903                    SELECT *, ROW_NUMBER() OVER (
904                        PARTITION BY {pk_names_str}
905                        ORDER BY {redshift_sink_row_id} DESC
906                    ) AS dedupe_id
907                    FROM {cdc_table_name}, max_id_table
908                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
909                ) AS subquery
910                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
911            ) AS source
912            WHERE {pk_names_eq_str};
913            "#,
914            target_table_name = target_table_name,
915            pk_names_str = pk_names_str,
916            redshift_sink_row_id = __ROW_ID,
917            cdc_table_name = cdc_table_name,
918            redshift_sink_op = __OP,
919            pk_names_eq_str = pk_names_eq_str,
920        ),
921        format!(
922            r#"
923            MERGE INTO {target_table_name}
924            USING (
925                SELECT *
926                FROM (
927                    SELECT *, ROW_NUMBER() OVER (
928                        PARTITION BY {pk_names_str}
929                        ORDER BY {redshift_sink_row_id} DESC
930                    ) AS dedupe_id
931                    FROM {cdc_table_name}, max_id_table
932                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
933                ) AS subquery
934                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
935            ) AS source
936            ON {pk_names_eq_str}
937            WHEN MATCHED THEN
938                UPDATE SET {all_column_names_set_str}
939            WHEN NOT MATCHED THEN
940                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
941            "#,
942            target_table_name = target_table_name,
943            pk_names_str = pk_names_str,
944            redshift_sink_row_id = __ROW_ID,
945            cdc_table_name = cdc_table_name,
946            redshift_sink_op = __OP,
947            pk_names_eq_str = pk_names_eq_str,
948            all_column_names_set_str = all_column_names_set_str,
949            all_column_names_str = all_column_names_str,
950            all_column_names_insert_str = all_column_names_insert_str,
951        ),
952        format!(
953            r#"
954            DELETE FROM {cdc_table_name}
955            USING max_id_table
956            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
957            "#,
958            cdc_table_name = cdc_table_name,
959            redshift_sink_row_id = __ROW_ID,
960        ),
961        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
962    ]
963}
964
965fn build_copy_into_sql(
966    schema_name: Option<&str>,
967    table_name: &str,
968    manifest_dir: &str,
969    access_key: &Option<String>,
970    secret_key: &Option<String>,
971    assume_role: &Option<String>,
972) -> Result<String> {
973    let table_name = build_full_table_name(schema_name, table_name);
974    let credentials = if let Some(assume_role) = assume_role {
975        &format!("aws_iam_role={}", assume_role)
976    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
977        &format!(
978            "aws_access_key_id={};aws_secret_access_key={}",
979            access_key, secret_key
980        )
981    } else {
982        return Err(SinkError::Config(anyhow!(
983            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
984        )));
985    };
986    Ok(format!(
987        r#"
988        COPY {table_name}
989        FROM '{manifest_dir}'
990        CREDENTIALS '{credentials}'
991        FORMAT AS JSON 'auto'
992        DATEFORMAT 'auto'
993        TIMEFORMAT 'auto'
994        MANIFEST;
995        "#,
996        table_name = table_name,
997        manifest_dir = manifest_dir,
998        credentials = credentials
999    ))
1000}