risingwave_connector/sink/snowflake_redshift/mod.rs

use std::fmt::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use opendal::Operator;
use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use serde_json::{Map, Value};
use thiserror_ext::AsReport;

use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::{Result, SinkError, SinkWriterParam};

pub mod redshift;
pub mod snowflake;

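// Auxiliary column names added in non-append-only (upsert) mode: `__row_id` carries a
// per-epoch row identifier and `__op` the stream operation, so downstream consumers can
// order rows within an epoch and recover the original operation.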
pub const __ROW_ID: &str = "__row_id";
pub const __OP: &str = "__op";

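/// Encodes rows to JSON and, when the sink is not append-only, augments each encoded row
/// with the `__row_id` and `__op` fields.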
pub struct AugmentedRow {
    row_encoder: JsonEncoder,
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedRow {
    pub fn new(current_epoch: u64, is_append_only: bool, schema: Schema) -> Self {
        let row_encoder = JsonEncoder::new(
            schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            JsonbHandlingMode::String,
        );
        Self {
            row_encoder,
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

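    /// Resets the per-epoch row counter when a new epoch begins. This is a no-op for
    /// append-only sinks, which never emit the auxiliary columns.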
    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

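    /// JSON-encodes a row; in non-append-only mode, also attaches `__row_id`
    /// (`"{epoch}_{row_count}"`) and `__op` (the operation as an integer).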
    pub fn augmented_row(&mut self, row: impl Row, op: Op) -> Result<Map<String, Value>> {
        let mut row = self.row_encoder.encode(row)?;
        if self.is_append_only {
            return Ok(row);
        }
        self.current_row_count += 1;
        row.insert(
            __ROW_ID.to_owned(),
            Value::String(format!("{}_{}", self.current_epoch, self.current_row_count)),
        );
        row.insert(
            __OP.to_owned(),
            Value::Number(serde_json::Number::from(op.to_i16())),
        );
        Ok(row)
    }
}

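/// Chunk-level counterpart of [`AugmentedRow`]: appends `__row_id` and `__op` columns to a
/// whole `StreamChunk` rather than augmenting encoded rows one at a time.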
pub struct AugmentedChunk {
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedChunk {
    pub fn new(current_epoch: u64, is_append_only: bool) -> Self {
        Self {
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

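    /// Appends the `__row_id` and `__op` columns to the chunk and rewrites every op as
    /// `Insert`, since the original operation is now carried in the `__op` column.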
    pub fn augmented_chunk(&mut self, chunk: StreamChunk) -> Result<StreamChunk> {
        if self.is_append_only {
            return Ok(chunk);
        }
        let (data_chunk, ops) = chunk.into_parts();
        let chunk_row_count = data_chunk.capacity();
        let (columns, visibility) = data_chunk.into_parts();

        let op_column = ops.iter().map(|op| op.to_i16() as i32).collect::<Vec<_>>();
        let row_column_strings: Vec<String> = (0..chunk_row_count)
            .map(|i| format!("{}_{}", self.current_epoch, self.current_row_count + i))
            .collect();

        let row_column_refs: Vec<&str> = row_column_strings.iter().map(|s| s.as_str()).collect();
        self.current_row_count += chunk_row_count;

        let mut arrays: Vec<Arc<ArrayImpl>> = columns;
        arrays.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
            row_column_refs,
        ))));
        arrays.push(Arc::new(ArrayImpl::Int32(
            PrimitiveArray::<i32>::from_iter(op_column),
        )));

        let chunk = DataChunk::new(arrays, visibility);
        let ops = vec![Op::Insert; chunk_row_count];
        let chunk = StreamChunk::from_parts(ops, chunk);
        Ok(chunk)
    }
}

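/// Stages rows in S3 as newline-delimited JSON so that the Snowflake or Redshift sink can
/// later load the staged files into the target table.
///
/// A minimal sketch of the intended call sequence, assuming an `S3Common` config, a
/// `Schema`, a `StreamChunk`, and the epoch/executor ids are already at hand (all names
/// below are illustrative):
///
/// ```ignore
/// let mut writer = SnowflakeRedshiftSinkS3Writer::new(
///     s3_config, schema, /* is_append_only */ true, executor_id, None,
/// )?;
/// writer.begin_epoch(epoch)?;
/// writer.write_batch(chunk).await?;
/// // On a checkpoint barrier, the staged object is closed and its s3:// path is returned.
/// let staged_path = writer.barrier(true).await?;
/// ```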
pub struct SnowflakeRedshiftSinkS3Writer {
    s3_config: S3Common,
    s3_operator: Operator,
    augmented_row: AugmentedRow,
    opendal_writer_path: Option<(opendal::Writer, String)>,
    executor_id: u64,
    target_table_name: Option<String>,
}

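/// Builds an OpenDAL writer for a new staging object named
/// `{path}/{table_name}/{executor_id}_{unix_secs}.json` (the table segment is omitted when
/// no target table is given), returning the writer together with the full
/// `s3://bucket/...` path of the object.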
pub async fn build_opendal_writer_path(
    s3_config: &S3Common,
    executor_id: u64,
    operator: &Operator,
    target_table_name: &Option<String>,
) -> Result<(opendal::Writer, String)> {
    let mut base_path = s3_config.path.clone().unwrap_or("".to_owned());
    if !base_path.ends_with('/') {
        base_path.push('/');
    }
    if let Some(table_name) = &target_table_name {
        base_path.push_str(&format!("{}/", table_name));
    }
    let create_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let object_name = format!(
        "{}{}_{}.{}",
        base_path,
        executor_id,
        create_time.as_secs(),
        "json",
    );
    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
    Ok((
        operator.writer_with(&object_name).concurrent(8).await?,
        all_path,
    ))
}

impl SnowflakeRedshiftSinkS3Writer {
    pub fn new(
        s3_config: S3Common,
        schema: Schema,
        is_append_only: bool,
        executor_id: u64,
        target_table_name: Option<String>,
    ) -> Result<Self> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
        Ok(Self {
            s3_config,
            s3_operator,
            opendal_writer_path: None,
            executor_id,
            augmented_row: AugmentedRow::new(0, is_append_only, schema),
            target_table_name,
        })
    }

    pub fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        Ok(())
    }

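    /// Lazily creates the staging writer on first use, then appends the chunk's rows as
    /// newline-delimited JSON.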
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.opendal_writer_path.is_none() {
            let opendal_writer_path = build_opendal_writer_path(
                &self.s3_config,
                self.executor_id,
                &self.s3_operator,
                &self.target_table_name,
            )
            .await?;
            self.opendal_writer_path = Some(opendal_writer_path);
        }
        let mut chunk_buf = BytesMut::new();
        for (op, row) in chunk.rows() {
            let encoded_row = self.augmented_row.augmented_row(row, op)?;
            writeln!(chunk_buf, "{}", Value::Object(encoded_row)).unwrap();
        }
        self.opendal_writer_path
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
            .0
            .write(chunk_buf.freeze())
            .await?;
        Ok(())
    }

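    /// On a checkpoint barrier, closes the current staging object (if any) and returns its
    /// full `s3://` path; otherwise returns `None`.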
    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<String>> {
        if is_checkpoint && let Some((mut writer, path)) = self.opendal_writer_path.take() {
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }
}