risingwave_connector/sink/snowflake_redshift/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use opendal::Operator;
use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use serde_json::{Map, Value};
use thiserror_ext::AsReport;

use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::{Result, SinkError, SinkWriterParam};

pub mod redshift;
pub mod snowflake;

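// Names of the auxiliary columns appended to every row in non-append-only mode:
// `__row_id` identifies a row within an epoch ("{epoch}_{count}") and `__op`
// carries the integer code of the original stream operation.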
pub const __ROW_ID: &str = "__row_id";
pub const __OP: &str = "__op";

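/// Wraps a [`JsonEncoder`] and, when the sink is not append-only, augments each
/// encoded row with the `__row_id` and `__op` fields defined above.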
pub struct AugmentedRow {
    row_encoder: JsonEncoder,
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedRow {
    pub fn new(current_epoch: u64, is_append_only: bool, schema: Schema) -> Self {
        let row_encoder = JsonEncoder::new(
            schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            JsonbHandlingMode::String,
        );
        Self {
            row_encoder,
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

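    /// Resets the per-epoch row counter when a new epoch begins. A no-op for
    /// append-only sinks, where no `__row_id` is generated.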
    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

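    /// Encodes a row as a JSON object. For non-append-only sinks the object is
    /// extended with `__row_id` (`"{epoch}_{row_count}"`) and `__op` (the op code
    /// as an integer).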
    pub fn augmented_row(&mut self, row: impl Row, op: Op) -> Result<Map<String, Value>> {
        let mut row = self.row_encoder.encode(row)?;
        if self.is_append_only {
            return Ok(row);
        }
        self.current_row_count += 1;
        row.insert(
            __ROW_ID.to_owned(),
            Value::String(format!("{}_{}", self.current_epoch, self.current_row_count)),
        );
        row.insert(
            __OP.to_owned(),
            Value::Number(serde_json::Number::from(op.to_i16())),
        );
        Ok(row)
    }
}

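/// Chunk-level counterpart of [`AugmentedRow`]: appends `__row_id` and `__op`
/// columns to a whole [`StreamChunk`] instead of encoding rows one by one.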
pub struct AugmentedChunk {
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedChunk {
    pub fn new(current_epoch: u64, is_append_only: bool) -> Self {
        Self {
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

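    /// Appends a `Utf8` `__row_id` column and an `Int32` `__op` column to the chunk,
    /// then re-emits every row as an `Insert` so downstream stages see a pure append
    /// stream. Append-only chunks are passed through unchanged.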
    pub fn augmented_chunk(&mut self, chunk: StreamChunk) -> Result<StreamChunk> {
        if self.is_append_only {
            return Ok(chunk);
        }
        let (data_chunk, ops) = chunk.into_parts();
        let chunk_row_count = data_chunk.capacity();
        let (columns, visibility) = data_chunk.into_parts();

        let op_column = ops.iter().map(|op| op.to_i16() as i32).collect::<Vec<_>>();
        let row_column_strings: Vec<String> = (0..chunk_row_count)
            .map(|i| format!("{}_{}", self.current_epoch, self.current_row_count + i))
            .collect();

        let row_column_refs: Vec<&str> = row_column_strings.iter().map(|s| s.as_str()).collect();
        self.current_row_count += chunk_row_count;

        let mut arrays: Vec<Arc<ArrayImpl>> = columns;
        arrays.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
            row_column_refs,
        ))));
        arrays.push(Arc::new(ArrayImpl::Int32(
            PrimitiveArray::<i32>::from_iter(op_column),
        )));

        let chunk = DataChunk::new(arrays, visibility);
        let ops = vec![Op::Insert; chunk_row_count];
        let chunk = StreamChunk::from_parts(ops, chunk);
        Ok(chunk)
    }
}

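/// Writes augmented rows as newline-delimited JSON to S3 through OpenDAL. An object
/// is created lazily on the first `write_batch` after a checkpoint and closed on the
/// next checkpoint barrier, which returns the object's `s3://` path.
///
/// Illustrative call sequence (a sketch, not part of the sink API; it assumes the
/// caller already has an `S3Common` config, a `Schema`, an executor id, and stream
/// chunks in scope):
///
/// ```ignore
/// let mut writer =
///     SnowflakeRedshiftSinkS3Writer::new(s3_config, schema, false, executor_id, None)?;
/// writer.begin_epoch(epoch)?;
/// writer.write_batch(chunk).await?;
/// if let Some(path) = writer.barrier(true).await? {
///     // hand `path` to whatever loads the staged file into the target table
/// }
/// ```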
pub struct SnowflakeRedshiftSinkS3Writer {
    s3_config: S3Common,
    s3_operator: Operator,
    augmented_row: AugmentedRow,
    opendal_writer_path: Option<(opendal::Writer, String)>,
    executor_id: u64,
    target_table_name: Option<String>,
}

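/// Creates an OpenDAL writer for a new S3 object named
/// `{base_path}/{target_table_name}/{executor_id}_{unix_secs}.json` and returns it
/// together with the object's full `s3://{bucket}/...` path.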
pub async fn build_opendal_writer_path(
    s3_config: &S3Common,
    executor_id: u64,
    operator: &Operator,
    target_table_name: &Option<String>,
) -> Result<(opendal::Writer, String)> {
    let mut base_path = s3_config.path.clone().unwrap_or_default();
    if !base_path.ends_with('/') {
        base_path.push('/');
    }
    if let Some(table_name) = target_table_name {
        base_path.push_str(&format!("{}/", table_name));
    }
    let create_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let object_name = format!(
        "{}{}_{}.{}",
        base_path,
        executor_id,
        create_time.as_secs(),
        "json",
    );
    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
    Ok((
        operator.writer_with(&object_name).concurrent(8).await?,
        all_path,
    ))
}

impl SnowflakeRedshiftSinkS3Writer {
    pub fn new(
        s3_config: S3Common,
        schema: Schema,
        is_append_only: bool,
        executor_id: u64,
        target_table_name: Option<String>,
    ) -> Result<Self> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
        Ok(Self {
            s3_config,
            s3_operator,
            opendal_writer_path: None,
            executor_id,
            augmented_row: AugmentedRow::new(0, is_append_only, schema),
            target_table_name,
        })
    }

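    /// Marks the start of a new epoch so that subsequent `__row_id`s are generated
    /// relative to `epoch`.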
    pub fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        Ok(())
    }

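    /// Encodes every row of the chunk as one JSON line and appends the buffer to the
    /// current S3 object, lazily creating the object (and its writer) on first use.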
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.opendal_writer_path.is_none() {
            let opendal_writer_path = build_opendal_writer_path(
                &self.s3_config,
                self.executor_id,
                &self.s3_operator,
                &self.target_table_name,
            )
            .await?;
            self.opendal_writer_path = Some(opendal_writer_path);
        }
        let mut chunk_buf = BytesMut::new();
        for (op, row) in chunk.rows() {
            let encoded_row = self.augmented_row.augmented_row(row, op)?;
            writeln!(chunk_buf, "{}", Value::Object(encoded_row)).unwrap(); // writing to a `BytesMut` should never fail
        }
        self.opendal_writer_path
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
            .0
            .write(chunk_buf.freeze())
            .await?;
        Ok(())
    }

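    /// On a checkpoint barrier, closes the in-flight S3 object (if any) and returns
    /// its full path; non-checkpoint barriers return `None`.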
    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<String>> {
        if is_checkpoint && let Some((mut writer, path)) = self.opendal_writer_path.take() {
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }
}