use core::num::NonZero;
use std::fmt::Write;
use std::time::Duration;

use anyhow::anyhow;
use bytes::BytesMut;
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::{SinkMetadata, sink_metadata};
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::Deserialize;
use serde_json::json;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::time::{MissedTickBehavior, interval};
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::jdbc_jni_client::{self, JdbcJniClient};
use crate::sink::snowflake_redshift::{
    __OP, __ROW_ID, SnowflakeRedshiftSinkJdbcWriter, SnowflakeRedshiftSinkS3Writer,
    build_opendal_writer_path,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
};

pub const REDSHIFT_SINK: &str = "redshift";

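/// Builds a quoted, optionally schema-qualified table name for Redshift SQL,
/// e.g. `"public"."users"` when a schema is given and `"users"` otherwise.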
pub fn build_full_table_name(schema_name: Option<&str>, table_name: &str) -> String {
    if let Some(schema_name) = schema_name {
        format!(r#""{}"."{}""#, schema_name, table_name)
    } else {
        format!(r#""{}""#, table_name)
    }
}

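/// Builds an `ALTER TABLE ... ADD COLUMN` statement for the (optionally schema-qualified)
/// table by delegating to the shared JDBC helper.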
fn build_alter_add_column_sql(
    schema_name: Option<&str>,
    table_name: &str,
    columns: &Vec<(String, String)>,
) -> String {
    let full_table_name = build_full_table_name(schema_name, table_name);
    jdbc_jni_client::build_alter_add_column_sql(&full_table_name, columns, false)
}

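/// Redshift sink options, deserialized from the user-supplied sink properties.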
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RedShiftConfig {
    #[serde(rename = "jdbc.url")]
    pub jdbc_url: String,

    #[serde(rename = "user")]
    pub username: Option<String>,

    #[serde(rename = "password")]
    pub password: Option<String>,

    #[serde(rename = "schema")]
    pub schema: Option<String>,

    #[serde(rename = "intermediate.schema.name")]
    pub intermediate_schema: Option<String>,

    #[serde(rename = "table.name")]
    pub table: String,

    #[serde(rename = "intermediate.table.name")]
    pub cdc_table: Option<String>,

    #[serde(default)]
    #[serde(rename = "create_table_if_not_exists")]
    #[serde_as(as = "DisplayFromStr")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_target_interval_schedule")]
    #[serde(rename = "write.target.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub writer_target_interval_seconds: u64,

    #[serde(default = "default_intermediate_interval_schedule")]
    #[serde(rename = "write.intermediate.interval.seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub write_intermediate_interval_seconds: u64,

    #[serde(default = "default_batch_insert_rows")]
    #[serde(rename = "batch.insert.rows")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_insert_rows: u32,

    #[serde(default = "default_with_s3")]
    #[serde(rename = "with_s3")]
    #[serde_as(as = "DisplayFromStr")]
    pub with_s3: bool,

    #[serde(flatten)]
    pub s3_inner: Option<S3Common>,
}

fn default_target_interval_schedule() -> u64 {
    3600
}

fn default_intermediate_interval_schedule() -> u64 {
    1800
}

fn default_batch_insert_rows() -> u32 {
    4096
}

fn default_with_s3() -> bool {
    true
}

impl RedShiftConfig {
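    /// Builds a JDBC client, appending `user` and `password` to the URL as query parameters
    /// when provided. Note that `password` is appended with `&`, so it assumes a preceding
    /// query parameter (normally `user`) is already present.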
    pub fn build_client(&self) -> Result<JdbcJniClient> {
        let mut jdbc_url = self.jdbc_url.clone();
        if let Some(username) = &self.username {
            jdbc_url = format!("{}?user={}", jdbc_url, username);
        }
        if let Some(password) = &self.password {
            jdbc_url = format!("{}&password={}", jdbc_url, password);
        }
        JdbcJniClient::new(jdbc_url)
    }
}

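/// Redshift sink. Rows are staged either as files on S3 (loaded via `COPY`) or written
/// directly over JDBC, depending on the `with_s3` option.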
#[derive(Debug)]
pub struct RedshiftSink {
    config: RedShiftConfig,
    param: SinkParam,
    is_append_only: bool,
    schema: Schema,
    pk_indices: Vec<usize>,
}
impl EnforceSecret for RedshiftSink {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "user",
        "password",
        "jdbc.url"
    };
}

impl TryFrom<SinkParam> for RedshiftSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = serde_json::from_value::<RedShiftConfig>(
            serde_json::to_value(param.properties.clone()).unwrap(),
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        let is_append_only = param.sink_type.is_append_only();
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        Ok(Self {
            config,
            param,
            is_append_only,
            schema,
            pk_indices,
        })
    }
}

impl Sink for RedshiftSink {
    type LogSinker = CoordinatedLogSinker<RedShiftSinkWriter>;

    const SINK_NAME: &'static str = REDSHIFT_SINK;

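    /// When `create_table_if_not_exists` is set, eagerly creates the target table and, for
    /// non-append-only sinks, the intermediate CDC table that the periodic merge task reads from.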
    async fn validate(&self) -> Result<()> {
        if self.config.create_table_if_not_exists {
            let client = self.config.build_client()?;
            let schema = self.param.schema();
            let build_table_sql = build_create_table_sql(
                self.config.schema.as_deref(),
                &self.config.table,
                &schema,
                false,
            )?;
            client.execute_sql_sync(vec![build_table_sql]).await?;
            if !self.is_append_only {
                let cdc_table = self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?;
                let cdc_schema_for_create = self
                    .config
                    .intermediate_schema
                    .as_deref()
                    .or(self.config.schema.as_deref());
                let build_cdc_table_sql =
                    build_create_table_sql(cdc_schema_for_create, cdc_table, &schema, true)?;
                client.execute_sql_sync(vec![build_cdc_table_sql]).await?;
            }
        }
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = RedShiftSinkWriter::new(
            self.config.clone(),
            self.is_append_only,
            writer_param.clone(),
            self.param.clone(),
        )
        .await?;
        CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            NonZero::new(1).unwrap(),
        )
        .await
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

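    /// The commit coordinator collects the S3 paths reported by the writers, publishes them
    /// as COPY manifests, and drives the periodic COPY/MERGE tasks against Redshift.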
    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let pk_column_names: Vec<_> = self
            .schema
            .fields
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
            .map(|(_, field)| field.name.clone())
            .collect();
        if pk_column_names.is_empty() && !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "Primary key columns not found. Please set the `primary_key` column in the sink properties, or ensure that the sink contains the primary key columns from the upstream."
            )));
        }
        let all_column_names = self
            .schema
            .fields
            .iter()
            .map(|field| field.name.clone())
            .collect();
        let coordinator = RedshiftSinkCommitter::new(
            self.config.clone(),
            self.is_append_only,
            &pk_column_names,
            &all_column_names,
            self.param.sink_id,
        )?;
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

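/// Per-parallelism sink writer: either stages rows as files on S3 for a later `COPY`, or
/// writes them directly to Redshift over JDBC.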
pub enum RedShiftSinkWriter {
    S3(SnowflakeRedshiftSinkS3Writer),
    Jdbc(SnowflakeRedshiftSinkJdbcWriter),
}

impl RedShiftSinkWriter {
    pub async fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        writer_param: super::SinkWriterParam,
        param: SinkParam,
    ) -> Result<Self> {
        let schema = param.schema();
        if config.with_s3 {
            let s3_writer = SnowflakeRedshiftSinkS3Writer::new(
                config.s3_inner.ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
                })?,
                schema,
                is_append_only,
                config.table,
            )?;
            Ok(Self::S3(s3_writer))
        } else {
            let jdbc_writer = SnowflakeRedshiftSinkJdbcWriter::new(
                is_append_only,
                writer_param,
                param,
                build_full_table_name(config.schema.as_deref(), &config.table),
            )
            .await?;
            Ok(Self::Jdbc(jdbc_writer))
        }
    }
}

#[async_trait]
impl SinkWriter for RedShiftSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        match self {
            Self::S3(writer) => writer.begin_epoch(epoch),
            Self::Jdbc(writer) => writer.begin_epoch(epoch).await,
        }
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        match self {
            Self::S3(writer) => writer.write_batch(chunk).await,
            Self::Jdbc(writer) => writer.write_batch(chunk).await,
        }
    }

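    /// On a checkpoint barrier the S3 writer reports the path of the file it wrote for this
    /// epoch (if any) as UTF-8 bytes inside `SerializedMetadata`; the JDBC writer has nothing
    /// to report and sends empty metadata.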
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let metadata = match self {
            Self::S3(writer) => {
                if let Some(path) = writer.barrier(is_checkpoint).await? {
                    path.into_bytes()
                } else {
                    vec![]
                }
            }
            Self::Jdbc(writer) => {
                writer.barrier(is_checkpoint).await?;
                vec![]
            }
        };
        Ok(Some(SinkMetadata {
            metadata: Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                metadata,
            })),
        }))
    }

    async fn abort(&mut self) -> Result<()> {
        if let Self::Jdbc(writer) = self {
            writer.abort().await
        } else {
            Ok(())
        }
    }
}

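/// Single-phase commit coordinator for the Redshift sink. Besides committing writer metadata,
/// it owns a background task that periodically `COPY`s staged S3 files into Redshift and, for
/// non-append-only sinks, merges the intermediate CDC table into the target table.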
pub struct RedshiftSinkCommitter {
    config: RedShiftConfig,
    client: JdbcJniClient,
    sink_id: SinkId,
    pk_column_names: Vec<String>,
    all_column_names: Vec<String>,
    writer_target_interval_seconds: u64,
    write_intermediate_interval_seconds: u64,
    is_append_only: bool,
    periodic_task_handle: Option<tokio::task::JoinHandle<()>>,
    shutdown_sender: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl RedshiftSinkCommitter {
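    /// Creates the committer and, whenever there is periodic work to do (copying staged S3
    /// files, and merging the intermediate table for non-append-only sinks), spawns the
    /// background task together with a shutdown channel used to stop it.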
    pub fn new(
        config: RedShiftConfig,
        is_append_only: bool,
        pk_column_names: &Vec<String>,
        all_column_names: &Vec<String>,
        sink_id: SinkId,
    ) -> Result<Self> {
        let client = config.build_client()?;
        let writer_target_interval_seconds = config.writer_target_interval_seconds;
        let write_intermediate_interval_seconds = config.write_intermediate_interval_seconds;

        let (periodic_task_handle, shutdown_sender) = match (is_append_only, config.with_s3) {
            (true, true) | (false, _) => {
                let task_client = config.build_client()?;
                let config = config.clone();
                let (shutdown_sender, shutdown_receiver) = unbounded_channel();
                let target_schema_name = config.schema.as_deref();
                let effective_cdc_schema =
                    config.intermediate_schema.as_deref().or(target_schema_name);
                let merge_into_sql = if !is_append_only {
                    Some(build_create_merge_into_task_sql(
                        effective_cdc_schema,
                        target_schema_name,
                        config.cdc_table.as_ref().ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "intermediate.table.name is required for non-append-only sink"
                            ))
                        })?,
                        &config.table,
                        pk_column_names,
                        all_column_names,
                    ))
                } else {
                    None
                };
                let periodic_task_handle = tokio::spawn(async move {
                    Self::run_periodic_query_task(
                        task_client,
                        merge_into_sql,
                        config.with_s3,
                        writer_target_interval_seconds,
                        write_intermediate_interval_seconds,
                        sink_id,
                        config,
                        is_append_only,
                        shutdown_receiver,
                    )
                    .await;
                });
                (Some(periodic_task_handle), Some(shutdown_sender))
            }
            _ => (None, None),
        };

        Ok(Self {
            client,
            config,
            sink_id,
            pk_column_names: pk_column_names.clone(),
            all_column_names: all_column_names.clone(),
            is_append_only,
            writer_target_interval_seconds,
            write_intermediate_interval_seconds,
            periodic_task_handle,
            shutdown_sender,
        })
    }

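    /// Lists every pending manifest under `<s3 path>/<table>/manifest/`, `COPY`s each one into
    /// Redshift, and then deletes the manifests that were loaded.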
    async fn flush_manifest_to_redshift(
        client: &JdbcJniClient,
        config: &RedShiftConfig,
        s3_inner: &S3Common,
        is_append_only: bool,
    ) -> Result<()> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
        let mut manifest_path = s3_inner.path.clone().unwrap_or("".to_owned());
        if !manifest_path.ends_with('/') {
            manifest_path.push('/');
        }
        manifest_path.push_str(&format!("{}/", config.table));
        manifest_path.push_str("manifest/");
        let manifests = s3_operator
            .list(&manifest_path)
            .await?
            .into_iter()
            .map(|e| e.path().to_owned())
            .collect::<Vec<_>>();
        for manifest in &manifests {
            Self::copy_into_from_s3_to_redshift(client, config, s3_inner, is_append_only, manifest)
                .await?;
        }
        s3_operator.delete_iter(manifests).await?;
        Ok(())
    }

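    /// Writes a Redshift COPY manifest listing the given staged file paths and returns the
    /// manifest's own path. The manifest body has the shape
    /// `{"entries": [{"url": <file path>, "mandatory": true}, ...]}`.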
    async fn write_manifest_to_s3(
        s3_inner: &S3Common,
        paths: Vec<String>,
        table: &str,
    ) -> Result<String> {
        let manifest_entries: Vec<_> = paths
            .into_iter()
            .map(|path| json!({ "url": path, "mandatory": true }))
            .collect();
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(s3_inner)?;
        let (mut writer, manifest_path) =
            build_opendal_writer_path(s3_inner, &s3_operator, Some("manifest"), table).await?;
        let manifest_json = json!({ "entries": manifest_entries });
        let mut chunk_buf = BytesMut::new();
        writeln!(chunk_buf, "{}", manifest_json).unwrap();
        writer.write(chunk_buf.freeze()).await?;
        writer.close().await.map_err(|e| {
            SinkError::Redshift(anyhow!(
                "Failed to close manifest writer: {}",
                e.to_report_string()
            ))
        })?;
        Ok(manifest_path)
    }

    pub async fn copy_into_from_s3_to_redshift(
        client: &JdbcJniClient,
        config: &RedShiftConfig,
        s3_inner: &S3Common,
        is_append_only: bool,
        manifest: &str,
    ) -> Result<()> {
        let all_path = format!("s3://{}/{}", s3_inner.bucket_name, manifest);

        let table = if is_append_only {
            &config.table
        } else {
            config.cdc_table.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?
        };
        let copy_into_sql = build_copy_into_sql(
            config.schema.as_deref(),
            table,
            &all_path,
            &s3_inner.access,
            &s3_inner.secret,
            &s3_inner.assume_role,
        )?;
        client.execute_sql_sync(vec![copy_into_sql]).await?;
        Ok(())
    }

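    /// Background loop driven by two timers: one `COPY`s newly staged S3 manifests into
    /// Redshift, the other runs the MERGE statements from the intermediate table into the
    /// target table. Missed ticks are skipped rather than bursted.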
    async fn run_periodic_query_task(
        client: JdbcJniClient,
        merge_into_sql: Option<Vec<String>>,
        need_copy_into: bool,
        writer_target_interval_seconds: u64,
        write_intermediate_interval_seconds: u64,
        sink_id: SinkId,
        config: RedShiftConfig,
        is_append_only: bool,
        mut shutdown_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) {
        let mut copy_timer = interval(Duration::from_secs(write_intermediate_interval_seconds));
        copy_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let mut merge_timer = interval(Duration::from_secs(writer_target_interval_seconds));
        merge_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = shutdown_receiver.recv() => break,
                _ = merge_timer.tick(), if merge_into_sql.is_some() => {
                    if let Some(sql) = &merge_into_sql && let Err(e) = client.execute_sql_sync(sql.clone()).await {
                        tracing::warn!("Failed to execute periodic query for table {}: {}", config.table, e.as_report());
                    }
                },
                _ = copy_timer.tick(), if need_copy_into => {
                    if let Err(e) = async {
                        let s3_inner = config.s3_inner.as_ref().ok_or_else(|| {
                            SinkError::Config(anyhow!("S3 configuration is required for redshift s3 sink"))
                        })?;
                        Self::flush_manifest_to_redshift(&client, &config, s3_inner, is_append_only).await?;
                        Ok::<(), SinkError>(())
                    }.await {
                        tracing::error!("Failed to execute copy into task for sink id {}: {}", sink_id, e.as_report());
                    }
                }
            }
        }
        tracing::info!("Periodic query task stopped for sink id {}", sink_id);
    }
}

impl Drop for RedshiftSinkCommitter {
    fn drop(&mut self) {
        if let Some(shutdown_sender) = &self.shutdown_sender
            && let Err(e) = shutdown_sender.send(())
        {
            tracing::warn!(
                "Failed to send shutdown signal to periodic task: {}",
                e.as_report()
            );
        }
        tracing::info!("RedshiftSinkCommitter dropped, periodic task stopped");
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for RedshiftSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        if self.config.with_s3 {
            Self::flush_manifest_to_redshift(
                &self.client,
                &self.config,
                self.config.s3_inner.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!("S3 configuration is required for redshift s3 sink"))
                })?,
                self.is_append_only,
            )
            .await?;
        }
        Ok(())
    }

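    /// Collects the S3 file paths reported by the writers for this epoch and publishes them as
    /// a single COPY manifest; the background task loads the manifest on its next tick. Also
    /// surfaces a panic of the background task, if any, as a commit error.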
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        if let Some(handle) = &self.periodic_task_handle {
            let is_finished = handle.is_finished();
            if is_finished {
                let handle = self.periodic_task_handle.take().unwrap();
                handle.await.map_err(|e| {
                    SinkError::Redshift(anyhow!(
                        "Periodic task for sink id {} panicked: {}",
                        self.sink_id,
                        e.to_report_string()
                    ))
                })?;
            }
        };
        let paths = metadata
            .into_iter()
            .filter(|m| {
                if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata { metadata })) =
                    &m.metadata
                {
                    !metadata.is_empty()
                } else {
                    false
                }
            })
            .map(|metadata| {
                let path = if let Some(sink_metadata::Metadata::Serialized(SerializedMetadata {
                    metadata,
                })) = metadata.metadata
                {
                    String::from_utf8(metadata).map_err(|e| SinkError::Config(anyhow!(e)))
                } else {
                    Err(SinkError::Config(anyhow!("Invalid metadata format")))
                }?;
                Ok(path)
            })
            .collect::<Result<Vec<_>>>()?;

        if !paths.is_empty() {
            let s3_inner = self.config.s3_inner.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!("S3 configuration is required for S3 sink"))
            })?;
            {
                Self::write_manifest_to_s3(s3_inner, paths, &self.config.table).await?;
            }
            tracing::info!(
                "Manifest file written to S3 for sink id {} at epoch {}",
                self.sink_id,
                epoch
            );
        }
        Ok(())
    }

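    /// Applies an `ADD COLUMN` schema change to the target table (and to the intermediate
    /// table for non-append-only sinks), then restarts the background task so the periodic
    /// MERGE uses the updated column list.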
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        use risingwave_pb::stream_plan::sink_schema_change::PbOp as SinkSchemaChangeOp;
        let schema_change_op = schema_change
            .op
            .ok_or_else(|| SinkError::Coordinator(anyhow!("Invalid schema change operation")))?;
        let SinkSchemaChangeOp::AddColumns(add_columns) = schema_change_op else {
            return Err(SinkError::Coordinator(anyhow!(
                "Only AddColumns schema change is supported for Redshift sink"
            )));
        };
        if let Some(shutdown_sender) = &self.shutdown_sender {
            shutdown_sender
                .send(())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        }
        let sql = build_alter_add_column_sql(
            self.config.schema.as_deref(),
            &self.config.table,
            &add_columns
                .fields
                .iter()
                .map(|f| {
                    (
                        f.name.clone(),
                        DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                    )
                })
                .collect_vec(),
        );
        let check_column_exists = |e: anyhow::Error| {
            let err_str = e.to_report_string();
            if regex::Regex::new(".+ of relation .+ already exists")
                .unwrap()
                .find(&err_str)
                .is_none()
            {
                return Err(e);
            }
            warn!("redshift sink columns already exist, skipped");
            Ok(())
        };
        self.client
            .execute_sql_sync(vec![sql.clone()])
            .await
            .or_else(check_column_exists)?;
        let merge_into_sql = if !self.is_append_only {
            let cdc_table_name = self.config.cdc_table.as_ref().ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "intermediate.table.name is required for non-append-only sink"
                ))
            })?;
            let sql = build_alter_add_column_sql(
                self.config.schema.as_deref(),
                cdc_table_name,
                &add_columns
                    .fields
                    .iter()
                    .map(|f| {
                        (
                            f.name.clone(),
                            DataType::from(f.data_type.as_ref().unwrap()).to_string(),
                        )
                    })
                    .collect::<Vec<_>>(),
            );
            self.client
                .execute_sql_sync(vec![sql.clone()])
                .await
                .or_else(check_column_exists)?;
            self.all_column_names
                .extend(add_columns.fields.iter().map(|f| f.name.clone()));
            let target_schema_name = self.config.schema.as_deref();
            let effective_cdc_schema = self
                .config
                .intermediate_schema
                .as_deref()
                .or(target_schema_name);
            let merge_into_sql = build_create_merge_into_task_sql(
                effective_cdc_schema,
                target_schema_name,
                self.config.cdc_table.as_ref().ok_or_else(|| {
                    SinkError::Config(anyhow!(
                        "intermediate.table.name is required for non-append-only sink"
                    ))
                })?,
                &self.config.table,
                &self.pk_column_names,
                &self.all_column_names,
            );
            Some(merge_into_sql)
        } else {
            None
        };

        if let Some(shutdown_sender) = self.shutdown_sender.take() {
            let _ = shutdown_sender.send(());
        }
        if let Some(periodic_task_handle) = self.periodic_task_handle.take() {
            let _ = periodic_task_handle.await;
        }

        let (shutdown_sender, shutdown_receiver) = unbounded_channel();
        let client = self.client.clone();

        let writer_target_interval_seconds = self.writer_target_interval_seconds;
        let write_intermediate_interval_seconds = self.write_intermediate_interval_seconds;
        let config = self.config.clone();
        let sink_id = self.sink_id;
        let is_append_only = self.is_append_only;
        let periodic_task_handle = tokio::spawn(async move {
            Self::run_periodic_query_task(
                client,
                merge_into_sql,
                config.with_s3,
                writer_target_interval_seconds,
                write_intermediate_interval_seconds,
                sink_id,
                config,
                is_append_only,
                shutdown_receiver,
            )
            .await;
        });
        self.shutdown_sender = Some(shutdown_sender);
        self.periodic_task_handle = Some(periodic_task_handle);

        Ok(())
    }
}

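/// Builds a `CREATE TABLE IF NOT EXISTS` statement from the RisingWave schema; when
/// `need_op_and_row_id` is set (used for the intermediate CDC table), the bookkeeping
/// `__ROW_ID` and `__OP` columns are appended.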
pub fn build_create_table_sql(
    schema_name: Option<&str>,
    table_name: &str,
    schema: &Schema,
    need_op_and_row_id: bool,
) -> Result<String> {
    let mut columns: Vec<String> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = convert_redshift_data_type(&field.data_type)?;
            Ok(format!("{} {}", field.name, data_type))
        })
        .collect::<Result<Vec<String>>>()?;
    if need_op_and_row_id {
        columns.push(format!("{} VARCHAR(MAX)", __ROW_ID));
        columns.push(format!("{} INT", __OP));
    }
    let columns_str = columns.join(", ");
    let full_table_name = build_full_table_name(schema_name, table_name);
    Ok(format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        full_table_name, columns_str
    ))
}

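/// Maps a RisingWave data type to the Redshift column type used when auto-creating tables.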
fn convert_redshift_data_type(data_type: &DataType) -> Result<String> {
    let data_type = match data_type {
        DataType::Int16 => "SMALLINT".to_owned(),
        DataType::Int32 => "INTEGER".to_owned(),
        DataType::Int64 => "BIGINT".to_owned(),
        DataType::Float32 => "REAL".to_owned(),
        DataType::Float64 => "FLOAT".to_owned(),
        DataType::Boolean => "BOOLEAN".to_owned(),
        DataType::Varchar => "VARCHAR(MAX)".to_owned(),
        DataType::Date => "DATE".to_owned(),
        DataType::Timestamp => "TIMESTAMP".to_owned(),
        DataType::Timestamptz => "TIMESTAMPTZ".to_owned(),
        DataType::Jsonb => "VARCHAR(MAX)".to_owned(),
        DataType::Decimal => "DECIMAL".to_owned(),
        DataType::Time => "TIME".to_owned(),
        _ => {
            return Err(SinkError::Config(anyhow!(
                "Auto table creation is not supported for data type: {}",
                data_type
            )));
        }
    };
    Ok(data_type)
}

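/// Builds the statement sequence that folds the intermediate CDC table into the target table:
/// snapshot the current maximum `__ROW_ID` into a temp table, apply deletes (ops 2 and 4),
/// MERGE the latest insert/update row per primary key (ops 1 and 3), then purge the consumed
/// rows from the intermediate table and drop the temp table.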
fn build_create_merge_into_task_sql(
    cdc_schema_name: Option<&str>,
    target_schema_name: Option<&str>,
    cdc_table_name: &str,
    target_table_name: &str,
    pk_column_names: &Vec<String>,
    all_column_names: &Vec<String>,
) -> Vec<String> {
    let cdc_table_name = build_full_table_name(cdc_schema_name, cdc_table_name);
    let target_table_name = build_full_table_name(target_schema_name, target_table_name);
    let pk_names_str = pk_column_names.join(", ");
    let pk_names_eq_str = pk_column_names
        .iter()
        .map(|name| format!("{target_table_name}.{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(" AND ");
    let all_column_names_set_str = all_column_names
        .iter()
        .map(|name| format!("{name} = source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");
    let all_column_names_str = all_column_names.join(", ");
    let all_column_names_insert_str = all_column_names
        .iter()
        .map(|name| format!("source.{name}", name = name))
        .collect::<Vec<String>>()
        .join(", ");

    vec![
        format!(
            r#"
            CREATE TEMP TABLE max_id_table AS
            SELECT COALESCE(MAX({redshift_sink_row_id}), '0') AS max_row_id
            FROM {cdc_table_name};
            "#,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
        ),
        format!(
            r#"
            DELETE FROM {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (2, 4)
            ) AS source
            WHERE {pk_names_eq_str};
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
        ),
        format!(
            r#"
            MERGE INTO {target_table_name}
            USING (
                SELECT *
                FROM (
                    SELECT *, ROW_NUMBER() OVER (
                        PARTITION BY {pk_names_str}
                        ORDER BY {redshift_sink_row_id} DESC
                    ) AS dedupe_id
                    FROM {cdc_table_name}, max_id_table
                    WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id
                ) AS subquery
                WHERE dedupe_id = 1 AND {redshift_sink_op} IN (1, 3)
            ) AS source
            ON {pk_names_eq_str}
            WHEN MATCHED THEN
                UPDATE SET {all_column_names_set_str}
            WHEN NOT MATCHED THEN
                INSERT ({all_column_names_str}) VALUES ({all_column_names_insert_str});
            "#,
            target_table_name = target_table_name,
            pk_names_str = pk_names_str,
            redshift_sink_row_id = __ROW_ID,
            cdc_table_name = cdc_table_name,
            redshift_sink_op = __OP,
            pk_names_eq_str = pk_names_eq_str,
            all_column_names_set_str = all_column_names_set_str,
            all_column_names_str = all_column_names_str,
            all_column_names_insert_str = all_column_names_insert_str,
        ),
        format!(
            r#"
            DELETE FROM {cdc_table_name}
            USING max_id_table
            WHERE {cdc_table_name}.{redshift_sink_row_id} <= max_id_table.max_row_id;
            "#,
            cdc_table_name = cdc_table_name,
            redshift_sink_row_id = __ROW_ID,
        ),
        "DROP TABLE IF EXISTS max_id_table;".to_owned(),
    ]
}

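/// Builds the `COPY ... FROM ... MANIFEST` statement that loads the staged JSON files listed in
/// the manifest into the target (or intermediate) table, authenticating with either an IAM role
/// or an access/secret key pair.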
fn build_copy_into_sql(
    schema_name: Option<&str>,
    table_name: &str,
    manifest_dir: &str,
    access_key: &Option<String>,
    secret_key: &Option<String>,
    assume_role: &Option<String>,
) -> Result<String> {
    let table_name = build_full_table_name(schema_name, table_name);
    let credentials = if let Some(assume_role) = assume_role {
        &format!("aws_iam_role={}", assume_role)
    } else if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
        &format!(
            "aws_access_key_id={};aws_secret_access_key={}",
            access_key, secret_key
        )
    } else {
        return Err(SinkError::Config(anyhow!(
            "Either assume_role or access_key and secret_key must be provided for Redshift COPY command"
        )));
    };
    Ok(format!(
        r#"
        COPY {table_name}
        FROM '{manifest_dir}'
        CREDENTIALS '{credentials}'
        FORMAT AS JSON 'auto'
        DATEFORMAT 'auto'
        TIMEFORMAT 'auto'
        MANIFEST;
        "#,
        table_name = table_name,
        manifest_dir = manifest_dir,
        credentials = credentials
    ))
}
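
// Minimal sanity checks, added here as an illustrative sketch (module and test names are
// arbitrary). They only exercise behavior visible in this file: identifier quoting in
// `build_full_table_name` and the credential requirement of `build_copy_into_sql`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_full_table_name_quotes_identifiers() {
        assert_eq!(
            build_full_table_name(Some("public"), "users"),
            r#""public"."users""#
        );
        assert_eq!(build_full_table_name(None, "users"), r#""users""#);
    }

    #[test]
    fn build_copy_into_sql_requires_credentials() {
        // With neither an IAM role nor an access/secret key pair, the builder must fail.
        assert!(
            build_copy_into_sql(None, "t", "s3://bucket/manifest", &None, &None, &None).is_err()
        );
    }
}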