risingwave_connector/sink/snowflake_redshift/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

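//! Shared building blocks for the Snowflake and Redshift sinks: row/chunk
//! augmentation with the `__row_id` and `__op` metadata columns, an S3 staging
//! writer that uploads newline-delimited JSON via OpenDAL, and a JDBC writer
//! that delegates to the coordinated remote sink.
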
use std::fmt::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use opendal::Operator;
use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::DataType;
use serde_json::{Map, Value};
use thiserror_ext::AsReport;
use uuid::Uuid;

use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam};

pub mod redshift;
pub mod snowflake;

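// Metadata columns appended to user rows in non-append-only mode. `__row_id` is a
// per-epoch, monotonically increasing id formatted as `{epoch}_{row_count}`; `__op`
// stores the stream operation (`Op::to_i16`) as an `Int32`.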
pub const __ROW_ID: &str = "__row_id";
pub const __OP: &str = "__op";

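/// Encodes rows as JSON objects and, in non-append-only mode, augments each row
/// with the `__row_id` and `__op` metadata fields.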
pub struct AugmentedRow {
    row_encoder: JsonEncoder,
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedRow {
    pub fn new(current_epoch: u64, is_append_only: bool, schema: Schema) -> Self {
        let row_encoder = JsonEncoder::new(
            schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            JsonbHandlingMode::String,
        );
        Self {
            row_encoder,
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

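    /// Encodes `row` to a JSON object. For non-append-only sinks, also inserts the
    /// `__row_id` (`{epoch}_{row_count}`) and `__op` (`Op::to_i16`) fields.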
    pub fn augmented_row(&mut self, row: impl Row, op: Op) -> Result<Map<String, Value>> {
        let mut row = self.row_encoder.encode(row)?;
        if self.is_append_only {
            return Ok(row);
        }
        self.current_row_count += 1;
        row.insert(
            __ROW_ID.to_owned(),
            Value::String(format!("{}_{}", self.current_epoch, self.current_row_count)),
        );
        row.insert(
            __OP.to_owned(),
            Value::Number(serde_json::Number::from(op.to_i16())),
        );
        Ok(row)
    }
}

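/// Chunk-level counterpart of [`AugmentedRow`]: appends `__row_id` and `__op`
/// columns to a `StreamChunk` and rewrites all operations to `Insert`.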
pub struct AugmentedChunk {
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedChunk {
    pub fn new(current_epoch: u64, is_append_only: bool) -> Self {
        Self {
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

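    /// Appends a `__row_id` (Utf8) column and an `__op` (Int32) column to the chunk
    /// and converts every operation into an `Insert`. Append-only chunks are passed
    /// through unchanged.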
    pub fn augmented_chunk(&mut self, chunk: StreamChunk) -> Result<StreamChunk> {
        if self.is_append_only {
            return Ok(chunk);
        }
        let (data_chunk, ops) = chunk.into_parts();
        let chunk_row_count = data_chunk.capacity();
        let (columns, visibility) = data_chunk.into_parts();

        let op_column = ops.iter().map(|op| op.to_i16() as i32).collect::<Vec<_>>();
        let row_column_strings: Vec<String> = (0..chunk_row_count)
            .map(|i| format!("{}_{}", self.current_epoch, self.current_row_count + i))
            .collect();

        let row_column_refs: Vec<&str> = row_column_strings.iter().map(|s| s.as_str()).collect();
        self.current_row_count += chunk_row_count;

        let mut arrays: Vec<Arc<ArrayImpl>> = columns;
        arrays.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
            row_column_refs,
        ))));
        arrays.push(Arc::new(ArrayImpl::Int32(
            PrimitiveArray::<i32>::from_iter(op_column),
        )));

        let chunk = DataChunk::new(arrays, visibility);
        let ops = vec![Op::Insert; chunk_row_count];
        let chunk = StreamChunk::from_parts(ops, chunk);
        Ok(chunk)
    }
}

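/// Writer that stages augmented rows as newline-delimited JSON objects on S3 via
/// OpenDAL. A new staging object is opened lazily on the first `write_batch` and
/// closed (its path being returned) on the next checkpoint barrier.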
pub struct SnowflakeRedshiftSinkS3Writer {
    s3_config: S3Common,
    s3_operator: Operator,
    augmented_row: AugmentedRow,
    opendal_writer_path: Option<(opendal::Writer, String)>,
    target_table_name: String,
}

impl SnowflakeRedshiftSinkS3Writer {
    pub fn new(
        s3_config: S3Common,
        schema: Schema,
        is_append_only: bool,
        target_table_name: String,
    ) -> Result<Self> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
        Ok(Self {
            s3_config,
            s3_operator,
            opendal_writer_path: None,
            augmented_row: AugmentedRow::new(0, is_append_only, schema),
            target_table_name,
        })
    }

    pub fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        Ok(())
    }

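    /// Encodes the chunk as newline-delimited JSON and appends it to the current
    /// staging object, creating the OpenDAL writer on first use.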
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.opendal_writer_path.is_none() {
            let opendal_writer_path = build_opendal_writer_path(
                &self.s3_config,
                &self.s3_operator,
                None,
                &self.target_table_name,
            )
            .await?;
            self.opendal_writer_path = Some(opendal_writer_path);
        }
        let mut chunk_buf = BytesMut::new();
        for (op, row) in chunk.rows() {
            let encoded_row = self.augmented_row.augmented_row(row, op)?;
            writeln!(chunk_buf, "{}", Value::Object(encoded_row)).unwrap(); // writing to a `BytesMut` should never fail
        }
        self.opendal_writer_path
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
            .0
            .write(chunk_buf.freeze())
            .await?;
        Ok(())
    }

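    /// On a checkpoint barrier, closes the current staging object (if any) and
    /// returns its full `s3://` path; otherwise returns `None`.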
    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<String>> {
        if is_checkpoint && let Some((mut writer, path)) = self.opendal_writer_path.take() {
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }
}

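/// Builds an OpenDAL writer for a new staging object named
/// `{path}/{target_table_name}/[{dir}/]{uuid}_{timestamp_ms}.json` and returns it
/// together with the object's full `s3://{bucket}/{key}` path.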
pub async fn build_opendal_writer_path(
    s3_config: &S3Common,
    operator: &Operator,
    dir: Option<&str>,
    target_table_name: &str,
) -> Result<(opendal::Writer, String)> {
    let mut base_path = s3_config.path.clone().unwrap_or_default();
    if !base_path.ends_with('/') {
        base_path.push('/');
    }
    base_path.push_str(&format!("{}/", target_table_name));
    if let Some(dir) = dir {
        base_path.push_str(&format!("{}/", dir));
    }
    let create_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let object_name = format!(
        "{}{}_{}.{}",
        base_path,
        Uuid::new_v4(),
        create_time.as_millis(),
        "json",
    );
    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
    Ok((
        operator.writer_with(&object_name).concurrent(8).await?,
        all_path,
    ))
}

/// Generic JDBC writer shared by the Redshift and Snowflake sinks: it augments
/// chunks with the CDC metadata columns in upsert mode and delegates the actual
/// writes to a [`CoordinatedRemoteSinkWriter`].
pub struct SnowflakeRedshiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl SnowflakeRedshiftSinkJdbcWriter {
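    /// Creates the wrapped [`CoordinatedRemoteSinkWriter`]. In upsert mode the
    /// `__row_id` and `__op` columns are appended to the sink's column list;
    /// the `schema`/`database` properties are remapped to `schema.name`/
    /// `database.name`, and the sink type is forced to `append-only` since
    /// updates and deletes are carried in the `__op` column instead.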
    pub async fn new(
        is_append_only: bool,
        writer_param: SinkWriterParam,
        mut param: SinkParam,
        full_table_name: String,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;

        if !is_append_only {
            // Add the CDC-specific columns (`__row_id`, `__op`) for upsert mode.
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);

            column_descs.push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            column_descs.push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
        }

        // Remap the user-facing property names to the keys expected by the remote sink
        // and route all writes to the target table in append-only mode.
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        if let Some(database_name) = param.properties.remove("database") {
            param
                .properties
                .insert("database.name".to_owned(), database_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;

        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    pub async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    pub async fn abort(&mut self) -> Result<()> {
        // TODO: abort should clean up all the data written in this epoch
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}