risingwave_connector/sink/snowflake_redshift/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use opendal::Operator;
use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_pb::id::ExecutorId;
use serde_json::{Map, Value};
use thiserror_ext::AsReport;

use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::{Result, SinkError, SinkWriterParam};

pub mod redshift;
pub mod snowflake;

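// Auxiliary columns appended to the user schema when the sink is not append-only:
// `__row_id` holds a per-epoch `{epoch}_{row_count}` identifier and `__op` holds the
// change operation encoded as an integer (see `Op::to_i16`).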
pub const __ROW_ID: &str = "__row_id";
pub const __OP: &str = "__op";

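/// Encodes rows to JSON and, for non-append-only sinks, augments each row with the
/// `__row_id` and `__op` fields so that changes can later be merged into the target table.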
pub struct AugmentedRow {
    row_encoder: JsonEncoder,
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedRow {
    pub fn new(current_epoch: u64, is_append_only: bool, schema: Schema) -> Self {
        let row_encoder = JsonEncoder::new(
            schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            JsonbHandlingMode::String,
        );
        Self {
            row_encoder,
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

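    /// Resets the per-epoch row counter when a new epoch begins. No-op for
    /// append-only sinks or when the epoch has not changed.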
    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

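    /// Encodes a row as a JSON object. For non-append-only sinks, the `__row_id`
    /// (`{epoch}_{row_count}`) and `__op` fields are added on top of the user columns.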
    pub fn augmented_row(&mut self, row: impl Row, op: Op) -> Result<Map<String, Value>> {
        let mut row = self.row_encoder.encode(row)?;
        if self.is_append_only {
            return Ok(row);
        }
        self.current_row_count += 1;
        row.insert(
            __ROW_ID.to_owned(),
            Value::String(format!("{}_{}", self.current_epoch, self.current_row_count)),
        );
        row.insert(
            __OP.to_owned(),
            Value::Number(serde_json::Number::from(op.to_i16())),
        );
        Ok(row)
    }
}

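/// Chunk-level counterpart of [`AugmentedRow`]: appends `__row_id` and `__op` columns
/// to each [`StreamChunk`] for non-append-only sinks.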
pub struct AugmentedChunk {
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedChunk {
    pub fn new(current_epoch: u64, is_append_only: bool) -> Self {
        Self {
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

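    /// Appends the `__row_id` and `__op` columns to the chunk and rewrites all
    /// operations to `Op::Insert`, so downstream staging only sees plain inserts.
    /// Append-only chunks are passed through unchanged.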
    pub fn augmented_chunk(&mut self, chunk: StreamChunk) -> Result<StreamChunk> {
        if self.is_append_only {
            return Ok(chunk);
        }
        let (data_chunk, ops) = chunk.into_parts();
        let chunk_row_count = data_chunk.capacity();
        let (columns, visibility) = data_chunk.into_parts();

        let op_column = ops.iter().map(|op| op.to_i16() as i32).collect::<Vec<_>>();
        let row_column_strings: Vec<String> = (0..chunk_row_count)
            .map(|i| format!("{}_{}", self.current_epoch, self.current_row_count + i))
            .collect();

        let row_column_refs: Vec<&str> = row_column_strings.iter().map(|s| s.as_str()).collect();
        self.current_row_count += chunk_row_count;

        let mut arrays: Vec<Arc<ArrayImpl>> = columns;
        arrays.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
            row_column_refs,
        ))));
        arrays.push(Arc::new(ArrayImpl::Int32(
            PrimitiveArray::<i32>::from_iter(op_column),
        )));

        let chunk = DataChunk::new(arrays, visibility);
        let ops = vec![Op::Insert; chunk_row_count];
        let chunk = StreamChunk::from_parts(ops, chunk);
        Ok(chunk)
    }
}

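/// Writer shared by the Snowflake and Redshift sinks that stages data on S3: rows are
/// encoded as newline-delimited JSON via [`AugmentedRow`] and uploaded through an
/// OpenDAL writer. On a checkpoint barrier the current staging file is closed and its
/// `s3://` path is returned so the caller can load it into the target table.
///
/// A rough sketch of the expected call sequence (the driver code here is hypothetical,
/// not part of this module):
///
/// ```ignore
/// let mut writer = SnowflakeRedshiftSinkS3Writer::new(
///     s3_config, schema, /* is_append_only */ false, executor_id, Some("t".to_owned()),
/// )?;
/// writer.begin_epoch(epoch)?;
/// writer.write_batch(chunk).await?;
/// // On a checkpoint barrier, the staged file path (if any) is handed back.
/// let staged_path = writer.barrier(true).await?;
/// ```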
pub struct SnowflakeRedshiftSinkS3Writer {
    s3_config: S3Common,
    s3_operator: Operator,
    augmented_row: AugmentedRow,
    opendal_writer_path: Option<(opendal::Writer, String)>,
    executor_id: ExecutorId,
    target_table_name: Option<String>,
}

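/// Creates an OpenDAL writer for a new staging object and returns it together with the
/// object's full `s3://{bucket}/{object}` URI. The object name is
/// `{base_path}[{table_name}/]{executor_id}_{unix_seconds}.json`.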
pub async fn build_opendal_writer_path(
    s3_config: &S3Common,
    executor_id: ExecutorId,
    operator: &Operator,
    target_table_name: &Option<String>,
) -> Result<(opendal::Writer, String)> {
    let mut base_path = s3_config.path.clone().unwrap_or("".to_owned());
    if !base_path.ends_with('/') {
        base_path.push('/');
    }
    if let Some(table_name) = &target_table_name {
        base_path.push_str(&format!("{}/", table_name));
    }
    let create_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let object_name = format!(
        "{}{}_{}.{}",
        base_path,
        executor_id,
        create_time.as_secs(),
        "json",
    );
    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
    Ok((
        operator.writer_with(&object_name).concurrent(8).await?,
        all_path,
    ))
}

impl SnowflakeRedshiftSinkS3Writer {
    pub fn new(
        s3_config: S3Common,
        schema: Schema,
        is_append_only: bool,
        executor_id: ExecutorId,
        target_table_name: Option<String>,
    ) -> Result<Self> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
        Ok(Self {
            s3_config,
            s3_operator,
            opendal_writer_path: None,
            executor_id,
            augmented_row: AugmentedRow::new(0, is_append_only, schema),
            target_table_name,
        })
    }

    pub fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        Ok(())
    }

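    /// Encodes the chunk as newline-delimited JSON and uploads it to the current staging
    /// object, lazily creating the OpenDAL writer on the first batch after a checkpoint.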
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.opendal_writer_path.is_none() {
            let opendal_writer_path = build_opendal_writer_path(
                &self.s3_config,
                self.executor_id,
                &self.s3_operator,
                &self.target_table_name,
            )
            .await?;
            self.opendal_writer_path = Some(opendal_writer_path);
        }
        let mut chunk_buf = BytesMut::new();
        for (op, row) in chunk.rows() {
            let encoded_row = self.augmented_row.augmented_row(row, op)?;
            writeln!(chunk_buf, "{}", Value::Object(encoded_row)).unwrap(); // writing to a `BytesMut` should never fail
        }
        self.opendal_writer_path
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
            .0
            .write(chunk_buf.freeze())
            .await?;
        Ok(())
    }

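    /// On a checkpoint barrier, closes the current staging file (if any) and returns its
    /// `s3://` path so the caller can copy it into the target table; returns `None` otherwise.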
    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<String>> {
        if is_checkpoint && let Some((mut writer, path)) = self.opendal_writer_path.take() {
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }
}
247}