risingwave_connector/sink/file_sink/opendal_sink.rs

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
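
/// File sink that writes append-only data to an object store or file system
/// through an OpenDAL [`Operator`]. The concrete backend (S3, GCS, Azblob,
/// WebHDFS, local fs, ...) is selected by the [`OpendalSinkBackend`]
/// implementation `S`.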
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    pub(crate) path: String,
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    pub(crate) batching_strategy: BatchingStrategy,
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

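/// Backend-specific part of a [`FileSink`]: each implementation parses its own
/// properties from the sink options, builds the OpenDAL [`Operator`], and
/// reports the target path, engine type, and batching strategy.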
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

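/// The storage service a [`FileSink`] writes to. Used to adjust path handling
/// (e.g. no base-path prefix for the local `Fs` engine).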
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                 Please change the query to append-only, and specify it \
                 explicitly after the `FORMAT ... ENCODE ...` statement. \
                 For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone()).clone();
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

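/// The writer behind a [`FileSink`]: buffers rows into the current file and
/// rolls over to a new object according to the configured [`BatchingStrategy`].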
pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

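/// The underlying file writer: either an Arrow-based Parquet writer or a plain
/// OpenDAL writer used for newline-delimited JSON.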
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

impl OpenDalSinkWriter {
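    /// Writes a chunk into the current file, creating the file writer first if
    /// none is open.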
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

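    /// Closes the current file writer, if any. Returns `true` if a writer was
    /// open, `false` if there was nothing to commit.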
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

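    /// Commits only if the batching thresholds are met (see [`Self::can_commit`]);
    /// otherwise returns `Ok(false)` and keeps the current file open.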
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

impl OpenDalSinkWriter {
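    /// A file is ready to be committed once it has been open for at least
    /// `rollover_seconds` or has accumulated at least `max_row_count` rows.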
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

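    /// Builds the optional date-based partition prefix (none / day / month / hour)
    /// for the object path, derived from the writer's creation time.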
    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

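    /// Appends an append-only chunk to the current writer: Parquet rows go through
    /// the Arrow record-batch conversion, JSON rows are encoded line by line.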
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // writing into an in-memory `BytesMut` cannot fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

impl OpenDalSinkWriter {
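    /// Creates a writer without opening any file yet; the actual file writer is
    /// created lazily on the first `write_batch` call.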
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

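    /// Builds the object name (optional partition prefix, executor id, creation
    /// timestamp, and encode-specific suffix) and opens an OpenDAL writer for it.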
    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

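    /// Opens a new file writer for the configured encode type and resets the
    /// per-file batching state (row counter and creation time).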
    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder();
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        self.created_time = SystemTime::now();

        Ok(())
    }
}

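/// Converts a RisingWave schema into the Arrow schema used by the Parquet writer,
/// asserting that field names are unique.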
fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        // Field names are expected to be unique within the schema.
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field = IcebergArrowConvert
            .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

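/// Controls when an in-progress file is rolled over: after `max_row_count` rows
/// (default 10240) or `rollover_seconds` seconds (default 10), with an optional
/// date-based path partition prefix. All values are parsed from strings in the
/// sink properties.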
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

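/// Date-based partitioning granularity for the object path, serialized in
/// snake_case (`none`, `day`, `month`, `hour`).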
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}