risingwave_connector/sink/snowflake_redshift/redshift.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, SnowflakeRedshiftSinkJdbcWriter, SnowflakeRedshiftSinkS3Writer,
    build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
};

pub const REDSHIFT_SINK: &str = "redshift";

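/// Builds the quoted table identifier used in generated SQL statements. Illustrative
/// output (not from a real run): `build_full_table_name(Some("analytics"), "orders")`
/// yields `"analytics"."orders"`, while a `None` schema yields just `"orders"`.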
pub fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    // Redshift does not support `ADD COLUMN IF NOT EXISTS` yet, so pass `false` here.
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

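/// Options accepted by the Redshift sink, keyed by the `serde(rename)` names below.
/// A minimal, hypothetical property block for illustration only (placeholder values,
/// not a verified example):
///
/// ```text
/// connector               = 'redshift',
/// jdbc.url                = 'jdbc:redshift://example-cluster.region.redshift.amazonaws.com:5439/dev',
/// user                    = 'awsuser',
/// password                = '...',
/// schema                  = 'public',
/// table.name              = 'target_table',
/// intermediate.table.name = 'target_table_cdc',  -- required for non-append-only sinks
/// with_s3                 = 'true'               -- stage rows on S3 and COPY them in
/// ```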
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "intermediate.schema.name")]
    pub intermediate_schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_target_interval_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub writer_target_interval_seconds: u64,

    #[serde(default = "default_intermediate_interval_schedule")]
    #[serde(rename = "write.intermediate.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub write_intermediate_interval_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_target_interval_schedule() -> u64 {
    3600 // Default to 1 hour
}

fn default_intermediate_interval_schedule() -> u64 {
    1800 // Default to 30 minutes
}

fn default_batch_insert_rows() -> u32 {
    4096 // Default batch size
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
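    /// Builds a JDBC client, appending `user`/`password` to the URL as query parameters.
    /// Illustrative only: `jdbc:redshift://host:5439/dev` with user `awsuser` becomes
    /// `jdbc:redshift://host:5439/dev?user=awsuser&password=...`.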
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        // Append credentials as URL query parameters, using `?` for the first parameter
        // and `&` afterwards (the URL may already carry parameters of its own).
        let mut params = Vec::new();
        if let Some(username) = &self.username {
            params.push(format!("user={}", username));
        }
        if let Some(password) = &self.password {
            params.push(format!("password={}", password));
        }
        if !params.is_empty() {
            let separator = if jdbc_url.contains('?') { '&' } else { '?' };
            jdbc_url = format!("{}{}{}", jdbc_url, separator, params.join("&"));
        }
        JdbcJniClient::new(jdbc_url)
    }
}

#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}

impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

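// How the sink moves data (summarized from the code below): append-only rows are either
// written straight to the target table over JDBC or staged as files on S3 and COPYed in by
// the coordinator's periodic task; for upsert sinks, changes land in an intermediate CDC
// table first and a scheduled MERGE folds them into the target table.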
impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let cdc_schema_for_create = self
                    .config
                    .intermediate_schema
                    .as_deref()
                    .or(self.config.schema.as_deref());
                let build_cdc_table_sql =
                    build_create_table_sql(cdc_schema_for_create, cdc_table, &schema, true)?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
            self.param.sink_id,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

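/// Sink writer: with `with_s3 = true` rows are staged as files on S3 (and later COPYed into
/// Redshift by the coordinator's periodic task); otherwise rows are written directly over JDBC.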
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(SnowflakeRedshiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                config.table,
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer = SnowflakeRedshiftSinkJdbcWriter::new(
                is_append_only,
                writer_param,
                param,
                build_full_table_name(config.schema.as_deref(), &config.table),
            )
            .await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

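/// Single-phase commit coordinator for the Redshift sink. On each checkpoint it collects the
/// S3 file paths reported by the writers into a COPY manifest, and it owns the background task
/// that periodically COPYs staged files and MERGEs the intermediate table into the target table.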
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    sink_id: SinkId,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    writer_target_interval_seconds: u64,
    write_intermediate_interval_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
        sink_id: SinkId,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let writer_target_interval_seconds = config.writer_target_interval_seconds;
        let write_intermediate_interval_seconds = config.write_intermediate_interval_seconds;

        // The background task is needed whenever there is something to move periodically:
        // append-only sinks staging on S3 (COPY files in), and any non-append-only sink
        // (MERGE the intermediate table into the target table).
        let (periodic_task_handle, shutdown_sender) = match (is_append_only, config.with_s3) {
            (true, true) | (false, _) => {
                let task_client = config.build_client()?;
                let config = config.clone();
                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let target_schema_name = config.schema.as_deref();
                let effective_cdc_schema =
                    config.intermediate_schema.as_deref().or(target_schema_name);
                let merge_into_sql = if !is_append_only {
                    Some(build_create_merge_into_task_sql(
                        effective_cdc_schema,
                        target_schema_name,
                        config.cdc_table.as_ref().ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "intermediate.table.name is required for non-append-only sink"
                            ))
                        })?,
                        &config.table,
                        pk_column_names,
                        all_column_names,
                    ))
                } else {
                    None
                };
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        task_client,
                        merge_into_sql,
                        config.with_s3,
                        writer_target_interval_seconds,
                        write_intermediate_interval_seconds,
                        sink_id,
                        config,
                        is_append_only,
                        shutdown_receiver,
                    )
                    .await;
                });
                (Some(periodic_task_handle), Some(shutdown_sender))
            }
            _ => (None, None),
        };

        Ok(Self {
            client,
            config,
            sink_id,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            writer_target_interval_seconds,
            write_intermediate_interval_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

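    /// Lists any pending manifest files under `<s3 path>/<table>/manifest/`, runs a COPY for
    /// each one (into the target table for append-only sinks, otherwise into the intermediate
    /// table), and deletes the manifests afterwards.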
    async fn flush_manifest_to_redshift(
        client: &JdbcJniClient,
        config: &RedShiftConfig,
        s3_inner: &S3Common,
        is_append_only: bool,
    ) -> Result<()> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
        let mut manifest_path = s3_inner.path.clone().unwrap_or("".to_owned());
        if !manifest_path.ends_with('/') {
            manifest_path.push('/');
        }
        manifest_path.push_str(&format!("{}/", config.table));
        manifest_path.push_str("manifest/");
        let manifests = s3_operator
            .list(&manifest_path)
            .await?
            .into_iter()
            .map(|e| e.path().to_owned())
            .collect::<Vec<_>>();
        for manifest in &manifests {
            Self::copy_into_from_s3_to_redshift(client, config, s3_inner, is_append_only, manifest)
                .await?;
        }
        s3_operator.delete_iter(manifests).await?;
        Ok(())
    }

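    /// Writes a Redshift COPY manifest (`{"entries": [{"url": ..., "mandatory": true}, ...]}`)
    /// listing the given data-file paths, uploads it under the table's `manifest/` prefix, and
    /// returns the path of the uploaded manifest.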
    async fn write_manifest_to_s3(
        s3_inner: &S3Common,
        paths: Vec<String>,
        table: &str,
    ) -> Result<String> {
        let manifest_entries: Vec<_> = paths
            .into_iter()
            .map(|path| json!({ "url": path, "mandatory": true }))
            .collect();
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
        let (mut writer, manifest_path) =
            build_opendal_writer_path(s3_inner, &s3_operator, Some("manifest"), table).await?;
        let manifest_json = json!({ "entries": manifest_entries });
        let mut chunk_buf = BytesMut::new();
        writeln!(chunk_buf, "{}", manifest_json).unwrap();
        writer.write(chunk_buf.freeze()).await?;
        writer.close().await.map_err(|e| {
            SinkError::Redshift(anyhow!(
                "Failed to close manifest writer: {}",
                e.to_report_string()
            ))
        })?;
        Ok(manifest_path)
    }

    pub async fn copy_into_from_s3_to_redshift(
        client: &JdbcJniClient,
        config: &RedShiftConfig,
        s3_inner: &S3Common,
        is_append_only: bool,
        manifest: &str,
    ) -> Result<()> {
        let all_path = format!("s3://{}/{}", s3_inner.bucket_name, manifest);

        let table = if is_append_only {
            &config.table
        } else {
            config.cdc_table.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        let copy_into_sql = build_copy_into_sql(
            config.schema.as_deref(),
            table,
            &all_path,
            &s3_inner.access,
            &s3_inner.secret,
            &s3_inner.assume_role,
        )?;
        client.execute_sql_sync(vec![copy_into_sql]).await?;
        Ok(())
    }

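    /// Background loop driven by two timers: every `write.intermediate.interval.seconds` it
    /// COPYs staged S3 files into Redshift (when `need_copy_into` is set), and every
    /// `write.target.interval.seconds` it runs the MERGE statements into the target table
    /// (upsert sinks only). A message on `shutdown_receiver` stops the loop.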
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        merge_into_sql: Option<Vec<String>>,
        need_copy_into: bool,
        writer_target_interval_seconds: u64,
        write_intermediate_interval_seconds: u64,
        sink_id: SinkId,
        config: RedShiftConfig,
        is_append_only: bool,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut copy_timer = interval(Duration::from_secs(write_intermediate_interval_seconds));
        copy_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let mut merge_timer = interval(Duration::from_secs(writer_target_interval_seconds));
        merge_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => break,
                _ = merge_timer.tick(), if merge_into_sql.is_some() => {
                    if let Some(sql) = &merge_into_sql && let Err(e) = client.execute_sql_sync(sql.clone()).await {
                        tracing::warn!("Failed to execute periodic query for table {}: {}", config.table, e.as_report());
                    }
                },
                _ = copy_timer.tick(), if need_copy_into => {
                    if let Err(e) = async {
                        let s3_inner = config.s3_inner.as_ref().ok_or_else(|| {
                            SinkError::Config(anyhow!("S3 configuration is required for redshift s3 sink"))
                        })?;
                        Self::flush_manifest_to_redshift(&client, &config, s3_inner, is_append_only).await?;
                        Ok::<(), SinkError>(())
                    }.await {
                        tracing::error!("Failed to execute copy into task for sink id {}: {}", sink_id, e.as_report());
                    }
                }
            }
        }
        tracing::info!("Periodic query task stopped for sink id {}", sink_id);
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        // Send shutdown signal to the periodic task
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        if self.config.with_s3 {
            Self::flush_manifest_to_redshift(
                &self.client,
                &self.config,
                self.config.s3_inner.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for redshift s3 sink"))
                })?,
                self.is_append_only,
            )
            .await?;
        }
        Ok(())
    }

    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        if let Some(handle) = &self.periodic_task_handle {
            let is_finished = handle.is_finished();
            if is_finished {
                let handle = self.periodic_task_handle.take().unwrap();
                handle.await.map_err(|e| {
                    SinkError::Redshift(anyhow!(
                        "Periodic task for sink id {} panicked: {}",
                        self.sink_id,
                        e.to_report_string()
                    ))
                })?;
            }
        }
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(path)
            })
            .collect::<Result<Vec<_>>>()?;

        // Write manifest file to S3 instead of inserting to database
        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            Self::write_manifest_to_s3(s3_inner, paths, &self.config.table).await?;
            tracing::info!(
                "Manifest file written to S3 for sink id {} at epoch {}",
                self.sink_id,
                epoch
            );
        }
        Ok(())
    }

    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
        let schema_change_op = schema_change
            .op
            .ok_or_else(|| SinkError::Coordinator(anyhow!("Invalid schema change operation")))?;
        let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
            return Err(SinkError::Coordinator(anyhow!(
                "Only AddColumns schema change is supported for Redshift sink"
            )));
        };
        if let Some(shutdown_sender) = &self.shutdown_sender {
            // Send shutdown signal to the periodic task before altering the table
            shutdown_sender
                .send(())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        }
        let sql = build_alter_add_column_sql(
            self.config.schema.as_deref(),
            &self.config.table,
            &add_columns
                .fields
                .iter()
                .map(|f| {
                    (
                        f.name.clone(),
                        DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                    )
                })
                .collect_vec(),
        );
        let check_column_exists = |e: anyhow::Error| {
            let err_str = e.to_report_string();
            if regex::Regex::new(".+ of relation .+ already exists")
                .unwrap()
                .find(&err_str)
                .is_none()
            {
                return Err(e);
            }
            warn!("redshift sink column already exists, skipping");
            Ok(())
        };
        self.client
            .execute_sql_sync(vec![sql.clone()])
            .await
            .or_else(check_column_exists)?;
        let merge_into_sql = if !self.is_append_only {
            let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let target_schema_name = self.config.schema.as_deref();
            // Alter the intermediate table under the same schema it was created in
            // (`intermediate.schema.name`, falling back to `schema`).
            let effective_cdc_schema = self
                .config
                .intermediate_schema
                .as_deref()
                .or(target_schema_name);
            let sql = build_alter_add_column_sql(
                effective_cdc_schema,
                cdc_table_name,
                &add_columns
                    .fields
                    .iter()
                    .map(|f| {
                        (
                            f.name.clone(),
                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                        )
                    })
                    .collect::<Vec<_>>(),
            );
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            self.all_column_names
                .extend(add_columns.fields.iter().map(|f| f.name.clone()));
            let merge_into_sql = build_create_merge_into_task_sql(
                effective_cdc_schema,
                target_schema_name,
                cdc_table_name,
                &self.config.table,
                &self.pk_column_names,
                &self.all_column_names,
            );
            Some(merge_into_sql)
        } else {
            None
        };

        if let Some(shutdown_sender) = self.shutdown_sender.take() {
            let _ = shutdown_sender.send(());
        }
        if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
            let _ = periodic_task_handle.await;
        }

        let (shutdown_sender, shutdown_receiver) = unbounded_channel();
        let client = self.client.clone();

        let writer_target_interval_seconds = self.writer_target_interval_seconds;
        let write_intermediate_interval_seconds = self.write_intermediate_interval_seconds;
        let config = self.config.clone();
        let sink_id = self.sink_id;
        let is_append_only = self.is_append_only;
        let periodic_task_handle = tokio::spawn(async move {
            Self::run_periodic_query_task(
                client,
                merge_into_sql,
                config.with_s3,
                writer_target_interval_seconds,
                write_intermediate_interval_seconds,
                sink_id,
                config,
                is_append_only,
                shutdown_receiver,
            )
            .await;
        });
        self.shutdown_sender = Some(shutdown_sender);
        self.periodic_task_handle = Some(periodic_task_handle);

        Ok(())
    }
}

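/// Generates the `CREATE TABLE IF NOT EXISTS` statement for the sink schema. Illustrative
/// shape only (identifiers are placeholders; `__ROW_ID`/`__OP` stand for the helper-column
/// constants imported above):
///
/// ```text
/// CREATE TABLE IF NOT EXISTS "public"."t" (id INTEGER, name VARCHAR(MAX), <__ROW_ID> VARCHAR(MAX), <__OP> INT)
/// ```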
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Automatic table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

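/// Builds the statement batch that folds intermediate (CDC) rows into the target table:
/// snapshot the current max `__ROW_ID` into a temp table, delete target rows whose latest
/// change is a delete (`__OP` 2 or 4), MERGE the latest insert/update rows (`__OP` 1 or 3),
/// purge the consumed CDC rows, and drop the temp table.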
fn build_create_merge_into_task_sql(
    cdc_schema_name: Option<&str>,
    target_schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(cdc_schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(target_schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

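/// Builds the `COPY ... MANIFEST` statement for one manifest file. Illustrative output with
/// placeholder bucket and credentials (not from a real run):
///
/// ```text
/// COPY "public"."t"
/// FROM 's3://my-bucket/t/manifest/<file>.json'
/// CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/redshift-copy'
/// FORMAT AS JSON 'auto' DATEFORMAT 'auto' TIMEFORMAT 'auto' MANIFEST;
/// ```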
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_dir: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_dir}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_dir = manifest_dir,
        credentials = credentials
    ))
}