risingwave_connector/sink/snowflake_redshift/mod.rs

use std::fmt::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::BytesMut;
use opendal::Operator;
use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_pb::id::ExecutorId;
use serde_json::{Map, Value};
use thiserror_ext::AsReport;

use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::opendal_sink::FileSink;
use crate::sink::file_sink::s3::{S3Common, S3Sink};
use crate::sink::{Result, SinkError, SinkWriterParam};

pub mod redshift;
pub mod snowflake;

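// Names of the auxiliary columns appended in non-append-only mode: `__row_id` carries
// an `{epoch}_{row index}` marker and `__op` the change operation as an integer.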
pub const __ROW_ID: &str = "__row_id";
pub const __OP: &str = "__op";

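/// Encodes rows to JSON and, for non-append-only sinks, augments each encoded row
/// with a `__row_id` of the form `{epoch}_{running count}` and the numeric `__op`.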
pub struct AugmentedRow {
    row_encoder: JsonEncoder,
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedRow {
    pub fn new(current_epoch: u64, is_append_only: bool, schema: Schema) -> Self {
        let row_encoder = JsonEncoder::new(
            schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            JsonbHandlingMode::String,
        );
        Self {
            row_encoder,
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

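    /// Starts a new epoch and resets the per-epoch row counter. No-op for
    /// append-only sinks or when the epoch has not changed.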
    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

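    /// Encodes one row as a JSON object. For non-append-only sinks, the `__row_id`
    /// and `__op` fields are appended; append-only rows are returned unchanged.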
    pub fn augmented_row(&mut self, row: impl Row, op: Op) -> Result<Map<String, Value>> {
        let mut row = self.row_encoder.encode(row)?;
        if self.is_append_only {
            return Ok(row);
        }
        self.current_row_count += 1;
        row.insert(
            __ROW_ID.to_owned(),
            Value::String(format!("{}_{}", self.current_epoch, self.current_row_count)),
        );
        row.insert(
            __OP.to_owned(),
            Value::Number(serde_json::Number::from(op.to_i16())),
        );
        Ok(row)
    }
}

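/// Chunk-level counterpart of [`AugmentedRow`]: appends the `__row_id` and op columns
/// to an entire `StreamChunk` instead of encoding rows one at a time.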
pub struct AugmentedChunk {
    current_epoch: u64,
    current_row_count: usize,
    is_append_only: bool,
}

impl AugmentedChunk {
    pub fn new(current_epoch: u64, is_append_only: bool) -> Self {
        Self {
            current_epoch,
            current_row_count: 0,
            is_append_only,
        }
    }

    pub fn reset_epoch(&mut self, current_epoch: u64) {
        if self.is_append_only || current_epoch == self.current_epoch {
            return;
        }
        self.current_epoch = current_epoch;
        self.current_row_count = 0;
    }

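    /// For non-append-only sinks, appends a Utf8 `__row_id` column
    /// (`{epoch}_{row index}`) and an Int32 column holding the original op, then
    /// re-emits every row as an `Insert`. Append-only chunks pass through unchanged.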
    pub fn augmented_chunk(&mut self, chunk: StreamChunk) -> Result<StreamChunk> {
        if self.is_append_only {
            return Ok(chunk);
        }
        let (data_chunk, ops) = chunk.into_parts();
        let chunk_row_count = data_chunk.capacity();
        let (columns, visibility) = data_chunk.into_parts();

        let op_column = ops.iter().map(|op| op.to_i16() as i32).collect::<Vec<_>>();
        let row_column_strings: Vec<String> = (0..chunk_row_count)
            .map(|i| format!("{}_{}", self.current_epoch, self.current_row_count + i))
            .collect();

        let row_column_refs: Vec<&str> = row_column_strings.iter().map(|s| s.as_str()).collect();
        self.current_row_count += chunk_row_count;

        let mut arrays: Vec<Arc<ArrayImpl>> = columns;
        arrays.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
            row_column_refs,
        ))));
        arrays.push(Arc::new(ArrayImpl::Int32(
            PrimitiveArray::<i32>::from_iter(op_column),
        )));

        let chunk = DataChunk::new(arrays, visibility);
        let ops = vec![Op::Insert; chunk_row_count];
        let chunk = StreamChunk::from_parts(ops, chunk);
        Ok(chunk)
    }
}

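/// Stages stream chunks in S3 as newline-delimited JSON via OpenDAL. A new object is
/// opened lazily on the first write after a checkpoint, and `barrier` returns its
/// `s3://` path once the next checkpoint closes it.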
pub struct SnowflakeRedshiftSinkS3Writer {
    s3_config: S3Common,
    s3_operator: Operator,
    augmented_row: AugmentedRow,
    opendal_writer_path: Option<(opendal::Writer, String)>,
    executor_id: ExecutorId,
    target_table_name: Option<String>,
}

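/// Opens an OpenDAL writer for a new object named
/// `{base_path}[{table_name}/]{executor_id}_{unix_secs}.json` and returns it together
/// with the full `s3://{bucket}/{object}` path.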
pub async fn build_opendal_writer_path(
    s3_config: &S3Common,
    executor_id: ExecutorId,
    operator: &Operator,
    target_table_name: &Option<String>,
) -> Result<(opendal::Writer, String)> {
    let mut base_path = s3_config.path.clone().unwrap_or("".to_owned());
    if !base_path.ends_with('/') {
        base_path.push('/');
    }
    if let Some(table_name) = &target_table_name {
        base_path.push_str(&format!("{}/", table_name));
    }
    let create_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let object_name = format!(
        "{}{}_{}.{}",
        base_path,
        executor_id,
        create_time.as_secs(),
        "json",
    );
    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
    Ok((
        operator.writer_with(&object_name).concurrent(8).await?,
        all_path,
    ))
}

impl SnowflakeRedshiftSinkS3Writer {
    pub fn new(
        s3_config: S3Common,
        schema: Schema,
        is_append_only: bool,
        executor_id: ExecutorId,
        target_table_name: Option<String>,
    ) -> Result<Self> {
        let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
        Ok(Self {
            s3_config,
            s3_operator,
            opendal_writer_path: None,
            executor_id,
            augmented_row: AugmentedRow::new(0, is_append_only, schema),
            target_table_name,
        })
    }

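    /// Forwards the new epoch to the row augmenter so the per-epoch `__row_id`
    /// counter is reset.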
    pub fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.augmented_row.reset_epoch(epoch);
        Ok(())
    }

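    /// Lazily opens the S3 object on first use, then appends the chunk's rows to it
    /// as newline-delimited JSON.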
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.opendal_writer_path.is_none() {
            let opendal_writer_path = build_opendal_writer_path(
                &self.s3_config,
                self.executor_id,
                &self.s3_operator,
                &self.target_table_name,
            )
            .await?;
            self.opendal_writer_path = Some(opendal_writer_path);
        }
        let mut chunk_buf = BytesMut::new();
        for (op, row) in chunk.rows() {
            let encoded_row = self.augmented_row.augmented_row(row, op)?;
            writeln!(chunk_buf, "{}", Value::Object(encoded_row)).unwrap();
        }
        self.opendal_writer_path
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
            .0
            .write(chunk_buf.freeze())
            .await?;
        Ok(())
    }

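    /// On a checkpoint, closes the in-flight S3 writer (if any) and returns the path
    /// of the finished object; otherwise returns `None`.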
    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<String>> {
        if is_checkpoint && let Some((mut writer, path)) = self.opendal_writer_path.take() {
            writer
                .close()
                .await
                .map_err(|e| SinkError::File(e.to_report_string()))?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }
}