risingwave_connector/sink/snowflake_redshift/mod.rs

use std::fmt::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use opendal::Operator;
use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::DataType;
use serde_json::{Map, Value};
use thiserror_ext::AsReport;
use uuid::Uuid;

use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::remote::CoordinatedRemoteSinkWriter;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam};

pub mod redshift;
pub mod snowflake;

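// Bookkeeping columns appended to user rows in the non-append-only (upsert) path:
// `__row_id` is formatted as "{epoch}_{row_count}" and `__op` carries the stream
// operation code (`Op::to_i16`).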
pub const __ROW_ID: &str = "__row_id";
pub const __OP: &str = "__op";

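/// JSON row encoder that, for non-append-only sinks, augments each encoded row with the
/// `__row_id` and `__op` fields. Append-only sinks pass rows through unchanged.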
pub struct AugmentedRow {
    row_encoder: JsonEncoder,
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedRow {
    pub fn new(current_epoch: u64, is_append_only: bool, schema: Schema) -> Self {
        let row_encoder = JsonEncoder::new(
            schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            JsonbHandlingMode::String,
        );
        Self {
            row_encoder,
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

    pub fn augmented_row(&mut self, row: impl Row, op: Op) -> Result<Map<String, Value>> {
        let mut row = self.row_encoder.encode(row)?;
        if self.is_append_only {
            return Ok(row);
        }
        self.current_row_count += 1;
        row.insert(
            __ROW_ID.to_owned(),
            Value::String(format!("{}_{}", self.current_epoch, self.current_row_count)),
        );
        row.insert(
            __OP.to_owned(),
            Value::Number(serde_json::Number::from(op.to_i16())),
        );
        Ok(row)
    }
}

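/// Chunk-level counterpart of [`AugmentedRow`]: for non-append-only sinks it appends a
/// `__row_id` (varchar) column and an `__op` (int32) column to each `StreamChunk`.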
pub struct AugmentedChunk {
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedChunk {
    pub fn new(current_epoch: u64, is_append_only: bool) -> Self {
        Self {
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

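    /// Appends the `__row_id` and `__op` columns and rewrites all ops to `Insert`, so the
    /// augmented chunk can be emitted through an append-only path.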
    pub fn augmented_chunk(&mut self, chunk: StreamChunk) -> Result<StreamChunk> {
        if self.is_append_only {
            return Ok(chunk);
        }
        let (data_chunk, ops) = chunk.into_parts();
        let chunk_row_count = data_chunk.capacity();
        let (columns, visibility) = data_chunk.into_parts();

        let op_column = ops.iter().map(|op| op.to_i16() as i32).collect::<Vec<_>>();
        let row_column_strings: Vec<String> = (0..chunk_row_count)
            .map(|i| format!("{}_{}", self.current_epoch, self.current_row_count + i))
            .collect();

        let row_column_refs: Vec<&str> = row_column_strings.iter().map(|s| s.as_str()).collect();
        self.current_row_count += chunk_row_count;

        let mut arrays: Vec<Arc<ArrayImpl>> = columns;
        arrays.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
            row_column_refs,
        ))));
        arrays.push(Arc::new(ArrayImpl::Int32(
            PrimitiveArray::<i32>::from_iter(op_column),
        )));

        let chunk = DataChunk::new(arrays, visibility);
        let ops = vec![Op::Insert; chunk_row_count];
        let chunk = StreamChunk::from_parts(ops, chunk);
        Ok(chunk)
    }
}

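/// Writer that stages rows as newline-delimited JSON in S3 via OpenDAL. A staging object is
/// opened lazily per checkpoint interval, and its full `s3://` path is returned on checkpoint.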
pub struct SnowflakeRedshiftSinkS3Writer {
    s3_config: S3Common,
    s3_operator: Operator,
    augmented_row: AugmentedRow,
    opendal_writer_path: Option<(opendal::Writer, String)>,
    target_table_name: String,
}

impl SnowflakeRedshiftSinkS3Writer {
    pub fn new(
        s3_config: S3Common,
        schema: Schema,
        is_append_only: bool,
        target_table_name: String,
    ) -> Result<Self> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
        Ok(Self {
            s3_config,
            s3_operator,
            opendal_writer_path: None,
            augmented_row: AugmentedRow::new(0, is_append_only, schema),
            target_table_name,
        })
    }

    pub fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        Ok(())
    }

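    /// Lazily creates the staging object on the first batch after the previous one was
    /// sealed, then appends each row as one JSON object per line.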
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.opendal_writer_path.is_none() {
            let opendal_writer_path = build_opendal_writer_path(
                &self.s3_config,
                &self.s3_operator,
                None,
                &self.target_table_name,
            )
            .await?;
            self.opendal_writer_path = Some(opendal_writer_path);
        }
        let mut chunk_buf = BytesMut::new();
        for (op, row) in chunk.rows() {
            let encoded_row = self.augmented_row.augmented_row(row, op)?;
            writeln!(chunk_buf, "{}", Value::Object(encoded_row)).unwrap();
        }
        self.opendal_writer_path
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
            .0
            .write(chunk_buf.freeze())
            .await?;
        Ok(())
    }

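    /// On checkpoint, closes the current staging object (if any) and returns its full
    /// `s3://bucket/key` path; otherwise returns `None`.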
    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<String>> {
        if is_checkpoint && let Some((mut writer, path)) = self.opendal_writer_path.take() {
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }
}

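/// Builds an OpenDAL writer under `{path}/{target_table_name}/[{dir}/]` with a
/// `{uuid}_{millis}.json` object name, returning the writer together with the full
/// `s3://{bucket}/{object}` path.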
pub async fn build_opendal_writer_path(
    s3_config: &S3Common,
    operator: &Operator,
    dir: Option<&str>,
    target_table_name: &str,
) -> Result<(opendal::Writer, String)> {
    let mut base_path = s3_config.path.clone().unwrap_or("".to_owned());
    if !base_path.ends_with('/') {
        base_path.push('/');
    }
    base_path.push_str(&format!("{}/", target_table_name));
    if let Some(dir) = dir {
        base_path.push_str(&format!("{}/", dir));
    }
    let create_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let object_name = format!(
        "{}{}_{}.{}",
        base_path,
        Uuid::new_v4(),
        create_time.as_millis(),
        "json",
    );
    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
    Ok((
        operator.writer_with(&object_name).concurrent(8).await?,
        all_path,
    ))
}

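/// Writer that forwards (optionally augmented) chunks to the coordinated remote JDBC sink.
/// For non-append-only sinks, the `__row_id` and `__op` columns are added to the sink schema
/// and the remote sink is forced into append-only mode.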
pub struct SnowflakeRedshiftSinkJdbcWriter {
    augmented_row: AugmentedChunk,
    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
}

impl SnowflakeRedshiftSinkJdbcWriter {
    pub async fn new(
        is_append_only: bool,
        writer_param: SinkWriterParam,
        mut param: SinkParam,
        full_table_name: String,
    ) -> Result<Self> {
        let metrics = SinkWriterMetrics::new(&writer_param);
        let column_descs = &mut param.columns;

        if !is_append_only {
            let max_column_id = column_descs
                .iter()
                .map(|column| column.column_id.get_id())
                .max()
                .unwrap_or(0);

            (*column_descs).push(ColumnDesc::named(
                __ROW_ID,
                ColumnId::new(max_column_id + 1),
                DataType::Varchar,
            ));
            (*column_descs).push(ColumnDesc::named(
                __OP,
                ColumnId::new(max_column_id + 2),
                DataType::Int32,
            ));
        };

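        // Rewrite sink properties for the remote JDBC sink: `schema`/`database` become
        // `schema.name`/`database.name`, the target table name is set, and the sink type is
        // forced to append-only (updates/deletes are carried in the `__op` column instead).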
        if let Some(schema_name) = param.properties.remove("schema") {
            param
                .properties
                .insert("schema.name".to_owned(), schema_name);
        }
        if let Some(database_name) = param.properties.remove("database") {
            param
                .properties
                .insert("database.name".to_owned(), database_name);
        }
        param
            .properties
            .insert("table.name".to_owned(), full_table_name.clone());
        param
            .properties
            .insert("type".to_owned(), "append-only".to_owned());

        let jdbc_sink_writer =
            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;

        Ok(Self {
            augmented_row: AugmentedChunk::new(0, is_append_only),
            jdbc_sink_writer,
        })
    }

    pub async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        self.jdbc_sink_writer.begin_epoch(epoch).await?;
        Ok(())
    }

    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let chunk = self.augmented_row.augmented_chunk(chunk)?;
        self.jdbc_sink_writer.write_batch(chunk).await?;
        Ok(())
    }

    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
        Ok(())
    }

    pub async fn abort(&mut self) -> Result<()> {
        self.jdbc_sink_writer.abort().await?;
        Ok(())
    }
}